glide.core module
Core classes used to power pipelines
-
class glide.core.AssertFunc(name, _log=False, _debug=False, **default_context)
Bases: glide.core.Node
-
class glide.core.AsyncIOSubmit(name, _log=False, _debug=False, **default_context)
Bases: glide.core.Node
A node that splits input data over an async function
-
run(data, func, split_count=None, timeout=None, push_type='async')
Use asyncio to apply func to data
- Parameters
data – An iterable to process
func (callable) – An async callable that will be passed data to operate on using asyncio.
split_count (int, optional) – How many slices to split the data into for concurrent processing. Default is to set split_count = len(data).
timeout (int or float, optional) – Time to wait for jobs to complete before raising an error. Ignored unless using a push_type that waits for results.
push_type (str, optional) – If “async”, push the Futures immediately. If “input”, push the input data immediately after task submission. If “result”, collect the result synchronously and push it.
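A minimal sketch of the split-then-gather idea behind AsyncIOSubmit.run, using only the standard library. The helper names split_into, double_all, and submit are hypothetical, and the sketch only models push_type="result" (waiting for all slices); it is not glide's implementation.

```python
import asyncio


def split_into(data, split_count):
    """Split a sequence into split_count contiguous slices (hypothetical helper)."""
    size, extra = divmod(len(data), split_count)
    slices, start = [], 0
    for i in range(split_count):
        # The first `extra` slices get one additional item each.
        end = start + size + (1 if i < extra else 0)
        slices.append(data[start:end])
        start = end
    return slices


async def double_all(chunk):
    # Stand-in for the user-supplied async func; yields to the event loop once.
    await asyncio.sleep(0)
    return [x * 2 for x in chunk]


async def submit(data, func, split_count=None):
    # Mirror the documented default of one slice per item (guarding empty data).
    split_count = split_count or max(len(data), 1)
    tasks = [asyncio.create_task(func(chunk)) for chunk in split_into(data, split_count)]
    # Waiting here corresponds to push_type="result"; "async" would hand off `tasks`.
    return await asyncio.gather(*tasks)


results = asyncio.run(submit(list(range(6)), double_all, split_count=3))
print(results)  # [[0, 2], [4, 6], [8, 10]]
```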
-
-
class glide.core.ConfigContext(filename=None, var=None, key=None)
Bases: glide.core.RuntimeContext
-
class glide.core.ContextPush(name, _log=False, _debug=False, **default_context)
Bases: glide.core.Node
Pass context to downstream nodes
-
run(data, func, propagate=False)
Pass dynamically generated context to downstream nodes
- Parameters
data – Data being processed by the node.
func (callable) – A function that takes the node object and data as input and returns a context dict used to update or add to the context of downstream nodes.
propagate (bool, optional) – Passed through to node.update_downstream_context()
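The contract of func can be sketched with a toy node class: func receives the node and the data and returns a dict, which is merged into each immediate child's context while the data passes through unchanged. SketchNode and context_push are hypothetical stand-ins, not glide's Node or ContextPush.

```python
class SketchNode:
    """Hypothetical minimal node: a name, a context dict, and child nodes."""

    def __init__(self, name, children=()):
        self.name = name
        self.context = {}
        self.children = list(children)


def context_push(node, data, func):
    # func receives the node and the data and returns a dict of context updates.
    updates = func(node, data)
    for child in node.children:
        child.context.update(updates)
    return data  # the data itself is passed through unchanged


writer = SketchNode("writer")
pusher = SketchNode("pusher", children=[writer])
out = context_push(pusher, [1, 2, 3], lambda node, data: {"row_count": len(data)})
print(writer.context)  # {'row_count': 3}
```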
-
-
class glide.core.GlidePipeline(node, global_state=None)
Bases: consecution.pipeline.Pipeline
-
class glide.core.Glider(*args, **kwargs)
Bases: object
Main class for forming and executing pipelines. It thinly wraps Consecution’s Pipeline rather than subclassing it, because a bug in pickle causes infinite recursion when a subclass with a super().func reference is used with multiprocessing.
- Parameters
*args – Arguments passed through to Consecution’s Pipeline class.
**kwargs – Keyword arguments passed through to Consecution’s Pipeline class.
-
pipeline
A Consecution Pipeline
-
cli(*script_args, blacklist=None, parents=None, inject=None, cleanup=None)
Generate a decorator for this Glider that can be used to expose a CLI
- Parameters
*script_args – Arguments to be added to the script CLI
blacklist (list, optional) – List of arguments to filter from appearing in the CLI
parents (list, optional) – List of parent CLIs to inherit from
inject (dict, optional) – A dictionary of arg names to functions/values that inject a value for that arg. Those args will be passed as context to nodes that can accept them in their run() method.
cleanup (dict, optional) – A dictionary of arg names to callables that will be used to perform clean up when the CLI script is complete.
- Returns
decorator – A decorator that can be used to turn a function into a CLI “main” function.
- Return type
GliderScript
-
consume(data=None, cleanup=None, **node_contexts)
Set up node contexts and consume data with the pipeline
- Parameters
data (iterable, optional) – Iterable of data to consume
cleanup (dict, optional) – A mapping of arg names to clean up functions to be run after data processing is complete.
**node_contexts – Keyword arguments that are node_name->param_dict
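One way to picture node_contexts is as per-call overrides merged onto each node's default context, keyed by node name. The node names and the build_contexts helper below are hypothetical; this is a sketch of the node_name->param_dict shape, not glide's internals.

```python
import copy

# Hypothetical default contexts, keyed by node name, as they might be set at init.
defaults = {
    "extract": {"chunksize": 100},
    "load": {"table": "staging", "if_exists": "append"},
}


def build_contexts(defaults, **node_contexts):
    """Merge per-call node_name -> param_dict overrides onto per-node defaults."""
    contexts = {}
    for name, default in defaults.items():
        ctx = copy.deepcopy(default)  # never mutate the stored defaults
        ctx.update(node_contexts.get(name, {}))
        contexts[name] = ctx
    return contexts


ctx = build_contexts(defaults, load={"table": "prod"})
print(ctx["load"])  # {'table': 'prod', 'if_exists': 'append'}
```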
-
property global_state
Get the pipeline global_state attribute
-
property top_node
Get the pipeline top_node attribute
-
class glide.core.GliderScript(glider, *script_args, blacklist=None, parents=None, inject=None, cleanup=None)
Bases: tlbx.cli_utils.Script
A decorator that can be used to create a CLI from a Glider pipeline
- Parameters
glider (Glider) – A Glider pipeline to be used to auto-generate the CLI
*script_args – Arguments to be added to the script CLI
blacklist (list, optional) – List of arguments to filter from appearing in the CLI
parents (list, optional) – List of parent CLIs to inherit from
inject (dict, optional) – A dictionary of arg names to functions/values that inject a value for that arg. Those args will be passed as context to nodes that can accept them in their run() method.
cleanup (dict, optional) – A dictionary of arg names to callables that will be used to perform clean up when the CLI script is complete.
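The inject and cleanup mappings can be sketched with argparse: injected values are produced at runtime (a callable is called, anything else is used as-is), passed alongside parsed args, and cleaned up once the script finishes. This assumes an argparse-based CLI; GliderScript builds on tlbx.cli_utils.Script, whose internals are not shown here, and Resource, make_resource, and run_script are hypothetical.

```python
import argparse


class Resource:
    """Hypothetical stand-in for something like a DB connection."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


opened = []


def make_resource():
    res = Resource()
    opened.append(res)
    return res


inject = {"conn": make_resource}
cleanup = {"conn": lambda conn: conn.close()}


def run_script(argv, main, inject, cleanup):
    parser = argparse.ArgumentParser()
    parser.add_argument("--table", default="items")
    args = vars(parser.parse_args(argv))
    for name, value in inject.items():
        # A callable is invoked at runtime; anything else is injected as-is.
        args[name] = value() if callable(value) else value
    try:
        return main(**args)
    finally:
        # Clean up injected values once the script is complete, even on error.
        for name, func in cleanup.items():
            if name in args:
                func(args[name])


def main(table, conn):
    return f"loading into {table} (closed={conn.closed})"


print(run_script(["--table", "demo"], main, inject, cleanup))  # loading into demo (closed=False)
print(opened[0].closed)  # True
```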
-
class glide.core.GlobalState(**kwargs)
Bases: tlbx.object_utils.MappingMixin, consecution.pipeline.GlobalState
Consecution GlobalState with more dict-like behavior
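"More dict-like behavior" here means attribute-style state that also supports item access. A minimal sketch using collections.abc.MutableMapping over the instance __dict__; DictLikeState is hypothetical and does not reproduce tlbx's MappingMixin.

```python
from collections.abc import MutableMapping


class DictLikeState(MutableMapping):
    """Hypothetical sketch: attributes double as mapping items."""

    def __init__(self, **kwargs):
        self.__dict__.update(kwargs)

    def __getitem__(self, key):
        return self.__dict__[key]

    def __setitem__(self, key, value):
        self.__dict__[key] = value

    def __delitem__(self, key):
        del self.__dict__[key]

    def __iter__(self):
        return iter(self.__dict__)

    def __len__(self):
        return len(self.__dict__)


state = DictLikeState(run_id=7)
state["rows"] = 10  # item-style write, readable as an attribute
print(state.rows, state["run_id"], state.get("missing", 0))  # 10 7 0
```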
-
class glide.core.GroupByNode(*args, **kwargs)
Bases: glide.core.Node
This approach was copied from Consecution. The node batches data by key and pushes a batch as soon as the key changes, so the input must be sorted by key ahead of time for it to work properly. It may make sense to provide different behavior.
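The standard library's itertools.groupby has the same emit-on-key-change behavior, which makes the sorting requirement easy to see. This illustrates the batching rule only and is not GroupByNode itself.

```python
from itertools import groupby
from operator import itemgetter

rows = [("b", 1), ("a", 2), ("b", 3), ("a", 4)]
key = itemgetter(0)

# Unsorted input: a new batch starts every time the key changes.
unsorted_keys = [k for k, _ in groupby(rows, key=key)]
print(unsorted_keys)  # ['b', 'a', 'b', 'a']

# Sorted input: exactly one batch per key.
batches = {k: [v for _, v in grp] for k, grp in groupby(sorted(rows, key=key), key=key)}
print(batches)  # {'a': [2, 4], 'b': [1, 3]}
```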
-
class glide.core.NoInputNode(name, _log=False, _debug=False, **default_context)
Bases: glide.core.Node
A node whose run() does not take a positional data argument; it is expected to generate the data it pushes.
-
run_requires_data = False
-
-
class glide.core.Node(name, _log=False, _debug=False, **default_context)
Bases: consecution.nodes.Node
Override Consecution’s Node class to add necessary functionality
- Parameters
name (str) – The name of the Node.
_log (bool, optional) – If true, log data processed by the node.
_debug (bool, optional) – If true, drop into PDB right before calling run() for this node.
**default_context – Keyword args that establish the default_context of the Node. Note that this context is copy.deepcopy’d on init, so any value in default_context must be usable by deepcopy.
-
name
The name of the Node.
- Type
str
-
_log
If true, log data processed by the node. Note that this overrides Consecution’s log() functionality.
- Type
bool
-
_debug
If true, drop into PDB right before calling run() for this node.
- Type
bool
-
default_context
A dictionary to establish default context for the node that can be used to populate run() arguments.
- Type
dict
-
context
The current context of the Node
- Type
dict
-
run_args
An OrderedDict of positional args to run()
- Type
dict
-
run_kwargs
An OrderedDict of keyword args and defaults to run()
- Type
dict
-
run_requires_data
If true, the first positional arg to run is expected to be the data to process
- Type
bool
-
run_requires_data = True
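The run_args/run_kwargs split and the deep-copied default_context can be sketched with the standard inspect and copy modules. split_run_signature, Example, and the binding step are hypothetical illustrations of how context could populate run() arguments, not glide's code.

```python
import copy
import inspect
from collections import OrderedDict


def split_run_signature(run):
    """Return (run_args, run_kwargs) for a run() function, skipping self and *args/**kwargs."""
    run_args, run_kwargs = OrderedDict(), OrderedDict()
    for name, param in inspect.signature(run).parameters.items():
        if name == "self" or param.kind in (param.VAR_POSITIONAL, param.VAR_KEYWORD):
            continue
        if param.default is param.empty:
            run_args[name] = None  # positional: no default
        else:
            run_kwargs[name] = param.default  # keyword: keep the default
    return run_args, run_kwargs


class Example:
    def run(self, data, table, chunksize=100, **kwargs):
        return data, table, chunksize


run_args, run_kwargs = split_run_signature(Example.run)
print(list(run_args), dict(run_kwargs))  # ['data', 'table'] {'chunksize': 100}

# The default context is deep-copied so mutating the live context cannot leak back.
default_context = {"table": "staging", "opts": {"retries": 3}}
context = copy.deepcopy(default_context)
context["opts"]["retries"] = 0
print(default_context["opts"]["retries"])  # 3

# Hypothetical binding step: with run_requires_data = True the first positional
# arg receives the data, and the remaining args are looked up in the context.
call_kwargs = {name: context[name] for name in list(run_args)[1:]}
call_kwargs.update({name: context.get(name, default) for name, default in run_kwargs.items()})
print(Example().run([1, 2], **call_kwargs))  # ([1, 2], 'staging', 100)
```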
-
update_downstream_context(context, propagate=False)
Update the run context of downstream nodes
- Parameters
context (dict) – A dict used to update the context of downstream nodes
propagate (bool, optional) – If true, propagate the update to all child nodes in the DAG. The default behavior is to only push updates to the immediate downstream nodes.
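The difference between the default and propagate=True can be sketched as "immediate children" versus "every descendant" in a DAG. The adjacency map and downstream_targets helper are hypothetical; the sketch uses a breadth-first walk and visits a node reachable by two paths only once, which is an assumption rather than documented behavior.

```python
from collections import deque

# Hypothetical DAG as an adjacency map: node name -> downstream node names.
dag = {"extract": ["transform"], "transform": ["load", "log"], "load": [], "log": []}


def downstream_targets(dag, start, propagate=False):
    """Names of nodes whose context would be updated from `start`."""
    if not propagate:
        return list(dag[start])  # immediate children only
    seen, order, queue = set(), [], deque(dag[start])
    while queue:
        name = queue.popleft()
        if name in seen:
            continue  # a node reachable by two paths is updated once
        seen.add(name)
        order.append(name)
        queue.extend(dag[name])
    return order


print(downstream_targets(dag, "extract"))  # ['transform']
print(downstream_targets(dag, "extract", propagate=True))  # ['transform', 'load', 'log']
```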
-
class glide.core.ParaGlider(*args, executor_kwargs=None, **kwargs)
Bases: glide.core.Glider
- Parameters
*args – Arguments passed through to Glider
executor_kwargs (dict, optional) – A dict of keyword arguments to pass to the process or thread executor
**kwargs – Keyword arguments passed through to Glider
-
pipeline
A Consecution Pipeline
-
executor_kwargs
A dict of keyword arguments to pass to the process or thread executor
-
consume(data=None, cleanup=None, split_count=None, synchronous=False, timeout=None, **node_contexts)
Set up node contexts and consume data with the pipeline
- Parameters
data (iterable, optional) – Iterable of data to consume
cleanup (dict, optional) – A mapping of arg names to clean up functions to be run after data processing is complete.
split_count (int, optional) – How many slices to split the data into for parallel processing. Default is to use executor._max_workers.
synchronous (bool, optional) – If False, return Futures. If True, wait for futures to complete and return their results, if any.
timeout (int or float, optional) – Raises a concurrent.futures.TimeoutError if __next__() is called and the result isn’t available after timeout seconds from the original call to as_completed(). Ignored if synchronous=False.
**node_contexts – Keyword arguments that are node_name->param_dict
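The split/submit/synchronous/timeout behavior described above can be sketched with concurrent.futures. consume_slice is a hypothetical stand-in for running the whole pipeline on one slice, and the sketch takes split_count explicitly instead of reading the executor's private _max_workers.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def consume_slice(chunk):
    # Hypothetical stand-in for running the pipeline's consume() over one slice.
    return sum(chunk)


def para_consume(executor, data, split_count, synchronous=False, timeout=None):
    size = max(1, -(-len(data) // split_count))  # ceiling division, at least 1
    chunks = [data[i:i + size] for i in range(0, len(data), size)]
    futures = [executor.submit(consume_slice, chunk) for chunk in chunks]
    if not synchronous:
        return futures  # caller decides when to wait
    # as_completed raises concurrent.futures.TimeoutError if results are late.
    return [f.result() for f in as_completed(futures, timeout=timeout)]


with ThreadPoolExecutor(max_workers=3) as executor:
    futures = para_consume(executor, list(range(9)), split_count=3)
    results = para_consume(executor, list(range(9)), split_count=3, synchronous=True, timeout=5)

# Completion order varies, so sort before comparing.
print(sorted(f.result() for f in futures))  # [3, 12, 21]
print(sorted(results))  # [3, 12, 21]
```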
-
class glide.core.PlaceholderNode(name, _log=False, _debug=False, **default_context)
Bases: glide.core.PushNode
Used as a placeholder in pipelines. Passes values through by default.
-
class glide.core.PoolSubmit(name, _log=False, _debug=False, **default_context)
Bases: glide.core.Node
Apply a function to the data in parallel
-
get_results(futures, timeout=None)
Override this to fetch results from an asynchronous task
-
get_worker_count(executor)
Override this to return a count of workers active in the executor
-
run(data, func, executor=None, executor_kwargs=None, split_count=None, timeout=None, push_type='async', **kwargs)
Use a parallel executor to apply func to data
- Parameters
data – An iterable to process
func (callable) – A callable that will be passed data to operate on in parallel
executor (Executor, optional) – If passed, use this executor instead of creating one.
executor_kwargs (dict, optional) – Keyword arguments to pass when initializing an executor.
split_count (int, optional) – How many slices to split the data into for parallel processing. Default is to set split_count = number of workers
timeout (int or float, optional) – Time to wait for jobs to complete before raising an error. Ignored unless using a push_type that waits for results.
push_type (str, optional) – If “async”, push the Futures immediately. If “input”, push the input data immediately after task submission. If “result”, collect the result synchronously and push it.
**kwargs – Keyword arguments passed to the executor when submitting work
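The three push_type strategies (whose values are listed under PushTypes) can be sketched as a dispatch after submitting one task per slice. pool_submit and the push callback are hypothetical; only concurrent.futures calls from the standard library are used.

```python
from concurrent.futures import ThreadPoolExecutor, wait

ASYNC, INPUT, RESULT = "async", "input", "result"  # values listed under PushTypes


def pool_submit(data, func, executor, push, split_count=2, push_type=ASYNC, timeout=None):
    if push_type not in (ASYNC, INPUT, RESULT):
        raise ValueError(f"unknown push_type {push_type!r}")
    size = max(1, -(-len(data) // split_count))  # ceiling division, at least 1
    slices = [data[i:i + size] for i in range(0, len(data), size)]
    futures = [executor.submit(func, s) for s in slices]
    if push_type == ASYNC:
        push(futures)  # downstream receives the Futures immediately
    elif push_type == INPUT:
        push(data)  # downstream receives the original input immediately
    else:
        _, not_done = wait(futures, timeout=timeout)
        if not_done:
            raise TimeoutError(f"{len(not_done)} task(s) did not finish in time")
        push([f.result() for f in futures])  # results in submission order


pushed = []
with ThreadPoolExecutor(max_workers=2) as ex:
    pool_submit(list(range(6)), sum, ex, pushed.append, push_type=RESULT, timeout=5)
    pool_submit(list(range(6)), sum, ex, pushed.append, push_type=INPUT)
print(pushed)  # [[3, 12], [0, 1, 2, 3, 4, 5]]
```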
-
-
class glide.core.ProcessPoolParaGlider(*args, executor_kwargs=None, **kwargs)
Bases: glide.core.ParaGlider
A parallel Glider that uses a ProcessPoolExecutor to execute parallel calls to consume()
-
class glide.core.ProcessPoolSubmit(name, _log=False, _debug=False, **default_context)
Bases: glide.core.PoolSubmit
A PoolSubmit node that uses a ProcessPoolExecutor
-
get_results(futures, timeout=None)
Override this to fetch results from an asynchronous task
-
-
class glide.core.Profile(name, _log=False, _debug=False, **default_context)
Bases: glide.core.Node
A node that profiles the call to push(), thus profiling all downstream nodes
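Profiling the push() call captures every downstream node, because they all run inside that call. A minimal sketch with the standard cProfile and pstats modules; downstream and profiled_push are hypothetical stand-ins, and the printed report varies per run, so no output is shown.

```python
import cProfile
import io
import pstats


def downstream(data):
    # Hypothetical stand-in for push(): all work done by nodes below this one.
    return sorted(x * x for x in data)


def profiled_push(data):
    profiler = cProfile.Profile()
    profiler.enable()
    try:
        result = downstream(data)
    finally:
        profiler.disable()
    out = io.StringIO()
    # Report the five most expensive entries by cumulative time.
    pstats.Stats(profiler, stream=out).sort_stats("cumulative").print_stats(5)
    return result, out.getvalue()


result, report = profiled_push([3, 1, 2])
print(report)
```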
-
class glide.core.PushNode(name, _log=False, _debug=False, **default_context)
Bases: glide.core.Node
A node that just passes all data through in run()
-
class glide.core.PushTypes
Bases: object
The names of push strategies for nodes that support asynchronous execution
-
Async = 'async'
-
Input = 'input'
-
Result = 'result'
-
-
class glide.core.RuntimeContext(func, *args, **kwargs)
Bases: object
A function to be executed at runtime to populate context values
- Parameters
func (callable) – The function to execute
*args – Positional arguments to pass to func when called
**kwargs – Keyword arguments to pass to func when called
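The idea of a context value computed at runtime can be sketched as a stored function plus arguments that is only called when the context is resolved. LazyValue, resolve, and the APP_USER variable are hypothetical; the sketch shows that the value reflects state at resolution time, not at definition time.

```python
import os


class LazyValue:
    """Hypothetical sketch: store a function and its args; call it only when needed."""

    def __init__(self, func, *args, **kwargs):
        self.func, self.args, self.kwargs = func, args, kwargs

    def __call__(self):
        return self.func(*self.args, **self.kwargs)


def resolve(context):
    # Replace lazy entries with their computed values just before run() is called.
    return {k: v() if isinstance(v, LazyValue) else v for k, v in context.items()}


context = {"table": "events", "user": LazyValue(os.environ.get, "APP_USER", "anonymous")}
os.environ["APP_USER"] = "alice"  # set after the context was defined
print(resolve(context))  # {'table': 'events', 'user': 'alice'}
```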
-
class glide.core.Shell(name, _log=False, _debug=False, **default_context)
Bases: glide.core.NoInputNode
Run a local shell command using subprocess.run
-
run(cmd, shell=False, capture_output=False, **kwargs)
Run a local shell command using subprocess.run and push the return value
- Parameters
cmd (list or str) – Shell command to run. If passing a single string, either shell must be True or else the string must simply name the program to be executed without specifying any arguments.
shell (bool, optional) – Arg passed through to subprocess.run
capture_output (bool, optional) – Arg passed through to subprocess.run
**kwargs – kwargs passed to subprocess.run
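The value pushed by Shell is whatever subprocess.run returns, a CompletedProcess. A small standard-library example of the underlying call, using a list command so shell=True is not needed; running the current Python interpreter is just a portable stand-in for a real command.

```python
import subprocess
import sys

# A list command avoids shell=True: each element is passed as a separate argument.
cmd = [sys.executable, "-c", "print('hello from child')"]
completed = subprocess.run(cmd, capture_output=True, text=True)
print(completed.returncode, completed.stdout.strip())  # 0 hello from child
```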
-
-
class glide.core.ThreadPoolParaGlider(*args, executor_kwargs=None, **kwargs)
Bases: glide.core.ProcessPoolParaGlider
A parallel Glider that uses a ThreadPoolExecutor to execute parallel calls to consume()
-
class glide.core.ThreadPoolSubmit(name, _log=False, _debug=False, **default_context)
Bases: glide.core.ProcessPoolSubmit
A PoolSubmit node that uses a ThreadPoolExecutor
-
glide.core.clean_up_nodes(cleanup, contexts)
Call clean up functions for node context objects
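A minimal sketch of the cleanup pass, assuming cleanup maps arg names to callables and contexts maps node names to context dicts. clean_up and close_file are hypothetical, and skipping an object already cleaned up for another node is a choice made in this sketch, not documented glide behavior.

```python
import io


def clean_up(cleanup, contexts):
    """Call cleanup[arg](value) for each node context value stored under arg."""
    seen = set()
    for context in contexts.values():
        for arg, func in cleanup.items():
            if arg not in context or id(context[arg]) in seen:
                continue  # absent, or a shared object that was already cleaned up
            seen.add(id(context[arg]))
            func(context[arg])


calls = []


def close_file(f):
    calls.append(f)
    f.close()


shared = io.StringIO()
contexts = {"extract": {"f": shared}, "load": {"f": shared, "table": "t"}}
clean_up({"f": close_file}, contexts)
print(len(calls), shared.closed)  # 1 True
```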
-
glide.core.consume(pipeline, data, cleanup=None, **node_contexts)
Handles node contexts before/after calling pipeline.consume()
Note
It would have been better to subclass Pipeline and implement this logic right before/after the core consume() call, but there is a bug in pickle that prevents that from working with multiprocessing.
-
glide.core.consume_none(pipeline)
This mimics the behavior of Consecution’s consume() but allows for running a pipeline with no input data.