
spark-avro's Issues

NotSerializableException when calling `first()`

val person = sqlContext.avroFile("/tmp/person.avro")

When I try to call `first()` on it, I get "NotSerializableException" exceptions again:

person.first()

...
14/11/21 12:59:17 ERROR Executor: Exception in task 0.0 in stage 14.0 (TID 20)
java.io.NotSerializableException: org.apache.avro.generic.GenericData$Record
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
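
One thing worth trying (a sketch, not from the original report; the column names are placeholders) is to project the result down to primitive columns before collecting, so that the nested Avro GenericData.Record never has to be serialized back to the driver:

person.registerTempTable("person")
sqlContext.sql("SELECT name, age FROM person").first()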

Support for compression

Hi, I am looking to read and write my dataset in Avro format using Snappy compression.
Is this supported? Any examples or pointers would be a great help.

Thanks
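
For reference, newer releases of spark-avro expose a compression setting through the SQL configuration; a minimal sketch (assuming spark-avro 2.x on Spark 1.4+, where this key is documented, and where df stands for any DataFrame) looks like this:

sqlContext.setConf("spark.sql.avro.compression.codec", "snappy")
df.write.format("com.databricks.spark.avro").save("/output/path")  // placeholder path

Reading Snappy-compressed Avro needs no extra options, since the codec is recorded in the file container.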

"NoClassDefFoundError FsInput" error

Update: please read my second post


I'm using (well, desperately trying to use) version 1.0.0, since with the previous one I get the "LONG not supported" error.

<dependency>
   <groupId>com.databricks</groupId>
   <artifactId>spark-avro_2.10</artifactId>
   <version>1.0.0</version>
</dependency>

In Eclipse I get this error:
"bad symbolic reference. A signature in package.class refers to type DataFrame in package org.apache.spark.sql which is not available. It may be completely missing from the current classpath, or the version on the classpath might be incompatible with the version used when compiling package.class."
The same happens with spark-shell --jars:

scala>     val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@7d942108
scala> import com.databricks.spark.avro._
import com.databricks.spark.avro._
scala> sqlContext.avroFile(pathToAvrodata).collect().foreach(println)
error: bad symbolic reference. A signature in package.class refers to type DataFrame
in package org.apache.spark.sql which is not available.
It may be completely missing from the current classpath, or the version on
the classpath might be incompatible with the version used when compiling package.class.

Moreover, with the previous version I was able to package with Maven; now I can't:

[ERROR] Failed to execute goal net.alchim31.maven:scala-maven-plugin:3.1.3:compile (default) on project spark-formats-test: wrap: org.apache.commons.exec.ExecuteException: Process exited with an error: 1(Exit value: 1) -> [Help 1]
org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute goal net.alchim31.maven:scala-maven-plugin:3.1.3:compile (default) on project spark-formats-test: wrap: org.apache.commons.exec.ExecuteException: Process exited with an error: 1(Exit value: 1)

I'm using Eclipse Kepler, Maven 3.0+, and the Scala 2.10.4 plugin, with the net.alchim31.maven scala-maven-plugin to compile the Scala project. Any help or suggestion is really appreciated.

Cheers,
L
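
The "bad symbolic reference ... DataFrame" message usually means that the Spark on the compile classpath predates 1.3.0, which introduced DataFrame and which spark-avro 1.0.0 is built against. A likely fix (an assumption based on that error, not confirmed in this thread) is to compile against a matching spark-sql:

<dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-sql_2.10</artifactId>
   <version>1.3.0</version>
   <scope>provided</scope>
</dependency>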

Map type not supported

Exception in thread "main" java.lang.RuntimeException: Unsupported type MAP
at scala.sys.package$.error(package.scala:27)
at com.databricks.spark.avro.AvroRelation.com$databricks$spark$avro$AvroRelation$$toSqlType(AvroRelation.scala:116)
at com.databricks.spark.avro.AvroRelation$$anonfun$5.apply(AvroRelation.scala:97)
at com.databricks.spark.avro.AvroRelation$$anonfun$5.apply(AvroRelation.scala:96)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
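
For context, this is triggered by any Avro map field; an illustrative schema fragment (not taken from the report) that reproduces the error:

{
  "type" : "record",
  "name" : "Example",
  "fields" : [
    { "name" : "labels", "type" : { "type" : "map", "values" : "string" } }
  ]
}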

missing support for spark DateType

Hi,
I want to save data loaded from a database to HDFS.
In the DB schema there is a field with an "ansidate" type.
When I load the data into a DataFrame, the date field is converted to Spark's DateType.
Finally, when I try to save the DataFrame to HDFS, the application crashes with the following stack trace:

 INFO DAGScheduler: Job 8 finished: saveAsHadoopFile at AvroSaver.scala:54, took 1.289520 s
Exception in thread "main" java.lang.IllegalArgumentException: Unexpected type DateType.
        at com.databricks.spark.avro.SchemaConverters$.com$databricks$spark$avro$SchemaConverters$$convertFieldTypeToAvro(SchemaConverters.scala:184)
        at com.databricks.spark.avro.SchemaConverters$$anonfun$convertStructToAvro$1.apply(SchemaConverters.scala:103)
        at com.databricks.spark.avro.SchemaConverters$$anonfun$convertStructToAvro$1.apply(SchemaConverters.scala:99)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at com.databricks.spark.avro.SchemaConverters$.convertStructToAvro(SchemaConverters.scala:99)
        at com.databricks.spark.avro.AvroSaver$.save(AvroSaver.scala:51)
        at com.databricks.spark.avro.package$AvroDataFrame.saveAsAvroFile(package.scala:34)
        at com.databricks.spark.avro.DefaultSource.createRelation(DefaultSource.scala:78)
        at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:308)
        at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1123)
        at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1083)
        at com.barclays.mercury.junoop.bcbs.SparkExperimentalJob$$anonfun$main$3.apply(SparkExperimentalJob.scala:53)
        at com.barclays.mercury.junoop.bcbs.SparkExperimentalJob$$anonfun$main$3.apply(SparkExperimentalJob.scala:52)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at com.barclays.mercury.junoop.bcbs.SparkExperimentalJob$.main(SparkExperimentalJob.scala:52)
        at com.barclays.mercury.junoop.bcbs.SparkExperimentalJob.main(SparkExperimentalJob.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Saving DateType also doesn't work with the Parquet format in the Spark core library.
I also wonder why it isn't implemented.

Has anyone found a solution for this?
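
Until DateType is supported on the write path, one workaround (a sketch; df and the column names are placeholders) is to cast the date column to a string, or to a long epoch value, before saving:

import org.apache.spark.sql.functions.col

// keep the other columns and replace the date column with a string version of it
val writable = df.select(col("other_col"), col("ansidate_col").cast("string").as("ansidate_col"))
writable.save("/output/path", "com.databricks.spark.avro")  // placeholder path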

StackOverflowError when adding a WHERE clause to the query

When I try to add a WHERE clause to the query:

import org.apache.hadoop.hdfs.DistributedFileSystem
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import com.databricks.spark.avro._
object SparkAvroRW {
  private val AppName = "SparkAvroRW"

  def main(args: Array[String]) {
    // Define the context and initialize the app
    val sparkConf = new SparkConf().setAppName(AppName).setMaster("local")
    val sc = new SparkContext(sparkConf)
    val input = args(0)
    val output = args(1)
    val sqlContext = new SQLContext(sc)
    val students = sqlContext.avroFile(input)
    val sRDD = students.collect
    print(sRDD.length)
    students.registerTempTable("student")
    val studentsRDD = sqlContext.sql("SELECT * FROM student where id > 20").saveAsAvroFile(output)
  }
}

It throws the following error:

Exception in thread "main" java.lang.StackOverflowError
    at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
    at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
    at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
    at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
    at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
    at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
    at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
    at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
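
The overflow happens inside the Scala parser combinators backing the plain SQLContext SQL parser. Two things worth trying (suggestions, not from the original thread): express the predicate through the DataFrame API instead of a SQL string, or give the driver a larger thread stack (e.g. spark-submit --driver-java-options "-Xss4m"):

students.filter(students("id") > 20).saveAsAvroFile(output)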

Multiple load paths in .load()

It looks like support for multiple load paths via .load() is not consistent with that of spark-csv.

For example, this works in spark-csv:
val df = sqlContext.read.format("com.databricks.spark.csv").load("/path/data1.csv,/path/data2.csv")

When I try this in spark-avro:
val df = sqlContext.read.format("com.databricks.spark.avro").load("/path/data1.avro,/path/data2.avro")

It fails with the following message:
"java.io.FileNotFoundException: The path (/path/data1.avro,/path/data2.avro) is invalid."

  1. Will the next version of spark-avro support this (paths separated by commas)?
  2. How about changing the argument of .load() from a string to a list of paths (iterable)?
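
A workaround until comma-separated paths are supported (a sketch; the paths are placeholders) is to load each path separately and union the resulting DataFrames:

val paths = Seq("/path/data1.avro", "/path/data2.avro")
val df = paths
  .map(p => sqlContext.read.format("com.databricks.spark.avro").load(p))
  .reduce(_ unionAll _)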

PySpark episodes example gives NoClassDefFoundError

Spark 1.3.1, spark-avro_2.10-1.0.0 and Hadoop 2.4.0.

Running the example

>>> from pyspark.sql import SQLContext
>>> sqlContext = SQLContext(sc)
>>> df = sqlContext.load("episodes.avro", source="com.databricks.spark.avro")

gives me the following error:

py4j.protocol.Py4JJavaError: An error occurred while calling o25.load.
: java.lang.NoClassDefFoundError: org/apache/avro/SchemaBuilder$BaseTypeBuilder
    at com.databricks.spark.avro.AvroRelation.<init>(AvroRelation.scala:59)
    at com.databricks.spark.avro.DefaultSource.createRelation(DefaultSource.scala:41)
    at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:219)
    at org.apache.spark.sql.SQLContext.load(SQLContext.scala:697)
    at org.apache.spark.sql.SQLContext.load(SQLContext.scala:685)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:207)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.apache.avro.SchemaBuilder$BaseTypeBuilder
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    ... 16 more

Will using KryoSerializer work with spark-avro?

Hi, in Spark SQL, if I load my Avro files and register them as tables, and I set "spark.serializer" to "org.apache.spark.serializer.KryoSerializer", will that be honored by Spark SQL and spark-avro? I ask because GC currently takes more than 50% of the time, but even with Kryo enabled I don't see any improvement. Does spark-avro support KryoSerializer internally? Thanks
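
Whether Spark SQL's internal machinery honors it is a separate question, but enabling Kryo for the job itself looks like this (a minimal sketch; registering the Avro record class is optional and only shortens the serialized class names):

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[org.apache.avro.generic.GenericData.Record]))
val sc = new SparkContext(conf)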

spark-avro fails to save DF with nested records having the same name

spark-avro fails to save this dataset:

df = sqlc.createDataFrame([Row(id=1, data=Row(data=Row(key='value')))])
df.save("s3://...", "com.databricks.spark.avro")

with the following error:

Py4JJavaError: An error occurred while calling o1873.save.
: org.apache.avro.SchemaParseException: Can't redefine: data
    at org.apache.avro.SchemaBuilder$NameContext.put(SchemaBuilder.java:936)
    at org.apache.avro.SchemaBuilder$NameContext.access$600(SchemaBuilder.java:884)
    at org.apache.avro.SchemaBuilder$NamespacedBuilder.completeSchema(SchemaBuilder.java:470)
    at org.apache.avro.SchemaBuilder$RecordBuilder.fields(SchemaBuilder.java:1734)
    at com.databricks.spark.avro.SchemaConverters$.convertStructToAvro(SchemaConverters.scala:99)
    at com.databricks.spark.avro.SchemaConverters$.com$databricks$spark$avro$SchemaConverters$$convertFieldTypeToAvro(SchemaConverters.scala:190)
    at com.databricks.spark.avro.SchemaConverters$$anonfun$convertStructToAvro$1.apply(SchemaConverters.scala:104)
    at com.databricks.spark.avro.SchemaConverters$$anonfun$convertStructToAvro$1.apply(SchemaConverters.scala:100)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at com.databricks.spark.avro.SchemaConverters$.convertStructToAvro(SchemaConverters.scala:100)
    at com.databricks.spark.avro.SchemaConverters$.com$databricks$spark$avro$SchemaConverters$$convertFieldTypeToAvro(SchemaConverters.scala:190)
    at com.databricks.spark.avro.SchemaConverters$$anonfun$convertStructToAvro$1.apply(SchemaConverters.scala:104)
    at com.databricks.spark.avro.SchemaConverters$$anonfun$convertStructToAvro$1.apply(SchemaConverters.scala:100)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at com.databricks.spark.avro.SchemaConverters$.convertStructToAvro(SchemaConverters.scala:100)
    at com.databricks.spark.avro.AvroSaver$.save(AvroSaver.scala:66)
    at com.databricks.spark.avro.package$AvroDataFrame.saveAsAvroFile(package.scala:37)
    at com.databricks.spark.avro.DefaultSource.createRelation(DefaultSource.scala:84)
    at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:240)
    at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1196)
    at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1181)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:207)
    at java.lang.Thread.run(Thread.java:745)

SchemaConverter is private, can we expose an API for it?

The use case is that sometimes a user might want to store an RDD with a specific Avro schema; if a converter were available, it would be fairly easy to do something like:

val schema = new Schema.Parser().parse(new java.io.File("bla")) // Avro Schema class
val structType = new SchemaConverter().toSql(schema)
val rddWithRightSchema = sqlContext.applySchema(rdd, structType)
rddWithRightSchema.saveAsAvroFile("/foo/bar")

java.lang.ArrayIndexOutOfBoundsException with Google Analytics Data

I'm attempting to use spark-avro with Google Analytics Avro data files from one of our clients. I'm new to Spark/Scala, so my apologies if I've got anything wrong or done anything stupid. I'm using Spark 1.3.1.

I'm experimenting with the data in the spark-shell which I'm kicking off like this:

spark-shell --packages com.databricks:spark-avro_2.10:1.0.0

Then I'm running the following commands:

import com.databricks.spark.avro._
import scala.collection.mutable._

val gadata = sqlContext.avroFile("[client]/data")
gadata: org.apache.spark.sql.DataFrame = [visitorId: bigint, visitNumber: bigint, visitId: bigint, visitStartTime: bigint, date: string, totals: struct<visits:bigint,hits:bigint,pageviews:bigint,timeOnSite:bigint,bounces:bigint,transactions:bigint,transactionRevenue:bigint,newVisits:bigint,screenviews:bigint,uniqueScreenviews:bigint,timeOnScreen:bigint,totalTransactionRevenue:bigint>, trafficSource: struct<referralPath:string,campaign:string,source:string,medium:string,keyword:string,adContent:string>, device: struct<browser:string,browserVersion:string,operatingSystem:string,operatingSystemVersion:string,isMobile:boolean,mobileDeviceBranding:string,flashVersion:string,javaEnabled:boolean,language:string,screenColors:string,screenResolution:string,deviceCategory:string>, geoNetwork: str...

val gaIds = gadata.map(ga => ga.getString(11)).collect()

I get the following error:

[Stage 2:=>                                                                                          (8 + 4) / 430]15/05/14 11:14:04 ERROR Executor: Exception in task 12.0 in stage 2.0 (TID 27)
java.lang.ArrayIndexOutOfBoundsException
15/05/14 11:14:04 WARN TaskSetManager: Lost task 12.0 in stage 2.0 (TID 27, localhost): java.lang.ArrayIndexOutOfBoundsException

15/05/14 11:14:04 ERROR TaskSetManager: Task 12 in stage 2.0 failed 1 times; aborting job
15/05/14 11:14:04 WARN TaskSetManager: Lost task 11.0 in stage 2.0 (TID 26, localhost): TaskKilled (killed intentionally)
15/05/14 11:14:04 WARN TaskSetManager: Lost task 10.0 in stage 2.0 (TID 25, localhost): TaskKilled (killed intentionally)
15/05/14 11:14:04 WARN TaskSetManager: Lost task 9.0 in stage 2.0 (TID 24, localhost): TaskKilled (killed intentionally)
15/05/14 11:14:04 WARN TaskSetManager: Lost task 13.0 in stage 2.0 (TID 28, localhost): TaskKilled (killed intentionally)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 12 in stage 2.0 failed 1 times, most recent failure: Lost task 12.0 in stage 2.0 (TID 27, localhost): java.lang.ArrayIndexOutOfBoundsException

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

I thought this might be to do with the index I was using, but the following statement works OK.

scala> gadata.first().getString(11)
res12: String = 29456309767885

So I thought that maybe some of the records might be empty or have a different number of columns, so I attempted to run the following statement to get a list of all the record lengths:

scala> gadata.map(ga => ga.length).collect()

But I get a similar error:

[Stage 4:=>                                                                                          (8 + 4) / 430]15/05/14 11:20:04 ERROR Executor: Exception in task 12.0 in stage 4.0 (TID 42)
java.lang.ArrayIndexOutOfBoundsException
15/05/14 11:20:04 WARN TaskSetManager: Lost task 12.0 in stage 4.0 (TID 42, localhost): java.lang.ArrayIndexOutOfBoundsException

15/05/14 11:20:04 ERROR TaskSetManager: Task 12 in stage 4.0 failed 1 times; aborting job
15/05/14 11:20:04 WARN TaskSetManager: Lost task 11.0 in stage 4.0 (TID 41, localhost): TaskKilled (killed intentionally)
15/05/14 11:20:04 ERROR Executor: Exception in task 13.0 in stage 4.0 (TID 43)
org.apache.spark.TaskKilledException
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
15/05/14 11:20:04 WARN TaskSetManager: Lost task 9.0 in stage 4.0 (TID 39, localhost): TaskKilled (killed intentionally)
15/05/14 11:20:04 WARN TaskSetManager: Lost task 10.0 in stage 4.0 (TID 40, localhost): TaskKilled (killed intentionally)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 12 in stage 4.0 failed 1 times, most recent failure: Lost task 12.0 in stage 4.0 (TID 42, localhost): java.lang.ArrayIndexOutOfBoundsException

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

Is this an Issue with Spark-Avro or Spark?

Unable to get an overloaded version of newAPIHadoopFile work with Avro

This piece of code works:

val hadoopConf = new org.apache.hadoop.conf.Configuration()
hadoopConf.setLong(FileInputFormat.SPLIT_MAXSIZE, 2048)
val input = sc.newAPIHadoopFile(
  "README.md",
  classOf[TextInputFormat],
  classOf[LongWritable],
  classOf[Text],
  hadoopConf).map(_._2.toString())

newAPIHadoopFile() takes hadoopConf as an argument.

However, a similar version with Avro does not work and throws compiler errors:

import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.Schema
import org.apache.hadoop.io.NullWritable
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.Text
val hadoopConfiguration = new Configuration(sc.hadoopConfiguration)
hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")
sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](path + "/*.avro",
  classOf[AvroKey[GenericRecord]],
  classOf[NullWritable],
  classOf[AvroKeyInputFormat[GenericRecord]],
  hadoopConfiguration)


    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.7.7</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-avro_2.10</artifactId>
        <version>1.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro-mapred</artifactId>
        <version>1.7.7</version>
        <classifier>hadoop2</classifier>
        <scope>provided</scope>
    </dependency>

Error:

    [ERROR] /Users/dvasthimal/ebay/projects/ep/ep-spark/src/main/scala/com/ebay/ep/poc/spark/reporting/process/util/DataUtil.scala:39: error: overloaded method value newAPIHadoopFile with alternatives:
    [INFO]   (path: String,fClass: Class[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]],kClass: Class[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]],vClass: Class[org.apache.hadoop.io.NullWritable],conf: org.apache.hadoop.conf.Configuration)org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord], org.apache.hadoop.io.NullWritable)] <and>
    [INFO]   (path: String)(implicit km: scala.reflect.ClassTag[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]], implicit vm: scala.reflect.ClassTag[org.apache.hadoop.io.NullWritable], implicit fm: scala.reflect.ClassTag[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]])org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord], org.apache.hadoop.io.NullWritable)]
    [INFO]  cannot be applied to (String, Class[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]], Class[org.apache.hadoop.io.NullWritable], Class[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]], org.apache.hadoop.conf.Configuration)
    [INFO]     sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](path + "/*.avro",
    [INFO]                        ^
    [ERROR] o
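
Judging from the overload listed in the compiler error, the Class arguments are simply in the wrong order: the variant that accepts a Configuration expects (path, fClass, kClass, vClass, conf). Reordering them should compile (a sketch based on that signature):

sc.newAPIHadoopFile(
  path + "/*.avro",
  classOf[AvroKeyInputFormat[GenericRecord]],
  classOf[AvroKey[GenericRecord]],
  classOf[NullWritable],
  hadoopConfiguration)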

Specifying a read schema with spark-avro

It would be nice to have an option to supply a read schema (in lieu of the embedded schema) when reading avro files via spark-avro.

For example, the Python Avro API allows the following:
reader = DataFileReader(data, DatumReader(readers_schema=schema))

The scenario is this: I have many .avro files, possibly with different schemas (due to schema evolution), and I would like to use a single "master" schema to ingest all of those avro files into a single Spark Dataframe.
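
Until spark-avro offers such an option, a lower-level workaround (a sketch assuming avro-mapred 1.7.x hadoop2 is on the classpath; the paths are placeholders) is to pass the master reader schema through avro-mapred and build a DataFrame from the resulting RDD:

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.{AvroJob, AvroKeyInputFormat}
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.Job

val masterSchema = new Schema.Parser().parse(new java.io.File("master.avsc"))
val job = Job.getInstance(sc.hadoopConfiguration)
AvroJob.setInputKeySchema(job, masterSchema)  // reader schema applied to every input file
val records = sc.newAPIHadoopFile(
  "/data/*.avro",
  classOf[AvroKeyInputFormat[GenericRecord]],
  classOf[AvroKey[GenericRecord]],
  classOf[NullWritable],
  job.getConfiguration).map(_._1.datum())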

java.lang.NoSuchMethodError while invoking hc.avroFile(inputDir)

package com.ep.poc.spark.reporting.process.service

import com.ep.poc.spark.reporting.process.util.DateUtil._

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

import collection.mutable.HashMap

import com.databricks.spark.avro._

class HadoopSuccessEvents2Service extends ReportingService {

  override def execute(arguments: HashMap[String, String], sc: SparkContext) {
    val detail = "reporting.detail." + arguments.get("subcommand").get
    val startDate = formatStringAsDate(arguments.get("startDate").get)
    val endDate = formatStringAsDate(arguments.get("endDate").get)
    val inputDir = arguments.get("input").get
    val outputDir = arguments.get("output").get

    val hc = new org.apache.spark.sql.hive.HiveContext(sc)
    val successDetail_S1 = hc.avroFile(inputDir)
    successDetail_S1.registerTempTable("success_events.sojsuccessevents1")
    val countS1 = hc.sql("select count(*) from success_events.sojsuccessevents1")
    println(countS1.collect)
  }
}

I am sorry to raise this as an issue. Can you share a mailing list or Google group?

Logs

Exception in thread "Driver" scala.MatchError: java.lang.NoSuchMethodError: com.databricks.spark.avro.package$AvroContext.avroFile(Ljava/lang/String;)Lorg/apache/spark/sql/DataFrame; (of class java.lang.NoSuchMethodError)

Got NullPointerException when tried to read AVRO file generated by avro 1.7.4

I tried to test the latest spark-avro from master with existing Avro files stored in Hadoop 2.2.0 HDFS, generated by Avro 1.7.4.

I installed Spark 1.2.0 and the latest spark-avro from trunk; the jar file is spark-avro_2.10-0.1.jar.

When I tried to load a small 2.3 MB Avro file in Spark, I got errors. Here are the steps:

[yzhang@hostname spark-1.2.0-bin-2.2.0]$ bin/spark-shell --jars ~/lib/spark-avro_2.10-0.1.jar
Spark assembly has been built with Hive, including Datanucleus jars on classpath
.........
Using Scala version 2.10.4 (IBM J9 VM, Java 1.7.0)
.......................

scala> import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext

scala> val sqlContext = new SQLContext(sc)
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@3790301b

scala> import com.databricks.spark.avro._
import com.databricks.spark.avro._

scala> val tags = sqlContext.avroFile("hdfs://namenode:9000/data/tags/")

tags: org.apache.spark.sql.SchemaRDD =
SchemaRDD[0] at RDD at SchemaRDD.scala:108
== Query Plan ==
== Physical Plan ==
PhysicalRDD [account_id#0L,delta#1,tags#2], MappedRDD[2] at map at AvroRelation.scala:63

scala> tags.count()

15/02/04 16:18:04 INFO executor.Executor: Fetching http://10.20.95.141:32451/jars/spark-avro_2.10-0.1.jar with timestamp 1423084457400
15/02/04 16:18:04 INFO util.Utils: Fetching http://10.20.95.141:32451/jars/spark-avro_2.10-0.1.jar to /tmp/fetchFileTemp7286568015846601068.tmp
15/02/04 16:18:05 INFO executor.Executor: Adding file:/tmp/spark-787b4ea0-4289-4a53-953e-e7b7c9a282bd/spark-avro_2.10-0.1.jar to class loader
15/02/04 16:18:05 INFO rdd.HadoopRDD: Input split: hdfs://namenode:9000/data/tags/20150202094448-part-r-00000.avro:0+1197027
15/02/04 16:18:05 INFO rdd.HadoopRDD: Input split: hdfs://p2-bibigin101:9000/data/tags/20150202094448-part-r-00000.avro:1197027+1197027
15/02/04 16:18:05 ERROR executor.Executor: Exception in task 1.0 in stage 0.0 (TID 1)
java.lang.NullPointerException
at java.io.StringReader.<init>(StringReader.java:61)
at org.apache.avro.Schema$Parser.parse(Schema.java:943)
at org.apache.avro.Schema.parse(Schema.java:992)
at org.apache.avro.mapred.AvroJob.getInputSchema(AvroJob.java:65)
at org.apache.avro.mapred.AvroRecordReader.<init>(AvroRecordReader.java:43)
at org.apache.avro.mapred.AvroInputFormat.getRecordReader(AvroInputFormat.java:52)
at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:233)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:210)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:99)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1176)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
at java.lang.Thread.run(Thread.java:853)
15/02/04 16:18:05 ERROR executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0)

It looks like the exception is thrown from Avro. I understand that the Avro version that comes with spark-avro is 1.7.7. Is this related to an Avro version mismatch, or is it a problem in spark-avro?

Thanks

Long Type not supported

java.lang.RuntimeException: Unsupported type LONG
    at scala.sys.package$.error(package.scala:27)
    at com.databricks.spark.avro.AvroRelation.com$databricks$spark$avro$AvroRelation$$toSqlType(AvroRelation.scala:116)
    at com.databricks.spark.avro.AvroRelation$$anonfun$5.apply(AvroRelation.scala:97)
    at com.databricks.spark.avro.AvroRelation$$anonfun$5.apply(AvroRelation.scala:96)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)

Be permissive when reading avro files with inconsistent schema

If there are multiple files in a directory and some of them have additional fields, do not throw an exception as long as those fields are not accessed. Ideally, show a warning when loading.

On a related note, this could be controlled with a flag in the options.

Relax filesystem layout constraints

Hey,

I'd like to use spark-avro to load and merge some Avro data an existing system is producing in large quantities, and convert it to Parquet for interactive, ad-hoc analytics in Spark. I've got two issues preventing it all from "just working" in the current implementation:

  • Requires that files are named "*.avro".
  • Requires "flat" directory layout - no support for nesting.

Our data is laid out as <dataset name>/<timestamp>/<random uuid>, which I've not been able to make work with spark-avro as yet. I want to merge all files under the same dataset name into a set of large Parquet files, which I don't think is possible with the API as it stands now.

Actually, I've had a go at tweaking this myself and what I'm seeing is really puzzling me. If anyone is able to explain to me what's happening here and propose an acceptable change to the API, I'd be happy to send a PR. Here's what I've seen:

  • There's an initial search in AvroRelation for any file ending in ".avro", which is used to load the schema. This part actually does perform a recursive search from the location parameter.
  • Loosening the above requirement is straightforward enough, but data is then loaded using hadoopFile(location, ...), which will only work with a flat directory of data files. It seems odd that the first search is recursive and this part isn't.
  • Because the location parameter is used both as an argument directly to hadoopFile, and as the root of the recursive search for the schema above, you can't really use the wildcards and such that hadoopFile supports.
  • Even when I tried hacking it to let me pass in wildcards, for some reason the RDD loaded only matched files ending ".avro". As far as I can see, that requirement was only for the file used to load the schema, but it seems to make it all the way through somehow? I even saw logging from the underlying hadoop FileInputFormat saying it had matched 700 data files, but then only the one file ending in .avro (which I added for testing) was actually loaded into the RDD.

Any suggestions? Like I was saying, if anyone can offer some direction on how best to make this work, I'll happily implement it myself and send over a pull request.

Make it possible to specify the number of partitions while reading

Currently AvroRelation uses the SparkContext's defaultMinPartitions when reading Avro files. Could we consider adding an option to let users specify the number of partitions? That would make it possible to load data directly into a certain number of partitions instead of using a repartition operation when needed (the current workaround is sketched below).
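
A sketch of that repartition workaround (the path and partition count are placeholders):

val df = sqlContext.avroFile("/data/events").repartition(64)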

Save DF with specific Avro schema

Currently spark-avro constructs avro schema from Spark schema on save. It's limited, e.g. we can't set namespace and name of nested records.

One scenario is reading Avro DF, and then saving it again: original schema, and new schema are different.

Having an option to pass the Avro schema explicitly should solve this.

Support custom converters

Avro types are quite limited, whereas Spark supports a wider set of types, including DecimalType and DateType. Some people (including me) work around that by using nested records, e.g.:

{
  "name" : "some_date_field",
  "type" : [ "null", {
    "type" : "record",
    "name" : "date",
    "namespace" : "avro",
    "fields" : [ {
      "name" : "value",
      "type" : [ "string", "null" ]
    } ]
  } ],
  "default" : null
}

It would be nice to be able to register a converter which converts all fields with type "avro.date" to DateType() on read, and vice versa, DateType() to this kind of nested record on save.

What do you think?
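
Until such converters exist, the read-side conversion has to be done by hand; a sketch of what that looks like today (df is a placeholder DataFrame, and the schema above is assumed, so the wrapped value lives at some_date_field.value):

import org.apache.spark.sql.functions.col

val withDate = df.withColumn("some_date", col("some_date_field.value").cast("date"))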

Force output to single avro-file

When running with a smaller dataset, the writer generates hundreds of empty .avro part files. It would be nice if there were an option forcing the writer to write the data into a single result .avro file, or at least a reasonable number of output files.
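
A workaround in the meantime (a sketch; df is the DataFrame being saved and the path is a placeholder) is to shrink the DataFrame to a single partition before saving, which makes the writer emit one part file; this is only sensible when the data comfortably fits in a single task:

df.repartition(1).save("/output/path", "com.databricks.spark.avro")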

Where is the build for Spark 1.2.x, or 0.2.0 branch?

Hi, our production environment will deploy Spark 1.2.x, and from the README it looks like spark-avro also supports Spark 1.2.x. But I can find neither a release for Spark 1.2.x nor the 0.2.0 branch that the README claims for Spark 1.2.x. Can someone tell me whether there is a binary release for Spark 1.2.0, or where the source code is if I have to build it myself? Thanks

[spark-avro]$ git branch -a

  • master
    remotes/origin/HEAD -> origin/master
    remotes/origin/master
    remotes/origin/update_artifacts

Incorrect results with reduceByKey

Howdy,

We've noticed a strange behavior with Avro serialized data and reduceByKey RDD functionality. Please see below:

val data: RDD[T] = sparkContext.hadoopFile(path, classOf[AvroInputFormat[T]], classOf[AvroWrapper[T]], classOf[NullWritable]) // we're reading an avro serialized data
val bad: RDD[(String,List[T])]  = data.map(r => (r.id, List(r))).reduceByKey(_ ++ _) // incorrect data returned
val good: RDD[(String,List[T])] = data.map(r => (r.id, List(r))).partitionBy(Partitioner.defaultPartitioner(data)).reduceByKey(_ ++ _) // OK, as expected

Any ideas?

Thanks in advance.
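
One likely culprit (an assumption, not confirmed in this thread) is that the Hadoop input format reuses the same Avro record object for every row, so the lists can end up holding many references to one mutated record. Making a deep copy of each record before grouping avoids that; a sketch written against the generic-record API:

import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper}
import org.apache.hadoop.io.NullWritable

val copied = sparkContext
  .hadoopFile(path, classOf[AvroInputFormat[GenericRecord]],
    classOf[AvroWrapper[GenericRecord]], classOf[NullWritable])
  .map { case (wrapper, _) =>
    val r = wrapper.datum()
    GenericData.get().deepCopy(r.getSchema, r)  // detach from the reused buffer
  }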

Include avro-mapred-1.7.7-hadoop1.jar in dependencies

For us noobs who are just starting out, it would be really helpful to explicitly state in the README that, in order to run the example shown, you need to download and include avro-mapred-1.7.7-hadoop1.jar in addition to spark-avro.

Apologies if this was a really noob mistake, but I figured this might save others some confusion in the future.

Awesome tool and keep up the good work!

Document how to load avro files from multiple different directories at once

I've tried separating paths with commas (e.g. "s3n://mybucket/my/first/path/,s3n://mybucket/my/second/path/"), but can't find how to load multiple different directories of avro files into the same RDD.

I got no response on the mailing list.

Please provide examples of loading multiple different directories into the same SchemaRDD in the docs. Thanks.

Potential issue when reading Avro files generated from Java classes

See report of error at https://stackoverflow.com/questions/32503059/how-to-read-avro-files-generated-from-java-class-using-spark-shell-when-the-so

It seems that class cast exceptions can occur when loading Avro files that were generated from Java objects:

A cannot be cast to org.apache.avro.generic.IndexedRecord
    at org.apache.avro.generic.GenericData.setField(GenericData.java:569)
    at org.apache.avro.generic.GenericData.setField(GenericData.java:586)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
    at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
    at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:66)
    at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:32)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:248)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:216)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$class.isEmpty(Iterator.scala:256)
    at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1157)
    at com.databricks.spark.avro.AvroRelation$$anonfun$buildScan$1$$anonfun$4.apply(AvroRelation.scala:127)
    at com.databricks.spark.avro.AvroRelation$$anonfun$buildScan$1$$anonfun$4.apply(AvroRelation.scala:126)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Expose the GenericRecord to Row conversion

The AvroRelation class contains some very useful code to convert data from a GenericRecord to a Row representation. This functionality is purely about Spark SQL / Avro interoperability. However, it is currently tied to hdfs (the buildScan method in AvroRelation). It would be most convenient if this code were extracted so that it could also be used with other persistence engines.

Concretely, I'm attempting to read Avro records from Kafka.
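
In the meantime, a standalone conversion for flat records is not much code; a minimal sketch (it assumes primitive fields only and converts Avro Utf8 strings to java.lang.String, whereas the library's internal converter also handles nested and complex types):

import scala.collection.JavaConverters._
import org.apache.avro.generic.GenericRecord
import org.apache.avro.util.Utf8
import org.apache.spark.sql.Row

def recordToRow(rec: GenericRecord): Row = {
  val values = rec.getSchema.getFields.asScala.map { field =>
    rec.get(field.name) match {
      case s: Utf8 => s.toString  // Avro strings arrive as Utf8, Spark SQL expects String
      case other   => other
    }
  }
  Row(values: _*)
}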

Please provide a working sample for Python. I need to run a Spark SQL query on an Avro file containing JSON data.

I started the shell using the --jars option:
./bin/pyspark --jars /root/krd/temp/spark-avro_2.10-1.0.0.jar

Then I tried loading an Avro file:

df = sqlContext.load("/root/krd/temp/part-r-00000.avro", "com.databricks.spark.avro")
Traceback (most recent call last):
File "", line 1, in
File "/root/spark/spark-1.4.0-bin-hadoop2.4/python/pyspark/sql/context.py", line 475, in load
return self.read.load(path, source, schema, **options)
File "/root/spark/spark-1.4.0-bin-hadoop2.4/python/pyspark/sql/readwriter.py", line 94, in load
return self._df(self._jreader.load(path))
File "/root/spark/spark-1.4.0-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in call
File "/root/spark/spark-1.4.0-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o48.load.
: java.lang.RuntimeException: Failed to load class for data source: com.databricks.spark.avro
at scala.sys.package$.error(package.scala:27)

Thank you for the help.
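
The usual fix for "Failed to load class for data source" (a suggestion, not confirmed in this thread) is to let Spark resolve the package and its transitive Avro dependencies itself instead of passing a single jar:

./bin/pyspark --packages com.databricks:spark-avro_2.10:1.0.0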

Exception while reading multiple files from a folder

Exception in thread "main" com.fasterxml.jackson.databind.JsonMappingException: Could not find creator property with name 'id' (in class org.apache.spark.rdd.RDDOperationScope)
at [Source: {"id":"0","name":"hadoopFile"}; line: 1, column: 1]

.avro files not found in the Folder.

Hi Guys,

I used Hive 1.0+ for the CSV to Avro conversion, and it saved the files without a .avro extension.

Following is the exception:

Exception in thread "main" java.lang.RuntimeException: Could not find .avro file with schema at s3://sample_bucket/

I also tried to set avro.mapred.ignore.inputs.without.extension in my Hadoop conf and in the SparkContext's Hadoop conf, but that is not working either.
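
For reference, that property has to reach the Hadoop configuration the job actually uses, and the value must be the string "false" (a sketch; whether the spark-avro release in use honors it during its own schema discovery is a separate question):

sc.hadoopConfiguration.set("avro.mapred.ignore.inputs.without.extension", "false")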

Hive Support for Spark-Avro

I am trying to read the contents of Hive views or tables whose data is stored as Avro.

It would be cool if this library supported Hive views or tables backed by Avro in Spark.

2.0.0 artifact is not in any public maven repo

I'm not able to resolve com.databricks#spark-avro_2.10;2.0.0 in any well-known repository. I see 1.0.0 is in central maven repo. Could we have 2.0.0 deployed there?

Even after I've built 2.0.0 from source and done publish-local, I'm unable to run spark-shell successfully as noted in README.md:

spark-shell --packages com.databricks:spark-avro_2.10:2.0.0

Support for Avro with "bytes" schema (BinaryType)

Currently "bytes" schema is not supported. However "bytes" schema is used (and recommended to be used) as output format for some MR jobs.

Example of use case:

To write to Avro data files from a streaming program, specify org.apache.avro.mapred.AvroTextOutputFormat as the output format. This output format will create Avro data files with a "bytes" schema,...
Cloudera Avro Usage

an attempt to load an avro file with bytes schema will result in the following error message:

Py4JJavaError: An error occurred while calling o37.load.
: java.lang.RuntimeException: Avro files must contain Records to be read, type BinaryType not supported
    at scala.sys.package$.error(package.scala:27)
    at com.databricks.spark.avro.AvroRelation.<init>(AvroRelation.scala:75)
    at com.databricks.spark.avro.DefaultSource.createRelation(DefaultSource.scala:55)
    at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:219)
    at org.apache.spark.sql.SQLContext.load(SQLContext.scala:697)
    at org.apache.spark.sql.SQLContext.load(SQLContext.scala:685)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:207)
    at java.lang.Thread.run(Thread.java:745)

Failed to load class for data source: com.databricks.spark.avro

I am trying to use the spark-avro library to process avro files. I am using SBT:

build.sbt:

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "1.3.0",
  "com.databricks" %% "spark-avro" % "1.0.0")

tester.scala

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import com.databricks.spark.avro._

object tester {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Simple Application").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Creates a DataFrame from a specified file
    val df = sqlContext.load("episodes.avro", "com.databricks.spark.avro")
  }
}

When I run tester in the IntelliJ IDE, I get the following stack trace:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/avro/mapred/FsInput
    at com.databricks.spark.avro.AvroRelation.newReader(AvroRelation.scala:111)
    at com.databricks.spark.avro.AvroRelation.<init>(AvroRelation.scala:53)
    at com.databricks.spark.avro.DefaultSource.createRelation(DefaultSource.scala:41)
    at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:290)

When I run:

$ sbt package
$ ~/spark-1.3.1/bin/spark-submit --class "tester" target/scala-2.10/project_2.10-0.1-SNAPSHOT.jar

I get the following stack trace:

Exception in thread "main" java.lang.RuntimeException: Failed to load class for data source: com.databricks.spark.avro
    at scala.sys.package$.error(package.scala:27)
    at org.apache.spark.sql.sources.ResolvedDataSource$.lookupDataSource(ddl.scala:194)
    at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:205)
    at org.apache.spark.sql.SQLContext.load(SQLContext.scala:697)

Any help is greatly appreciated. Thanks!!
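
Two things typically resolve this pair of errors (assumptions based on the messages, not confirmed here): the NoClassDefFoundError for FsInput means avro-mapred is missing from the runtime classpath, and the "Failed to load class for data source" under spark-submit means the application jar was packaged without spark-avro. Adding the dependency and submitting with --packages addresses both:

libraryDependencies += "org.apache.avro" % "avro-mapred" % "1.7.7" classifier "hadoop2"

~/spark-1.3.1/bin/spark-submit --packages com.databricks:spark-avro_2.10:1.0.0 --class "tester" target/scala-2.10/project_2.10-0.1-SNAPSHOT.jar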

Spark-avro Fails To Write DF in EMR 4.0

I have simple code which works without a problem when running on my laptop (which uses pre-built Spark 1.4.1 with Hadoop 2.4). When I run the same code on EMR 4.0, it crashes when it tries to execute the following line:

time_series_df.write.format('com.databricks.spark.avro').save(args.output_dir)

Stack trace is provided below:

2015-09-23 01:59:42,559 ERROR [Thread-3] sources.DefaultWriterContainer (Logging.scala:logError(75)) - Job job_201509230159_0000 aborted.
Traceback (most recent call last):
File "/home/hadoop/./time_series_producer.py", line 336, in
main()
File "/home/hadoop/./time_series_producer.py", line 333, in main
.save(args.output_dir)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 304, in save
File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in call
File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o171.save.
: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.insert(commands.scala:166)
at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.run(commands.scala:139)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:68)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:87)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:950)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:950)
at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:336)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:144)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:135)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 11.0 failed 4 times, most recent failure: Lost task 1.3 in stage 11.0 (TID 71, ip-172-31-36-39.ec2.internal): org.apache.spark.SparkException: Task failed while writing rows.
at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:191)
at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:160)
at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:160)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NoSuchMethodError: org.apache.avro.generic.GenericData.createDatumWriter(Lorg/apache/avro/Schema;)Lorg/apache/avro/io/DatumWriter;
at org.apache.avro.mapreduce.AvroKeyRecordWriter.(AvroKeyRecordWriter.java:55)
at org.apache.avro.mapreduce.AvroKeyOutputFormat$RecordWriterFactory.create(AvroKeyOutputFormat.java:79)
at org.apache.avro.mapreduce.AvroKeyOutputFormat.getRecordWriter(AvroKeyOutputFormat.java:105)
at com.databricks.spark.avro.AvroOutputWriter.<init>(AvroOutputWriter.scala:82)
at com.databricks.spark.avro.AvroOutputWriterFactory.newInstance(AvroOutputWriterFactory.scala:31)
at org.apache.spark.sql.sources.DefaultWriterContainer.initWriters(commands.scala:470)
at org.apache.spark.sql.sources.BaseWriterContainer.executorSideSetup(commands.scala:360)
at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:172)
... 8 more

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

2015-09-23 01:59:42,591 INFO [task-result-getter-0] scheduler.TaskSetManager (Logging.scala:logInfo(59)) - Lost task 5.3 in stage 11.0 (TID 76) on executor ip-172-31-36-38.ec2.internal: org.apache.spark.SparkException (Task failed while writing rows.) [duplicate 28]
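
The NoSuchMethodError on GenericData.createDatumWriter above usually indicates that an older Avro (Hadoop 2.x bundles 1.7.4, which lacks this method) is shadowing the Avro 1.7.6+ that spark-avro needs on the driver and executor classpath. A minimal sketch of one way to make the application's newer Avro jars win; the configuration keys are standard Spark settings, but whether this fix applies to a given deployment is an assumption:

// Hedged sketch: prefer the application's Avro 1.7.6+ jars over the 1.7.4 that
// Hadoop ships with. The same keys can be passed to spark-submit via --conf.
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("avro-write")
  .set("spark.driver.userClassPathFirst", "true")
  .set("spark.executor.userClassPathFirst", "true")
val sc = new SparkContext(conf)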

spark-avro 2.0.1 generates strange schema (spark-avro 1.0.0 is fine)

I save my dataframe to avro with spark-avro 1.0.0 and it looks like this (using avro-tools tojson):

{"field1":"value1","field2":976200}
{"field1":"value2","field2":976200}
{"field1":"value3","field2":614100}

But when I use spark-avro 2.0.1, it looks like this:

{"field1":{"string":"value1"},"field2":{"long":976200}}
{"field1":{"string":"value2"},"field2":{"long":976200}}
{"field1":{"string":"value3"},"field2":{"long":614100}}

At this point I'd be happy to use spark-avro 1.0.0, except that it doesn't seem to support specifying a compression codec (I want deflate).
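
The nested form appears to come from spark-avro 2.x writing every field as a nullable Avro union, which Avro's JSON encoder renders as {"type": value}; the underlying data is unchanged. If compression is the only reason to stay on 1.0.0, 2.x exposes the codec through SQL configuration. A minimal sketch, where df stands in for the DataFrame being saved and the output path is a placeholder:

// Sketch: write deflate-compressed Avro with spark-avro 2.x. The two
// configuration keys are the ones documented in the spark-avro README;
// df and the output path are placeholders.
sqlContext.setConf("spark.sql.avro.compression.codec", "deflate")
sqlContext.setConf("spark.sql.avro.deflate.level", "5")
df.write.format("com.databricks.spark.avro").save("/tmp/output-deflate.avro")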

Getting "java.lang.StackOverflowError" when reading avro file

I got "java.lang.stackOverflowError" when using sqlContext.load(filePath, "com.databricks.spark.avro") to read a specific avro file to a DataFrame.
However, I could not reproduce this issue on the sample episode.avro file that you provided; thus it seems like this is something related to the schema of the avro file. I realize that the schema of the avro file is much more complicated than the schema of episode.avo file.
But in the meanwhile, when I was using either the avro-tool.jar tool that provided by Apache or the Python API, it can read and parse our avro file without any issues.
Do you think there is a compatibility issue between the spark-avro and the avro file schema we use ? Is there any workaround that might fix it for now ? Below is part of the error stack. Thanks !

15/05/11 17:00:20 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/05/11 17:00:20 INFO SparkUI: Started SparkUI at http://10.0.1.11:4040
15/05/11 17:00:20 INFO Executor: Starting executor ID <driver> on host localhost
15/05/11 17:00:20 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://[email protected]:57777/user/HeartbeatReceiver
15/05/11 17:00:20 INFO NettyBlockTransferService: Server created on 57779
15/05/11 17:00:20 INFO BlockManagerMaster: Trying to register BlockManager
15/05/11 17:00:20 INFO BlockManagerMasterActor: Registering block manager localhost:57779 with 2.9 GB RAM, BlockManagerId(<driver>, localhost, 57779)
15/05/11 17:00:20 INFO BlockManagerMaster: Registered BlockManager
Exception in thread "main" java.lang.StackOverflowError
    at scala.collection.mutable.ArrayBuffer.<init>(ArrayBuffer.scala:47)
    at scala.collection.mutable.ArrayBuffer.<init>(ArrayBuffer.scala:62)
    at scala.collection.mutable.Buffer$.newBuilder(Buffer.scala:44)
    at scala.collection.generic.GenericTraversableTemplate$class.newBuilder(GenericTraversableTemplate.scala:64)
    at scala.collection.AbstractTraversable.newBuilder(Traversable.scala:105)
    at scala.collection.TraversableLike$class.filter(TraversableLike.scala:262)
    at scala.collection.AbstractTraversable.filter(Traversable.scala:105)
    at scala.collection.TraversableLike$class.filterNot(TraversableLike.scala:274)
    at scala.collection.AbstractTraversable.filterNot(Traversable.scala:105)
    at com.databricks.spark.avro.SchemaConverters$.toSqlType(SchemaConverters.scala:72)
    at com.databricks.spark.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:51)
    at com.databricks.spark.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:50)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at com.databricks.spark.avro.SchemaConverters$.toSqlType(SchemaConverters.scala:50)
    at com.databricks.spark.avro.SchemaConverters$.toSqlType(SchemaConverters.scala:58)
    at com.databricks.spark.avro.SchemaConverters$.toSqlType(SchemaConverters.scala:74)
    at com.databricks.spark.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:51)
    at com.databricks.spark.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:50)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at com.databricks.spark.avro.SchemaConverters$.toSqlType(SchemaConverters.scala:50)
    at com.databricks.spark.avro.SchemaConverters$.toSqlType(SchemaConverters.scala:74)
    at com.databricks.spark.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:51)
    at com.databricks.spark.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:50)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at com.databricks.spark.avro.SchemaConverters$.toSqlType(SchemaConverters.scala:50)
    at com.databricks.spark.avro.SchemaConverters$.toSqlType(SchemaConverters.scala:58)
    at com.databricks.spark.avro.SchemaConverters$.toSqlType(SchemaConverters.scala:74)
    at com.databricks.spark.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:51)
    at com.databricks.spark.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:50)
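
The trace cycles through SchemaConverters.toSqlType, which suggests (a guess, not something confirmed by the reporter) a self-referential Avro schema: the schema conversion has no cycle detection, so a record that refers back to its own named type recurses until the stack overflows. A minimal hypothetical schema of that shape, expressed in Scala:

// Hypothetical example of a schema shape that would make the schema converter
// recurse: a record whose field refers back to the record's own named type.
import org.apache.avro.Schema

val recursive = new Schema.Parser().parse("""
  {
    "type": "record",
    "name": "Node",
    "fields": [
      {"name": "value", "type": "string"},
      {"name": "next",  "type": ["null", "Node"], "default": null}
    ]
  }
""")

If the file's schema does contain such a cycle, spark-avro cannot derive a Spark SQL schema for it; flattening or bounding the recursion when the file is produced is the usual workaround.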

Append Mode is not Enabled for AvroSaver

This is a serious issue for us: we have 68 TB of data and need to append files into the existing folder hierarchy; we cannot overwrite it, or delete and rewrite it. (A per-batch workaround is sketched after the stack trace below.)

java.lang.RuntimeException: Append mode is not supported by com.databricks.spark.avro.DefaultSource
        at scala.sys.package$.error(package.scala:27)
        at com.databricks.spark.avro.DefaultSource.createRelation(DefaultSource.scala:63)
        at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:309)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:144)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:135)
        at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1672)
        at com.johndeere.dataclassifier.app.DataClassifierApp$$anonfun$4.apply(DataClassifierApp.scala:82)
        at com.johndeere.dataclassifier.app.DataClassifierApp$$anonfun$4.apply(DataClassifierApp.scala:39)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:870)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:870)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
15/08/20 12:26:28 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 1.0 (TID 21, localhost): java.lang.RuntimeException: Append mode is not supported by com.databricks.spark.avro.DefaultSource
        at scala.sys.package$.error(package.scala:27)
        at com.databricks.spark.avro.DefaultSource.createRelation(DefaultSource.scala:63)
        at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:309)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:144)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:135)
        at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1672)
        at com.johndeere.dataclassifier.app.DataClassifierApp$$anonfun$4.apply(DataClassifierApp.scala:82)
        at com.johndeere.dataclassifier.app.DataClassifierApp$$anonfun$4.apply(DataClassifierApp.scala:39)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:870)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:870)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
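
Until append is supported by the Avro source, a common workaround is to write each batch into its own sub-directory under the target path and read the parent directory back as one DataFrame. A rough sketch, with hypothetical paths and method names, assuming the base directory only ever receives these batch sub-directories:

// Hedged workaround sketch: emulate append by writing every batch to a fresh
// sub-directory, then reading the whole tree back later. Names are hypothetical.
import java.util.UUID
import org.apache.spark.sql.DataFrame

def appendAsNewBatch(df: DataFrame, basePath: String): Unit = {
  val batchDir = s"$basePath/batch-${UUID.randomUUID()}"
  df.write.format("com.databricks.spark.avro").save(batchDir)
}

// Reading everything written so far:
// val all = sqlContext.read.format("com.databricks.spark.avro").load(s"$basePath/*")

Note also that the trace shows DataFrame.save being called inside RDD.foreach on an executor (DataClassifierApp.scala:82 under ResultTask.runTask); DataFrame writes generally have to be issued from the driver, which may be a separate problem from the missing append mode.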
