glide.flow module

class glide.flow.ArraySplitByNode(name, _log=False, _debug=False, **default_context)

Bases: glide.flow.SplitByNode

A node that splits the data before pushing

get_splits(data, split_count)

Split the data into split_count slices

class glide.flow.ArraySplitPush(name, _log=False, _debug=False, **default_context)

Bases: glide.flow.SplitPush

A node that splits the data before pushing

get_splits(data, split_count)

Split the data into split_count slices

class glide.flow.AsyncIOFuturesReduce(name, _log=False, _debug=False, **default_context)

Bases: glide.flow.Reduce

Collect results from asyncio futures before pushing

The following parameters are pulled from the node context and used in end().

Parameters
  • flatten (bool, optional) – Flatten the results into a single list before pushing

  • timeout (int or float, optional) – Timeout to pass to asyncio.wait

  • close (bool, optional) – Whether to call loop.close() after processing is done

end()

Do the push once all Futures' results are in

class glide.flow.DateTimeWindowPush(name, _log=False, _debug=False, **default_context)

Bases: glide.core.NoInputNode

run(start_date, end_date, window_size_hours=None, num_windows=None, reverse=False, add_second=True)

Subclasses will override this method to implement core node logic

class glide.flow.DateWindowPush(name, _log=False, _debug=False, **default_context)

Bases: glide.core.NoInputNode

run(start_date, end_date, reverse=False)

Subclasses will override this method to implement core node logic

class glide.flow.FileConcat(name, _log=False, _debug=False, **default_context)

Bases: glide.core.Node

Concat a set of input files into one output file

run(files, f_out, in_flags='rb', out_flags='wb', push_input=False)

Concat a set of input files into one output file

Parameters
  • files (list of file paths or buffers) – File paths or buffers to read

  • f_out (file path or buffer) – File path or buffer to write

  • in_flags (str, optional) – Flags to use when opening the input files

  • out_flags (str, optional) – Flags to use when opening the output file

  • push_input (bool, optional) – If true, push files instead of f_out
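A minimal sketch of the concatenation described above, assuming each entry in files (and f_out) may be either a path or an already-open buffer. concat_files and _open are hypothetical names; glide's own handling of paths versus buffers may differ.

```python
import io
import shutil


def _open(f, flags):
    """Hypothetical helper: return (file_obj, should_close) for a path or an open buffer."""
    if isinstance(f, (str, bytes)) or hasattr(f, "__fspath__"):
        return open(f, flags), True
    return f, False  # caller owns the buffer, so do not close it


def concat_files(files, f_out, in_flags="rb", out_flags="wb", push_input=False):
    out, close_out = _open(f_out, out_flags)
    try:
        for f_in in files:
            src, close_src = _open(f_in, in_flags)
            try:
                shutil.copyfileobj(src, out)  # stream each input into the output in order
            finally:
                if close_src:
                    src.close()
    finally:
        if close_out:
            out.close()
    # Mirrors push_input: the value a node would push downstream
    return files if push_input else f_out


parts = [io.BytesIO(b"a,b\n"), io.BytesIO(b"1,2\n")]
out = io.BytesIO()
concat_files(parts, out)
print(out.getvalue())  # b'a,b\n1,2\n'
```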

class glide.flow.FileCopy(name, _log=False, _debug=False, **default_context)

Bases: glide.core.Node

Copy one file to another

run(f_in, f_out, in_flags='rb', out_flags='wb', push_input=False)

Copy f_in to f_out and push file reference

Parameters
  • f_in (file path or buffer) – File path or buffer to read

  • f_out (file path or buffer) – File path or buffer to write

  • in_flags (str, optional) – Flags to use when opening the input file

  • out_flags (str, optional) – Flags to use when opening the output file

  • push_input (bool, optional) – If true, push f_in instead of f_out
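The copy-then-push behavior can be sketched with the standard library alone. copy_file is a hypothetical name, and this simplified version only accepts file paths (the real node also accepts buffers).

```python
import os
import shutil
import tempfile


def copy_file(f_in, f_out, in_flags="rb", out_flags="wb", push_input=False):
    """Hypothetical helper: copy f_in to f_out (paths only) and return the reference to push."""
    with open(f_in, in_flags) as src, open(f_out, out_flags) as dst:
        shutil.copyfileobj(src, dst)  # streams in chunks rather than reading everything at once
    return f_in if push_input else f_out


with tempfile.TemporaryDirectory() as tmp:
    src_path = os.path.join(tmp, "in.csv")
    dst_path = os.path.join(tmp, "out.csv")
    with open(src_path, "wb") as f:
        f.write(b"a,b\n1,2\n")
    pushed = copy_file(src_path, dst_path)
    with open(pushed, "rb") as f:
        print(f.read())  # b'a,b\n1,2\n'
```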

class glide.flow.Flatten(name, _log=False, _debug=False, **default_context)

Bases: glide.core.Node

Flatten the input before pushing

run(data)

Flatten the input before pushing. Assumes data is a list of lists.
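For a list of lists, one level of flattening is a single itertools.chain call. flatten is a hypothetical helper illustrating the idea, not glide's implementation; deeper nesting is left untouched.

```python
from itertools import chain


def flatten(data):
    """Hypothetical helper: flatten exactly one level of a list of lists."""
    return list(chain.from_iterable(data))


print(flatten([[1, 2], [3], []]))  # [1, 2, 3]
```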

class glide.flow.FuturesPush(name, _log=False, _debug=False, **default_context)

Bases: glide.core.PushNode

A node that either splits or duplicates its input to pass to multiple downstream nodes in parallel, using an executor_class that supports the futures interface. If an executor_kwargs dict is in the context of this node, it will be passed to the parallel executor.

Parameters

See Node documentation for parameters

Attributes

executor_class

An Executor that will be used to parallelize the push

as_completed_func

A callable used to get the Futures results as completed

See Node documentation for additional attributes

as_completed_func(timeout=None)

An iterator over the given futures that yields each as it completes.

Parameters
  • fs – The sequence of Futures (possibly created by different Executors) to iterate over.

  • timeout – The maximum number of seconds to wait. If None, then there is no limit on the wait time.

Returns

An iterator that yields the given Futures as they complete (finished or cancelled). If any given Futures are duplicated, they will be returned once.

Raises

TimeoutError – If the entire result iterator could not be generated before the given timeout.

executor_class

alias of concurrent.futures.process.ProcessPoolExecutor
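The split-or-duplicate fan-out described for FuturesPush can be sketched with concurrent.futures directly. futures_push is a hypothetical helper in which each downstream node is modeled as a plain callable; it defaults to ThreadPoolExecutor for simplicity, whereas FuturesPush itself defaults to ProcessPoolExecutor.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def futures_push(data, downstream, split=False, executor_class=ThreadPoolExecutor,
                 executor_kwargs=None, timeout=None):
    """Hypothetical helper: hand data (or one slice per node) to each downstream callable."""
    n = len(downstream)
    if split:
        k, m = divmod(len(data), n)
        # The first m slices get one extra item so slice sizes differ by at most one
        parts = [data[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n)]
    else:
        parts = [data] * n  # duplicate the full input to every downstream node
    results = [None] * n
    with executor_class(**(executor_kwargs or {})) as executor:
        futures = {executor.submit(fn, part): i for i, (fn, part) in enumerate(zip(downstream, parts))}
        # as_completed yields futures as they finish; it raises TimeoutError if timeout elapses
        for future in as_completed(futures, timeout=timeout):
            results[futures[future]] = future.result()
    return results


print(futures_push([1, 2, 3, 4, 5], [sum, len]))              # [15, 5]
print(futures_push([1, 2, 3, 4, 5], [sum, len], split=True))  # [6, 2]
```

Results are stored by node index rather than completion order, so the output is deterministic even though the work finishes in any order.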

class glide.flow.FuturesReduce(name, _log=False, _debug=False, **default_context)

Bases: glide.flow.Reduce

Collect results from futures before pushing

The following parameters are pulled from the node context and used in end().

Parameters
  • flatten (bool, optional) – Flatten the results into a single list before pushing

  • timeout (int or float, optional) – Timeout to pass to futures.as_completed()

end()

Do the push once all Futures' results are in
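A sketch of the collect-then-resolve pattern: futures are stored as they arrive in run(), and end() drains them with futures.as_completed(). FuturesCollector is a hypothetical stand-in, not glide's class; flatten and timeout are taken as constructor arguments here rather than from a node context, and results come back in completion order.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


class FuturesCollector:
    """Hypothetical stand-in for FuturesReduce: collect futures, resolve them in end()."""

    def __init__(self, flatten=False, timeout=None):
        self.flatten = flatten
        self.timeout = timeout
        self.futures = []

    def run(self, future):
        self.futures.append(future)

    def end(self):
        # as_completed raises TimeoutError if not every future finishes within timeout
        results = [f.result() for f in as_completed(self.futures, timeout=self.timeout)]
        if self.flatten:
            results = [item for sub in results for item in sub]
        return results


with ThreadPoolExecutor() as ex:
    collector = FuturesCollector(flatten=True, timeout=5)
    for i in range(3):
        collector.run(ex.submit(lambda n: [n, n * 10], i))
    print(sorted(collector.end()))  # [0, 0, 1, 2, 10, 20]
```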

class glide.flow.IterPush(name, _log=False, _debug=False, **default_context)

Bases: glide.core.Node

Push each item of an iterable individually

run(data, **kwargs)

Push each item of data individually
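The difference between pushing an iterable once and pushing each item can be sketched with a push callback. iter_push and the push callable are hypothetical stand-ins for the node and its downstream connection.

```python
def iter_push(data, push):
    """Hypothetical helper: call push once per item instead of once for the whole iterable."""
    for item in data:
        push(item)


received = []
iter_push(["a", "b", "c"], received.append)
print(received)  # ['a', 'b', 'c']
```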

class glide.flow.Join(name, _log=False, _debug=False, **default_context)

Bases: glide.core.Node

Join iterables before pushing

run(data, on=None, how='left', rsuffixes=None)

Join items before pushing. This converts each dataset to a DataFrame and uses the pandas join method under the hood.

Parameters
  • data – The datasets to join (i.e. a list of datasets or DataFrames)

  • on (optional) – Passed to the underlying pandas join method

  • how (str, optional) – Passed to the underlying pandas join method

  • rsuffixes (list, optional) – A list of suffixes to append to duplicate column names in the right datasets. The length of this should be len(data) - 1.
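One way to realize a multi-dataset join with one rsuffix per right-hand dataset is to fold DataFrame.join from left to right, as sketched below (requires pandas). join_datasets is hypothetical; setting the right frame's index to the on column is an assumption made so that DataFrame.join, which matches the left on column against the right index, lines up on the same key.

```python
import pandas as pd


def join_datasets(data, on=None, how="left", rsuffixes=None):
    """Hypothetical helper: fold DataFrame.join across a list of datasets, left to right."""
    frames = [d if isinstance(d, pd.DataFrame) else pd.DataFrame(d) for d in data]
    if rsuffixes is None:
        rsuffixes = [""] * (len(frames) - 1)  # pandas raises if columns overlap without a suffix
    if len(rsuffixes) != len(frames) - 1:
        raise ValueError("rsuffixes must contain len(data) - 1 entries")
    result = frames[0]
    for frame, suffix in zip(frames[1:], rsuffixes):
        if on is not None:
            # DataFrame.join matches the left "on" column against the right frame's index
            frame = frame.set_index(on)
        result = result.join(frame, on=on, how=how, rsuffix=suffix)
    return result


left = [{"id": 1, "a": "x"}, {"id": 2, "a": "y"}]
right = [{"id": 2, "a": "z", "b": 10}]
print(join_datasets([left, right], on="id", rsuffixes=["_r"]))
```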

class glide.flow.PollFunc(name, _log=False, _debug=False, **default_context)

Bases: glide.core.Node

run(data, func, result_param='status', result_value='success', sleep_time=2, max_iter=10, data_param=None, **kwargs)

Poll a function for a result

Parameters
  • data – Data to pass to func. Typically a request or URL that needs to be polled for a result.

  • func (callable) – The function that will be called on each iteration to get a result. It is expected to return a dict with a key/value representing completion (see result_param/result_value).

  • result_param (str) – The key in the func result whose value is checked for success.

  • result_value – The value representing success. Keep polling until this value is found.

  • sleep_time (float) – The amount of time to sleep between iterations

  • max_iter (int) – The maximum number of iterations before giving up

  • data_param (str, optional) – If given, pull this param out of the func result on success and push. Otherwise push the full response from func.

  • kwargs – Keyword arguments passed to func
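The polling loop described by these parameters can be sketched as follows. poll_func and fake_job_status are hypothetical; passing data as func's first positional argument and raising TimeoutError once max_iter attempts are exhausted are assumptions of this sketch, since the original does not specify what happens on failure.

```python
import time


def poll_func(data, func, result_param="status", result_value="success",
              sleep_time=2, max_iter=10, data_param=None, **kwargs):
    """Hypothetical helper: call func(data, **kwargs) until the result signals success."""
    for attempt in range(max_iter):
        result = func(data, **kwargs)
        if result.get(result_param) == result_value:
            # Return either one field of the response or the whole response
            return result[data_param] if data_param else result
        if attempt < max_iter - 1:
            time.sleep(sleep_time)  # no point sleeping after the final attempt
    # Assumption: giving up is signalled by an exception in this sketch
    raise TimeoutError(f"{result_param} != {result_value!r} after {max_iter} attempts")


calls = []


def fake_job_status(job_id, **kwargs):
    # Hypothetical status endpoint that reports success on the third call
    calls.append(job_id)
    status = "success" if len(calls) >= 3 else "pending"
    return {"status": status, "payload": job_id.upper()}


print(poll_func("job-1", fake_job_status, sleep_time=0, data_param="payload"))  # JOB-1
```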

class glide.flow.ProcessPoolPush(name, _log=False, _debug=False, **default_context)

Bases: glide.flow.FuturesPush

A multi-process FuturesPush node

class glide.flow.Reduce(name, _log=False, _debug=False, **default_context)

Bases: glide.core.Node

Waits until end() to call push(), so that all upstream nodes finish before the pipeline continues past this node.

The following parameters are pulled from the node context and used in end().

Parameters

flatten (bool, optional) – Flatten the results into a single list before pushing

begin()

Set up a place for results to be collected

end()

Do the push once all results are in

run(data, **kwargs)

Collect results from previous nodes
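The begin/run/end lifecycle described for Reduce can be sketched as a small class. ReduceSketch and its push callback are hypothetical; flatten is passed to the constructor here for simplicity rather than pulled from a node context.

```python
class ReduceSketch:
    """Hypothetical stand-in for Reduce: buffer in run(), push once in end()."""

    def __init__(self, push, flatten=False):
        self.push = push
        self.flatten = flatten

    def begin(self):
        self.results = []  # set up a place for results to be collected

    def run(self, data):
        self.results.append(data)  # nothing is pushed yet

    def end(self):
        results = self.results
        if self.flatten:
            results = [item for sub in results for item in sub]
        self.push(results)  # a single push after all upstream data has arrived


pushed = []
r = ReduceSketch(pushed.append, flatten=True)
r.begin()
for chunk in ([1, 2], [3]):
    r.run(chunk)
r.end()
print(pushed)  # [[1, 2, 3]]
```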

class glide.flow.Return(name, _log=False, _debug=False, **default_context)

Bases: glide.flow.Reduce

Collects upstream data and sets the result in the global state

Notes

Because this relies on the pipeline’s global_state under the hood, it will not work with pipelines that branch into multiple processes mid-pipeline, such as those using ProcessPoolPush.

Parameters

flatten (bool, optional) – Flatten the results into a single list before returning

end()

Collects upstream data and sets the result in the global state

class glide.flow.SkipFalseNode(name, _log=False, _debug=False, **default_context)

Bases: glide.core.Node

This overrides the behavior of calling run() so that if a “false” object is pushed, run() is never called and the object is pushed straight to the next node instead.
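The skip rule can be sketched as a small dispatch function. call_or_skip, run and push are hypothetical stand-ins for the node's run() and its push to the next node.

```python
def call_or_skip(run, push, data):
    """Hypothetical helper: falsy data bypasses run() and is pushed as-is."""
    if data:
        run(data)
    else:
        push(data)


ran, pushed = [], []
for item in ([1, 2], [], None, "x"):
    call_or_skip(ran.append, pushed.append, item)
print(ran)     # [[1, 2], 'x']
print(pushed)  # [[], None]
```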

class glide.flow.SplitByNode(name, _log=False, _debug=False, **default_context)

Bases: glide.core.PushNode

A node that splits the data based on the number of immediate downstream nodes.

If the data is a pandas object, it will use np.array_split; otherwise it will split the iterator into chunks of roughly equal size.

get_splits(data, split_count)

Split the data into split_count slices
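For sequences that are not pandas objects, "chunks of roughly equal size" can be produced with the same sizing rule as np.array_split: the first len(data) % split_count slices get one extra item. get_splits below is a hypothetical standalone sketch, not glide's method; in SplitByNode, slice i would then go to downstream node i.

```python
def get_splits(data, split_count):
    """Hypothetical sketch: split a sequence into contiguous slices whose sizes differ by at most one."""
    k, m = divmod(len(data), split_count)
    return [data[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(split_count)]


print(get_splits(list(range(7)), 3))  # [[0, 1, 2], [3, 4], [5, 6]]
```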

class glide.flow.SplitPush(name, _log=False, _debug=False, **default_context)

Bases: glide.core.Node

A node that splits the data before pushing.

If the data is a pandas object, it will use np.array_split; otherwise it will split the iterator into chunks of roughly equal size.

get_splits(data, split_count)

Split the data into split_count slices

run(data, split_count, **kwargs)

Split the data and push each slice

class glide.flow.ThreadPoolPush(name, _log=False, _debug=False, **default_context)

Bases: glide.flow.FuturesPush

A multi-threaded FuturesPush node

executor_class

alias of concurrent.futures.thread.ThreadPoolExecutor

class glide.flow.ThreadReduce(name, _log=False, _debug=False, **default_context)

Bases: glide.flow.Reduce

A plain Reduce node whose name makes it clear that it works with threads

class glide.flow.WindowPush(name, _log=False, _debug=False, **default_context)

Bases: glide.core.Node

run(data, size, **kwargs)

Push windows of the specified size

Parameters
  • data – The data to slice into windows

  • size (int) – The window size
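The original does not say whether the pushed windows overlap. The sketch below assumes overlapping (sliding) windows that advance one item at a time, built with a bounded deque; sliding_windows is a hypothetical name, and the real node would push each window rather than yield it.

```python
from collections import deque
from itertools import islice


def sliding_windows(data, size):
    """Hypothetical sketch: yield overlapping windows of length size, one per step."""
    it = iter(data)
    window = deque(islice(it, size), maxlen=size)
    if len(window) == size:
        yield tuple(window)
    for item in it:
        window.append(item)  # maxlen drops the oldest item automatically
        yield tuple(window)


print(list(sliding_windows([1, 2, 3, 4], 2)))  # [(1, 2), (2, 3), (3, 4)]
```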

class glide.flow.WindowReduce(name, _log=False, _debug=False, **default_context)

Bases: glide.core.Node

begin()

Initialize a place for a window to be collected

run(data, size, **kwargs)

Collect results to fill and push windows

Parameters
  • data – Data to collect into window

  • size (int) – Size of window to collect
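A sketch of collecting individual items into windows, assuming WindowReduce keeps the most recent size items and pushes a copy of the window each time it is full (a sliding window). WindowCollector and its push callback are hypothetical; whether glide instead emits non-overlapping windows is not stated in the original.

```python
from collections import deque


class WindowCollector:
    """Hypothetical stand-in for WindowReduce: push the last size items once the window fills."""

    def __init__(self, push):
        self.push = push

    def begin(self):
        self.window = deque()  # initialize a place for the window to be collected

    def run(self, data, size):
        self.window.append(data)
        if len(self.window) > size:
            self.window.popleft()  # assumption: drop the oldest item to keep the window sliding
        if len(self.window) == size:
            self.push(list(self.window))


pushed = []
w = WindowCollector(pushed.append)
w.begin()
for x in [1, 2, 3, 4]:
    w.run(x, size=3)
print(pushed)  # [[1, 2, 3], [2, 3, 4]]
```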