disco's Introduction

Disco - Massive data, Minimal code

Disco Logo

Disco is a distributed map-reduce and big-data framework. Like the original framework popularized by Google, Disco supports parallel computations over large data sets on an unreliable cluster of computers. This makes it a perfect tool for analyzing and processing large datasets without having to worry about difficult technical questions related to distributed computing, such as communication protocols, load balancing, locking, job scheduling, and fault tolerance, all of which are taken care of by Disco.

Writing a Disco job is very simple. For example, the following job counts the number of words in a document:

from disco.core import Job, result_iterator

def map(line, params):
    for word in line.split():
        yield word, 1

def reduce(iter, params):
    from disco.util import kvgroup
    for word, counts in kvgroup(sorted(iter)):
        yield word, sum(counts)

if __name__ == '__main__':
    input = ["http://discoproject.org/media/text/chekhov.txt"]
    job = Job().run(input=input, map=map, reduce=reduce)
    for word, count in result_iterator(job.wait()):
        print word, count

Note: To install Disco, do not use the zip or tar.gz packages generated by GitHub; instead, clone this repository.

The develop branch contains the newest features and is not recommended for use in production. The master branch is the latest stable release and is tested in production. Important bug fixes will be first merged into the develop branch and then backported into the master branch.

Disco integrates with many different tools. The following screenshot, for example, shows an IPython notebook being used to write a Disco job, with matplotlib plotting the results: ipython example

To learn more about the Disco ecosystem, see Disco Integrations. For other resources, check out the Talks on Disco. Visit discoproject.org (http://discoproject.org) for more information.

Build Status: Travis-CI


disco's Issues

Worker behaves badly if it runs out of file descriptors

You can easily make the worker run out of file descriptors, e.g. by setting nr_reduces = 1000. This may result in, for example, spurious DNS failures, since no new sockets can be opened.

Disco should limit the number of fds used in a worker process, at least to make the error messages more descriptive.
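One way to make this failure mode more descriptive is to check the process's descriptor limit up front and cap the number of simultaneously open outputs below it. A minimal sketch using the standard resource module; the reserve count and function name are illustrative assumptions, not Disco's actual implementation:

```python
import resource

def max_open_outputs(reserve=64):
    """Return how many output files a worker may safely open at once,
    leaving `reserve` descriptors for sockets, stdio, etc.
    (Hypothetical helper; the reserve value is an arbitrary assumption.)"""
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    allowed = soft - reserve
    if allowed < 1:
        raise RuntimeError(
            "fd limit too low (%d); raise it with ulimit -n" % soft)
    return allowed
```

With such a check, a job requesting nr_reduces = 1000 on a host with a 1024-descriptor limit could fail immediately with a clear message instead of producing sporadic DNS errors mid-run.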

Rate limit is not applied for all messages

archwild reported on IRC that a Java process can cause Erlang to consume 100% of CPU, if a misbehaving external process produces masses of output to stderr.

This should be easy to fix by applying the rate limit to all types of messages, not just .
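A generic token-bucket limiter that could be applied uniformly to every message type, sketched here in Python and independent of Disco's actual Erlang-side implementation (the class and parameter names are illustrative):

```python
import time

class TokenBucket:
    """Allow at most `rate` messages per second, with bursts up to `burst`."""

    def __init__(self, rate, burst, clock=time.monotonic):
        self.rate = float(rate)
        self.burst = float(burst)
        self.tokens = float(burst)
        self.clock = clock
        self.last = clock()

    def allow(self):
        # Refill tokens proportionally to elapsed time, capped at the burst size.
        now = self.clock()
        self.tokens = min(self.burst, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False  # caller drops or defers the message
```

Applied to stderr output as well as regular worker messages, a misbehaving external process could no longer pin the master's CPU with unbounded output.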

Homedisco doesn't work with Disco 0.2?

22:54 < neal_> hi
22:55 < neal_> when i try to run homedisco i get the following error
22:55 < neal_> Traceback (most recent call last):
22:55 < neal_> File "wordcount.py", line 1, in
22:55 < neal_> from homedisco import HomeDisco
22:55 < neal_> File "/home/neal/school/seng474/frequent_itemsets/cfim/homedisco.py", line 4, in
22:55 < neal_> from disconode import disco_worker
22:55 < neal_> File "/usr/local/lib/python2.6/dist-packages/disconode/disco_worker.py", line 504, in
22:55 < neal_> init()
22:55 < neal_> File "/usr/local/lib/python2.6/dist-packages/disconode/disco_worker.py", line 64, in init
22:55 < neal_> OOB_URL = ("http://%s/disco/ctrl/oob_get?" % this_master())
22:55 < neal_> File "/usr/local/lib/python2.6/dist-packages/disconode/disco_worker.py", line 46, in this_master
22:55 < neal_> return sys.argv[4].split("/")[2]

Kill job doesn't always kill external processes

archwild reported on the IRC channel that Java processes launched via the external interface sometimes don't die when the job is killed. They don't seem to die with pkill either.

Test Java with external interface.

disco-worker IO exception handling fails

disco-worker catches exceptions with the following code:
try:
    run(method, mode, part, m)
except comm.CommException, x:
    util.data_err("HTTP error: %s" % x)
except IOError, x:
    util.data_err("IO error: %s" % x)

The calls to util.data_err fail because it requires 2 parameters.
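The actual signature of util.data_err isn't shown in the report, so the following two-argument form is a hypothetical illustration of the fix, assuming the second parameter is the failing input's URL:

```python
def data_err(msg, url):
    # Hypothetical two-parameter signature: message plus the failing input URL.
    raise IOError("%s (input: %s)" % (msg, url))

def run_guarded(run, method, mode, part, m, input_url):
    """Wrap run() so IO failures report both the error and the input that caused it."""
    try:
        run(method, mode, part, m)
    except IOError as x:
        data_err("IO error: %s" % x, input_url)
```

The point is simply that every call site must supply both arguments; with the one-argument calls shown above, the error handler itself raises a TypeError and masks the original failure.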

Job fails due to a temporary glusterfs error

Job fails fatally due to a glusterfs error (timeout), even though the error should be recoverable.

Gluster logs (client):

[2009-08-12 15:32:09] E [client-protocol.c:437:client_ping_timer_expired] dx26-vol1: Server 172.16.1.27:9900 has not responded in the last 60 seconds, disconnecting.

Server:
[2009-08-12 18:44:41] E [server-protocol.c:3903:server_readv] vol1: invalid argument: state->fd
[2009-08-12 18:44:41] W [server-protocol.c:4210:server_fstat] server: fd - 1: unresolved fd
[2009-08-12 18:44:41] E [server-protocol.c:5763:server_finodelk] server: fd - 1: unresolved fd

Disco:

Worker failed. Last words:
Traceback (most recent call last):
File "/usr/bin/disco-worker", line 69, in
method(m)
File "/var/lib/python-support/python2.5/disconode/disco_worker.py", line 392, in op_map
run_map(job_input[0], partitions, map_params)
File "/var/lib/python-support/python2.5/disconode/disco_worker.py", line 327, in run_map
for entry in reader:
File "/home/tuulos/src/disco/pydisco/disco/func.py", line 60, in netstr_reader
File "/home/tuulos/src/disco/pydisco/disco/func.py", line 40, in read_netstr
IOError: [Errno 107] Transport endpoint is not connected
close failed: [Errno 107] Transport endpoint is not connected

Automatic distribution of simple modules

Disco can copy required files and libraries to nodes when using the external interface. It should be possible to use this feature with native Python code as well, so that simple modules etc. could be packaged in a job request and distributed automatically to the nodes.

Separate webpages for status and configuration

Instead of having status and configuration on the same webpage, it would be useful to split them into two separate pages. The configuration page could then be password-protected and accessible only to the cluster administrator.

Untitled

OK, I have solved this problem

Reduce doesn't fail nicely if HTTP server down

When running tests, map tasks execute OK even if lighttpd is down on a node. However, reduce fails since it can't access the map results. At least with comm_httplib this results in a cryptic error message; it could at least say clearly what's wrong.

Stopping master should stop epmd too

/usr/local/lib/erlang/erts-5.7.2/bin/epmd -daemon
This one caused me a headache, as I wasn't able to restart the master: the epmd process was still running and prevented the master from starting up.

disco using foo instead of foo.local on macosx

On Mac OS X, disco-worker gets a master_url in sys.argv[4] that, on a single-host setup, does not use localhost but rather the name part of 'foo.local' (i.e. 'foo'), the alternative host-naming scheme. Since this name doesn't resolve, disco fails with "WARN [map:0] Failed to get http://foo:7000/disco/master/_disco_4441/2c/wordcount@1251260554//params: (2, 'Temporary failure in name resolution')"

To fix, I hacked node/disco-worker with:
master_url = master_url.replace('foo', 'localhost')

Missing erl on node crashes job

One of my nodes did not have /etc/environment pointing to the bin directory where the "erl" command was installed. When that node was configured to accept jobs on the master, the entire job would die.

It would be much better if the master simply redirected the work to another node.

Disco 0.2.3 master can't connect to remote Amazon EC2 nodes

Running the Disco 0.2.3 source distribution on ami-2946a740 (a Debian Squeeze AMI from http://alestic.com/), with one master node and one remote slave, I was unable to run a job successfully. I went through all of the steps on the Troubleshooting Disco page and found that the slave:start test failed with an {error, timeout} result. I tried a number of different flavors of node names (the EC2 internal name, short names defined in /etc/hosts) and met with the same results. I could successfully run slave:start on the slave node itself to start a slave locally, but the same command from the master failed.

Unable to run disco master and slave on same machine in 0.2.3

While experimenting with 0.2.3, I found I couldn't run both
disco master start
and
disco worker start
at the same time. In particular, when starting the worker, I got a message to the effect of "lighttpd is already running with worker config". However, the only lighttpd process running was the one using the master config file.

I also noticed that the MASTER_DISCO_PORT config parameter has been removed in 0.2.3, which I suspect is related. Does the master automatically start a worker as well? Or is there some other way of running both a master and slave on the same node?

connection timeout and erl crash

Hi, I have configured a cluster with two computers, xiliu-fedora (master) and xiliu-public (worker). Both are running Fedora 11 with Python 2.6.3. However, I always get the following timeout errors, and on the worker node erl crashes and writes erl_crash.dump:


2009/10/08 19:20:36 master        map:0 added to waitlist
2009/10/08 19:20:36 xiliu-public  WARN: [map:0] Node failure: "Couldn't connect to xiliu-public (timeout). Node blacklisted temporarily."
2009/10/08 19:20:04 master        map:0 assigned to xiliu-public
2009/10/08 19:20:04 master        map:0 added to waitlist
2009/10/08 19:20:04 master        Map phase
2009/10/08 19:20:04 master        Starting job
2009/10/08 19:20:04 master        New job!


[xiliu@xiliu-public ~]$ tail -f erl_crash.dump
timeout
infinity
fun
''
'$end_of_table'
'nonode@nohost'
'_'
true
false
=end
^C

make-lighttpd-proxyconf.py parses comments

We have commented-out lines of host names and IPs in our /etc/hosts that we'd rather not remove at this time. However, make-lighttpd-proxyconf.py still parses those old, invalid lines. Modifying the script as shown below fixes the problem.

Thanks!

    #!/usr/bin/python
    import os, re

    port = os.environ["DISCO_PORT"]
    print "proxy.server = ("
    r = re.compile("^(\d+\.\d+\.\d+\.\d+)\s+(.*)", re.MULTILINE)
    for x in re.finditer(r, file("/etc/hosts").read()):
            ip, host = x.groups()
            print '"/disco/node/%s/" => (("host" => "%s", "port" => %s)),' %\
                    (host, ip, port)
    print ")"

Sleep before re-trying a failed task

If a task fails due to, e.g., a temporarily unavailable input, it should wait for some time before retrying. Otherwise it can easily hit the maximum number of failed tasks within a few seconds.
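A hedged sketch of such a retry policy with exponential backoff between attempts; the delays, attempt count, and function name are illustrative, not Disco's actual values:

```python
import time

def retry_with_backoff(task, max_attempts=3, base_delay=1.0, sleep=time.sleep):
    """Run `task`; on failure sleep base_delay, 2*base_delay, ... between tries.
    Re-raises the last error once max_attempts is exhausted."""
    for attempt in range(max_attempts):
        try:
            return task()
        except IOError:
            if attempt == max_attempts - 1:
                raise
            sleep(base_delay * (2 ** attempt))
```

The exponential schedule means a briefly unavailable input gets several seconds to recover before the task counts against the failure limit, instead of burning through all its retries almost instantly.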

Disco 0.2.3 doesn't work with Amazon EC2 instructions

Disco 0.2.3 depends on erlang-base 13b, which Debian Lenny doesn't supply. The image "ami-e69d798f" is a lenny install, so it doesn't have the required packages.

Furthermore, setup-instances.py overrides /etc/apt/sources.list to only use discoproject and lenny repos, so it reinforces the same issue.

Per-node task queues

The current scheduler is suboptimal when the cluster is fully loaded. Whenever a task finishes on any node, the next task is scheduled in its place, regardless of where its input data is located.

A node-specific task queue would solve this problem by pre-sorting tasks to nodes according to locations of input files.
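A minimal sketch of such per-node queues: tasks are pre-sorted by the node holding their input, and when a slot frees up on a node, that node's own queue is drained first. The class and method names are illustrative, not Disco's scheduler API:

```python
from collections import deque

class NodeScheduler:
    """Per-node task queues keyed by the node that holds the task's input."""

    def __init__(self):
        self.queues = {}  # node name -> deque of pending tasks

    def add_task(self, task, input_node):
        self.queues.setdefault(input_node, deque()).append(task)

    def next_task(self, free_node):
        """Prefer a task whose input is local to `free_node`; otherwise
        steal from any non-empty queue (losing locality, but staying busy)."""
        q = self.queues.get(free_node)
        if q:
            return q.popleft()
        for q in self.queues.values():
            if q:
                return q.popleft()
        return None
```

The fallback branch matters: without work stealing, a node whose local queue empties early would sit idle while other queues are still full.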

Homedisco "can't access local input file" errors

In the git source, the homedisco example (at the bottom of util/homedisco.py) fails to run with the following error:

sqs2 ~/src/disco: python util/homedisco.py
*[09/01/02 00:47:19 none ()] Received a new map job!
*[09/01/02 00:47:19 none ()] Done: 3 entries mapped in total
*[09/01/02 00:47:19 none ()] 0 chunk://localhost/homedisco@1230878839/map-chunk-0
*[09/01/02 00:47:19 none ()] Received a new reduce job!
*[09/01/02 00:47:19 none ()] Starting reduce
connect_input(fname=chunkfile://data/homedisco@1230878839/map-chunk-0)
Traceback (most recent call last):
File "/Users/sqs/src/disco/node/disconode/disco_worker.py", line 39, in open_local
f = file(fname)
IOError: [Errno 2] No such file or directory: 'data/homedisco@1230878839/map-chunk-0'
None
*[09/01/02 00:47:19 none (chunkfile://data/homedisco@1230878839/map-chunk-0)] Can't access a local input file: chunkfile://data/homedisco@1230878839/map-chunk-0
Traceback (most recent call last):
File "util/homedisco.py", line 78, in
reduce = fun_reduce)
File "util/homedisco.py", line 44, in new_job
disco_worker.op_reduce(req)
File "/Users/sqs/src/disco/node/disconode/disco_worker.py", line 430, in op_reduce
fun_reduce(red_in.iter(), red_out, red_params)
File "util/homedisco.py", line 60, in fun_reduce
for k, v in iter:
File "/Users/sqs/src/disco/node/disconode/disco_worker.py", line 285, in multi_file_iterator
sze, fd = connect_input(fname)
File "/Users/sqs/src/disco/node/disconode/disco_worker.py", line 131, in connect_input
return open_local(input, local_file, is_chunk)
File "/Users/sqs/src/disco/node/disconode/disco_worker.py", line 50, in open_local
% input, input)
File "/Users/sqs/src/disco/node/disconode/disco_worker.py", line 39, in open_local
f = file(fname)
IOError: [Errno 2] No such file or directory: 'data/homedisco@1230878839/map-chunk-0'

It appears that the open_local path is incorrectly determining the filename from the chunkfile:// URI it is given. It does not prepend the value of the DISCO_ROOT environment variable as it should.
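The shape of the fix described here would be prepending DISCO_ROOT to the relative path before opening it. A hypothetical sketch, not the actual patch:

```python
import os

def local_path(relative, root=None):
    """Resolve a chunkfile-relative path under DISCO_ROOT.
    Paths that are already absolute are returned unchanged.
    (Illustrative helper; not the real disco_worker code.)"""
    if os.path.isabs(relative):
        return relative
    root = root or os.environ.get("DISCO_ROOT", "/")
    return os.path.join(root, relative)
```

Both failing call sites (open_local and result_iterator) would route their filenames through such a helper before handing them to file().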

Result_iterator also tries to load the result from a relative path when it should be prepending DISCO_ROOT. It fails with this error if only the open_local issue is fixed:

*[09/01/02 00:55:54 none ()] Received a new map job!
*[09/01/02 00:55:54 none ()] Done: 3 entries mapped in total
*[09/01/02 00:55:54 none ()] 0 chunk://localhost/homedisco@1230879354/map-chunk-0
*[09/01/02 00:55:54 none ()] Received a new reduce job!
*[09/01/02 00:55:54 none ()] Starting reduce
connect_input(fname=chunkfile://data/homedisco@1230879354/map-chunk-0)
*[09/01/02 00:55:54 none ()] Reduce done: 3 entries reduced in total
*[09/01/02 00:55:54 none ()] Reduce done
*[09/01/02 00:55:54 none ()] 0 disco://localhost/homedisco@1230879354/reduce-disco-0
['file://data/homedisco@1230879354/reduce-disco-0']
Traceback (most recent call last):
File "util/homedisco.py", line 80, in
for k, v in result_iterator(res):
File "build/bdist.macosx-10.5-i386/egg/disco/core.py", line 261, in result_iterator
IOError: [Errno 2] No such file or directory: 'data/homedisco@1230879354/reduce-disco-0'

After applying this patch, the correct output is returned:

sqs2 ~/src/disco: python util/homedisco.py
*[09/01/02 00:57:57 none ()] Received a new map job!
*[09/01/02 00:57:57 none ()] Done: 3 entries mapped in total
*[09/01/02 00:57:57 none ()] 0 chunk://localhost/homedisco@1230879477/map-chunk-0
*[09/01/02 00:57:57 none ()] Received a new reduce job!
*[09/01/02 00:57:57 none ()] Starting reduce
*[09/01/02 00:57:57 none ()] Reduce done: 3 entries reduced in total
*[09/01/02 00:57:57 none ()] Reduce done
*[09/01/02 00:57:57 none ()] 0 disco://localhost/homedisco@1230879477/reduce-disco-0
KEY red:dog VALUE dog
KEY red:cat VALUE cat
KEY red:possum VALUE possum

The patch also fixes the problem for a custom HomeDisco job I wrote, but there's no test suite for me to determine whether it is correct in all cases. Specifically, it does not appear to introduce issues when running remote jobs (i.e., not through HomeDisco), but I can't guarantee anything. Also, there may be a better way of doing this. (I saw that the LOCAL_PATH env var exists, but it already has "/data" at the end, and the filenames we are appending to $DISCO_ROOT have "/data" at the beginning, so using LOCAL_PATH would result in an incorrect "/data/data".)

Connection reset when using loop

If you use the sample count_words.py code from the documentation and wrap the new-job call in a loop (sleeping 30 seconds after completion), the second call will always fail (at least on OS X) with socket.error: (54, 'Connection reset by peer').

If you don't use the loop and instead run the script manually several times, no such error occurs. It seems that Disco is not closing connections properly after finishing a job.
