Pypeline is a simple yet powerful Python library for creating concurrent data pipelines.
- Pypeline was designed to solve simple medium data tasks that require concurrency and parallelism, but where using frameworks like Spark or Dask feels exaggerated or unnatural.
- Pypeline exposes an easy-to-use, familiar, functional API.
- Pypeline enables you to build pipelines using Processes, Threads and asyncio.Tasks via the exact same API.
- Pypeline gives you control over the memory and CPU resources used at each stage of your pipeline.
Install Pypeline using pip:
pip install pypeln
With Pypeline you can easily create multi-stage data pipelines using three types of workers:
You can create a pipeline based on multiprocessing.Process workers by using the pr module:
from pypeln import pr
import time
from random import random
def slow_add1(x):
    time.sleep(random()) # <= some slow computation
    return x + 1
def slow_gt3(x):
    time.sleep(random()) # <= some slow computation
    return x > 3
data = range(10) # [0, 1, 2, ..., 9]
stage = pr.map(slow_add1, data, workers = 3, maxsize = 4)
stage = pr.filter(slow_gt3, stage, workers = 2)
data = list(stage) # e.g. [5, 6, 9, 4, 8, 10, 7]
At each stage you can specify the number of workers. The maxsize parameter limits the maximum number of elements that the stage can hold simultaneously.
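To make the roles of workers and maxsize more concrete, here is a minimal standard-library sketch of a single map stage built on bounded queues. It is not Pypeline's implementation; the names threaded_map and _DONE are hypothetical, threads are used instead of processes to keep the example short, and error handling is omitted.

```python
import queue
import threading

_DONE = object()  # hypothetical sentinel marking the end of the stream


def threaded_map(f, iterable, workers=1, maxsize=0):
    """Apply f to every item using `workers` threads; results arrive unordered."""
    # A bounded queue makes producers block once `maxsize` items are waiting,
    # which is what caps the memory held by the stage (0 means unbounded).
    inputs = queue.Queue(maxsize=maxsize)
    outputs = queue.Queue(maxsize=maxsize)

    def feed():
        for item in iterable:
            inputs.put(item)
        for _ in range(workers):
            inputs.put(_DONE)  # one stop signal per worker

    def work():
        while True:
            item = inputs.get()
            if item is _DONE:
                outputs.put(_DONE)  # tell the consumer this worker finished
                return
            outputs.put(f(item))

    threading.Thread(target=feed, daemon=True).start()
    for _ in range(workers):
        threading.Thread(target=work, daemon=True).start()

    finished = 0
    while finished < workers:
        result = outputs.get()
        if result is _DONE:
            finished += 1
        else:
            yield result


results = list(threaded_map(lambda x: x + 1, range(10), workers=3, maxsize=4))
```

Because several workers run at once, the order of results is not guaranteed, which is why the examples in this document show shuffled outputs.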
You can create a pipeline based on threading.Thread workers by using the th module:
from pypeln import th
import time
from random import random
def slow_add1(x):
    time.sleep(random()) # <= some slow computation
    return x + 1
def slow_gt3(x):
    time.sleep(random()) # <= some slow computation
    return x > 3
data = range(10) # [0, 1, 2, ..., 9]
stage = th.map(slow_add1, data, workers = 3, maxsize = 4)
stage = th.filter(slow_gt3, stage, workers = 2)
data = list(stage) # e.g. [5, 6, 9, 4, 8, 10, 7]
Here we have the exact same situation as in the previous case, except that the workers are threads.
You can create a pipeline based on asyncio.Task workers by using the io module:
from pypeln import io
import asyncio
from random import random
async def slow_add1(x):
    await asyncio.sleep(random()) # <= some slow computation
    return x + 1
async def slow_gt3(x):
    await asyncio.sleep(random()) # <= some slow computation
    return x > 3
data = range(10) # [0, 1, 2, ..., 9]
stage = io.map(slow_add1, data, workers = 3, maxsize = 4)
stage = io.filter(slow_gt3, stage, workers = 2)
data = list(stage) # e.g. [5, 6, 9, 4, 8, 10, 7]
This is conceptually similar to the previous cases, but everything runs in a single thread and Task workers are created dynamically.
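As a rough illustration of the single-threaded model, the sketch below creates one asyncio Task per element and uses a semaphore to cap how many run at once. This is an assumption about how such a stage could work, not Pypeline's actual code; async_map and add1 are hypothetical names.

```python
import asyncio


async def async_map(f, iterable, workers=1):
    """Run coroutine function f over iterable with at most `workers` in flight."""
    limit = asyncio.Semaphore(workers)

    async def run(item):
        async with limit:  # wait here until one of the `workers` slots is free
            return await f(item)

    # One Task per element, all scheduled on the same event loop (one thread).
    tasks = [asyncio.create_task(run(item)) for item in iterable]

    results = []
    for finished in asyncio.as_completed(tasks):
        results.append(await finished)  # collected in completion order
    return results


async def add1(x):
    await asyncio.sleep(0.01)  # stand-in for some slow I/O
    return x + 1


results = asyncio.run(async_map(add1, range(5), workers=2))
```

Results are collected in completion order, so they may not match the input order.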
For more information see the Pypeline Guide.
In the spirit of being a true pipeline library, Pypeline also lets you create your pipelines using the pipe | operator:
data = (
    range(10)
    | pr.map(slow_add1, workers = 3, maxsize = 4)
    | pr.filter(slow_gt3, workers = 2)
    | list
)
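One common way to support this kind of syntax in Python is the reflected-or method __ror__, which is called when the left operand does not handle | itself. The sketch below shows the idea with plain, sequential map and filter; Partial, pmap and pfilter are hypothetical names, and this is not necessarily how Pypeline implements it.

```python
class Partial:
    """Hypothetical helper: a stage that is still waiting for its input iterable."""

    def __init__(self, f):
        self.f = f

    def __ror__(self, iterable):
        # Invoked for `iterable | partial` when `iterable` does not define `|`.
        return self.f(iterable)


def pmap(f):
    # Sequential stand-in for a concurrent map stage.
    return Partial(lambda iterable: map(f, iterable))


def pfilter(f):
    # Sequential stand-in for a concurrent filter stage.
    return Partial(lambda iterable: filter(f, iterable))


result = (
    range(10)
    | pmap(lambda x: x + 1)
    | pfilter(lambda x: x > 3)
    | Partial(list)  # the sketch wraps list explicitly to keep things simple
)
```

In this sketch the final step is wrapped as Partial(list) because built-in map and filter objects do not define | themselves; a library whose stage objects implement __or__ can accept a bare list on the right-hand side.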