glide.extensions.rq module
class glide.extensions.rq.RQJob(name, _log=False, _debug=False, **default_context)

Bases: glide.core.Node

A Node that queues a function using Redis Queue.
Warning

Python RQ appears not to update the job status if your function returns None. Your code may hang if you poll waiting for a result in this scenario.
run(data, func, queue=None, queue_name='default', redis_conn=None, push_type='async', poll_sleep=1, timeout=None, **kwargs)

Execute func on data using Redis Queue.
Parameters
data – Data to process
func (callable) – Function to execute using Redis Queue
queue (Queue, optional) – An rq Queue object
queue_name (str, optional) – When creating a queue, the name of the queue to use
redis_conn (Redis connection, optional) – When creating a queue, the Redis connection to use
push_type (str, optional) – If "async", push the Job immediately. If "input", push the input data immediately after task submission. If "result", collect the task result synchronously and push it.
poll_sleep (int or float, optional) – If waiting for the result, sleep this many seconds between polls
timeout (int or float, optional) – If waiting for the result, raise an exception if polling takes longer than timeout seconds.
**kwargs – Keyword arguments to pass to enqueue()
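With push_type="result", the behavior described above boils down to polling job.result until it is non-None, raising RQTimeoutException if the timeout elapses. A minimal sketch of that loop, using a hypothetical FakeJob stand-in instead of a live Redis connection (poll_for_result and FakeJob are illustrative names, not part of glide or rq):

```python
import time

class RQTimeoutException(Exception):
    """Local stand-in mirroring glide.extensions.rq.RQTimeoutException."""

def poll_for_result(job, poll_sleep=1, timeout=None):
    # Sketch of the "result" push_type: sleep between polls until the
    # job's result becomes non-None, raising if timeout is exceeded.
    start = time.time()
    while job.result is None:
        if timeout is not None and (time.time() - start) > timeout:
            raise RQTimeoutException("timed out polling for result")
        time.sleep(poll_sleep)
    return job.result

class FakeJob:
    # Stand-in for an rq Job whose result appears after a few polls.
    def __init__(self):
        self._polls = 0

    @property
    def result(self):
        self._polls += 1
        return "done" if self._polls > 2 else None

print(poll_for_result(FakeJob(), poll_sleep=0))  # -> done
```

Note how a job whose function returns None would never satisfy the loop's exit condition, which is exactly the hang scenario the warning above describes.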
class glide.extensions.rq.RQParaGlider(queue, *args, **kwargs)

Bases: glide.core.ParaGlider

A ParaGlider that uses Redis Queue to execute parallel calls to consume().
Parameters
queue – An rq Queue object
*args – Arguments passed through to ParaGlider init
**kwargs – Keyword arguments passed through to ParaGlider init
queue

An rq Queue object.

See ParaGlider for additional attributes.
consume(data=None, cleanup=None, split_count=None, synchronous=False, timeout=None, **node_contexts)

Set up node contexts and consume data with the pipeline.
Parameters
data (iterable, optional) – Iterable of data to consume
cleanup (dict, optional) – A mapping of argument names to cleanup functions to run after data processing is complete.
split_count (int, optional) – How many slices to split the data into for parallel processing. Default is the number of workers in the provided queue.
synchronous (bool, optional) – If False, return Jobs. If True, wait for jobs to complete and return their results, if any.
timeout (int or float, optional) – If waiting for results, raise an exception if polling for all results takes longer than timeout seconds.
**node_contexts – Keyword arguments that are node_name->param_dict
class glide.extensions.rq.RQReduce(name, _log=False, _debug=False, **default_context)

Bases: glide.flow.Reduce

Collect asynchronous results before pushing.
exception glide.extensions.rq.RQTimeoutException

Bases: Exception

Exception raised when polling for results exceeds the timeout.
glide.extensions.rq.complete_count(async_results)

TODO: Would it be better to rely on the job registry instead of job.result? Example:

    from rq import job
    from rq.registry import FinishedJobRegistry

    registry = FinishedJobRegistry('default', connection=redis_conn)
    job_ids = registry.get_job_ids()
    job_obj = job.Job.fetch("job-id-here", connection=redis_conn)
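Going by the TODO, complete_count presumably counts the jobs in async_results whose job.result is non-None. A sketch of that logic with a stand-in job object (FakeJob is hypothetical; a real call would receive rq Job instances):

```python
def complete_count(async_results):
    # Count jobs considered complete: those whose result is non-None.
    # (Per the RQJob warning, a function that returns None would never
    # register as complete under this check.)
    return sum(1 for job_obj in async_results if job_obj.result is not None)

class FakeJob:
    # Minimal stand-in for an rq Job with a precomputed result.
    def __init__(self, result):
        self.result = result

jobs = [FakeJob("a"), FakeJob(None), FakeJob(42)]
print(complete_count(jobs))  # -> 2
```

The registry-based alternative in the TODO would instead ask Redis which job IDs have finished, avoiding a result fetch per job.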