Comments (5)
Parallel.retrieve ensures that order is maintained when Parallel.__call__ is applied to an iterable: it returns a list of results in the same order as the input. I propose a generator-based version of Parallel.__call__ that yields each output as soon as it is ready, without guaranteeing order:

workers = Parallel(n_jobs=8)
workers(delayed(square)(i) for i in range(10))        # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
workers.async(delayed(square)(i) for i in range(10))  # <generator object _____ at 0x______>

Thoughts? Issues?
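For comparison, the standard library already exposes exactly this ordered/unordered split on multiprocessing.Pool via imap versus imap_unordered. A small Python 3 sketch (using the thread-backed multiprocessing.dummy.Pool so it runs without pickling concerns) shows the distinction the proposal would bring to Parallel:

```python
from multiprocessing.dummy import Pool  # thread-backed Pool, same API as process Pool
import time

def slow_square(n):
    # earlier inputs sleep longer, so later inputs finish first
    time.sleep((4 - n) * 0.05)
    return n * n

pool = Pool(4)
ordered = list(pool.imap(slow_square, range(4)))            # preserves input order
unordered = list(pool.imap_unordered(slow_square, range(4)))  # completion order
pool.close()
pool.join()
print(ordered)  # [0, 1, 4, 9]
print(unordered)  # same values, but in whatever order the tasks completed
```

Here imap buffers results until they can be yielded in input order, while imap_unordered yields each result the moment its task completes.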
It would be fantastic, but I suspect it would be a bit tricky to implement. I somewhat fear deadlocks, as there is a lock between the dispatch queue and the retrieve code.
However, I am very open to a pull request to discuss these issues based
on code rather than gut feelings.
The way I would suggest to do it is with a keyword argument 'async' to
Parallel.
Thanks heaps for proposing this, and sorry for my slow reply.
from joblib.
Hi Gael,
Below is a very barebones implementation (with example) of an asynchronous version of Parallel.__call__. I don't have the eye for catching deadlocks that you may have, so would you mind taking a look? In particular, there is one TODO I'm not quite sure how to solve, and another TODO where more polling is happening than I'd like. Suggestions appreciated.
import multiprocessing
import Queue


class Parallel(object):

    def __init__(self, n_jobs=None):
        self.pool = multiprocessing.Pool(n_jobs)

    def _dispatch(self, iterator, calls):
        n_calls = 0  # number of outputs to expect (could be unbounded)
        for func, args, kwargs in calls:
            n_calls += 1
            # asynchronously execute function and put its result to the
            # iterator when the function returns
            self.pool.apply_async(
                func, args, kwargs, callback=iterator.put
            )
        # tell the result iterator how many results to expect
        iterator.length(n_calls)

    def __call__(self, calls):
        result_iterator = ResultIterator()
        # asynchronously put tasks to the iterator
        self._dispatch(result_iterator, calls)  # TODO this should be done async
        # return iterator holding task outputs
        return result_iterator


class ResultIterator(object):
    """An iterator whose length will be known later"""

    def __init__(self, *args, **kwargs):
        self._queue = multiprocessing.Queue()
        self._returned = 0
        self._length = float('inf')

    def put(self, result):
        """Add a new result to the queue"""
        self._queue.put(result)

    def length(self, length):
        self._length = length

    def __iter__(self):
        while True:
            if self._returned >= self._length:
                raise StopIteration()
            else:
                # TODO this takes more resources than it should, no?
                try:
                    yield self._queue.get(block=False)
                    self._returned += 1
                except Queue.Empty:
                    # still waiting for next result
                    pass


def wait(n):
    import time
    print "enter wait(%f)" % n
    time.sleep(n)
    print "leave wait(%f)" % n
    return n


if __name__ == '__main__':
    from joblib import delayed
    import random; random.seed(0)

    args = [i for i in range(4)]
    random.shuffle(args)
    results = Parallel()(delayed(wait)(arg) for arg in args)
    for i, result in enumerate(results):
        print 'returned: (%d) %f' % (i, result)
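One way the polling TODO could be handled, sketched here in Python 3 against a standalone copy of ResultIterator rather than as a tested patch: use a blocking get with a short timeout, so the consuming thread sleeps until a result arrives and only wakes periodically to re-check whether the length has been announced.

```python
import queue  # the Queue module in the Python 2 original
import threading
import time


class ResultIterator:
    """An iterator whose length is announced after dispatch finishes."""

    def __init__(self):
        self._queue = queue.Queue()
        self._returned = 0
        self._length = float('inf')

    def put(self, result):
        self._queue.put(result)

    def length(self, length):
        self._length = length

    def __iter__(self):
        while self._returned < self._length:
            try:
                # block until a result arrives instead of spinning; the
                # short timeout lets us re-check _length, which may have
                # been announced while we were waiting
                item = self._queue.get(timeout=0.05)
            except queue.Empty:
                continue
            self._returned += 1
            yield item


def _producer(it):
    # stands in for the pool callbacks plus the dispatcher's length() call
    for i in range(3):
        time.sleep(0.01)
        it.put(i)
    it.length(3)


it = ResultIterator()
t = threading.Thread(target=_producer, args=(it,))
t.start()
results = list(it)
t.join()
print(results)  # [0, 1, 2]
```

The timeout matters: if all results are consumed before length() is called, a plain blocking get() would sleep forever, since announcing the length puts nothing on the queue to wake the consumer.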
Hi David,
Unfortunately this barebones implementation does not give us a lot of the functionality that is valuable in joblib, in particular error management and on-the-fly dispatching of jobs. These are the features that lead to having two threads in the dispatching, and to potential deadlocks.
I'd much prefer that you submit a patch against the existing code, so that we can look at it and see whether it raises potential issues such as deadlocks.
Cheers
@GaelVaroquaux I've been toying with capturing exceptions, but I'm having trouble re-raising them later. It seems that the callback argument to Pool.apply_async(..., callback) is invoked in a separate thread (not a process, an actual thread!). Are you familiar with a way of capturing exceptions thrown in other threads and re-raising them in the main thread asynchronously?
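A common workaround, sketched here in Python 3 (capture_exceptions and reraise_or_return are names invented for this sketch, not joblib API): rather than letting the exception escape into the callback thread, wrap the worker function so it returns the exception as data, and re-raise it in whichever thread consumes the result.

```python
import functools


def capture_exceptions(func):
    """Wrap func so it returns ('ok', result) or ('error', exception)
    instead of raising; suitable for passing to apply_async."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return ('ok', func(*args, **kwargs))
        except Exception as exc:
            # exception instances generally pickle across processes
            return ('error', exc)
    return wrapper


def reraise_or_return(tagged):
    """Unwrap a tagged result in the consuming thread, re-raising there."""
    tag, payload = tagged
    if tag == 'error':
        raise payload
    return payload


def boom(n):
    raise ValueError('bad input: %r' % n)


ok = capture_exceptions(int)('5')
bad = capture_exceptions(boom)(7)
print(reraise_or_return(ok))  # 5
# reraise_or_return(bad) raises ValueError in the caller's thread
```

With this pattern the callback thread only ever shuffles tuples onto the queue; the exception surfaces in the main thread the moment the generator yields that slot.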