First, thanks for the amazing PyFunctional. Now my 2 cents. I've been playing with the idea of supporting basic parallel execution using a `multiprocessing.Pool` (I could change to threads too). In this scenario, we either send each individual sequence entry to a process, or split the sequence into as many parts as there are CPUs and send each part to a process. Right now I've implemented the second case, although it is pretty easy to change it to the first one. You can find the code in my fork.
As of now, `map`/`where` and `filter`/`select` are supported. The main issue here is serialization. Only properly defined functions (`def`) will work, as `lambda`s and `fn`s do not serialize very well. I've tried `pickle`, `dill`, `cloudpickle` and even `pyspark.serializers`, but I must have done something wrong because I've been unable to make it work. Furthermore, the use of `namedtuple`s in the tests makes it even more difficult, as `namedtuple`s do not serialize well either. So I kept the basic `pickle` in order to avoid an external dependency.
I hope it is at least a start.
Cheers.
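A rough sketch of the second strategy (one task per chunk instead of one per element). It makes no assumptions about the fork's actual code: `split_into`, `apply_to_chunk` and `chunked_map` are hypothetical names. The sketch defaults to the built-in `map` so it runs anywhere. Passing a `multiprocessing.Pool`'s `map` as `map_fn` distributes the chunks across processes, and in that case `func` must be picklable (a builtin or module-level `def`, not a `lambda`).

```python
import os
from functools import partial
from itertools import chain


def split_into(n_parts, items):
    """Split items into at most n_parts contiguous chunks of near-equal size."""
    items = list(items)
    size, extra = divmod(len(items), n_parts)
    chunks, start = [], 0
    for i in range(n_parts):
        # The first `extra` chunks get one additional element each
        end = start + size + (1 if i < extra else 0)
        if end > start:
            chunks.append(items[start:end])
        start = end
    return chunks


def apply_to_chunk(func, chunk):
    """Worker-side step: apply func to every element of one chunk."""
    return [func(x) for x in chunk]


def chunked_map(func, items, n_parts=None, map_fn=map):
    """Map func over items with one task per chunk (hypothetical helper).

    map_fn defaults to the built-in map (serial); pass pool.map from a
    multiprocessing.Pool to run the chunks in separate processes.
    """
    n_parts = max(1, n_parts or os.cpu_count() or 1)
    chunks = split_into(n_parts, items)
    results = map_fn(partial(apply_to_chunk, func), chunks)
    # Both map and Pool.map keep chunk order, so flattening restores item order
    return list(chain.from_iterable(results))


# Usage with real processes (func must be picklable, e.g. a builtin or a def):
#     from multiprocessing import Pool
#     if __name__ == "__main__":
#         with Pool() as pool:
#             print(chunked_map(abs, range(-5, 5), map_fn=pool.map))
```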
from pyfunctional.
Thanks @versae! The second option, as you highlighted, is a good way to accomplish this with less overhead.
I spent a while playing around with your POC. I liked a few things you had there (namely using execution strategies), but couldn't seem to get it to work correctly. It looked like things were not getting computed in parallel for some reason:
```
In [13]: def f(x):
   ....:     print("f({0}) on Core {1}".format(x, os.getpid()))
   ....:     return x

In [50]: seq.range(20).map(f)
Out[50]: f(0) on Core 44668
f(1) on Core 44668
f(2) on Core 44668
f(3) on Core 44668
f(4) on Core 44668
f(5) on Core 44668
f(6) on Core 44668
f(7) on Core 44668
f(8) on Core 44668
f(9) on Core 44668
f(10) on Core 44668
f(11) on Core 44668
f(12) on Core 44668
f(13) on Core 44668
f(14) on Core 44668
f(15) on Core 44668
f(16) on Core 44668
f(17) on Core 44668
f(18) on Core 44668
f(19) on Core 44668
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
```
Using `multiprocessing.Pool` directly seems to work as expected. My guess is that for some reason the `map` is getting computed before `pool.map`, and then only the pickle unpacking is done in parallel.
Is the specific pickling code there (rather than letting `multiprocessing` do it for you) an attempt to correctly serialize functions like lambdas? I haven't tested it, but it seems like this should work: http://stackoverflow.com/questions/16626429/python-cpickle-pickling-lambda-functions. It looks like `dill` works.
The other thing I was thinking about is that parallelization happens separately for each function call, even when some calls could be trivially combined (e.g. a `map` followed by a `filter` should be parallelized once). It seems like there should be a way to "squash" parallel steps using the `ExecutionStrategies` enum, similar to the serial version. Perhaps these could be implemented as parallelization barriers, similar to how `cache` is a barrier to recomputing things.
Tests are probably the part that requires the most work, not so much because of `namedtuple`, but because of testing irrespective of order. Right now what I am planning on doing is keeping `seq` as it is, adding a `pseq` function for parallel computation, and writing a small set of tests that ensure things work as expected, rather than explicitly testing all the parallel functions.
Thanks for the start and ideas!
I see the problem now: `map`/`where` are lazy, so they don't actually execute the passed function until the results are collected. `Pool.map` is correctly executing the partial in parallel, though. One way to fix it could be to make `unpack` check whether the result of executing the function is an instance of `collections.Iterable`, in which case it should consume it and return it. I did just that and it now runs in parallel:
```
# not parallel
>>> %timeit -n 1 list(seq.range(100).map(lambda x: sleep(.1) and x))
1 loop, best of 3: 10 s per loop
# parallel
>>> %timeit -n 1 list(seq.range(100).map(lambda x: sleep(.1) and x))
1 loop, best of 3: 1.64 s per loop
```
And now showing the PIDs. `Pool.map` will wait until all processes are done and guarantees ordered results.
```
>>> seq.range(20).map(f)
f(16) on Core 15326
f(8) on Core 15325
f(0) on Core 15324
f(17) on Core 15326
f(1) on Core 15324
f(9) on Core 15325
f(2) on Core 15324
f(18) on Core 15326
f(10) on Core 15325
f(19) on Core 15326
f(3) on Core 15324
f(11) on Core 15325
f(4) on Core 15324
f(12) on Core 15325
f(5) on Core 15324
f(13) on Core 15325
f(6) on Core 15324
f(14) on Core 15325
f(7) on Core 15324
f(15) on Core 15325
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
```
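The fix described above, materializing lazy results inside the worker, can be sketched as follows. This is a simplified, hypothetical stand-in for `unpack` without the serialization step. Note that the `collections.Iterable` alias was removed in Python 3.10, so current code should use `collections.abc.Iterable`.

```python
from collections.abc import Iterable


def call_and_materialize(func, args):
    """Call func(*args) and force any lazy iterable result into a list.

    Inside a worker process this makes the computation happen there,
    instead of handing back a lazy map/filter object that would only be
    evaluated later, in the parent. Strings and bytes are returned as-is
    because they are already concrete values.
    """
    result = func(*args)
    if isinstance(result, Iterable) and not isinstance(result, (str, bytes)):
        return list(result)
    return result
```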
`pack` and `unpack` are indeed there to make it easier to use serialization methods other than the builtin `pickle` in `multiprocessing`. As an unexpected side effect of consuming the `map`, `dill` (if installed) is now properly serializing `lambda`s 💃. There are a couple of failing tests that I still have to examine, though. On the other hand, `fn` functions lack `'__getstate__'`, and the only way I see to make them work is by monkeypatching `fn` :(
I like the `pseq`, but the user should be aware that not every action might be parallelizable. Maybe `pseq` could be a wrapper for `seq(*args, **kwargs)` where `kwargs["processes"]` indicates how many processes you want to use? It could default to `1`, accepting values from `-1` (all available) up to `CPU_COUNT`.
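A minimal sketch of how the proposed `processes` argument could be interpreted. `resolve_processes` is a hypothetical helper, and the rule it implements (default `1`, `-1` meaning all CPUs, larger values capped at the CPU count) is only the convention suggested above.

```python
import os


def resolve_processes(processes=1):
    """Turn a user-facing `processes` value into a worker count (hypothetical).

    -1 means "all available CPUs"; positive values are capped at the CPU
    count; anything else (0, other negatives, None) is rejected.
    """
    cpu_count = os.cpu_count() or 1  # os.cpu_count() may return None
    if processes == -1:
        return cpu_count
    if processes is None or processes < 1:
        raise ValueError("processes must be -1 or a positive integer")
    return min(processes, cpu_count)
```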
A few comments/questions, but nice work!
Serialization
Is it necessary to serialize the collection itself with something else? It seems like `pickle` or `dill` will serialize it, and then `multiprocessing` will serialize the result of that. Do you have any resources or examples that explain the advantage of pickling/dilling the collection itself?
Object model for seq/pseq
My plan for implementing different versions of `seq` is to change the code structure a little bit. The primary change would be to make something like a `class ExecutionEngine`, which is constructed similarly to how `seq` is now. That class would then be overridden/inherited, as in `class ParallelExecutionEngine(ExecutionEngine)`, to run in parallel. The class would be callable, have static methods like the ones that `seq` has now (e.g. `open`, `json`...), and `seq` itself would be an instance/singleton of the class. Overall, I think this would be a fairly clean way to keep lots of code in common while cleanly separating the differences (and it will help with the next thing).
Parallel Stage Barrier
I think that forcing `collections.Iterable` to expand is along the right track, but it would be better to think about it in terms of synchronization/stage barriers. I would look at it like this:
- All the functions that are embarrassingly parallel are marked as `PARALLEL`. This would include `map`, `filter`, and such.
- By default, it is assumed that all other functions are not parallelizable at all.
- When the lineage is evaluated, look for blocks of `PARALLEL` transformations, and lazily execute the batched functions all together in a single call to `Pool.map` (by "lazily execute" I mean queue that up as the next `iter` thing, which is executed in parallel all at once when the first value is asked for).
- As you noted (which I didn't know), the fact that `Pool.map` preserves order makes this even easier.

The advantage of this is that transformations that can be collapsed into a single operation (sequential maps, or filters, or such) will be. The second advantage is that it would be easy to mark new functions as `PARALLEL`. Finally, this would allow things which are parallelizable with some work to be marked as something else, such as `PARALLEL_BARRIER`, so that blocks of `PARALLEL` operations can still be computed in parallel. It would take some work to go through and mark things correctly as `PARALLEL`, but that's fine. One thing I would note, though, is that batching "compressible" functions assumes that they will be reasonably load balanced. Something like `filter` could make "partitions" unbalanced.
The last thing to note is that doing this would give proper motivation for having a version of the `_` lambda within PyFunctional itself, since `fn._` doesn't work as intended in parallel. The prior motivation was convenience, but this is actually a real concern.
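The batching idea can be sketched independently of PyFunctional's internals. Here each transformation is a hypothetical `(function, is_parallel)` pair, where `function` operates on a whole iterable (as lineage transformations do). `run_block` stands in for whatever executes a composed block, for example a chunked `Pool.map`. Consecutive parallel steps are composed and handed to `run_block` once, and a non-parallel step acts as the barrier. Applying the composed function per chunk is only valid for element-wise steps such as `map` and `filter`.

```python
from functools import reduce


def compose(*funcs):
    """compose(f, g)(x) == f(g(x)); with no arguments it is the identity."""
    return reduce(lambda f, g: lambda x: f(g(x)), funcs, lambda x: x)


def evaluate_staged(sequence, transformations, run_block):
    """Run (function, is_parallel) steps, batching runs of parallel ones.

    Each maximal run of parallel steps is composed into one function and
    passed to run_block a single time; any other step is a barrier that
    flushes the pending run before executing serially.
    """
    result = sequence
    staged = []
    for function, is_parallel in transformations:
        if is_parallel:
            staged.append(function)
            continue
        if staged:
            # reversed() so the earliest staged step is applied first
            result = run_block(compose(*reversed(staged)), result)
            staged = []
        result = function(result)
    if staged:
        result = run_block(compose(*reversed(staged)), result)
    return result
```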
Serialization
The thing is that `pickle` is unable to serialize `lambda`s and other things. And `multiprocessing` uses `pickle` underneath, so the `pack`/`unpack` trick is just a way to avoid delegating serialization to `multiprocessing` + `pickle`, or using something else like `joblib` or `pathos`. In the end, the same amount of data is serialized, since AFAIK `multiprocessing` serializes both the function and the data. We could partially avoid this by using threads, but then other problems arise, namely the GIL; or by using `multiprocessing.Array` and `multiprocessing.Value`, which are `ctypes`-based shared memory, good for read-only access, but with their own concerns too. The documentation on multiprocessing and serialization in Python is a good start. The `joblib` documentation is also good.
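To see the difference concretely, a small probe can check whether a function survives a serialization round trip. `can_serialize` is a hypothetical helper (the fork's own check may differ), and `dill` is an optional third-party package that is only used if installed.

```python
import pickle

try:
    import dill  # optional third-party serializer that handles lambdas
except ImportError:
    dill = None


def can_serialize(func, serializer=pickle):
    """Return True if serializer can dump and reload func (hypothetical)."""
    try:
        serializer.loads(serializer.dumps(func))
    except Exception:
        # pickle stores functions by qualified name; a lambda has no
        # importable name, so pickling it fails
        return False
    return True
```

With plain `pickle`, `can_serialize(len)` returns `True` and `can_serialize(lambda x: x)` returns `False`. With `dill` installed, `can_serialize(lambda x: x, dill)` should return `True`, because `dill` serializes the function's code rather than a reference to its name.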
Object model for seq/pseq
Subclassing `ExecutionEngine` sounds good to me too. And maybe having something like:
```python
def map(self, func):
    return self._transform(transformations.parallel.map_t(func))
```
If implemented correctly, `Lineage` could stay the same.
Parallel Stage Barrier
I think I get the idea. We could evaluate those blocks of `PARALLEL` functions up to the next function that does not have `PARALLEL` in its strategies, avoiding the need for `PARALLEL_BARRIER`, although the main idea of composing the functions and passing them into one single map would remain unchanged. I implemented a proposal for that. If you look at the PIDs, you will see the same number of distinct PIDs no matter how long the chain of transformations is.
```
>>> l = lambda x: (print(os.getpid()) or x * 2)
>>> seq.range(20).map(l)
15453
15454
15453
15455
15453
15454
15453
15455
15454
15455
15454
15455
15456
15456
15456
15456
15453
15453
15453
15453
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38]
>>> seq.range(20).map(l).map(l)
15440
15442
15442
15442
15442
15442
15442
15442
15442
15443
15443
15443
15443
15443
15443
15443
15443
15440
15440
15440
15440
15440
15440
15440
15441
15442
15441
15442
15441
15442
15441
15442
15441
15442
15442
15441
15441
15441
15442
15442
[0, 4, 8, 12, 16, 20, 24, 28, 32, 36, 40, 44, 48, 52, 56, 60, 64, 68, 72, 76]
```
I still need to fix some errors in the tests, though.
PS: The previous errors in the tests were due to the use of functions with side effects, which are quite complicated to handle in parallel programming (locks, semaphores, etc.). A workaround might be using `multiprocessing.Queue`, `Value`, or `Array` for `calls` instead of regular lists, but I haven't tested it.
I'll take a closer look at the code, but my suggestion would be to create the execution engine class so that `seq` and `pseq` are separate. Then the existing tests should pass (trivially), and then we can write some tests for basic `pseq` functionality.
Actually, on second thought, perhaps `pseq` should be tested on the same things. The tests that have side effects are there to enforce particular behaviors, so they could be skipped for `pseq`.
Another option could be to have `pmap` and `pfilter` instead of `pseq`.
It seems like that would add a lot of noise to the API, though. I could possibly see an argument for something like `pfilter` to explicitly compute that in its own stage, since it can cause a load imbalance, but perhaps even that should be handled within the engine.
As I see it, implementing `pseq` involves deep refactoring, as every `Sequence()` call would need to be aware of whether it should be a `Sequence` or, let's say, a `ParallelSequence`. And that information needs to be passed down through all the levels: even the pretty naive `_wrap` function would need to somehow know whether to build a regular `seq` or a `pseq`. That's a lot of code changes. I'd be happy to help, but it's obviously your call in the end :)
That being said, I completely understand the logical separation of `seq` and `pseq`, and from an API point of view, it makes perfect sense.
Also, referring to this code:
```python
def evaluate(self, sequence):
    result = sequence
    last_cache_index = self.cache_scan()
    staged = []
    for transform in self.transformations[last_cache_index:]:
        strategies = transform.execution_strategies
        if strategies and ExecutionStrategies.PRE_COMPUTE in strategies:
            result = list(result)
        if strategies and ExecutionStrategies.PARALLEL in strategies:
            staged.append(transform.function)
        else:
            if staged:
                result = parallelize(compose(*staged), result)
                staged = []
            result = transform.function(result)
    if staged:
        result = parallelize(compose(*staged), result)
    return iter(result)
```
The two lines before the `return` force any currently staged work to execute at evaluation time, rather than when the first value is asked for. I would imagine having something like the following would solve this:
```python
def lazy_parallel(result, composed_funcs):
    expanded = False
    while True:
        if not expanded:
            expanded = True
            result = iter(parallelize(composed_funcs, result))
        yield result
```
Then catch the empty iteration error. I am also 100% sure there is a cleaner way than the ad hoc method above.
Saw the other posts, reading them now.
I'm not sure massive refactoring would be necessary. On the call to `seq` or `pseq`, it could construct a `Sequence`, passing in either `ExecutionEngine` or `ParallelExecutionEngine` (both of which are singletons). The `Sequence` in turn would pass that into `Lineage`, where it can be used to provide the knowledge/implementation/etc. for doing things either "normally" or in parallel. The job of `Sequence` is to provide the API, `Lineage` tracks the operations, and `ExecutionEngine` determines how `Lineage.evaluate` works. How does that sound?
It sounds good. Some more work would be required to change things like `pseq.range()` or `pseq.csv()`, but nothing complicated. I already have some code and it seems to work, but as I was coding it I realized that having classes for the different execution engines might be a bit overkill. Just passing an `engine` variable and using it to choose which function inside `functional.execution` to call could be a better option, although the classes provide a better foundation for the future.
```
>>> i = lambda x: x
>>> pseq(range(20)).map(l).filter(i)
7487
7488
7488
7488
7490
...
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38]
>>> seq(range(20)).map(l).filter(i)
7411
7411
7411
...
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38]
```
And as for lazy evaluation, I think it is already done.
```
>>> s = seq(range(20)).map(l).filter(i)
>>> s.to_list()
7411
7411
...
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38]
>>> s
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38]
>>> ps = pseq(range(20)).map(l).filter(i)
>>> ps.to_list()
7517
7515
7516
7517
7518
...
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38]
>>> ps
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38]
```
OK, I found a useful feature of having the execution engines be classes: passing the number of processes to the parallel engine (versae@ebe41f4).
```
>>> pseq(range(40), processes=2).map(lambda x: (print(os.getpid()) or x*2))
```
I think that the issue with `seq.csv` and other extensions could be solved by a change to the object model. In essence, the execution engines are singletons which get passed around. The only disadvantage of this is not being able to have multiple parallel engines that use different numbers of processes, but for something that specialized I think the user could call the constructor. It also makes a bit more use of inheritance to implement the other functions.
```python
import csv as csvapi

# Sequence, ReusableFile, ExecutionStrategies, is_primitive, compose and
# parallelize come from PyFunctional's own modules.


class ExecutionEngine(object):
    def evaluate(self, sequence, transformations):
        result = sequence
        for transform in transformations:
            strategies = transform.execution_strategies
            if (strategies is not None
                    and ExecutionStrategies.PRE_COMPUTE in strategies):
                result = transform.function(list(result))
            else:
                result = transform.function(result)
        return iter(result)

    def __call__(self, *args):
        return self.sequence(*args)

    def sequence(self, *args):
        if len(args) == 0:
            raise TypeError("seq() takes at least 1 argument ({0} given)".format(len(args)))
        elif len(args) > 1:
            return Sequence(list(args), engine=self)
        elif is_primitive(args[0]):
            return Sequence([args[0]], engine=self)
        else:
            return Sequence(args[0], engine=self)

    def csv(self, csv_file, dialect='excel', **fmt_params):
        if isinstance(csv_file, str):
            input_file = ReusableFile(csv_file, mode='r')
        elif hasattr(csv_file, 'next') or hasattr(csv_file, '__next__'):
            input_file = csv_file
        else:
            raise ValueError('csv_file must be a file path or implement the iterator interface')
        csv_input = csvapi.reader(input_file, dialect=dialect, **fmt_params)
        return self.__call__(csv_input).cache(delete_lineage=True)

    # Other functions would be defined similarly


class ParallelExecutionEngine(ExecutionEngine):
    def __init__(self, processes=None):
        self.processes = processes

    def __call__(self, *args, processes=None):
        self.processes = processes
        return self.sequence(*args)

    def evaluate(self, sequence, transformations):
        processes = self.processes
        result = sequence
        staged = []
        for transform in transformations:
            strategies = transform.execution_strategies
            if strategies and ExecutionStrategies.PRE_COMPUTE in strategies:
                result = list(result)
            if strategies and ExecutionStrategies.PARALLEL in strategies:
                # Newest first, so compose(*staged) applies them in order
                staged.insert(0, transform.function)
            else:
                if staged:
                    result = parallelize(compose(*staged), result, processes)
                    staged = []
                result = transform.function(result)
        if staged:
            result = parallelize(compose(*staged), result, processes)
        return iter(result)


# functional.__init__
seq = ExecutionEngine()
pseq = ParallelExecutionEngine()
```
It's not tested, but it should give the general idea. I am still not convinced that this code block doesn't immediately expand, though:
```python
if staged:
    result = parallelize(compose(*staged), result, processes)
```
Did some playing around in the terminal with the last bit:
```python
from multiprocessing import Pool

if __name__ == "__main__":
    with Pool() as p:
        r = p.map(str, [1, 2, 3, 4, 5, 6])
    print(type(r))
    # <class 'list'>
```
This would imply that if `Pool.map` is called, then everything is evaluated. Now for the source of `parallelize`:
```python
from itertools import chain
from multiprocessing import Pool

# is_serializable, split_every, pack, unpack and CPU_COUNT are helpers
# defined elsewhere in the fork.


def parallelize(func, result, processes=None):
    if not is_serializable(func):
        return func(result)
    if processes is None or processes < 1:
        processes = CPU_COUNT
    else:
        processes = min(processes, CPU_COUNT)
    with Pool(processes=processes) as pool:
        chunks = split_every(processes, iter(result))
        packed_chunks = (pack(func, (chunk, )) for chunk in chunks)
        results = pool.map(unpack, packed_chunks)
    return chain.from_iterable(results)
```
The `pool.map` call will force an evaluation, then `chain.from_iterable` will turn the list into an `iter`. So it is an `iter`, but it is already evaluated by the time `parallelize` returns.
Going back to the `evaluate` code, the `if staged` block runs when there are no more transformations and something is still staged, at which point `parallelize` is called. From the above, that means the computation happens when `evaluate` is called. It would be better if the computation happened when the first value is requested from the `iter`, which is what the code I posted earlier tries to do (I am sure there is a cleaner way to do that, though). Am I forgetting something?
Side note: should the function compute the result serially or throw an error if it can't be serialized? On the one hand, I think the user should know that things are not being computed in parallel when they think they are; on the other hand, it's a decent fallback. Perhaps this could be a configurable parameter set to one of those two modes?
Any thoughts @versae? I really like where this is going and think this could very easily become a merged pull request with a little more work!
I tried the code you posted, `lazy_parallel()`, but it made my laptop crash :) I'll give it a second try.
I think I misunderstood the purpose of `evaluate()`. I thought PyFunctional would call `evaluate()` only when needed, and that when called, all transformations would be evaluated at once. If after calling `evaluate()` you need to have all transformations already applied, does it really matter whether the parallelization happens when getting the first element, as long as the results are there?
`evaluate()` acts as an iterator, not by itself, but through the functions used in the transformations. I'll give it a second thought, but as I see it, parallelization only makes sense if it is executed all at once, so I'll adapt the code to run the parallel computation when the first element is requested instead of at `evaluate()` time. I'll see what I can get.
Regarding the `open()`, `range()`, etc. functions, I'd say that more important than `ExecutionEngine` being a singleton is the possibility of passing the number of processes; in the end, the singleton can easily be achieved at the module level. I'll think of a way of implementing this.
Tested the code for `lazy_parallel()` and it is indeed wrong. I'm going to play around with it and can post a tested and fixed version.
The `evaluate` method takes the lineage of operations and turns it into an iterator. When the first `next()` call is made on the iterator, things start computing. Where possible this is completely lazy; for parallelization, the next best thing is to compute only once a value is actually asked for. It sounds like that is what you are working on next, which is great.
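One way to get the "compute once, on the first `next()`" behavior is a generator function, since its body does not start running until the first value is requested. This is a sketch under assumptions, not the fix that landed in the fork. `lazy_block` is a hypothetical name, and `run_block` stands for any eager executor, such as a chunked `Pool.map`.

```python
def lazy_block(run_block, func, sequence):
    """Defer run_block(func, sequence) until the first value is requested.

    Calling lazy_block only creates a generator; the (possibly parallel)
    computation runs in full on the first next(), and the already computed
    results are then yielded one by one.
    """
    yield from run_block(func, sequence)
```

Unlike the `while True` loop above, this version terminates on its own: `yield from` stops when the computed results are exhausted, so no empty-iteration error needs to be caught.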
One nice thing to have would be for those functions (`open`...) to be methods on a class, with `seq`/`pseq` as instances of that class. This would help editors actually detect these methods correctly, since they currently don't. It would also make it easy to reuse all the code via inheritance for both engines. I am not certain what the best way to structure the code is; the above is one way to do it, but it would be great to see alternatives.
Should `seq`, `pseq`, or both accept a number of processes? I agree that `pseq` should definitely have that as a parameter, since it should be configurable. I am less convinced about having it on `seq`, but I'm open to thoughts. In general, I like the idea of a clean API separation between the two, and I think it also looks cleaner.
```python
# This seems better
from functional import pseq
pseq(...)

# Than this
from functional import seq
seq(..., processes=4)

# And even when you need both, this is still clean
from functional import seq, pseq
seq(...)
pseq(...)

# And changing core counts
pseq(..., processes=4)
# or maybe even
pseq.processes = 4
pseq(...)
```
Got it working! Lazy parallelism now :D (versae@2009d36)
Reading the rest of your comment...
I agree on using inheritance to make things easier. Your approach of implementing `__call__` sounds good to me. And I actually kind of like the `seq.parallel = 4` syntax; we can expose the `parallel` property and support both modes: by argument when constructing a new one, and by setting the property.
About allowing `processes` on `seq`, I totally see your point. To me it was like `pseq` will parallelize unless told not to, and `seq` will not run in parallel unless told to. We could even establish that `seq` will fall back to non-parallel code if serialization fails, while `pseq` will fail. On the other hand, explicit is always better than implicit, so it's your call; removing the `parallel` argument from `seq` requires minimal code change.
PS: Should `util.py` be renamed to `utils.py`?
I like that syntax as well; it makes it clearer that there is one execution engine and that setting the property is a global setting. In any case, the use case for having two different values is pretty limited, and then the user can call the constructor explicitly.
On failure modes: I think that if there is an explicit instruction to parallelize code (e.g. by using `pseq`), it should fail if serialization fails. It would be better for the user to know that their code is not being parallelized, with a useful error message on what they can do about it (like installing `dill`).
For now, I would keep `seq` and `pseq` separate, but if it turns out to be a common use case, it can be added later without a breaking API change (the opposite not being true).
Is there a good reason to rename it? A short GitHub search indicates that `util.py` is more common anyway.
OK, so now both syntaxes are supported:
```python
# As a property (defaults to cpu_count())
from functional import pseq
pseq.processes = 2
pseq.range(40)...

# Passed as a parameter, it overrides the property
pseq(processes=3).range(40)...
pseq(range(40), processes=3)...

# There is also a new parameter, raise_errors (default True); set it to
# False to fall back to non-parallel mode if serialization fails
pseq.raise_errors = False

# seq now fails if passed processes
from functional import seq
seq(..., processes=4)
```
Here's the commit. I had to create new classes (`Stream` and `ParallelStream`) to avoid circular imports between the `ExecutionEngine`s and `Sequence`.
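A self-contained sketch of the two syntaxes described above. It assumes that a per-call `processes` should override the property without changing it for later calls, so it builds a fresh engine instead of mutating the singleton (the earlier `__call__` draft assigned `self.processes` on every call). `Stream` here is a toy stand-in, not the class from the commit, and none of these classes are PyFunctional's actual API.

```python
import os


class Stream:
    """Toy stand-in for a Sequence that remembers its engine."""

    def __init__(self, data, engine):
        self.data = list(data)
        self.engine = engine


class ExecutionEngine:
    """Serial engine: callable, with shared constructors such as range()."""

    def __call__(self, *args, **kwargs):
        if kwargs:
            raise TypeError("unexpected keyword arguments: {0}".format(sorted(kwargs)))
        return self.sequence(*args)

    def sequence(self, *args):
        # One argument is treated as an iterable; several become the elements
        data = args[0] if len(args) == 1 else args
        return Stream(data, engine=self)

    def range(self, *args):
        # Inherited unchanged by the parallel engine
        return self.sequence(range(*args))


class ParallelExecutionEngine(ExecutionEngine):
    def __init__(self, processes=None):
        self.processes = processes or os.cpu_count() or 1

    def __call__(self, *args, processes=None):
        # A per-call value configures a new engine instead of mutating the
        # shared singleton, so later pseq(...) calls keep the global setting
        engine = self if processes is None else ParallelExecutionEngine(processes)
        return engine.sequence(*args) if args else engine


seq = ExecutionEngine()
pseq = ParallelExecutionEngine()
```

Creating a short-lived engine per override keeps the singleton as the single source of the global setting, at the cost of one small object per call.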
Looks great! There are a few minor things here and there, but I can comment on those individually when you open a pull request.
Next things are probably:
- Figure out how to test things correctly. I would run the entire test suite against both `pseq` and `seq`, skipping the tests that check state/side effects to ensure correct execution order.
- Go through the transformations and mark the embarrassingly parallel operations with `PARALLEL`. For the moment, I would leave everything else for future work.
1. One option for the tests is to have something like:
```python
class TestPipeline(unittest.TestCase):
    def setUp(self):
        self.seq = seq
```
And then use `self.seq` elsewhere in the code. This would allow having a `TestPipelineParallel(TestPipeline)` that uses `pseq` instead.
2. I am not sure if there are any other easily parallelizable transformations. But once they are marked, a next step could be to implement parallelization for reduction functions.
Also, I just discovered that something like `pseq([1, 2, 3]).sum(lambda x: x * 2)` fails, while `pseq([1, 2, 3]).sum()` (not passing a function) works. I'll take a look tomorrow.
The above idea for testing is what I had in mind. I would extend it with a function decorator like `@parallel` for tests that should (correctly) fail in parallel and should therefore be skipped. I'm sure there is some hook in `unittest` that can help with that.
How does that fail?
This demonstrates that capability:
```python
import sys
import unittest

# mylib is a placeholder for the library under test, as in the unittest docs


class MyTestCase(unittest.TestCase):

    @unittest.skip("demonstrating skipping")
    def test_nothing(self):
        self.fail("shouldn't happen")

    @unittest.skipIf(mylib.__version__ < (1, 3),
                     "not supported in this library version")
    def test_format(self):
        # Tests that work for only a certain version of the library.
        pass

    @unittest.skipUnless(sys.platform.startswith("win"), "requires Windows")
    def test_windows_support(self):
        # windows specific testing code
        pass
```
Any updates @versae? I am actually fairly excited to get a PR open and merged. Great work so far! If you happen to be busy or don't want to work out the tests, just let me know and we can figure something out.
I fixed the problem I found, and now all tests run (I basically imported `pseq` as `seq`) except for those that use functions with side effects, call `[]`, or call `len()` on the iterable returned by the pool of processes. I could work around the `len()` problem by consuming the iterable into another iterable while counting the number of elements, or simply by calling `list()` before `len()`, but I'm not sure if that's what we want.
The tests that need to be rethought for parallel execution are:
- `test_single_call()` and `test_cache()`, because they use functions with side effects
- `test_init()`, `test_inits()`, and `test_tails()`, because they rely on calling `len()` and/or `[]`
- `test_to_sqlite3_namedtuple()`, because it can't serialize a `lambda` of a `namedtuple`
Once those tests are solved I will open the pull request, and then you can organize the final tests in the best way.
Also, `itertools.chain` objects are not subscriptable, so things like `sequence[:i]` (in `transformations.py:355`) fail as well. The only option here is to convert the generator/iterable to a list, which is what we had before the `lazy_parallel` implementation.
Here's how I solved it:
```python
lambda sequence: (lambda s: [wrap(s[i:])
                             for i in range(len(s) + 1)]
                  )(list(sequence)),
```
The length could be computed without building a list by using `reduce(lambda acc, e: acc + 1, sequence, 0)`, but since the same function also needs `[i:]`, it is probably better to just convert to a list.
With this last change, the only remaining errors are due to serialization and side effects. If you are OK with this, I can create the PR :)
I'm a little surprised at the issues with `len` and indexing; both of those should, and do, call `cache` before asking for `len` or `__getitem__`. At that point, `cache` should have force-expanded the iterable and saved it as a list (like here https://github.com/EntilZha/PyFunctional/blob/master/functional/pipeline.py#L549).
It is also similar to the case with `tails`, since it has the `PRE_COMPUTE` execution strategy, which does almost the same thing as `cache` (relevant code https://github.com/EntilZha/PyFunctional/blob/master/functional/lineage.py#L49).
For these, I would like to understand why they weren't working in the first place, since they should have. Could you post what was failing?
On tests, I'll take a careful look when I get home in a couple of hours, but anything that will obviously not work in parallel, like the side-effect tests, can be flagged to be skipped. The point of those tests is to check the correctness of the code, which is independent of serial/parallel execution. On serialization, I would skip that test as well, and if I can fix it later I will.
And you are absolutely right. It was a problem in the logic of the parallel execution engine. It is fixed now.
With that fixed, I think you should be able to add the skip flags to the tests that need them and be good to go for a PR?
I don't understand; there are no tests yet for `pseq`.
Sorry, I didn't take a close enough look at the committed code. Just to verify: currently, running the tests means replacing `seq` with `pseq` here https://github.com/versae/PyFunctional/blob/master/functional/test/test_functional.py?
To get everything running, I would first add a constructor like:
```python
class TestPipeline(unittest.TestCase):
    def __init__(self, *args, **kwargs):
        self.seq = seq
        super(TestPipeline, self).__init__(*args, **kwargs)
```
Then change all the references to `seq` in the tests to `self.seq`. Then add another test class like:
```python
class TestParallelPipeline(TestPipeline):
    def __init__(self, *args, **kwargs):
        super(TestParallelPipeline, self).__init__(*args, **kwargs)
        # Assign after the parent constructor, which sets self.seq = seq
        self.seq = pseq
```
For all the tests which shouldn't run in parallel because of side effects and whatnot, put this code:
```python
@unittest.skipIf(self.seq is pseq, "seq is pseq, skipping serial test")
def test_side_effect(self):
    pass
```
If things work as I think they would, this should lead to `unittest` running both of those classes and skipping the tests that shouldn't be tested in parallel.
Gotcha now, I wasn't sure that was the approach you wanted.
If you have a better idea, go for it. Testing like above looked like a reasonable way to go about it.
Unfortunately, the decorator has no access to `self`, so we have two options:
- Create our own
- Raise a `unittest.SkipTest` inside the function
It looks like there is a method, `self.skipTest(reason)`, designed for exactly this: https://docs.python.org/2.7/library/unittest.html#unittest.TestCase.skipTest. I think that's a fine option.
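A minimal runnable sketch of the `skipTest` route. The parallel subclass only swaps the engine, and tests that depend on side effects skip themselves at run time, where `self` is available. `serial_seq` and `parallel_seq` are hypothetical stand-ins for `seq` and `pseq`, so the example runs without PyFunctional. A class attribute is used instead of `__init__`, which avoids any ordering issue with the parent constructor.

```python
import unittest


def serial_seq(items):
    # Hypothetical stand-in for seq
    return list(items)


def parallel_seq(items):
    # Hypothetical stand-in for pseq
    return list(items)


class TestPipeline(unittest.TestCase):
    # staticmethod keeps the function from being bound to self on access
    seq = staticmethod(serial_seq)

    def test_map(self):
        self.assertEqual(self.seq([1, 2, 3]), [1, 2, 3])

    def test_side_effect(self):
        if self.seq is parallel_seq:
            # Decorators cannot see self, but skipTest runs inside the test
            self.skipTest("side effects are not observable across processes")
        calls = []

        def record(x):
            calls.append(x)
            return x

        self.seq(map(record, [1, 2]))
        self.assertEqual(calls, [1, 2])


class TestParallelPipeline(TestPipeline):
    # Inherits every test; only the engine under test changes
    seq = staticmethod(parallel_seq)
```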
Is there anything else left to do?
There are other kinds of functions that could run in parallel using a reduce approach, such as `sum`, but that would require a lot of trial and error.
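A sketch of what a reduce-based `sum` could look like, assuming the same chunking approach as `map`. Each worker computes a partial sum over its chunk, and the partials are combined at the end. `chunked_sum` is a hypothetical helper, and the two-phase split is only valid for associative operations such as addition.

```python
from functools import partial


def _sum_chunk(projection, chunk):
    """Worker-side step: partial sum of one chunk, optionally projected."""
    if projection is None:
        return sum(chunk)
    return sum(projection(x) for x in chunk)


def chunked_sum(items, projection=None, n_chunks=4, map_fn=map):
    """Sum in two phases: per-chunk partial sums, then a final combine.

    map_fn may be pool.map from a multiprocessing.Pool; projection must
    then be picklable (a def or builtin rather than a lambda).
    """
    items = list(items)
    n_chunks = max(1, n_chunks)
    size = max(1, -(-len(items) // n_chunks))  # ceiling division
    chunks = [items[i:i + size] for i in range(0, len(items), size)]
    partials = map_fn(partial(_sum_chunk, projection), chunks)
    return sum(partials)
```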
My thought would be to break these different things into smaller PRs rather than make one giant PR. I think the work so far could be a single PR focused on the added parallelization capabilities. Future commits/PRs could add other parallel methods and better reducing methods.
It would also be great to merge it so that I can work on implementing `on_error`, which I have held off on while the execution engine code was being changed.
That sounds good to me too.
We need to add `dill` to the Travis setup. And PyPy doesn't work at all.
Feature merged in #67