spark-redshift's Issues

Generated table creation SQL fails for strings longer than 256 characters

Thanks to @jaley for the recent milestone on getting data into Redshift from a DataFrame; the latest updates are pretty awesome. One edge case I encountered, however, is that during schema generation String datatypes in the DataFrame always get mapped to the TEXT type in the generated SQL schema. This is problematic because the TEXT type is actually just an alias for varchar(256) in Redshift, so the data import to Redshift fails if any row contains text exceeding 256 characters (the specific error in Redshift is "String length exceeds DDL length").

Digging through the code, it looks like the source of this problem is actually the spark.sql.jdbc.JDBCWrapper class, which defines the mapping (https://github.com/apache/spark/blob/8a94eb23d53e291441e3144a1b800fe054457040/sql/core/src/main/scala/org/apache/spark/sql/jdbc/jdbc.scala#L128).

Redshift only supports character columns with a declared maximum length, so I don't see another way to preserve data integrity without replacing TEXT in the SQL schema with VARCHAR(N), where N is the longest string length for that column in the DataFrame. Obviously this would introduce a small amount of overhead and complexity to the code.
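A minimal sketch of the sizing idea described above, written in plain Python rather than the library's Scala. It scans rows for the longest UTF-8 byte length of each string column and emits a VARCHAR(N) fragment. The function names, the dict-based row representation, and the choice to measure bytes rather than characters are assumptions made for illustration; this is not spark-redshift's implementation.

```python
def max_utf8_lengths(rows, string_columns):
    """Return the longest UTF-8 byte length seen for each string column."""
    longest = {col: 0 for col in string_columns}
    for row in rows:
        for col in string_columns:
            value = row.get(col)
            if value is not None:
                longest[col] = max(longest[col], len(value.encode("utf-8")))
    return longest


def varchar_columns_sql(rows, string_columns, minimum=1):
    """Build 'name VARCHAR(N)' fragments sized to the data (hypothetical helper)."""
    lengths = max_utf8_lengths(rows, string_columns)
    return ", ".join(
        f"{col} VARCHAR({max(lengths[col], minimum)})" for col in string_columns
    )


# Illustrative rows; the second value is longer than the 256 limit of TEXT.
rows = [
    {"id": 1, "comment": "short"},
    {"id": 2, "comment": "x" * 300},
    {"id": 3, "comment": None},
]
print(varchar_columns_sql(rows, ["comment"]))  # comment VARCHAR(300)
```

The extra pass over the data is the overhead mentioned above; in Spark it would presumably be an aggregation over the DataFrame rather than a Python loop.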

`hadoop-aws` dependency

We're running spark-redshift on EMR 4. We're trying to avoid including org.apache.hadoop:hadoop-aws in our build, as it conflicts with many things already provided by the EMR environment.

The assertion here, ironically, seems to require that the S3Filesystem class is available on the classpath just so that we can assert that it isn't being used? We're seeing the following stacktrace on the 0.5.0 release:

java.lang.NoClassDefFoundError: org/apache/hadoop/fs/s3/S3FileSystem
    at com.databricks.spark.redshift.Utils$.assertThatFileSystemIsNotS3BlockFileSystem(Utils.scala:104)
    at com.databricks.spark.redshift.RedshiftWriter.saveToRedshift(RedshiftWriter.scala:273)
    at com.databricks.spark.redshift.DefaultSource.createRelation(DefaultSource.scala:104)
    at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:309)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:144)
...

On EMR, that class shouldn't be available, as we don't want to use it. We instead use the EmrFileSystem class, which is provided by the environment already.

Reliance on Spark's private JDBCRDD API breaks compilation against Spark 1.5.0

This library's use of Spark's private JDBCRDD API means that it no longer compiles against Spark 1.5.0:

/build/sbt -Dspark.version=1.5.0-SNAPSHOT clean test

[error] /Users/joshrosen/Documents/spark-redshift/src/main/scala/org/apache/spark/sql/jdbc/RedshiftJDBCWrapper.scala:33: not found: value JDBCRDD
[error]     JDBCRDD.getConnector(driver, url, properties)
[error]     ^
[error] /Users/joshrosen/Documents/spark-redshift/src/main/scala/org/apache/spark/sql/jdbc/RedshiftJDBCWrapper.scala:34: not found: value JdbcUtils
[error]   def tableExists(conn: Connection, table: String) = JdbcUtils.tableExists(conn, table)
[error]                                                      ^
[error] /Users/joshrosen/Documents/spark-redshift/src/main/scala/org/apache/spark/sql/jdbc/RedshiftJDBCWrapper.scala:29: not found: value DriverRegistry
[error]   def registerDriver(driverClass: String) = DriverRegistry.register(driverClass)
[error]                                             ^
[error] /Users/joshrosen/Documents/spark-redshift/src/main/scala/org/apache/spark/sql/jdbc/RedshiftJDBCWrapper.scala:31: not found: value JDBCRDD
[error]     JDBCRDD.resolveTable(jdbcUrl, table, properties)
[error]     ^
[error] /Users/joshrosen/Documents/spark-redshift/src/main/scala/org/apache/spark/sql/jdbc/RedshiftJDBCWrapper.scala:28: not found: value JDBCWriteDetails
[error]   def schemaString(dataFrame: DataFrame, url: String) = JDBCWriteDetails.schemaString(dataFrame, url)
[error]                                                         ^
[error] 5 errors found
[error] (compile:compile) Compilation failed
[error] Total time: 21 s, completed Aug 18, 2015 9:23:39 AM

The problem is that these classes' package changed from org.apache.spark.sql.jdbc to org.apache.spark.sql.execution.datasources.jdbc.

There are a few ways that we can work around this. Ideally we would only use public Spark SQL APIs; if that's not possible then we can consider opening up certain internal SQL APIs to be @DeveloperApi or public. If we want to support pre-1.5.x versions then we might still need to rely on APIs that used to be private in those releases.

If we're fine with continuing to rely on private APIs then we can use reflection in order to maintain compatibility with 1.4.x and 1.5.x. If we do this, though, we're going to need to have really good tests that exercise all of the reflection code. As a result, I'm going to propose that we defer fixing this immediately and wait until we've gotten end-to-end test infra set up for the 1.4.x-compatible code.

S3 endpoint URI invalid (independent of region issue)

Hi, I'm having this issue, which is causing my program to fail. It could still be something wrong on my side but @JoshRosen asked me to start an issue.

App > java.lang.IllegalArgumentException: Invalid S3 URI: hostname does not appear to be a valid S3 endpoint: s3://kyu/temp/testcopy
App >   at com.amazonaws.services.s3.AmazonS3URI.<init>(AmazonS3URI.java:65)
App >   at com.amazonaws.services.s3.AmazonS3URI.<init>(AmazonS3URI.java:42)
App >   at com.databricks.spark.redshift.Utils$.checkThatBucketHasObjectLifecycleConfiguration(Utils.scala:90)
App >   at com.databricks.spark.redshift.RedshiftWriter.saveToRedshift(RedshiftWriter.scala:362)
App >   at com.databricks.spark.redshift.DefaultSource.createRelation(DefaultSource.scala:106)
App >   at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:170)
App >   at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146)
App >   at KeywordMapping$.execute(KeywordMapping.scala:50)
App >   at KeywordMappingJob$.main(KeywordMappingJob.scala:11)
App >   at KeywordMappingJob.main(KeywordMappingJob.scala)
App >   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
App >   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
App >   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
App >   at java.lang.reflect.Method.invoke(Method.java:606)
App >   at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:700)
App >   at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:183)
App >   at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:208)
App >   at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:123)
App >   at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
App > 15/12/07 18:13:00 sparkDriver-akka.actor.default-dispatcher-16 INFO BlockManagerMasterEndpoint: Registering block manager ip-172-31-33-34.ec2.internal:53290 with 8.6 GB RAM, BlockManagerId(4, ip-172-31-33-34.ec2.internal, 53290)
App > 15/12/07 18:13:00 main WARN Utils$: An error occurred while trying to read the S3 bucket lifecycle configuration
App > java.lang.IllegalArgumentException: Invalid S3 URI: hostname does not appear to be a valid S3 endpoint: s3://kyu/temp

I'm using aws-java-sdk-core 1.10.22.

I see how my URI causes an error here: https://github.com/aws/aws-sdk-java/blob/master/aws-java-sdk-s3/src/main/java/com/amazonaws/services/s3/AmazonS3URI.java

I've tried different versions of the endpoint URI to match the regex. I am wondering how your example works; maybe I'm making a mistake, which would lead to an addition in the docs.

(Also, see this, but it doesn't seem to be relevant: https://forums.databricks.com/questions/1963/why-spark-redshift-can-not-write-s3-bucket.html)
Thank you for any help.
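For reference, an s3://bucket/key URI carries the bucket name in the host position rather than an S3 endpoint hostname, which appears consistent with the error message above. The following is a minimal Python sketch that splits such a URI with the standard library. The function name is hypothetical, and this is not the AWS SDK's parsing logic.

```python
from urllib.parse import urlparse


def split_s3_uri(uri):
    """Split an s3://, s3n:// or s3a:// URI into (bucket, key)."""
    parsed = urlparse(uri)
    if parsed.scheme not in ("s3", "s3n", "s3a"):
        raise ValueError(f"not an S3 URI: {uri}")
    if not parsed.netloc:
        raise ValueError(f"missing bucket name: {uri}")
    # The bucket sits where a hostname normally would; the key is the path.
    return parsed.netloc, parsed.path.lstrip("/")


print(split_s3_uri("s3://kyu/temp/testcopy"))  # ('kyu', 'temp/testcopy')
```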

AWS config issues (python)

I've been working on getting my aws credentials properly set up to read/write, and encountered some issues:

1.) The given example configures AWS access with sc.hadoopConfig.set("fs.s3n.awsAccessKeyId", "YOUR_KEY_ID"), etc.; there is no hadoopConfig method on the PySpark context. It seems to be possible to pass a config to hadoopFile, but that seems specific to whatever file you're pointing it at.

2.) s3n://ACCESSKEY:SECRETKEY@bucket/path/to/temp/dir works absolutely fine when reading from redshift, but throws
java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL

when writing the dataframe back to redshift.

Both work fine if I define access/secret key environment variables before launching the shell, although the write throws 'invalid AVRO file found', probably because the column names contain _, as per #84.

How can I import RedshiftInputFormat in python?

I can't seem to figure out the correct package name

from com.databricks.spark.redshift import RedshiftContext fails

As does RedshiftInputFormat.

Trying to import a file that has already been exported from Redshift using ESCAPE.

Thanks!

java.lang.NoClassDefFoundError: Could not initialize class com.amazonaws.ClientConfiguration

I'm trying to use this library but I'm not able to solve a problem related to the AWS SDK.

I'm running spark shell and adding the dependencies that are required but I'm getting the following stacktrace when I try to use it.

bin/spark-shell --packages com.databricks:spark-redshift_2.10:0.5.2,com.amazonaws:aws-java-sdk-s3:1.10.38,com.amazonaws:aws-java-sdk-core:1.10.38 --repositories http://repohost/content/groups/public --exclude-packages com.fasterxml.jackson.core:jackson-databind --jars ~/development/redshift/RedshiftJDBC41-1.1.10.1010.jar

Already inside the spark-shell:

scala> sqlContext.read.format("com.databricks.spark.redshift").option("url", "jdbc:redshift://redshifthost:5439/db?user=user&password=somepass").option("dbtable", "schema.table").option("tempdir", "s3n://s3bucket/spark-redshift-test").load()
res0: org.apache.spark.sql.DataFrame = [id: int, user_id: int, ...]
scala> res0.show
java.lang.NoClassDefFoundError: Could not initialize class com.amazonaws.ClientConfiguration
    at com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:389)
    at com.databricks.spark.redshift.DefaultSource$$anonfun$$init$$1.apply(DefaultSource.scala:39)
    at com.databricks.spark.redshift.DefaultSource$$anonfun$$init$$1.apply(DefaultSource.scala:39)
    at com.databricks.spark.redshift.RedshiftRelation.buildScan(RedshiftRelation.scala:83)
    at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$3.apply(DataSourceStrategy.scala:53)
    at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$3.apply(DataSourceStrategy.scala:53)
    at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:287)
    at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:286)
    at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:318)
    at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:282)
    at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:49)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
    at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:374)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)

java.sql.SQLException: [Amazon](500310) Invalid operation: S3ServiceException:The specified key does not exist

We are currently using your redshift driver as a sink of a spark stream that copies batches of ~5 minutes from a Kafka log directly into Redshift.

After a random amount of time, mostly 3 to 6 days, the spark driver will fail with the following exception:

java.sql.SQLException: [Amazon](500310) Invalid operation: S3ServiceException:The specified key does not exist.,Status 404,Error NoSuchKey,Rid X,ExtRid X,CanRetry 1
Details: 
 -----------------------------------------------
  error:  S3ServiceException:The specified key does not exist.,Status 404,Error NoSuchKey,Rid X,ExtRid X,CanRetry 1
  code:      8001
  context:   S3 key being read : s3://bucket/folder/d7b2b0ad-1fdd-4777-8a9c-46e5449e7681/part-r-00004-bf98d605-595e-402b-9109-8a6cde5ea7ee.avro
  query:     321881492
  location:  table_s3_scanner.cpp:345
  process:   query6_45 [pid=16853]

The file does exist on S3 when I check it afterwards. So this seems to be a rare race condition between the execution of the COPY command against Redshift and the write of the data files to S3. We are using the s3n:// filesystem for the temporary folder.

Any ideas on how to fix this? Thanks!!
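If the cause really is a visibility race, one possible mitigation is to wait until every expected file is visible before issuing the COPY. The sketch below shows that idea in Python with exponential backoff. The `key_exists` callable is hypothetical (for example, a wrapper around an S3 HEAD request), and nothing here is confirmed as the root cause or as the library's fix.

```python
import time


def wait_for_keys(keys, key_exists, attempts=5, initial_delay=0.5, sleep=time.sleep):
    """Return True once every key is visible, retrying with exponential backoff."""
    delay = initial_delay
    pending = list(keys)
    for attempt in range(attempts):
        # Only re-check the keys that were not visible last time.
        pending = [k for k in pending if not key_exists(k)]
        if not pending:
            return True
        if attempt < attempts - 1:
            sleep(delay)
            delay *= 2
    return False


# Simulated store in which the key only becomes visible on the third check.
calls = {"n": 0}


def flaky_exists(key):
    calls["n"] += 1
    return calls["n"] >= 3


print(wait_for_keys(["part-r-00004.avro"], flaky_exists, sleep=lambda s: None))  # True
```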

Port to Spark 1.3.0

I would very much like to use the new DataFrame work, so I'm attempting to build this for Spark 1.3.0.
I'll be happy to submit a Pull Request once I get it building.

Currently getting the following error:

[info] Compiling 2 Scala sources to /Users/antony/home/src/spark/spark-redshift/target/scala-2.10/classes...
[error] /Users/antony/home/src/spark/spark-redshift/src/main/scala/com/databricks/examples/redshift/input/RedshiftInputFormat.scala:109: value attr is not a member of String
[error]         field.name.attr.cast(field.dataType).as(Symbol(field.name))
[error]                    ^

The types seem to have moved around some in the change from 1.2 to 1.3.0. It looks to me like the type of field.name is String in both 1.2 and 1.3.0, so I am guessing there's some kind of implicit conversion that was visible in 1.2 that has disappeared or moved in 1.3.0. If anyone can offer a comment on where to look to understand what field.name.attr was doing, or suggest how to resolve this, I'll do the legwork to try and fix it.

DATEFORMAT issue in RedshiftWriter.scala

There's no need for a dateformat - especially since dates don't have millisecond level granularity, that would be timeformat, but that's already the default...

http://docs.aws.amazon.com/redshift/latest/dg/r_DATEFORMAT_and_TIMEFORMAT_strings.html

private def copySql(
      sqlContext: SQLContext,
      params: MergedParameters,
      creds: AWSCredentials,
      manifestUrl: String): String = {
    val credsString: String = AWSCredentialsUtils.getRedshiftCredentialsString(creds)
    val fixedUrl = Utils.fixS3Url(manifestUrl)
    s"COPY ${params.table.get} FROM '$fixedUrl' CREDENTIALS '$credsString' FORMAT AS " +
      s"AVRO 'auto' DATEFORMAT 'YYYY-MM-DD HH:MI:SS' manifest ${params.extraCopyOptions}"
  }

"DATEFORMAT 'YYYY-MM-DD HH:MI:SS' " should be removed from Line 101

I found this issue when reading a parquet file from s3 and then trying to save the dataframe into redshift:

Error (code 1205) while loading data into Redshift: "Date value did not match format specified [YYYY-MM-DD HH:MI:SS]"

The string format for date I'm using is YYYY-MM-DD
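A minimal sketch of the proposed change, as a Python stand-in for the Scala copySql shown above: the DATEFORMAT clause is only emitted when a format is explicitly supplied. The function and parameter names are hypothetical and the credentials string is a placeholder.

```python
def copy_sql(table, manifest_url, credentials, date_format=None, extra_options=""):
    """Build a COPY statement; DATEFORMAT is omitted unless explicitly requested."""
    parts = [
        f"COPY {table} FROM '{manifest_url}'",
        f"CREDENTIALS '{credentials}'",
        "FORMAT AS AVRO 'auto'",
    ]
    if date_format is not None:
        parts.append(f"DATEFORMAT '{date_format}'")
    parts.append("manifest")
    if extra_options:
        parts.append(extra_options)
    return " ".join(parts)


# Without date_format, no DATEFORMAT clause appears in the statement.
print(copy_sql("my_table", "s3://bucket/manifest", "<credentials>"))
```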

Reserved words cannot be used as column names when writing back to Redshift

spark-redshift does not currently support the use of reserved words as column names when creating tables in Redshift. For example, table cannot be used as a column name.

This should be simple to fix: simply quote all column names when constructing the create table statement in RedshiftJDBCWrapper.schemaString.
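A minimal Python sketch of the quoting idea, assuming standard SQL double-quoted identifiers with embedded double quotes doubled. The helper names are hypothetical and this is not the code in RedshiftJDBCWrapper.schemaString.

```python
def quote_identifier(name):
    """Wrap a column name in double quotes, doubling any embedded quotes."""
    return '"' + name.replace('"', '""') + '"'


def create_table_sql(table, columns):
    """Build a CREATE TABLE statement with every column name quoted."""
    cols = ", ".join(f"{quote_identifier(name)} {sql_type}" for name, sql_type in columns)
    return f"CREATE TABLE IF NOT EXISTS {table} ({cols})"


# "table" is a reserved word, but is accepted once quoted.
print(create_table_sql("my_table", [("table", "INTEGER"), ("name", "VARCHAR(64)")]))
# CREATE TABLE IF NOT EXISTS my_table ("table" INTEGER, "name" VARCHAR(64))
```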

Audit SaveMode codepaths and internals

See comments that I left in #65; we need to audit the places where the overwrite configuration is used and need to make sure that this matches up with the semantics of the SaveMode.overwrite (and possibly unify code paths).

Add configurations to allow tempdir and Redshift cluster to be in different AWS regions

By default, S3 <-> Redshift copies will not work if the S3 bucket and Redshift cluster are in different AWS regions. If you try to use a bucket in a different region, then you get a confusing error message; see https://forums.databricks.com/questions/1963/why-spark-redshift-can-not-write-s3-bucket.html for one example.

Note that it is technically possible to use a bucket in a different region if you pass an extra region parameter to the COPY command; see https://sqlhaven.wordpress.com/2014/09/07/common-errors-of-redshift-copy-command-and-how-to-solve-them-part-1/ for one example of this.


~~As a result, I think that we should document this limitation and possibly add some configuration validation to print a better error message when the S3 bucket is in the wrong region.~~

We should add a configuration option so that users can explicitly specify the `tempdir` region to enable cross-region copies.
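A minimal sketch of how such an option could feed into the generated statement, assuming the COPY command's REGION parameter is used. The function name and the `tempdir_region` option name are hypothetical; the source does not specify how the option would be named.

```python
def with_region(copy_statement, tempdir_region=None):
    """Append a REGION clause to a COPY statement when a region is configured."""
    if tempdir_region is None:
        return copy_statement
    return f"{copy_statement} REGION '{tempdir_region}'"


base = "COPY my_table FROM 's3://bucket/manifest' CREDENTIALS '<credentials>' manifest"
print(with_region(base, "us-west-2"))
```

Leaving the clause out when no region is configured keeps the current same-region behavior unchanged.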

Handle full queries as well as table names.

We should be able to handle full queries that use things like Redshift joins or aggregation. In Spark JDBC we do this by wrapping the query in a subquery. I propose the following interface: if query is specified instead of dbtable, let's wrap it in () and pass it as a subquery.
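A minimal Python sketch of the proposed interface, assuming exactly one of dbtable or query is given. The function name and the subquery alias are hypothetical.

```python
def relation_sql(dbtable=None, query=None, alias="spark_subquery"):
    """Return the relation to select from: a table name or a wrapped subquery."""
    if (dbtable is None) == (query is None):
        raise ValueError("specify exactly one of dbtable or query")
    if query is not None:
        # Drop surrounding whitespace and a trailing semicolon before wrapping.
        return f"({query.strip().rstrip(';')}) AS {alias}"
    return dbtable


print(relation_sql(query="SELECT user_id, count(*) FROM events GROUP BY user_id;"))
# (SELECT user_id, count(*) FROM events GROUP BY user_id) AS spark_subquery
```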

StringType defaults to TEXT which is varchar(256)

Hi,

My understanding is that spark-redshift takes a DataFrame of rows, with its schema, and casts it to the proper schema for Redshift. StringType becomes TEXT, which is understood by Redshift as varchar(256), so the write fails if a text field contains a string longer than 256 bytes.

It works in append mode if the table has been created beforehand (otherwise I think it goes through a temp table, using the automatically inferred schema, and fails).

Python documentation error

I believe you have accidentally copied the Scala syntax into the Python code examples, e.g.:

df.write \
  .format("com.databricks.spark.redshift")
  .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass") \
  .option("dbtable" -> "my_table_copy") \
  .option("tempdir" -> "s3://path/for/temp/data") \
  .mode("error")
  .save()

Presumably, all of the -> should be replaced with =, and the quotes on the assignment removed, i.e.:

df.write \
  .format("com.databricks.spark.redshift")
  .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass") \
  .option(dbtable="my_table_copy") \
  .option(tempdir="s3://path/for/temp/data") \
  .mode("error")
  .save()

However, this doesn't quite work either; it returns the error "option() got an unexpected keyword argument 'dbtable'" instead.

Avro schema only file causes Redshift stl_load_errors: "Invalid AVRO file found. Unexpected end of AVRO file."

When a part file contains no data and just the Avro schema, Redshift throws an exception:

Caused by: com.amazon.support.exceptions.ErrorException: [Amazon](500310) 
Invalid operation: Load into table 'stg_device_attribute' failed.  Check 
'stl_load_errors' system table for details.;

Where the load error reads:

Invalid AVRO file found. Unexpected end of AVRO file.

My AVRO file just contains the schema for a number of shards where no records matched a WHERE-clause condition:

Obj...avro.codecsnappy.avro.schema..{"type":"record","name":"topLevelRecord","fields":[{"name":"roster_id","type":["long","null"]},{"name":"device_id","type":["long","null"]},{"name":"push_token","type":["string","null"]},{"name":"shard_id","type":["long","null"]},{"name":"device_type","type":["string","null"]},{"name":"device_date_created","type":["string","null"]}]} 

In my particular case I am running a query across 12 PostgreSQL databases which may not contain results. I can get around this problem by merging into a smaller number of partitions and hope there's at least one record in each part file by sacrificing Redshift parallelism on the load.

I am not sure what the solution is but it might be worth it to warn if a part file will have 0 records.
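A minimal sketch of the warning idea, in plain Python with partitions modeled as lists of records. The function name is hypothetical, and dropping the empty partitions (rather than only warning) is an assumption about what a fix might do.

```python
import warnings


def drop_empty_partitions(partitions):
    """Return only the partitions that contain records, warning about the rest."""
    non_empty = [p for p in partitions if len(p) > 0]
    dropped = len(partitions) - len(non_empty)
    if dropped:
        warnings.warn(
            f"{dropped} of {len(partitions)} partitions have 0 records and "
            "would produce schema-only Avro files"
        )
    return non_empty


# Three shards, one of which matched no rows.
print(drop_empty_partitions([[{"roster_id": 1}], [], [{"roster_id": 2}]]))
```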

Redshift password in connection string can't be illegal-URL characters

If you have a password with a caret or question mark, the connection fails with 'Illegal character':

java.net.URISyntaxException: Illegal character in query at index 122: redshift://me.us-west-2.redshift.amazonaws.com:5439/my_db?tcpKeepAlive=true&user=treid&password=M^gI?[12oaV^Mv#$
    at java.net.URI$Parser.fail(URI.java:2829)
    at java.net.URI$Parser.checkChars(URI.java:3002)
    at java.net.URI$Parser.parseHierarchical(URI.java:3092)
    at java.net.URI$Parser.parse(URI.java:3034)
    at java.net.URI.<init>(URI.java:595)
    at com.databricks.spark.redshift.JDBCWrapper.getConnector(RedshiftJDBCWrapper.scala:137)
...

If you have a percentage sign, you get 'Malformed escape pair at index...'

If I urlencode my password parameter I just get java.sql.SQLException: [Amazon](500150) Error setting/closing connection: password authentication failed for user "me"

Is there a way around this?

Thanks in advance, apologies if I'm missing something obvious

The converter for Timestamp data is not thread-safe

It seems we use SimpleDateFormat in a few places in the Conversions object (see https://github.com/databricks/spark-redshift/blob/master/src/main/scala/com/databricks/spark/redshift/Conversions.scala#L41-L42), which will cause job failures with an error message like:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 36 in stage 28.0 failed 4 times, most recent failure: Lost task 36.3 in stage 28.0 (TID 781, 10.73.213.69): java.lang.NumberFormatException: For input string: ""
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Long.parseLong(Long.java:601)
    at java.lang.Long.parseLong(Long.java:631)
    at java.text.DigitList.getLong(DigitList.java:195)
    at java.text.DecimalFormat.parse(DecimalFormat.java:2051)
    at java.text.SimpleDateFormat.subParse(SimpleDateFormat.java:2162)
    at java.text.SimpleDateFormat.parse(SimpleDateFormat.java:1514)
    at com.databricks.spark.redshift.Conversions$$anon$1.parse(Conversions.scala:54)
    at java.text.DateFormat.parse(DateFormat.java:364)
    at com.databricks.spark.redshift.Conversions$.com$databricks$spark$redshift$Conversions$$parseTimestamp(Conversions.scala:67)
    at com.databricks.spark.redshift.Conversions$$anonfun$1.apply(Conversions.scala:122)
    at com.databricks.spark.redshift.Conversions$$anonfun$1.apply(Conversions.scala:108)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at com.databricks.spark.redshift.Conversions$.com$databricks$spark$redshift$Conversions$$convertRow(Conversions.scala:108)
    at com.databricks.spark.redshift.Conversions$$anonfun$createRowConverter$1.apply(Conversions.scala:135)
    at com.databricks.spark.redshift.Conversions$$anonfun$createRowConverter$1.apply(Conversions.scala:135)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:890)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:890)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1851)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1851)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1825)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1838)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1851)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1922)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:890)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:888)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:888)
Caused by: java.lang.NumberFormatException: For input string: ""
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Long.parseLong(Long.java:601)
    at java.lang.Long.parseLong(Long.java:631)
    at java.text.DigitList.getLong(DigitList.java:195)
    at java.text.DecimalFormat.parse(DecimalFormat.java:2051)
    at java.text.SimpleDateFormat.subParse(SimpleDateFormat.java:2162)
    at java.text.SimpleDateFormat.parse(SimpleDateFormat.java:1514)
    at com.databricks.spark.redshift.Conversions$$anon$1.parse(Conversions.scala:54)
    at java.text.DateFormat.parse(DateFormat.java:364)
    at com.databricks.spark.redshift.Conversions$.com$databricks$spark$redshift$Conversions$$parseTimestamp(Conversions.scala:67)
    at com.databricks.spark.redshift.Conversions$$anonfun$1.apply(Conversions.scala:122)
    at com.databricks.spark.redshift.Conversions$$anonfun$1.apply(Conversions.scala:108)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at com.databricks.spark.redshift.Conversions$.com$databricks$spark$redshift$Conversions$$convertRow(Conversions.scala:108)
    at com.databricks.spark.redshift.Conversions$$anonfun$createRowConverter$1.apply(Conversions.scala:135)
    at com.databricks.spark.redshift.Conversions$$anonfun$createRowConverter$1.apply(Conversions.scala:135)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:890)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:890)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1851)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1851)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
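One common remedy for a shared, stateful formatter is to give each thread its own instance. The sketch below illustrates that pattern in Python with threading.local. The TimestampParser class is a hypothetical stand-in for a parser that must not be shared across threads, and this is not the fix applied in Conversions.scala.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime

_local = threading.local()


class TimestampParser:
    """Stand-in for a stateful parser that must not be shared across threads."""

    def __init__(self, pattern):
        self.pattern = pattern

    def parse(self, text):
        return datetime.strptime(text, self.pattern)


def thread_local_parser(pattern="%Y-%m-%d %H:%M:%S"):
    """Return this thread's parser, creating it on first use."""
    parser = getattr(_local, "parser", None)
    if parser is None or parser.pattern != pattern:
        parser = TimestampParser(pattern)
        _local.parser = parser
    return parser


def parse_timestamp(text):
    return thread_local_parser().parse(text)


# Each worker thread lazily builds and reuses its own parser instance.
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(parse_timestamp, ["2015-12-07 18:13:00"] * 8))
print(results[0])
```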

An error occurred while trying to read the S3 bucket lifecycle configuration

Hi!

I am trying to load data from a Redshift table but am unable to do so.
Here is my code:
val conf = new SparkConf().setAppName("TestApp").setMaster("local[*]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val df1: DataFrame = sqlContext.read
.format("com.databricks.spark.redshift")
.option("url", "jdbc:redshift://host.us-east-1.redshift.amazonaws.com:5439/test?user=uname&password=pwd")
.option("dbtable", "users")
.option("tempdir", "s3n://accesskey:secretkey@redhsift-storage/test")
.load()
df1.show()

I get the following exception:

WARN Utils$: An error occurred while trying to read the S3 bucket lifecycle configuration
java.lang.NullPointerException
at com.databricks.spark.redshift.Utils$.checkThatBucketHasObjectLifecycleConfiguration(Utils.scala:76)
at com.databricks.spark.redshift.RedshiftRelation.buildScan(RedshiftRelation.scala:76)
at org.apache.spark.sql.sources.DataSourceStrategy$$anonfun$3.apply(DataSourceStrategy.scala:50)
at org.apache.spark.sql.sources.DataSourceStrategy$$anonfun$3.apply(DataSourceStrategy.scala:50)
at org.apache.spark.sql.sources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:266)
at org.apache.spark.sql.sources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:265)
at org.apache.spark.sql.sources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:296)
at org.apache.spark.sql.sources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:261)
at org.apache.spark.sql.sources.DataSourceStrategy$.apply(DataSourceStrategy.scala:46)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:314)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:943)
at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:941)
at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:947)
at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:947)
at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1269)
at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1203)
at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1262)
at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:176)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:331)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:338)
at test.TestClass$.main(TestClass.scala:21)
at test.TestClass.main(TestClass.scala)
Exception in thread "main" java.net.URISyntaxException: Relative path in absolute URI: s3n://redhsift-storage%5Ctest%5Cf19a1019-5b20-41e5-a8df-b6490426eb64
at java.net.URI.checkPath(URI.java:1804)
at java.net.URI.<init>(URI.java:679)
at java.net.URI.<init>(URI.java:781)
at com.databricks.spark.redshift.Utils$.joinUrls(Utils.scala:47)
at com.databricks.spark.redshift.Utils$.makeTempPath(Utils.scala:61)
at com.databricks.spark.redshift.Parameters$MergedParameters.createPerQueryTempDir(Parameters.scala:78)
at com.databricks.spark.redshift.RedshiftRelation.buildScan(RedshiftRelation.scala:100)
at org.apache.spark.sql.sources.DataSourceStrategy$$anonfun$3.apply(DataSourceStrategy.scala:50)
at org.apache.spark.sql.sources.DataSourceStrategy$$anonfun$3.apply(DataSourceStrategy.scala:50)
at org.apache.spark.sql.sources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:266)
at org.apache.spark.sql.sources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:265)
at org.apache.spark.sql.sources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:296)
at org.apache.spark.sql.sources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:261)
at org.apache.spark.sql.sources.DataSourceStrategy$.apply(DataSourceStrategy.scala:46)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:314)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:943)
at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:941)
at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:947)
at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:947)
at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1269)
at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1203)
at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1262)
at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:176)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:331)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:338)

Any suggestions?

Thanks.

Support pushdown of more filter types

The data sources API supports pushdown of more filters than this library currently supports. We should aim to support all of Spark's filter types plus conjunction and disjunction of filters.

A complete list of filters supporting pushdowns can be found at https://github.com/apache/spark/blob/v1.4.0/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala

While adding support for these additional filter types, we should also clean up the existing code by moving the buildWhereClause method into a static utility object and should add a dedicated unit test suite for the generated expressions.
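
A minimal sketch of what such a utility object might look like. The filter types below are simplified stand-ins defined locally so the example is self-contained; they are not Spark's own classes, and the object name FilterPushdown and its helper methods are hypothetical. Only a few filter types are shown.

```scala
object FilterPushdown {
  // Simplified local stand-ins for a few data source filters (assumption:
  // the real code would match on org.apache.spark.sql.sources.Filter instead).
  sealed trait Filter
  case class EqualTo(attribute: String, value: Any) extends Filter
  case class GreaterThan(attribute: String, value: Any) extends Filter
  case class IsNull(attribute: String) extends Filter
  case class And(left: Filter, right: Filter) extends Filter
  case class Or(left: Filter, right: Filter) extends Filter
  case class Not(child: Filter) extends Filter

  // Render a value as a SQL literal; strings are single-quoted and escaped.
  private def literal(value: Any): String = value match {
    case s: String => "'" + s.replace("'", "''") + "'"
    case other => other.toString
  }

  // Double-quote a column name, escaping embedded double quotes.
  private def quote(name: String): String =
    "\"" + name.replace("\"", "\"\"") + "\""

  // Recursively translate one filter into a SQL boolean expression.
  def buildExpression(filter: Filter): String = filter match {
    case EqualTo(attr, value) => s"${quote(attr)} = ${literal(value)}"
    case GreaterThan(attr, value) => s"${quote(attr)} > ${literal(value)}"
    case IsNull(attr) => s"${quote(attr)} IS NULL"
    case And(left, right) => s"(${buildExpression(left)} AND ${buildExpression(right)})"
    case Or(left, right) => s"(${buildExpression(left)} OR ${buildExpression(right)})"
    case Not(child) => s"(NOT ${buildExpression(child)})"
  }

  // Top-level filters are implicitly AND-ed together.
  def buildWhereClause(filters: Seq[Filter]): String =
    if (filters.isEmpty) ""
    else filters.map(buildExpression).mkString("WHERE ", " AND ", "")
}
```

Keeping the translation in a standalone object with no dependency on the relation class would make it straightforward to cover with a dedicated unit test suite.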

Add a Redshift table schema parser and use it in redshiftFile

Currently we ask users to specify column names, which takes some manual effort. We don't assume that the Spark cluster can talk to Redshift directly, so it would be nice to let users run "describe table", copy the output, and pass it to redshiftFile directly:

sqlContext.redshiftFile(path: String, schema: String)

NPE on lifecycle rule check

While this is just a logger WARN message, the stack trace caught me by surprise at first and I was trying to hunt down the cause of the error:

> df.first()
2015-09-18 08:10:59,269 [main                 | WARN ] com.databricks.spark.redshift.Utils$ - An error occurred while trying to read the S3 bucket lifecycle configuration
java.lang.NullPointerException
        at com.databricks.spark.redshift.Utils$$anonfun$1.apply(Utils.scala:80)
        at com.databricks.spark.redshift.Utils$$anonfun$1.apply(Utils.scala:76)

I am using com.amazonaws:aws-java-sdk-s3:1.10.8:jar in production. I have a rule on my bucket named delete-after-1-day which does exactly what the rule name suggests.

java.lang.IllegalArgumentException: Unsupported JDBC protocol: 'postgresql'

How can I use the PostgreSQL JDBC driver with spark-redshift?
The following code throws this exception:
java.lang.IllegalArgumentException: Unsupported JDBC protocol: 'postgresql'

val df1: DataFrame = sqlContext.read
  .format("com.databricks.spark.redshift")
  .option("url", "jdbc:postgresql://host:5439/db?user=test&password=test")
  .option("dbtable", "wdata")
  .option("tempdir", "s3n://accessKEy:SecretKEy@redshift/dir/")
  .load()
df1.show()

The full stack trace is as follows:

Exception in thread "main" java.lang.IllegalArgumentException: Unsupported JDBC protocol: 'postgresql'
    at com.databricks.spark.redshift.JDBCWrapper$$anonfun$getDriverClass$2.apply(RedshiftJDBCWrapper.scala:68)
    at com.databricks.spark.redshift.JDBCWrapper$$anonfun$getDriverClass$2.apply(RedshiftJDBCWrapper.scala:52)
    at scala.Option.getOrElse(Option.scala:120)
    at com.databricks.spark.redshift.JDBCWrapper.getDriverClass(RedshiftJDBCWrapper.scala:51)
    at com.databricks.spark.redshift.JDBCWrapper.getConnector(RedshiftJDBCWrapper.scala:138)
    at com.databricks.spark.redshift.RedshiftRelation$$anonfun$schema$1.apply(RedshiftRelation.scala:59)
    at com.databricks.spark.redshift.RedshiftRelation$$anonfun$schema$1.apply(RedshiftRelation.scala:56)
    at scala.Option.getOrElse(Option.scala:120)
    at com.databricks.spark.redshift.RedshiftRelation.schema$lzycompute(RedshiftRelation.scala:56)
    at com.databricks.spark.redshift.RedshiftRelation.schema(RedshiftRelation.scala:55)
    at org.apache.spark.sql.execution.datasources.LogicalRelation.<init>(LogicalRelation.scala:31)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:120)

DATE columns unloaded as epochmillisecs can not be loaded back into Redshift

Both DATE and TIMESTAMP types are currently unloaded using the epochmillisecs format. Data is loaded back into Redshift using COPY ... TIMEFORMAT 'epochmillisecs'. However, this fails for DATE types: the default, DATEFORMAT 'auto', does not handle epochmillisecs, and there is currently no DATEFORMAT 'epochmillisecs' option that could be used instead (perhaps a good Redshift enhancement). Given this, it seems to make more sense to use a common serialization format for both DATE and TIMESTAMP, such as YYYY-MM-DD HH:MI:SS.SSSSSS, which works for both types, even when the data contains fractional seconds, using DATEFORMAT 'YYYY-MM-DD HH:MI:SS'. The default format for TIMESTAMP is YYYY-MM-DD HH:MI:SS, so no TIMEFORMAT is then required. See docs.
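
As a rough illustration of the proposed common format, here is a sketch that serializes both dates and timestamps to YYYY-MM-DD HH:MI:SS.SSSSSS. It assumes Java 8's java.time is available; the object and method names are hypothetical and are not part of spark-redshift.

```scala
import java.time.{LocalDate, LocalDateTime}
import java.time.format.DateTimeFormatter

object CommonTemporalFormat {
  // Java pattern equivalent to YYYY-MM-DD HH:MI:SS.SSSSSS (microsecond precision).
  private val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS")

  // Timestamps keep their time of day and fractional seconds.
  def formatTimestamp(ts: LocalDateTime): String = ts.format(formatter)

  // Dates are written at midnight so they share the same textual layout.
  def formatDate(date: LocalDate): String = date.atStartOfDay().format(formatter)
}
```

For example, formatDate(LocalDate.of(2015, 9, 18)) yields "2015-09-18 00:00:00.000000", so DATE and TIMESTAMP columns end up with the same textual layout.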

Coalesce before writing to Redshift

The Redshift write performance might be bad if we have tons of small partitions; it would be best to coalesce to a reasonable number of partitions before writing to S3.

How to define schema in Redshift

I do not want to store my data in the default PUBLIC schema; instead, I want to store my tables in a custom schema. Any help on achieving this with the spark-redshift connector would be very useful.

Redshift sqlContext.read fails with S3 input path does not exist - but it does

I'm not sure if this is spark-redshift specific, but wondered if you have encountered this issue when using sqlContext.read from a Redshift table.

15/08/06 10:19:16 INFO SparkContext: Created broadcast 0 from newAPIHadoopFile at RedshiftRelation.scala:82
org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: s3://spark-temp-dir/19380690-5730-451a-b741-8245b27da674
  at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:285)
  at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:340)
  at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:95)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
  at scala.Option.getOrElse(Option.scala:120)
...

However, Spark has successfully unloaded the table to S3, as shown by:

Gregs-MacBook-Pro:spark-scripts grahn$ aws s3 ls s3://spark-temp-dir/19380690-5730-451a-b741-8245b27da674/
2015-08-06 10:19:15        335 0000_part_00
2015-08-06 10:19:15        338 0001_part_00

Complete scripts of Spark setup, etc. in this gist:
https://gist.github.com/gregrahn/b63fea668d27d6265434

Cancel Redshift queries when thread is interrupted

It appears that blocking JDBC calls do not respond to thread interrupts, preventing us from canceling reads or writes. The easiest solution seems to be to issue the JDBC statements from a separate thread or Future, then use Statement.cancel() to stop the query (see http://blog.grovehillsoftware.com/2009/08/interruptible-jdbc-statements.html). The easiest way to do this is probably to define a utility method in our JDBCWrapper that encapsulates this logic.
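
A sketch of how such a utility method might look, using only java.sql and java.util.concurrent. The object and method names are hypothetical, and this is one possible shape for the helper rather than the library's actual implementation.

```scala
import java.sql.Statement
import java.util.concurrent.{Callable, ExecutionException, Executors}

object InterruptibleJdbc {
  // Runs statement.execute(sql) on a helper thread so that the calling
  // thread stays interruptible while it waits for the result.
  def executeInterruptibly(statement: Statement, sql: String): Boolean = {
    val executor = Executors.newSingleThreadExecutor()
    try {
      val future = executor.submit(new Callable[Boolean] {
        override def call(): Boolean = statement.execute(sql)
      })
      try {
        future.get()
      } catch {
        case e: InterruptedException =>
          // The caller was interrupted: ask the database to stop the query,
          // restore the interrupt flag, and propagate the interruption.
          statement.cancel()
          Thread.currentThread().interrupt()
          throw e
        case e: ExecutionException =>
          // Unwrap so callers see the original SQLException where possible.
          throw Option(e.getCause).getOrElse(e)
      }
    } finally {
      executor.shutdownNow()
    }
  }
}
```

Whether Statement.cancel() actually stops a running query depends on the JDBC driver, so this would need to be checked against the Redshift driver.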

jets3 issue

Hi!

I want to know which jets3t version is compatible with spark-redshift, as I am facing the following jets3t exception with version 0.7.1:

java.lang.NoSuchMethodError: org.jets3t.service.impl.rest.httpclient.RestS3Service.<init>(Lorg/jets3t/service/security/AWSCredentials;)V

 at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.initialize(Jets3tNativeFileSystemStore.java:60)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
        at org.apache.hadoop.fs.s3native.$Proxy16.initialize(Unknown Source)
        at org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:272)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2433)
        at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:88)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2467)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2449)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:367)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:287)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.<init>(FileOutputCommitter.java:91)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.<init>(FileOutputCommitter.java:74)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.getOutputCommitter(FileOutputFormat.java:309)
        at org.apache.spark.sql.execution.datasources.BaseWriterContainer.newOutputCommitter(WriterContainer.scala:126)
        at org.apache.spark.sql.execution.datasources.BaseWriterContainer.executorSideSetup(WriterContainer.scala:113)
        at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:231)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)

Remove dependency on spark.Logging

As a best practice, third-party code should not rely on Spark's Logging trait. We should remove this dependency and add a Scalastyle rule to make sure that it's not re-introduced in new code.

"Invalid operation: Mandatory url is not present in manifest file" error when writing data to Redshift while using an old version of spark-avro

This issue describes a problem that occurs when using spark-redshift with an old spark-avro dependency.

Symptom:

Saving data to Redshift fails with the following error if spark-avro 1.0.0 is used:

java.sql.SQLException: [Amazon](500310) Invalid operation: Mandatory url is not present in manifest file.
-----------------------------------------------
  error:  Mandatory url is not present in manifest file.
  code:      8001
  context:   Manifest file location=s3://spark-redshift-testing/temp/fc8db7a7-97fa-4dcd-b694-cbc44f8dbbe8/manifest.json url=s3://spark-redshift-testing/temp/fc8db7a7-97fa-4dcd-b694-cbc44f8dbbe8/part-r-00000-ro.avro
  query:     403423
  location:  s3_utility.cpp:328
  process:   padbmaster [pid=27093]
  -----------------------------------------------;
    at com.amazon.redshift.client.messages.inbound.ErrorResponse.toErrorException(Unknown Source)
    at com.amazon.redshift.client.PGMessagingContext.handleErrorResponse(Unknown Source)
    at com.amazon.redshift.client.PGMessagingContext.getOperationMetadata(Unknown Source)
    at com.amazon.redshift.client.PGMessagingContext.getOperationMetadata(Unknown Source)
    at com.amazon.redshift.client.PGMessagingContext.handleMessage(Unknown Source)
    at com.amazon.jdbc.communications.InboundMessagesPipeline.getNextMessageOfClass(Unknown Source)
    at com.amazon.redshift.client.PGMessagingContext.doMoveToNextClass(Unknown Source)
    at com.amazon.redshift.client.PGMessagingContext.getReadyForQuery(Unknown Source)
    at com.amazon.redshift.client.PGMessagingContext.getOperationMetadata(Unknown Source)
    at com.amazon.redshift.client.PGMessagingContext.getOperationMetadata(Unknown Source)
    at com.amazon.redshift.client.PGMessagingContext.handleMessage(Unknown Source)
    at com.amazon.jdbc.communications.InboundMessagesPipeline.getNextMessageOfClass(Unknown Source)
    at com.amazon.redshift.client.PGMessagingContext.doMoveToNextClass(Unknown Source)
    at com.amazon.redshift.client.PGMessagingContext.doMoveToNextClass(Unknown Source)
    at com.amazon.redshift.client.PGMessagingContext.getErrorResponse(Unknown Source)
    at com.amazon.redshift.client.PGClient.executePreparedStatement(Unknown Source)
    at com.amazon.redshift.dataengine.PGIQueryExecutor.execute(Unknown Source)
    at com.amazon.jdbc.common.SPreparedStatement.executeWithParams(Unknown Source)
    at com.amazon.jdbc.common.SPreparedStatement.execute(Unknown Source)
    at com.databricks.spark.redshift.RedshiftWriter$$anonfun$com$databricks$spark$redshift$RedshiftWriter$$doRedshiftLoad$1.apply(RedshiftWriter.scala:170)
    at com.databricks.spark.redshift.RedshiftWriter$$anonfun$com$databricks$spark$redshift$RedshiftWriter$$doRedshiftLoad$1.apply(RedshiftWriter.scala:165)
    at scala.Option.foreach(Option.scala:236)
    at com.databricks.spark.redshift.RedshiftWriter.com$databricks$spark$redshift$RedshiftWriter$$doRedshiftLoad(RedshiftWriter.scala:165)
    at com.databricks.spark.redshift.RedshiftWriter.saveToRedshift(RedshiftWriter.scala:375)
    at com.databricks.spark.redshift.DefaultSource.createRelation(DefaultSource.scala:106)
    at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:309)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:144)

Cause:

In order to fix a crash that could occur when writing tables containing empty partitions, #99 modified the write path to use manifest files that instruct Redshift to load only the non-empty partitions' Avro files. The code that handled processing of filenames when generating the manifest made assumptions that hold for spark-avro 2.0.0+ but not for spark-avro 1.0.0.

Solution:

For now, the best solution is to use spark-avro 2.0.1. Usually this should happen automatically if you install libraries using Maven or another package/dependency manager.

Databricks users can attach spark-redshift by specifying the coordinate com.databricks:spark-redshift_2.10:0.5.2 in the Maven library upload screen or by using the integrated Spark Packages and Maven Central browser. If you are manually attaching JARs to your cluster, you can either include spark-avro 2.0.1+ or generate a combined assembly JAR containing all of spark-redshift's dependencies by using the sbt-assembly plugin: just run sbt/sbt assembly in the spark-redshift checkout. EDIT: the approach listed here will not work for DBC 1.4.1 because the driver's own spark-avro dependency will take precedence. We'll have to either shade or restore compatibility.

In the long term, I may be able to fix this by either restoring compatibility with spark-avro 1.0.0 or by adding an error message so we fail in a less confusing way when an incompatible version of the library is used.

java.lang.NoClassDefFoundError: Could not initialize class com.amazonaws.services.s3.internal.ServiceUtils

I am building a spark project using scala in Intellij. I have added the following dependencies:

spark-redshift_2.10-0.5.1-SNAPSHOT
RedshiftJDBC41-1.1.7.1007
httpcore-4.4.3
httpclient-4.5.1
aws-java-sdk-s3-1.10.23-SNAPSHOT
aws-java-sdk-core-1.10.23-SNAPSHOT

I am receiving the following error: java.lang.NoClassDefFoundError: Could not initialize class com.amazonaws.services.s3.internal.ServiceUtils

when running
val df = sqlContext.read.format("com.databricks.spark.redshift")
.option("url", "jdbc:redshift://AWS_SERVER:5439/warehouse?user=USER&password=PWD")
.option("dbtable", "fact_time")
.option("tempdir", "s3n://bucket/path")
.load()

df.show()

java.lang.NoSuchMethodError: org.apache.avro.generic.GenericData.createDatumWriter

Hi!

I am using spark-redshift to save a DataFrame into Redshift but am facing the following exception:

java.lang.NoSuchMethodError: org.apache.avro.generic.GenericData.createDatumWriter(Lorg/apache/avro/Schema;)Lorg/apache/avro/io/DatumWriter;
        at org.apache.avro.mapreduce.AvroKeyRecordWriter.<init>(AvroKeyRecordWriter.java:55)
        at org.apache.avro.mapreduce.AvroKeyOutputFormat$RecordWriterFactory.create(AvroKeyOutputFormat.java:79)
        at org.apache.avro.mapreduce.AvroKeyOutputFormat.getRecordWriter(AvroKeyOutputFormat.java:105)
        at com.databricks.spark.avro.AvroOutputWriter.<init>(AvroOutputWriter.scala:82)
        at com.databricks.spark.avro.AvroOutputWriterFactory.newInstance(AvroOutputWriterFactory.scala:31)
        at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:234)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:88)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

[Amazon](500310) Invalid operation: syntax error at or near ".";

When using SaveMode.Overwrite (which I honestly haven't checked to see if it's supported) as in the following code:

.toDF().write
      .format("com.databricks.spark.redshift")
      .option("jdbcdriver", "com.amazon.redshift.jdbc41.Driver")
      .option("url", redshiftJdbcUrl)
      .option("dbtable", "pf.some_table_here")
      .option("tempdir", tempDirBucket)
      .mode(SaveMode.Overwrite)
      .save()

I encounter:

Caused by: com.amazon.support.exceptions.ErrorException: [Amazon](500310) Invalid operation: syntax error at or near ".";
        ... 80 more

(Lost the rest of the stack trace in a closed screen session but I believe the error can be reproduced by specifying the schema name with the table name and using SaveMode.Overwrite.)

Using SaveMode.Append is a workaround (I just truncate the table prior to saving).

Automatically use Redshift 4.0 or 4.1 JDBC driver, depending on which is installed

Currently, the default value of jdbcdriver is com.amazon.redshift.jdbc4.Driver, but this will not work if a user has installed the JDBC 4.1 driver. Instead of hardcoding either the 4.0 or 4.1 driver, we should handle the absence of a user-provided jdbcdriver config by first trying to load the JDBC4 driver and falling back to attempt the 4.1 driver (or vice versa).

See #56 (comment) for additional context.
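
A minimal sketch of the fallback logic, assuming the two driver class names mentioned in this tracker. The object and method names are hypothetical; a user-provided jdbcdriver value always wins, and otherwise the first candidate class that can be loaded is used.

```scala
import scala.util.Try

object DriverResolver {
  // Candidate driver classes, tried in order (4.0 first, then 4.1).
  val defaultCandidates: Seq[String] = Seq(
    "com.amazon.redshift.jdbc4.Driver",
    "com.amazon.redshift.jdbc41.Driver")

  // Returns the first class name that can be loaded from the classpath.
  def firstLoadable(classNames: Seq[String]): Option[String] =
    classNames.find(name => Try(Class.forName(name)).isSuccess)

  // Prefer the user's explicit setting; otherwise probe the candidates.
  def resolve(
      userProvided: Option[String],
      candidates: Seq[String] = defaultCandidates): String =
    userProvided.orElse(firstLoadable(candidates)).getOrElse(
      throw new ClassNotFoundException(
        "No Redshift JDBC driver found; tried: " + candidates.mkString(", ")))
}
```

Reversing the order of defaultCandidates would give the "or vice versa" behaviour.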

Basic install and usage instructions in README.md

Is this library packaged in some way that's easily usable from other projects? It would be tremendously helpful if the README for this project included instructions on the recommended way to use it from another sbt-based project, or a pointer on where to find that information.

What I hoped might work was to just add:

libraryDependencies += "com.databricks.examples.redshift" %% "redshift-input-format" % "0.1"

to my own app's sbt project file. Unfortunately sbt couldn't resolve the dependency when I tried that.

I will manually copy the jar from building it locally for now, but is there a better way?

Apologies for rudimentary n00b request; I'm not an sbt expert and this is my first day playing with Spark.

Column names cannot contain spaces when saving back to Redshift

Because spark-redshift uses Avro in its write path, it inherits the limitations of Avro's schema validation: we cannot create tables with columns that contain non-letter-or-_ characters, such as spaces or quotes.

In order to work around these limitations, I think that we'd have to use a different set of column names when writing to Avro, then use a JSONPaths file to map those columns back to the original column names.
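
To illustrate the first half of that idea, here is a sketch that assigns positional, Avro-safe names to columns and keeps the mapping back to the original names. The object name and the col_N naming scheme are assumptions for illustration only, and generating the JSONPaths file itself is not shown.

```scala
object AvroSafeColumns {
  // Pairs each original column name with a generated name that contains
  // only letters, digits and underscores: (safeName, originalName).
  // Positional names avoid collisions regardless of the original names.
  def safeNames(columns: Seq[String]): Seq[(String, String)] =
    columns.zipWithIndex.map { case (original, index) =>
      (s"col_$index", original)
    }
}
```

For example, safeNames(Seq("first name", "id")) returns Seq(("col_0", "first name"), ("col_1", "id")); the write path would use the generated names in the Avro schema and the original names when creating the Redshift table.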

Incompatible with Hadoop 2.x

RedshiftInputFormat calls TaskAttemptContext, which was a class in Hadoop 1.x but became an interface in Hadoop 2.x:

java.lang.IncompatibleClassChangeError: Found interface org.apache.hadoop.mapreduce.TaskAttemptContext, but class was expected

at com.databricks.spark.redshift.RedshiftRecordReader.initialize(RedshiftInputFormat.scala:103)
