glide.extensions.celery module
This extension assumes you have set up your own Celery app and imported or added the provided tasks to your app as necessary. Any code used by your Celery workers must be importable by those workers, and you may need to make sure your app allows pickle for serialization.
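A minimal sketch of the Celery settings that allow pickle, assuming Celery 4 or later (which uses lowercase setting names). The dict name PICKLE_SETTINGS and the broker URL in the usage comment are hypothetical; the keys themselves are standard Celery configuration settings. Accepting pickle lets workers deserialize arbitrary Python objects, so it is only appropriate when the broker is trusted.

```python
# Hypothetical settings dict; the keys are standard Celery (4+) configuration names.
PICKLE_SETTINGS = {
    "task_serializer": "pickle",            # serialize task arguments with pickle
    "result_serializer": "pickle",          # serialize task return values with pickle
    "accept_content": ["pickle", "json"],   # content types workers will accept
}

# Usage with your own app (requires Celery and a reachable broker):
#     from celery import Celery
#     app = Celery("myapp", broker="redis://localhost:6379/0")
#     app.conf.update(PICKLE_SETTINGS)
```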
class glide.extensions.celery.CeleryApplyAsync(name, _log=False, _debug=False, **default_context)
Bases: glide.core.Node
A Node that calls apply_async on a given Celery Task.
run(data, task, timeout=None, push_type='async', **kwargs)
Call task.apply_async with the given data as the first task argument.
Parameters
data – Data to process
task – A Celery Task registered with your app.
timeout (int, optional) – A timeout to use if waiting for results via AsyncResult.get()
push_type (str, optional) – If “async”, push the AsyncResult immediately. If “input”, push the input data immediately after task submission. If “result”, collect the task result synchronously and push it.
**kwargs – Keyword arguments passed to task.apply_async
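The three push_type options can be modeled as follows. This is a hypothetical stand-alone function, not glide's implementation: it returns the value a CeleryApplyAsync node would push instead of calling push(), and it assumes the data is submitted as args=(data,). A unittest.mock.Mock stands in for the Celery task so the sketch runs without a broker.

```python
from unittest.mock import Mock


def apply_async_and_select(data, task, timeout=None, push_type="async", **kwargs):
    """Hypothetical model of the documented push_type behavior (not glide's code)."""
    # Submit the task with the data as its first positional argument.
    async_result = task.apply_async(args=(data,), **kwargs)
    if push_type == "async":
        return async_result  # the AsyncResult itself, without waiting
    if push_type == "input":
        return data  # the original input, without waiting
    if push_type == "result":
        return async_result.get(timeout=timeout)  # blocks until the task finishes
    raise ValueError(f"Invalid push_type: {push_type!r}")


# A Mock stands in for a real Celery task and its AsyncResult.
fake_result = Mock()
fake_result.get.return_value = [2, 4, 6]
fake_task = Mock()
fake_task.apply_async.return_value = fake_result
```

Extra keyword arguments such as queue, countdown, or expires are standard apply_async options and would be forwarded unchanged.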
class glide.extensions.celery.CeleryParaGlider(consume_task, *args, **kwargs)
Bases: glide.core.ParaGlider
A ParaGlider that uses Celery to execute parallel calls to consume().
Parameters
consume_task – A Celery Task that will behave like consume()
*args – Arguments passed through to ParaGlider init
**kwargs – Keyword arguments passed through to ParaGlider init
consume_task
A Celery Task that behaves like consume(), such as CeleryConsumeTask.
See ParaGlider for additional attributes.
consume(data=None, cleanup=None, split_count=None, synchronous=False, timeout=None, **node_contexts)
Set up node contexts and consume data with the pipeline.
Parameters
data (iterable, optional) – Iterable of data to consume
cleanup (dict, optional) – A mapping of argument names to cleanup functions to be run after data processing is complete.
split_count (int, optional) – How many slices to split the data into for parallel processing. By default, the Celery app is inspected and split_count is set to the number of workers.
synchronous (bool, optional) – If False, return AsyncResults. If True, wait for tasks to complete and return their results, if any.
timeout (int or float, optional) – If waiting for results, pass this as timeout to AsyncResult.get().
**node_contexts – Keyword arguments mapping each node name to a dict of parameters for that node
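The interplay of split_count and synchronous can be sketched as below. Both helpers are hypothetical: the slicing scheme (contiguous, nearly equal slices) is an assumption, and submitting each slice as args=(chunk,) is a simplification, since a real consume task also needs the pipeline and node contexts. A Mock stands in for consume_task. With a live app, one way to count workers is len(app.control.inspect().stats() or {}), since stats() returns None when no workers reply.

```python
from unittest.mock import Mock


def split_into_slices(data, split_count):
    """Split data into split_count contiguous, nearly equal slices."""
    items = list(data)
    size, remainder = divmod(len(items), split_count)
    slices, start = [], 0
    for i in range(split_count):
        # The first `remainder` slices get one extra item each.
        end = start + size + (1 if i < remainder else 0)
        slices.append(items[start:end])
        start = end
    return slices


def submit_slices(consume_task, data, split_count, synchronous=False, timeout=None):
    """Hypothetical dispatch: one consume_task submission per non-empty slice."""
    async_results = [
        consume_task.apply_async(args=(chunk,))
        for chunk in split_into_slices(data, split_count)
        if chunk
    ]
    if not synchronous:
        return async_results  # caller decides when to collect results
    # Wait for each task in turn and collect its return value.
    return [r.get(timeout=timeout) for r in async_results]
```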
class glide.extensions.celery.CeleryReduce(name, _log=False, _debug=False, **default_context)
Bases: glide.flow.Reduce
Collect the asynchronous results before pushing.
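Conceptually, a CeleryReduce downstream of a node that pushes AsyncResults gathers them and resolves each one before pushing a single combined value. The function below is a hypothetical sketch of that resolve step only; it does not reproduce glide's Reduce internals, and Mocks stand in for real AsyncResult objects.

```python
from unittest.mock import Mock


def collect_async_results(async_results, timeout=None):
    """Hypothetical model: resolve every AsyncResult into one list of values."""
    # AsyncResult.get blocks until its task finishes or the timeout expires.
    return [result.get(timeout=timeout) for result in async_results]


# Mocks standing in for AsyncResults whose tasks returned 0, 1, 4, 9.
pending = [Mock(get=Mock(return_value=i * i)) for i in range(4)]
```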
class glide.extensions.celery.CelerySendTask(name, _log=False, _debug=False, **default_context)
Bases: glide.core.Node
A Node that calls app.send_task.
run(data, app, task_name, timeout=None, push_type='async', **kwargs)
Call app.send_task with the given data as the first task argument.
Parameters
data – Data to process
app – Celery app
task_name – The name of a Celery Task registered with your app.
timeout (int, optional) – A timeout to use if waiting for results via AsyncResult.get()
push_type (str, optional) – If “async”, push the AsyncResult immediately. If “input”, push the input data immediately after task submission. If “result”, collect the task result synchronously and push it.
**kwargs – Keyword arguments passed to app.send_task
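Unlike task.apply_async, app.send_task addresses the task by its registered name, so the process running the pipeline does not need to import the task's code. A minimal sketch of the call, assuming the data is sent as args=(data,); the helper send_by_name and the task name etl.tasks.transform are hypothetical, and a Mock stands in for the Celery app.

```python
from unittest.mock import Mock


def send_by_name(app, task_name, data, **options):
    """Hypothetical helper: submit a task by name with data as its first argument."""
    # Only the name travels to the broker; the worker resolves it to the registered task.
    return app.send_task(task_name, args=(data,), **options)


fake_app = Mock()
async_result = send_by_name(fake_app, "etl.tasks.transform", [1, 2, 3], queue="etl")
```

push_type then selects what is pushed in the same way as for CeleryApplyAsync.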