glide.extensions.rq module

http://python-rq.org/docs/

class glide.extensions.rq.RQJob(name, _log=False, _debug=False, **default_context)[source]

Bases: glide.core.Node

A Node that queues a function using Redis Queue

Warning

Python RQ seems to not update the job status if your function returns None. Your code may hang if you poll waiting for a result in this scenario.
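
For illustration, a hedged sketch of the failure mode described above (both task functions are hypothetical):

    def bad_task(data):
        # ... work happens, but nothing is returned: job.result stays
        # None, so a poller waiting on the result can hang forever
        pass

    def good_task(data):
        # ... same work, plus a non-None return so pollers see completion
        return True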

run(data, func, queue=None, queue_name='default', redis_conn=None, push_type='async', poll_sleep=1, timeout=None, **kwargs)[source]

Execute func on data using Redis Queue

Parameters
  • data – Data to process

  • func (callable) – Function to execute using Redis Queue

  • queue (Queue, optional) – An rq Queue object

  • queue_name (str, optional) – When creating a queue, the name of the queue to use

  • redis_conn (Redis, optional) – When creating a queue, the Redis connection to use

  • push_type (str, optional) – If "async", push the Job immediately. If "input", push the input data immediately after task submission. If "result", collect the task result synchronously and push it.

  • poll_sleep (int or float, optional) – If waiting for the result, sleep this many seconds between polls

  • timeout (int or float, optional) – If waiting for the result, raise an RQTimeoutException if polling takes longer than timeout seconds.

  • **kwargs – Keyword arguments to pass to enqueue()
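
For orientation, here is a hedged sketch of wiring RQJob into a pipeline. Glider, the | composition, the Print node, and the upper() task are illustrative assumptions rather than part of this module; RQJob, its parameters, and the rq Queue API are as documented above. The task function must be importable by the RQ worker process.

    from redis import Redis
    from rq import Queue

    from glide import Glider            # assumed top-level glide import
    from glide.core import Node
    from glide.extensions.rq import RQJob

    class Print(Node):
        # Illustrative terminal node: print and push whatever arrives
        def run(self, data):
            print(data)
            self.push(data)

    # Executed by an RQ worker; returns non-None (see the warning above)
    def upper(data):
        return data.upper()

    queue = Queue(connection=Redis())
    glider = Glider(RQJob("rq_job") | Print("print"))

    # push_type="result" polls each job and pushes its return value
    glider.consume(
        ["hello", "world"],
        rq_job=dict(func=upper, queue=queue, push_type="result", timeout=30),
    )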

class glide.extensions.rq.RQParaGlider(queue, *args, **kwargs)[source]

Bases: glide.core.ParaGlider

A ParaGlider that uses Redis Queue to execute parallel calls to consume()

Parameters
  • queue – An rq Queue object

  • *args – Arguments passed through to ParaGlider init

  • **kwargs – Keyword arguments passed through to ParaGlider init

queue

An rq Queue object

See ParaGlider for additional attributes.

consume(data=None, cleanup=None, split_count=None, synchronous=False, timeout=None, **node_contexts)[source]

Setup node contexts and consume data with the pipeline

Parameters
  • data (iterable, optional) – Iterable of data to consume

  • cleanup (dict, optional) – A mapping of arg names to cleanup functions to be run after data processing is complete.

  • split_count (int, optional) – How many slices to split the data into for parallel processing. Default is the number of workers in the provided queue.

  • synchronous (bool, optional) – If False, return Jobs. If True, wait for jobs to complete and return their results, if any.

  • timeout (int or float, optional) – If waiting for results, raise an RQTimeoutException if polling for all results takes longer than timeout seconds.

  • **node_contexts – Keyword arguments that are node_name->param_dict
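
For orientation, a hedged sketch of parallel consumption, reusing the queue and the illustrative Print node from the RQJob sketch above. At least one RQ worker must be running for the jobs to complete.

    from glide.extensions.rq import RQParaGlider

    para_glider = RQParaGlider(queue, Print("print"))

    # Splits the data into slices (by default one per worker in the
    # queue), submits a consume() job per slice, and waits up to 60
    # seconds for all of the results
    results = para_glider.consume(
        ["a", "b", "c", "d"],
        synchronous=True,
        timeout=60,
    )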

class glide.extensions.rq.RQReduce(name, _log=False, _debug=False, **default_context)[source]

Bases: glide.flow.Reduce

Collect asynchronous results before pushing

end()[source]

Do the push once all results are in
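
A hedged sketch of pairing RQJob's "async" push with RQReduce, again reusing upper, queue, and Print from the sketches above; the reducer presumably resolves the collected Jobs and pushes their results as a single batch.

    from glide.extensions.rq import RQJob, RQReduce

    glider = Glider(RQJob("rq_job") | RQReduce("reduce") | Print("print"))

    # RQJob pushes rq Job handles immediately; RQReduce collects them and
    # its end() does the push once all results are in
    glider.consume(
        ["hello", "world"],
        rq_job=dict(func=upper, queue=queue, push_type="async"),
    )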

exception glide.extensions.rq.RQTimeoutException[source]

Bases: Exception

Exception for timeouts polling for results

glide.extensions.rq.complete_count(async_results)[source]

Count the number of completed jobs among async_results.

TODO: Would it be better to rely on the job registry instead of job.result?

Example:

    from rq import job
    from rq.registry import FinishedJobRegistry

    registry = FinishedJobRegistry('default', connection=redis_conn)
    job_ids = registry.get_job_ids()
    job_obj = job.Job.fetch("job-id-here", connection=redis_conn)

glide.extensions.rq.get_async_result(async_result, timeout=None)[source]

Poll for a result

glide.extensions.rq.get_async_results(async_results, timeout=None)[source]

Poll for results
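
A hedged sketch of using these helpers directly, outside a pipeline; queue and upper are reused from the sketches above, and enqueue() is the standard rq API. The exact shape of the return values is an assumption.

    from glide.extensions.rq import get_async_result, get_async_results

    job = queue.enqueue(upper, "hello")
    print(get_async_result(job, timeout=30))    # e.g. "HELLO"

    jobs = [queue.enqueue(upper, word) for word in ["foo", "bar"]]
    print(get_async_results(jobs, timeout=30))  # e.g. ["FOO", "BAR"]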

glide.extensions.rq.rq_consume(*args, **kwargs)[source]

Hack: RQ seems to update the job status only if your function returns a non-None value. To force that, we use this simple wrapper around consume().
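
In spirit, the wrapper can be as small as the following hedged sketch; the real implementation may differ, and consume here stands for glide's pipeline-level consume function.

    def rq_consume(*args, **kwargs):
        # Delegate to the pipeline's consume(), then return a non-None
        # value so RQ marks the job as finished
        consume(*args, **kwargs)
        return True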