A simple, general-purpose pipeline framework that can easily be integrated into existing (analysis) chains and workflows.
`thepipe` can be installed via `pip`:

```shell
pip install thepipe
```
- Easy-to-use interface and integration into existing workflows
- Modules can be either subclasses of `Module` or bare Python functions
- Data is passed from module to module via a simple Python dictionary (wrapped in a class called `Blob`, which adds some visual candy and error reporting)
- Integrated hierarchical logging system
- Colour-coded log and print messages (`self.log` and `self.cprint()` in `Module`s)
- Performance statistics for the whole pipeline and for each module individually
- Clean exit when interrupting the pipeline with CTRL+C
Here is a basic example of how to create a pipeline, add some modules to it, pass some parameters and drain the pipeline.
Note that pipeline modules can either be vanilla (single-argument) Python functions, which receive the blob and return it, or classes which derive from `thepipe.Module`.
```python
import thepipe as tp


class AModule(tp.Module):
    """A class-based module: configure() runs once, process() once per cycle."""

    def configure(self):
        self.cprint("Configuring AModule")
        self.max_count = self.get("max_count", default=23)
        self.index = 0

    def process(self, blob):
        self.cprint("This is cycle #%d" % self.index)
        blob['index'] = self.index
        self.index += 1
        if self.index > self.max_count:
            self.log.critical("That's enough...")
            raise StopIteration  # tells the pipeline to stop draining
        return blob

    def finish(self):
        self.cprint("I'm done!")


def a_function_based_module(blob):
    # a plain function: takes the blob and returns it
    print("Here is the blob:")
    print(blob)
    return blob


pipe = tp.Pipeline()
pipe.attach(AModule, max_count=5)  # pass any parameters to the module
pipe.attach(a_function_based_module)
pipe.drain()  # without arguments it will drain until a StopIteration is raised
```
This will produce the following output:
```
++ AModule: Configuring AModule
Pipeline and module initialisation took 0.000s (CPU 0.000s).
++ AModule: This is cycle #0
Here is the blob:
Blob (1 entries):
 'index' => 0
++ AModule: This is cycle #1
Here is the blob:
Blob (1 entries):
 'index' => 1
++ AModule: This is cycle #2
Here is the blob:
Blob (1 entries):
 'index' => 2
++ AModule: This is cycle #3
Here is the blob:
Blob (1 entries):
 'index' => 3
++ AModule: This is cycle #4
Here is the blob:
Blob (1 entries):
 'index' => 4
++ AModule: This is cycle #5
CRITICAL ++ AModule: That's enough...
++ AModule: I'm done!
============================================================
5 cycles drained in 0.000793s (CPU 0.000793s). Memory peak: 20.56 MB
wall  mean: 0.000063s  medi: 0.000057s  min: 0.000045s  max: 0.000106s  std: 0.000022s
CPU   mean: 0.000065s  medi: 0.000057s  min: 0.000046s  max: 0.000112s  std: 0.000024s
```
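The summary at the end reports the mean, median ("medi"), minimum, maximum and standard deviation of the per-cycle wall-clock and CPU time. The following is a rough sketch of how such numbers can be collected with the Python standard library. It is not thepipe's own code: the helpers `timed_cycles` and `summarise` are hypothetical, using the population standard deviation (`statistics.pstdev`) is an assumption, and the memory peak is not covered.

```python
import statistics
import time


def timed_cycles(step, n_cycles):
    """Call `step` n_cycles times, recording wall-clock and CPU time per call."""
    wall, cpu = [], []
    for _ in range(n_cycles):
        w0, c0 = time.perf_counter(), time.process_time()
        step()
        wall.append(time.perf_counter() - w0)
        cpu.append(time.process_time() - c0)
    return wall, cpu


def summarise(label, durations):
    """Format mean, median, min, max and (population) std of the durations."""
    return (
        f"{label} mean: {statistics.mean(durations):.6f}s"
        f" medi: {statistics.median(durations):.6f}s"
        f" min: {min(durations):.6f}s"
        f" max: {max(durations):.6f}s"
        f" std: {statistics.pstdev(durations):.6f}s"
    )


# Usage: time five cycles of a small, arbitrary workload
wall, cpu = timed_cycles(lambda: sum(range(100_000)), n_cycles=5)
print(summarise("wall", wall))
print(summarise("CPU", cpu))
```

Measuring both clocks is useful because a module waiting on I/O accumulates wall time without a matching amount of CPU time.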