databricks / spark-redshift
Redshift data source for Apache Spark
License: Apache License 2.0
Thanks to @jaley for the recent milestone on getting data into Redshift from a DataFrame; the latest updates are pretty awesome. One edge case I encountered, however: during schema generation, String datatypes in the DataFrame always get mapped to the TEXT type in the generated SQL schema. This is problematic because TEXT is just an alias for varchar(256) in Redshift, and it will cause a failure during the data import into Redshift if any rows contain text exceeding 256 characters (the specific Redshift error is "String length exceeds DDL length").
Digging through the code, it looks like the source of this problem is the spark.sql.jdbc.JDBCWrapper class, which defines the mapping (https://github.com/apache/spark/blob/8a94eb23d53e291441e3144a1b800fe054457040/sql/core/src/main/scala/org/apache/spark/sql/jdbc/jdbc.scala#L128).
Redshift only supports fixed-length fields, so I don't see another way to preserve data integrity without replacing TEXT in the SQL schema with VARCHAR(N), where N is the longest string length for that column in the DataFrame. Obviously this would introduce a small amount of overhead and complexity to the code.
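As a rough sketch of that VARCHAR(N) idea (all helper names here are hypothetical; `defaultRedshiftType` stands in for whatever mapping is already used for non-string types), one could scan each string column for its longest value and substitute VARCHAR(N) in the generated DDL:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{DataType, StringType}

// Hypothetical stand-in for the library's existing non-string type mapping.
def defaultRedshiftType(dt: DataType): String = dt.typeName.toUpperCase

// Sketch: compute the longest value per string column and emit VARCHAR(N) instead of TEXT.
def redshiftSchemaString(df: DataFrame): String = {
  val maxLengths: Map[String, Int] = df.schema.fields.collect {
    case f if f.dataType == StringType =>
      val longest = df.rdd
        .map(row => Option(row.getAs[String](f.name)).map(_.length).getOrElse(0))
        .max()
      f.name -> math.max(longest, 1) // VARCHAR(0) is not legal, so floor at 1
  }.toMap

  df.schema.fields.map { f =>
    val sqlType = f.dataType match {
      case StringType => s"VARCHAR(${maxLengths(f.name)})"
      case other      => defaultRedshiftType(other)
    }
    s""""${f.name}" $sqlType"""
  }.mkString(", ")
}
```

This adds an extra pass over the data per string column, which is the overhead mentioned above.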
We're running spark-redshift on EMR 4. We're trying to avoid including org.apache.hadoop:hadoop-aws in our build, as it conflicts with many things already provided by the EMR environment.
The assertion here, ironically, seems to require that the S3FileSystem class is available on the classpath just so that we can assert that it isn't being used. We're seeing the following stacktrace on the 0.5.0 release:
java.lang.NoClassDefFoundError: org/apache/hadoop/fs/s3/S3FileSystem
at com.databricks.spark.redshift.Utils$.assertThatFileSystemIsNotS3BlockFileSystem(Utils.scala:104)
at com.databricks.spark.redshift.RedshiftWriter.saveToRedshift(RedshiftWriter.scala:273)
at com.databricks.spark.redshift.DefaultSource.createRelation(DefaultSource.scala:104)
at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:309)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:144)
...
On EMR, that class shouldn't be available, as we don't want to use it. We instead use the EmrFileSystem class, which is already provided by the environment.
This library's use of Spark's private JDBCRDD API means that it no longer compiles against Spark 1.5.0:
/build/sbt -Dspark.version=1.5.0-SNAPSHOT clean test
[error] /Users/joshrosen/Documents/spark-redshift/src/main/scala/org/apache/spark/sql/jdbc/RedshiftJDBCWrapper.scala:33: not found: value JDBCRDD
[error] JDBCRDD.getConnector(driver, url, properties)
[error] ^
[error] /Users/joshrosen/Documents/spark-redshift/src/main/scala/org/apache/spark/sql/jdbc/RedshiftJDBCWrapper.scala:34: not found: value JdbcUtils
[error] def tableExists(conn: Connection, table: String) = JdbcUtils.tableExists(conn, table)
[error] ^
[error] /Users/joshrosen/Documents/spark-redshift/src/main/scala/org/apache/spark/sql/jdbc/RedshiftJDBCWrapper.scala:29: not found: value DriverRegistry
[error] def registerDriver(driverClass: String) = DriverRegistry.register(driverClass)
[error] ^
[error] /Users/joshrosen/Documents/spark-redshift/src/main/scala/org/apache/spark/sql/jdbc/RedshiftJDBCWrapper.scala:31: not found: value JDBCRDD
[error] JDBCRDD.resolveTable(jdbcUrl, table, properties)
[error] ^
[error] /Users/joshrosen/Documents/spark-redshift/src/main/scala/org/apache/spark/sql/jdbc/RedshiftJDBCWrapper.scala:28: not found: value JDBCWriteDetails
[error] def schemaString(dataFrame: DataFrame, url: String) = JDBCWriteDetails.schemaString(dataFrame, url)
[error] ^
[error] 5 errors found
[error] (compile:compile) Compilation failed
[error] Total time: 21 s, completed Aug 18, 2015 9:23:39 AM
The problem is that these classes' packages changed from org.apache.spark.sql.jdbc to org.apache.spark.sql.execution.datasources.jdbc.
There are a few ways that we can work around this. Ideally we would only use public Spark SQL APIs; if that's not possible, then we can consider opening up certain internal SQL APIs to be @DeveloperApi or public. If we want to support pre-1.5.x versions, then we might still need to rely on APIs that used to be private in those releases.
If we're fine with continuing to rely on private APIs, then we can use reflection in order to maintain compatibility with both 1.4.x and 1.5.x. If we do this, though, we're going to need really good tests that exercise all of the reflection code. As a result, I'm going to propose that we defer fixing this immediately and wait until we've gotten end-to-end test infrastructure set up for the 1.4.x-compatible code.
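For illustration, a hedged sketch of the reflection idea (the helper below is hypothetical; for Scala objects like JdbcUtils one would actually load the "...$" companion class and go through its MODULE$ field):

```scala
// Resolve a Spark-internal class from whichever package the running Spark version uses,
// so one jar can work against both the 1.4.x and 1.5.x package layouts.
def classForName(candidates: Seq[String]): Class[_] =
  candidates.flatMap { name =>
    try Some(Class.forName(name))
    catch { case _: ClassNotFoundException => None }
  }.headOption.getOrElse(
    throw new ClassNotFoundException(s"None of ${candidates.mkString(", ")} could be loaded"))

val jdbcUtilsClass = classForName(Seq(
  "org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils", // Spark 1.5.x layout
  "org.apache.spark.sql.jdbc.JdbcUtils"))                      // Spark 1.4.x layout
```

All method invocations on such classes would then also go through reflection, which is why good end-to-end tests would be needed before adopting this approach.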
Hi, I'm having this issue, which is causing my program to fail. It could still be something wrong on my side but @JoshRosen asked me to start an issue.
App > java.lang.IllegalArgumentException: Invalid S3 URI: hostname does not appear to be a valid S3 endpoint: s3://kyu/temp/testcopy
App > at com.amazonaws.services.s3.AmazonS3URI.<init>(AmazonS3URI.java:65)
App > at com.amazonaws.services.s3.AmazonS3URI.<init>(AmazonS3URI.java:42)
App > at com.databricks.spark.redshift.Utils$.checkThatBucketHasObjectLifecycleConfiguration(Utils.scala:90)
App > at com.databricks.spark.redshift.RedshiftWriter.saveToRedshift(RedshiftWriter.scala:362)
App > at com.databricks.spark.redshift.DefaultSource.createRelation(DefaultSource.scala:106)
App > at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:170)
App > at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146)
App > at KeywordMapping$.execute(KeywordMapping.scala:50)
App > at KeywordMappingJob$.main(KeywordMappingJob.scala:11)
App > at KeywordMappingJob.main(KeywordMappingJob.scala)
App > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
App > at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
App > at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
App > at java.lang.reflect.Method.invoke(Method.java:606)
App > at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:700)
App > at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:183)
App > at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:208)
App > at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:123)
App > at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
App > 15/12/07 18:13:00 sparkDriver-akka.actor.default-dispatcher-16 INFO BlockManagerMasterEndpoint: Registering block manager ip-172-31-33-34.ec2.internal:53290 with 8.6 GB RAM, BlockManagerId(4, ip-172-31-33-34.ec2.internal, 53290)
App > 15/12/07 18:13:00 main WARN Utils$: An error occurred while trying to read the S3 bucket lifecycle configuration
App > java.lang.IllegalArgumentException: Invalid S3 URI: hostname does not appear to be a valid S3 endpoint: s3://kyu/temp
I'm using aws-java-sdk-core 1.10.22.
I see how my URI causes an error here: https://github.com/aws/aws-sdk-java/blob/master/aws-java-sdk-s3/src/main/java/com/amazonaws/services/s3/AmazonS3URI.java
I've tried different versions of the endpoint URI to match the regex. I am wondering how your example works; maybe I'm making a mistake, which would lead to an addition in the docs.
(Also, see this, but it doesn't seem to be relevant: https://forums.databricks.com/questions/1963/why-spark-redshift-can-not-write-s3-bucket.html)
Thank you for any help.
I've been working on getting my AWS credentials properly set up to read/write, and encountered some issues:
1.) The given example has AWS access being configured by sc.hadoopConfig.set("fs.s3n.awsAccessKeyId", "YOUR_KEY_ID"), etc.; there is no hadoopConfig method on the PySpark context. It seems to be possible to pass a config file to hadoopFile, but this seems specific to whatever file you're pointing it at.
2.) s3n://ACCESSKEY:SECRETKEY@bucket/path/to/temp/dir works absolutely fine when reading from Redshift, but throws
java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL
when writing the dataframe back to Redshift.
Both work fine if I define access/secret key environment variables before launching the shell, although the write throws 'invalid AVRO file found', probably because the column names contain _, as per #84.
I can't seem to figure out the correct package name:
from com.databricks.spark.redshift import RedshiftContext fails, as does RedshiftInputFormat.
Trying to import a file that has already been exported from Redshift using ESCAPE.
Thanks!
When reading the tutorial section "Under the hood - Save Function":
https://github.com/databricks/spark-redshift/tree/master/tutorial#under-the-hood---save-function
I expected a diagram related to the save function.
However, I saw the same diagram (same image hash) as the one in the load function explanation.
I'm trying to use this library, but I'm not able to solve a problem related to the AWS SDK.
I'm running the Spark shell and adding the required dependencies, but I'm getting the following stacktrace when I try to use it:
bin/spark-shell --packages com.databricks:spark-redshift_2.10:0.5.2,com.amazonaws:aws-java-sdk-s3:1.10.38,com.amazonaws:aws-java-sdk-core:1.10.38 --repositories http://repohost/content/groups/public --exclude-packages com.fasterxml.jackson.core:jackson-databind --jars ~/development/redshift/RedshiftJDBC41-1.1.10.1010.jar
Already inside the spark-shell:
scala> sqlContext.read.format("com.databricks.spark.redshift").option("url", "jdbc:redshift://redshifthost:5439/db?user=user&password=somepass").option("dbtable", "schema.table").option("tempdir", "s3n://s3bucket/spark-redshift-test").load()
res0: org.apache.spark.sql.DataFrame = [id: int, user_id: int, ...]
scala> res0.show
java.lang.NoClassDefFoundError: Could not initialize class com.amazonaws.ClientConfiguration
at com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:389)
at com.databricks.spark.redshift.DefaultSource$$anonfun$$init$$1.apply(DefaultSource.scala:39)
at com.databricks.spark.redshift.DefaultSource$$anonfun$$init$$1.apply(DefaultSource.scala:39)
at com.databricks.spark.redshift.RedshiftRelation.buildScan(RedshiftRelation.scala:83)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$3.apply(DataSourceStrategy.scala:53)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$3.apply(DataSourceStrategy.scala:53)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:287)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:286)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:318)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:282)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:49)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:374)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
We should support saving decimals back to Redshift. I think that this might block on adding proper Decimal support to spark-avro
: databricks/spark-avro#80
We are currently using your redshift driver as a sink of a spark stream that copies batches of ~5 minutes from a Kafka log directly into Redshift.
After a random amount of time, mostly 3 to 6 days, the spark driver will fail with the following exception:
a.sql.SQLException: [Amazon](500310) Invalid operation: S3ServiceException:The specified key does not exist.,Status 404,Error NoSuchKey,Rid X,ExtRid X,CanRetry 1
Details:
-----------------------------------------------
error: S3ServiceException:The specified key does not exist.,Status 404,Error NoSuchKey,Rid X,ExtRid X,CanRetry 1
code: 8001
context: S3 key being read : s3://bucket/folder/d7b2b0ad-1fdd-4777-8a9c-46e5449e7681/part-r-00004-bf98d605-595e-402b-9109-8a6cde5ea7ee.avro
query: 321881492
location: table_s3_scanner.cpp:345
process: query6_45 [pid=16853]
The file does exist on S3 when I check it. So this seems to be a rare race condition between the execution of the COPY command against Redshift and the write of the data files to s3n. We are using the s3n:// filesystem for the temporary folder.
Any ideas on how to fix this? Thanks!!
I would very much like to use the new DataFrame work so I'm attempting to build this for Spark 1.3.0
I'll be happy to submit a Pull Request once I get it building.
Currently getting the following error:
[info] Compiling 2 Scala sources to /Users/antony/home/src/spark/spark-redshift/target/scala-2.10/classes...
[error] /Users/antony/home/src/spark/spark-redshift/src/main/scala/com/databricks/examples/redshift/input/RedshiftInputFormat.scala:109: value attr is not a member of String
[error] field.name.attr.cast(field.dataType).as(Symbol(field.name))
[error] ^
The types seem to have moved around some in the change from 1.2 to 1.3.0. It looks to me like the type of field.name is String in both 1.2 and 1.3.0, so I am guessing there's some kind of implicit conversion that was visible in 1.2 and has disappeared or moved in 1.3.0. If anyone can offer a comment on where to look to understand what field.name.attr was doing, or suggest how to resolve this, I'll do the legwork to try to fix it.
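For what it's worth, a hedged guess at a fix for 1.3.0 is to express the same projection with the public Column API instead of the removed catalyst DSL implicit (`schema` and `df` below stand in for the values used in RedshiftInputFormat):

```scala
import org.apache.spark.sql.functions.col

// Replaces the 1.2-era `field.name.attr.cast(...)` DSL with the public Column API.
val projection = schema.fields.map { field =>
  col(field.name).cast(field.dataType).as(field.name)
}
val casted = df.select(projection: _*)
```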
There's no need for a dateformat here; dates don't have millisecond-level granularity anyway (that would be timeformat, but that's already the default)...
http://docs.aws.amazon.com/redshift/latest/dg/r_DATEFORMAT_and_TIMEFORMAT_strings.html
private def copySql(
    sqlContext: SQLContext,
    params: MergedParameters,
    creds: AWSCredentials,
    manifestUrl: String): String = {
  val credsString: String = AWSCredentialsUtils.getRedshiftCredentialsString(creds)
  val fixedUrl = Utils.fixS3Url(manifestUrl)
  s"COPY ${params.table.get} FROM '$fixedUrl' CREDENTIALS '$credsString' FORMAT AS " +
    s"AVRO 'auto' DATEFORMAT 'YYYY-MM-DD HH:MI:SS' manifest ${params.extraCopyOptions}"
}
"DATEFORMAT 'YYYY-MM-DD HH:MI:SS' " should be removed from Line 101
I found this issue when reading a parquet file from s3 and then trying to save the dataframe into redshift:
Error (code 1205) while loading data into Redshift: "Date value did not match format specified [YYYY-MM-DD HH:MI:SS]"
The string format for date I'm using is YYYY-MM-DD
spark-redshift does not currently support the use of reserved words as column names when creating tables in Redshift. For example, table cannot be used as a column name.
This should be simple to fix: simply quote all column names when constructing the CREATE TABLE statement in RedshiftJDBCWrapper.schemaString.
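A minimal sketch of that quoting (the helper names are hypothetical; the real change would live in RedshiftJDBCWrapper.schemaString):

```scala
// Quote every identifier with double quotes and escape embedded quotes, so reserved
// words such as "table" become legal Redshift column names.
def quoteIdentifier(name: String): String =
  "\"" + name.replace("\"", "\"\"") + "\""

// Hypothetical use inside schemaString: emit `"table" TEXT` instead of `table TEXT`.
def columnDefinition(name: String, redshiftType: String): String =
  s"${quoteIdentifier(name)} $redshiftType"
```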
See comments that I left in #65; we need to audit the places where the overwrite configuration is used and make sure that this matches up with the semantics of SaveMode.Overwrite (and possibly unify code paths).
By default, S3 <-> Redshift copies will not work if the S3 bucket and Redshift cluster are in different AWS regions. If you try to use a bucket in a different region, then you get a confusing error message; see https://forums.databricks.com/questions/1963/why-spark-redshift-can-not-write-s3-bucket.html for one example.
Note that it is technically possible to use a bucket in a different region if you pass an extra region
parameter to the COPY command; see https://sqlhaven.wordpress.com/2014/09/07/common-errors-of-redshift-copy-command-and-how-to-solve-them-part-1/ for one example of this.
~~As a result, I think that we should document this limitation and possibly add some configuration validation to print a better error message when the S3 bucket is in the wrong region.~~
We should add a configuration option so that users can explicitly specify the `tempdir` region to enable cross-region copies.
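In the meantime, something along these lines may already work for writes, assuming the connector's extracopyoptions parameter (visible as params.extraCopyOptions in the copySql snippet earlier on this page) is forwarded verbatim to COPY; the region value is illustrative:

```scala
// Sketch: pass Redshift's REGION parameter through to COPY so the cluster can read
// from a tempdir bucket that lives in a different AWS region than the cluster.
df.write
  .format("com.databricks.spark.redshift")
  .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass")
  .option("dbtable", "my_table")
  .option("tempdir", "s3n://bucket-in-us-west-2/tmp/")
  .option("extracopyoptions", "REGION 'us-west-2'") // assumption: appended to the COPY statement
  .save()
```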
We should be able to handle full queries that use things like Redshift joins or aggregation. In Spark JDBC we do this by wrapping the query in a subquery. I propose the following interface: if query is specified instead of dbtable, let's wrap it in () and pass it as a subquery.
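A sketch of what that could look like internally (names are illustrative, not the connector's actual parameters object):

```scala
// If "query" is given instead of "dbtable", wrap it in parentheses so it can be used
// anywhere a table reference is expected, e.g. SELECT * FROM (<query>).
def tableOrSubquery(dbtable: Option[String], query: Option[String]): String =
  (dbtable, query) match {
    case (Some(table), None) => table
    case (None, Some(q))     => s"($q)"
    case _ => throw new IllegalArgumentException(
      "Exactly one of 'dbtable' and 'query' must be specified")
  }
```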
According to some benchmarks published at http://www.overfitted.com/blog/?p=367, it seems that Redshift is significantly faster when loading data from CSV than when loading from Avro. We should benchmark this ourselves and should consider whether it makes sense to automatically pick the CSV format depending on which data types are being used.
It would be good to get this back up to 90%+
Hi,
My understanding is that spark-redshift takes a DataFrame of rows, with its schema, and casts it to the proper schema for Redshift. StringType becomes TEXT, which Redshift understands as varchar(256), and hence it fails if a text field contains a string longer than 256 bytes.
It works in append mode if the table has been created beforehand (otherwise I think it goes through a temp table, using the automatically inferred schema, and fails).
There should be a way to write RDDs to Redshift from Spark, with some example.
I believe you have accidentally copied the Scala syntax into the Python code examples, e.g.:
df.write \
.format("com.databricks.spark.redshift")
.option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass") \
.option("dbtable" -> "my_table_copy") \
.option("tempdir" -> "s3://path/for/temp/data") \
.mode("error")
.save()
Presumably, all of the -> should be replaced with =, and the quotes on the assignment removed, i.e.:
df.write \
.format("com.databricks.spark.redshift")
.option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass") \
.option(dbtable="my_table_copy") \
.option(tempdir="s3://path/for/temp/data") \
.mode("error")
.save()
However, this doesn't quite work either; it returns the error "option() got an unexpected keyword argument 'dbtable'" instead.
When a part file contains no data and just the Avro schema, Redshift throws an exception:
Caused by: com.amazon.support.exceptions.ErrorException: [Amazon](500310)
Invalid operation: Load into table 'stg_device_attribute' failed. Check
'stl_load_errors' system table for details.;
Where the load error reads:
Invalid AVRO file found. Unexpected end of AVRO file.
My AVRO file just contains the schema for a number of shards where no records matched a WHERE-clause condition:
Obj...avro.codecsnappy.avro.schema..{"type":"record","name":"topLevelRecord","fields":[{"name":"roster_id","type":["long","null"]},{"name":"device_id","type":["long","null"]},{"name":"push_token","type":["string","null"]},{"name":"shard_id","type":["long","null"]},{"name":"device_type","type":["string","null"]},{"name":"device_date_created","type":["string","null"]}]}
In my particular case I am running a query across 12 PostgreSQL databases which may not contain results. I can get around this problem by merging into a smaller number of partitions and hoping there's at least one record in each part file, at the cost of some Redshift parallelism on the load.
I am not sure what the solution is, but it might be worth warning if a part file will have 0 records.
If you have a password with a caret or question mark, the connection fails with 'Illegal character':
java.net.URISyntaxException: Illegal character in query at index 122: redshift://me.us-west-2.redshift.amazonaws.com:5439/my_db?tcpKeepAlive=true&user=treid&password=M^gI?[12oaV^Mv#$
at java.net.URI$Parser.fail(URI.java:2829)
at java.net.URI$Parser.checkChars(URI.java:3002)
at java.net.URI$Parser.parseHierarchical(URI.java:3092)
at java.net.URI$Parser.parse(URI.java:3034)
at java.net.URI.<init>(URI.java:595)
at com.databricks.spark.redshift.JDBCWrapper.getConnector(RedshiftJDBCWrapper.scala:137)
...
If you have a percentage sign, you get 'Malformed escape pair at index...'
If I urlencode my password parameter I just get java.sql.SQLException: [Amazon](500150) Error setting/closing connection: password authentication failed for user "me"
Is there a way around this?
Thanks in advance, apologies if I'm missing something obvious
It seems we use SimpleDateFormat in a few places in the Conversions object (see https://github.com/databricks/spark-redshift/blob/master/src/main/scala/com/databricks/spark/redshift/Conversions.scala#L41-L42); since SimpleDateFormat is not thread-safe, this can cause job failures with error messages like:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 36 in stage 28.0 failed 4 times, most recent failure: Lost task 36.3 in stage 28.0 (TID 781, 10.73.213.69): java.lang.NumberFormatException: For input string: ""
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Long.parseLong(Long.java:601)
at java.lang.Long.parseLong(Long.java:631)
at java.text.DigitList.getLong(DigitList.java:195)
at java.text.DecimalFormat.parse(DecimalFormat.java:2051)
at java.text.SimpleDateFormat.subParse(SimpleDateFormat.java:2162)
at java.text.SimpleDateFormat.parse(SimpleDateFormat.java:1514)
at com.databricks.spark.redshift.Conversions$$anon$1.parse(Conversions.scala:54)
at java.text.DateFormat.parse(DateFormat.java:364)
at com.databricks.spark.redshift.Conversions$.com$databricks$spark$redshift$Conversions$$parseTimestamp(Conversions.scala:67)
at com.databricks.spark.redshift.Conversions$$anonfun$1.apply(Conversions.scala:122)
at com.databricks.spark.redshift.Conversions$$anonfun$1.apply(Conversions.scala:108)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at com.databricks.spark.redshift.Conversions$.com$databricks$spark$redshift$Conversions$$convertRow(Conversions.scala:108)
at com.databricks.spark.redshift.Conversions$$anonfun$createRowConverter$1.apply(Conversions.scala:135)
at com.databricks.spark.redshift.Conversions$$anonfun$createRowConverter$1.apply(Conversions.scala:135)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:890)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:890)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1851)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1851)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1825)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1838)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1851)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1922)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:890)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:888)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:888)
Caused by: java.lang.NumberFormatException: For input string: ""
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Long.parseLong(Long.java:601)
at java.lang.Long.parseLong(Long.java:631)
at java.text.DigitList.getLong(DigitList.java:195)
at java.text.DecimalFormat.parse(DecimalFormat.java:2051)
at java.text.SimpleDateFormat.subParse(SimpleDateFormat.java:2162)
at java.text.SimpleDateFormat.parse(SimpleDateFormat.java:1514)
at com.databricks.spark.redshift.Conversions$$anon$1.parse(Conversions.scala:54)
at java.text.DateFormat.parse(DateFormat.java:364)
at com.databricks.spark.redshift.Conversions$.com$databricks$spark$redshift$Conversions$$parseTimestamp(Conversions.scala:67)
at com.databricks.spark.redshift.Conversions$$anonfun$1.apply(Conversions.scala:122)
at com.databricks.spark.redshift.Conversions$$anonfun$1.apply(Conversions.scala:108)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at com.databricks.spark.redshift.Conversions$.com$databricks$spark$redshift$Conversions$$convertRow(Conversions.scala:108)
at com.databricks.spark.redshift.Conversions$$anonfun$createRowConverter$1.apply(Conversions.scala:135)
at com.databricks.spark.redshift.Conversions$$anonfun$createRowConverter$1.apply(Conversions.scala:135)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:890)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:890)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1851)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1851)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
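Since SimpleDateFormat instances are not safe to share across threads, one plausible fix (a sketch, not the library's actual code) is to keep a per-thread formatter:

```scala
import java.sql.Timestamp
import java.text.SimpleDateFormat

// A per-thread formatter avoids the corrupted internal state that produces the
// NumberFormatException above when multiple tasks share one SimpleDateFormat.
val timestampFormat = new ThreadLocal[SimpleDateFormat] {
  override def initialValue(): SimpleDateFormat =
    new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") // pattern is illustrative
}

def parseTimestamp(s: String): Timestamp =
  new Timestamp(timestampFormat.get().parse(s).getTime)
```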
Hi!
I am trying to load data from a Redshift table but am unable to do so.
Here is my code:
val conf = new SparkConf().setAppName("TestApp").setMaster("local[*]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val df1: DataFrame = sqlContext.read
.format("com.databricks.spark.redshift")
.option("url", "jdbc:redshift://host.us-east-1.redshift.amazonaws.com:5439/test?user=uname&password=pwd")
.option("dbtable", "users")
.option("tempdir", "s3n://accesskey:secretkey@redhsift-storage/test")
.load()
df1.show()
I get the following exception:
WARN Utils$: An error occurred while trying to read the S3 bucket lifecycle configuration
java.lang.NullPointerException
at com.databricks.spark.redshift.Utils$.checkThatBucketHasObjectLifecycleConfiguration(Utils.scala:76)
at com.databricks.spark.redshift.RedshiftRelation.buildScan(RedshiftRelation.scala:76)
at org.apache.spark.sql.sources.DataSourceStrategy$$anonfun$3.apply(DataSourceStrategy.scala:50)
at org.apache.spark.sql.sources.DataSourceStrategy$$anonfun$3.apply(DataSourceStrategy.scala:50)
at org.apache.spark.sql.sources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:266)
at org.apache.spark.sql.sources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:265)
at org.apache.spark.sql.sources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:296)
at org.apache.spark.sql.sources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:261)
at org.apache.spark.sql.sources.DataSourceStrategy$.apply(DataSourceStrategy.scala:46)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:314)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:943)
at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:941)
at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:947)
at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:947)
at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1269)
at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1203)
at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1262)
at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:176)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:331)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:338)
at test.TestClass$.main(TestClass.scala:21)
at test.TestClass.main(TestClass.scala)
Exception in thread "main" java.net.URISyntaxException: Relative path in absolute URI: s3n://redhsift-storage%5Ctest%5Cf19a1019-5b20-41e5-a8df-b6490426eb64
at java.net.URI.checkPath(URI.java:1804)
at java.net.URI.<init>(URI.java:679)
at java.net.URI.<init>(URI.java:781)
at com.databricks.spark.redshift.Utils$.joinUrls(Utils.scala:47)
at com.databricks.spark.redshift.Utils$.makeTempPath(Utils.scala:61)
at com.databricks.spark.redshift.Parameters$MergedParameters.createPerQueryTempDir(Parameters.scala:78)
at com.databricks.spark.redshift.RedshiftRelation.buildScan(RedshiftRelation.scala:100)
at org.apache.spark.sql.sources.DataSourceStrategy$$anonfun$3.apply(DataSourceStrategy.scala:50)
at org.apache.spark.sql.sources.DataSourceStrategy$$anonfun$3.apply(DataSourceStrategy.scala:50)
at org.apache.spark.sql.sources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:266)
at org.apache.spark.sql.sources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:265)
at org.apache.spark.sql.sources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:296)
at org.apache.spark.sql.sources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:261)
at org.apache.spark.sql.sources.DataSourceStrategy$.apply(DataSourceStrategy.scala:46)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:314)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:943)
at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:941)
at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:947)
at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:947)
at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1269)
at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1203)
at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1262)
at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:176)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:331)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:338)
Any suggestions?
Thanks!
The data sources API supports pushdown of more filters than this library currently supports. We should aim to support all of Spark's filter types plus conjunction and disjunction of filters.
A complete list of filters supporting pushdowns can be found at https://github.com/apache/spark/blob/v1.4.0/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
While adding support for these additional filter types, we should also clean up the existing code by moving the buildWhereClause method into a static utility object, and should add a dedicated unit test suite for the generated expressions.
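A rough sketch of a fuller translation (compileValue here is a simplified, hypothetical literal renderer; anything that can't be compiled is simply left for Spark to evaluate after the scan):

```scala
import org.apache.spark.sql.sources._

// Hypothetical literal renderer: quotes strings, passes everything else through as-is.
def compileValue(value: Any): String = value match {
  case s: String => "'" + s.replace("'", "''") + "'"
  case other     => other.toString
}

// Sketch: translate Spark data source filters, including And/Or, into a WHERE fragment.
def compileFilter(f: Filter): Option[String] = f match {
  case EqualTo(attr, value)            => Some(s""""$attr" = ${compileValue(value)}""")
  case GreaterThan(attr, value)        => Some(s""""$attr" > ${compileValue(value)}""")
  case GreaterThanOrEqual(attr, value) => Some(s""""$attr" >= ${compileValue(value)}""")
  case LessThan(attr, value)           => Some(s""""$attr" < ${compileValue(value)}""")
  case LessThanOrEqual(attr, value)    => Some(s""""$attr" <= ${compileValue(value)}""")
  case IsNull(attr)                    => Some(s""""$attr" IS NULL""")
  case IsNotNull(attr)                 => Some(s""""$attr" IS NOT NULL""")
  case And(l, r) => for (a <- compileFilter(l); b <- compileFilter(r)) yield s"($a) AND ($b)"
  case Or(l, r)  => for (a <- compileFilter(l); b <- compileFilter(r)) yield s"($a) OR ($b)"
  case _         => None // unsupported filters fall back to Spark-side evaluation
}
```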
Now we ask users to specify column names, which involves some manual effort. We don't assume that the Spark cluster can talk to Redshift directly, so it would be nice to let users run "describe table", copy and paste the output, and pass it to redshiftFile directly:
sqlContext.redshiftFile(path: String, schema: String)
Can we publish this to Maven Central, just like spark-avro and spark-csv?
While this is just a logger WARN message, the stack trace caught me by surprise at first, and I was trying to hunt down the cause of the error:
> df.first()
2015-09-18 08:10:59,269 [main | WARN ] com.databricks.spark.redshift.Utils$ - An error occurred while trying to read the S3 bucket lifecycle configuration
java.lang.NullPointerException
at com.databricks.spark.redshift.Utils$$anonfun$1.apply(Utils.scala:80)
at com.databricks.spark.redshift.Utils$$anonfun$1.apply(Utils.scala:76)
I am using com.amazonaws:aws-java-sdk-s3:1.10.8:jar
in production. I have a rule on my bucket named delete-after-1-day
which does exactly what the rule name suggests.
How do I use the PostgreSQL JDBC driver with spark-redshift?
The following code gives me the exception
java.lang.IllegalArgumentException: Unsupported JDBC protocol: 'postgresql'
val df1: DataFrame = sqlContext.read
.format("com.databricks.spark.redshift")
.option("url", "jdbc:postgresql://host:5439/db?user=test&password=test")
.option("dbtable", "wdata")
.option("tempdir", "s3n://accessKEy:SecretKEy@redshift/dir/")
.load()
df1.show()
The full stacktrace is as follows:
Exception in thread "main" java.lang.IllegalArgumentException: Unsupported JDBC protocol: 'postgresql'
at com.databricks.spark.redshift.JDBCWrapper$$anonfun$getDriverClass$2.apply(RedshiftJDBCWrapper.scala:68)
at com.databricks.spark.redshift.JDBCWrapper$$anonfun$getDriverClass$2.apply(RedshiftJDBCWrapper.scala:52)
at scala.Option.getOrElse(Option.scala:120)
at com.databricks.spark.redshift.JDBCWrapper.getDriverClass(RedshiftJDBCWrapper.scala:51)
at com.databricks.spark.redshift.JDBCWrapper.getConnector(RedshiftJDBCWrapper.scala:138)
at com.databricks.spark.redshift.RedshiftRelation$$anonfun$schema$1.apply(RedshiftRelation.scala:59)
at com.databricks.spark.redshift.RedshiftRelation$$anonfun$schema$1.apply(RedshiftRelation.scala:56)
at scala.Option.getOrElse(Option.scala:120)
at com.databricks.spark.redshift.RedshiftRelation.schema$lzycompute(RedshiftRelation.scala:56)
at com.databricks.spark.redshift.RedshiftRelation.schema(RedshiftRelation.scala:55)
at org.apache.spark.sql.execution.datasources.LogicalRelation.<init>(LogicalRelation.scala:31)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:120)
Both DATE and TIMESTAMP types are currently unloaded using the epochmillisecs format. Data is loaded back into Redshift by using COPY ... TIMEFORMAT 'epochmillisecs'; however, this fails for DATE types because the default of DATEFORMAT 'auto' does not handle epochmillisecs, nor is there currently even a DATEFORMAT 'epochmillisecs' option that could be used (perhaps a good Redshift enhancement). Given this, it seems to make more sense to use a common serialization format that works for both DATE and TIMESTAMP, like YYYY-MM-DD HH:MI:SS.SSSSSS, which handles both types, even when the data contains fractional seconds, using DATEFORMAT 'YYYY-MM-DD HH:MI:SS'. The default format for TIMESTAMP is YYYY-MM-DD HH:MI:SS, so no TIMEFORMAT is then required. See docs.
Before releasing 2.0.0, we should audit the APIs deprecated in #65 and remove them if nobody has asked us to keep them.
The Redshift write performance might be bad if we have tons of small partitions; it would be best to coalesce to a reasonable number of partitions before writing to S3.
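For example (a sketch; the target of 64 partitions is arbitrary, not a recommendation):

```scala
// Coalesce to a modest number of partitions before handing the DataFrame to the writer,
// so we don't create thousands of tiny Avro files in the S3 tempdir.
df.coalesce(64)
  .write
  .format("com.databricks.spark.redshift")
  .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass")
  .option("dbtable", "my_table")
  .option("tempdir", "s3n://my-bucket/tmp/")
  .save()
```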
I do not want to store my data in the default PUBLIC schema; instead, I want to store my tables in a custom schema. Any help on achieving this with the spark-redshift connector would be very useful.
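One thing worth trying (a suggestion, not verified against every connector version): schema-qualify the dbtable option, assuming the target schema already exists:

```scala
// Sketch: write into my_schema.my_table instead of the default PUBLIC schema
// (all connection values and names below are placeholders).
df.write
  .format("com.databricks.spark.redshift")
  .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass")
  .option("dbtable", "my_schema.my_table")
  .option("tempdir", "s3n://my-bucket/tmp/")
  .mode("append")
  .save()
```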
I'm not sure if this is spark-redshift specific, but wondered if you have encountered this issue when using sqlContext.read
from a Redshift table.
15/08/06 10:19:16 INFO SparkContext: Created broadcast 0 from newAPIHadoopFile at RedshiftRelation.scala:82
org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: s3://spark-temp-dir/19380690-5730-451a-b741-8245b27da674
at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:285)
at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:340)
at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:95)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
...
However, Spark has successfully unloaded the table to S3, as shown by:
Gregs-MacBook-Pro:spark-scripts grahn$ aws s3 ls s3://spark-temp-dir/19380690-5730-451a-b741-8245b27da674/
2015-08-06 10:19:15 335 0000_part_00
2015-08-06 10:19:15 338 0001_part_00
Complete scripts of Spark setup, etc. in this gist:
https://gist.github.com/gregrahn/b63fea668d27d6265434
It appears that blocking JDBC calls do not respond to thread interrupts, preventing us from canceling reads or writes. The easiest solution appears to be issuing the JDBC statements from a separate thread or Future, then using Statement.cancel() to stop the query (see http://blog.grovehillsoftware.com/2009/08/interruptible-jdbc-statements.html). The easiest way to do this is probably to define a utility method in our JDBCWrapper that encapsulates this logic.
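A minimal sketch of such a utility (assuming the statement was prepared elsewhere and an ExecutionContext is available):

```scala
import java.sql.PreparedStatement
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

// Run the blocking JDBC call on another thread so the calling thread stays interruptible,
// and cancel the statement server-side if the caller is interrupted.
def executeInterruptibly(statement: PreparedStatement)
                        (implicit ec: ExecutionContext): Boolean = {
  val result = Future(statement.execute())
  try {
    Await.result(result, Duration.Inf)
  } catch {
    case e: InterruptedException =>
      statement.cancel()
      throw e
  }
}
```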
Hi!
I want to know which jets3t version is compatible with spark-redshift, as I'm facing the following jets3t exception with the 0.7.1 version:
java.lang.NoSuchMethodError: org.jets3t.service.impl.rest.httpclient.RestS3Service.<init>(Lorg/jets3t/service/security/AWSCredentials;)V
at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.initialize(Jets3tNativeFileSystemStore.java:60)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at org.apache.hadoop.fs.s3native.$Proxy16.initialize(Unknown Source)
at org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:272)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2433)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:88)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2467)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2449)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:367)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:287)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.<init>(FileOutputCommitter.java:91)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.<init>(FileOutputCommitter.java:74)
at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.getOutputCommitter(FileOutputFormat.java:309)
at org.apache.spark.sql.execution.datasources.BaseWriterContainer.newOutputCommitter(WriterContainer.scala:126)
at org.apache.spark.sql.execution.datasources.BaseWriterContainer.executorSideSetup(WriterContainer.scala:113)
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:231)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
As a best practice, third-party code should not rely on Spark's Logging trait. We should remove this dependency and add a Scalastyle rule to make sure that it's not re-introduced in new code.
This issue describes a problem that occurs when using spark-redshift with an old spark-avro dependency.
Symptom:
Saving data to Redshift fails with the following error if spark-avro 1.0.0 is used:
java.sql.SQLException: [Amazon](500310) Invalid operation: Mandatory url is not present in manifest file.
-----------------------------------------------
error: Mandatory url is not present in manifest file.
code: 8001
context: Manifest file location=s3://spark-redshift-testing/temp/fc8db7a7-97fa-4dcd-b694-cbc44f8dbbe8/manifest.json url=s3://spark-redshift-testing/temp/fc8db7a7-97fa-4dcd-b694-cbc44f8dbbe8/part-r-00000-ro.avro
query: 403423
location: s3_utility.cpp:328
process: padbmaster [pid=27093]
-----------------------------------------------;
at com.amazon.redshift.client.messages.inbound.ErrorResponse.toErrorException(Unknown Source)
at com.amazon.redshift.client.PGMessagingContext.handleErrorResponse(Unknown Source)
at com.amazon.redshift.client.PGMessagingContext.getOperationMetadata(Unknown Source)
at com.amazon.redshift.client.PGMessagingContext.getOperationMetadata(Unknown Source)
at com.amazon.redshift.client.PGMessagingContext.handleMessage(Unknown Source)
at com.amazon.jdbc.communications.InboundMessagesPipeline.getNextMessageOfClass(Unknown Source)
at com.amazon.redshift.client.PGMessagingContext.doMoveToNextClass(Unknown Source)
at com.amazon.redshift.client.PGMessagingContext.getReadyForQuery(Unknown Source)
at com.amazon.redshift.client.PGMessagingContext.getOperationMetadata(Unknown Source)
at com.amazon.redshift.client.PGMessagingContext.getOperationMetadata(Unknown Source)
at com.amazon.redshift.client.PGMessagingContext.handleMessage(Unknown Source)
at com.amazon.jdbc.communications.InboundMessagesPipeline.getNextMessageOfClass(Unknown Source)
at com.amazon.redshift.client.PGMessagingContext.doMoveToNextClass(Unknown Source)
at com.amazon.redshift.client.PGMessagingContext.doMoveToNextClass(Unknown Source)
at com.amazon.redshift.client.PGMessagingContext.getErrorResponse(Unknown Source)
at com.amazon.redshift.client.PGClient.executePreparedStatement(Unknown Source)
at com.amazon.redshift.dataengine.PGIQueryExecutor.execute(Unknown Source)
at com.amazon.jdbc.common.SPreparedStatement.executeWithParams(Unknown Source)
at com.amazon.jdbc.common.SPreparedStatement.execute(Unknown Source)
at com.databricks.spark.redshift.RedshiftWriter$$anonfun$com$databricks$spark$redshift$RedshiftWriter$$doRedshiftLoad$1.apply(RedshiftWriter.scala:170)
at com.databricks.spark.redshift.RedshiftWriter$$anonfun$com$databricks$spark$redshift$RedshiftWriter$$doRedshiftLoad$1.apply(RedshiftWriter.scala:165)
at scala.Option.foreach(Option.scala:236)
at com.databricks.spark.redshift.RedshiftWriter.com$databricks$spark$redshift$RedshiftWriter$$doRedshiftLoad(RedshiftWriter.scala:165)
at com.databricks.spark.redshift.RedshiftWriter.saveToRedshift(RedshiftWriter.scala:375)
at com.databricks.spark.redshift.DefaultSource.createRelation(DefaultSource.scala:106)
at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:309)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:144)
Cause:
In order to fix a crash that could occur when writing tables containing empty partitions, #99 modified the write path to use manifest files that instruct Redshift to load only the non-empty partitions' Avro files. The code that handled processing of filenames when generating the manifest made assumptions that hold for spark-avro 2.0.0+ but not for spark-avro 1.0.0.
Solution:
For now, the best solution is to use spark-avro 2.0.1. Usually this should happen automatically if you install libraries using Maven or another package/dependency manager. Databricks users can attach spark-redshift by specifying the coordinate com.databricks:spark-redshift_2.10:0.5.2 in the Maven library upload screen (or by using the integrated Spark Packages and Maven Central browser). *EDIT:* the approach listed here will not work for DBC 1.4.1, because the driver's own spark-avro dependency will take precedence; we'll have to either shade or restore compatibility. If you are manually attaching JARs to your cluster, you can either include spark-avro 2.0.1+ or generate a combined assembly JAR containing all of spark-redshift's dependencies by using the sbt-assembly plugin: just run sbt/sbt assembly in the spark-redshift checkout.
In the long term, I may be able to fix this by either restoring compatibility with spark-avro 1.0.0 or by adding an error message so we fail in a less confusing way when an incompatible version of the library is used.
Add test for distinguishing between null values and empty strings when unloading data.
It looks like empty strings become null
in unloaded data. We should enable proper quoting / escaping to prevent this from happening.
I am building a Spark project using Scala in IntelliJ. I have added the following dependencies:
spark-redshift_2.10-0.5.1-SNAPSHOT
RedshiftJDBC41-1.1.7.1007
httpcore-4.4.3
httpclient-4.5.1
aws-java-sdk-s3-1.10.23-SNAPSHOT
aws-java-sdk-core-1.10.23-SNAPSHOT
I am receiving the following error: java.lang.NoClassDefFoundError: Could not initialize class com.amazonaws.services.s3.internal.ServiceUtils
when running:
val df = sqlContext.read.format("com.databricks.spark.redshift")
.option("url", "jdbc:redshift://AWS_SERVER:5439/warehouse?user=USER&password=PWD")
.option("dbtable", "fact_time")
.option("tempdir", "s3n://bucket/path")
.load()
df.show()
Hi!
I am using spark-redshift to save a DataFrame into Redshift, but I'm facing the following exception:
java.lang.NoSuchMethodError: org.apache.avro.generic.GenericData.createDatumWriter(Lorg/apache/avro/Schema;)Lorg/apache/avro/io/DatumWriter;
at org.apache.avro.mapreduce.AvroKeyRecordWriter.<init>(AvroKeyRecordWriter.java:55)
at org.apache.avro.mapreduce.AvroKeyOutputFormat$RecordWriterFactory.create(AvroKeyOutputFormat.java:79)
at org.apache.avro.mapreduce.AvroKeyOutputFormat.getRecordWriter(AvroKeyOutputFormat.java:105)
at com.databricks.spark.avro.AvroOutputWriter.<init>(AvroOutputWriter.scala:82)
at com.databricks.spark.avro.AvroOutputWriterFactory.newInstance(AvroOutputWriterFactory.scala:31)
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:234)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
@emlyn I saw a branch from your fork that supports compression in UNLOAD (https://github.com/emlyn/spark-redshift/tree/compression). If it is ready, are you interested in contributing it back? Thanks:)
Supporting text compression in UNLOAD might help overall throughput.
When using SaveMode.Overwrite (which I honestly haven't checked to see if it's supported) with the following code:
.toDF().write
.format("com.databricks.spark.redshift")
.option("jdbcdriver", "com.amazon.redshift.jdbc41.Driver")
.option("url", redshiftJdbcUrl)
.option("dbtable", "pf.some_table_here")
.option("tempdir", tempDirBucket)
.mode(SaveMode.Overwrite)
.save()
I encounter:
Caused by: com.amazon.support.exceptions.ErrorException: [Amazon](500310) Invalid operation: syntax error at or near ".";
... 80 more
(Lost the rest of the stack trace in a closed screen session but I believe the error can be reproduced by specifying the schema name with the table name and using SaveMode.Overwrite
.)
Using SaveMode.Append is a workaround (I just truncate the table prior to saving).
Currently, the default value of jdbcdriver is com.amazon.redshift.jdbc4.Driver, but this will not work if a user has installed the JDBC 4.1 driver. Instead of hardcoding either the 4.0 or 4.1 driver, we should handle the absence of a user-provided jdbcdriver config by first trying to load the JDBC 4 driver and falling back to the 4.1 driver (or vice versa).
See #56 (comment) for additional context.
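A sketch of that fallback (the driver class names are the ones Amazon documents; the helper itself is hypothetical):

```scala
// Try the JDBC 4.1 driver first and fall back to the JDBC 4 driver, rather than
// hard-coding one of them as the default.
def resolveRedshiftDriver(): String = {
  val candidates = Seq(
    "com.amazon.redshift.jdbc41.Driver",
    "com.amazon.redshift.jdbc4.Driver")
  candidates.find { className =>
    try { Class.forName(className); true }
    catch { case _: ClassNotFoundException => false }
  }.getOrElse(throw new ClassNotFoundException(
    "Could not load an Amazon Redshift JDBC driver; please set the 'jdbcdriver' option"))
}
```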
Is this library packaged in some way that's easily usable from other projects? Would be tremendously helpful if the README for this project included instructions on recommended way to use it from another sbt-based project or a pointer on where to find that information.
What I hoped might work was to just add:
libraryDependencies += "com.databricks.examples.redshift" %% "redshift-input-format" % "0.1"
to my own app's sbt project file. Unfortunately sbt couldn't resolve the dependency when I tried that.
I will manually copy the jar from building it locally for now, but is there a better way?
Apologies for rudimentary n00b request; I'm not an sbt expert and this is my first day playing with Spark.
Because spark-redshift uses Avro in its write path, it inherits the limitations of Avro's schema validation: we cannot create tables with columns that contain non-letter-or-underscore characters, such as spaces or quotes.
In order to work around these limitations, I think that we'd have to use a different set of column names when writing to Avro, then use a JSONPaths file to map those columns back to the original column names.
Add example of writing data back to Redshift using SQL API
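A hedged sketch of the kind of example being requested, using Spark SQL's CREATE TABLE ... USING ... AS SELECT syntax (all option values are placeholders, and whether a plain SQLContext suffices may depend on the Spark version):

```scala
// Write a Spark table back to Redshift through the data source SQL syntax;
// equivalent in spirit to df.write.format("com.databricks.spark.redshift").save().
sqlContext.sql("""
  CREATE TABLE my_redshift_copy
  USING com.databricks.spark.redshift
  OPTIONS (
    url 'jdbc:redshift://redshifthost:5439/database?user=username&password=pass',
    dbtable 'my_table_copy',
    tempdir 's3n://path/for/temp/data'
  )
  AS SELECT * FROM my_spark_table
""")
```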
It would be nice if users had an easy way to figure out whether filter pushdown or column pruning took place. We should override toString() as appropriate so that the explain() output shows this for Redshift sources.
RedshiftInputFormat references TaskAttemptContext, which was a class in Hadoop 1.x but became an interface in Hadoop 2.x:
java.lang.IncompatibleClassChangeError: Found interface org.apache.hadoop.mapreduce.TaskAttemptContext, but class was expected
at com.databricks.spark.redshift.RedshiftRecordReader.initialize(RedshiftInputFormat.scala:103)