jacg / liquidata

EDSL for data pipelines in Python
Home Page: https://jacg.github.io/liquidata/
License: MIT License
The name 'dataflow' comes up with lots of hits on PyPI. Before uploading to PyPI we need to come up with a new, distinctive and not too ugly name for this package. Take inspiration from ideas such as
This is just a reminder.
We need to specify all the dependencies for each way.
Currently we have to say
df.push(pipe = df.pipe(A, B, C), ...)
It should be possible to say
df.push(pipe = (A, B, C), ...)
instead.
In other words, the implicit pipe mechanism which already exists within `fork` and `branch` should also be usable here.
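The normalisation rule could be sketched as follows. The helper names (`make_pipe`, `as_pipe`) are hypothetical, not liquidata internals: anything that is a tuple gets promoted to a pipe, anything else is assumed to be a pipe already.

```python
# Hypothetical sketch of the implicit-conversion rule, not liquidata's code.
# A "pipe" here is just a function applying its stages left to right.
def make_pipe(*stages):
    def run(x):
        for stage in stages:
            x = stage(x)
        return x
    return run

def as_pipe(spec):
    # Accept either a ready-made pipe (a callable) or a tuple of stages,
    # mirroring how fork and branch already promote tuples to pipes.
    if isinstance(spec, tuple):
        return make_pipe(*spec)
    return spec

double = lambda x: x * 2
inc    = lambda x: x + 1

assert as_pipe((double, inc))(3) == 7           # tuple promoted to a pipe
assert as_pipe(make_pipe(double, inc))(3) == 7  # explicit pipe passed through
```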
The current implementation uses coroutines to push data through the pipeline.
This was motivated by the context in which the ancestor of liquidata
was written, where a single input stream was required to be split into multiple independent output streams fed into separate sinks.
Let's take a step back and ask some questions about this choice: Do we need to push? Do we need coroutines? Why? When? What are the consequences? What alternatives are there?
Let's consider a very simple pipeline, consisting of a linear sequence of maps:
pipe(f, g, h)
This could be implemented using any of function composition, generators or coroutines.
Function composition is the simplest, so why bother with the others?
Let's throw in a filter or a join:
pipe(f, { p }, g, h)
pipe(f, join, g, h)
Function composition no longer cuts the mustard, because there is no longer a 1-to-1 correspondence between items in the input and output streams: something is needed to shrink or expand the stream.
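In generator style, a filter and a join are straightforward because each stage transforms the whole stream rather than single items. A sketch, with hypothetical helper names `keep` and `join`:

```python
# Generator stages can change the stream's length, which plain composition
# of per-item functions cannot:
def keep(pred, stream):            # filter: may shrink the stream
    return (x for x in stream if pred(x))

def join(stream):                  # flatten: may expand the stream
    return (x for inner in stream for x in inner)

evens = list(keep(lambda n: n % 2 == 0, range(6)))
flat  = list(join([[1, 2], [], [3]]))

assert evens == [0, 2, 4]
assert flat == [1, 2, 3]
```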
A different complication, branches:
pipe(f, [x, y, z], g, h)
It's difficult to see how function composition and generators could deal with this.
That last pipe describes a graph that looks something like this:
             x -- y -- z
            /
source -- f -- g -- h
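This is where the push/coroutine approach shines: a stage can forward each incoming item to several consumers. A minimal sketch (not liquidata's code):

```python
def co_sink(store):
    # terminal coroutine: accumulate received items in a list
    def gen():
        while True:
            store.append((yield))
    c = gen()
    next(c)                    # prime the coroutine
    return c

def broadcast(*targets):
    # push each incoming item to every branch
    def gen():
        while True:
            x = yield
            for t in targets:
                t.send(x)
    c = gen()
    next(c)
    return c

main, side = [], []
head = broadcast(co_sink(side), co_sink(main))
for item in (1, 2, 3):
    head.send(item)

assert main == [1, 2, 3] and side == [1, 2, 3]   # the stream was split
```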
How about
sourceA -- a
            \
             g -- h
            /
sourceB -- b
Generators can deal with this easily, merging the sources with something like itertools.chain:

map(h, map(g, chain(map(a, sourceA),
                    map(b, sourceB))))
but it's not obvious how this would work for function composition or coroutines.
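One runnable realisation of the generator version merges the two mapped sources with `itertools.chain` (which exhausts `sourceA` before `sourceB`; interleaving would need a different merge). The stage functions here are placeholders:

```python
from itertools import chain

sourceA = [1, 2]
sourceB = [10, 20]
a = lambda x: x + 1          # placeholder stages
b = lambda x: x - 1
g = lambda x: x * 2
h = str

merged = map(h, map(g, chain(map(a, sourceA),
                             map(b, sourceB))))
assert list(merged) == ['4', '6', '18', '38']
```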
liquidata syntax for multiple sources

What would the liquidata syntax for this look like?
Ideally we'd have another kind of bracket, but we've exhausted the possibilities that Python offers: `()`, `[]`, `{}`. Let's imagine that `<>` were valid syntactic brackets:
pipe(b, <sourceA, a>, g, h) # join two sources
pipe(b, [a, out.A], g, h) # branch out into two sinks
Working with syntax available in Python, how about:
pipe(b, source(sourceA, a), g, h)
Recall that the following are already synonymous in liquidata
pipe(f)(data)
pipe(source << data, f)
pipe(data >> source, f)
pipe(source(data), f)
so the following could work
pipe(source << sourceB, b, (source << sourceA , a), g, h)
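The synonymy of these spellings comes down to operator overloading. A toy illustration (this is not liquidata's implementation; it only shows why all three operator forms can produce the same object):

```python
class Source:
    # Toy stand-in: all spellings wrap the same data.
    def __init__(self, data=None):
        self.data = data
    def __call__(self, data):          # source(data)
        return Source(data)
    def __lshift__(self, data):        # source << data
        return Source(data)
    def __rrshift__(self, data):       # data >> source (lists lack __rshift__)
        return Source(data)

source = Source()
d = [1, 2, 3]
assert (source << d).data == source(d).data == (d >> source).data == d
```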
liquidata used to have an input-variable syntax (called slots) in an earlier prototype. If something like it were resurrected, we could write something along the lines of
fn = pipe(source << slot.B, b, (source << slot.A, a), g, h)
fn(A=sourceA, B=sourceB)
In liquidata, `[]`-branches are called independent, because there is absolutely no synchronization between them (in contrast with named branches, which are based on namespaces flowing through the pipe and managed with `name`, `get` and `put`). Once the stream has split, the branches can change the length of the stream without the others knowing or caring about it.
We would need to think about the synchronization consequences for multiple independent input streams. I guess that the answer is: it's up to the user to make sure something sensible happens.
`close_all`

Consider the following in current liquidata:
pipe(source << itertools.count(), take(3))
pipe(source << itertools.count(), take(3, close_all=True))
Because of the push architecture, the first never returns. In a pull architecture the issue doesn't arise.
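For comparison, the pull-style equivalent in plain Python terminates immediately, because the consumer decides how much to draw from the infinite source:

```python
from itertools import count, islice

# Pull: the consumer draws only what it needs from the infinite source,
# so the expression terminates without any close_all machinery.
first_three = list(islice(count(), 3))
assert first_three == [0, 1, 2]
```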
I would like to remove the universal reliance on coroutines, with two main goals:

- Enabling things that were impossible before.
- Simplifying portions of the code which don't need coroutines.
Currently, name clashes in the output namespace are resolved by creating a tuple of all the results that clash. This is not The Right Thing. Think about what should be done instead.
#18 might be part of the solution, or it might not.
It is a major goal of the project to issue excellent error messages.
No work has been done on this yet.
The way forward is probably to split the compilation of the network into coroutines into distinct phases, including
Some optimization phases should be added eventually too.
This will tie in with other work on alternative architectures (push vs pull, pure composition, etc.).
Currently we have
pipe([out.Y], out.X) -> Namespace(Y=<branch output>, X=<main output>)
which is great, but then
pipe([out.X], out.X) -> Namespace(X=(<branch output>, <main output>))
which is not so good.
You might say "so don't use the same output name twice in the same network!" but that's easier said than done:
abstraction = [out.X]
pipe(abstraction, out.X)
Might these issues be mitigated by enabling hierarchical naming for branches?
Maybe something like
pipe(out.B([out.X]), out.X) -> Namespace(X=<main out>, B=Namespace(X=<branch out>))
This might form part of a solution to #17.
There are a bunch of TODOs left over from the first version of the code, related to the testing suite, that shall be addressed in upcoming PRs:

- `fork`: `fork` accepts pipes as arguments, but tuples can be interpreted as pipes and converted implicitly. This feature is not tested.
- `count_filter`
- `spy_count`
- `pipe`
- `string_to_pick` and its usage
- `slice` with `close_all = True`: `slice` takes an optional argument `close_all` to decide whether to stop the full pipeline when it runs out of values. The functionality for `close_all = True` is not tested.

Currently `out` can be used in the following ways:
- `out` (shorthand for `out(into(list))`)
- `out(<binary-fold-function>)`
- `out(<binary-fold-function>, <initial-value>)`
- `out(into(<consumer-of-iterables>))`
- `.<name>` appended to `out`
- with no explicit `out` at the end of a pipe, it defaults to `out(into(list))`
If we separate `into` from `out`, the situation becomes:

- `out` (shorthand for `into(list)`)
- `out(<binary-fold-function>)`
- `out(<binary-fold-function>, <initial-value>)`
- `into(<consumer-of-iterables>)`
- `.<name>` appended to `out` or `into`
- with no explicit `out` or `into` at the end of a pipe, it defaults to `into(list)`
This removes the need to wrap `into` in an `out`, by shortening `out(into(thing))` -> `into(thing)`. Previously, all return values (except implicit ones at the end, or ones hidden inside abstractions) were marked with an `out`. Now they would be marked by `out` or `into`.

More generally, the universal nature of `out` is lost. If we ever invent more outputs (e.g. #18) then the lack of a universal `out` might require more ad-hoc implementation, and avoiding this is a core design principle of liquidata.
| | fold | consume iterable |
|---|---|---|
| Current situation | `out` | `out(into ...)` |
| Proposal 1 | `out` | `into` |
| Option A | `fold` | `into` |
| Option B | `fold` | `out` |
The error messages that appear when something goes wrong in a pipeline are often difficult to understand.
We should try to detect as many problems as possible at the time graphs are constructed, rather than waiting for things to go pear-shaped when data are sent through the pipe.
Let's collect examples of unhelpful messages in the comments here.
Currently, the order is `{ predicate : key }`. Would it make more sense to have `{ key : predicate }`?
The current implementation of `into` is a temporary hack; consequently it is O(N) rather than O(1) in space.
This needs fixing pretty urgently. It is complicated by the push architecture. In a pull architecture it would be trivial.
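A sketch of why pull would make it trivial: `into(consumer)` could hand the consumer the lazy stream directly, so no intermediate buffer is accumulated by `into` itself (the names here are invented for illustration):

```python
# Sketch: in a pull architecture, into(consumer) passes the stream straight
# to the consumer (e.g. sum, min, set), so space usage is that of the
# consumer, not an O(N) buffer built by into.
def into(consumer):
    def run(stream):
        return consumer(stream)
    return run

stream = (x * x for x in range(1000))    # lazy: items produced on demand
assert into(sum)(stream) == 332833500    # sum of squares 0..999
```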
At present, the signature of map
is
def map(op=None, *, args=None, out=None, item=None):
The parameters `args`, `out` and `item` are there for the purpose of working with streams which contain dictionaries, in order to select which arguments are passed to the operation, and what to do with its output. `map` is not the only utility which has such parameters, and it is likely that new utilities might need similar parameters in the future.
Currently, such parameters are interpreted, usually with repetitive, and sometimes convoluted, logic in each such utility itself. Consequently, the same problem is solved repeatedly, and needs to be tested repeatedly for each component that supports item selection.
This is horribly wrong! We should provide utilities whose one and only job is to provide item selection or item setting capability to any pipe, and ensure that they can be applied to utilities such as `map` (which should be blissfully ignorant of the item-fiddling) in order to achieve the desired effect. In other words, we should apply the philosophy of writing small, testable, reusable, composable units, to this particular feature!
What would it look like?
How about something along the following lines?
def xxx(pipe, in_=None, out=None, on=None, args=False, kwds=False):
`in_`:

- `None`: `xxx` forwards incoming data to `pipe` unchanged
- a single name: `xxx` picks one item off the dict in the stream and forwards it as sole input to `pipe`
- multiple names: `xxx` picks those items out of the dict and forwards them to `pipe`

`out`:

- `None`: send the output of `pipe` downstream
- a single name: attach the output of `pipe`, under that name, on to the dictionary that `xxx` received
- multiple names: `pipe` should return one value per name; attach the returned values to the dict that `xxx` received, under the names in the corresponding positions

`on=yyy`: shorthand for `in_=yyy, out=yyy`

`args`: make `pipe` apply `*args` expansion to whatever it receives

`kwds`: make `pipe` apply `**kwds` expansion to whatever it receives
Alternatively, perhaps these can be implemented as 5 separate utilities.
In any case, this idea of 'pipe decorators' will lead to much more composable and reusable code.
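As a rough illustration of the decorator idea for plain functions over dict streams (the implementation is invented; only the parameter names come from the proposal above):

```python
def xxx(fn, in_=None, out=None):
    # Hypothetical item-selection wrapper: fn stays blissfully ignorant
    # of the dicts flowing through the stream.
    def stage(item):
        if in_ is None:
            result = fn(item)                      # forward item unchanged
        elif isinstance(in_, str):
            result = fn(item[in_])                 # pick one entry
        else:
            result = fn(*(item[k] for k in in_))   # pick several entries
        if out is None:
            return result                          # send result downstream
        new = dict(item)
        if isinstance(out, str):
            new[out] = result                      # attach under one name
        else:
            for k, v in zip(out, result):          # attach positionally
                new[k] = v
        return new
    return stage

add = xxx(lambda a, b: a + b, in_=('a', 'b'), out='sum')
assert add({'a': 2, 'b': 3}) == {'a': 2, 'b': 3, 'sum': 5}
```

Because the wrapping is a separate, composable unit, it can be tested once and applied to `map` or any future utility, rather than re-implemented in each.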
fn = pipe(open, join, sink(print))
is the equivalent of
def fn(names):
    for name in names:
        for line in open(name):
            print(line)
But how about
def fn(names):
    for name in names:
        with open(name) as file:  # this line was added
            for line in file:
                print(line)
A `with`-equivalent is needed.
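For reference, pull-style generators express this naturally: each file is closed as soon as it is exhausted, without any extra machinery. A sketch:

```python
import os
import tempfile

def read_lines(names):
    # pull-style join over files; each file is closed when exhausted,
    # thanks to the with statement inside the generator
    for name in names:
        with open(name) as file:
            yield from file

# demo on a throwaway file
fd, path = tempfile.mkstemp()
with os.fdopen(fd, 'w') as f:
    f.write("one\ntwo\n")
assert list(read_lines([path])) == ["one\n", "two\n"]
os.remove(path)
```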
Perhaps, more importantly than side-effects, we need to consider things like
fn = pipe(<with-syntax> open, join, process, { filter }, out(into(collector)))
fn = pipe(<with-syntax> (open, join, process, { filter }), more, stuff, out(into(collector)))
Note that `with` needs to be aware of where the subpipe to which it applies ends.