glide.core module

Core classes used to power pipelines

class glide.core.AssertFunc(name, _log=False, _debug=False, **default_context)

Bases: glide.core.Node

run(data, func)

Assert that a function returns a truthy value

Parameters
  • data – Data to push if func(self, data) is truthy

  • func (callable) – A callable that accepts (node, data) args and returns a truthy value if the assertion should pass.
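The sketch below illustrates the (node, data) contract described above. It is a minimal stdlib sketch, not glide's implementation. It assumes a falsy result raises AssertionError and a truthy result lets the data through unchanged. `assert_then_push` and the `push` callback are hypothetical stand-ins.

```python
from typing import Any, Callable, List

def assert_then_push(node: Any, data: Any,
                     func: Callable[[Any, Any], Any],
                     push: Callable[[Any], None]) -> None:
    # func receives (node, data), mirroring the documented AssertFunc.run contract
    if not func(node, data):
        # Assumption: a failed check raises instead of silently dropping the data
        raise AssertionError(f"assertion failed for data: {data!r}")
    push(data)  # a truthy result lets the data flow downstream unchanged

pushed: List[Any] = []
assert_then_push(None, [1, 2, 3], lambda node, d: len(d) == 3, pushed.append)
```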

class glide.core.AsyncIOSubmit(name, _log=False, _debug=False, **default_context)

Bases: glide.core.Node

A node that splits input data over an async function

get_results(futures, timeout=None)

run(data, func, split_count=None, timeout=None, push_type='async')

Use asyncio to apply func to data

Parameters
  • data – An iterable to process

  • func (callable) – An async callable that will be passed data to operate on using asyncio.

  • split_count (int, optional) – How many slices to split the data into for concurrent processing. Default is to set split_count = len(data).

  • timeout (int or float, optional) – Time to wait for jobs to complete before raising an error. Ignored unless using a push_type that waits for results.

  • push_type (str, optional) – If “async”, push the Futures immediately. If “input”, push the input data immediately after task submission. If “result”, collect the result synchronously and push it.
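The sketch below shows the split-then-gather idea behind run() for the "result" push type only. The data is sliced into split_count pieces, defaulting to len(data), and the async func runs on every slice concurrently under a single timeout. `split_data`, `gather_splits` and `asyncio_submit` are hypothetical helpers, and glide's own slicing may differ.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Optional, Sequence

def split_data(data: Sequence[Any], split_count: int) -> List[Sequence[Any]]:
    # Divide data into split_count contiguous slices whose sizes differ by at most one
    size, extra = divmod(len(data), split_count)
    splits, start = [], 0
    for i in range(split_count):
        end = start + size + (1 if i < extra else 0)
        splits.append(data[start:end])
        start = end
    return splits

async def gather_splits(func: Callable[[Sequence[Any]], Awaitable[Any]],
                        splits: List[Sequence[Any]],
                        timeout: Optional[float]) -> List[Any]:
    # Run one coroutine per slice concurrently; raises asyncio.TimeoutError if too slow
    return await asyncio.wait_for(asyncio.gather(*(func(s) for s in splits)), timeout)

def asyncio_submit(data, func, split_count=None, timeout=None):
    # Documented default: split_count = len(data), i.e. one slice per item
    if split_count is None:
        split_count = max(len(data), 1)
    return asyncio.run(gather_splits(func, split_data(data, split_count), timeout))

async def double_all(chunk):
    await asyncio.sleep(0)  # stand-in for real async I/O
    return [x * 2 for x in chunk]

results = asyncio_submit([1, 2, 3, 4, 5], double_all, split_count=2)
```

asyncio.gather returns results in the order the coroutines were passed, so the pushed result lines up with the input slices.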

class glide.core.ConfigContext(filename=None, var=None, key=None)

Bases: glide.core.RuntimeContext

class glide.core.ContextPush(name, _log=False, _debug=False, **default_context)

Bases: glide.core.Node

Pass context to downstream nodes

run(data, func, propagate=False)

Pass dynamically generated context to downstream nodes

Parameters
  • data – Data being processed by the node.

  • func (callable) – A function that takes the node object and data as input and returns a context dict to be used to update/add downstream node context.

  • propagate (bool, optional) – Passed through to node.update_downstream_context()

class glide.core.GlidePipeline(node, global_state=None)

Bases: consecution.pipeline.Pipeline

end()

Override this method to execute any logic you want to perform after all nodes are done processing data. The .end() method of all nodes will be called.

class glide.core.Glider(*args, **kwargs)

Bases: object

Main class for forming and executing pipelines. It thinly wraps Consecution’s Pipeline but does not subclass it, because a bug in pickle causes infinite recursion when using multiprocessing with a super().func reference.

Parameters
  • *args – Arguments passed through to Consecution’s Pipeline class.

  • **kwargs – Keyword arguments passed through to Consecution’s Pipeline class.

pipeline

A Consecution Pipeline

cli(*script_args, blacklist=None, parents=None, inject=None, cleanup=None)

Generate a decorator for this Glider that can be used to expose a CLI

Parameters
  • *script_args – Arguments to be added to the script CLI

  • blacklist (list, optional) – List of arguments to filter from appearing in the CLI

  • parents (list, optional) – List of parent CLIs to inherit from

  • inject (dict, optional) – A dictionary of arg names to functions/values that inject a value for that arg. Those args will be passed as context to nodes that can accept them in their run() method.

  • cleanup (dict, optional) – A dictionary of arg names to callables that will be used to perform clean up when the CLI script is complete.

Returns

decorator – A decorator that can be used to turn a function into a CLI “main” function.

Return type

GliderScript
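The sketch below illustrates the inject and cleanup dictionaries as plain stdlib code. Each injected arg is resolved by calling it if it is callable, and the clean-up callables run on the resolved objects once the "script" finishes. `run_with_injection` and its resolution rule are assumptions; GliderScript may resolve injected values differently.

```python
import io
from typing import Any, Callable, Dict

def run_with_injection(main: Callable[..., Any],
                       inject: Dict[str, Any],
                       cleanup: Dict[str, Callable[[Any], None]]) -> Any:
    # Resolve each injected arg: call it if it is callable, otherwise use the value as-is
    injected = {name: (value() if callable(value) else value)
                for name, value in inject.items()}
    try:
        return main(**injected)
    finally:
        # Clean up injected objects once the script finishes, even if main() raised
        for name, func in cleanup.items():
            if name in injected:
                func(injected[name])

closed = []

def close_and_record(handle):
    closed.append(handle)
    handle.close()

def main(outfile, limit):
    outfile.write("x" * limit)
    return outfile.getvalue()

output = run_with_injection(main,
                            inject={"outfile": io.StringIO, "limit": 3},
                            cleanup={"outfile": close_and_record})
```

Under this assumed rule, any callable value is invoked. A function meant to be passed through as-is would therefore need to be wrapped, for example in a lambda that returns it.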

consume(data=None, cleanup=None, **node_contexts)

Set up node contexts and consume data with the pipeline

Parameters
  • data (iterable, optional) – Iterable of data to consume

  • cleanup (dict, optional) – A mapping of arg names to clean-up functions to be run after data processing is complete.

  • **node_contexts – Keyword arguments that are node_name->param_dict

get_node_lookup()

Passthrough to Consecution Pipeline._node_lookup

property global_state

Get the pipeline global_state attribute

plot(*args, **kwargs)

Passthrough to Consecution Pipeline.plot

property top_node

Get the pipeline top_node attribute

class glide.core.GliderScript(glider, *script_args, blacklist=None, parents=None, inject=None, cleanup=None)

Bases: tlbx.cli_utils.Script

A decorator that can be used to create a CLI from a Glider pipeline

Parameters
  • glider (Glider) – A Glider pipeline to be used to auto-generate the CLI

  • *script_args – Arguments to be added to the script CLI

  • blacklist (list, optional) – List of arguments to filter from appearing in the CLI

  • parents (list, optional) – List of parent CLIs to inherit from

  • inject (dict, optional) – A dictionary of arg names to functions/values that inject a value for that arg. Those args will be passed as context to nodes that can accept them in their run() method.

  • cleanup (dict, optional) – A dictionary of arg names to callables that will be used to perform clean up when the CLI script is complete.

blacklisted(node_name, arg_name)

Determine if an argument has been blacklisted from the CLI

clean_up(**kwargs)

Override the Script method to perform any required clean-up

get_injected_kwargs()

Override the Script method to return kwargs populated from the inject arg

class glide.core.GlobalState(**kwargs)

Bases: tlbx.object_utils.MappingMixin, consecution.pipeline.GlobalState

Consecution GlobalState with more dict-like behavior

class glide.core.GroupByNode(*args, **kwargs)

Bases: glide.core.Node

This approach was copied from Consecution. It batches data by key and pushes each batch once the key changes. For that reason, the input must be sorted by key ahead of time for it to function properly. It may make sense to provide different grouping behavior.

key(data)

process(data)

Required method used by Consecution to process nodes
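itertools.groupby uses the same "emit a batch when the key changes" strategy described for GroupByNode, so it is a convenient stdlib stand-in for showing why sorting matters. This is not GroupByNode itself; `batch_by_key` is a hypothetical helper.

```python
from itertools import groupby
from operator import itemgetter

def batch_by_key(rows, key):
    # Emit a new batch every time the key changes between consecutive rows
    return [list(group) for _, group in groupby(rows, key=key)]

rows = [("a", 1), ("b", 2), ("a", 3)]
get_key = itemgetter(0)

# Unsorted input splits key "a" across two batches
unsorted_batches = batch_by_key(rows, get_key)
# Sorting by the same key first yields one batch per key
sorted_batches = batch_by_key(sorted(rows, key=get_key), get_key)
```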

class glide.core.NoInputNode(name, _log=False, _debug=False, **default_context)

Bases: glide.core.Node

A node that does not take a data positional arg in run() and is expected to generate data to be pushed

run_requires_data = False

class glide.core.Node(name, _log=False, _debug=False, **default_context)

Bases: consecution.nodes.Node

Override Consecution’s Node class to add necessary functionality

Parameters
  • name (str) – The name of the Node.

  • _log (bool, optional) – If true, log data processed by the node.

  • _debug (bool, optional) – If true, drop into PDB right before calling run() for this node.

  • **default_context – Keyword args that establish the default_context of the Node. Note that this context is copy.deepcopy’d on init, so any value in default_context must be usable by deepcopy.
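The note about copy.deepcopy has two practical consequences, shown in the sketch below. Mutating the original object after init does not affect the node's copy. Values such as thread locks, which cannot be pickled, are rejected outright. `make_default_context` is a hypothetical helper that only mimics the documented deep copy.

```python
import copy
import threading

def make_default_context(**default_context):
    # Mirrors the documented behavior: the default context is deep-copied on init
    return copy.deepcopy(default_context)

columns = ["a", "b"]
context = make_default_context(limit=10, columns=columns)
columns.append("c")  # later mutation of the caller's list does not leak into the copy

lock_error = None
try:
    make_default_context(lock=threading.Lock())
except TypeError as exc:  # lock objects cannot be pickled, so deepcopy rejects them
    lock_error = exc
```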

name

The name of the Node.

Type

str

_log

If true, log data processed by the node. Note that this overrides Consecution’s log() functionality.

Type

bool

_debug

If true, drop into PDB right before calling run() for this node.

Type

bool

default_context

A dictionary to establish default context for the node that can be used to populate run() arguments.

Type

dict

context

The current context of the Node

Type

dict

run_args

An OrderedDict of positional args to run()

Type

dict

run_kwargs

An OrderedDict of keyword args and defaults to run()

Type

dict

run_requires_data

If true, the first positional arg to run is expected to be the data to process

Type

bool
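The sketch below shows one way run_args and run_kwargs could be derived and then filled from a node's context. It uses inspect.signature and assumes context keys match run() parameter names, with the first positional arg being the data (run_requires_data = True). `split_run_signature` and `call_with_context` are hypothetical; glide's actual lookup rules may differ. For a bound method, inspect.signature already omits self.

```python
import inspect
from collections import OrderedDict

def split_run_signature(run):
    # Separate run()'s parameters into required positional args and keyword args with defaults
    run_args, run_kwargs = OrderedDict(), OrderedDict()
    for name, param in inspect.signature(run).parameters.items():
        if param.kind in (param.VAR_POSITIONAL, param.VAR_KEYWORD):
            continue  # *args/**kwargs have no fixed name to look up in the context
        if param.default is param.empty:
            run_args[name] = None  # placeholder: the value comes from data or context
        else:
            run_kwargs[name] = param.default
    return run_args, run_kwargs

def call_with_context(run, data, context):
    run_args, run_kwargs = split_run_signature(run)
    # With run_requires_data = True, the first positional arg is the data itself
    arg_names = list(run_args)[1:]
    missing = [name for name in arg_names if name not in context]
    if missing:
        raise TypeError(f"missing context for required args: {missing}")
    args = [context[name] for name in arg_names]
    # Context values override keyword defaults; otherwise the default is used
    kwargs = {name: context.get(name, default) for name, default in run_kwargs.items()}
    return run(data, *args, **kwargs)

def run(data, factor, offset=0):
    return [x * factor + offset for x in data]
```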

process(data)

Required method used by Consecution to process nodes

reset_context()

Reset context dict for this Node to the default

run(data, **kwargs)

Subclasses will override this method to implement core node logic

run_requires_data = True

set_global_results(results)

update_context(context)

Update the context dict for this Node

update_downstream_context(context, propagate=False)

Update the run context of downstream nodes

Parameters
  • context (dict) – A dict used to update the context of downstream nodes

  • propagate (bool, optional) – If true, propagate the update to all child nodes in the DAG. The default behavior is to only push updates to the immediate downstream nodes.
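The sketch below contrasts the two behaviors on a small DAG held as a plain dict of node name to immediate children. With propagate=False only direct children are updated; with propagate=True every descendant is. The graph representation and this `update_downstream_context` function are hypothetical stand-ins for the Node method.

```python
from typing import Dict, List

def update_downstream_context(graph: Dict[str, List[str]],
                              contexts: Dict[str, dict],
                              node: str, context: dict,
                              propagate: bool = False) -> None:
    # graph maps each node name to the names of its immediate downstream nodes
    targets = list(graph.get(node, []))
    seen = set()
    while targets:
        child = targets.pop()
        if child in seen:
            continue  # a DAG can reach the same node through two paths
        seen.add(child)
        contexts[child].update(context)
        if propagate:
            targets.extend(graph.get(child, []))  # keep walking past the first level

graph = {"extract": ["transform", "audit"], "transform": ["load"], "audit": [], "load": []}

shallow = {name: {} for name in graph}
update_downstream_context(graph, shallow, "extract", {"limit": 5})

deep = {name: {} for name in graph}
update_downstream_context(graph, deep, "extract", {"limit": 5}, propagate=True)
```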

class glide.core.ParaGlider(*args, executor_kwargs=None, **kwargs)

Bases: glide.core.Glider

Parameters
  • *args – Arguments passed through to Glider

  • executor_kwargs (dict, optional) – A dict of keyword arguments to pass to the process or thread executor

  • **kwargs – Keyword arguments passed through to Glider

pipeline

A Consecution Pipeline

executor_kwargs

A dict of keyword arguments to pass to the process or thread executor

consume(data=None, cleanup=None, split_count=None, synchronous=False, timeout=None, **node_contexts)

Set up node contexts and consume data with the pipeline

Parameters
  • data (iterable, optional) – Iterable of data to consume

  • cleanup (dict, optional) – A mapping of arg names to clean-up functions to be run after data processing is complete.

  • split_count (int, optional) – How many slices to split the data into for parallel processing. Default is to use executor._max_workers.

  • synchronous (bool, optional) – If False, return Futures. If True, wait for futures to complete and return their results, if any.

  • timeout (int or float, optional) – Raises a concurrent.futures.TimeoutError if __next__() is called and the result isn’t available after timeout seconds from the original call to as_completed(). Ignored if synchronous=False.

  • **node_contexts – Keyword arguments that are node_name->param_dict
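The sketch below models the split_count, synchronous and timeout parameters with a ThreadPoolExecutor. The data is split into slices, each slice is submitted to the executor, and then either the Futures are returned immediately or their results are collected with as_completed. `para_consume` and `consume_split` are hypothetical stand-ins for consuming each slice with the pipeline. The default slice count uses the explicit max_workers value rather than the private executor._max_workers mentioned above.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def split_strided(data, split_count):
    # Round-robin split: item i goes to slice i % split_count
    return [data[i::split_count] for i in range(split_count)]

def para_consume(data, consume_split, split_count=None, synchronous=False,
                 timeout=None, max_workers=4):
    executor = ThreadPoolExecutor(max_workers=max_workers)
    split_count = split_count or max_workers  # default: one slice per worker
    futures = [executor.submit(consume_split, chunk)
               for chunk in split_strided(data, split_count)]
    if not synchronous:
        # Return Futures immediately; queued work still runs after shutdown(wait=False)
        executor.shutdown(wait=False)
        return futures
    try:
        # as_completed raises a TimeoutError if results are not ready in time;
        # results arrive in completion order, not submission order
        return [f.result() for f in as_completed(futures, timeout=timeout)]
    finally:
        executor.shutdown(wait=True)

def sum_of_squares(chunk):
    return sum(x * x for x in chunk)

results = para_consume(list(range(10)), sum_of_squares, split_count=3, synchronous=True)
futures = para_consume(list(range(10)), sum_of_squares, split_count=3)
```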

get_executor()

Override this method to create the parallel executor

get_results(futures, timeout=None)

Override this method to get the asynchronous results

get_worker_count(executor)

Override this method to get the active worker count from the executor

class glide.core.PlaceholderNode(name, _log=False, _debug=False, **default_context)

Bases: glide.core.PushNode

Used as a placeholder in pipelines. By default, it passes values through unchanged

class glide.core.PoolSubmit(name, _log=False, _debug=False, **default_context)

Bases: glide.core.Node

Apply a function to the data in parallel

check_data(data)

Optional input data check

get_executor(**executor_kwargs)

Override this to return the parallel executor

get_results(futures, timeout=None)

Override this to fetch results from an asynchronous task

get_worker_count(executor)

Override this to return a count of workers active in the executor

run(data, func, executor=None, executor_kwargs=None, split_count=None, timeout=None, push_type='async', **kwargs)

Use a parallel executor to apply func to data

Parameters
  • data – An iterable to process

  • func (callable) – A callable that will be passed data to operate on in parallel

  • executor (Executor, optional) – If passed, use this executor instead of creating one.

  • executor_kwargs (dict, optional) – Keyword arguments to pass when initializing an executor.

  • split_count (int, optional) – How many slices to split the data into for parallel processing. Default is to set split_count = number of workers

  • timeout (int or float, optional) – Time to wait for jobs to complete before raising an error. Ignored unless using a push_type that waits for results.

  • push_type (str, optional) – If “async”, push the Futures immediately. If “input”, push the input data immediately after task submission. If “result”, collect the result synchronously and push it.

  • **kwargs – Keyword arguments passed to the executor when submitting work
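The sketch below illustrates the three push_type strategies, whose string values match PushTypes. "async" pushes Futures right away, "input" pushes the original data after submission, and "result" waits up to timeout and pushes the results. `pool_submit` and the `push` callback are hypothetical; this is a stdlib sketch rather than PoolSubmit's actual code.

```python
from concurrent.futures import ThreadPoolExecutor, wait

def pool_submit(data, func, push, split_count=2, push_type="async", timeout=None):
    # Round-robin split so each worker gets a slice of the input
    splits = [data[i::split_count] for i in range(split_count)]
    executor = ThreadPoolExecutor(max_workers=split_count)
    try:
        futures = [executor.submit(func, chunk) for chunk in splits]
        if push_type == "async":
            push(futures)  # downstream receives Futures immediately
        elif push_type == "input":
            push(data)  # downstream receives the original input right after submission
        elif push_type == "result":
            done, not_done = wait(futures, timeout=timeout)
            if not_done:
                raise TimeoutError(f"{len(not_done)} split(s) still running after {timeout}s")
            push([f.result() for f in futures])  # results in submission order
        else:
            raise ValueError(f"unknown push_type: {push_type!r}")
    finally:
        # Block on shutdown only when results were required; pending work still runs otherwise
        executor.shutdown(wait=(push_type == "result"))

pushed = []
pool_submit([1, 2, 3, 4], sum, pushed.append, push_type="result")
```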

shutdown_executor(executor)

Override this to shut down the executor

submit(executor, func, splits, **kwargs)

Override this to submit work to the executor

class glide.core.ProcessPoolParaGlider(*args, executor_kwargs=None, **kwargs)

Bases: glide.core.ParaGlider

A parallel Glider that uses a ProcessPoolExecutor to execute parallel calls to consume()

get_executor()

Override this method to create the parallel executor

get_results(futures, timeout=None)

Override this method to get the asynchronous results

get_worker_count(executor)

Override this method to get the active worker count from the executor

class glide.core.ProcessPoolSubmit(name, _log=False, _debug=False, **default_context)

Bases: glide.core.PoolSubmit

A PoolSubmit node that uses a ProcessPoolExecutor

get_executor(**executor_kwargs)

Override this to return the parallel executor

get_results(futures, timeout=None)

Override this to fetch results from an asynchronous task

get_worker_count(executor)

Override this to return a count of workers active in the executor

shutdown_executor(executor, **kwargs)

Override this to shut down the executor

submit(executor, func, splits, **kwargs)

Override this to submit work to the executor

class glide.core.Profile(name, _log=False, _debug=False, **default_context)

Bases: glide.core.Node

A node that profiles the call to push(), thus profiling all downstream nodes

run(data, filename=None)

Profiles calls to push(), thus profiling all downstream nodes

Parameters
  • data – Data to push

  • filename (str, optional) – Filename to pass to runctx() to save stats
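The sketch below profiles a push(data) call with cProfile.runctx, which is the stdlib function the filename parameter is passed to. Everything called inside the pushed function, the stand-in for downstream nodes, shows up in the saved stats. `profiled_push` and `downstream` are hypothetical names.

```python
import cProfile
import os
import pstats
import tempfile

def profiled_push(push, data, filename=None):
    # Profile push(data); with filename=None, runctx prints the stats instead of saving them
    cProfile.runctx("push(data)", globals(), {"push": push, "data": data}, filename=filename)

received = []

def downstream(rows):
    received.extend(sorted(rows))  # stand-in for work done by downstream nodes

with tempfile.TemporaryDirectory() as tmp:
    stats_path = os.path.join(tmp, "push.prof")
    profiled_push(downstream, [3, 1, 2], filename=stats_path)
    stats = pstats.Stats(stats_path)  # load the saved profile for inspection
    stats.sort_stats("cumulative")
    call_count = stats.total_calls
```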

class glide.core.PushNode(name, _log=False, _debug=False, **default_context)

Bases: glide.core.Node

A node that just passes all data through in run()

run(data, **kwargs)

Push data through to downstream nodes unchanged

class glide.core.PushTypes

Bases: object

The names of push strategies for nodes that support asynchronous execution

Async = 'async'
Input = 'input'
Result = 'result'

class glide.core.RuntimeContext(func, *args, **kwargs)

Bases: object

A function to be executed at runtime to populate context values

Parameters
  • func (callable) – The function to execute

  • args – Positional arguments to pass to func when called

  • kwargs – Keyword arguments to pass to func when called

copy()

Create a copy of this RuntimeContext referencing the same objects
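The sketch below shows the deferred-evaluation idea: a wrapper stores func, args and kwargs and is only called when the context is resolved at run time. copy() builds a new wrapper around the same objects. `LazyValue` and `resolve_context` are hypothetical stand-ins, and glide may resolve RuntimeContext values at a different point in the node lifecycle.

```python
from typing import Any, Callable, Dict

class LazyValue:
    """Hypothetical stand-in for RuntimeContext: defers func(*args, **kwargs) until run time."""

    def __init__(self, func: Callable[..., Any], *args: Any, **kwargs: Any):
        self.func, self.args, self.kwargs = func, args, kwargs

    def __call__(self) -> Any:
        return self.func(*self.args, **self.kwargs)

    def copy(self) -> "LazyValue":
        # A new wrapper that references the same func and argument objects (no deep copy)
        return LazyValue(self.func, *self.args, **self.kwargs)

def resolve_context(context: Dict[str, Any]) -> Dict[str, Any]:
    # Evaluate lazy values when the node runs; plain values pass through untouched
    return {k: (v() if isinstance(v, LazyValue) else v) for k, v in context.items()}

settings: Dict[str, str] = {}
context = {"table": LazyValue(settings.get, "table", "default_table"), "limit": 10}
settings["table"] = "events"  # set after the context was built
resolved = resolve_context(context)
```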

class glide.core.Shell(name, _log=False, _debug=False, **default_context)

Bases: glide.core.NoInputNode

Run a local shell command using subprocess.run

run(cmd, shell=False, capture_output=False, **kwargs)

Run a local shell command using subprocess.run and push the return value

Parameters
  • cmd (list or str) – Shell command to run. If passing a single string, either shell must be True or else the string must simply name the program to be executed without specifying any arguments.

  • shell (bool, optional) – Arg passed through to subprocess.run

  • capture_output (bool, optional) – Arg passed through to subprocess.run

  • **kwargs – kwargs passed to subprocess.run
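The sketch below shows the pass-through to subprocess.run, using the list form of cmd so no shell is involved. The returned CompletedProcess is what would be pushed. `run_shell` is a hypothetical helper that mirrors the documented parameters; text and check are ordinary subprocess.run kwargs forwarded via **kwargs.

```python
import subprocess
import sys

def run_shell(cmd, shell=False, capture_output=False, **kwargs):
    # Forward everything to subprocess.run and return the CompletedProcess
    return subprocess.run(cmd, shell=shell, capture_output=capture_output, **kwargs)

# List form: program plus arguments, no shell quoting required
result = run_shell([sys.executable, "-c", "print('hello')"],
                   capture_output=True, text=True, check=True)
```

Passing a single string with arguments, for example "echo hi", would require shell=True, as described above.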

class glide.core.ThreadPoolParaGlider(*args, executor_kwargs=None, **kwargs)

Bases: glide.core.ProcessPoolParaGlider

A parallel Glider that uses a ThreadPoolExecutor to execute parallel calls to consume()

get_executor()

Override this method to create the parallel executor

class glide.core.ThreadPoolSubmit(name, _log=False, _debug=False, **default_context)

Bases: glide.core.ProcessPoolSubmit

A PoolSubmit node that uses a ThreadPoolExecutor

get_executor(**executor_kwargs)

Override this to return the parallel executor

glide.core.clean_up_nodes(cleanup, contexts)

Call clean-up functions for node context objects

glide.core.consume(pipeline, data, cleanup=None, **node_contexts)

Handles node contexts before/after calling pipeline.consume()

Note

It would have been better to subclass Pipeline and implement this logic right before/after the core consume() call, but there is a bug in pickle that prevents that from working with multiprocessing.

glide.core.consume_none(pipeline)

This mimics the behavior of Consecution’s consume() but allows for running a pipeline with no input data.

glide.core.get_node_contexts(pipeline)

Get a dict of node_name->node_context from pipeline

glide.core.reset_node_contexts(pipeline, node_contexts)

Helper function for resetting node contexts in a pipeline

glide.core.update_node_contexts(pipeline, node_contexts)

Helper function for updating node contexts in a pipeline
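The sketch below models the update/reset pair. A node_name->param_dict mapping, the same shape as consume()'s **node_contexts, updates each named node's context, and reset restores those nodes to their defaults. `SketchNode` and the dict-of-nodes argument are hypothetical simplifications; the real helpers take a pipeline.

```python
import copy
from typing import Dict

class SketchNode:
    """Hypothetical minimal node with a default context and a mutable current context."""

    def __init__(self, name: str, **default_context):
        self.name = name
        self.default_context = copy.deepcopy(default_context)
        self.context = copy.deepcopy(self.default_context)

    def update_context(self, context: dict) -> None:
        self.context.update(context)

    def reset_context(self) -> None:
        self.context = copy.deepcopy(self.default_context)

def update_node_contexts(nodes: Dict[str, SketchNode], node_contexts: Dict[str, dict]) -> None:
    # node_contexts maps node_name -> param_dict
    for name, params in node_contexts.items():
        nodes[name].update_context(params)

def reset_node_contexts(nodes: Dict[str, SketchNode], node_contexts: Dict[str, dict]) -> None:
    # Restore defaults only for the nodes that were updated
    for name in node_contexts:
        nodes[name].reset_context()

nodes = {"extract": SketchNode("extract", chunksize=100), "load": SketchNode("load")}
run_contexts = {"extract": {"chunksize": 10}, "load": {"table": "events"}}

update_node_contexts(nodes, run_contexts)
during = {name: dict(node.context) for name, node in nodes.items()}
reset_node_contexts(nodes, run_contexts)
after = {name: dict(node.context) for name, node in nodes.items()}
```

A wrapper that handles contexts before and after pipeline.consume(), as glide.core.consume is documented to do, could pair these calls in a try/finally block. That keeps per-run values from leaking into the next run.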