
s3-sqs-connector's Introduction

S3-SQS Connector


A library for reading data from Amazon S3 with optimized listing via Amazon SQS, using Spark SQL Streaming (also known as Structured Streaming).

Linking

Using SBT:

libraryDependencies += "com.qubole" %% "spark-sql-streaming-sqs_{{site.SCALA_BINARY_VERSION}}" % "{{site.PROJECT_VERSION}}"

Using Maven:

<dependency>
    <groupId>com.qubole</groupId>
    <artifactId>spark-sql-streaming-sqs_{{site.SCALA_BINARY_VERSION}}</artifactId>
    <version>{{site.PROJECT_VERSION}}</version>
</dependency>

This library can also be added to Spark jobs launched through spark-shell or spark-submit by using the --packages command line option. For example, to include it when starting the spark shell:

$ bin/spark-shell --packages com.qubole:spark-sql-streaming-sqs_{{site.SCALA_BINARY_VERSION}}:{{site.PROJECT_VERSION}}

Unlike using --jars, using --packages ensures that this library and its dependencies will be added to the classpath. The --packages argument can also be used with bin/spark-submit.

This library is compiled for Scala 2.11 only and is intended to support Spark 2.4.0 onwards.

Building S3-SQS Connector

S3-SQS Connector is built using Apache Maven (http://maven.apache.org/).

To build S3-SQS connector, clone this repository and run:

mvn -DskipTests clean package

This will create a target/spark-sql-streaming-sqs_2.11-0.5.1.jar file containing the s3-sqs connector code and its associated dependencies. Make sure the Scala and Java versions correspond to those required by your Spark cluster. It has been tested with Java 7/8, Scala 2.11 and Spark 2.4.0.
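
Since the jar built above bundles its dependencies, it can also be added to the Spark shell directly with --jars, for example (adjust the path and version to your build):

$ bin/spark-shell --jars target/spark-sql-streaming-sqs_2.11-0.5.1.jar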

Configuration options

The configuration is supplied through the following source options.

| Name | Default | Meaning |
| --- | --- | --- |
| sqsUrl | required, no default value | SQS queue URL, e.g. 'https://sqs.us-east-1.amazonaws.com/330183209093/TestQueue' |
| region | required, no default value | AWS region where the queue is created |
| fileFormat | required, no default value | file format of the files stored on Amazon S3 |
| schema | required, no default value | schema of the data being read |
| sqsFetchIntervalSeconds | 10 | time interval (in seconds) after which to fetch messages from the Amazon SQS queue |
| sqsLongPollingWaitTimeSeconds | 20 | wait time (in seconds) for long polling on the Amazon SQS queue |
| sqsMaxConnections | 1 | number of parallel threads used to connect to the Amazon SQS queue |
| sqsMaxRetries | 10 | maximum number of consecutive retries on SQS connection failure before giving up |
| ignoreFileDeletion | false | whether to ignore any file-deleted message in the SQS queue |
| fileNameOnly | false | whether to identify new files based on the filename only instead of the full path |
| shouldSortFiles | true | whether to sort files by timestamp while listing them from SQS |
| useInstanceProfileCredentials | false | whether to use EC2 instance profile credentials for connecting to Amazon SQS |
| maxFilesPerTrigger | no default value | maximum number of files to process in a micro-batch |
| maxFileAge | 7d | maximum age of a file that can be found in this directory |

Example

An example of creating a SQL stream which uses Amazon SQS to list files on S3:

    val inputDf = sparkSession
                      .readStream
                      .format("s3-sqs")
                      .schema(schema)
                      .option("sqsUrl", queueUrl)
                      .option("region", awsRegion)
                      .option("fileFormat", "json")
                      .option("sqsFetchIntervalSeconds", "2")
                      .option("useInstanceProfileCredentials", "true")
                      .option("sqsLongPollingWaitTimeSeconds", "5")
                      .load()
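
The resulting inputDf behaves like any other streaming dataframe. A minimal sketch of consuming it, using the generic console sink of Structured Streaming (not specific to this connector):

    val query = inputDf
                    .writeStream
                    .format("console")       // any Structured Streaming sink works here
                    .outputMode("append")
                    .start()

    query.awaitTermination()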

s3-sqs-connector's People

Contributors

abhishekd0907, indit-qubole


s3-sqs-connector's Issues

Does it work with Spark 3?

Hello,

Thank you for this great connector.

Does it work with Spark 3?
Is this connector only supported through this repository? I found it in the Apache Bahir repository too.

Not able to access the artifacts

Hi,
I am trying to test this s3-sqs connector for Spark Structured Streaming, and while running my build.sbt I am getting an error like:
com.qubole.spark#spark-sql-streaming-sqs_2.11;0.5.1: not found.

When I try to build the repo, the build log says there are only 2 source files to compile, and it compiles only the Java classes but not the Scala package. Hence I am not able to start the Spark stream with the SQS data source provider:
java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider org.apache.spark.sql.streaming.sqs.SqsSourceProvider not found
Kindly guide me on how I can test this feature with proper artifacts.
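
For reference, the coordinates used in the spark-shell example above (com.qubole:spark-sql-streaming-sqs_2.11:0.5.1) would correspond to a build.sbt entry like the following sketch; whether it resolves depends on where the artifact is actually published, so the resolver line is an assumption:

// Sketch only: a spark-packages style resolver plus the coordinates from this README.
resolvers += "spark-packages" at "https://repos.spark-packages.org/"

libraryDependencies += "com.qubole" % "spark-sql-streaming-sqs_2.11" % "0.5.1"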

Maximum Throughput Limited to 1 Message per Second

We recently faced an issue with the maximum throughput of the s3-sqs connector. Apparently, even in the fastest possible case, a message can be fetched from the target SQS queue only once per second, since:

  1. the scheduling unit of the SQS fetch job is SECONDS according to the code, and
  2. the lowest possible value for the fetch interval is 1.

In other words, the connector can fetch at most one message per second. Therefore, if more than one message per second is pushed into the SQS queue, i.e. the message generation rate exceeds one message per second, the queue size grows without bound. Besides, the utilization of the processing resources on the Spark cluster side is considerably reduced.

I suggest changing the scheduling unit to MILLISECOND in order to resolve this issue.
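
A hypothetical illustration of the suggested change, assuming the fetch job is driven by a ScheduledExecutorService; the class, method and variable names below are illustrative, not the connector's actual identifiers:

import java.util.concurrent.{Executors, TimeUnit}

object SqsFetchSchedulingSketch {

  // Hypothetical stand-in for the connector's SQS fetch logic
  // (ReceiveMessage with long polling, then parsing of S3 event notifications).
  def fetchMessagesFromSqs(): Unit = ()

  def main(args: Array[String]): Unit = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val fetchIntervalMs = 200L // sub-second fetch intervals become possible

    scheduler.scheduleWithFixedDelay(
      new Runnable { override def run(): Unit = fetchMessagesFromSqs() },
      0L,                   // initial delay
      fetchIntervalMs,      // delay between fetches
      TimeUnit.MILLISECONDS // instead of TimeUnit.SECONDS
    )
  }
}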

Error To Read SQS queue

Hi, how are you?

Thank you for providing the lib, it will help a lot.

I'm facing some errors when I try to create my Spark Structured Streaming pipeline:

20/07/23 17:57:13 ERROR MicroBatchExecution: Query [id = c9b4658e-af4b-41ab-939f-22149636e025, runId = 7aede0d8-a016-48fd-8ad9-41ba08539f2d] terminated with error
java.lang.NoSuchMethodError: org.apache.spark.sql.Dataset$.ofRows$default$3()Ljava/lang/String;
at org.apache.spark.sql.streaming.sqs.SqsSource.getBatch(SqsSource.scala:80)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$10.apply(MicroBatchExecution.scala:438)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$10.apply(MicroBatchExecution.scala:434)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:434)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:434)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:433)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
Exception in thread "stream execution thread for [id = c9b4658e-af4b-41ab-939f-22149636e025, runId = 7aede0d8-a016-48fd-8ad9-41ba08539f2d]" java.lang.NoSuchMethodError: org.apache.spark.sql.Dataset$.ofRows$default$3()Ljava/lang/String;
at org.apache.spark.sql.streaming.sqs.SqsSource.getBatch(SqsSource.scala:80)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$10.apply(MicroBatchExecution.scala:438)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$10.apply(MicroBatchExecution.scala:434)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:434)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:434)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:433)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
Traceback (most recent call last):
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o83.awaitTermination.
: org.apache.spark.sql.streaming.StreamingQueryException: org.apache.spark.sql.Dataset$.ofRows$default$3()Ljava/lang/String;
=== Streaming Query ===
Identifier: [id = c9b4658e-af4b-41ab-939f-22149636e025, runId = 7aede0d8-a016-48fd-8ad9-41ba08539f2d]
Current Committed Offsets: {}
Current Available Offsets: {SqsSource[https://sqs.us-west-2.amazonaws.com/713440373855/emr-sqs-test]: {"logOffset":0}}

My readStream:

df_stream = spark.readStream \
    .format("s3-sqs") \
    .schema(stockout_raw_schema) \
    .option("sqsUrl", 'https://sqs.us-west-2.amazonaws.com/713440373855/emr-sqs-test') \
    .option("region", 'us-west-2') \
    .option("fileFormat", "parquet") \
    .option("sqsFetchIntervalSeconds", "2") \
    .option("useInstanceProfileCredentials", "true") \
    .option("sqsLongPollingWaitTimeSeconds", "5") \
    .load()

My sink:

def write_parquet_stream(spark_data_frame, checkpoint_location_folder, write_folder_path):

    df_write_query = spark_data_frame \
                      .writeStream \
                      .outputMode('append') \
                      .format('parquet') \
                      .option("checkpointLocation", checkpoint_location_folder) \
                      .start(write_folder_path)

    df_write_query.awaitTermination()

Could you help me?

Thank you so much.

Error running on EMR with Spark version 2.4.0 and Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_265)

Hello,

This library looks very promising, but I cannot run it successfully on AWS EMR.

app.scala:
import org.apache.spark.sql.types.StructType

val inputDf = spark
.readStream
.format("s3-sqs")
.schema(new StructType().add("name", "string"))
.option("sqsUrl", "https://sqs.us-east-1.amazonaws.com/561461140170/s3-spark-streaming")
.option("region", "us-east-1")
.option("fileFormat", "csv")
.option("sqsFetchIntervalSeconds", "2")
.option("useInstanceProfileCredentials", "true")
.option("sqsLongPollingWaitTimeSeconds", "5")
.load()

val c = inputDf.select("*")

val query = c
.writeStream
.outputMode("append")
.format("console")
.start()

query.awaitTermination()

I ran:
spark-shell --packages com.qubole:spark-sql-streaming-sqs_2.11:0.5.1 -i app.scala

Once the first file is uploaded to S3, the library fails:

20/11/09 04:06:54 ERROR MicroBatchExecution: Query [id = 1c453f25-4de5-48f9-9262-63b33a14cf2d, runId = ad1b7d57-513c-4942-bf65-7386224d4c68] terminated with error
java.lang.NoSuchMethodError: org.apache.spark.sql.Dataset$.ofRows$default$3()Ljava/lang/String;
at org.apache.spark.sql.streaming.sqs.SqsSource.getBatch(SqsSource.scala:80)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$10.apply(MicroBatchExecution.scala:438)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$10.apply(MicroBatchExecution.scala:434)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:434)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:434)

SqsClient: Unexpected error while parsing SQS message next on empty iterator

Error while parsing SQS message

The below data has been sent to the SQS queue via the Python boto3 library:

import json
import boto3

# SQS client; queue_url is assumed to be defined elsewhere
sqs = boto3.client('sqs')

data = {'ticket_id': '3000613'}

# Send message to SQS queue
response = sqs.send_message(
    QueueUrl=queue_url,
    DelaySeconds=2,
    MessageAttributes={},
    MessageBody=json.dumps(data)
)

When we receive it from SQS, we are getting the following message:

{
   "MessageId":"95a45ab6-8a6e-4403-90e8-e3d0fb132ec3",
   "ReceiptHandle":"AQEBAw2+roaCV6U8PaNCiuV4cBHKfsb+rboAnQTekZa7CStJ5Hdbu9Nbpvvph5OqNtcupXvtuPM7OOTRXmJGrJ26+DIf/vdshZ+HIcCgqhEbaHBR4L3qQ3o+ClpwoNY0VZAB4VFQPD/mrHTUP9nAfYKGNszuU2Q1riRYgc9ClYO5KOcmdo2POWk+lrW5uDIr95lccuOmj+T0OBzy0pPxFquqOpSAbj7XyGEXRIz/ocW3MCP42WaoT4PdJAII0ylx0BYbZC5qWLqkEc1mYudgUhV1dadFM58xb6Gv71WI00V+RvaZwFbL/T19z9KqIu+Z0F7hH/Tpe15xxHpZ5yl6tSi+QAEoGMD6UshjpizspQ08Q98OEDAP0xLk0F99fC88AVcf8kJ11Icbv5raXzXnTikFxA==",
   "MD5OfBody":"ef4ab943de701a13d7e3359d3f19df98",
   "Body":"{\"ticket_id\": \"3000613\"}",
   "Attributes":{
      "SentTimestamp":"1605700176692"
   }
}

The following code was written in Scala to receive the message:

val schema = new StructType()
  .add(StructField("ticket_id", StringType))
val fileFormat = "json"

val inputDf = spark
  .readStream
  .format("s3-sqs")
  .schema(schema)
  .option("sqsUrl", queueUrl)
  .option("region", "eu-west-1")
  .option("awsAccessKeyId", "XXXXX")
  .option("awsSecretKey", "XXXXXXX")
  .option("fileFormat", fileFormat)
  .option("sqsFetchIntervalSeconds", "2")
  .option("useInstanceProfileCredentials", "false")
  .option("sqsLongPollingWaitTimeSeconds", "5")
  .option("maxFilesPerTrigger", "50")
  .option("ignoreFileDeletion", "true")
  .load()

val query = inputDf.writeStream
  .queryName("sqs_records")    // this query name will be the table name
  .outputMode("append")
  .format("memory")
  .option("useInstanceProfileCredentials", "false")
  .option("region", "eu-west-1")
  .option("awsAccessKeyId", "XXXXXX")
  .option("awsSecretKey", "XXXXXXXX")
  .start()

We are not able to receive the message and are getting the below error:

20/11/18 12:52:37 WARN SqsClient: Unexpected error while parsing SQS message next on empty iterator

Note: We observed that the SqsClient.parseSqsMessages() method always expects to receive S3 event notification messages; otherwise it throws an error.
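
For reference, the message body the parser expects is an S3 event notification, roughly of this shape (abridged; values are placeholders):

{
  "Records": [
    {
      "eventTime": "2020-11-18T12:00:00.000Z",
      "eventName": "ObjectCreated:Put",
      "s3": {
        "bucket": { "name": "my-bucket" },
        "object": { "key": "path/to/file.json", "size": 1024 }
      }
    }
  ]
}

In other words, the queue is normally populated by S3 bucket event notifications pointing at files, rather than by custom messages carrying the data itself.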

S3-SQS source does not populate partition columns in the dataframe

Hi,
I am using this "s3-sqs" connector with Spark Structured Streaming and Delta Lake to process incoming data in partitioned S3 buckets.
The problem I am facing with the "s3-sqs" source is that the file is read directly and returned as a dataframe/dataset without the partition columns.
Hence, when we merge the source and target dataframes, we get all the partition columns as HIVE_DEFAULT_PARTITION.

Do you have any solution/workaround to add partition columns as a part of the dataframe?

Thanks and regards,
Dipesh Vora
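
One possible workaround (not a feature of the connector; a sketch assuming the partition values are encoded Hive-style in the S3 object key, e.g. .../event_date=2020-01-01/part-0000.parquet, that inputDf is the streaming dataframe returned by load(), and that input_file_name() is populated for this source):

    import org.apache.spark.sql.functions.{input_file_name, regexp_extract}

    // Recover a partition column from the file path, since the source itself
    // does not populate partition columns.
    val withPartitions = inputDf
        .withColumn("event_date", regexp_extract(input_file_name(), "event_date=([^/]+)", 1))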

Getting error: "Error occured while creating Amazon SQS Client null"

20/06/23 17:12:38 ERROR MicroBatchExecution: Query [id = 99480fa7-a0c7-4dff-8282-ffad4fd30629, runId = 95b4cfe6-e950-4986-843c-69739a873d52] terminated with error
org.apache.spark.SparkException: Error occured while creating Amazon SQS Client null
at org.apache.spark.sql.streaming.sqs.SqsClient.createSqsClient(SqsClient.scala:227)
at org.apache.spark.sql.streaming.sqs.SqsClient.<init>(SqsClient.scala:54)
at org.apache.spark.sql.streaming.sqs.SqsSource.<init>(SqsSource.scala:53)
at org.apache.spark.sql.streaming.sqs.SqsSourceProvider.createSource(SqsSourceProvider.scala:47)
at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:255)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1$$anonfun$applyOrElse$1.apply(MicroBatchExecution.scala:88)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1$$anonfun$applyOrElse$1.apply(MicroBatchExecution.scala:85)
at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:79)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:85)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:83)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:71)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:285)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformDown(AnalysisHelper.scala:149)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:275)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:83)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:65)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:269)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)

Any ideas on how to resolve this issue?
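
Not an authoritative answer, but one thing worth checking, based on the options used elsewhere in this document, is how credentials are supplied to the SQS client. A sketch of the two variants shown above (values are placeholders):

    val inputDf = spark
                      .readStream
                      .format("s3-sqs")
                      .schema(schema)
                      .option("sqsUrl", queueUrl)
                      .option("region", awsRegion)
                      .option("fileFormat", "json")
                      // either rely on the EC2 instance profile ...
                      .option("useInstanceProfileCredentials", "true")
                      // ... or pass keys explicitly, as in the earlier issue:
                      // .option("awsAccessKeyId", "XXXXX")
                      // .option("awsSecretKey", "XXXXXXX")
                      .load()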
