glide.extensions.celery module

http://www.celeryproject.org/

This extension assumes you have set up your own Celery app and imported or added the provided tasks to it as necessary. Any code used by your Celery workers must be importable by those workers, and you may need to configure your app to allow pickle for serialization.
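
As a rough illustration of the serialization note above, the settings below are one way a Celery app might be configured to accept pickle. The keys are Celery's standard lowercase configuration names; the PICKLE_SETTINGS name is hypothetical, and whether you need pickle at all depends on what your tasks send and return.

```python
# Hypothetical settings dict; apply it to your own app with
# app.conf.update(PICKLE_SETTINGS).
PICKLE_SETTINGS = {
    # Serialize task arguments with pickle.
    "task_serializer": "pickle",
    # Serialize task return values with pickle.
    "result_serializer": "pickle",
    # Content types the workers will accept; recent Celery versions
    # accept only JSON unless told otherwise.
    "accept_content": ["pickle", "json"],
}
```

Unpickling can execute arbitrary code, so a setup like this is only appropriate when the broker and everything that writes to it are trusted.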

class glide.extensions.celery.CeleryApplyAsync(name, _log=False, _debug=False, **default_context)

Bases: glide.core.Node

A Node that calls apply_async on a given Celery Task

run(data, task, timeout=None, push_type='async', **kwargs)

Call task.apply_async with the given data as the first task argument

Parameters
  • data – Data to process

  • task – A Celery Task registered with your app.

  • timeout (int, optional) – A timeout to use if waiting for results via AsyncResult.get()

  • push_type (str, optional) – If “async”, push the AsyncResult immediately. If “input”, push the input data immediately after task submission. If “result”, collect the task result synchronously and push it.

  • **kwargs – Keyword arguments passed to task.apply_async
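
The three push_type values described above can be sketched without a broker. This is a minimal illustration, not glide's implementation: FakeAsyncResult, FakeTask, and value_to_push are hypothetical stand-ins, and only the apply_async(args=..., **options) and get(timeout=...) call shapes mirror Celery.

```python
class FakeAsyncResult:
    """Stand-in for celery.result.AsyncResult (not the real class)."""

    def __init__(self, value):
        self._value = value

    def get(self, timeout=None):
        # The real AsyncResult.get() blocks until the task finishes.
        return self._value


class FakeTask:
    """Stand-in for a Celery Task that doubles its first argument."""

    def apply_async(self, args=None, kwargs=None, **options):
        return FakeAsyncResult(args[0] * 2)


def value_to_push(data, task, timeout=None, push_type="async", **kwargs):
    """Return what a node following the documented push_type rules would push."""
    # The data is submitted as the first positional task argument.
    async_result = task.apply_async(args=(data,), **kwargs)
    if push_type == "async":
        return async_result  # hand the pending result downstream
    if push_type == "input":
        return data  # pass the original input through
    if push_type == "result":
        return async_result.get(timeout=timeout)  # wait, then push the value
    raise ValueError("unknown push_type: %r" % push_type)
```

With push_type="async" the downstream node receives a pending result and is responsible for resolving it, which is the case CeleryReduce is documented to handle.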

class glide.extensions.celery.CeleryParaGlider(consume_task, *args, **kwargs)

Bases: glide.core.ParaGlider

A ParaGlider that uses Celery to execute parallel calls to consume()

Parameters
  • consume_task – A Celery Task that will behave like consume()

  • *args – Arguments passed through to ParaGlider init

  • **kwargs – Keyword arguments passed through to ParaGlider init

consume_task

A Celery Task that behaves like consume(), such as CeleryConsumeTask.

See ParaGlider for additional attributes.

consume(data=None, cleanup=None, split_count=None, synchronous=False, timeout=None, **node_contexts)

Set up node contexts and consume data with the pipeline

Parameters
  • data (iterable, optional) – Iterable of data to consume

  • cleanup (dict, optional) – A mapping of arg names to cleanup functions to run after data processing is complete.

  • split_count (int, optional) – How many slices to split the data into for parallel processing. Default is to inspect the celery app and set split_count = worker count.

  • synchronous (bool, optional) – If False, return AsyncResults. If True, wait for tasks to complete and return their results, if any.

  • timeout (int or float, optional) – If waiting for results, pass this as timeout to AsyncResult.get().

  • **node_contexts – Keyword arguments that are node_name->param_dict
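
To make the split_count parameter concrete, here is one way an iterable could be divided into slices for parallel workers. The split_into_slices helper is hypothetical and glide's own splitting logic may differ; the sketch only shows the idea of one slice per worker.

```python
def split_into_slices(data, split_count):
    """Divide data into split_count contiguous slices of near-equal size."""
    items = list(data)
    base, extra = divmod(len(items), split_count)
    slices = []
    start = 0
    for i in range(split_count):
        # The first `extra` slices each take one additional item.
        size = base + (1 if i < extra else 0)
        slices.append(items[start:start + size])
        start += size
    return slices


# Ten items across three workers.
slices = split_into_slices(range(10), 3)
# slices == [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]
```

Each slice would then be submitted as its own task. With synchronous=False the caller gets back the pending results, and with synchronous=True it waits on each one, passing timeout through to AsyncResult.get().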

class glide.extensions.celery.CeleryReduce(name, _log=False, _debug=False, **default_context)

Bases: glide.flow.Reduce

Collect the asynchronous results before pushing

end()

Do the push once all results are in
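
The collect-then-push behavior can be pictured with a small stand-in. CollectThenPush and FakeAsyncResult are hypothetical and do not use glide or Celery; the actual CeleryReduce builds on glide.flow.Reduce and may differ in detail.

```python
class FakeAsyncResult:
    """Stand-in for celery.result.AsyncResult (not the real class)."""

    def __init__(self, value):
        self._value = value

    def get(self, timeout=None):
        return self._value


class CollectThenPush:
    """Accumulate pending results, then resolve them all in end()."""

    def __init__(self):
        self.pending = []
        self.pushed = None

    def run(self, async_result):
        # Called once per upstream item: keep the handle, do not block yet.
        self.pending.append(async_result)

    def end(self):
        # Called after upstream is exhausted: resolve every handle and
        # push the collected values once.
        self.pushed = [result.get() for result in self.pending]
        return self.pushed


node = CollectThenPush()
for value in (1, 2, 3):
    node.run(FakeAsyncResult(value))
collected = node.end()
```

Deferring get() until end() lets all tasks run concurrently on the workers instead of waiting on each one in turn.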

class glide.extensions.celery.CelerySendTask(name, _log=False, _debug=False, **default_context)

Bases: glide.core.Node

A Node that calls app.send_task

run(data, app, task_name, timeout=None, push_type='async', **kwargs)

Call app.send_task with the given data as the first task argument

Parameters
  • data – Data to process

  • app – Celery app

  • task_name – The name of a Celery Task registered with your app.

  • timeout (int, optional) – A timeout to use if waiting for results via AsyncResult.get()

  • push_type (str, optional) – If “async”, push the AsyncResult immediately. If “input”, push the input data immediately after task submission. If “result”, collect the task result synchronously and push it.

  • **kwargs – Keyword arguments passed to app.send_task
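
The call shape described above, with the data as the first task argument and the remaining keyword arguments forwarded, might look like the following. FakeApp and send_by_name are hypothetical stand-ins, and the task name "tasks.process" and queue "etl" are made up for illustration; only the send_task(name, args=..., **options) shape mirrors Celery.

```python
class FakeApp:
    """Stand-in for a Celery app that records send_task calls."""

    def __init__(self):
        self.sent = []

    def send_task(self, name, args=None, kwargs=None, **options):
        # A real app would publish a message and return an AsyncResult.
        self.sent.append((name, args, options))
        return "async-result-placeholder"


def send_by_name(data, app, task_name, **kwargs):
    """Submit data to a task identified only by its registered name."""
    return app.send_task(task_name, args=(data,), **kwargs)


app = FakeApp()
send_by_name({"id": 1}, app, "tasks.process", queue="etl")
```

Because the task is referenced by name, the submitting process does not need to import the task's code; only the workers do.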