
Comments (5)

GaelVaroquaux commented on June 28, 2024

Parallel.retrieve ensures that order is maintained when Parallel.__call__ is
invoked on an iterable. Rather than returning a list of results in the same
order as the input, I propose a generator-based version of Parallel.__call__
that yields each output as soon as it is ready, without ensuring order.

workers = Parallel(n_jobs=8)
workers(delayed(square)(i) for i in range(10))        # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
workers.async(delayed(square)(i) for i in range(10))  # <generator object _____ at 0x______>

Thoughts? Issues?
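The proposed semantics can be sketched with the standard library alone, since
concurrent.futures.as_completed already yields futures in completion order. A
minimal sketch, assuming a picklable top-level square function (unordered_results
is an illustrative name, not a joblib API):

import concurrent.futures


def square(i):
  return i * i


def unordered_results(func, iterable, n_jobs=8):
  """Yield each result as soon as it completes, in no particular order."""
  with concurrent.futures.ProcessPoolExecutor(max_workers=n_jobs) as pool:
    futures = [pool.submit(func, i) for i in iterable]
    for future in concurrent.futures.as_completed(futures):
      yield future.result()


if __name__ == '__main__':
  print(list(unordered_results(square, range(10))))
  # some permutation of [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]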

It would be fantastic, but I suspect it would be a bit tricky to
implement. I somehow fear deadlocks, as there is a lock shared between
the dispatch queue and the retrieve code.

However, I am very open to a pull request to discuss these issues based
on code rather than gut feelings.

The way I would suggest doing it is with a keyword argument 'async' to
Parallel.

Thanks heaps for proposing this, and sorry for my slow reply.


duckworthd commented on June 28, 2024

Hi Gael,

Below is a very barebones implementation (with example) of an asynchronous version of Parallel.__call__. I don't have the eye for catching deadlocks that you may have, so would you mind taking a look? In particular, I have one TODO I'm not quite sure how to solve and another TODO where there's more polling happening than I'd like. Suggestions appreciated.

import multiprocessing
import queue


class Parallel(object):
  def __init__(self, n_jobs=None):
    self.pool = multiprocessing.Pool(n_jobs)

  def _dispatch(self, iterator, calls):
    n_calls = 0   # number of outputs to expect (could be unbounded)
    for func, args, kwargs in calls:
      n_calls += 1

      # asynchronously execute the function; the callback pushes its
      # result onto the iterator's queue as soon as the function returns
      self.pool.apply_async(
          func, args, kwargs, callback=iterator.put)

    # tell the result iterator how many results to expect
    iterator.length(n_calls)

  def __call__(self, calls):
    result_iterator = ResultIterator()

    # asynchronously put tasks to the iterator
    self._dispatch(result_iterator, calls)   # TODO this should be done async

    # return iterator holding task outputs
    return result_iterator


class ResultIterator(object):
  """An iterator whose length will be known later"""

  def __init__(self):
    self._queue    = multiprocessing.Queue()
    self._returned = 0
    self._length   = float('inf')

  def put(self, result):
    """Add a new result to the queue"""
    self._queue.put(result)

  def length(self, length):
    """Declare how many results to expect in total."""
    self._length = length

  def __iter__(self):
    while self._returned < self._length:
      # TODO polling with a timeout still burns cycles; a fully blocking
      # get() would deadlock if length() were never called
      try:
        yield self._queue.get(timeout=0.1)
        self._returned += 1
      except queue.Empty:
        # still waiting for the next result
        pass


def wait(n):
  import time
  print("enter wait(%f)" % n)
  time.sleep(n)
  print("leave wait(%f)" % n)
  return n


if __name__ == '__main__':
  from joblib import delayed
  import random; random.seed(0)

  args = list(range(4))
  random.shuffle(args)
  results = Parallel()(delayed(wait)(arg) for arg in args)
  for i, result in enumerate(results):
    print('returned: (%d) %f' % (i, result))
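One possible direction for the first TODO, sketched against the classes above
(AsyncDispatchParallel is a hypothetical name, not a joblib API): move _dispatch
onto a daemon thread so that __call__ returns the iterator immediately, while the
polling __iter__ tolerates the total length arriving late.

import threading


class AsyncDispatchParallel(Parallel):
  """Hypothetical variant: dispatch in a background thread so that
  __call__ returns its iterator before all tasks are submitted."""

  def __call__(self, calls):
    result_iterator = ResultIterator()
    # submit tasks from a daemon thread; results flow into the iterator's
    # queue while the caller is already consuming it
    dispatcher = threading.Thread(
        target=self._dispatch, args=(result_iterator, calls), daemon=True)
    dispatcher.start()
    return result_iterator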


GaelVaroquaux commented on June 28, 2024

Hi David,

Unfortunately this barebones implementation does not give us a lot of the
functionality that is valuable in joblib, in particular error
management and on-the-fly dispatching of jobs. These are the features
that lead to having two threads in the dispatching code and to the
potential deadlocks.

I'd much prefer that you submit a patch against the existing code, so that
we can have a look at it and see whether it raises potential issues such as
deadlocks.

Cheers


duckworthd commented on June 28, 2024

@GaelVaroquaux I've been toying with capturing exceptions, but I'm having trouble re-raising them later. It seems that the callback argument to Pool.apply_async(..., callback) is invoked in a separate thread (not a process, an actual thread!). Are you familiar with a way of capturing exceptions thrown in other threads and re-raising them asynchronously in the main thread?
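For context, Python 3's Pool.apply_async also accepts an error_callback argument,
which receives the exception instance in that same result-handler thread. A minimal
sketch of the capture-and-re-raise pattern, assuming a hypothetical ExceptionItem
wrapper to distinguish failures from ordinary results:

import multiprocessing


class ExceptionItem(object):
  """Marks a queue item as a captured exception."""
  def __init__(self, exc):
    self.exc = exc


def iter_results(pool, func, argument_tuples):
  """Yield results as they complete; re-raise worker exceptions in the
  caller's thread instead of losing them in the callback thread."""
  results = multiprocessing.Queue()
  n_tasks = 0
  for args in argument_tuples:
    n_tasks += 1
    pool.apply_async(
        func, args,
        callback=results.put,
        # error_callback runs in the pool's result-handler thread and
        # receives the exception raised by the worker
        error_callback=lambda exc: results.put(ExceptionItem(exc)))
  for _ in range(n_tasks):
    item = results.get()
    if isinstance(item, ExceptionItem):
      raise item.exc
    yield item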


fcharras commented on June 28, 2024

Done in #1393 and #1463
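Those PRs added a return_as parameter to Parallel; in recent joblib versions
(if memory serves, 1.3+ for "generator" and 1.4+ for "generator_unordered"),
usage looks roughly like:

from math import sqrt
from joblib import Parallel, delayed

# yields results lazily, preserving submission order
ordered = Parallel(n_jobs=2, return_as="generator")(
    delayed(sqrt)(i ** 2) for i in range(10))

# yields each result as soon as it is ready, in completion order
unordered = Parallel(n_jobs=2, return_as="generator_unordered")(
    delayed(sqrt)(i ** 2) for i in range(10))

print(list(ordered))      # [0.0, 1.0, 2.0, ..., 9.0]
print(sorted(unordered))  # same values; arrival order not guaranteed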


