Creating Pipelines¶
A Glider
is a pipeline of Node
objects wired together in a DAG. It
accepts input data in its consume
method. For example:
glider = Glider(
MyExtractNode("extract")
| MyTransformNode("transform")
| MyLoadNode("load")
)
glider.consume(data)
The consume
method accepts node_name -> node_context keyword arguments
that can update the context of the pipeline’s nodes for the duration of the
consume
call. For example, if MyLoadNode
in the example above had
an argument called foo
in its run
method, you could set the value of
foo
for a particular pipeline run as follows:
glider.consume(data, load=dict(foo="bar"))
Pipelines can be templated as well for easy reuse. Any node can be replaced by name:
glider = Glider(
PlaceholderNode("extract")
| CSVLoad("load")
)
glider["extract"] = CSVExtract("extract")
glider.consume(...)
Or reuse an existing pipeline structure with GliderTemplate
:
template = GliderTemplate(
CSVExtract("extract")
| CSVLoad("load")
)
glider = template() # Copy of pipeline created with each call
glider.consume(...)
Complex Pipelines¶
Glide’s Node
class has an add_downstream
method that it inherits from
Consecution’s Node class. You can use this to form more complex topologies, such
as in the following example:
def parity_router(num):
if num % 2 == 0:
return "even"
return "odd"
def threshold_router(num):
prepend = "odd"
if num % 2 == 0:
prepend = "even"
if num >= 10:
return "%s_large" % prepend
return "%s_small" % prepend
glider = Glider(
CSVExtract("extract", nrows=40)
| IterPush("iter")
| [
parity_router,
(
Print("even")
| [threshold_router, Print("even_large"), Print("even_small")]
),
(
Print("odd")
| [threshold_router, Print("odd_large"), Print("odd_small")]
),
]
)
large = Print("large")
small = Print("small")
reducer = Reduce("reduce")
combined = LenPrint("combined")
large.add_downstream(reducer)
small.add_downstream(reducer)
reducer.add_downstream(combined)
glider["even_large"].add_downstream(large)
glider["odd_large"].add_downstream(large)
glider["even_small"].add_downstream(small)
glider["odd_small"].add_downstream(small)
glider.consume(range(20))
glider.plot("pipeline.png") # View hierarchy if you have GraphViz installed
This also takes advantage of Consecution’s router functionality to use
parity_router
and threshold_router
to steer data through the pipeline.
CLI Generation¶
With Glide you can create parameterized command line scripts from any pipeline with a simple decorator:
glider = Glider(
SQLLoad("extract")
| SQLExtract("load")
)
@glider.cli()
def main(glide_data, node_contexts):
glider.consume(glide_data, **node_contexts)
if __name__ == "__main__":
main()
The script arguments, their types, and whether they are required or not is all
inferred by inspecting the run
arguments on the nodes of the pipeline and
prefixing the node name. For example, SQLLoad
requires a conn
and a
table
argument, as well as having a few optional arguments. Since the node
is named “load”, the CLI will automatically generate required args called
--load_conn
and --load_table
. Additionally, the default help strings are
extracted from the run() method documentation if you use numpy docstring
format.
By default, the first positional argument(s) expected on the CLI are used to
populate the glide_data
argument. If the top node of your pipeline is a
subclass of NoInputNode
then the CLI will automatically skip the
glide_data
CLI arg and not try to pass any data as the first positional
argument to the wrapped function.
Let’s ignore the fact that you can’t pass a real database connection object on the command line for a second and see how you would run this script:
$ python my_script.py "select * from input_table limit 10" \
--extract_conn foo \
--load_conn bar \
--load_table output_table
To pass multiple inputs to glide_data
you would simply use space-separated
positional arguments:
$ python my_script.py "sql query 1" "sql query 2" \
--extract_conn foo \
--load_conn bar \
--load_table output_table
One way to populate the conn
arguments of pipeline nodes is to define it in
the global_state
or in the node initialization calls. In either case it is
no longer considered a required command line argument. So the following
would work:
glider = Glider(
SQLExtract("extract")
| SQLLoad("load"),
global_state=dict(conn=get_my_db_conn())
)
$ python my_script.py "select * from input_table limit 10" \
--load_table output_table
Blacklisting Args¶
In the previous example it is no longer necessary to even have the node-specific connection arguments show up on the command line (such as in –help output). You can blacklist the arg from ever getting put into the CLI as follows:
@glider.cli(blacklist=["conn"])
def main(glide_data, node_contexts):
glider.consume(glide_data, **node_contexts)
Or, if you just wanted to blacklist an argument that appears in multiple nodes
from a single node (such as the conn
argument required in both the extract
and load nodes in this example), you could be more explicit and prefix the
node name:
@glider.cli(blacklist=["load_conn"])
def main(glide_data, node_contexts):
glider.consume(glide_data, **node_contexts)
That would remove load_conn
from the CLI but not extract_conn
.
Custom Arguments¶
You can also override or add any argument you want using the Arg
class which
takes the standard argparse
arguments:
from glide.core import Glider, Arg
glider = ...
@glider.cli(Arg("--load_table", required=False, default="output_table"))
def main(glide_data, node_contexts):
glider.consume(glide_data, **node_contexts)
And now, assuming you had used the Glider
with conn
passed in the
global_state
, you could simply do:
$ python my_script.py "select * from input_table limit 10"
You can override the glide_data
positional argument in this way too if you want to
change the type/requirements:
@glider.cli(Arg("glide_data", type=str, default="some default sql query"))
def main(glide_data, node_contexts):
glider.consume(glide_data, **node_contexts)
You can also override some of the naming of specific node arguments to potentially
simplify your CLI. You can use the argparse dest
param to have an arg point
at a specific context element. Here we name the custom arg table
and have it
fill in the value of load_table
which ends up being a run
argument of the
“load” node:
@glider.cli(Arg("--table", dest="load_table", default="output_table"))
def main(glide_data, node_contexts):
glider.consume(glide_data, **node_contexts)
If your custom args match a node’s run
args exactly, they can be used to
fill in that value in the node context, potentially across multiple nodes if
many have the same arg name. We saw similar behavior with the conn
argument
on the global_state
above, but here is a more specific example sticking
with the table
custom arg:
@glider.cli(Arg("--table", default="output_table"))
def main(glide_data, node_contexts, table=None):
glider.consume(glide_data, **node_contexts)
Notice that load_table
is not targeted specifically, but its context will
be filled in by the value of table
on the CLI because the name of the CLI
arg exactly matches the name of an arg in that node’s run
method. Also
notice that table
is now passed as a keyword argument to main
. Any
custom or injected args that do not exactly match a node name qualified CLI
arg (such as “load_table”) will be included as keyword arguments to main
.
Note
Due to a known issue in argparse, even if you define an arg as required it will still show up in the optional arguments section of the help output if it has a dash or double-dash at the start of the arg name.
Argument Injection and Clean Up¶
The script decorator also has the ability to inject values into arguments based on the result of a function, and call clean up functions for the various injected arguments. The following example shows two useful cases:
def get_data():
# do something to populate data iterable
return data
@glider.cli(
Arg("--load_table", required=False, default="output_table")
inject=dict(glide_data=get_data, conn=get_my_db_conn),
cleanup=dict(conn=lambda x: x.close()),
)
def main(glide_data, node_contexts, **kwargs):
glider.consume(glide_data, **node_contexts)
Here we use the inject
decorator argument and pass a dictionary that maps
injected argument names to functions that return the values. We inject a
glide_data
arg and a conn
arg and neither are necessary for the
command line. This automatically blacklists those args from the command line
as well. Since we added the load_table
arg and gave it a default as well,
we can now simply run:
$ python my_script.py
Note
Injected args are also passed to the wrapped function as keyword args if they do not exactly match a node name qualified CLI arg.
Note
If an injected argument name is mapped to a non-function via
inject
the value will be used as is. The main difference is those values are
interpreted as soon as the module is loaded (when the decorator is init’d). If
that is not desirable, pass a function as shown above which will only be
executed once the decorated function is actually called. Injected
RuntimeContexts and other objects that are not a types.FunctionType
or
functools.partial
are passed through as-is.
The cleanup
decorator argument takes a dictionary that maps argument names to
callables that accept the argument value to perform some clean up. In this
case, it closes the database connection after the wrapped method is complete.
Boolean Args¶
Node run
args whose default is a boolean value will be converted to boolean
flags on the CLI. If the default is True
, the flag will invert the logic of
the flag and prepend no_
to the beginning of the arg name for clarity.
For example, the SQLLoad
node has a run
keyword arg with a default of
commit=True
. Assuming this node was named load
, this will produce a CLI
flag --load_no_commit
which, when passed in a terminal, will set
commit=False
in the node. If the default had been False
the CLI arg name
would have simply been --load_commit
and it would set the value to True
when passed in a terminal.
This leads to more clear CLI behavior as opposed to having a flag with a
truth-like name getting a false-like result when passed in a terminal. Of
course another option would have been to define the node keyword arg as
no_commit=False
instead of commit=True
. This would also lead to
understandable CLI behavior but, in my opinion, would lead to more confusing
variable naming in your code.
Parent CLIs¶
If you want to inherit or share arguments you can accomplish that using the
Parent
and Arg
decorators together. These are using
climax. under the hood, which
is utilizing argparse
. For example, the following script inherits a
--dry_run
boolean CLI flag:
from glide.core import Parent, Arg
@Parent()
@Arg("--dry_run", action="store_true")
def parent_cli():
pass
@glider.cli(parents=[parent_cli])
def main(glide_data, dry_run=False, node_contexts):
if dry_run:
something_else()
else:
glider.consume(glide_data, **node_contexts)
Parallel Processing¶
There are three main ways you can attempt parallel processing using Glide:
Method 1: Parallelization within nodes such as
ProcessPoolSubmit
or a distributed processing extension such as Dask/Celery/Redis QueueMethod 2: Completely parallel pipelines via
ParaGliders
(each process executes the entire pipeline)Method 3: Branched parallelism using parallel push nodes such as
ProcessPoolPush
orThreadPoolPush
Each has its own use cases. Method 1 is perhaps the most straightforward since you can return to single process operation after the node is done doing whatever it needed to do in parallel. Method 2 may be useful and easy to understand in certain cases as well. Method 3 can lead to more complex/confusing flows and should probably only be used towards the end of pipelines to branch the output in parallel, such as if writing to several databases in parallel as a final step.
Please see the quickstart or tests for examples of each method.
Note: Combining the approaches may not work and has not been tested. Standard limitations apply regarding what types of data can be serialized and passed to a parallel process.
Common Pipelines¶
Glide
comes with some common, templated ETL pipelines that connect
combinations of common nodes. The names are generally of the format
“Source2Destination”. The names of the available pipelines are listed
in the glide.pipelines
module documentation.
To use these pipelines, simply call the template to get an instance of a Glider, such as:
glider = File2Email()
glider.consume([file1, file2], load=dict(client=my_smtp_cient))
By default these templated pipelines have a PlaceholderNode
named
“transform” that you can easily replace once the glider is created:
glider["transform"] = MyTransformerNode("transform")
glider.consume(...)
You can also override the Glider
class used to create the pipeline:
glider = File2Email(glider=ProcessPoolParaGlider)
All of these templated pipelines are simply a convenience and are meant to cover very simple cases. More often than not it’s likely best to create your own explicit pipelines.