
graphx's Introduction

Apache Spark

Lightning-Fast Cluster Computing - http://spark.apache.org/

Online Documentation

You can find the latest Spark documentation, including a programming guide, on the project webpage at http://spark.apache.org/documentation.html. This README file only contains basic setup instructions.

Building

Spark requires Scala 2.10. The project is built using Simple Build Tool (SBT). If SBT is already installed, the build will use the system version of sbt; otherwise it will attempt to download sbt automatically. To build Spark and its example programs, run:

./sbt/sbt assembly

Once you've built Spark, the easiest way to start using it is the shell:

./bin/spark-shell

Or, for the Python API, the Python shell (./bin/pyspark).

Spark also comes with several sample programs in the examples directory. To run one of them, use ./bin/run-example <class> <params>. For example:

./bin/run-example org.apache.spark.examples.SparkLR local[2]

will run the Logistic Regression example locally on 2 CPUs.

Each of the example programs prints usage help if no params are given.

All of the Spark samples take a <master> parameter that is the cluster URL to connect to. This can be a mesos:// or spark:// URL, or "local" to run locally with one thread, or "local[N]" to run locally with N threads.
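For reference, a standalone program passes the same kind of master URL when constructing its SparkContext. A minimal sketch (the application name here is a placeholder):

import org.apache.spark.SparkContext

// "local[2]" runs locally with two threads; replace it with a spark:// or
// mesos:// URL to connect to a cluster.
val sc = new SparkContext("local[2]", "ExampleApp")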

Running tests

Testing first requires building Spark. Once Spark is built, tests can be run using:

./sbt/sbt test

A Note About Hadoop Versions

Spark uses the Hadoop core library to talk to HDFS and other Hadoop-supported storage systems. Because the protocols have changed across Hadoop versions, you must build Spark against the same version that your cluster runs. You can change the version by setting the SPARK_HADOOP_VERSION environment variable when building Spark.

For Apache Hadoop versions 1.x, Cloudera CDH MRv1, and other Hadoop versions without YARN, use:

# Apache Hadoop 1.2.1
$ SPARK_HADOOP_VERSION=1.2.1 sbt/sbt assembly

# Cloudera CDH 4.2.0 with MapReduce v1
$ SPARK_HADOOP_VERSION=2.0.0-mr1-cdh4.2.0 sbt/sbt assembly

For Apache Hadoop 2.2.X, 2.1.X, 2.0.X, 0.23.x, Cloudera CDH MRv2, and other Hadoop versions with YARN, also set SPARK_YARN=true:

# Apache Hadoop 2.0.5-alpha
$ SPARK_HADOOP_VERSION=2.0.5-alpha SPARK_YARN=true sbt/sbt assembly

# Cloudera CDH 4.2.0 with MapReduce v2
$ SPARK_HADOOP_VERSION=2.0.0-cdh4.2.0 SPARK_YARN=true sbt/sbt assembly

# Apache Hadoop 2.2.X and newer
$ SPARK_HADOOP_VERSION=2.2.0 SPARK_YARN=true sbt/sbt assembly

When developing a Spark application, specify the Hadoop version by adding the "hadoop-client" artifact to your project's dependencies. For example, if you're using Hadoop 1.2.1 and build your application using SBT, add this entry to libraryDependencies:

"org.apache.hadoop" % "hadoop-client" % "1.2.1"

If your project is built with Maven, add this to your POM file's <dependencies> section:

<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>1.2.1</version>
</dependency>

Configuration

Please refer to the Configuration guide in the online documentation for an overview on how to configure Spark.

Contributing to Spark

Contributions via GitHub pull requests are gladly accepted from their original author. Along with any pull requests, please state that the contribution is your original work and that you license the work to the project under the project's open source license. Whether or not you state this explicitly, by submitting any copyrighted material via pull request, email, or other means you agree to license the material under the project's open source license and warrant that you have the legal authority to do so.

graphx's People

Contributors

aarondav, alig, andrewor14, ankurdave, charlesreiss, colorant, dcrankshaw, dennybritz, holdenk, ijuma, jegonzal, jerryshao, jey, joshrosen, karenfeng, kayousterhout, markhamstra, mateiz, mlnick, mosharaf, mridulm, pwendell, rezazadeh, rxin, scrapcodes, shivaram, tdas, tgravescs, tmyklebu, xiajunluan


graphx's Issues

Fix Analytics.scala

Remove broken commented-out code and fix the primary functions: PageRank, Shortest Path, connected components, etc.

Also split out ALS and other non-analytics tasks.

Implement a "project" operation on Graph

For Graph[VD1, ED1], implement Graph#project[VD2, ED2](otherGraph: Graph[VD2, ED2]): Graph[VD1, ED1].

G.project(G') returns a subgraph of G whose vertices and edges are drawn from G but whose structure matches that of G'.

This is useful in filter operations where you compute some value on the vertices and edges of G to drive the subgraph selection, but you really want a subgraph of G itself, e.g.

val projected = g.project(preprocess(g).subgraph(vPred, ePred))
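For illustration, a rough sketch of the intended semantics (not a scalable implementation: it collects the ids of the other graph to the driver and reuses the existing subgraph operator; the variable names and the positional argument order of subgraph are assumptions):

// Keep only the vertices and edges of `graph` whose ids also appear in `other`,
// preserving graph's own attributes.
val keepVertices = other.vertices.map { case (vid, _) => vid }.collect().toSet
val keepEdges = other.edges.map(e => (e.srcId, e.dstId)).collect().toSet
val projected = graph.subgraph(
  t => keepEdges.contains((t.srcId, t.dstId)),   // edge predicate
  (vid, _) => keepVertices.contains(vid))        // vertex predicate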

ClassCastException: s.c.m.HashSet cannot be cast to s.c.m.BitSet

GraphX sometimes fails with the following stack trace:

java.lang.ClassCastException: scala.collection.mutable.HashSet cannot be cast to scala.collection.mutable.BitSet
        at org.apache.spark.graph.VertexSetRDD$$anonfun$5.apply(VertexSetRDD.scala:333)
        at org.apache.spark.graph.VertexSetRDD$$anonfun$5.apply(VertexSetRDD.scala:332)
        at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
        [...]

Unpersist previous Pregel iterations

Pregel currently does not explicitly unpersist RDDs that are created in previous iterations, forced, and will never be computed again. This slows down GC unnecessarily because the Spark caches fill up with the old iteration data. We should explicitly unpersist these RDDs to improve performance.
#93 was a first attempt at this.
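A minimal sketch of the idea on plain RDDs (not the actual Pregel code; step, initial, and numIters are hypothetical stand-ins for one superstep, the initial vertex state, and the iteration count):

import org.apache.spark.rdd.RDD

def iterate(initial: RDD[(Long, Double)], numIters: Int)
           (step: RDD[(Long, Double)] => RDD[(Long, Double)]): RDD[(Long, Double)] = {
  var current: RDD[(Long, Double)] = initial.cache()
  for (_ <- 1 to numIters) {
    val next = step(current).cache()
    next.count()          // force materialization of the new iteration
    current.unpersist()   // drop the previous iteration's cached blocks
    current = next
  }
  current
}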

Efficient storage of partition ids for each vertex. (the vid2pid map)

This is really two issues that may need to be solved simultaneously:

  1. Storing Pids for each vertex in either an Array[Pid] or a BitSet
  2. Storing the directionality of edges associated with each Pid. This is needed to support single sided joins with the edge table.

The second issue is slightly more pressing: to support the join optimization in mapReduceTriplets, in which we only fill in the source or destination vertex attribute, we need to know which partitions have in, out, or both edge directions for each vertex.

Changing the API of mrTriplets?

I've been thinking about this, and I think the mapUDF part of mrTriplets is a little bit confusing because we are asking the user to return an Iterator of (Vid, A).

How about we change mapUDF's signature from EdgeTriplet to (EdgeTriplet, Emitter), where the Emitter has a function sendMessage(targetVid: Vid, message: A)?

This way it should be more efficient, and it will also be more obvious what the mapUDF is doing.

@ankurdave @jegonzal @dcrankshaw
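A hedged sketch of the proposed shape, written as bare signatures in the style of the existing operators (the Emitter trait and all names here are illustrative, not a final API):

trait Emitter[A] {
  def sendMessage(targetVid: Vid, message: A): Unit
}

def mapReduceTriplets[A: ClassManifest](
    mapUDF: (EdgeTriplet[VD, ED], Emitter[A]) => Unit,
    reduceUDF: (A, A) => A): VertexSetRDD[A]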

Add ReduceEdges operator

Add an operator that takes the set of edge triplets between two connected vertices and a reduce function that emits a single edge triplet. We need to decide how this will handle edges of opposite directions, because a user should be able to reduce just the incoming edges, just the outgoing edges, or both(?).
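One possible shape, as a bare signature (hypothetical; the direction parameter is one way to address the open question above):

def reduceEdges(
    reduceFun: (EdgeTriplet[VD, ED], EdgeTriplet[VD, ED]) => EdgeTriplet[VD, ED],
    direction: EdgeDirection): Graph[VD, ED]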

Bugs in VertexPartition

In VertexPartition.scala:

  • Lines 107, 127, and 151: mask.nextSetBit should be newMask.nextSetBit. The current behavior is harmless but wasteful.
  • Line 140: deltaJoin calls join if the indexes are different. It should call itself.
  • Line 193: createUsingIndex uses pos directly to index into the new partition. This will fail if iter contains a vid that is not in this, as may happen in leftJoin, for example. Instead, there should be a check for pos != -1.
  • Line 203: updateUsingIndex calls System.arraycopy on arrays of different types (Array[VD] and Array[VD2]). This will fail if VD != VD2. Instead, updateUsingIndex should require the iterator to be of type VD and return a VertexPartition[VD].

Change to PrimitiveKeyOpenHashMap broke Spark core build

Running sbt/sbt doc gives the following error:

[error] /Users/ankurdave/repos/graphx/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala:172: not enough arguments for constructor PrimitiveKeyOpenHashMap: (initialCapacity: Int)(implicit evidence$3: ClassManifest[Int], implicit evidence$4: ClassManifest[Int])org.apache.spark.util.collection.PrimitiveKeyOpenHashMap[Int,Int]
[error]     private val mapIdToIndex = new PrimitiveKeyOpenHashMap[Int, Int]()
[error]                                ^
[info] No documentation generated with unsucessful compiler run

The PrimitiveKeyOpenHashMap constructor has a default value for its only argument, but it doesn't seem to be working. Modifying the call site to pass the default argument explicitly fixes the problem.
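For concreteness, the workaround looks something like this (the capacity value 64 is only illustrative; the point is that passing the argument explicitly sidesteps the default-argument problem):

private val mapIdToIndex = new PrimitiveKeyOpenHashMap[Int, Int](64)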

Update README

  • Add Wikipedia demo script
  • Explain property graphs, Graph/GraphOps functions, and VertexRDD functions

Update README for GraphX

We should update the README so that it discusses GraphX, not Spark. We probably want a little bit about how GraphX works, as well as quickstart instructions.

Simplify GraphImpl internals further

  • VertexPlacement is only used in VTableReplicated, so it should be created on demand there rather than being passed around in GraphImpl.
  • VTableReplicated should be renamed to ReplicatedVertexView.
  • VertexPlacement should be renamed to VertexRoutingTable.

[Question] Which graphx version to compare to?

We are comparing performance on some of the example applications in Analytics.scala.
Reported results show GraphX PageRank performing within a factor of 2-3 of GraphLab, but I haven't yet been able to get near that performance.

configurations:

  • twitter 1.4B edge
  • 16 nodes
  • branch master
  • command: bin/spark-class -Dspark.executor.memory=52g org.apache.spark.graphx.lib.Analytics <master> pagerank <graph> --tol=0.01 --numEPart=16

Do you have a pointer to which branch and what methodology I should use to replicate these results?

There are some issues in the article "Launch a benchmarking cluster"

I want to run PageRank on GraphX. While following the instructions (https://github.com/amplab/graphx/wiki/Launch-a-benchmarking-cluster), I encountered some problems.
First, the running command (~/graphx/run-example org.apache.spark.graph.Analytics spark://$MASTERS:7077 pagerank hdfs://$MASTERS:9000/soc-LiveJournal1.txt --numIter=20 --numEPart=128) is wrong; I changed it to "./bin/run-example org.apache.spark.graphx.lib.Analytics spark://XXX:7077 pagerank hdfs://XXX:8020/soc-LiveJournal1.txt". The parameter "--numIter" is not recognized when running pagerank; I read the source code and found it is only used in the cc benchmark.

When I run the command above, Spark throws warnings and errors:
14/04/21 10:24:59 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
……
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Spark cluster looks down

What I am sure of is that the Spark UI is fine, and other benchmarks run correctly.

Could you give some hints on deploying GraphX on a standalone cluster?
Thank you very much; I have been blocked on this for a week. :)

Edge contraction

Based on our discussion today it seems like it might be helpful to have a function of the form:

def contractEdges(
  ePred: EdgeTriplet[VD,ED] => Boolean, 
  contractFun: EdgeTriplet[VD,ED] => VD,  
  mergeFun: (VD, VD) => VD): Graph[VD, ED]

where the user-defined edge predicate ePred determines which edges to contract, the user-defined contractFun renders a new vertex for the contracted edge, and mergeFun merges multiple vertices that have been contracted together.
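A hypothetical usage sketch, assuming contractEdges existed with the signature above and the graph's vertex attribute were a community label:

val contracted = graph.contractEdges(
  ePred = t => t.srcAttr == t.dstAttr,   // contract edges whose endpoints share a label
  contractFun = t => t.srcAttr,          // the contracted vertex keeps that label
  mergeFun = (a, b) => a)                // arbitrary choice when merging contracted vertices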

Multiple versions of AggregateNeighbors

The aggregateNeighbors function presents edge triplets to the user-defined map function. Let's provide an additional version that leaves the center vertex data as NULL. This should reduce communication by half for algorithms like PageRank.

Triplets.collect returns incorrect values

The Spark RDD.collect operation stores the output directly into an array. Since we reuse the iterator values, only a single edge triplet is stored (in duplicate) for each partition.
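A hedged sketch of a workaround (illustrative only): copy each triplet's fields into a fresh tuple before collecting, so the reused iterator object is never stored directly.

val collected = graph.triplets
  .map(t => (t.srcId, t.srcAttr, t.dstId, t.dstAttr, t.attr))  // copy out of the reused object
  .collect()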

Modifications to Aggregate Neighbors

Based on our discussion in the weekly meeting we want to make the following change to aggregate neighbors. The current version is:

def aggregateNeighbors[A: ClassManifest](
      mapFunc: (Vid, EdgeTriplet[VD, ED]) => Option[A],
      mergeFunc: (A, A) => A,
      direction: EdgeDirection)
    : Graph[(VD, Option[A]), ED]

It is worth noting that we actually did have the center vertex as an argument to the map function (addressing the primary issue). Regardless, the proposed simplification is:

def aggregateNeighbors[A: ClassManifest](
      mapFunc: EdgeTriplet[VD, ED] => Iter[(Vid, A)],
      mergeFunc: (A, A) => A)
    : Graph[(VD, Option[A]), ED]

The new semantics of mapFunc is that it takes an edge triplet and then emits an iterator over "messages" to be sent to either of the vertices in the triplet (or even arbitrary vertices in the graph). The mergeFunc is then applied to reduce the messages to a single message which is stored as the vertex attribute in the returned graph.
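A hypothetical usage sketch of the proposed API on a Graph[Double, ED]: every triplet sends the source vertex's attribute to the destination and messages are summed, so each vertex ends up with the sum of its in-neighbors' attributes (None if it received no messages).

val inNbrSums = graph.aggregateNeighbors[Double](
  t => Iterator((t.dstId, t.srcAttr)),  // emit a message to the destination vertex
  (a, b) => a + b)                      // sum the messages arriving at each vertex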

Improve pre-shuffle aggregation performance in mrTriplets

I realized today that we can actually save the position of the source vertex and the target vertex in the array in EdgeTriplet, and then in aggregation, we can simply use that position to update the aggregation value (without actually doing any hash lookups).

Issue 101 will make this easier to implement.

#101

Wrong PageRank results

Hi all,

I tried the static PageRank computation on GraphX (in Spark 1.0.0) with the following code:

import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

object ord extends Ordering[(VertexId, Double)] { def compare(a:(VertexId, Double), b:(VertexId, Double)) = a._2 compare b._2}

val graph = GraphLoader.edgeListFile(sc, "/lhome/zma/test.txt")
for (i <- 0 to 30) {
  println("iter: " + i)
  val ranks = graph.staticPageRank(i)
  ranks.vertices.top(100)(ord).foreach(println(_))
}

The /lhome/zma/test.txt contains a simple graph:

0 1
1 3
2 3
3 3

The PageRank implementation ( https://github.com/amplab/graphx/blob/master/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala ) uses this PageRank definition:

PR[i] = alpha + (1 - alpha) * inNbrs[i].map(j => oldPR[j] / outDeg[j]).sum

However, the results I get are:

iter: 0
(0,0.15)
(1,0.15)
(3,0.15)
(2,0.15)
iter: 1
(3,0.5325)
(1,0.27749999999999997)
(0,0.15)
(2,0.15)
iter: 2
(3,0.8384999999999999)
(1,0.27749999999999997)
(0,0.15)
(2,0.15)
iter: 3
(3,0.862725)
(1,0.27749999999999997)
(0,0.15)
(2,0.15)
...

The results for iteration counts 0 and 1 are correct. However, for "iter: 2":

The PageRank value for 3 should be:

0.15 + 0.85 * (0.27749999999999997 + 0.15 + 0.5325)
= 0.966

while the result is (3,0.8384999999999999). It seems the PageRank of vertex 2 is not passed to the PageRank of vertex 3 (0.8384999999999999 == 0.15 + 0.85 * (0.27749999999999997 + 0.5325)).

I noticed that the Pregel implementation (https://github.com/amplab/graphx/blob/master/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala) only sends messages from vertices that changed in the last superstep. I can understand this optimization, since it reduces the number of messages. However, doesn't it make this PageRank implementation incorrect on top of this model? Would passing the delta out from changed vertices be a better fit for this "Pregel" model in GraphX?

Please correct me if I have misunderstood some parts. I would appreciate any additional information or pointers.
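For what it's worth, a hedged sketch of the delta formulation suggested above (not the library's code; the names, tolerance, and edge-attribute weighting are assumptions). Each vertex keeps (rank, delta); only a vertex whose delta exceeds the tolerance sends a message, and the message carries the weighted delta rather than the full rank:

def vertexProgram(id: VertexId, attr: (Double, Double), msgSum: Double): (Double, Double) = {
  val (oldRank, _) = attr
  val newRank = oldRank + 0.85 * msgSum   // alpha = 0.15, matching the formula above
  (newRank, newRank - oldRank)
}

def sendMessage(edge: EdgeTriplet[(Double, Double), Double]): Iterator[(VertexId, Double)] = {
  val delta = edge.srcAttr._2
  // Assumes edge.attr holds 1.0 / outDegree(src); only changed vertices send.
  if (delta > 0.001) Iterator((edge.dstId, delta * edge.attr)) else Iterator.empty
}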

PageRank causes java.util.NoSuchElementException

When running PageRank on a cluster, sometimes I hit a NoSuchElementException that's caused somewhere in VertexSetRDD. Full stack trace and command below. The line numbers may be slightly off due to debugging printlns.

Command:

/root/graphx/run-example org.apache.spark.graph.Analytics spark://ec2-54-224-159-106.compute-1.amazonaws.com:7077 pagerank hdfs://ec2-54-224-159-106.compute-1.amazonaws.com:9000/soc-LiveJournal1.txt --numIter=10 --numEPart=128

Stack Trace:

java.util.NoSuchElementException: End of stream
    at org.apache.spark.util.NextIterator.next(NextIterator.scala:83)
    at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:29)
    at org.apache.spark.graph.VertexSetRDD$$anonfun$8.apply(VertexSetRDD.scala:314)
    at org.apache.spark.graph.VertexSetRDD$$anonfun$8.apply(VertexSetRDD.scala:313)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
    at org.apache.spark.rdd.ZippedRDD.compute(ZippedRDD.scala:64)
    at org.apache.spark.graph.VertexSetRDD.compute(VertexSetRDD.scala:149)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
    at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:32)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:159)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:100)
    at org.apache.spark.scheduler.Task.run(Task.scala:53)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:212)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)

Synthetic Graph Generators

To help with benchmarking, let's create some synthetic graph generators. The Pregel paper describes a log-normal generator that is relatively easy to implement.
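A minimal sketch of such a generator in plain Scala (mu and sigma are illustrative parameters, not the paper's exact values): each vertex draws an out-degree from a log-normal distribution and links to uniformly chosen targets.

import java.util.Random

def logNormalEdges(numVertices: Long, mu: Double, sigma: Double, seed: Long): Iterator[(Long, Long)] = {
  val rand = new Random(seed)
  (0L until numVertices).iterator.flatMap { src =>
    val outDegree = math.exp(mu + sigma * rand.nextGaussian()).toInt          // log-normal out-degree
    Iterator.fill(outDegree)((src, math.abs(rand.nextLong()) % numVertices))  // uniform random targets
  }
}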

Lazy evaluation of join and map operations

The VertexSetRDD[VD] stores the vertex attributes as an IndexedSeq[VD]. When a VertexSetRDD is first constructed from an RDD[(Vid, VD)], the attributes are stored in an Array[VD]. When mapValues is invoked on a VertexSetRDD[VD], a new array is created and populated with the result of the map operation.

https://github.com/amplab/graphx/blob/master/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala#L129

However, when leftJoin is invoked, an IndexedSeqView is created:

https://github.com/amplab/graphx/blob/master/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala#L192

Should both be implemented using views, or should both use actual storage? The tradeoffs are the following:

  1. Using views means that long chains of computation might be invoked repeatedly.
  2. Using Arrays could lead to many long-lived allocations.

I suspect all the operations should be implemented using views, but I am not sure what the implications are for caching.
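A tiny plain-Scala illustration of the tradeoff (not the VertexSetRDD code): mapping an Array materializes a new array once, while mapping a view defers the work and re-runs it on every traversal.

val attrs = Array(1.0, 2.0, 3.0)

val eager = attrs.map(_ * 2)          // computed and allocated once
val deferred = attrs.view.map(_ * 2)  // recomputed every time it is traversed

// Chaining maps on a view repeats the whole chain per access,
// which is the "long chains of computation" concern in point 1.
val chained = deferred.map(_ + 1).map(_ / 3)
println(chained.toList)               // forces the chain once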

Getting an error when running the Scala test cases from https://github.com/amplab/graphx/blob/master/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala

When I try to run the Scala test cases from https://github.com/amplab/graphx/blob/master/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala, I get the following error.

Stacktrace

WARNING: -p has been deprecated and will be reused for a different (but still very cool) purpose in ScalaTest 2.0. Please change all uses of -p to -R.
*** RUN ABORTED ***
java.lang.ClassNotFoundException: org.apache.spark.graphx.GraphTestSuite
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.scalatest.tools.Runner$$anonfun$21.apply(Runner.scala:1470)
at org.scalatest.tools.Runner$$anonfun$21.apply(Runner.scala:1469)
at scala.collection.TraversableLike$$anonfun$filter$1.apply(TraversableLike.scala:264)
at scala.collection.immutable.List.foreach(List.scala:318)

In Scala Test IDE:

Event:
Run Aborted

Message:
Unable to load a Suite class. This could be due to an error in your runpath. Missing class: org.apache.spark.graphx.GraphTestSuite
Date:
Thu Jul 17 11:49:57 IST 2014
Thread:
Thread-2
Exception:
java.lang.ClassNotFoundException
java.net.URLClassLoader$1.run(URLClassLoader.java:366)
java.net.URLClassLoader$1.run(URLClassLoader.java:355)
java.security.AccessController.doPrivileged(Native Method)
java.net.URLClassLoader.findClass(URLClassLoader.java:354)
java.lang.ClassLoader.loadClass(ClassLoader.java:424)
java.lang.ClassLoader.loadClass(ClassLoader.java:357)
org.scalatest.tools.Runner$$anonfun$21.apply(Runner.scala:1470)
org.scalatest.tools.Runner$$anonfun$21.apply(Runner.scala:1469)
scala.collection.TraversableLike$$anonfun$filter$1.apply(TraversableLike.scala:264)
scala.collection.immutable.List.foreach(List.scala:318)
scala.collection.TraversableLike$class.filter(TraversableLike.scala:263)
scala.collection.AbstractTraversable.filter(Traversable.scala:105)
org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1469)
org.scalatest.tools.RunnerJFrame$RunnerThread$$anonfun$run$1.apply(RunnerJFrame.scala:1361)
org.scalatest.tools.RunnerJFrame$RunnerThread$$anonfun$run$1.apply(RunnerJFrame.scala:1359)
org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1645)
org.scalatest.tools.RunnerJFrame$RunnerThread.run(RunnerJFrame.scala:1358)

Can anyone please guide me?

Stack Overflow caused by repeated calls to default Java serialized data reader

When we run PageRank for too many iterations (100 iterations consistently triggers it), we get a stack overflow that stems from reading deeply nested serialized data with the default Java serializer. Basically, we get repeated calls to the following sequence of methods:

        at scala.collection.immutable.$colon$colon.readObject(List.scala:435)
        at sun.reflect.GeneratedMethodAccessor1.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1015)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1891)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at scala.collection.immutable.$colon$colon.readObject(List.scala:435)
        at sun.reflect.GeneratedMethodAccessor1.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1015)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1891)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)

@ankurdave poked around a little bit and he said that Kryo seemed to be getting called on most things but apparently not whatever is causing this.

I did some more poking around today, and while profiling PageRank running on LiveJournal, I found that a bunch of exceptions were being thrown in the Kryo serialization code. It seems that Kryo keeps trying to serialize something, fails with a NoSuchMethodException, and then falls back to the Java serialization code. I'm thinking that these two might be related? The exception stack trace is here.
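For reference, a hedged sketch of forcing Kryo and registering the GraphX classes up front (the SparkConf usage and the registrator class name are assumptions and may not match the project's exact setup at the time):

import org.apache.spark.SparkConf

// Point Spark at the Kryo serializer and a registrator that registers the
// graph classes, so objects do not silently fall back to Java serialization.
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "org.apache.spark.graph.GraphKryoRegistrator")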


Spark slaves die on second run of pagerank

As I've been running benchmarks over the last few days, I've noticed that at least 1 and usually 2 or 3 slaves die when running pagerank for the second time in a row. Occasionally I will see an akka.pattern.AskTimeoutException, but usually it just looks like the CoarseGrainedExecutorBackend on that machine received a signal to terminate.

Here are the complete steps to reproduce the issue (please let me know if this does not reproduce it for you):

# LOCAL REPO
# Launch the cluster
ec2/spark-ec2 -s 16 -k $KEYPAIR -i ~/.ssh/aws_rsa -t m2.4xlarge -z us-east-1d --spot-price=1 launch clean_bench
ssh $MASTER_IP_ADDRESS

# NOW ON MASTER
# Setup cluster
wget https://snap.stanford.edu/data/soc-LiveJournal1.txt.gz
gunzip soc-LiveJournal1.txt.gz
~/ephemeral-hdfs/bin/hadoop dfs -copyFromLocal soc-LiveJournal1.txt /
yum install git
git clone https://github.com/amplab/graphx.git
cd graphx
sbt/sbt assembly
cp ~/spark/conf/core-site.xml ~/spark/conf/slaves ~/spark/conf/spark-env.sh ~/graphx/conf/ 
~/spark/bin/stop-all.sh
~/spark-ec2/copy-dir ~/graphx
~/graphx/bin/start-all.sh

# Reproduce failure:
source ~/spark-ec2/ec2-variables.sh
# Run the first time, should finish fine
~/graphx/run-example org.apache.spark.graph.Analytics spark://$MASTERS:7077 pagerank hdfs://$MASTERS:9000/soc-LiveJournal1.txt --numIter=20 --numEPart=128
# Run a second time immediately afterward. Within a minute or so of starting the second computation you should see at least one
# slave (often 2 or 3) die. I check for dead slaves by watching the spark master web UI
~/graphx/run-example org.apache.spark.graph.Analytics spark://$MASTERS:7077 pagerank hdfs://$MASTERS:9000/soc-LiveJournal1.txt --numIter=20 --numEPart=128


# Restart the cluster
~/graphx/bin/stop-all.sh
~/graphx/bin/start-all.sh


# Repeat the experiment. You should see the same slave machines die on the second run as before.
# Run the first time, should finish fine
~/graphx/run-example org.apache.spark.graph.Analytics spark://$MASTERS:7077 pagerank hdfs://$MASTERS:9000/soc-LiveJournal1.txt --numIter=20 --numEPart=128
# Run a second time immediately afterward. Within a minute or so of starting the second computation you should see at least one
# slave (often 2 or 3) die
~/graphx/run-example org.apache.spark.graph.Analytics spark://$MASTERS:7077 pagerank hdfs://$MASTERS:9000/soc-LiveJournal1.txt --numIter=20 --numEPart=128

Partition by Canonical Edge Direction

To be able to efficiently collapse edges in reverse directions, we need the edge partitioner to always place the edges (u,v) and (v,u) on the same machine. The current edge partitioning logic is (defined in https://github.com/amplab/graphx/blob/master/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala#L375):

protected def edgePartitionFunction2D(src: Vid, dst: Vid, 
    numParts: Pid, ceilSqrtNumParts: Pid): Pid = {
    val mixingPrime: Vid = 1125899906842597L 
    val col: Pid = ((math.abs(src) * mixingPrime) % ceilSqrtNumParts).toInt
    val row: Pid = ((math.abs(dst) * mixingPrime) % ceilSqrtNumParts).toInt
    (col * ceilSqrtNumParts + row) % numParts
  }

For now I propose adding:

protected def edgePartitionFunction2D(srcOrig: Vid, dstOrig: Vid, 
    numParts: Pid, ceilSqrtNumParts: Pid): Pid = {
    val mixingPrime: Vid = 1125899906842597L 
    val src = math.min(srcOrig, dstOrig)  // <-------- This line -------------
    val dst = math.max(srcOrig, dstOrig) // <-------- And this line -------
    val col: Pid = ((math.abs(src) * mixingPrime) % ceilSqrtNumParts).toInt
    val row: Pid = ((math.abs(dst) * mixingPrime) % ceilSqrtNumParts).toInt
    (col * ceilSqrtNumParts + row) % numParts
  }

Implement an "extendedSubgraph" function on Graph

Should we implement a variation of the subgraph function, where extendedSubgraph(vPred, ePred) returns a graph containing the edges that satisfy ePred(e) && (vPred(dst) || vPred(src)), along with the vertices that either satisfy vPred(v) or are attached to an edge satisfying the previous condition?

This could be used to implement a propagate-and-aggregate with the ability to specify the specific vertices that start the computation.
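One possible shape, as a bare signature mirroring the existing subgraph operator (hypothetical):

def extendedSubgraph(
    vPred: (Vid, VD) => Boolean,
    ePred: EdgeTriplet[VD, ED] => Boolean): Graph[VD, ED]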

Add ALS to Analytics

To provide a better benchmark, we need some additional applications in the analytics toolkit.

Evaluate switching vertices RDD to use IndexedRDD

Before actually switching the vertices to use the new IndexedRDD, we should first evaluate how this will interact with the replicated vertex table. I am a little concerned that we might need to rethink the design slightly.
