parkour's People

Contributors

josephwilk, llasram

parkour's Issues

Seeing data-reader mapping error on clojure 1.6

Hey,

I'm trying to get parkour running on our internal data system, and I'm getting an error that I can't really diagnose, since the bug behind it was reportedly fixed in Clojure 1.5 as CLJ-1034. Any ideas?

Here is the error:

clojure.lang.ExceptionInfo: Conflicting data-reader mapping {:url #<URL jar:file:/prod-analytics-0.1.0-SNAPSHOT-standalone.jar!/data_readers.clj>, :conflict hadoop.conf/configuration, :mappings {parkour/dval #'parkour.io.dval/dval-reader, parkour/dcpath #'parkour.io.dval/dcpath-reader, java.net/uri #'parkour.fs/uri, hadoop.mapreduce/job #'parkour.mapreduce/job, hadoop.fs/path #'parkour.fs/path, hadoop.conf/configuration #'parkour.conf/configuration}}
        at clojure.core$ex_info.invoke(core.clj:4227)
        at clojure.core$load_data_reader_file$fn__6356.invoke(core.clj:6671)
        at clojure.core.protocols$fn__5871.invoke(protocols.clj:76)
        at clojure.core.protocols$fn__5828$G__5823__5841.invoke(protocols.clj:13)
        at clojure.core$reduce.invoke(core.clj:6030)
        at clojure.core$load_data_reader_file.invoke(core.clj:6664)
        at clojure.core.protocols$fn__5883.invoke(protocols.clj:128)
        at clojure.core.protocols$fn__5854$G__5849__5863.invoke(protocols.clj:19)
        at clojure.core.protocols$seq_reduce.invoke(protocols.clj:31)
        at clojure.core.protocols$fn__5877.invoke(protocols.clj:48)
        at clojure.core.protocols$fn__5828$G__5823__5841.invoke(protocols.clj:13)
        at clojure.core$reduce.invoke(core.clj:6030)
        at clojure.core$load_data_readers$fn__6360.invoke(core.clj:6683)
        at clojure.lang.AFn.applyToHelper(AFn.java:161)
        at clojure.lang.AFn.applyTo(AFn.java:151)
        at clojure.lang.Var.alterRoot(Var.java:336)
        at clojure.core$alter_var_root.doInvoke(core.clj:4839)
        at clojure.lang.RestFn.invoke(RestFn.java:425)
        at clojure.core$load_data_readers.invoke(core.clj:6680)
        at clojure.core$fn__6363.invoke(core.clj:6686)
        at clojure.core__init.load(Unknown Source)
        at clojure.core__init.<clinit>(Unknown Source)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:247)
        at clojure.lang.RT.loadClassForName(RT.java:2056)
        at clojure.lang.RT.load(RT.java:419)
        at clojure.lang.RT.load(RT.java:400)
        at clojure.lang.RT.doInit(RT.java:436)
        at clojure.lang.RT.<clinit>(RT.java:318)
        at clojure.lang.Namespace.<init>(Namespace.java:34)
        at clojure.lang.Namespace.findOrCreate(Namespace.java:176)
        at clojure.lang.Var.internPrivate(Var.java:163)
        at prod_analytics.core.<clinit>(Unknown Source)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:247)
        at org.apache.hadoop.util.RunJar.main(RunJar.java:201)
Exception in thread "main" java.lang.ExceptionInInitializerError
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:247)
        at clojure.lang.RT.loadClassForName(RT.java:2056)
        at clojure.lang.RT.load(RT.java:419)
        at clojure.lang.RT.load(RT.java:400)
        at clojure.lang.RT.doInit(RT.java:436)
        at clojure.lang.RT.<clinit>(RT.java:318)
        at clojure.lang.Namespace.<init>(Namespace.java:34)
        at clojure.lang.Namespace.findOrCreate(Namespace.java:176)
        at clojure.lang.Var.internPrivate(Var.java:163)
        at prod_analytics.core.<clinit>(Unknown Source)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:247)
        at org.apache.hadoop.util.RunJar.main(RunJar.java:201)

and here is my project.clj:

(defproject prod-analytics "0.1.0-SNAPSHOT"
  :url ""
  :license {:name "Eclipse Public License"
            :url "http://www.eclipse.org/legal/epl-v10.html"}
  :dependencies [[org.clojure/clojure "1.6.0"]
                 [clojure-csv/clojure-csv "2.0.1"]
                 [org.clojure/algo.generic "0.1.2"]
                 [com.damballa/parkour "0.6.1"]
                 [org.apache.avro/avro "1.7.5"]
                 [org.apache.avro/avro-mapred "1.7.5"
                  :classifier "hadoop2"]
                 [org.codehaus.jsr166-mirror/jsr166y "1.7.0"]
                 ]
  :global-vars {*warn-on-reflection* true}
  :exclusions [org.apache.hadoop/hadoop-core
               org.apache.hadoop/hadoop-common
               org.apache.hadoop/hadoop-hdfs
               org.slf4j/slf4j-api org.slf4j/slf4j-log4j12 log4j
               org.apache.avro/avro
               org.apache.avro/avro-mapred
               org.apache.avro/avro-ipc]

  :repositories [["conjars" "http://conjars.org/repo"]
                ["cloudera" "https://repository.cloudera.com/content/repositories/releases"]]

  :main prod-analytics.core
  :profiles {:provided
             {:dependencies
              [[org.apache.hadoop/hadoop-client "2.0.0-mr1-cdh4.2.0"]
               [org.apache.hadoop/hadoop-core "2.0.0-mr1-cdh4.2.0"] 
               [org.apache.hadoop/hadoop-common "2.0.0-cdh4.2.0"]
               [org.slf4j/slf4j-api "1.6.1"]
               [org.slf4j/slf4j-log4j12 "1.6.1"]
               [log4j/log4j "1.2.17"]]}
             :aot {:aot :all, :compile-path "target/aot/classes"}
             :uberjar [:aot]
             :jobjar [:aot]})
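
A possible first diagnostic step for a conflict like this (not a confirmed fix): Clojure loads every data_readers.clj it can find at the classpath roots during startup, which is the load_data_readers call visible in the trace above. Listing those resources under the same classpath the job runs with shows whether the file is visible more than once. The sketch below uses only JDK calls; the class name DataReaderResources and its find method are hypothetical names chosen for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: lists every copy of a named resource on the classpath.
final class DataReaderResources {

    /** Returns all classpath resources with the given name, in class-loader order. */
    static List<URL> find(String resourceName) {
        ClassLoader cl = Thread.currentThread().getContextClassLoader();
        if (cl == null) {
            cl = ClassLoader.getSystemClassLoader();
        }
        try {
            return Collections.list(cl.getResources(resourceName));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // One line per data_readers.clj visible to this JVM.
        for (URL url : find("data_readers.clj")) {
            System.out.println(url);
        }
    }
}
```

Running this with the job's classpath prints one URL per copy; more than one line for the same library would be worth investigating, as would the Clojure version that is actually first on that classpath.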

Getting log output from mappers when using conf/local-mr!

Apologies if this is a general hadoop question rather than something parkour-specific, but I'm finding it pretty much impossible to get any log output (stdout / stderr etc) from my mappers/reducers when running parkour jobs from the REPL in local mode.

I've tried manually setting various config properties, e.g.

  (conf/assoc!
   (conf/local-mr!)
   "yarn.log.dir" "/my/log/dir"
   "yarn.nodemanager.log-dirs" "/my/log/dir")

But no joy. No logs anywhere under /usr/local/hadoop (where I have the distribution installed) either.

Is there something obvious I could be missing here? Is there anything about the way that parkour starts these jobs which would prevent them from logging / any special prodding which might be needed?

Doc feedback

Hello

Just a small (and selfish, feel free to ignore!) doc request / bit of feedback.

At the moment the library seems very much geared towards people who've already done hadoop the hard way, understand the pain points and want a higher-level DSL which abstracts over them a bit more.

Which is a perfectly valid aim, but I feel like it could also be a bit more accessible to those starting out with hadoop too, with a little more motivating "big picture" documentation along the lines of: here's how you do things directly with hadoop, here's why that's painful, here's how the higher-level constructs in this library help and how they translate to and from the lower-level stuff which you can read about elsewhere.

The docs already do a good job of outlining this in places, although I'm thinking about the parkour.graph stuff in particular -- here there's (what looks to a newbie like) a fair amount of magic introduced, and it's not quite clear how the chains of parkour.graph calls in the examples translate into mapreduce jobs.

Is there a less magical direct way to set up a single mapreduce job with a given mapper and/or reducer using this library, if I want to walk before I run and do things very explicitly for the sake of understanding the lower level to motivate understanding of some of the higher-level APIs and the pain points they're addressing?

Realise that I could do this directly with the hadoop java API, and maybe this is the only way to true enlightenment. But I very much like some of the features of this library like the REPL, idiomatic-clojureness and ease of testing, and it'd be nice to benefit from these while gradually easing into using more abstractions on top of the native hadoop concepts. Which I'm sure I can, just can't see the forest for the trees at the moment.

I'll see how I get on anyway -- keep up the good work!
Cheers
-Matt

Use existing output directory

Hi,

I need to be able to run an incremental job that will update a folder on hdfs, so have to be able to reuse the output folder. I've seen this solution from http://stackoverflow.com/questions/7713316/how-to-overwrite-reuse-the-exisitng-output-path-for-hadoop-jobs-again-and-agin :

public class OverwriteTestOutputFormat<K, V> extends TextOutputFormat<K, V>
{
      public RecordWriter<K, V> 
      getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException 
      {
          Configuration conf = job.getConfiguration();
          boolean isCompressed = getCompressOutput(job);
          String keyValueSeparator= conf.get("mapred.textoutputformat.separator","\t");
          CompressionCodec codec = null;
          String extension = "";
          if (isCompressed) 
          {
              Class<? extends CompressionCodec> codecClass = 
                      getOutputCompressorClass(job, GzipCodec.class);
              codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
              extension = codec.getDefaultExtension();
          }
          Path file = getDefaultWorkFile(job, extension);
          FileSystem fs = file.getFileSystem(conf);
          FSDataOutputStream fileOut = fs.create(file, true);
          if (!isCompressed) 
          {
              return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
          } 
          else 
          {
              return new LineRecordWriter<K, V>(new DataOutputStream(codec.createOutputStream(fileOut)),keyValueSeparator);
          }
      }
}

The line FSDataOutputStream fileOut = fs.create(file, true); sets overwrite to true.

I'm using ::mr/sink-as dux/prefix-keys, and having hunted around the code, the relevant part seems to be in the parkour.remote.dux namespace. Could you give me a hint?

Thank you very much,

Stefan

hadoop 2.6 compile failed

The same code works well on Hadoop 2.4, but after updating to Hadoop 2.6 I get the following error message:

15/12/30 11:58:56 ERROR parkour.tool: Uncaught exception: clojure.lang.ExceptionInfo: Job tl-crawler.2015-12-22[1/1] failed. {:jname "tl-crawler.2015-12-22[1/1]"}
clojure.lang.ExceptionInfo: Job tl-crawler.2015-12-22[1/1] failed. {:jname "tl-crawler.2015-12-22[1/1]"}
    at clojure.core$ex_info.invoke(core.clj:4327)
    at clojure.lang.AFn.applyToHelper(AFn.java:163)
    at clojure.lang.AFn.applyTo(AFn.java:151)
    at clojure.core$apply.invoke(core.clj:617)
    at parkour.graph$eval12110$fn__12111$fn__12112.doInvoke(graph.clj:368)
    at clojure.lang.RestFn.invoke(RestFn.java:397)
    at clojure.lang.AFn.applyToHelper(AFn.java:159)
    at clojure.lang.RestFn.applyTo(RestFn.java:132)
    at clojure.core$apply.invoke(core.clj:617)
    at parkour.graph$graph_delay$fn__11958.invoke(graph.clj:33)
    at clojure.lang.Delay.deref(Delay.java:33)
    at clojure.core$deref.invoke(core.clj:2128)
    at clojure.core$map$fn__6268.invoke(core.clj:2485)
    at clojure.lang.LazySeq.sval(LazySeq.java:42)
    at clojure.lang.LazySeq.seq(LazySeq.java:60)
    at clojure.lang.RT.seq(RT.java:484)
    at clojure.core$seq.invoke(core.clj:133)
    at clojure.core$apply.invoke(core.clj:617)
    at parkour.graph$graph_delay$fn__11958.invoke(graph.clj:33)
    at clojure.lang.Delay.deref(Delay.java:33)
    at clojure.core$deref.invoke(core.clj:2128)
    at clojure.core$map$fn__6268.invoke(core.clj:2485)
    at clojure.lang.LazySeq.sval(LazySeq.java:42)
    at clojure.lang.LazySeq.seq(LazySeq.java:60)
    at clojure.lang.RT.seq(RT.java:484)
    at clojure.core$seq.invoke(core.clj:133)
    at clojure.core$apply.invoke(core.clj:617)
    at parkour.graph$graph_delay$fn__11958.invoke(graph.clj:33)
    at clojure.lang.Delay.deref(Delay.java:33)
    at clojure.core$deref.invoke(core.clj:2128)
    at parkour.graph$run_graph.invoke(graph.clj:63)
    at parkour.graph$execute.invoke(graph.clj:426)
    at tl_emr.daily.shopstyle.term_total_category$task.invoke(term_total_category.clj:81)
    at tl_emr.daily.shopstyle.term_total_category$tool.doInvoke(term_total_category.clj:88)
    at clojure.lang.RestFn.applyTo(RestFn.java:139)
    at clojure.core$apply.invoke(core.clj:619)
    at parkour.tool.ParkourTool$fn__2383.invoke(tool.clj:33)
    at parkour.tool$integral_STAR_.invoke(tool.clj:12)
    at parkour.tool.ParkourTool.run(tool.clj:31)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
    at parkour.tool$run.invoke(tool.clj:48)
    at tl_emr.daily.shopstyle.term_total_category$_main.doInvoke(term_total_category.clj:91)
    at clojure.lang.RestFn.invoke(RestFn.java:408)
    at clojure.lang.Var.invoke(Var.java:415)
    at clojure.lang.AFn.applyToHelper(AFn.java:161)
    at clojure.lang.Var.applyTo(Var.java:532)
    at clojure.core$apply.invoke(core.clj:617)
    at clojure.main$main_opt.invoke(main.clj:335)
    at clojure.main$main.doInvoke(main.clj:440)
    at clojure.lang.RestFn.invoke(RestFn.java:436)
    at clojure.lang.Var.invoke(Var.java:423)
    at clojure.lang.AFn.applyToHelper(AFn.java:167)
    at clojure.lang.Var.applyTo(Var.java:532)
    at clojure.main.main(main.java:37)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:136)

Lines 81, 88, and 91 of term_total_category.clj look fine; I can't see any code there related to the error.

My project.clj

:dependencies [[org.clojure/clojure "1.5.1"]
                 [com.damballa/parkour "0.5.4"]
                 [org.apache.avro/avro "1.7.5"]
                 [org.apache.avro/avro-mapred "1.7.5"
                  :classifier "hadoop2"]
                 [transduce/transduce "0.1.1"]
                 [clj-aws-s3 "0.3.10"]
                 [clj-json "0.5.3"]
                 [hadoop-lzo "0.4.15"]
                 ]
:profiles {:provided
             {:dependencies [[org.apache.hadoop/hadoop-client "2.2.0"]
                             [org.apache.hadoop/hadoop-common "2.2.0"]
                             [org.slf4j/slf4j-api "1.6.1"]
                             [org.slf4j/slf4j-log4j12 "1.6.1"]
                             [log4j "1.2.17"]]}
             :test {:resource-paths ["test-resources"]}
             :aot {:aot :all, :compile-path "target/aot/classes"}
             :uberjar {:aot :all}
             :jobjar [:aot]})

The error message is very weird. I can't work out what needs to be fixed.

leiningen version is 2.5.3
jdk is 1.7.0_91

Need mapper to access local file system for input

Hi,

In relation to this issue: http://stackoverflow.com/questions/10107665/run-a-local-file-system-directory-as-input-of-a-mapper-in-cluster

I'm trying to run my jobs on Hadoop. I've built an uberjar and am launching it with hadoop jar project.jar.
I have a custom input format that builds a dseq of file maps of HDF files to process: {"20101201" ["/a.hdf" "/b.df"]}

These files are on the local file system. I don't want to put the files on HDFS because the point of this UPLOAD job is to collocate the files needed for each mapper on HDFS.

I'm using hdf-java to open the hdf files.
This library uses a native method call to open the HDF file, and it seems to be looking on the HDFS file system, because it fails with a file-not-found error. I've printed out the fpath it uses and confirmed that the file exists and is readable. This works when running the job with lein run.

In the mapper just before reading the hdf file with hdf-java I tried a
(->> "/" clojure.java.io/file file-seq (take 10)) and confirmed that it's listing files on my local file system.

So I'm guessing the failure is because it's a native method, but I'm a bit confused, since file-seq suggests I'm on the local file system. Is there any way to get this to work?

Thanks,

Stefan

ArityException Wrong number of args (3) passed to: PersistentArrayMap clojure.lang.AFn.throwArity

Running from lein repl:
(require 'parkour.repl)
(->> (text/dseq "test.txt")
     (parkour.repl/launch! (get-conf) {"mapred.reduce.tasks" 313} word-count))
I get the error below:
(Warning: profile :jobjar not found.)
(Warning: profile :jobjar not found.)
Warning: specified :main without including it in :aot.
Implicit AOT of :main will be removed in Leiningen 3.0.0.
If you only need AOT for your uberjar, consider adding :aot :all into your
:uberjar profile instead.
Compiling parkour.example.wordcount
14/10/08 16:32:35 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Warning: The Main-Class specified does not exist within the jar. It may not be executable as expected. A gen-class directive may be missing in the namespace which contains the main method.
Created /opt/nelly/clojure/parkourtest/target/parkourtest-0.1.jar
14/10/08 16:32:37 WARN hdfs.DomainSocketFactory: The short-circuit local reads feature is disabled because libhadoop cannot be loaded.

ArityException Wrong number of args (3) passed to: PersistentArrayMap clojure.lang.AFn.throwArity (AFn.java:429)

Project.clj
(defproject parkourtest "0.1"
  :description "Map reduce test"
  :dependencies [[org.codehaus.jsr166-mirror/jsr166y "1.7.0"]
                 [com.damballa/parkour "0.6.0"]
                 [org.clojure/clojure "1.6.0"]]

  :profiles {:dev {:dependencies [[alembic "0.3.2"]]}
             :provided
             {:dependencies
              [[org.apache.hadoop/hadoop-client "2.0.0-mr1-cdh4.2.1"]
               [org.apache.hadoop/hadoop-core "2.0.0-mr1-cdh4.2.1"]
               [org.apache.hadoop/hadoop-common "2.0.0-cdh4.2.1"]]}}
  :repositories [["clouderapublic" "http://repository.cloudera.com/artifactory/public/"]]

  :main parkour.example.wordcount)

wordcount.clj
(defn get-conf []
  (doto (conf/ig)
    (.addResource (new Path))
    (.addResource (new Path "hdfs-site.xml"))
    (.addResource (new Path "mapred-site.xml"))
    (.set "hadoop.job.ugi" "myuser,mygroup")))

Job parkour.example.wordcount/word-count[1/1] failed

Running the application below, I get this error:

14/10/09 09:52:44 INFO parkour.graph: Launching job parkour.example.wordcount/word-count[1/1]
14/10/09 09:52:44 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
14/10/09 09:52:44 INFO input.FileInputFormat: Total input paths to process : 1
14/10/09 09:53:09 WARN parkour.graph: Job parkour.example.wordcount/word-count[1/1] failed
14/10/09 09:53:09 WARN parkour.graph: Cleaning up path /tmp/myuser-1412862763301-488491490-parkour-transient/t-61
14/10/09 09:53:09 ERROR parkour.tool: Uncaught exception: clojure.lang.ExceptionInfo: Job parkour.example.wordcount/word-count[1/1] failed. {:jname "parkour.example.wordcount/word-count[1/1]"}
clojure.lang.ExceptionInfo: Job parkour.example.wordcount/word-count[1/1] failed. {:jname "parkour.example.wordcount/word-count[1/1]"}
at clojure.core$ex_info.invoke(core.clj:4403)
at clojure.lang.AFn.applyToHelper(AFn.java:156)
at clojure.lang.AFn.applyTo(AFn.java:144)
at clojure.core$apply.invoke(core.clj:624)
at parkour.graph$fn__1814$fn__1815.doInvoke(graph.clj:377)
at clojure.lang.RestFn.invoke(RestFn.java:397)
at clojure.lang.AFn.applyToHelper(AFn.java:152)
at clojure.lang.RestFn.applyTo(RestFn.java:132)
at clojure.core$apply.invoke(core.clj:624)
at parkour.graph$graph_future$fn__1676.invoke(graph.clj:26)
at clojure.core$binding_conveyor_fn$fn__4145.invoke(core.clj:1910)
at clojure.lang.AFn.call(AFn.java:18)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)

(ns parkour.example.wordcount
  (:require [clojure.string :as str]
            [clojure.core.reducers :as r]
            [parkour (conf :as conf) (fs :as fs) (mapreduce :as mr)
             ,       (graph :as pg) (toolbox :as ptb) (tool :as tool)]
            [parkour.io (text :as text) (seqf :as seqf)])
  (:import [org.apache.hadoop.io Text LongWritable]
           [org.apache.hadoop.fs Path]
           [java.io FileInputStream]
           [org.apache.hadoop.conf Configuration Configurable]))

(defn word-count-m
  [coll]
  (->> coll
       (r/mapcat #(str/split % #"\s+"))
       (r/map #(-> [% 1]))))

(defn word-count
  [conf lines]
  (-> (pg/input lines)
      (pg/map #'word-count-m)
      (pg/partition [Text LongWritable])
      (pg/combine #'ptb/keyvalgroups-r #'+)
      (pg/output (seqf/dsink [Text LongWritable]))
      (pg/fexecute conf `word-count)))

(defn tool
  [conf & inpaths]
  (->> (apply text/dseq inpaths)
       (word-count conf)
       (into {})
       (prn)))

(defn get-conf []
  (doto (Configuration.)
    (.addResource (new Path))
    (.addResource (new Path "hdfs-site.xml"))
    (.addResource (new Path "mapred-site.xml"))
    (.set "hadoop.job.ugi" "mygroup,supergroup")))

(defn -main
  [& args]
  (let [confi (get-conf)]
    (binding [parkour.conf/default confi]
      (let [_ (prn (vals (conf/ig)))
            tlres (tool/run tool ["/tmp/arektest/in/"])]
        (System/exit tlres)))))

avro/dsink determine output path from input?

Hi,

Is there any way to have something like ::mr/sink-as dux/prefix-keys that, instead of adding a prefix, creates a subdirectory?
Or a way to have an Avro dseq as input that filters by filename prefix?

Thanks!

Stefan

Read simultaneously from multiple inputs

Sorry, I hope I'm not being a bother.

In the docs: https://github.com/damballa/parkour/blob/master/doc/multi-io.md

You mention under Multiple inputs:

"The second means that tasks (usually map tasks) read simultaneously from multiple inputs e.g. for performing joins. Parkour does not as yet have any special support for this type of multiple input."

I thought I could resolve this like this:

(-> [(pg/input (mra/dseq [A] A-path))
     (pg/input (mra/dseq [B] B-path))]
    (pg/map #'tokeyvalue-mapper)
    (pg/partition (mra/shuffle [:string [A B]]))
    (pg/reduce #'calc-reducer)
    (pg/output :calc (mra/dsink [C] C-path))
    (pg/execute conf "JOB"))

But I don't want a reducer aggregating all the data. I'd like a mapper, so that I can process each [A B] dataset and sink the result, but a map node can't take a partition node as input.

Is there any way to get this "Read simultaneously from multiple inputs" functionality to work?

Thanks,

Stefan

ps: The tokeyvalue-mapper does V => K,V where K is a date string extracted from V.
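
For readers unfamiliar with the pattern, the pipeline above is in effect a reduce-side join: records from both inputs are mapped to (date, record) pairs, the shuffle brings records with the same date together, and the reducer then sees each date's A and B records side by side. The following is a minimal in-memory illustration of that grouping in plain Java. It is not Parkour or Hadoop code, and the class name, method name, and sample records are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

// Hypothetical sketch of the grouping a reduce-side join performs.
final class ReduceSideJoinSketch {

    /** All records from input A and input B that share one key. */
    static final class Group {
        final List<String> fromA = new ArrayList<>();
        final List<String> fromB = new ArrayList<>();
    }

    /** Groups the records of two inputs by a key extracted from each record. */
    static Map<String, Group> join(List<String> a, List<String> b,
                                   Function<String, String> keyOf) {
        Map<String, Group> groups = new TreeMap<>();
        for (String rec : a) {
            groups.computeIfAbsent(keyOf.apply(rec), k -> new Group()).fromA.add(rec);
        }
        for (String rec : b) {
            groups.computeIfAbsent(keyOf.apply(rec), k -> new Group()).fromB.add(rec);
        }
        return groups;
    }

    public static void main(String[] args) {
        // Assumption for this example: the first 8 characters are the date key.
        Function<String, String> dateOf = rec -> rec.substring(0, 8);
        Map<String, Group> groups = join(
                Arrays.asList("20101201,a1", "20101202,a2"),
                Arrays.asList("20101201,b1"),
                dateOf);
        groups.forEach((date, g) ->
                System.out.println(date + " A=" + g.fromA + " B=" + g.fromB));
    }
}
```

In a real job the grouping is done by the shuffle rather than in memory, which is why the per-key processing in the pipeline above lands in the reduce step.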

Dseq for a range of integers / custom input splits from clojure code

I was looking for a way to create a dseq for a range of integers, which can be split amongst the mappers.

So far the closest I found was parkour.io.mem/dseq, but this only works in local mode.

Is there a way to implement custom dseqs with custom logic for input splits from clojure code? Perhaps I need to implement (and AOT-compile) a custom InputFormat class for this kind of thing, and implement a cstep which sets this class's name as mapreduce.job.inputformat.class, sets config for it, then somehow wrap that as a dseq? Or perhaps all the dseq stuff assumes file-based input?

Thought I'd mention this anyway as one of the "gaps" alluded to on that documentation ticket -- it feels like something which would be really simple in idiomatic clojure code, and these dseqs are advertised as one of the core abstractions of the library, but it's not at all clear how I'd go about creating one myself, given some logic I'd like to implement for how to generate keys as input for mappers. Looking at the protocol for dseq isn't very enlightening in this respect as they mainly just appear to be wrappers for csteps.

(I did find at least one custom InputFormat for integer range here: https://github.com/twitter/elephant-bird/blob/master/core/src/main/java/com/twitter/elephantbird/mapreduce/input/IntegerListInputFormat.java , although I'm not sure why this particular class uses static fields for its config and if that would work with parkour.)
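
Whatever mechanism ends up exposing it to Parkour, the splitting logic itself is small. As a sketch under stated assumptions, the JDK-only code below divides a half-open integer range into at most a given number of contiguous, near-equal chunks, which is roughly what a custom InputFormat would return (wrapped in InputSplit objects) from its getSplits method. The class IntRangeSplits, its Range type, and the split method are hypothetical names, not part of Parkour or Hadoop.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: partition [start, end) into contiguous chunks.
final class IntRangeSplits {

    /** A half-open range [from, to). */
    static final class Range {
        final long from;
        final long to;

        Range(long from, long to) {
            this.from = from;
            this.to = to;
        }

        @Override
        public String toString() {
            return "[" + from + ", " + to + ")";
        }
    }

    /** Splits [start, end) into at most maxSplits non-empty ranges whose sizes differ by at most 1. */
    static List<Range> split(long start, long end, int maxSplits) {
        if (maxSplits < 1) {
            throw new IllegalArgumentException("maxSplits must be >= 1");
        }
        List<Range> out = new ArrayList<>();
        long total = end - start;
        if (total <= 0) {
            return out; // empty range: no splits
        }
        long n = Math.min(maxSplits, total); // never emit an empty split
        long base = total / n;
        long extra = total % n; // the first `extra` splits get one more element
        long from = start;
        for (long i = 0; i < n; i++) {
            long size = base + (i < extra ? 1 : 0);
            out.add(new Range(from, from + size));
            from += size;
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(split(0, 10, 3)); // prints [[0, 4), [4, 7), [7, 10)]
    }
}
```

Each mapper would then iterate over the integers in its own range. The sketch ignores overflow for ranges wider than a long can represent.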

REPL-launched remote jobs | java.lang.ClassNotFoundException: org.apache.hadoop.util.ShutdownHookManager

Hi, I can't get the example in the doc to run (https://github.com/damballa/parkour/blob/master/doc/repl.md#repl-launched-remote-jobs).

I try to run it like so:

lein repl
(require 'parkour.repl)
(parkour.repl/launch! {"mapred.reduce.tasks" 313} word-count "outpath/0"
         (text/dseq "test.txt"))

Here is the output:

Compiling parkour.core
Exception in thread "main" java.lang.ExceptionInInitializerError, compiling:(core.clj:1:1)
    at clojure.lang.Compiler$InvokeExpr.eval(Compiler.java:3463)
    at clojure.lang.Compiler.compile1(Compiler.java:7153)
    at clojure.lang.Compiler.compile1(Compiler.java:7143)
    at clojure.lang.Compiler.compile(Compiler.java:7219)
    at clojure.lang.RT.compile(RT.java:398)
    at clojure.lang.RT.load(RT.java:438)
    at clojure.lang.RT.load(RT.java:411)
    at clojure.core$load$fn__5018.invoke(core.clj:5530)
    at clojure.core$load.doInvoke(core.clj:5529)
    at clojure.lang.RestFn.invoke(RestFn.java:408)
    at clojure.core$load_one.invoke(core.clj:5336)
    at clojure.core$compile$fn__5023.invoke(core.clj:5541)
    at clojure.core$compile.invoke(core.clj:5540)
    at user$eval7.invoke(NO_SOURCE_FILE:1)
    at clojure.lang.Compiler.eval(Compiler.java:6619)
    at clojure.lang.Compiler.eval(Compiler.java:6609)
    at clojure.lang.Compiler.eval(Compiler.java:6582)
    at clojure.core$eval.invoke(core.clj:2852)
    at clojure.main$eval_opt.invoke(main.clj:308)
    at clojure.main$initialize.invoke(main.clj:327)
    at clojure.main$null_opt.invoke(main.clj:362)
    at clojure.main$main.doInvoke(main.clj:440)
    at clojure.lang.RestFn.invoke(RestFn.java:421)
    at clojure.lang.Var.invoke(Var.java:419)
    at clojure.lang.AFn.applyToHelper(AFn.java:163)
    at clojure.lang.Var.applyTo(Var.java:532)
    at clojure.main.main(main.java:37)
Caused by: java.lang.ExceptionInInitializerError
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:270)
    at clojure.lang.RT.loadClassForName(RT.java:2098)
    at clojure.lang.RT.load(RT.java:430)
    at clojure.lang.RT.load(RT.java:411)
    at clojure.core$load$fn__5018.invoke(core.clj:5530)
    at clojure.core$load.doInvoke(core.clj:5529)
    at clojure.lang.RestFn.invoke(RestFn.java:408)
    at clojure.core$load_one.invoke(core.clj:5336)
    at clojure.core$load_lib$fn__4967.invoke(core.clj:5375)
    at clojure.core$load_lib.doInvoke(core.clj:5374)
    at clojure.lang.RestFn.applyTo(RestFn.java:142)
    at clojure.core$apply.invoke(core.clj:619)
    at clojure.core$load_libs.doInvoke(core.clj:5413)
    at clojure.lang.RestFn.applyTo(RestFn.java:137)
    at clojure.core$apply.invoke(core.clj:619)
    at clojure.core$require.doInvoke(core.clj:5496)
    at clojure.lang.RestFn.invoke(RestFn.java:457)
    at parkour.conf$loading__4910__auto__.invoke(conf.clj:1)
    at parkour.conf__init.load(Unknown Source)
    at parkour.conf__init.<clinit>(Unknown Source)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:270)
    at clojure.lang.RT.loadClassForName(RT.java:2098)
    at clojure.lang.RT.load(RT.java:430)
    at clojure.lang.RT.load(RT.java:411)
    at clojure.core$load$fn__5018.invoke(core.clj:5530)
    at clojure.core$load.doInvoke(core.clj:5529)
    at clojure.lang.RestFn.invoke(RestFn.java:408)
    at clojure.core$load_one.invoke(core.clj:5336)
    at clojure.core$load_lib$fn__4967.invoke(core.clj:5375)
    at clojure.core$load_lib.doInvoke(core.clj:5374)
    at clojure.lang.RestFn.applyTo(RestFn.java:142)
    at clojure.core$apply.invoke(core.clj:619)
    at clojure.core$load_libs.doInvoke(core.clj:5417)
    at clojure.lang.RestFn.applyTo(RestFn.java:137)
    at clojure.core$apply.invoke(core.clj:619)
    at clojure.core$require.doInvoke(core.clj:5496)
    at clojure.lang.RestFn.invoke(RestFn.java:457)
    at parkour.core$loading__4910__auto__.invoke(core.clj:1)
    at clojure.lang.AFn.applyToHelper(AFn.java:159)
    at clojure.lang.AFn.applyTo(AFn.java:151)
    at clojure.lang.Compiler$InvokeExpr.eval(Compiler.java:3458)
    ... 26 more
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.util.ShutdownHookManager
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:190)
    at parkour.util.shutdown__init.__init0(Unknown Source)
    at parkour.util.shutdown__init.<clinit>(Unknown Source)
    ... 69 more
Compilation failed: Subprocess failed
SocketException The transport's socket appears to have lost its connection to the nREPL server
Exception in thread "Thread-4" clojure.lang.ExceptionInfo: Subprocess failed {:exit-code 1}
    at clojure.core$ex_info.invoke(core.clj:4327)
    at leiningen.core.eval$fn__3532.invoke(eval.clj:226)
    at clojure.lang.MultiFn.invoke(MultiFn.java:231)
    at leiningen.core.eval$eval_in_project.invoke(eval.clj:326)
    at clojure.lang.AFn.applyToHelper(AFn.java:167)
    at clojure.lang.AFn.applyTo(AFn.java:151)
    at clojure.core$apply.invoke(core.clj:619)
    at leiningen.repl$server$fn__7443.invoke(repl.clj:201)
    at clojure.lang.AFn.applyToHelper(AFn.java:159)
    at clojure.lang.AFn.applyTo(AFn.java:151)
    at clojure.core$apply.invoke(core.clj:617)
    at clojure.core$with_bindings_STAR_.doInvoke(core.clj:1788)
    at clojure.lang.RestFn.invoke(RestFn.java:425)
    at clojure.lang.AFn.applyToHelper(AFn.java:163)
    at clojure.lang.RestFn.applyTo(RestFn.java:132)
    at clojure.core$apply.invoke(core.clj:621)
    at clojure.core$bound_fn_STAR_$fn__4102.doInvoke(core.clj:1810)
    at clojure.lang.RestFn.invoke(RestFn.java:397)
    at clojure.lang.AFn.run(AFn.java:24)
    at java.lang.Thread.run(Thread.java:744)
    clojure.tools.nrepl.transport/bencode/fn--4287/fn--4288 (transport.clj:95)
    clojure.tools.nrepl.transport/bencode/fn--4287 (transport.clj:95)
    clojure.tools.nrepl.transport/fn-transport/fn--4261 (transport.clj:42)
    clojure.core/binding-conveyor-fn/fn--4107 (core.clj:1836)
    java.util.concurrent.FutureTask.run (FutureTask.java:262)
    java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1145)
    java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:615)
    java.lang.Thread.run (Thread.java:744)
Bye for now!

Here's the error:

Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.util.ShutdownHookManager

My project.clj file:

(defproject parkour "0.1.0-SNAPSHOT"
  :description "FIXME: write description"
  :url "http://example.com/FIXME"
  :license {:name "Eclipse Public License"
            :url "http://www.eclipse.org/legal/epl-v10.html"}
  :dependencies [[com.damballa/parkour "0.5.4"]]
  :main parkour.core
  :aot [parkour.core]
  :target-path "target/%s"
  :profiles {
             :dev {:dependencies[[org.clojure/clojure "1.5.1"]]}
             :provided [:hadoop-1-2-1]
             :hadoop-1-2-1 {:dependencies[[org.apache.hadoop/hadoop-core "1.2.1"]]}})

My profiles.clj file:

{:user {:plugins [[lein-hadoop-cluster "0.1.2"]]
        :dependencies [[alembic "0.2.1"]]}}

Here's my leiningen version:

lein --version
Leiningen 2.3.4 on Java 1.7.0_51 OpenJDK 64-Bit Server VM

Running on:

lsb_release -a
No LSB modules are available.
Distributor ID: Ubuntu
Description:    Ubuntu 13.10
Release:    13.10
Codename:   saucy

I've noticed that org.apache.hadoop.util.ShutdownHookManager is provided by hadoop-common, but I'm on Hadoop 1.2.1 and couldn't find a compatible package for that version.

Thanks!
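
One way to confirm which classes are actually visible to the project is to probe the classpath from a REPL. The following is a minimal sketch, not part of Parkour: `class-available?` is a hypothetical helper name, and it assumes the REPL was started with the same profiles as the failing build.

```clojure
;; Hypothetical helper (not part of Parkour or Hadoop): returns true when
;; the named class can be found on the current classpath. The class is
;; looked up without being initialized, so no static initializers run.
(defn class-available?
  [^String class-name]
  (try
    (Class/forName class-name false (clojure.lang.RT/baseLoader))
    true
    (catch ClassNotFoundException _ false)
    (catch NoClassDefFoundError _ false)))

;; Probe for the class named in the stack trace above.
(class-available? "org.apache.hadoop.util.ShutdownHookManager")
```

If the call returns false, the Hadoop artifact on the classpath does not ship that class, which would match the ClassNotFoundException reported above.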

Consider using Pangool?

Would it be interesting for Parkour to use Pangool (http://pangool.net/) rather than Hadoop Java MapReduce?

Pangool is a thin Java layer on top of Hadoop MapReduce that makes many things easier (e.g. joins, secondary sort) and enhances it (using instances rather than classes, making multiple outputs and inputs cleaner, providing proper text I/O formats, etc.) while keeping about the same performance (5% variation). Its simple Tuple model removes the limitations of key/value pairs, so one can essentially group by any combination of fields. It has no flow management and stays at the MapReduce level, which makes it a suitable tool for writing raw MapReduce jobs.

We had the idea of creating a Clojure API on top of Pangool (we were first working on another abstraction to add flow capabilities to Pangool, and planned to add Clojure on top of that), but we have not finished it so far. We have been running Pangool for almost 2 years now and will be releasing a 1.0 version in the near future. We believe the tool is stable and robust: we have used it for many of our clients and have heard of other use cases through the mailing list.

If this is of any interest, we are keen to help.

chaining and multiplexing issue when no data to write

Hi,
When chaining an output to an input, if the output has 0 records to write (as can happen with multiplexing), the output file is never created, and the chained input then fails the whole job.

In my case:

(let [[err-node data-node] (-> node
                               (pg/output :err (seqf/dsink [BytesWritable BytesWritable])
                                          :data (text/dsink)))]
  [(pg/input data-node) (pg/input err-node)])

When there are no items to write to :err, the output folder isn't created, and the input that immediately follows fails with the following error:

15/02/03 19:11:27 ERROR security.UserGroupInformation: PriviledgedActionException as:Roee cause:org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: file:/tmp/Roee-1422963858120-743895824-parkour-transient/t-15354
15/02/03 19:11:27 ERROR parkour.tool: Uncaught exception: org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: file:/tmp/Roee-1422963858120-743895824-parkour-transient/t-15354
org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: file:/tmp/Roee-1422963858120-743895824-parkour-transient/t-15354
    at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:235)
    at org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.listStatus(SequenceFileInputFormat.java:55)
    at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:252)
    at parkour.remote.mux$input_format$reify__12695$fn__12697.invoke(mux.clj:72)
    at clojure.core.reducers$mapcat$fn__3638$fn__3639.invoke(reducers.clj:178)
    at clojure.lang.ArrayChunk.reduce(ArrayChunk.java:63)
    at clojure.core.protocols$fn__6051.invoke(protocols.clj:98)
    at clojure.core.protocols$fn__6015$G__6010__6024.invoke(protocols.clj:19)
    at clojure.core.protocols$seq_reduce.invoke(protocols.clj:31)
    at clojure.core.protocols$fn__6036.invoke(protocols.clj:54)
    at clojure.core.protocols$fn__5989$G__5984__6002.invoke(protocols.clj:13)
    at clojure.core.reducers$folder$reify__3608.coll_reduce(reducers.clj:126)
    at clojure.core$reduce.invoke(core.clj:6177)
    at clojure.core$into.invoke(core.clj:6229)
    at parkour.remote.mux$input_format$reify__12695.getSplits(mux.clj:66)
    at parkour.hadoop.ProxyInputFormat.getSplits(ProxyInputFormat.java:20)
    at org.apache.hadoop.mapred.JobClient.writeNewSplits(JobClient.java:1054)
    at org.apache.hadoop.mapred.JobClient.writeSplits(JobClient.java:1071)
    at org.apache.hadoop.mapred.JobClient.access$700(JobClient.java:179)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:983)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:936)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:394)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)
    at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:936)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:550)
    at parkour.graph$run_job$fn__6012.invoke(graph.clj:363)
    at parkour.graph$run_job.invoke(graph.clj:361)
    at parkour.graph$eval6031$fn__6032$fn__6033.doInvoke(graph.clj:393)
        ...
