Creating Nodes¶
To create a custom node you simply inherit from the Glide Node
class and
define a run
method that takes at least one positional argument for the data
being pushed to it. The run
method should call self.push(data)
with the
data it wants to push downstream.
Here is an example of a simple transformer node:
class ExampleTransformer(Node):
def run(self, data):
# Do something to the data here
self.push(data)
Node Context¶
Each node has a context
. This comes into play when run
is called on
the node, as the required and optional parts of the context are inferred from
the positional and keyword args of run
. Take for example:
class MyNode(Node):
def run(self, data, conn, chunksize=None, **kwargs):
# Some node-specific code here
self.push(data)
By default all nodes expect their first positional arg to be the data going
through the pipeline. This node also requires a conn
argument, and has an
optional chunksize
argument. These values can be filled in from the
following inputs in priority order, with earlier methods overriding those
further down the list:
Context args passed to
consume
for the particular node:conn = get_my_db_conn() glider.consume( data, my_node=dict(conn=conn, chunksize=100) )
Default context set on the node at init time:
conn = get_my_db_conn() glider = Glider( MyNode("my_node", conn=conn, chunksize=100) )
Global pipeline state passed via
global_state
. This only works for populating positional args currently:conn = get_my_db_conn() glider = Glider( MyNode("my_node"), global_state=dict(conn=conn) )
Additionally, you can update the context of nodes at runtime by using the
update_context
or update_downstream_context
node methods.
Runtime Context¶
Sometimes it is useful or necessary to fill in node context values at
runtime. A prime example is when using SQL-based nodes in a parallel
processing context. Since the database connection objects can not be pickled
and passed to the spawned processes you need to establish the connection
within the subprocess. Glide has a special RuntimeContext
class for this
purpose. Any callable wrapped as a RuntimeContext will not be called until
consume
is called. In the example below, get_pymysql_conn
will be executed
in a subprocess to fill in the “conn” context variable for the “extract” node:
glider = ProcessPoolParaGlider(
SQLExtract("extract")
| PrettyPrint("load")
)
glider.consume(
[sql],
extract=dict(
conn=RuntimeContext(get_pymysql_conn),
cursor_type=pymysql.cursors.DictCursor,
)
)
In this case it is also necessary to specify the cursor_type so SQLExtract
can create a dict-based cursor for query execution within the subprocess as
required by SQLExtract
. Any args/kwargs passed to RuntimeContext will be
passed to the function when called.
Config Context¶
ConfigContext
is an alternative type of RuntimeContext
that can read a
config file to fill in node context. It supports reading from JSON, INI, or
YAML config files and optionally extracting specific data from the file. The
following shows an example of reading a key (“nrows”) from a JSON structure:
glider = Glider(
CSVExtract("extract", nrows=ConfigContext("myconfig.json", key="nrows"))
| Print("load")
)
glider.consume(...)
As another example, the following reads from an INI file and also passes a
callable for the key
parameter to extract a value from the config:
glider = Glider(
CSVExtract("extract", nrows=ConfigContext(
"myconfig.ini", key=lambda x: int(x["Section"]["nrows"])
))
| Print("load")
)
glider.consume(...)
If no value is specified for key
, the entire config file is
returned. ConfigContext
may be particularly useful when you want to load
sensitive information such as API login details that you would not want to
store in your code.
Cleaning Up¶
Sometimes it is also necessary to call clean up functionality after processing
is complete. Sticking with the example above that utilizes SQL-based nodes in
a parallel processing context, you’ll want to explicitly close your database
connections in each subprocess. The consume
method accepts a cleanup
argument that is a dictionary mapping argument names to cleaner functions. The
following example tells the Glider
to call the function closer
with the
value from extract_conn
once consume
is finished. Note that closer
is a
convenience function provided by Glide that just calls close
on the given
object.:
glider = ProcessPoolParaGlider(
SQLExtract("extract")
| PrettyPrint("load")
)
glider.consume(
[sql],
cleanup=dict(extract_conn=closer),
extract=dict(
conn=RuntimeContext(get_pymysql_conn),
cursor_type=pymysql.cursors.DictCursor,
)
)
The keys of the cleanup
dict can either be explicit (node name prefixed) or
more generic arg names that will map that function to every node that has that
arg in its run
method signature (so just “conn=” would have worked
too). It’s often better to be explicit as shown here.
> Note: In single-process cases the use of cleanup
is usually not
necessary, as you often have access to the objects you need to clean up in the
main process and can just do normal clean up there with context managers or
explicit calls to close
methods.
Common Nodes¶
Glide
comes with a suite of nodes to handle common data processing
tasks. The easiest way to view the options and understand their behavior is to
peruse the module documentation and/or review the source code for each node.
For extractor nodes, such as SQL/CSV/Excel/File/URL extractors, see
glide.extract
.For transformer nodes, see
glide.transform
.For filter nodes, see
glide.filter
.For loader nodes, such as SQL/CSV/Excel/File/URL loaders, see
glide.load
.For some additional flow control nodes see
glide.core
.
Keep in mind it’s very easy to write your own nodes. If you don’t see something you want, or you want slightly different behavior, create your own node. If you think it’s something that could benefit other users please contribute!