
distributed's Introduction

Distributed


A library for distributed computation. See documentation for more details.

distributed's People

Contributors

broxtronix, canavandl, charlesbluca, crusaderky, dependabot[bot], fjetter, gforsyth, github-actions[bot], gjoseph92, graingert, hendrikmakait, ian-r-rose, jacobtomlinson, jakirkham, jcrist, jrbourbeau, jsignell, koverholt, lesteve, madsbk, martindurant, milesgranger, mrocklin, pentschev, phofl, pitrou, quasiben, qulogic, rbubley, tomaugspurger

distributed's Issues

Configuration file

We could put the following information into a .distributedrc configuration file:

maximum message size: 2e9  # in bytes
debug level: info # or debug, critical, exception, ...
compression: None
hdfs address: ???
scheduler address: ???

This is motivated by a desire to avoid importing psutil unnecessarily (which is a pain for cross-platform function serialization), but it may have other benefits. Not sure if we want to go this route or not.
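A minimal sketch of how such a file might be read, assuming the flat `key: value  # comment` layout shown above. The names `parse_rc` and `_coerce`, the coercion rules, and the placeholder scheduler address are illustrative assumptions, not an existing distributed API.

```python
def _coerce(value):
    """Turn a raw string into None, a float, or leave it as a string."""
    if value in ("None", ""):
        return None
    try:
        return float(value)
    except ValueError:
        return value


def parse_rc(text):
    """Parse 'key: value  # comment' lines into a dict (hypothetical format)."""
    config = {}
    for raw in text.splitlines():
        line = raw.split("#", 1)[0].strip()  # drop trailing comments
        if not line:
            continue
        key, _, value = line.partition(":")  # split on the first colon only
        config[key.strip()] = _coerce(value.strip())
    return config


# The address below is a placeholder; the issue leaves it unspecified.
SAMPLE = """\
maximum message size: 2e9  # in bytes
debug level: info # or debug, critical, exception, ...
compression: None
scheduler address: 127.0.0.1:8787
"""
config = parse_rc(SAMPLE)
```

Splitting on the first colon only keeps `host:port` values intact, which matters for the address entries.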

Visualize/monitor worker state

The scheduler has state, the center has state, all of the workers have their own state. It'd be swell to visualize what's going on, both to help debug issues, and generally to look cool.

What would we want to display, what can we display, and what are the right ways to display this information?

Test HDFS under Python 3

@danielfrg, if you have some time, can you recreate your Docker container work from the hdfs3 library here? Distributed only tests hdfs3 under Python 2.

Automate setup of distributed from cluster scheduler nodes file

We had a request today (and in fact I've heard this from a few other people) to have a way to support the automatic generation of configuration files for Python parallel computing frameworks based on the "nodes" files produced and dropped into the "node 0" (sometimes called "rank 0") master node when scheduled through a batch scheduler.

It seems like this is the kind of thing that magpie might do: https://github.com/chu11/magpie

It would be nice to have that capability for our distributed computing libraries.
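As a rough sketch of the idea, assuming the nodes file lists one hostname per line, possibly repeated once per core, and that the first host acts as "node 0". The function name `parse_nodes_file` and the returned layout are hypothetical; real batch schedulers may use other formats.

```python
def parse_nodes_file(text):
    """Return the scheduler host and the unique worker hosts, in file order."""
    hosts = []
    for line in text.splitlines():
        host = line.strip()
        if host and host not in hosts:  # collapse per-core repeats
            hosts.append(host)
    if not hosts:
        raise ValueError("nodes file contains no hostnames")
    # Assumption: "node 0" runs the scheduler and also hosts a worker.
    return {"scheduler": hosts[0], "workers": hosts}


layout = parse_nodes_file("node01\nnode01\nnode02\n\nnode03\n")
```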

dworker should wait for dcenter to come up

If dworker is started before the dcenter process, the worker never connects to the center. Consider:

$ dworker 10.52.10.8:8787
distributed.nanny - INFO - Nanny starts worker process 10.52.10.8:8788
distributed.worker - INFO - Start worker at             10.52.10.8:8789
distributed.worker - INFO - Waiting to connect to       10.52.10.8:8787

and then:

$ dcenter 
distributed.dcenter - INFO - Start center at 10.52.10.8:8787

It appears that when dcenter starts after dworker, dworker does not try to connect to the center again.

This is a minor issue when launching a Mesos task with center and worker processes: Mesos launches all tasks (center and workers) simultaneously, but the center and worker tasks may not start running in sync.
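One possible fix is for the worker to retry the connection with a growing delay. The sketch below is generic Python rather than distributed's actual connection code; `connect_with_retry` and its parameters are hypothetical names.

```python
import time


def connect_with_retry(connect, attempts=5, delay=0.5, backoff=2.0, sleep=time.sleep):
    """Call connect() until it succeeds, sleeping longer after each failure."""
    for attempt in range(1, attempts + 1):
        try:
            return connect()
        except OSError:  # ConnectionRefusedError is a subclass of OSError
            if attempt == attempts:
                raise  # give up after the last attempt
            sleep(delay)
            delay *= backoff


# Usage against a real socket (not executed here):
# import socket
# sock = connect_with_retry(
#     lambda: socket.create_connection(("10.52.10.8", 8787), timeout=5))
```

Passing the `connect` callable and the `sleep` function in as arguments keeps the retry policy testable without a network.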

Distribute .py files or eggs

Users are likely to have custom code that they want to ship out to all of the worker nodes and then load up into the running worker process. What is the right way to accomplish this?

To be clear, there are probably two technical challenges:

  1. Distribute files between nodes
  2. Inject those files into the environment of the running worker, possibly reloading over existing modules (I believe that Python has mechanisms for this.)

We could punt this off to other tools, but it's probably pretty useful to have directly within distributed or on closely attached tools. Ideally we can do this in such a way that it does not become a significant maintenance burden.

I wonder if @quasiben , @danielfrg , or @martindurant have thoughts on how best to do this.
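For the second challenge, the standard library's `importlib` can import a freshly written file and reload a module that is already loaded. This is a sketch under the assumption that the file's bytes have already arrived on the worker; `load_module` and the upload directory are hypothetical.

```python
import importlib
import os
import sys


def load_module(filename, source, directory):
    """Save an uploaded .py file and import it, reloading if already imported."""
    path = os.path.join(directory, filename)
    with open(path, "wb") as f:
        f.write(source)
    if directory not in sys.path:
        sys.path.insert(0, directory)
    name = os.path.splitext(filename)[0]
    importlib.invalidate_caches()  # make the import system notice the new file
    if name in sys.modules:
        return importlib.reload(sys.modules[name])
    return importlib.import_module(name)
```

Note that `importlib.reload` re-executes the module in place; objects created from the old definitions keep referring to the old code.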

Relay executor inputs/outputs to far-away node

The executor speaks directly to worker nodes. For minimal latency it's nice to have this run directly on the computation network. However, users are likely to want to run things from their local machines. This is problematic both for latency and for network permissions, since often only the head node is publicly visible.

So we should have some mechanism to relay all communications from a client machine to a remote Executor.

All communication is already well separated through Queues. Doing this well probably just requires us to reroute those queues to communication streams.

cc @seibert
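To illustrate what rerouting a queue to a communication stream could look like, here is a small sketch that drains a `queue.Queue` into a text stream and refills a queue from one. It assumes messages are JSON-serializable and uses newline-delimited JSON purely for illustration; the function names are hypothetical and distributed's own serialization would differ.

```python
import io
import json
import queue


def forward_queue_to_stream(q, stream, sentinel=None):
    """Write each queued message to the stream as one JSON line."""
    while True:
        msg = q.get()
        if msg is sentinel:  # sentinel marks the end of the conversation
            break
        stream.write(json.dumps(msg) + "\n")
        stream.flush()


def read_stream_into_queue(stream, q):
    """Read JSON lines from the stream and put them on the queue."""
    for line in stream:
        if line.strip():
            q.put(json.loads(line))


# Local demonstration with an in-memory stream standing in for a socket.
outgoing = queue.Queue()
outgoing.put({"op": "submit", "key": "x"})
outgoing.put(None)
wire = io.StringIO()
forward_queue_to_stream(outgoing, wire)
wire.seek(0)
incoming = queue.Queue()
read_stream_into_queue(wire, incoming)
```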

Shut down cluster from user commands

The Center, Worker, and Nanny all support termination commands; we just need to build a small function to trigger these and verify clean shutdown.

There is no particular reason why this needs to be attached to the executor. It could be a freestanding function. Perhaps we should put it on the Executor anyway though just for convenience.

Cancel futures

There should be a way for the user to cancel a future or list of futures.

For the API, concurrent.futures supports this for a single future; we should probably also have an Executor.cancel(list_of_futures) function for a list of futures. This would cancel enqueued tasks and tasks that are not yet ready, and would also cancel all dependent tasks. These tasks would likely be removed from memory, and their metadata would soon be removed from the scheduler as well.

For implementation this probably looks a lot like existing code for garbage collection. It's unclear what to do when multiple clients want and don't want the same data. My guess is that we still compute things while any client wants the result.
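The "cancel all dependent tasks" part amounts to a graph traversal. A minimal sketch, assuming the scheduler keeps a `dependents` mapping from each key to the keys that consume it (the name and shape of that mapping are assumptions here):

```python
def keys_to_cancel(keys, dependents):
    """Return the given keys plus every task that transitively depends on them."""
    to_cancel = set()
    stack = list(keys)
    while stack:
        key = stack.pop()
        if key in to_cancel:
            continue  # already visited
        to_cancel.add(key)
        stack.extend(dependents.get(key, ()))
    return to_cancel


# x feeds y, y feeds z; w is unrelated.
graph = {"x": ["y"], "y": ["z"], "w": []}
cancelled = keys_to_cancel(["x"], graph)
```

This sketch ignores the multi-client question raised above; a fuller version would skip any key that another client still wants.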

Compression

Communications both between workers and between workers and center/scheduler/client can all, at times, benefit from compression. Do we want to turn this on by default? Do we want to make it optional? If so, at what granularity should it be optional?

  • Whole system
  • Specified per worker/center/... object?
  • Specified per communication?

If we specify it per communication then we'll have to have a proper header that is separate from the current payload.

  • What do other systems (spark, impala) do?
  • What compression systems are both very fast and widely available (or at least on conda)?
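If compression is chosen per communication, each message needs a small header that says how the payload was encoded. The sketch below uses the standard library's `zlib` and a five-byte header as stand-ins; the header layout and the names `pack` and `unpack` are assumptions, not distributed's wire format.

```python
import struct
import zlib

HEADER = struct.Struct("!BI")  # 1-byte compression flag, 4-byte body length
RAW, ZLIB = 0, 1


def pack(payload, compress=True):
    """Frame a payload, compressing only when that actually saves space."""
    body, flag = payload, RAW
    if compress:
        squeezed = zlib.compress(payload)
        if len(squeezed) < len(payload):
            body, flag = squeezed, ZLIB
    return HEADER.pack(flag, len(body)) + body


def unpack(frame):
    """Read the header and return the original payload."""
    flag, length = HEADER.unpack_from(frame)
    body = frame[HEADER.size:HEADER.size + length]
    return zlib.decompress(body) if flag == ZLIB else body


data = b"abc" * 1000
frame = pack(data)
```

Keeping the flag in the header lets small or incompressible messages go out uncompressed without any global setting.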

Warning on `import distributed`

/Library/Python/2.7/site-packages/dask-0.7.6-py2.7.egg/dask/array/numpy_compat.py:29: RuntimeWarning: divide by zero encountered in divide
  or not np.allclose(np.divide(1, .5, dtype='i8'), 2)

Can give more details on system if needed.

dscheduler seems to be finding the wrong Python distribution

0280-Izaid:Desktop izaid$ python
Python 2.7.11 (default, Dec 11 2015, 00:08:45)
[GCC 4.2.1 Compatible Apple LLVM 7.0.2 (clang-700.1.81)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import six
>>> six.__version__
'1.10.0'
0280-Izaid:~ izaid$ dscheduler
Traceback (most recent call last):
  File "/usr/local/bin/dscheduler", line 4, in <module>
    import pkg_resources
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/Extras/lib/python/pkg_resources.py", line 2797, in <module>
    parse_requirements(__requires__), Environment()
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/Extras/lib/python/pkg_resources.py", line 580, in resolve
    raise VersionConflict(dist,req) # XXX put more info here
pkg_resources.VersionConflict: (six 1.4.1 (/System/Library/Frameworks/Python.framework/Versions/2.7/Extras/lib/python), Requirement.parse('six>=1.5'))

Dask.array + distributed notes

Setup

I'm playing with dask.array on a distributed cluster. I've scattered some ECMWF data onto four m3.2xlarges. Data is sitting as numpy arrays in memory.

In [22]: x
Out[22]: dask.array<x, shape=(236, 721, 1440), dtype=float64, chunksize=(4, 721, 1440)>

Times

  • Getting a single element from the array, x[0, 0, 0] -- 8ms. Hooray for low overhead
  • Pulling out a single time slice (721 × 1440 float64s) x[100] -- 130 ms
  • Computing the mean along the non-chunked axes (x.mean(axis=[1, 2])) -- 300ms
  • Computing the mean along the chunked axis (x.mean(axis=0)) -- 14s.
    This is unfortunate. This is probably the result of moving around 400MB between workers on a not-super-fast interconnect. This would be faster if we did a reduction tree, rather than all-to-one.
  • Computing (x - x.mean()).sum() is taking a long long time. This is likely because in some cases it's moving the blocks of x to the x.mean() value, rather than the other way around.

Regarding the last point, imagine that the intermediate result storing the mean of the array is on Alice while a particular chunk, (x, 0, 0) is on Bob. We want to copy the mean from Alice to Bob (this is cheap) and not the chunk from Bob to Alice (this is expensive.) If we know how large each intermediate result is then the solution is obvious. Unfortunately we don't currently know how large each value is and so it's hard to make this determination.
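Once sizes are known, the decision can be sketched as picking the worker that needs the fewest bytes moved to it. The mappings `who_has` and `nbytes` and the function name are hypothetical stand-ins for scheduler state.

```python
def cheapest_worker(dependencies, who_has, nbytes):
    """Pick the worker for which the least data must be transferred."""
    candidates = sorted({w for dep in dependencies for w in who_has[dep]})

    def bytes_to_move(worker):
        # Sum the sizes of dependencies this worker does not already hold.
        return sum(nbytes[dep] for dep in dependencies if worker not in who_has[dep])

    return min(candidates, key=bytes_to_move)


who_has = {"mean": {"alice"}, ("x", 0, 0): {"bob"}}
nbytes = {"mean": 8, ("x", 0, 0): 4 * 721 * 1440 * 8}  # one float64 vs. one chunk
choice = cheapest_worker(["mean", ("x", 0, 0)], who_has, nbytes)
```

With these numbers the task runs on Bob, so only the 8-byte mean crosses the network.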

Tracking data size

  • Dynamically: distributed workers can run some nbytes function on results and send that information back to the scheduler when they report work-finished.
  • Statically: dask.array can figure out the size of every chunk robustly. This would require us to tack on more metadata onto dask collections generally and pass it through as they create new collections

I'm currently leaning towards the first solution, putting the weight of this on distributed, but the second solution does open up some other possibilities with dask.

cc @shoyer @freeman-lab
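For the dynamic option, the worker-side size function could be as simple as the sketch below: trust an `nbytes` attribute when the object has one (as NumPy arrays do) and fall back to `sys.getsizeof` otherwise. The name `sizeof` and the handling of containers are assumptions, and the fallback is only a rough estimate.

```python
import sys


def sizeof(obj):
    """Estimate the size of a result in bytes."""
    nbytes = getattr(obj, "nbytes", None)
    if isinstance(nbytes, int):
        return nbytes  # e.g. NumPy arrays and memoryviews report this directly
    if isinstance(obj, (list, tuple, set, frozenset)):
        # Count the container itself plus its elements (one level at a time).
        return sys.getsizeof(obj) + sum(sizeof(item) for item in obj)
    return sys.getsizeof(obj)


class FakeArray:
    """Stand-in for an array-like object that knows its own size."""
    nbytes = 100
```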

`dworker` does not work on Windows 7

dworker does not work on Windows 7 using Python 2.7 (no other versions tested).

The process created by the nanny using multiprocessing.Process fails to execute due to an issue with how multiprocessing starts child processes on Windows. The error message is "ImportError: No module named dworker". I don't know the best way to address this. A workaround is to copy "dworker" to a filename that ends in ".py" (say, "run_dworker.py") and then run "python run_dworker.py".

Also, the dworker process just hung.

Unable to connect to Center in Linux from Client on Windows

Using the current master branch (December 16, 2015), I cannot run a Center on Linux and connect to it from Windows. Creating an Executor on Windows raises ImportError: cannot import name _psutil_linux during loads called from read in core.py.

Rename Distributed

Distributed is arguably an overly broad term. It would be nice to give this project a more specific name.

Reasons to rename

Within just-python this term makes sense, and fits nicely into the threading and multiprocessing library names. However, if we consider other things one can do in a distributed way (like data storage) or distributed libraries within other languages (like Hadoop), calling a project "distributed" seems like we're claiming too much conceptual space.

I would like to move this repository over to the blaze org soon. Ideally this happens at the same time as the renaming to reduce confusion.

Some seeds

  • distributed is lightweight
  • distributed is good at complex data analysis, not databases or trivial munging
  • distributed has low-latency
  • distributed responds robustly to stimuli
  • distributed is good for ad-hoc computations that don't fit into typical map/reduce models
  • distributed is a Python library (is there an attractive term for a swarm of fast twitchy snakes?)

Restrictions for any name

Should be easy to type. Must be available on PyPI. Must not conflict with another name for another project in another language. Must not conflict with a copyrighted/trademarked term.

Setting resource locality when calling Executor.submit

I can think of a few use cases where certain tasks need to be executed on particular nodes:

  • Partitioning of input files onto different nodes of a cluster (possibly with some redundancy) means that initial file read tasks must run on a subset of the workers.
  • Heterogeneous clusters where some tasks can run on low memory systems, but a few tasks need to run on a high memory system.
  • Similar to above, but some nodes have GPUs and others do not.

One possible API for this could look like:

executor = Executor(('127.0.0.1', 8787)) 
a = executor.submit(add, 1, 2, _allowed_workers=['192.168.1.100:8787', '192.168.1.101:8787']) 

The _allowed_workers kwarg has a leading underscore to avoid colliding with any kwargs of the user function being submitted.

As a related feature, it would be nice if workers could register themselves with unique identifiers that could be used with _allowed_workers instead of the full IP address and port.
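On the scheduler side, honoring such a restriction could reduce to filtering the candidate workers, with an optional alias table for the registered identifiers. This is a sketch only; `eligible_workers` and the `aliases` mapping are hypothetical.

```python
def eligible_workers(all_workers, allowed=None, aliases=None):
    """Return the workers a task may run on, resolving aliases to addresses."""
    if allowed is None:
        return set(all_workers)  # no restriction requested
    aliases = aliases or {}
    resolved = {aliases.get(name, name) for name in allowed}
    return resolved & set(all_workers)  # ignore workers that are not connected


workers = ["192.168.1.100:8787", "192.168.1.101:8787", "192.168.1.102:8787"]
aliases = {"gpu-1": "192.168.1.101:8787"}  # hypothetical registered identifier
chosen = eligible_workers(workers, ["gpu-1", "192.168.1.100:8787"], aliases)
```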

Environments and multi-task persistent state

I frequently use GPU clusters, and for these workloads I find that I need to do some relatively expensive setup or teardown operations to establish a valid GPU computing context on each worker before I start the distributed processing of my data. There are various ways to "trick" distributed workers into maintaining a stateful execution environment for multiple jobs, probably the best of which is to use a global variable in the Python interpreter itself. However, this solution (like the others I'm aware of) does not provide particularly fine-grained control over the distributed execution environment. It would be very useful if distributed could provide some way to explicitly manage execution "environments" in which jobs could be run.

I'm imagining something along these lines. I would call

my_gpu_env = executor.environment(my_setup_function, my_teardown_function)

which would cause each worker to run my_setup_function() and then return a reference to its copy of the new environment. The scheduler would record these and send back a single reference to the collection of environments on the workers, my_gpu_env in this example. Subsequent calls to run distributed functions could specify the environment in which they would like to be run:

executor.map(some_function_using_the_gpu, data, environment=my_gpu_env)

The scheduler could keep track of the setup() and teardown() functions associated with each environment. Then, if a new worker comes online and is asked to run a function in an environment that it has not yet set up, it could request the necessary initialization routine from the scheduler and run that first before running any jobs.

This is a somewhat rough sketch of what would be desirable here, and I'm curious to start a discussion here to see if there are other users out there that might also want a feature like this. In particular, are there others using distributed to manage a cluster of GPU nodes? How do you manage a cluster-wide execution context?

Submit and scatter to all workers

Sometimes we need to send a piece of data or to perform a computation on all workers. This might be for a critical piece of data like a database connection that we don't want to have to bounce between the workers at random.

From an API perspective the functions submit and scatter could take an all=True or broadcast=True keyword argument, which would immediately trigger a computation on all workers, or on all workers listed within the workers= keyword.

From an implementation perspective this operation is a bit odd. We can't rely on the typical scheduling mechanics to handle it because they'll try to be too efficient. My guess is that we will jump the queue and send the task to each current worker immediately. I don't intend to support new workers that come in after the submit call.

If we wanted to roll operations like this generally into the scheduler then we could add the concept of num_replications to tasks to enable us to ask that a certain task run on a certain number of independent workers. For broadcast we would set this to inf. This is a bit too much machinery for me right now, but might be a good generalization if this idea becomes more useful later.

In either case I intend to return a single future with the same key. That future will probably have a finished status after just the first worker finishes the task, which may cause some issues.

cc @danielfrg @broxtronix

Race condition in Executor.ncores?

In an IPython notebook, if I execute the following two lines in a cell:

client = distributed.Executor('192.168.5.1:8787')
client.ncores

I get an exception:

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-13-d814f40dfc7a> in <module>()
      1 #local_client = Client(sched.address_to_clients)
      2 client = distributed.Executor('192.168.5.1:8787')
----> 3 client.ncores

AttributeError: 'Executor' object has no attribute 'ncores'

But if I split those statements into two cells and run them manually (with a human-scale delay between the two statements), client.ncores works fine.

dcluster authentication problem

┌─[pwolfram][XYZ1][~/trash]
└─▪ dcluster --hostfile hostnames.txt

---------------------------------------------------------------                                                                                                                                                          
                    Distributed v1.7.3                                                                                                                                                                                   

Worker nodes:                                                                                                                                                                                                            
  0: XYZ1                                                                                                                                                                                                               
  1: XYZ2                                                                                                                                                                                                               

scheduler node: XYZ1:8786                                                                                                                                                                                               
---------------------------------------------------------------                                                                                                                                                          


[ dcluster ] : SSH connection error when connecting to XYZ1:22 to run 'dworker XYZ1:8786 --host XYZ1 --nthreads                                                                                                       
0 --nprocs 1'                                                                                                                                                                                                            
[ dcluster ] : SSH connection error when connecting to XYZ1:22 to run 'dscheduler --port 8786 --strict-port'                                                                                                            
               SSH reported this exception: Authentication failed.                                                                                                                                                       
Traceback (most recent call last):                                                                                                                                                                                       
  File "/users/pwolfram/lib/python2.7/site-packages/distributed/cluster.py", line 48, in async_ssh                                                                                                                       
               SSH reported this exception: Authentication failed.                                                                                                                                                       
Traceback (most recent call last):                                                                                                                                                                                       
  File "/users/pwolfram/lib/python2.7/site-packages/distributed/cluster.py", line 48, in async_ssh                                                                                                                       
    banner_timeout = 20)  # Helps prevent timeouts when many concurrent ssh connections are opened.                                                                                                                      
  File "/users/pwolfram/envs/LIGHT_analysis/lib/python2.7/site-packages/paramiko/client.py", line 307, in connect                                                                                                        
    banner_timeout = 20)  # Helps prevent timeouts when many concurrent ssh connections are opened.                                                                                                                      
  File "/users/pwolfram/envs/LIGHT_analysis/lib/python2.7/site-packages/paramiko/client.py", line 307, in connect                                                                                                        
    look_for_keys, gss_auth, gss_kex, gss_deleg_creds, gss_host)                                                                                                                                                         
    look_for_keys, gss_auth, gss_kex, gss_deleg_creds, gss_host)                                                                                                                                                         
  File "/users/pwolfram/envs/LIGHT_analysis/lib/python2.7/site-packages/paramiko/client.py", line 519, in _auth                                                                                                          
  File "/users/pwolfram/envs/LIGHT_analysis/lib/python2.7/site-packages/paramiko/client.py", line 519, in _auth                                                                                                          
    raise saved_exception                                                                                                                                                                                                
AuthenticationException: Authentication failed.                                                                                                                                                                          
               Retrying... (attempt 1/3)                                                                                                                                                                                 
    raise saved_exception                                                                                                                                                                                                
AuthenticationException: Authentication failed.                                                                                                                                                                          
               Retrying... (attempt 1/3)                                                                                                                                                                                 
[ dcluster ] : SSH connection error when connecting to XYZ2:22 to run 'dworker XYZ1:8786 --host XYZ2 --nthreads                                                                                                       
0 --nprocs 1'                                                                                                                                                                                                            
               SSH reported this exception: Authentication failed.                                                                                                                                                       
Traceback (most recent call last):                                                                                                                                                                                       
  File "/users/pwolfram/lib/python2.7/site-packages/distributed/cluster.py", line 48, in async_ssh                                                                                                                       
    banner_timeout = 20)  # Helps prevent timeouts when many concurrent ssh connections are opened.                                                                                                                      
  File "/users/pwolfram/envs/LIGHT_analysis/lib/python2.7/site-packages/paramiko/client.py", line 307, in connect                                                                                                        
    look_for_keys, gss_auth, gss_kex, gss_deleg_creds, gss_host)                                                                                                                                                         
  File "/users/pwolfram/envs/LIGHT_analysis/lib/python2.7/site-packages/paramiko/client.py", line 519, in _auth                                                                                                          
    raise saved_exception                                                                                                                                                                                                
AuthenticationException: Authentication failed.                                                                                                                                                                          
               Retrying... (attempt 1/3)                                                                                                                                                                                 
[ dcluster ] : SSH connection error when connecting to XYZ1:22 to run 'dworker XYZ1:8786 --host XYZ1 --nthreads                                                                                                       
0 --nprocs 1'                                                                                                                                                                                                            
               SSH reported this exception: Authentication failed.                                                                                                                                                       
Traceback (most recent call last):                                                                                                                                                                                       
  File "/users/pwolfram/lib/python2.7/site-packages/distributed/cluster.py", line 48, in async_ssh                                                                                                                       
    banner_timeout = 20)  # Helps prevent timeouts when many concurrent ssh connections are opened.                                                                                                                      
  File "/users/pwolfram/envs/LIGHT_analysis/lib/python2.7/site-packages/paramiko/client.py", line 307, in connect                                                                                                        
    look_for_keys, gss_auth, gss_kex, gss_deleg_creds, gss_host)                                                                                                                                                         
  File "/users/pwolfram/envs/LIGHT_analysis/lib/python2.7/site-packages/paramiko/client.py", line 519, in _auth                                                                                                          
    raise saved_exception                                                                                                                                                                                                
AuthenticationException: Authentication failed.                                                                                                                                                                          
               Retrying... (attempt 2/3)                                                                                                                                                                                 
[ dcluster ] : SSH connection error when connecting to XYZ2:22 to run 'dworker XYZ1:8786 --host XYZ2 --nthreads                                                                                                       
0 --nprocs 1'                                                                                                                                                                                                            
               SSH reported this exception: Authentication failed.                                                                                                                                                       
Traceback (most recent call last):                                                                                                                                                                                       
  File "/users/pwolfram/lib/python2.7/site-packages/distributed/cluster.py", line 48, in async_ssh                                                                                                                       
    banner_timeout = 20)  # Helps prevent timeouts when many concurrent ssh connections are opened.                                                                                                                      
  File "/users/pwolfram/envs/LIGHT_analysis/lib/python2.7/site-packages/paramiko/client.py", line 307, in connect                                                                                                        
    look_for_keys, gss_auth, gss_kex, gss_deleg_creds, gss_host)                                                                                                                                                         
  File "/users/pwolfram/envs/LIGHT_analysis/lib/python2.7/site-packages/paramiko/client.py", line 519, in _auth                                                                                                          
    raise saved_exception                                                                                                                                                                                                
AuthenticationException: Authentication failed.                                                                                                                                                                          
               Retrying... (attempt 2/3)                                                                                                                                                                                 
[ dcluster ] : SSH connection error when connecting to XYZ1:22 to run 'dscheduler --port 8786 --strict-port'                                                                                                            
               SSH reported this exception: Authentication failed.                                                                                                                                                       
Traceback (most recent call last):                                                                                                                                                                                       
  File "/users/pwolfram/lib/python2.7/site-packages/distributed/cluster.py", line 48, in async_ssh                                                                                                                       
    banner_timeout = 20)  # Helps prevent timeouts when many concurrent ssh connections are opened.                                                                                                                      
  File "/users/pwolfram/envs/LIGHT_analysis/lib/python2.7/site-packages/paramiko/client.py", line 307, in connect                                                                                                        
    look_for_keys, gss_auth, gss_kex, gss_deleg_creds, gss_host)                                                                                                                                                         
  File "/users/pwolfram/envs/LIGHT_analysis/lib/python2.7/site-packages/paramiko/client.py", line 519, in _auth                                                                                                          
    raise saved_exception                                                                                                                                                                                                
AuthenticationException: Authentication failed.                                                                                                                                                                          
               Retrying... (attempt 2/3)                                                                                                                                                                                 
[ dcluster ] : SSH connection error when connecting to XYZ1:22 to run 'dworker XYZ1:8786 --host XYZ1 --nthreads 0 --nprocs 1'
               SSH reported this exception: Authentication failed.                                                                                                                                                       
Traceback (most recent call last):                                                                                                                                                                                       
  File "/users/pwolfram/lib/python2.7/site-packages/distributed/cluster.py", line 48, in async_ssh                                                                                                                       
    banner_timeout = 20)  # Helps prevent timeouts when many concurrent ssh connections are opened.                                                                                                                      
  File "/users/pwolfram/envs/LIGHT_analysis/lib/python2.7/site-packages/paramiko/client.py", line 307, in connect                                                                                                        
    look_for_keys, gss_auth, gss_kex, gss_deleg_creds, gss_host)                                                                                                                                                         
  File "/users/pwolfram/envs/LIGHT_analysis/lib/python2.7/site-packages/paramiko/client.py", line 519, in _auth                                                                                                          
    raise saved_exception                                                                                                                                                                                                
AuthenticationException: Authentication failed.                                                                                                                                                                          
[ dcluster ] : SSH connection failed after 3 retries. Exiting.                                                                                                                                                           

Distributed DataFrames

General agreement is that pandas-like functionality on big data running on a cluster would be swell.

With dask.dataframe and distributed we have a lot of the pieces to do this. What remains?

Outline

  • Example and benchmark problems
  • Parallel DataFrame algorithms as task graphs with Pandas-like API (handled by dask.dataframe)
  • Distributed execution of task graphs on distributed machines (handled by distributed)
  • Distributed efficient storage of dataframes accessible by local Python/Pandas code
  • Distributed efficient shuffle computations for set_index/groupby.apply/join-on-non-index (separate from parallel algorithms above)
  • Thorough testing

Examples

It would be great to have a collection of large datasets during development.

It would be useful to nail down a set of necessary functionality for large dataframes. The Pandas API is large and complex and it's easy to fall down into rabbit holes.

Storage Formats

Common distributed storage solutions in HDFS include CSV, Avro, Parquet, and a variety of others. Formats like CSV are trivial from a Pandas perspective but are fairly slow generally. Formats like Parquet are far less well supported within Python but are quite fast. Formats like Avro are in between.

We may also want to invent our own. Distributed large array storage will likely need work anyway, since few good solutions exist. It might not be hard to include dataframe formats in that work as well.

Storage Locations

Beyond storage we also need to hammer down APIs for how to load dataframes from various storage locations, like S3/HDFS/Ceph/... This isn't hard in principle but does require gaining familiarity with each of these systems.

The PyData on HDFS without Java blogpost shows a proof of concept for how to do this on HDFS, one of the nastier systems.

Shuffle

Applications that require shuffling data around, like groupby.apply(func) or joins on columns that are not the index, require special treatment. For single machines these algorithms were special-cased in dask.dataframe using the partd project. We'll need to make an efficient distributed shuffle. This is a non-trivial task.

Note that this does not affect algorithms like df.groupby(df.x).y.mean() which can be efficiently computed without a shuffle. It's only when you need to use arbitrary python functions with .apply that this becomes an issue.
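
To see why a grouped mean needs no shuffle: each partition can be reduced on its own to per-key (sum, count) pairs, and only those small summaries have to be merged. A minimal pure-Python sketch, with partitions modelled as lists of (key, value) pairs. The function names are made up for illustration and are not dask.dataframe internals.

```python
from collections import defaultdict


def partial_sum_count(partition):
    """Reduce one partition to {key: [sum, count]} without looking at other partitions."""
    acc = defaultdict(lambda: [0.0, 0])
    for key, value in partition:
        acc[key][0] += value
        acc[key][1] += 1
    return dict(acc)


def merge_means(partials):
    """Combine the small per-partition summaries into final per-key means."""
    total = defaultdict(lambda: [0.0, 0])
    for partial in partials:
        for key, (s, n) in partial.items():
            total[key][0] += s
            total[key][1] += n
    return {key: s / n for key, (s, n) in total.items()}


partitions = [
    [("a", 1.0), ("b", 4.0)],
    [("a", 3.0), ("b", 6.0), ("a", 5.0)],
]
means = merge_means(partial_sum_count(p) for p in partitions)
```

An arbitrary function passed to .apply cannot be split this way in general. It needs every row of a group in one place, which is what forces the shuffle.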

Thorough testing

We're likely to run into other issues. There will need to be an extensive play period before this is reliable enough for production.

Tune update_state

The Executor.submit function can be used many times within for loops while building up a complex computation. This can be a powerful way to write down strange computations with a simple API. If possible we'd like to make this fast, even for large numbers of submit calls. Unfortunately this is currently somewhat costly, limiting our ability to build up computations directly in this way.

I suspect that most of the cost is in distributed/scheduler.py:update_state which is run after any submit, map, or get call. The update_state function updates the scheduler's runtime state with a new graph. In the case of submit this new graph is very small (just one key-value pair) but the function is often called many times. At the moment this function depends somewhat on the size of the current internal graph, and not just on the new graph being added. This can cause significant slowdowns when using submit repeatedly on already large internal graphs.

Some solutions

  1. Don't use submit willy-nilly, and instead ask the user to build up the graphs externally with a tool like dask.do and submit those in larger chunks instead.
  2. Batch many submit calls into larger graphs internally on the scheduler side. This can probably help to keep down update_state overhead.
  3. Tune update_state so that it is linear in the size of the input graph, and somewhat speedy.

If possible I'd like to make update_state fast. It will probably be important if we have long-running schedulers in the future with large internal graphs. We might as well push a bit on this early so that we can rely on it later.

Small benchmark script

from distributed.scheduler import update_state
from operator import add
from time import time

def bench_update_state(n):
    dsk = {}
    dependencies = {}
    dependents = {}

    waiting = {}
    waiting_data = {}

    held_data = set()
    in_memory = set()
    processing = set()
    released = set()

    update_state(dsk, dependencies, dependents, held_data,
                 in_memory, processing, released,
                 waiting, waiting_data, {('y', 0): 0}, {('y', 0)})

    for i in range(1, n + 1):  # start at 1 so that ('y', i - 1) refers to an existing key
        new_dsk = {('x', i): i, ('y', i): (add, ('y', i - 1), ('x', i))}
        new_keys = {('y', i)}
        update_state(dsk, dependencies, dependents, held_data,
                     in_memory, processing, released,
                     waiting, waiting_data, new_dsk, new_keys)

if __name__ == '__main__':
    import sys
    n = int(sys.argv[1])
    start = time()
    bench_update_state(n)
    end = time()
    print("%4f seconds" % (end - start))

Non-linear costs

By running this script at higher values of n we see non-linear costs:

mrocklin@notebook:~/workspace/distributed$ python bench_update_state.py 10
0.001004 seconds
mrocklin@notebook:~/workspace/distributed$ python bench_update_state.py 100
0.011575 seconds
mrocklin@notebook:~/workspace/distributed$ python bench_update_state.py 1000
0.155894 seconds
mrocklin@notebook:~/workspace/distributed$ python bench_update_state.py 10000
15.132474 seconds
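
One way to put a number on "non-linear" is to estimate the exponent k in t ~ n**k between consecutive runs, using k = log(t2/t1) / log(n2/n1). The sketch below does only that arithmetic on the timings printed above; nothing in it touches distributed itself.

```python
from math import log

# (n, seconds) pairs copied from the runs above
timings = [(10, 0.001004), (100, 0.011575), (1000, 0.155894), (10000, 15.132474)]


def scaling_exponents(timings):
    """Estimate k in t ~ n**k for each consecutive pair of runs."""
    return [
        log(t2 / t1) / log(n2 / n1)
        for (n1, t1), (n2, t2) in zip(timings, timings[1:])
    ]


for (n1, _), (n2, _), k in zip(timings, timings[1:], scaling_exponents(timings)):
    print("%d -> %d: exponent %.2f" % (n1, n2, k))
```

An exponent near 1 means linear cost. The first two steps stay close to 1, while the step from 1000 to 10000 comes out close to 2, i.e. roughly quadratic.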

Most of the cost is in the update_state function itself

If you run this with the profiler:

$ python -m cProfile -o p.out bench_update_state.py  10000
$ pip install snakeviz
$ snakeviz p.out

You'll find that about 2.7 seconds is spent within keys_outside_frontier and the rest of the time is spent within the update_state function itself.

cc @eriknw, in case he wants to resurrect his tuning skills and dive into the scheduler's guts.

Executor raises StreamClosedError when Scheduler is not present

I get a rather obscure error on master on OSX if no scheduler is present at the URI. I cannot think of a reason why this needs to be handled right now except for clarity.

In [1]: from distributed import Executor

In [2]: a = Executor('localhost:8786') # scheduler is not present at this URI
distributed.utils - ERROR - Stream is closed
Traceback (most recent call last):
---------------------------------------------------------------------------
StreamClosedError                         Traceback (most recent call last)
<ipython-input-2-bdc308648e14> in <module>()
----> 1 a = Executor('172.17.0.5:8786')

workspace/venv/lib/python2.7/site-packages/distributed/executor.pyc in __init__(self, address, start, loop)
    216 
    217         if start:
--> 218             self.start()
    219 
    220     def start(self, **kwargs):

workspace/venv/lib/python2.7/site-packages/distributed/executor.pyc in start(self, **kwargs)
    231         while not self.loop._running:
    232             sleep(0.001)
--> 233         sync(self.loop, self._start, **kwargs)
    234 
    235     def _send_to_scheduler(self, msg):

workspace/venv/lib/python2.7/site-packages/distributed/utils.pyc in sync(loop, func, *args, **kwargs)
    103     e.wait()
    104     if error[0]:
--> 105         raise result[0]
    106     else:
    107         return result[0]

StreamClosedError: Stream is closed

Should distributed have "broadcast" functionality for explicit one-to-many data transfers?

Certain workloads reference an identical, large piece of data on all worker nodes. For example, a distributed algorithm might have a large hash or lookup table that all nodes need to access. In these cases, it can be useful to be able to pre-cache this data by "broadcasting" it out to all workers.

This requires some care, though, because a one-to-many data transfer can easily overwhelm the node on which the data is originating. In Apache Spark, this is handled using the spark_context.broadcast() method, and I believe this method uses special network transfer mechanisms (possibly something akin to Bittorrent?) to scatter the data efficiently.

I would like to use something like Spark's broadcast() mechanism in distributed, but I'm curious to generate some discussion here to see if this is really a good fit. My understanding (based on conversation with @mrocklin) is that distributed will scatter and cache blocks of data on workers on an as-needed basis, so a piece of data used by all workers will eventually be copied everywhere. But my question, then, is how efficient the current copying scheme is compared to a broadcast mechanism that is specifically designed to distribute data to all nodes efficiently.
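
For the sake of discussion, here is a sketch of one well-known way to avoid overwhelming the originating node: a tree-style broadcast in which every worker that already holds the data forwards it to one new worker per round, so the number of holders doubles each round. This is not a description of what Spark or distributed actually does, and the function name is hypothetical.

```python
def tree_broadcast_rounds(workers, source):
    """Plan a broadcast in which each holder sends to at most one new worker per round.

    Returns a list of rounds; each round is a list of (sender, receiver) pairs.
    """
    have = [source]
    need = [w for w in workers if w != source]
    rounds = []
    while need:
        # Pair each current holder with one worker that still needs the data.
        pairs = list(zip(have, need))
        rounds.append(pairs)
        have.extend(receiver for _, receiver in pairs)
        need = need[len(pairs):]
    return rounds


plan = tree_broadcast_rounds(["w%d" % i for i in range(8)], source="w0")
for number, pairs in enumerate(plan, start=1):
    print(number, pairs)
```

With this plan no node sends more than one copy per round, at the price of the data taking several hops to reach the last workers.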

Another related question: what is the practical limit on the size of data that distributed can easily copy between nodes? What sets this limit? Is there a certain size for data transfers below which the distributed cluster performs much better? And do workers block and become unresponsive while transferring large chunks of data?

Nanny process

Currently we spin up dworker processes on remote nodes. These workers run our computations directly. It might be preferable to instead launch a super or "nanny" process that launches dworker.

This has the following advantages:

  • Makes the network robust to catastrophic worker failure, such as would be caused by a segfault. Nanny can just restart the worker.
  • Can cleanly restart the worker nodes on user request, such as when they accidentally send up a very long running function
  • Can consistently monitor and send heartbeats to the scheduler.

This also raises some questions

  • How should the scheduler interact with the nanny? Does the nanny proxy all communications to the worker (and if so, is there a significant cost/overhead)? Does the scheduler have two connections open?
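
The restart-on-crash part of the idea can be sketched with the standard library alone. The supervise function below is hypothetical and much simpler than a real nanny would be: it only relaunches a child process when it exits with a non-zero status, and says nothing about heartbeats or how the scheduler would talk to it.

```python
import subprocess
import sys


def supervise(command, max_restarts=3):
    """Run `command`, restarting it whenever it exits with a non-zero status.

    Returns the number of restarts that were needed.
    """
    restarts = 0
    while True:
        returncode = subprocess.call(command)
        if returncode == 0 or restarts >= max_restarts:
            return restarts
        restarts += 1


if __name__ == "__main__":
    # Stand-in for a crashing worker: a child Python process that exits with status 1.
    crashing_worker = [sys.executable, "-c", "import sys; sys.exit(1)"]
    print(supervise(crashing_worker, max_restarts=2))
```

In practice the command would be the dworker invocation, and the restart limit guards against a worker that crashes immediately on every start.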

batch submission of jobs to workers

The overhead per submit call is around a millisecond. This becomes a bottleneck when we have many worker cores and many small jobs, which users often do. We may be able to reduce this cost by batching many operations together. This has to happen in several places to be effective.

  1. When we accept submit jobs through the scheduler_queue we call Scheduler.update_state. This function may be more efficient if we collect and aggregate all update-graph messages into one message ahead of time.
  2. When we send a job to Worker.compute we currently send just one function/args pair. We might want to send several in one message.
  3. Perhaps, if sent a batch, the worker responds with a batch. If not, then we'll need to aggregate a little bit on the scheduler side as well.

It's not clear how much of this performance loss is due to sending many messages or due to managing internal scheduler state. It would be nice to gather intuition here.
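
As a rough illustration of the first point, the sketch below merges runs of consecutive update-graph messages into a single message before they would reach update_state. The message layout (dicts with "op", "dsk" and "keys" fields) is an assumption made for this example and is not necessarily the scheduler's actual wire format.

```python
from operator import add


def aggregate_update_graph(messages):
    """Merge consecutive update-graph messages into one; pass other messages through."""
    out = []
    for msg in messages:
        last = out[-1] if out else None
        if msg["op"] != "update-graph":
            out.append(msg)
        elif last is not None and last["op"] == "update-graph":
            # Fold this small graph into the batch that is already being built.
            last["dsk"].update(msg["dsk"])
            last["keys"] |= msg["keys"]
        else:
            # Copy so that later merging never mutates the caller's message.
            out.append({"op": "update-graph",
                        "dsk": dict(msg["dsk"]),
                        "keys": set(msg["keys"])})
    return out


queue = [
    {"op": "update-graph", "dsk": {"x": 1}, "keys": {"x"}},
    {"op": "update-graph", "dsk": {"y": (add, "x", 10)}, "keys": {"y"}},
    {"op": "close"},
]
batched = aggregate_update_graph(queue)
```

Only adjacent messages are merged, so the ordering relative to other message types is preserved.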

Diagnostics

Motivation

When optimizing complex problems on distributed I run into performance issues that seem unintuitive.
In these cases it would be valuable to have more visibility into the functioning of the network both at the per-worker level and at the communication level.

A snappy visualization here would be valuable both for performance tuning and, if it is sufficiently flashy, then also for demonstration purposes.

Some things I would like to know per worker

  1. What is the CPU utilization?
  2. What is the RAM use?
  3. What kinds of tasks are running? (similar to the coloring used in the dask Profiler)
  4. How much time is spent communicating and serializing/deserializing?

Information about inter-node communication

  1. How much communication is there between workers?
  2. Which workers seem to be inundated?
  3. Which data elements are moving around the most? (again coloring similar to what is used in the dask Profiler)

Visual Design

What is the right way to present this information to a user? What information should be immediately visible, and what should require some kind of interaction, such as links or hover tool-tips? It would be nice to dogfood bokeh here.

Possible approaches

It's on me to collect this data (this is not hard) but I'm not sure what the right thing is to do with it after or during collection. We could approach this in at least the following ways:

  1. We could have the workers record and send logs to a central point. We could analyze these in bulk after a large set of computations.
  2. We could use existing socket mechanisms already within distributed to send a stream of information real time to some diagnostic process.
  3. We could have each of the workers use the web functionality of tornado to serve their diagnostic information as web endpoints. A client application could poll from these periodically (can bokeh handle this?)
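
The first approach (record on the workers, analyze in bulk afterwards) could look roughly like the sketch below. The record format and helper names are hypothetical, and the in-process list stands in for whatever central point the workers would really send their logs to.

```python
import time
from collections import defaultdict

task_log = []  # stand-in for a central log collector


def logged(func):
    """Wrap a task so that each call appends a record with its name and start/stop times."""
    def wrapper(*args, **kwargs):
        start = time.time()
        try:
            return func(*args, **kwargs)
        finally:
            task_log.append({"task": func.__name__, "start": start, "stop": time.time()})
    return wrapper


def time_per_task(records):
    """Total elapsed seconds and call counts, grouped by task name."""
    totals = defaultdict(lambda: {"calls": 0, "seconds": 0.0})
    for rec in records:
        totals[rec["task"]]["calls"] += 1
        totals[rec["task"]]["seconds"] += rec["stop"] - rec["start"]
    return dict(totals)


@logged
def inc(x):
    return x + 1


results = [inc(i) for i in range(5)]
summary = time_per_task(task_log)
```

The same records could feed a streaming view (approach 2) if they were pushed over a socket instead of appended to a list.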

This is of moderately high priority for distributed development. My hope is that @jcrist can work on this (he worked on the diagnostics for dask), but it would also be nice to get input from Bokeh or web developers. Perhaps @bryevdv can recommend a good contact person to work with on this, if anyone is available.

bad doc for CI

continuous_integration/README.md refers to hdfs3, not distributed

Large messages overflow buffer

Our current implementations of read and write fail for large messages, such as can be caused by passing large arrays around. Probably we should segment these into smaller frames and have some sort of protocol to tie successive frames together.
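
One possible shape for such a protocol, sketched under assumptions: each frame carries a small header with its payload length and a flag saying whether more frames follow. The header layout and function names here are invented for illustration and are not the format distributed uses.

```python
import struct

# Header: payload length (4 bytes, unsigned) and a "more frames follow" flag (1 byte).
HEADER = struct.Struct("!IB")


def split_frames(payload, max_frame_size=2 ** 16):
    """Split `payload` into header-prefixed frames of at most `max_frame_size` payload bytes."""
    chunks = [payload[i:i + max_frame_size]
              for i in range(0, len(payload), max_frame_size)] or [b""]
    frames = []
    for index, chunk in enumerate(chunks):
        more = 1 if index < len(chunks) - 1 else 0
        frames.append(HEADER.pack(len(chunk), more) + chunk)
    return frames


def join_frames(stream):
    """Reassemble one message from the concatenated bytes of its frames."""
    parts, offset = [], 0
    while True:
        length, more = HEADER.unpack_from(stream, offset)
        offset += HEADER.size
        parts.append(stream[offset:offset + length])
        offset += length
        if not more:
            return b"".join(parts)


message = bytes(range(256)) * 1000  # 256,000 bytes of sample data
frames = split_frames(message, max_frame_size=65536)
```

Because each frame states its own length, the reader never needs to scan for a sentinel inside a large payload.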

no more wheel on pypi ?

There was a wheel for 1.7.1, but there is no more for 1.7.3.

It would be nice to have it back, unless it's a problem.

How-to for job schedulers (SLURM, Torque, etc.)

Some users will want to launch distributed via job schedulers like SLURM, or Torque, which are commonly found in academic or research institutions. We should build documentation to aid these users on how to best deploy distributed in these settings.

cc @stefanv @pwolfram @rabernat

Notably, there is a bit of a challenge here because we need to communicate to the worker processes where the scheduler process is. We also need the scheduler process to know that it should be the scheduler process and run dscheduler rather than dworker accordingly.
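
A small sketch of the role decision, assuming the job scheduler gives every launched process an integer rank and that the hostname of rank 0 is known to all of them. How those two values are obtained differs between systems and is left out here. The helper name is hypothetical; the dscheduler and dworker command lines follow the ones used elsewhere in these issues.

```python
def command_for_rank(rank, scheduler_host, port=8786):
    """Return the command line that the process with this rank should run.

    Rank 0 becomes the scheduler; every other rank becomes a worker pointed at it.
    """
    if rank == 0:
        return ["dscheduler", "--port", str(port)]
    return ["dworker", "%s:%d" % (scheduler_host, port)]


for rank in range(3):
    print(rank, " ".join(command_for_rank(rank, "node001")))
```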

Executor.map should yield results not futures

Executor.map returns a list of Futures, but the standard Executor API suggests that it should return an iterable (e.g. generator) of actual results.

I came upon this when trying to demonstrate that Executors allow swapping of IPython/concurrent.futures and distributed Executors, only to discover that distributed's Executor isn't compatible with the rest.
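
For reference, this is the behaviour of the standard library's concurrent.futures that the comparison is based on: map gives back an iterator of results, while submit gives back Future objects.

```python
from concurrent.futures import ThreadPoolExecutor


def square(x):
    return x * x


with ThreadPoolExecutor(max_workers=2) as pool:
    # Executor.map returns an iterator over results, in input order.
    results = list(pool.map(square, range(5)))

    # Executor.submit returns Future objects; call .result() to get the values.
    futures = [pool.submit(square, i) for i in range(5)]
    from_futures = [f.result() for f in futures]
```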

Reconfigure deployment solutions around Scheduler

Replaced Center with Scheduler

I'm phasing out Center and replacing it with Scheduler, which is now able to replace center completely for normal workflows. Where previously you would have done the following

$ dcenter
distributed.dcenter - INFO - Start center at 192.168.1.141:8787

$ dworker 192.168.1.141:8787
$ dworker 192.168.1.141:8787

>>> e = Executor('192.168.1.141:8787')

The following now suffices:

$ dscheduler
distributed.scheduler - INFO - Start Scheduler at 192.168.1.141:8786

$ dworker 192.168.1.141:8786
$ dworker 192.168.1.141:8786

>>> e = Executor('192.168.1.141:8786')

The scheduler now runs on the cluster, not locally on the client. Workers connect directly to the scheduler rather than registering themselves first with the center.

Deployment solutions

There are various deployment solutions either in existence or in progress

  1. anaconda-cluster plugin, written by @danielfrg
  2. dcluster command, written by @broxtronix
  3. mesos solution, written by @hussainsultan

These should still work today (so far no breaking changes have occurred) but it would be nice to transition them over. Changes here should be simple. Run dscheduler where we used to run dcenter. Switch default port to 8786.

Subscribe to center to learn about new workers

The executor currently has to explicitly call _sync_center to learn of new compute resources. It'd be good if it could listen to the center to learn of updates.

This involves two changes.

  1. Make adding workers to the scheduler more granular so that it's an event, much like task_finished. This will require carefully updating internal state.
  2. Set up the center to publish updates

Recreate remote tracebacks

Currently we send along a list of strings, which is less than pretty. We should improve this because errors are important. Some options:

  1. Construct a traceback-like object (the actual object appears to be inaccessible from pure Python) but hopefully a similarly duck-typed object suffices
  2. Use tblib to construct or to pickle the remote traceback and reraise locally. This is CPython only, but I think I'm ok with that.

I'll probably go with option 2 in the near future unless anyone has other thoughts.
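
For comparison, here is a standard-library-only sketch that is neither option above: it still ships the traceback as text, but attaches it to an exception that is raised locally so the remote frames show up in the error message. tblib would go further by rebuilding real traceback objects. All names below are hypothetical.

```python
import traceback


class RemoteError(Exception):
    """Raised locally; carries the formatted traceback text from the remote side."""

    def __init__(self, message, remote_traceback):
        super().__init__(message)
        self.remote_traceback = remote_traceback

    def __str__(self):
        return "%s\n\nRemote traceback:\n%s" % (self.args[0], self.remote_traceback)


def run_remotely(func, *args):
    """Stand-in for a worker: run func and return either a result or error text."""
    try:
        return {"status": "OK", "result": func(*args)}
    except Exception as e:
        return {"status": "error", "message": repr(e),
                "traceback": traceback.format_exc()}


def unpack(response):
    """Stand-in for the client: return the result or re-raise with the remote traceback."""
    if response["status"] == "error":
        raise RemoteError(response["message"], response["traceback"])
    return response["result"]
```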

Slow inter-node communication

[ This issue began as a private email discussion between myself and @mrocklin, but we are moving it here so that we can better keep track of it. cc also @shoyer, @freeman-lab, @mrocklin ]

@mrocklin recently suggested that I try dask.imperative (docs, example) for parallelizing complex computations that do not fit in the working memory of my cluster. Distributed should be able to process a dask graph as follows:

my_dask_value.compute(get=my_executor.get)

I tried this and it works great, but I am noticing some surprisingly slow performance when moderately sized results (100 x 2MB image arrays) are collected back from the workers to the executor. The computation itself seems to run very quickly, but this "gather" operation takes longer than expected. I created a notebook here that illustrates this problem.

@mrocklin tried my notebook and got similarly slow performance when using the dask.multiprocessing scheduler, but got much better results when using distributed. He set up his cluster as follows:

$ dcenter
$ dworker 127.0.0.1:8787 --ncores 1  # run this in four separate terminals

His results are here. However, when I tried setting up my cluster in this way, my benchmark for distributed is still clocking in at 1 min 22 seconds, while the threaded result runs in 10 seconds on my laptop.

There must be something else afoot here since @mrocklin is getting much better performance than I am. FWIW, I'm running the distributed master branch just after you merged in your recent scheduler changes (git hash: fad4ce3) on an Apple laptop running OSX, and Anaconda Python 3.5.0.

Support Yarn and Mesos

Previously I've assumed that interacting with Yarn and Mesos would look like asking Yarn/Mesos to spin up a center/scheduler and then spin up N workers.

Now I'm thinking that perhaps the center/scheduler lives outside of Yarn/Mesos and it instead asks Yarn/Mesos for workers.

If we go with the second case then we need to think of an appropriate API like

  • Scheduler asks Yarn/Mesos for new Workers (grow cluster)
  • Scheduler tells Yarn/Mesos to retire old Workers (shrink cluster)
  • Yarn/Mesos tell Scheduler that workers have unexpectedly died

What does such an API look like more concretely? Is there a common abstraction that can be supported by Yarn, Mesos, and possibly other systems like SGE?
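
To make the question concrete, one possible shape is a small abstract interface that each backend would implement. Everything below is hypothetical: the class and method names are placeholders for discussion, and the in-memory backend exists only to exercise the interface.

```python
from abc import ABC, abstractmethod


class ResourceManager(ABC):
    """Hypothetical interface a scheduler could use to talk to Yarn, Mesos, SGE, ..."""

    @abstractmethod
    def request_workers(self, n):
        """Ask for n new workers (grow cluster); return their identifiers."""

    @abstractmethod
    def retire_workers(self, worker_ids):
        """Release the given workers (shrink cluster)."""

    @abstractmethod
    def on_worker_lost(self, callback):
        """Register callback(worker_id) for unexpected worker deaths."""


class InMemoryManager(ResourceManager):
    """Toy backend used only to exercise the interface."""

    def __init__(self):
        self.workers, self._callbacks, self._next = set(), [], 0

    def request_workers(self, n):
        new = ["worker-%d" % i for i in range(self._next, self._next + n)]
        self._next += n
        self.workers.update(new)
        return new

    def retire_workers(self, worker_ids):
        self.workers.difference_update(worker_ids)

    def on_worker_lost(self, callback):
        self._callbacks.append(callback)

    def simulate_failure(self, worker_id):
        self.workers.discard(worker_id)
        for callback in self._callbacks:
            callback(worker_id)
```

The three methods map one-to-one onto the three bullets above; whether real resource managers can all support the callback direction is exactly the open question.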

cc @hussainsultan @danielfrg @quasiben

`Already reading` error from dscheduler

Okay, so I've set up a fake cluster as per the quickstart:

$ dscheduler
$ dworker 127.0.0.1:8786
$ dworker 127.0.0.1:8786
$ dworker 127.0.0.1:8786

I then run a simple example, that I think should work:

from distributed import Executor
e = Executor('127.0.0.1:8786')
futures = e.map(lambda x: None, range(10))

I then get a whole bunch of errors:

distributed.scheduler - ERROR - Stream is closed
Traceback (most recent call last):
  File "/opt/local/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/distributed-1.7.4-py2.7.egg/distributed/scheduler.py", line 723, in handle_messages
    msg = yield next_message()  # in_queue.get()
  File "/opt/local/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/tornado-4.3-py2.7-macosx-10.11-x86_64.egg/tornado/gen.py", line 1008, in run
    value = future.result()
  File "/opt/local/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/tornado-4.3-py2.7-macosx-10.11-x86_64.egg/tornado/concurrent.py", line 232, in result
    raise_exc_info(self._exc_info)
  File "/opt/local/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/tornado-4.3-py2.7-macosx-10.11-x86_64.egg/tornado/gen.py", line 1014, in run
    yielded = self.gen.throw(*exc_info)
  File "/opt/local/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/distributed-1.7.4-py2.7.egg/distributed/core.py", line 190, in read
    msg = yield stream.read_until(sentinel)
  File "/opt/local/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/tornado-4.3-py2.7-macosx-10.11-x86_64.egg/tornado/gen.py", line 1008, in run
    value = future.result()
  File "/opt/local/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/tornado-4.3-py2.7-macosx-10.11-x86_64.egg/tornado/concurrent.py", line 232, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 3, in raise_exc_info
StreamClosedError: Stream is closed
distributed.scheduler - ERROR - Stream is closed
Traceback (most recent call last):
  File "/opt/local/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/distributed-1.7.4-py2.7.egg/distributed/scheduler.py", line 723, in handle_messages
    msg = yield next_message()  # in_queue.get()
  File "/opt/local/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/tornado-4.3-py2.7-macosx-10.11-x86_64.egg/tornado/gen.py", line 1008, in run
    value = future.result()
  File "/opt/local/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/tornado-4.3-py2.7-macosx-10.11-x86_64.egg/tornado/concurrent.py", line 232, in result
    raise_exc_info(self._exc_info)
  File "/opt/local/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/tornado-4.3-py2.7-macosx-10.11-x86_64.egg/tornado/gen.py", line 282, in wrapper
    yielded = next(result)
  File "/opt/local/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/distributed-1.7.4-py2.7.egg/distributed/core.py", line 190, in read
    msg = yield stream.read_until(sentinel)
  File "/opt/local/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/tornado-4.3-py2.7-macosx-10.11-x86_64.egg/tornado/iostream.py", line 282, in read_until
    self._try_inline_read()
  File "/opt/local/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/tornado-4.3-py2.7-macosx-10.11-x86_64.egg/tornado/iostream.py", line 701, in _try_inline_read
    self._check_closed()
  File "/opt/local/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/tornado-4.3-py2.7-macosx-10.11-x86_64.egg/tornado/iostream.py", line 885, in _check_closed
    raise StreamClosedError(real_error=self.error)
StreamClosedError: Stream is closed
distributed.scheduler - INFO - Close connection to Scheduler, 354bf7d7-ca93-11e5-91ae-acbc32bf2b0d
distributed.core - ERROR - Already reading
Traceback (most recent call last):
  File "/opt/local/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/distributed-1.7.4-py2.7.egg/distributed/core.py", line 164, in handle_stream
    result = yield gen.maybe_future(handler(stream, **msg))
  File "/opt/local/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/tornado-4.3-py2.7-macosx-10.11-x86_64.egg/tornado/gen.py", line 1008, in run
    value = future.result()
  File "/opt/local/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/tornado-4.3-py2.7-macosx-10.11-x86_64.egg/tornado/concurrent.py", line 232, in result
    raise_exc_info(self._exc_info)
  File "/opt/local/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/tornado-4.3-py2.7-macosx-10.11-x86_64.egg/tornado/gen.py", line 1014, in run
    yielded = self.gen.throw(*exc_info)
  File "/opt/local/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/distributed-1.7.4-py2.7.egg/distributed/scheduler.py", line 691, in control_stream
    yield self.handle_messages(stream, stream)
  File "/opt/local/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/tornado-4.3-py2.7-macosx-10.11-x86_64.egg/tornado/gen.py", line 1008, in run
    value = future.result()
  File "/opt/local/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/tornado-4.3-py2.7-macosx-10.11-x86_64.egg/tornado/concurrent.py", line 232, in result
    raise_exc_info(self._exc_info)
  File "/opt/local/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/tornado-4.3-py2.7-macosx-10.11-x86_64.egg/tornado/gen.py", line 1014, in run
    yielded = self.gen.throw(*exc_info)
  File "/opt/local/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/distributed-1.7.4-py2.7.egg/distributed/scheduler.py", line 723, in handle_messages
    msg = yield next_message()  # in_queue.get()
  File "/opt/local/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/tornado-4.3-py2.7-macosx-10.11-x86_64.egg/tornado/gen.py", line 1008, in run
    value = future.result()
  File "/opt/local/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/tornado-4.3-py2.7-macosx-10.11-x86_64.egg/tornado/concurrent.py", line 232, in result
    raise_exc_info(self._exc_info)
  File "/opt/local/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/tornado-4.3-py2.7-macosx-10.11-x86_64.egg/tornado/gen.py", line 282, in wrapper
    yielded = next(result)
  File "/opt/local/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/distributed-1.7.4-py2.7.egg/distributed/core.py", line 190, in read
    msg = yield stream.read_until(sentinel)
  File "/opt/local/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/tornado-4.3-py2.7-macosx-10.11-x86_64.egg/tornado/iostream.py", line 278, in read_until
    future = self._set_read_callback(callback)
  File "/opt/local/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/tornado-4.3-py2.7-macosx-10.11-x86_64.egg/tornado/iostream.py", line 662, in _set_read_callback
    assert self._read_future is None, "Already reading"
AssertionError: Already reading

I've actually used distributed successfully, so I think something is leaving a part of distributed in a bad state.

Change threads/procs at runtime

Sometimes we want workers to use threads or to use processes. Sometimes we want to change this at runtime. We could change this during an Executor.restart event.

Distributed sort

Many important algorithms depend on an efficient distributed sort. This requires all-to-all communication; task scheduling is not the appropriate abstraction here. However, many broader applications that we would like to support (namely dataframe-y type things) need this on occasion so it might make sense to have some ability to switch out to another system as needed.

In dask we did this with partd, which works well off disk on a single machine. Presumably we could do something similar with distributed.
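
As a rough illustration of why a sort needs all-to-all communication, here is a minimal single-process sketch of a pivot-based shuffle sort. It is not how dask, partd, or distributed implement it. The names `choose_pivots` and `shuffle_sort` are hypothetical, and each Python list stands in for the data held by one worker.

```python
import bisect


def choose_pivots(partitions, nbuckets, per_partition=8):
    """Pick nbuckets - 1 pivots from a small sample of every partition."""
    sample = []
    for part in partitions:
        step = max(1, len(part) // per_partition)
        sample.extend(part[::step])
    if not sample:
        return []
    sample.sort()
    # Evenly spaced positions in the sorted sample approximate quantiles
    return [sample[i * len(sample) // nbuckets] for i in range(1, nbuckets)]


def shuffle_sort(partitions):
    """Return sorted output partitions whose concatenation is globally sorted."""
    nbuckets = len(partitions)
    pivots = choose_pivots(partitions, nbuckets)
    buckets = [[] for _ in range(nbuckets)]
    # All-to-all step: any input partition may send values to any output bucket
    for part in partitions:
        for x in part:
            buckets[bisect.bisect_right(pivots, x)].append(x)
    # Each bucket can now be sorted independently of the others
    return [sorted(b) for b in buckets]


parts = [[5, 3, 9], [1, 8, 2], [7, 4, 6, 0]]
print(shuffle_sort(parts))
```

The bucketing loop is the part that does not map cleanly onto independent tasks: every input partition contributes to every output bucket, so the number of transfers grows with the product of input and output partition counts.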

Dask dataframe does not count last data chunk

The following code demonstrates:

import dask
import dask.dataframe as dd
import distributed
import numpy
import pandas as pd

# Connect to a running scheduler and route dask computations through it
ex = distributed.Executor('localhost:8787')
dask.set_options(get=ex.get)

# Scatter 1000 arrays of 1000 values each to the workers
data_list = [numpy.random.rand(1000) for _ in range(1000)]
out = ex.scatter(data_list)

# Build one pandas DataFrame per array, then wrap the futures in a dask DataFrame
dfs = ex.map(lambda x: pd.DataFrame({'x': x}), out)
dsk = {('mydf', i): df for i, df in enumerate(dfs)}
data = dd.DataFrame(dsk, 'mydf', ['x'], [None]*len(dfs))
print(len(data))  # should be 1000000

I have seen similar behaviour in more typical datasets: it is specifically the data belonging to the last future, dfs[-1], that isn't counted. Contrast with the following, which gives the correct value:

ex.submit(sum, ex.map(len, dfs))
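
One possible explanation, offered as an assumption rather than a confirmed diagnosis: dask.dataframe describes N partitions with N + 1 division boundaries, so a divisions list of length len(dfs) would describe one partition fewer than the graph holds. The sketch below shows only that arithmetic in plain Python. It does not call dask, and `npartitions_from_divisions` is a hypothetical helper.

```python
def npartitions_from_divisions(divisions):
    # Assumption: N partitions are delimited by N + 1 boundaries
    return len(divisions) - 1


nchunks = 1000
rows_per_chunk = 1000

as_reported = [None] * nchunks        # one boundary per chunk
adjusted = [None] * (nchunks + 1)     # one extra boundary to close the last chunk

print(npartitions_from_divisions(as_reported) * rows_per_chunk)
print(npartitions_from_divisions(adjusted) * rows_per_chunk)
```

If that assumption holds, passing [None]*(len(dfs) + 1) as the divisions argument would be the thing to try.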

Future Type

It would be nice to be able to inspect the type of the result a future may contain. @mrocklin suggested this is already supported in concurrent.futures.

Gather feedback about distributed documentation

I've been going through the documentation again, and it would be good to gather impressions from people. Both in-depth analyses and quick glance-overs are welcome and provide valuable feedback.
