
mrjob's Introduction

mrjob: the Python MapReduce library

https://github.com/Yelp/mrjob/raw/master/docs/logos/logo_medium.png

mrjob is a Python 2.7/3.4+ package that helps you write and run Hadoop Streaming jobs.

Stable version (v0.7.4) documentation

Development version documentation

https://travis-ci.org/Yelp/mrjob.png

mrjob fully supports Amazon's Elastic MapReduce (EMR) service, which allows you to buy time on a Hadoop cluster on an hourly basis. mrjob has basic support for Google Cloud Dataproc (Dataproc) which allows you to buy time on a Hadoop cluster on a minute-by-minute basis. It also works with your own Hadoop cluster.

Some important features:

  • Run jobs on EMR, Google Cloud Dataproc, your own Hadoop cluster, or locally (for testing).
  • Write multi-step jobs (one map-reduce step feeds into the next; see the sketch after this list)
  • Easily launch Spark jobs on EMR or your own Hadoop cluster
  • Duplicate your production environment inside Hadoop
    • Upload your source tree and put it in your job's $PYTHONPATH
    • Run make and other setup scripts
    • Set environment variables (e.g. $TZ)
    • Easily install python packages from tarballs (EMR only)
    • Setup handled transparently by mrjob.conf config file
  • Automatically interpret error logs
  • SSH tunnel to hadoop job tracker (EMR only)
  • Minimal setup
    • To run on EMR, set $AWS_ACCESS_KEY_ID and $AWS_SECRET_ACCESS_KEY
    • To run on Dataproc, set $GOOGLE_APPLICATION_CREDENTIALS
    • No setup needed to use mrjob on your own Hadoop cluster
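
For instance, a multi-step job chains steps together with MRStep. Here's a minimal sketch (the class and method names are just illustrative; it reuses the word-frequency pattern from the example below to find the single most common word):

from mrjob.job import MRJob
from mrjob.step import MRStep
import re

WORD_RE = re.compile(r"[\w']+")


class MRMostUsedWord(MRJob):

    def steps(self):
        # step 1 counts words; step 2 funnels every (count, word) pair
        # into a single reducer call and keeps the largest
        return [
            MRStep(mapper=self.mapper_get_words,
                   reducer=self.reducer_count_words),
            MRStep(reducer=self.reducer_find_max_word),
        ]

    def mapper_get_words(self, _, line):
        for word in WORD_RE.findall(line):
            yield (word.lower(), 1)

    def reducer_count_words(self, word, counts):
        # use None as the key so step 2 sees all pairs in one reducer call
        yield None, (sum(counts), word)

    def reducer_find_max_word(self, _, count_word_pairs):
        # yielding the max (count, word) pair gives key=count, value=word
        yield max(count_word_pairs)


if __name__ == '__main__':
    MRMostUsedWord.run()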

Installation

pip install mrjob

As of v0.7.0, Amazon Web Services and Google Cloud Platform are optional dependencies. To use these, install with the aws and google extras, respectively. For example:

pip install mrjob[aws]

A Simple Map Reduce Job

Code for this example and more live in mrjob/examples.

"""The classic MapReduce job: count the frequency of words.
"""
from mrjob.job import MRJob
import re

WORD_RE = re.compile(r"[\w']+")


class MRWordFreqCount(MRJob):

    def mapper(self, _, line):
        for word in WORD_RE.findall(line):
            yield (word.lower(), 1)

    def combiner(self, word, counts):
        yield (word, sum(counts))

    def reducer(self, word, counts):
        yield (word, sum(counts))


if __name__ == '__main__':
    MRWordFreqCount.run()

Try It Out!

# locally
python mrjob/examples/mr_word_freq_count.py README.rst > counts
# on EMR
python mrjob/examples/mr_word_freq_count.py README.rst -r emr > counts
# on Dataproc
python mrjob/examples/mr_word_freq_count.py README.rst -r dataproc > counts
# on your Hadoop cluster
python mrjob/examples/mr_word_freq_count.py README.rst -r hadoop > counts

Setting up EMR on Amazon

Setting up Dataproc on Google

Advanced Configuration

To run in other AWS regions, upload your source tree, run make, and use other advanced mrjob features, you'll need to set up mrjob.conf. mrjob looks for its conf file in:

  • The contents of $MRJOB_CONF
  • ~/.mrjob.conf
  • /etc/mrjob.conf

See the mrjob.conf documentation for more information.
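
For example, a minimal mrjob.conf might look something like this (a sketch only; the region value, archive name, and setup commands are placeholders for your own setup):

runners:
  emr:
    region: us-west-2
    setup:
    # upload your source tree and put it in your job's $PYTHONPATH
    - export PYTHONPATH=$PYTHONPATH:my-src-tree.tar.gz#/
  hadoop:
    setup:
    # set environment variables for every task
    - export TZ=America/Los_Angeles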

Project Links

Reference

More Information

Thanks to Greg Killion (ROMEO ECHO_DELTA) for the logo.

mrjob's People

Contributors

88manpreet, alejandro-rivera, anusha-r, ap-ensighten, ashkop, bchess, bretthoerner, cad106uk, coyotemarin, darthvadar, david-cliqz, ddehghan, drulludanni, edschofield, ewencp, icio, irskep, jblomo, jretz, julioninos, msabramo, mtai, philswanson, roguelazer, shusenliu, sudarshang, tarnfeld, timtadh, yalinhuang, ymilki


mrjob's Issues

Allow DEFAULT_INPUT_PROTOCOL to be None

If an MRJob's DEFAULT_INPUT_PROTOCOL is None, we should read input with the same protocol we use internally, just like we do when DEFAULT_OUTPUT_PROTOCOL is None.

This is somewhat esoteric, but it's useful for jobs like mrjob.examples.mr_page_rank that feed back into themselves.

some automated tests require mrjob.conf

These tests won't work without an mrjob.conf set up to work on EMR:

mrjob.tests.emr_test FindProbableCauseOfFailureTestCase
mrjob.tests.emr_test TestLs

We just need to set these up with mock AWS, like we do for the EMR end-to-end test.

SunOS/tar compatibility issues

LocalMRJobRunner invokes tar to untar archives (specifically mrjob.tar.gz). Apparently there are versions of tar floating around out there that don't accept the z (un-gzip) option, such as on this fine machine:

$ uname -a
SunOS apocalypse 5.10 Generic_142900-03 sun4u sparc SUNW,UltraAX-i2

The fix: I should be using Python's tarfile module rather than invoking tar anyway.
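
A sketch of that fix using the standard library (illustrative, not the exact code in mrjob):

import tarfile

def untar_archive(archive_path, dest_dir):
    # tarfile handles .tar, .tar.gz and .tar.bz2 itself, so nothing
    # depends on which flags the local tar binary understands
    with tarfile.open(archive_path, 'r:*') as tf:
        tf.extractall(dest_dir)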

job flow pooling

At Yelp, we have lots of people testing out mrjobs on EMR all the time. It takes about 3 minutes to start up a jobflow, so if you're doing a lot of testing, it saves a lot of time to have a jobflow already waiting for you (mrjob.tools.emr.create_job_flow offers a simple way of doing this).

It would be nice if your job could automatically join any currently running job flow (that was set up with the same bootstrap scripts and other options). If there wasn't a job already free in the pool, your job would start one. We could strategically wait to terminate idle job flows until near the end of the hour (since Amazon bills by the full hour).

run alternate versions of python inside Hadoop

We should be able to specify the version of Python run inside Hadoop with python_bin.

I suppose we could control the version of python used to run the job locally (to call it with --steps) with local_python_bin. Might just be confusing.

Also, we should be able to specify binaries as either strings or lists of arguments (so we can add flags).
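
A sketch of what that could look like in mrjob.conf (the string-or-list behavior is what this issue proposes; the interpreter paths are placeholders):

runners:
  hadoop:
    python_bin: python2.6             # a plain string...
  emr:
    python_bin: ['python2.6', '-O']   # ...or a list, so flags can be added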

add cat() to MRJobRunner (stream compressed output)

cat() would work like hadoop fs -cat, except that it would automatically decompress .gz and .bz2 files.

MRJobRunner.stream_output() would just call cat(). This would allow us to stream compressed output from jobs (which currently we can't do).
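
A sketch of the decompression side (using gzip and bz2 from the standard library; the real method would also have to handle HDFS and S3 paths):

import bz2
import gzip

def _open_maybe_compressed(path):
    # pick a reader based on the file extension
    if path.endswith('.gz'):
        return gzip.open(path, 'rb')
    elif path.endswith('.bz2'):
        return bz2.BZ2File(path, 'rb')
    else:
        return open(path, 'rb')

def cat(path):
    # yield raw lines, transparently decompressing .gz and .bz2 files
    with _open_maybe_compressed(path) as f:
        for line in f:
            yield line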

allow mrjob to use Java mappers/reducers for some steps

It would be nice to be able to use some of the built-in mappers/reducers from Java for efficiency reasons (e.g. org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorReducer).

probably would look something like this:

def steps(self):
    return [self.mr(mapper=..., reducer_java_class='org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorReducer')]

(This is not the same thing as using JARs as steps; see #770 for that.)

Support all URIs

s3n:// is the technically correct way for EMR to refer to files on S3, but nowadays, s3:// does the exact same thing.

(s3:// used to refer to some legacy block format that only HDFS could use; now it's deprecated, and you have to use s3bfs:// instead.)

We should probably allow users to use s3:// or s3n:// as they prefer, and translate either to s3:// for consistency.

LocalMRJobRunner should run multiple mappers/reducers

LocalMRJobRunner should run at least 2 mappers and at least 2 reducers in each step, to catch hidden assumptions in poorly-written jobs.

Depending on how much we want to get into managing simultaneous subprocesses, we could turn local mode into sort of the poor man's Hadoop. If we do this, LocalMRJobRunner should respect the mapred.map.tasks and mapred.reduce.tasks jobconf arguments.

hide site_packages from local mode

This mostly comes up when you run scripts that are dependent on a large external source tree (added with python_archives). If a job ends up having hidden dependencies on a library that you don't install on Hadoop/EMR, you don't find out in local mode because locally run scripts have access to site_packages.

It would be nice to have the ability to whitelist certain python packages in local mode and hide site_packages using python -S, to catch problems like this.

7 tests fail if mrjob is not in $PYTHONPATH

Tests that invoke Python scripts fail when mrjob isn't in our $PYTHONPATH (e.g. when you check mrjob out from GitHub but don't install it). We should patch os.path.abspath('.') into $PYTHONPATH for the current environment when running these tests.

For now, just keeping the tests out of MANIFEST.in. :)

push changes to mrjob.botoemr back into boto

We made some significant changes to boto's emr code in mrjob:

  • correctly parse new EMR responses that include InstanceGroups
  • parse bootstrap actions
  • reducers are now optional

We should make a fork of boto and submit a pull request.

add support for alternate partitioners

The KeyFieldBasedPartitioner sorts the values passed to a given reducer. This would be really useful for cases where we want to apply one bit of information to several values (for example, we want a reducer to first receive scoring information, and then apply that scoring information to a very large number of documents).

Right now the only way to do this is to re-define job_runner_kwargs() in your MRJob subclass so that the appropriate -partitioner argument gets added to hadoop_extra_args. This seems unnecessarily awkward.

Since the choice of partitioner is part of the semantics of the job, MRJob should probably have a DEFAULT_PARTITIONER field you can set.
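
A sketch of how that might look (DEFAULT_PARTITIONER is the hypothetical field this issue proposes, not an existing mrjob attribute):

from mrjob.job import MRJob


class MRApplyScores(MRJob):

    # hypothetical: name the Hadoop partitioner class to use for every step
    DEFAULT_PARTITIONER = \
        'org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner'

    def reducer(self, key, values):
        # assuming the scoring information arrives before the documents,
        # read it once and apply it to everything that follows
        scoring_info = next(values)
        for doc in values:
            yield key, (doc, scoring_info)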

run scripts written in other languages

It wouldn't actually be that difficult for MRJob to run scripts written in other languages if they implemented the MRJob protocol (--steps, --mapper, --reducer, and --step-num). Instead of prepending python to our command inside Hadoop streaming, we'd prepend ruby or java or (for shell scripts) nothing. We'd probably run them like:

python mrjob.job.MRJob --mr-job-script mr_perform_awesomeness.rb

or alternately:

mrjob mr_perform_awesomeness.rb

The main thing is, I'm not sure there's any demand for such a feature.

Tell you what, you write the base MRJob class in your favorite language and put it up on github, and I'll hook it up to mrjob for you. :)

find_probable_cause_of_failure() is bad at fetching logs

We currently grab EMR logs from S3. This only works for job flows that shut down after running your job. Technically, it's not supposed to work at all; according to Amazon (http://developer.amazonwebservices.com/connect/entry.jspa?externalID=3938&categoryID=265), logs aren't copied to S3 until they've been untouched for 5 minutes.

Rather than grabbing the logs from S3 directly, we need to download the relevant logs via SSH if the job flow is still running (and from S3 if it's not), and parse the log files locally.

Add "migrating from dumbo" section to docs

Migrating from dumbo (the other MapReduce Python module) should be pretty easy because its mappers and reducers have the same function signature.

Would be great to have some input from someone who actually uses dumbo so we're not just making stuff up. :)

add compact stack trace

We have something at Yelp that would turn a stack trace into something small enough to fit into a counter. It doesn't quite work properly on EMR because of all the crazy symlinks, but once we get it working, we should make it part of mrjob.
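
A rough sketch of the idea (increment_counter() is a real MRJob method; the rest, including the one-line squashing, is just illustrative):

import traceback

from mrjob.job import MRJob


class MRCountErrorsJob(MRJob):

    def mapper(self, key, line):
        try:
            yield key, self.process(line)
        except Exception:
            # squash the traceback to a single short line so it can fit
            # in a Hadoop counter name, then keep going
            compact = traceback.format_exc().splitlines()[-1][:200]
            self.increment_counter('errors', compact)

    def process(self, line):
        # hypothetical per-job logic
        return line.upper()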

add S3 tmpwatch

We should add a script that deletes all files older than 30 days in your mrjob temp directory (with an exception for files belonging to currently running or pending steps).

We already have a script like this at Yelp; we just need to port it over.

create "mrjob-" scratch bucket lazily

Currently, whenever you instantiate an EMRJobRunner, it may create a bucket whose name starts with mrjob- on S3. This isn't really necessary if you're just using EMRJobRunner as a utility to talk to EMR and S3, and not actually running a job.

We shouldn't actually create the bucket until we need it to run the job.

EMRJobRunner should auto-infer region from bucket

Currently, when doing the quick setup for mrjob on EMR, you have to create a bucket in the "US standard" region. Users should be able to create a bucket in any region, and we should query the bucket to find out which region it lives in.

add a script to emr.tools to audit EMR usage

We should make a script that can look at your last 2 weeks of EMR usage (i.e. all that's easily available) and tell you:

  • how many instance hours of each kind of instance you used
  • which users used the most instance hours
  • which jobs used the most instance hours (for recurring jobs)
  • if any money was wasted on idle job flows

I'm going to measure these in EC2 "Compute Units" rather than dollars because any price the script gives will almost certainly be slightly wrong; if Open Source libraries had liability issues, this would be one. :)

See:

http://aws.amazon.com/ec2/instance-types/

Clean up --cleanup

Cleanup is a little bit hacky (especially the SCRATCH option). We really need the option to clean up:

  • local temp dirs
  • remote temp dirs
  • logs

and to do different things when the job succeeds and when it fails.

use cat when no mapper specified

A common way to write a multi-step job is map, reduce, reduce, reduce, reduce...

Hadoop streaming requires a mapper, so currently each reduce-only step involves sending every bit of data through the identity mapper. If a mapper has the same input and output protocol (i.e. it's not the first step), we should use org.apache.hadoop.mapred.lib.IdentityMapper rather than running the mr script.

support alternate job owners

This is a feature that Yelp specifically requested:

We should be able to run jobs with a --job-owner switch that changes the username in the job name.

So for example, if the ads team has a job that's run as the user batch, they could specify --job-owner=ads and the job name would start with mr_ads_script.ads rather than mr_ads_script.batch.
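
A hypothetical invocation with the proposed switch (--job-owner is what this issue asks for, not necessarily an existing option):

python mr_ads_script.py -r emr --job-owner=ads input_dir > output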

Add "Recipes" to docs and/or Wiki

Would be nice to have a section of specific ways to make your mrjob setup nicer. For example:

  • using simplejson 2.0.9+ on EMR
  • making your source tree available to your jobs
  • using persistent job flows
  • increasing the timeout in mrjob

It might make more sense to put this on the github wiki, and just reference it in the docs.

pass through other switches with --steps

We currently just run mr_your_script.py --steps to get information about how many mapper and reducer steps there are. We should pass other options through to --steps so that it's possible for steps() to dynamically choose which steps to run based on command-line options.

thrashing detection

Jobs should be able to detect when they've been moved into swap space, and issue a warning and/or kill themselves.

Not sure the best way to handle this, but thrashing jobs are a major source of expense for Yelp.

create/find a Berkeley DB replacement

Berkeley DB isn't a forwards-compatible format, so it's quite possible that Hadoop/EMR won't be able to read Berkeley DBs created on your local machine.

SQLite doesn't have this problem, because SQLite 3 is a defined format. But SQLite is also much more powerful than we need; sometimes we just want a hash table.

This doesn't belong in mrjob, but if someone makes or already has made a file-backed hash table that we can safely copy to EMR/Hadoop, we should point to it.
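
For a rough sense of the idea, here's a sketch of a file-backed lookup table on top of sqlite3 (illustrative only; not part of mrjob):

import sqlite3


class FileDict(object):
    """A minimal hash table backed by a SQLite file, which can be built
    locally and shipped to Hadoop/EMR as a support file."""

    def __init__(self, path):
        self._conn = sqlite3.connect(path)
        self._conn.execute(
            'CREATE TABLE IF NOT EXISTS kv (k TEXT PRIMARY KEY, v TEXT)')

    def __setitem__(self, key, value):
        self._conn.execute('REPLACE INTO kv VALUES (?, ?)', (key, value))
        self._conn.commit()

    def __getitem__(self, key):
        row = self._conn.execute(
            'SELECT v FROM kv WHERE k = ?', (key,)).fetchone()
        if row is None:
            raise KeyError(key)
        return row[0]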

automatically tarball directories?

We have a python_archives option which allows you to upload a tarball and stick it in the $PYTHONPATH. It seems kind of silly, but it would probably be helpful to people if we automatically tarred up directories for them.

We probably want to automatically remove stray editor/MacFuse crud (~, .#, ._*) like we do when bootstrapping mrjob.

Not going to do this until someone asks for it. :)
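
If we did, a sketch of the tarring step might look like this (illustrative; the crud patterns match the ones listed above):

import os
import tarfile


def tar_directory(src_dir, tarball_path):
    def _skip_crud(tarinfo):
        # drop editor/MacFuse droppings (~, .#, ._*) from the archive
        name = os.path.basename(tarinfo.name)
        if name.endswith('~') or name.startswith(('.#', '._')):
            return None  # returning None excludes the member
        return tarinfo

    with tarfile.open(tarball_path, 'w:gz') as tf:
        tf.add(src_dir, arcname=os.path.basename(src_dir), filter=_skip_crud)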

parse and report job failure due to steps timing out

By default, mappers/reducers time out if they produce no output for 10 minutes. If this happens four times for the same mapper/reducer, the job will be killed.

Errors are found in jobs/*.jar, and look like:

Task TASKID="task_201010202309_0001_m_000153" TASK_TYPE="MAP" TASK_STATUS="FAILED" FINISH_TIME="1287618918658" ERROR="Task attempt_201010202309_0001_m_000153_3 failed to report status for 602 seconds. Killing!" 

It might also be helpful for the error message to suggest that people set --jobconf mapred.task.timeout=... to an appropriate number of milliseconds.
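
For example, bumping the timeout to 20 minutes (1,200,000 ms) with mrjob's --jobconf switch would look like:

python mr_your_script.py -r emr --jobconf mapred.task.timeout=1200000 input > output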

merge CHANGES.txt and debian/changelog

Right now we need to make debian packages of mrjob for Yelp.

Debian uses the top line of debian/changelog to determine the package version, but that file follows a very strict format. We should make sure CHANGES.txt is in that format, and make debian/changelog a symlink to it.

need better testing for running without YAML

We need tests that monkey-patch mrjob.conf.yaml, so we can make sure that configuration parsing works without YAML installed. We found a bug in hand-testing that only happened without YAML installed.

add_file_option() doesn't work with --steps

Beh, I forgot to pass through the local option from MRJobRunner._mr_job_extra_args() to MRJobRunner._get_file_upload_args(), and as a result, we don't pass the entire path for file upload options with --steps.

MRJob.steps() rarely actually needs to look at these files, so for now, the workaround is to use MRJob.is_mapper_or_reducer(), and not actually try to load files unless we're in a mapper or reducer.

I've got a fix for this in the development branch, but I need to write a regression test for it.

Windows compatibility issues

Ran into two issues running mrjob with ActivePython 2.6 on Windows (see the sketch after this list):

  • time.tzset() doesn't exist
    • How to fix: only run time.tzset() if it exists
  • os.symlink() doesn't exist (nor do symlinks in general)
    • How to fix: copy support files rather than symlinking, use absolute paths for input files (since they're likely to be large enough that copying them would waste time)
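
A sketch of those two guards (illustrative):

import os
import shutil
import time


def set_up_timezone():
    # time.tzset() doesn't exist on Windows; only call it where it does
    if hasattr(time, 'tzset'):
        time.tzset()


def link_or_copy(src, dst):
    # fall back to copying on platforms without symlinks
    if hasattr(os, 'symlink'):
        os.symlink(os.path.abspath(src), dst)
    else:
        shutil.copy(src, dst)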
