Comments (49)

versae avatar versae commented on May 18, 2024

First, thanks for the amazing PyFunctional. Now my 2 cents: I've been playing with the idea of supporting basic parallel execution using a multiprocessing.Pool (it could be changed to threads too). In this scenario, we can either send each individual sequence entry to a process, or split the sequence into as many parts as CPUs we have and send each part to a process. Right now I've implemented the second case, although it is pretty easy to change it to the first one. You can find the code in my fork.
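
For reference, a rough sketch of that second strategy (split into one chunk per CPU and map each chunk in a worker); the helper names here are illustrative, not the actual code in the fork:

from itertools import chain
from multiprocessing import Pool, cpu_count


def _run_chunk(packed):
    # Workers receive (func, chunk) tuples; func must be picklable,
    # i.e. a module-level def rather than a lambda.
    func, chunk = packed
    return [func(x) for x in chunk]


def chunked_map(func, iterable, processes=None):
    processes = processes or cpu_count()
    data = list(iterable)
    size = max(1, -(-len(data) // processes))  # ceiling division
    chunks = [data[i:i + size] for i in range(0, len(data), size)]
    with Pool(processes=processes) as pool:
        results = pool.map(_run_chunk, [(func, c) for c in chunks])
    return list(chain.from_iterable(results))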

As of now, map/select and filter/where are supported. The main issue here is serialization: only properly defined functions (def) will work, as lambdas and fn functions do not serialize very well. I've tried pickle, dill, cloudpickle, and even pyspark.serializers, but I must've done something wrong because I've been unable to make it work. Furthermore, the use of namedtuples in the tests makes it even more difficult, as namedtuples do not serialize well either. So I kept the basic pickle in order to avoid an external dependency.

I hope it is at least a start.
Cheers.

EntilZha avatar EntilZha commented on May 18, 2024

Thanks @versae! The second option, as you highlighted, is a good way to accomplish this with less overhead.

I spent a while playing around with your POC. I liked a few things you had there (namely using execution strategies), but couldn't seem to get it to work correctly. It looked like things were not getting computed in parallel for some reason:

In [13]: def f(x):
   ....:     print("f({0}) on Core {1}".format(x, os.getpid()))
   ....:     return x
In [50]: seq.range(20).map(f)
Out[50]: f(0) on Core 44668
f(1) on Core 44668
f(2) on Core 44668
f(3) on Core 44668
f(4) on Core 44668
f(5) on Core 44668
f(6) on Core 44668
f(7) on Core 44668
f(8) on Core 44668
f(9) on Core 44668
f(10) on Core 44668
f(11) on Core 44668
f(12) on Core 44668
f(13) on Core 44668
f(14) on Core 44668
f(15) on Core 44668
f(16) on Core 44668
f(17) on Core 44668
f(18) on Core 44668
f(19) on Core 44668
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]

Using multiprocessing.Pool seems to work as expected. My guess is that for some reason the map is getting computed before pool.map, and then only the pickle unpacking is being done in parallel.

Is the specific pickling code there (rather than letting multiprocessing do it for you) in an attempt to correctly serialize functions like lambdas? I haven't tested it, but it seems like this should work: http://stackoverflow.com/questions/16626429/python-cpickle-pickling-lambda-functions. It looks like dill works.

The other thing I was thinking about is that the parallelization happens across different function calls when some could be trivially combined (e.g. a map followed by a filter should be parallelized once). It seems like there should be a way to "squash" parallel steps using the ExecutionStrategies enum, similar to the serial version. Perhaps these could be implemented as parallelization barriers, similar to how cache is a barrier to recomputation.
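
For illustration, a minimal sketch (not project code) of squashing a map and a filter into a single per-chunk function so that one Pool.map call covers both stages:

def squash_map_filter(f, p):
    # Equivalent to map(f) followed by filter(p), but done in one pass
    # inside the worker, so the two stages share a single parallel call.
    def stage(chunk):
        return [y for y in (f(x) for x in chunk) if p(y)]
    return stage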

For tests, that is probably the part that requires the most work, not so much because of namedtuple, but because of testing irrespective of ordering. Right now my plan is to keep seq as it is, add a pseq function for parallel computation, and write a small set of tests that ensures things work as expected rather than explicitly testing all the parallel functions.

Thanks for the start and ideas!

versae avatar versae commented on May 18, 2024

I see the problem now: map/where are lazy, so they don't actually execute the passed function until the results are collected. Pool.map is correctly executing the partial in parallel, though. One way to fix it is to have unpack inspect whether the result of executing the function is an instance of collections.Iterable, and if so, consume it and return it. I did just that and it now runs in parallel:

# not parallel
>>> %timeit -n 1 list(seq.range(100).map(lambda x: sleep(.1) and x))
1 loop, best of 3: 10 s per loop

# parallel
>>> %timeit -n 1 list(seq.range(100).map(lambda x: sleep(.1) and x))
1 loop, best of 3: 1.64 s per loop

And now, showing the PIDs: Pool.map waits until all processes are done and guarantees ordered results.

>>> seq.range(20).map(f)
f(16) on Core 15326
f(8) on Core 15325
f(0) on Core 15324
f(17) on Core 15326
f(1) on Core 15324
f(9) on Core 15325
f(2) on Core 15324
f(18) on Core 15326
f(10) on Core 15325
f(19) on Core 15326
f(3) on Core 15324
f(11) on Core 15325
f(4) on Core 15324
f(12) on Core 15325
f(5) on Core 15324
f(13) on Core 15325
f(6) on Core 15324
f(14) on Core 15325
f(7) on Core 15324
f(15) on Core 15325
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]

The way pack and unpack work is indeed meant to allow serialization methods other than the builtin pickle used by multiprocessing. As an unexpected side effect of consuming the map, dill (if installed) is now properly serializing lambdas 💃. There are a couple of failing tests that I have to examine further, though. On the other hand, fn functions lack __getstate__, and the only way I see them working is by monkeypatching fn :(
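
For readers following along, a minimal sketch of the pack/unpack idea, assuming dill is preferred when installed (the exact code in the fork may differ):

import collections.abc
import pickle

try:
    import dill as serializer  # can serialize lambdas, unlike pickle
except ImportError:
    serializer = pickle


def pack(func, args):
    # Serialize eagerly so multiprocessing only ever ships a bytes payload.
    return serializer.dumps((func, args))


def unpack(payload):
    func, args = serializer.loads(payload)
    result = func(*args)
    # Consume lazy results (map/filter objects) inside the worker;
    # otherwise the actual work would be deferred back to the parent.
    if isinstance(result, collections.abc.Iterable):
        return list(result)
    return result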

I like pseq, but the user should be aware that not every action is parallelizable. Maybe pseq could be a wrapper for seq(*args, **kwargs) where kwargs["processes"] indicates how many processes to use? It could default to 1, accepting values from -1 (all available) up to CPU_COUNT.

EntilZha avatar EntilZha commented on May 18, 2024

A few comments/questions I had, but nice work!

Serialization

Is it necessary to serialize the collection itself with something else? It seems like pickle or dill will serialize it, and then multiprocessing will serialize the result of that. Do you have any resources or examples that would explain the advantage of pickling/dilling the collection itself?

Object model for seq/pseq

My plan for implementing different versions of seq is to change the code structure a little bit. The primary change would be a class like ExecutionEngine, constructed similarly to how seq is now, which could then be inherited as class ParallelExecutionEngine(ExecutionEngine) to run in parallel. The class would be callable, have static methods like the ones seq has now (e.g. open, json, ...), and seq itself would be an instance/singleton of the class. Overall, I think this is a fairly clean way to keep lots of code in common while cleanly separating the differences (and it will help with the next thing).

Parallel Stage Barrier

I think that forcing collections.Iterable to expand is along the right track, but it would be better to think about it in terms of synchronization/stage barriers. I would look at it like this:

  1. All the functions that are embarrassingly parallel are marked as PARALLEL. This would include map, filter, and such.
  2. By default, it is assumed that all other functions are not parallelizable at all.
  3. When the lineage is evaluated, look for blocks of PARALLEL transformations and lazily execute the batched functions all together in a single call to Pool.map (by lazily execute, I mean queue it up as the next step of the iterator, executed in parallel all at once when the first value is asked for).
  4. As you noted (which I didn't know), the fact that Pool.map preserves order makes this even easier.

The advantage of this is that transformations that can be collapsed into a single operation (sequential maps, or filters, and such) will be. The second advantage is that it would be easy to mark new functions as PARALLEL. Finally, this would allow things which are parallelizable with extra work to be marked as something else, such as PARALLEL_BARRIER, so that blocks of PARALLEL operations can still be computed in parallel. It would take some work to go through and mark things correctly as PARALLEL, but that's fine. One thing I would note, though, is that batching "compressible" functions assumes that they will be reasonably load balanced. Something like filter could potentially make "partitions" unbalanced.

The last thing to note is that doing this would be proper motivation for having a version of the _ lambda within PyFunctional itself, since fn._ doesn't work as intended in parallel. The prior motivation was convenience, but this is an actual concern.

versae avatar versae commented on May 18, 2024

Serialization

The thing is that pickle is unable to serialize lambdas and other things, and multiprocessing uses pickle underneath, so the pack/unpack trick is just to avoid delegating serialization to multiprocessing + pickle, or having to use something else like joblib or pathos. In the end, the same amount of data is serialized, since AFAIK multiprocessing serializes both the function and the data. We could partially avoid this by using threads, but then other problems arise, namely the GIL; or by using multiprocessing.Array and multiprocessing.Value, which are ctypes-based shared memory, good for read-only access, but with their own concerns too. The documentation on multiprocessing and serialization in Python is a good start. The joblib documentation is also good.

Object model for seq/pseq

Subclassing ExecutionEngine sounds good to me too. And maybe having something like:

    def map(self, func):
        return self._transform(transformations.parallel.map_t(func))

Correctly implemented, Lineage could be the same.
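
For concreteness, a hedged sketch of what such a PARALLEL-flagged transformation could look like; the Transformation record and ExecutionStrategies enum below are minimal stand-ins for PyFunctional's internals, not the merged code:

from collections import namedtuple
from enum import Enum
from functools import partial


class ExecutionStrategies(Enum):
    # Minimal stand-in for the real enum referenced in this thread.
    PRE_COMPUTE = 0
    PARALLEL = 1


# Assumed shape of PyFunctional's transformation record.
Transformation = namedtuple(
    'Transformation', ['name', 'function', 'execution_strategies'])


def map_t(func):
    # A sequence-level map flagged as embarrassingly parallel, so the
    # engine can batch it into a Pool.map stage.
    return Transformation(
        'map({0})'.format(getattr(func, '__name__', '<lambda>')),
        partial(map, func),
        {ExecutionStrategies.PARALLEL},
    )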

Parallel Stage Barrier

I think I get the idea. We could evaluate blocks of PARALLEL functions until the next function that does not have PARALLEL in its strategies, avoiding the need for PARALLEL_BARRIER, although the main idea of composing the functions and passing them into one single map would remain unchanged. I implemented a proposal for that. If you look at the PIDs, you will see the same number of distinct PIDs no matter the length of the chain of transformations.

>>> l = lambda x: (print(os.getpid()) or x * 2)
>>> seq.range(20).map(l)
15453
15454
15453
15455
15453
15454
15453
15455
15454
15455
15454
15455
15456
15456
15456
15456
15453
15453
15453
15453
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38]

>>> seq.range(20).map(l).map(l)
15440
15442
15442
15442
15442
15442
15442
15442
15442
15443
15443
15443
15443
15443
15443
15443
15443
15440
15440
15440
15440
15440
15440
15440
15441
15442
15441
15442
15441
15442
15441
15442
15441
15442
15442
15441
15441
15441
15442
15442
[0, 4, 8, 12, 16, 20, 24, 28, 32, 36, 40, 44, 48, 52, 56, 60, 64, 68, 72, 76]

I still need to fix some errors in the tests, though.

PS: The previous errors in the tests were due to the use of functions with side effects, which are quite complicated to handle in parallel programming (locks, semaphores, etc.). A workaround might be using multiprocessing.Queue, Value, or Array to count calls instead of regular lists, but I haven't tested it.
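
For illustration, a small sketch of the multiprocessing.Value idea for counting calls; it assumes a fork start method, where the shared value is inherited by the workers:

from multiprocessing import Pool, Value

call_count = Value('i', 0)  # shared counter with its own lock


def counted_double(x):
    # Appending to a plain list here would only mutate the worker's copy;
    # the shared Value makes the side effect visible to the parent.
    with call_count.get_lock():
        call_count.value += 1
    return x * 2


if __name__ == '__main__':
    with Pool(2) as pool:
        print(pool.map(counted_double, range(10)))
    print(call_count.value)  # 10 under a fork start method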

EntilZha avatar EntilZha commented on May 18, 2024

I'll take a closer look at the code, but my suggestion would be to create the execution engine class so that seq and pseq are separate. Then the existing tests should pass (trivially), and we can write some tests for basic pseq functionality.

EntilZha avatar EntilZha commented on May 18, 2024

Actually, on second thought, perhaps pseq should be tested against the same suite. The tests that have side effects are there to enforce particular behaviors, so they could be skipped for pseq.

versae avatar versae commented on May 18, 2024

Another option could be to have pmap and pfilter instead of pseq.

EntilZha avatar EntilZha commented on May 18, 2024

It seems like that would add a lot of noise to the API, though. I could possibly see an argument for something like pfilter to explicitly compute that step in its own stage, since it can cause a load imbalance, but perhaps even that should live within the engine.

versae avatar versae commented on May 18, 2024

As I see it, implementing pseq involves deep refactoring, as every Sequence() call would need to know whether it should be a Sequence or, let's say, a ParallelSequence. That info needs to be passed down through all the levels; even the pretty naive _wrap function would need to somehow know whether to build a regular seq or a pseq. That's a lot of code to change. I'd be happy to help, but it's obviously your call in the end :)

versae avatar versae commented on May 18, 2024

That being said, I completely understand the logical separation of seq and pseq, and from an API point of view, it makes perfect sense.

EntilZha avatar EntilZha commented on May 18, 2024

Also, referring to this code:

    def evaluate(self, sequence):
        result = sequence
        last_cache_index = self.cache_scan()
        staged = []
        for transform in self.transformations[last_cache_index:]:
            strategies = transform.execution_strategies
            if strategies and ExecutionStrategies.PRE_COMPUTE in strategies:
                result = list(result)
            if strategies and ExecutionStrategies.PARALLEL in strategies:
                staged.append(transform.function)
            else:
                if staged:
                    result = parallelize(compose(*staged), result)
                    staged = []
                result = transform.function(result)
        if staged:
            result = parallelize(compose(*staged), result)
        return iter(result)

The two lines just before the return force any currently staged transformations to execute at evaluation time, rather than when the first value is asked for. I would imagine something like the following would solve this:

def lazy_parallel(result, composed_funcs):
    expanded = False
    while True:
        if not expanded:
            expanded = True
            result = iter(parallelize(composed_funcs, result))
        yield result

Then catch the StopIteration when the iterator is exhausted. I am also 100% sure there is a cleaner way than the ad-hoc method above.
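
For instance, a plain generator would defer the work with less machinery, since a generator body does not run until the first next() call (an untested sketch over the parallelize used above; the name deferred_parallel is made up):

def deferred_parallel(composed_funcs, result):
    # The body does not execute until the first next(), so the Pool.map
    # inside parallelize is postponed until a value is actually requested.
    yield from parallelize(composed_funcs, result)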

Saw other posts, reading them now

EntilZha avatar EntilZha commented on May 18, 2024

I'm not sure massive refactoring would be necessary. On the call to seq or pseq, it could construct a Sequence, passing in either ExecutionEngine or ParallelExecutionEngine (both of which are singletons). The Sequence in turn would pass that into Lineage, where it can be used to decide whether to do things "normally" or in parallel. The job of Sequence is to provide the API, Lineage tracks the operations, and ExecutionEngine determines how Lineage.evaluate should work. How does that sound?

versae avatar versae commented on May 18, 2024

It sounds good. Some more work would be required to change things like pseq.range() or pseq.csv(), but nothing complicated. I already have some code and it seems to work, but as I was writing it I realized that having classes for the different execution engines might be a bit of overkill. Just passing an engine variable and controlling which function inside functional.execution gets called could be a better option, although for the future the classes provide a better foundation.

>>> i = lambda x: x
>>> pseq(range(20)).map(l).filter(i)  
7487
7488
7488
7488
7490
...
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38]

>>> seq(range(20)).map(l).filter(i)
7411
7411
7411
...
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38]

And as for lazy evaluation, I think it is already done.

>>> s = seq(range(20)).map(l).filter(i)
>>> s.to_list()
7411
7411
...
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38]

>>> s
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38]

>>> ps = pseq(range(20)).map(l).filter(i)
>>> ps.to_list()
7517
7515
7516
7517
7518
...
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38]

>>> ps
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38]

versae avatar versae commented on May 18, 2024

OK, I found a useful consequence of having the execution engines be classes: passing the number of processes to the parallel engine (versae@ebe41f4).

>>> pseq(range(40), processes=2).map(lambda x: (print(os.getpid()) or x*2))

EntilZha avatar EntilZha commented on May 18, 2024

I think that the issue with seq.csv and the other extensions could be solved by a change to the object model. In essence, the execution engines are singletons which get passed around. The only disadvantage is not being able to have multiple parallel engines using different numbers of processes, but for something that specialized the user could call the constructor, I think. It also uses inheritance a bit more to implement the other functions.

class ExecutionEngine(object):
    def evaluate(self, sequence, transformations):
        result = sequence
        for transform in transformations:
            strategies = transform.execution_strategies
            if (strategies is not None
                    and ExecutionStrategies.PRE_COMPUTE in strategies):
                result = transform.function(list(result))
            else:
                result = transform.function(result)
        return iter(result)

    def __call__(self, *args):
        return self.sequence(*args)

    def sequence(self, *args):
        if len(args) == 0:
            raise TypeError("seq() takes at least 1 argument ({0} given)".format(len(args)))
        elif len(args) > 1:
            return Sequence(list(args), engine=self)
        elif is_primitive(args[0]):
            return Sequence([args[0]], engine=self)
        else:
            return Sequence(args[0], engine=self)

    def csv(self, csv_file, dialect='excel', **fmt_params):
        if isinstance(csv_file, str):
            input_file = ReusableFile(csv_file, mode='r')
        elif hasattr(csv_file, 'next') or hasattr(csv_file, '__next__'):
            input_file = csv_file
        else:
            raise ValueError('csv_file must be a file path or implement the iterator interface')

        csv_input = csvapi.reader(input_file, dialect=dialect, **fmt_params)
        return self.__call__(csv_input).cache(delete_lineage=True)
# Other functions would be defined similarly

class ParallelExecutionEngine(ExecutionEngine):
    def __init__(self, processes=None):
        self.processes = processes

    def __call__(self, *args, processes=None):
        self.processes = processes
        return self.sequence(*args)

    def evaluate(self, sequence, transformations):
        processes = self.processes
        result = sequence
        staged = []
        for transform in transformations:
            strategies = transform.execution_strategies
            if strategies and ExecutionStrategies.PRE_COMPUTE in strategies:
                result = list(result)
            if strategies and ExecutionStrategies.PARALLEL in strategies:
                staged.insert(0, transform.function)
            else:
                if staged:
                    result = parallelize(compose(*staged), result, processes)
                    staged = []
                result = transform.function(result)
        if staged:
            result = parallelize(compose(*staged), result, processes)
        return iter(result)

# functional.__init__
seq = ExecutionEngine()
pseq = ParallelExecutionEngine()

It's not tested, but it should give the general idea. I am still not convinced that this code block doesn't immediately expand, though:

if staged:
    result = parallelize(compose(*staged), result, processes)

EntilZha avatar EntilZha commented on May 18, 2024

I did some playing around in the terminal regarding the last bit:

from multiprocessing import Pool
p = Pool()

r = p.map(str, [1, 2, 3, 4, 5, 6])
print(type(r))
# <class 'list'>

This implies that once Pool.map returns, everything has been evaluated. Now for the source of parallelize:

def parallelize(func, result, processes=None):
    if not is_serializable(func):
        return func(result)
    if processes is None or processes < 1:
        processes = CPU_COUNT
    else:
        processes = min(processes, CPU_COUNT)
    with Pool(processes=processes) as pool:
        chunks = split_every(processes, iter(result))
        packed_chunks = (pack(func, (chunk, )) for chunk in chunks)
        results = pool.map(unpack, packed_chunks)
    return chain.from_iterable(results)

The pool.map call forces an evaluation, then chain.from_iterable turns the list of chunks into an iterator. So it is an iterator, but one that was already evaluated when parallelize was called.

Going back to the evaluate code, the final if staged branch runs when there are no more transformations and something is still staged, at which point parallelize is called. From the above, that means the computation happens when evaluate is called. It would be better if the computation happened when the first value is requested from the iterator, which is what the code I posted earlier accomplishes (though I am sure there is a cleaner way to do it). Am I perhaps forgetting something?

Side note: should the function compute the result serially or throw an error if it can't be serialized? On the one hand, I think the user should know when things are not being computed in parallel even though they think they are; on the other hand, it's a decent fallback. Perhaps this could be a configurable parameter set to one of those two modes?
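
To make the two modes concrete, a hypothetical sketch; is_serializable mirrors the guard used by parallelize above, and the strict variant raises instead of quietly falling back to serial:

import pickle


def is_serializable(func):
    # If the function cannot be pickled, it cannot cross a process boundary.
    try:
        pickle.dumps(func)
        return True
    except (pickle.PicklingError, TypeError, AttributeError):
        return False


def parallelize_strict(func, result, processes=None):
    # Fail loudly so the user knows parallelism was not applied.
    if not is_serializable(func):
        raise ValueError(
            'cannot serialize {0!r}; install dill or use a module-level '
            'def instead of a lambda'.format(func))
    return parallelize(func, result, processes)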

EntilZha avatar EntilZha commented on May 18, 2024

Any thoughts @versae? I really like where this is going and think this could very easily become a merged pull request with a little more work!

versae avatar versae commented on May 18, 2024

I tried the code you posted, lazy_parallel(), but it made my laptop crash :) I'll give it a second try.

I think I misunderstood the purpose of evaluate(). I thought PyFunctional would call evaluate() only when needed, and that when called, all transformations would be evaluated at once. If, after calling evaluate(), you need all transformations already applied, does it really matter whether the parallelization happens when getting the first element, as long as the results are there?

evaluate() acts as an iterator, not by itself but through the functions used in the transformations. I'll give it more thought, but as I see it, parallelization only makes sense if executed all at once, so I'll adapt the code to run the parallel computation when the first element is requested instead of at evaluate() time. I'll see what I can get.

Regarding the open(), range(), etc. functions, I'd say that more important than ExecutionEngine being a singleton is the possibility of passing the number of processes; in the end, the singleton can easily be achieved at the module level. I'll think about a way of implementing this.

EntilZha avatar EntilZha commented on May 18, 2024

I tested the code for lazy_parallel() and it is indeed wrong. I'm going to play around with it and will post a tested and fixed version.

The evaluate method is used to take the lineage of operations and turn it into an iterator. When the first next() call is made on the iterator, things start computing. If possible, this is completely lazy, but for parallelization the next best thing is to only do the work once a value is actually asked for. Sounds like that is what you are working on next, which is great.

One nice thing to have would be for those functions (open, ...) to be methods on a class, with seq/pseq as instances of that class. This would help editors detect these methods correctly, since they currently don't. It would also make it easy to reuse all the code via inheritance for both engines. I am not certain what the best way to structure the code is; the above is one way, but it would be great to see alternatives.

Should seq, pseq, or both have the number of processes passable? I agree that pseq should definitely take it as a parameter, since that should be configurable. I am not as convinced about having it on seq, but I'm open to thoughts. In general, I like the idea of a clean API separation between the two, and I also think it looks cleaner overall.

# This seems better
from functional import pseq
pseq(...)

# Than this
from functional import seq
seq(..., processes=4)

# And even when you need both, this is still clean
from functional import seq, pseq

seq(...)

pseq(...)

# And changing core counts

pseq(..., processes=4)

# or maybe even
pseq.processes = 4
pseq(...)

versae avatar versae commented on May 18, 2024

Got it working! Lazy parallelism now :D (versae@2009d36)

Reading the rest of your comment...

versae avatar versae commented on May 18, 2024

I agree on using inheritance to make things easier. Your approach of implementing __call__ sounds good to me. And I actually kind of like the pseq.processes = 4 syntax; we can expose the processes property and support both modes: by argument when constructing a new one, and by setting the property.

About allowing processes on seq, I totally see your point. To me it was: pseq will parallelize unless told not to, and seq will not run in parallel unless told to. We could even establish that seq falls back to non-parallel code if serialization fails, while pseq fails. On the other hand, explicit is always better than implicit, so it's your call; it requires minimal code change to remove the processes argument from seq.

PS: Should util.py be renamed to utils.py?

EntilZha avatar EntilZha commented on May 18, 2024

I like that syntax as well; it makes it clearer that there is one execution engine and that setting it is a global setting. In any case, the use case for setting two different values is pretty limited, and in that case the user can call the constructor explicitly.

On failure modes: I think that if there is an explicit instruction to parallelize code (e.g. by using pseq), it should fail if serialization fails. It is better that the user knows their code is not being parallelized and gets a useful error message about what they can do (like installing dill).

For now, I would keep seq and pseq separate, but if it turns out to be a common use case, it can be added later without a breaking API change (the opposite is not true).

Is there a good reason to rename it? A quick GitHub search indicates that util.py is more common anyway.

versae avatar versae commented on May 18, 2024

OK, so now both syntaxes are supported:

# As property (default to cpu_count())
from functional import pseq
pseq.processes = 2
pseq.range(40)...
# Passed as parameter overrides the property
pseq(processes=3).range(40)...
pseq(range(40), processes=3)...
# There is also a new parameter, raise_errors (default True); setting it
# to False falls back to non-parallel mode if serialization fails
pseq.raise_errors = False

# seq now will fail if passed processes
from functional import seq
seq(..., processes=4)

Here's the commit. I had to create new classes (Stream and ParallelStream) to avoid circular imports between the ExecutionEngines and Sequence.

EntilZha avatar EntilZha commented on May 18, 2024

Looks great! There are a few minor things here and there, but I could comment on those individually when you open a pull request.

Next things are probably:

  1. Figure out how to test things correctly. I would run the entire test suite against both pseq and seq, skipping the tests that rely on state/side effects to ensure a particular execution order.
  2. Go through the transformations and mark the embarrassingly parallel operations with PARALLEL. For the moment, I would leave everything else as future work.

versae avatar versae commented on May 18, 2024

  1. One option for the tests is to have something like:

class TestPipeline(unittest.TestCase):

    def setUp(self):
        self.seq = seq

And then use self.seq elsewhere in the code. This would allow having a TestPipelineParallel(TestPipeline) that uses pseq instead.

  2. I am not sure there are any other easily parallelizable transformations. But once they are marked, a next step could be to implement parallelization for reduction functions.

versae avatar versae commented on May 18, 2024

Also, I just discovered that something like pseq([1, 2, 3]).sum(lambda x: x * 2) fails while pseq([1, 2, 3]).sum() (not passing a function) works. I'll take a look tomorrow.

EntilZha avatar EntilZha commented on May 18, 2024

The above idea for testing is what I had in mind. I would extend it by making a function decorator like @parallel for tests that should (correctly) fail in parallel and so should be skipped. I'm sure there is some hook in unittest that can help with that.

How does that fail?

EntilZha avatar EntilZha commented on May 18, 2024

This demonstrates that capability:

class MyTestCase(unittest.TestCase):

    @unittest.skip("demonstrating skipping")
    def test_nothing(self):
        self.fail("shouldn't happen")

    @unittest.skipIf(mylib.__version__ < (1, 3),
                     "not supported in this library version")
    def test_format(self):
        # Tests that work for only a certain version of the library.
        pass

    @unittest.skipUnless(sys.platform.startswith("win"), "requires Windows")
    def test_windows_support(self):
        # windows specific testing code
        pass

EntilZha avatar EntilZha commented on May 18, 2024

Any updates @versae? I am actually fairly excited to get a PR open and merged. Great work so far! If you happen to be busy or don't want to work out the tests, just let me know and we can figure something out.

versae avatar versae commented on May 18, 2024

I fixed the problem I found, and now all tests pass (I basically imported pseq as seq) except for those that use functions with side effects, or call [] or len() on the iterable returned by the pool of processes. I could work around the len() problem by consuming the iterable into another iterable while counting the number of elements, or simply calling list() before len(), but I'm not sure that's what we want.

The tests that need to be rethought for the parallel execution are:

  • test_single_call() and test_cache(), due to functions with side effects
  • test_init(), test_inits(), and test_tails(), which rely on calling len() and/or []
  • test_to_sqlite3_namedtuple(), which can't serialize a lambda over a namedtuple

Once those tests are solved, I will open the pull request, and then you can organize the final tests in the best way.

versae avatar versae commented on May 18, 2024

Also, itertools.chain objects are not subscriptable, so things like sequence[:i] (in transformations.py:355) fail as well. The only option here is to convert the generator/iterable to a list, which is what we had before the lazy_parallel implementation.

versae avatar versae commented on May 18, 2024

Here's how I solved it:

lambda sequence: (lambda s: [wrap(s[i:])
                             for i in range(len(s) + 1)]
                 )(list(sequence)),

The length could be computed more efficiently with reduce(lambda acc, e: acc + 1, sequence, 0), but since the same function also needs [i:], it is probably better to just convert to a list.

With this last change, the only remaining errors are due to serialization and side effects. If you are OK with this, I can create the PR :)

EntilZha avatar EntilZha commented on May 18, 2024

I'm a little surprised at the issues with len and indexing; both of those should and do call cache before asking for len or __getitem__. At that point, cache should have force-expanded the iterable and saved it as a list (like here: https://github.com/EntilZha/PyFunctional/blob/master/functional/pipeline.py#L549).

It is similar with tails, since it has the PRE_COMPUTE execution strategy, which does almost the same thing as cache (relevant code: https://github.com/EntilZha/PyFunctional/blob/master/functional/lineage.py#L49).

For these, I would like to understand why they weren't working in the first place since they should have. Could you post what was failing?

On tests, I'll take a careful look when I get home in a couple of hours, but anything that obviously won't work in parallel, like the side-effect tests, can be flagged to be skipped. The point of those tests is to check the correctness of the code, which is independent of serial/parallel execution. On serialization, I would skip that test as well, and if I can fix it later I will.

versae avatar versae commented on May 18, 2024

And you are absolutely right. It was a problem in the logic of the parallel execution engine. It is fixed now.

EntilZha avatar EntilZha commented on May 18, 2024

With that fixed, I think you can add the skip flags to the tests that need them and be good to go for a PR?

versae avatar versae commented on May 18, 2024

I don't understand; there are no tests yet for pseq.

EntilZha avatar EntilZha commented on May 18, 2024

Sorry, I didn't take a close enough look at the committed code. Just to verify: currently, running the tests means substituting pseq for seq here: https://github.com/versae/PyFunctional/blob/master/functional/test/test_functional.py?

To get everything running I would first add a constructor like:

class TestPipeline(unittest.TestCase):
    def __init__(self, *args, **kwargs):
        self.seq = seq
        super(TestPipeline, self).__init__(*args, **kwargs)

Then change all the references of seq in the tests to self.seq. Then add another test class like:

class TestParallelPipeline(TestPipeline):
    def __init__(self, *args, **kwargs):
        self.seq = pseq
        super(TestParallelPipeline, self).__init__(*args, **kwargs)

For all the tests which shouldn't run in parallel because of side effects and whatnot, put in this code:

@unittest.skipIf(self.seq is pseq, "seq is pseq, skipping serial test")
def test_side_effect(self):
    pass

If things work as I think they would, this should lead to unittest running both of those classes and skipping the tests that shouldn't be tested in parallel.

versae avatar versae commented on May 18, 2024

Gotcha now, I wasn't sure that was the approach you wanted.

EntilZha avatar EntilZha commented on May 18, 2024

If you have a better idea, go for it. Testing like the above looked like a reasonable way to go about it.

versae avatar versae commented on May 18, 2024

Unfortunately, the decorator has no access to self, so we have two options:

  1. Create our own decorator (see the sketch below)
  2. Raise a unittest.SkipTest inside the test function
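
A sketch of the first option (a hypothetical helper; it assumes the test case exposes self.seq as discussed above):

import functools
import unittest

from functional import pseq


def skip_if_parallel(test_method):
    # Unlike unittest.skipIf, this wrapper runs at call time, so it can
    # inspect self to see which engine the test case is using.
    @functools.wraps(test_method)
    def wrapper(self, *args, **kwargs):
        if self.seq is pseq:
            raise unittest.SkipTest('not valid under parallel execution')
        return test_method(self, *args, **kwargs)
    return wrapper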

EntilZha avatar EntilZha commented on May 18, 2024

Looks like there is a method, self.skipTest(reason) (https://docs.python.org/2.7/library/unittest.html#unittest.TestCase.skipTest), designed for exactly this. I think that's a fine option.

versae avatar versae commented on May 18, 2024

Done, and done :)

EntilZha avatar EntilZha commented on May 18, 2024

Is there anything else left to do?

versae avatar versae commented on May 18, 2024

There are other kinds of functions that could run in parallel using a reduce approach, such as sum, but that would require a lot of trial and error.

EntilZha avatar EntilZha commented on May 18, 2024

My thought would be to break these different things into smaller PRs rather than make one giant PR. I think that the work so far could be a single PR which focuses on the added parallelization capabilities. Future commits/PRs could add in other parallel methods, and better reducing methods.

It would also be great to merge it so that I can work on implementing on_error, which I have been waiting on since the execution engine code was changing.

versae avatar versae commented on May 18, 2024

That sounds good to me too.

versae avatar versae commented on May 18, 2024

We need to add dill to the Travis setup. And PyPy doesn't even work.

EntilZha avatar EntilZha commented on May 18, 2024

Feature merged in #67
