
kafka-connect-storage-cloud's Introduction

Kafka Connect Connector for S3


kafka-connect-storage-cloud is the repository for Confluent's Kafka Connect connectors that copy data from Kafka into Amazon S3.

Kafka Connect Sink Connector for Amazon Simple Storage Service (S3)

Documentation for this connector can be found here.

A blog post about this connector can be found here.

Development

To build a development version you'll need a recent version of Kafka as well as a set of upstream Confluent projects, which you'll have to build from their appropriate snapshot branch. See the kafka-connect-storage-common FAQ for guidance on this process.

You can build kafka-connect-storage-cloud with Maven using the standard lifecycle phases.

Running Integration Tests

Integration tests are run as part of mvn install; however, you first need to set the environment variable AWS_CREDENTIALS_PATH to point to a JSON file with the following structure:

{
    "aws_access_key_id": "<key>",
    "aws_secret_access_key": "<secret>"
}

Contribute

License

This project is licensed under the Confluent Community License.


kafka-connect-storage-cloud's People

Contributors

andrewegel, aniketshrimal, arihant-confluent, ashoke-cube, c0urante, cjolivier01, confluentjenkins, confluentsemaphore, cyrusv, enigma25, ewencp, janjwerner-confluent, jkao97, joel-hamill, kkonstantine, kpatelatwork, maxzheng, norwood, patrick-premont, pbadani, poojakuntalcflt, rhauch, rohits64, sidd1809, sp-gupta, subhashiyer9, sudeshwasnik, tony810430, wicknicks, xiangxin72


kafka-connect-storage-cloud's Issues

confluent-3.2.2 kafka-connect-s3

Since upgrading to 3.2.2 in order to use the TimeBasedPartitioner, I'm no longer able to start connect-standalone passing the source and sink properties files.

I'm using this Kafka Connect connector for Twitter. I've done a Maven clean build and updated the dependencies to Confluent 3.2.2 and Kafka 0.10.2.1.
https://github.com/Eneco/kafka-connect-twitter

Here are the properties in the pom file:

<java.version>1.8</java.version>
<scala.version>2.11.7</scala.version>
<confluent.version>3.2.2</confluent.version>
<kafka.version>0.10.2.1</kafka.version>
<hosebird.version>2.2.0</hosebird.version>
<avro.version>1.7.7</avro.version>
<jsr305.version>1.3.9</jsr305.version>
<mockito.version>1.10.19</mockito.version>
<scalatest.version>3.0.0-M1</scalatest.version>
<guava.version>19.0</guava.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<confluent.maven.repo>http://packages.confluent.io/maven/</confluent.maven.repo>
<cloudera.maven.repo>https://repository.cloudera.com/artifactory/cloudera-repos</cloudera.maven.repo>

There are dependency collisions with 3.2.2 that I can't identify. Can you please let me know what the valid dependencies should be for 3.2.2?

HTTP Request Issues Connecting with Confluent S3 Connector

I am trying to connect to S3 and write a topic to an S3 bucket that I have access to. I am trying to do this on a local machine, through a corporate proxy. Here is the error I am receiving:

ERROR Task s3-custom-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:141) org.apache.kafka.connect.errors.ConnectException: com.amazonaws.SdkClientException: Unable to execute HTTP request: Connect to s3.amazonaws.com:443 [s3.amazonaws.com/52.216.192.27] failed: connect timed out at io.confluent.connect.s3.S3SinkTask.start(S3SinkTask.java:108) at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:231) at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:145) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748) Caused by: com.amazonaws.SdkClientException: Unable to execute HTTP request: Connect to s3.amazonaws.com:443 [s3.amazonaws.com/52.216.192.27] failed: connect timed out at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1043) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:747) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:721) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:704) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:672) at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:654) at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:518) at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4185) at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4132) at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1302) at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:1259) at io.confluent.connect.s3.storage.S3Storage.bucketExists(S3Storage.java:110) at io.confluent.connect.s3.S3SinkTask.start(S3SinkTask.java:95) ... 
9 more Caused by: org.apache.http.conn.ConnectTimeoutException: Connect to s3.amazonaws.com:443 [s3.amazonaws.com/52.216.192.27] failed: connect timed out at org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:143) at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:353) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76) at com.amazonaws.http.conn.$Proxy42.connect(Unknown Source) at org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:380) at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236) at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:184) at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:184) at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82) at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:55) at com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1186) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1035) ... 21 more Caused by: java.net.SocketTimeoutException: connect timed out at java.net.PlainSocketImpl.socketConnect(Native Method) at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) at java.net.Socket.connect(Socket.java:589) at org.apache.http.conn.ssl.SSLConnectionSocketFactory.connectSocket(SSLConnectionSocketFactory.java:337) at com.amazonaws.http.conn.ssl.SdkTLSSocketFactory.connectSocket(SdkTLSSocketFactory.java:132) at org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:134) ... 37 more

I'm not sure whether I need to set any config vars with the proxy information, but I have tried everything I can think of and still can't seem to get it to work.

Could not find artifact io.confluent:licenses:jar:3.2.0 in central

I just cloned the kafka-connect-storage-cloud project. After checking out the v3.2.0 tag, I attempted to do a mvn install. This produced the following error:

[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO]
[INFO] kafka-connect-storage-cloud ........................ SUCCESS [  0.179 s]
[INFO] kafka-connect-s3 ................................... FAILURE [01:21 min]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 01:22 min
[INFO] Finished at: 2017-03-23T18:45:34+00:00
[INFO] Final Memory: 55M/563M
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.2.1:java (create-licenses) on project kafka-connect-s3: Execution create-licenses of goal org.codehaus.mojo:exec-maven-plugin:1.2.1:java failed: Plugin org.codehaus.mojo:exec-maven-plugin:1.2.1 or one of its dependencies could not be resolved: Could not find artifact io.confluent:licenses:jar:3.2.0 in central (https://repo.maven.apache.org/maven2) -> [Help 1]

To get past it, I just commented out the licenses-package profile in the kafka-connect-s3 pom.xml.

At any rate, I figured I'd report it since it looks like a problem with Confluent's jars.

Allow custom partitioners to have their own configs

I've written a custom partitioner that requires a few configs (basically db name & table name). The S3SinkTask does not pass down all the configs it receives upon instantiation to the custom partitioner however. In S3SinkConnectorConfig, only a specific set of configs are exposed, namely, those present in StorageCommonConfig, HiveConfig and PartitionerConfig.

What I need basically is for all of the initial set of configs to get passed within the map that the custom partitioner is configured with. Ideally, instead of the S3SinkTask using the config.plainValues() method to pass the configs to the partitioner, it would use the config.originals() method.

I would be willing to submit this patch, unless there is some reasoning for not making this change. If so, some other workaround would be awesome, and I'm open to suggestions.
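
For context, here is a minimal sketch of the kind of custom partitioner this request is about (the class name and the custom.* config keys are hypothetical, and the configure/encodePartition signatures are assumed to roughly match the storage-common partitioner API):

import java.util.Map;

import org.apache.kafka.connect.sink.SinkRecord;

// Hypothetical custom partitioner that needs two extra configs. With
// config.originals() passed through, keys like "custom.db.name" would be
// visible in configure(); with plainValues() they are currently dropped.
public class DbTablePartitioner {
    private String dbName;
    private String tableName;

    public void configure(Map<String, Object> config) {
        this.dbName = (String) config.get("custom.db.name");
        this.tableName = (String) config.get("custom.table.name");
    }

    public String encodePartition(SinkRecord sinkRecord) {
        // Partition path derived from the extra configs rather than the record.
        return "db=" + dbName + "/table=" + tableName;
    }
}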

ERROR org.apache.kafka.connect.runtime.WorkerTask - Task s3-sink

Has anyone seen this error before?

[CLASSPATH traversal thread.] INFO org.reflections.Reflections - Reflections took 11317 ms to scan 562 urls, producing 13756 keys and 91233 values
[pool-1-thread-2] ERROR org.apache.kafka.connect.runtime.WorkerTask - Task s3-sink-0 threw an uncaught and unrecoverable exception
org.apache.kafka.connect.errors.ConnectException: java.lang.reflect.InvocationTargetException
at io.confluent.connect.storage.StorageFactory.createStorage(StorageFactory.java:47)
at io.confluent.connect.s3.S3SinkTask.start(S3SinkTask.java:94)
at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:231)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:145)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at io.confluent.connect.storage.StorageFactory.createStorage(StorageFactory.java:45)
... 10 more
Caused by: java.lang.NoSuchFieldError: INSTANCE
at org.apache.http.conn.ssl.SSLConnectionSocketFactory.(SSLConnectionSocketFactory.java:144)
at com.amazonaws.http.apache.client.impl.ApacheConnectionManagerFactory.getPreferredSocketFactory(ApacheConnectionManagerFactory.java:87)
at com.amazonaws.http.apache.client.impl.ApacheConnectionManagerFactory.create(ApacheConnectionManagerFactory.java:65)
at com.amazonaws.http.apache.client.impl.ApacheConnectionManagerFactory.create(ApacheConnectionManagerFactory.java:58)
at com.amazonaws.http.apache.client.impl.ApacheHttpClientFactory.create(ApacheHttpClientFactory.java:51)
at com.amazonaws.http.apache.client.impl.ApacheHttpClientFactory.create(ApacheHttpClientFactory.java:39)
at com.amazonaws.http.AmazonHttpClient.(AmazonHttpClient.java:319)
at com.amazonaws.http.AmazonHttpClient.(AmazonHttpClient.java:303)
at com.amazonaws.AmazonWebServiceClient.(AmazonWebServiceClient.java:173)
at com.amazonaws.services.s3.AmazonS3Client.(AmazonS3Client.java:645)
at com.amazonaws.services.s3.AmazonS3Builder$1.apply(AmazonS3Builder.java:31)
at com.amazonaws.services.s3.AmazonS3Builder$1.apply(AmazonS3Builder.java:28)
at com.amazonaws.services.s3.AmazonS3ClientBuilder.build(AmazonS3ClientBuilder.java:60)
at com.amazonaws.services.s3.AmazonS3ClientBuilder.build(AmazonS3ClientBuilder.java:26)
at com.amazonaws.client.builder.AwsSyncClientBuilder.build(AwsSyncClientBuilder.java:38)
at io.confluent.connect.s3.storage.S3Storage.newS3Client(S3Storage.java:81)
at io.confluent.connect.s3.storage.S3Storage.(S3Storage.java:59)
... 15 more
[pool-1-thread-2] ERROR org.apache.kafka.connect.runtime.WorkerTask - Task is being killed and will not recover until manually restarted

Thread block when using a high flush.size

I'm setting up a Kafka connect S3 sink with the following configuration

{
    "name": "kafka-s3-sink",
    "config": {
        "tasks.max": 5,
        "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "topics": "example-topic",
        "flush.size": 100000,
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "schema.compatibility": "NONE",
        "s3.bucket.name": "foo",
        "s3.region": "us-east-1",
        "s3.part.size": 5242880,
        "rotate.interval.ms": 86400000,
        "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
        "partition.duration.ms": 3600000,
        "locale": "en",
        "timezone": "UTC",
        "path.format": "YYYY-MM-dd-HH-mm-ss",
        "schema.generator.class": "io.confluent.connect.storage.hive.schema.TimeBasedSchemaGenerator"
    }
}

Setting flush.size to about 10000 works, but any number greater than that seems to hang the connector altogether. I say this because I get the following error:

[2017-10-17 11:05:46,812] WARN failed to publish monitoring message (io.confluent.monitoring.clients.interceptor.MonitoringInterceptor)
org.apache.kafka.common.errors.InterruptException: java.lang.InterruptedException
	at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:499)
	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:440)
	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:360)
	at io.confluent.monitoring.clients.interceptor.MonitoringInterceptor.publishMonitoringMessage(MonitoringInterceptor.java:255)
	at io.confluent.monitoring.clients.interceptor.MonitoringInterceptor.publishMetrics(MonitoringInterceptor.java:240)
	at io.confluent.monitoring.clients.interceptor.MonitoringInterceptor.publishMetrics(MonitoringInterceptor.java:206)
	at io.confluent.monitoring.clients.interceptor.MonitoringInterceptor.run(MonitoringInterceptor.java:160)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.InterruptedException
	at java.lang.Object.wait(Native Method)
	at org.apache.kafka.clients.Metadata.awaitUpdate(Metadata.java:156)
	at org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:548)
	at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:450)

Is this a known issue? Any workarounds?

Kafka Connect - Dead workers going in cyclic mode

Scenario:

  1. The statuses topic that the Connect group uses had some statuses that included old worker IDs which no longer exist.
  2. Deleted all the metadata topics, recreated them, and started the connector again.
  3. All the data on the status topic gets repopulated; I don't know where it comes from.

That might be the reason I see these cyclic errors, where it continuously tries connecting to a leader worker IP which does not even exist, and is unable to start the REST server.

Changing the group ID fixes the problem, but I was wondering if there is any other way to approach this issue.

[2017-07-25 21:57:05,266] INFO Herder started (org.apache.kafka.connect.runtime.distributed.DistributedHerder:195)
[mysql-kafka-app-687132680-b56mk] [2017-07-25 21:57:05,366] INFO Discovered coordinator iekfk003l.load.appia.com:9092 (id: 2147477466 rack: null) for group carrier-mysql-euwest-qacarrier-connect-cluster. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:589)
[mysql-kafka-app-687132680-b56mk] [2017-07-25 21:57:05,467] INFO (Re-)joining group carrier-mysql-euwest-qacarrier-connect-cluster (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:423)
[mysql-kafka-app-687132680-b56mk] [2017-07-25 21:57:14,328] INFO Successfully joined group carrier-mysql-euwest-qacarrier-connect-cluster with generation 390
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator:391)
[mysql-kafka-app-687132680-b56mk] [2017-07-25 21:57:14,368] INFO Joined group and got assignment: Assignment{error=0, leader='connect-1-5ef00613-d69a-4d02-91c4-51545d52a0f6', leaderUrl='http://10.41.162.34:8083/', offset=260, connectorIds=[mysql-kafka-processes], taskIds=[mysql-kafka-mccmnc-0]} (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1151)
[mysql-kafka-app-687132680-b56mk] [2017-07-25 21:57:14,466] WARN Catching up to assignment's config offset. (org.apache.kafka.connect.runtime.distributed.DistributedHerder:740)
[mysql-kafka-app-687132680-b56mk] [2017-07-25 21:57:14,867] INFO Current config state offset -1 is behind group assignment 260, reading to end of config log (org.apache.kafka.connect.runtime.distributed.DistributedHerder:784)
[mysql-kafka-app-687132680-b56mk] [2017-07-25 21:57:15,368] INFO Finished reading to end of log and updated config snapshot, new config log offset: -1 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:788)
[mysql-kafka-app-687132680-b56mk] [2017-07-25 21:57:15,369] INFO Current config state offset -1 does not match group assignment 260. Forcing rebalance. (org.apache.kafka.connect.runtime.distributed.DistributedHerder:764)
[mysql-kafka-app-687132680-b56mk] [2017-07-25 21:57:15,369] INFO Rebalance started
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:1172)
[mysql-kafka-app-687132680-b56mk] [2017-07-25 21:57:15,369] INFO Wasn't unable to resume work after last rebalance, can skip stopping connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1204)
[mysql-kafka-app-687132680-b56mk] [2017-07-25 21:57:15,466] INFO (Re-)joining group carrier-mysql-euwest-qacarrier-connect-cluster (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:423)
[mysql-kafka-app-687132680-b56mk] Elapsed Time: 00:01:05
[mysql-kafka-app-687132680-b56mk] [2017-07-25 21:57:15,766] INFO Successfully joined group carrier-mysql-euwest-qacarrier-connect-cluster with generation 390 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:391)
[mysql-kafka-app-687132680-b56mk] [2017-07-25 21:57:15,766] INFO Joined group and got assignment: Assignment{error=0, leader='connect-1-5ef00613-d69a-4d02-91c4-51545d52a0f6', leaderUrl='http://10.41.162.34:8083/', offset=260, connectorIds=[mysql-kafka-processes], taskIds=[mysql-kafka-mccmnc-0]} (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1151)
[mysql-kafka-app-687132680-b56mk] [2017-07-25 21:57:15,767] WARN Catching up to assignment's config offset. (org.apache.kafka.connect.runtime.distributed.DistributedHerder:740)
[mysql-kafka-app-687132680-b56mk] [2017-07-25 21:57:15,767] INFO Current config state offset -1 is behind group assignment 260, reading to end of config log (org.apache.kafka.connect.runtime.distributed.DistributedHerder:784)
[mysql-kafka-app-687132680-b56mk] [2017-07-25 21:57:16,066] INFO Finished reading to end of log and updated config snapshot, new config log offset: -1 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:788)
[mysql-kafka-app-687132680-b56mk] [2017-07-25 21:57:16,066] INFO Current config state offset -1 does not match group assignment 260. Forcing rebalance.
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:764)
[mysql-kafka-app-687132680-b56mk] [2017-07-25 21:57:16,067] INFO Rebalance started (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1172)

[Feature] flush.ms

Unsure how hard this is to implement, but it would be great to commit based on whichever of flush.ms and flush.size is reached first. That would allow data to be committed regularly no matter what and provide an upper bound on how late data can be in S3.

Add support for ingestion time based partitioner to s3 connector.

Hi all,

Could we add time-based partitioning support based on the Kafka ingestion time of the messages? This can be done easily by extending the TimeBasedPartitioner class and grabbing the timestamp from the sink record in the encodePartition method. However, I am not sure whether this would guarantee exactly-once delivery the way the default partitioner does.

I would be glad to help if you can direct me on how to achieve this.
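
A rough illustration of the idea described above, as a standalone sketch (the class name is made up; a real implementation would extend TimeBasedPartitioner and override only encodePartition):

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

import org.apache.kafka.connect.sink.SinkRecord;

// Sketch: derive the partition path from the record's Kafka timestamp
// (CreateTime/LogAppendTime) instead of the wall clock at write time.
public class RecordTimestampPartitionEncoder {
    private static final DateTimeFormatter PATH_FORMAT =
            DateTimeFormatter.ofPattern("'year'=yyyy/'month'=MM/'day'=dd").withZone(ZoneOffset.UTC);

    public String encodePartition(SinkRecord record) {
        Long ts = record.timestamp();                                  // may be null for very old brokers
        long millis = (ts != null) ? ts : System.currentTimeMillis();  // fall back to wall clock
        return PATH_FORMAT.format(Instant.ofEpochMilli(millis));
    }
}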

Write to a topic on S3 file closing

Hi,
I need to load the files into Redshift after they have been created in S3. I was wondering whether it is possible to write a record with the file-creation event to Kafka itself (a different topic), so I can build an app that consumes that topic and executes the COPY of the S3 file into Redshift.

Can someone point out whether this is possible and how I can do it?

Thank you,

Andres

Setting s3.ssea.name to AES256 has no effect, no encryption is applied for JSON files

The parameter s3.ssea.name is definitely being passed to the S3 connector. How do I know? I tried AES-256 as the value for this parameter and got:
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: The encryption method specified is not supported (Service: Amazon S3; Status Code: 400; Error Code: InvalidArgument; Request ID: ...
Has anybody successfully used this parameter?
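
One way to check whether SSE was actually applied is to read back the object metadata with the AWS SDK. This is only an illustrative snippet (bucket and key are placeholders), not connector code:

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.ObjectMetadata;

// Print the server-side-encryption algorithm S3 recorded for an object the
// connector wrote; it should report "AES256" when SSE-S3 was applied.
public class CheckSse {
    public static void main(String[] args) {
        AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
        ObjectMetadata md = s3.getObjectMetadata("my-bucket", "topics/my-topic/partition=0/my-file.json");
        System.out.println("SSE algorithm: " + md.getSSEAlgorithm());
    }
}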

confluent-3.2.0 S3 Connector default partition settings

I have been able to successfully write my topic data stream to my S3 bucket; however, the data is partitioned numerically (partition=0).

current display: Amazon S3 > bucket-name > topics > test > partition=0

Is it possible to partition the data by date (yyyy-MM-dd)? If so, where would I configure it to override the default partition settings?

Thank you for your help,
Warsame

Error with Kafka Connect 0.11.0.0

We are upgrading Kafka Connect from Confluent version 3.2.2 to 3.3.0 and are facing an issue (see below) during startup when trying to connect to an independently managed, centralized Kafka cluster running version 0.10.2.1-cp2. The reason for the issue is that our Kafka Connect consumer is only authorized for read access to the Kafka cluster, but when we upgrade to version 3.3.0 it also tries to write to the cluster. This was not happening with version 3.2.2. Is there a way to stop Kafka Connect from attempting this write during startup in version 3.3.0?

ERROR Uncaught exception in herder work thread, exiting: (org.apache.kafka.connect.runtime.distributed.DistributedHerder:206)
org.apache.kafka.connect.errors.ConnectException: Error while attempting to create/find topic(s) ''
at org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:245)
at org.apache.kafka.connect.storage.KafkaOffsetBackingStore$1.run(KafkaOffsetBackingStore.java:99)
at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:126)
at org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:109)
at org.apache.kafka.connect.runtime.Worker.start(Worker.java:146)
at org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:99)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:194)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownServerException: User is not authorized for OWNER for destination kafka-cluster
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:213)
at org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:226)
... 11 more

If this is the wrong place to post this issue, can you point me in the right direction? The Kafka repo doesn't have an issues tab.

Data being regularly written to S3 but offsets not being committed to Kafka

I believe this problem is not specific to this particular connector, but I would like to confirm that. I've been using version 3.2.2 in production for quite some time and bumped into quite a severe problem recently. I discovered the following log lines:

WARN :: WorkerSinkTask:337 - Ignoring invalid task provided offset topic-2/OffsetAndMetadata{offset=1000, metadata=''} -- partition not assigned
WARN :: WorkerSinkTask:337 - Ignoring invalid task provided offset topic-2/OffsetAndMetadata{offset=2000, metadata=''} -- partition not assigned
WARN :: WorkerSinkTask:337 - Ignoring invalid task provided offset topic-2/OffsetAndMetadata{offset=3000, metadata=''} -- partition not assigned
WARN :: WorkerSinkTask:337 - Ignoring invalid task provided offset topic-2/OffsetAndMetadata{offset=4000, metadata=''} -- partition not assigned

As you can see from the log lines, the offset is increasing on every commit attempt
and data is being written to S3 (although the files that were written didn't really follow the defined flush.size). Once I restarted the service it started from the last committed offset (1000), but because the files written before did not enforce flush.size, the process is not idempotent: the offset (which is part of the filename) is different than it was before (since the flush.size rule is now being enforced). As a result, the state in S3 is now corrupted.

Looking at WorkerSinkTask:337 and the surrounding code, which looks like:

final Map<TopicPartition, OffsetAndMetadata> commitableOffsets = new HashMap<>(lastCommittedOffsets);
for (Map.Entry<TopicPartition, OffsetAndMetadata> taskProvidedOffsetEntry : taskProvidedOffsets.entrySet()) {
    final TopicPartition partition = taskProvidedOffsetEntry.getKey();
    final OffsetAndMetadata taskProvidedOffset = taskProvidedOffsetEntry.getValue();
    if (commitableOffsets.containsKey(partition)) {
        if (taskProvidedOffset.offset() <= currentOffsets.get(partition).offset()) {
            commitableOffsets.put(partition, taskProvidedOffset);
        } else {
            log.warn("Ignoring invalid task provided offset {}/{} -- not yet consumed", partition, taskProvidedOffset);
        }
    } else {
        log.warn("Ignoring invalid task provided offset {}/{} -- partition not assigned", partition, taskProvidedOffset);
    }
}

It seems odd that taskProvidedOffsets somehow contains a TopicPartition which is not in commitableOffsets. What is even stranger is that it ran in this state for a couple of days, with files being written but offsets not committed. It is important to note that this problem appeared around the same time that a few of the worker tasks died. I run 4 instances of kafka-connect-storage-cloud, each with 8 worker tasks, so I guess some rebalancing happened and possibly corrupted some internal WorkerSinkTask state. Even with offsets not being committed to Kafka, if flush.size were respected this would not be such a huge problem, as reprocessing would overwrite the files in S3; however, I somehow experienced this non-deterministic behavior.

S3 Connector preCommit() Null Pointer exception

I got this error after I added the property "schedule.rotate.interval.ms"=5000 to the S3 connector.
I am using Confluent 3.2.0.

INFO Revoking previously assigned partitions [] for group connect-kafka-s3-client-events (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:393)
[2017-07-26 14:08:06,483] INFO (Re-)joining group connect-kafka-s3-client-events (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:407)
[2017-07-26 14:08:07,174] INFO Successfully joined group connect-kafka-s3-client-events with generation 385 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:375)
[2017-07-26 14:08:07,182] INFO Setting newly assigned partitions [devauto.reliance.client.v1.test-0] for group connect-kafka-s3-client-events (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:252)
[2017-07-26 14:08:07,332] INFO Fetch offset 70 is out of range for partition devauto.reliance.client.v1.test-0, resetting offset (org.apache.kafka.clients.consumer.internals.Fetcher:820)
[2017-07-26 14:08:25,423] INFO Reflections took 26578 ms to scan 590 urls, producing 14875 keys and 101872 values (org.reflections.Reflections:229)
[2017-07-26 14:09:02,326] WARN {} Offset commit failed during close (org.apache.kafka.connect.runtime.WorkerSinkTask:302)
[2017-07-26 14:09:02,328] ERROR Commit of WorkerSinkTask{id=kafka-s3-client-events-0} offsets threw an unexpected exception: (org.apache.kafka.connect.runtime.WorkerSinkTask:204)
java.lang.NullPointerException
at io.confluent.connect.s3.S3SinkTask.preCommit(S3SinkTask.java:176)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:299)
at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:480)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:152)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2017-07-26 14:09:02,331] ERROR Task kafka-s3-client-events-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:141)
java.lang.NullPointerException
at io.confluent.connect.s3.S3SinkTask.close(S3SinkTask.java:189)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:317)
at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:480)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:152)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2017-07-26 14:09:02,332] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:142)

Sink both Key and Value to the Same Avro File

Currently, it looks like we can only sink the value part of a message to S3. However, I have a use case where both the key and the value contain valid data.

Is there a way to configure the S3 connector so that it sinks both the key and the value of a message to the same Avro file?

Unable to access S3SinkConnectorConfig in custom RecordWriterProvider

The following line of code passes a null to the first argument of the RecordWriterProvider.getRecordWriter(final S3SinkConnectorConfig conf, final String filename) call. Is there a reason for passing null here? I think S3SinkConnectorConfig is an instance variable of TopicPartitionWriter, and it would be helpful to be able to access this configuration object when writing a custom RecordWriterProvider implementation.

https://github.com/confluentinc/kafka-connect-storage-cloud/blob/master/kafka-connect-s3/src/main/java/io/confluent/connect/s3/TopicPartitionWriter.java#L286

Extra "/" with TimeBasedPartitioner

We noticed an issue today when using the TimeBasedPartitioner (specifically, we are using the DailyPartitioner, which automatically writes folder names down to the day).

When using this partitioner we see the following behavior: [topics.dir]/[topicname]/year=2017/month=06/day=07//[filename]

There is an extra "/" after the partition and before the filename.

When using the FieldPartitioner we do not see this behavior. From what I can tell, the class that returns the partition when using the TimeBasedPartitioner returns a "/", but the class that returns the partition when using the FieldPartitioner does not. The code in the S3 connector adds a "/". Therefore, when using the TimeBasedPartitioner, it results in an extra "directory" with a blank name.

I think this error lies in getCommitFilename and fileKeyToCommit. We can certainly remove the "/" from the code in the S3 connector, but that means that when using the FieldPartitioner, we will be missing a directory delimiter, screwing up the directory names / combining them.

'java.net.SocketException: Connection reset' while uploading part should be a RetriableException

A reset connection should be a retriable exception. I can add the catch block if this seems reasonable.

https://github.com/confluentinc/kafka-connect-storage-cloud/blob/master/kafka-connect-s3/src/main/java/io/confluent/connect/s3/storage/S3OutputStream.java#L115
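
A minimal sketch of the proposed handling (a standalone helper under assumed names, not the connector's actual code): map a reset connection to Connect's RetriableException so the framework retries instead of killing the task.

import java.io.IOException;
import java.net.SocketException;

import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;

public final class UploadErrorHandling {
    // Walk the cause chain: transient network failures become retriable,
    // everything else stays a fatal ConnectException.
    public static RuntimeException asConnectError(IOException e) {
        for (Throwable t = e; t != null; t = t.getCause()) {
            if (t instanceof SocketException) {
                return new RetriableException("Part upload failed with a connection reset; retrying", e);
            }
        }
        return new ConnectException("Part upload failed", e);
    }
}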

Error log:

[2017-05-16 07:30:30,348] ERROR Task kafkaconnect-mde-timeseriesmerger-12 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask)
org.apache.kafka.connect.errors.ConnectException: java.io.IOException: Part upload failed: 
	at io.confluent.connect.s3.format.avro.AvroRecordWriterProvider$1.write(AvroRecordWriterProvider.java:79)
	at io.confluent.connect.s3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:328)
	at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:191)
	at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:163)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:429)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Part upload failed: 
	at io.confluent.connect.s3.storage.S3OutputStream.uploadPart(S3OutputStream.java:120)
	at io.confluent.connect.s3.storage.S3OutputStream.uploadPart(S3OutputStream.java:102)
	at io.confluent.connect.s3.storage.S3OutputStream.write(S3OutputStream.java:94)
	at org.apache.avro.file.DataFileWriter$BufferedFileOutputStream$PositionFilter.write(DataFileWriter.java:446)
	at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
	at org.apache.avro.io.BufferedBinaryEncoder$OutputStreamSink.innerWrite(BufferedBinaryEncoder.java:216)
	at org.apache.avro.io.BufferedBinaryEncoder.writeFixed(BufferedBinaryEncoder.java:150)
	at org.apache.avro.file.DataFileStream$DataBlock.writeBlockTo(DataFileStream.java:366)
	at org.apache.avro.file.DataFileWriter.writeBlock(DataFileWriter.java:383)
	at org.apache.avro.file.DataFileWriter.writeIfBlockFull(DataFileWriter.java:328)
	at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:299)
	at io.confluent.connect.s3.format.avro.AvroRecordWriterProvider$1.write(AvroRecordWriterProvider.java:77)
	... 14 more
Caused by: java.net.SocketException: Connection reset
	at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
	at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
	at sun.security.ssl.OutputRecord.writeBuffer(OutputRecord.java:431)
	at sun.security.ssl.OutputRecord.write(OutputRecord.java:417)
	at sun.security.ssl.SSLSocketImpl.writeRecordInternal(SSLSocketImpl.java:876)
	at sun.security.ssl.SSLSocketImpl.writeRecord(SSLSocketImpl.java:847)
	at sun.security.ssl.AppOutputStream.write(AppOutputStream.java:123)
	at org.apache.http.impl.io.SessionOutputBufferImpl.streamWrite(SessionOutputBufferImpl.java:126)
	at org.apache.http.impl.io.SessionOutputBufferImpl.flushBuffer(SessionOutputBufferImpl.java:138)
	at org.apache.http.impl.io.SessionOutputBufferImpl.write(SessionOutputBufferImpl.java:169)
	at org.apache.http.impl.io.ContentLengthOutputStream.write(ContentLengthOutputStream.java:115)
	at org.apache.http.entity.InputStreamEntity.writeTo(InputStreamEntity.java:146)
	at com.amazonaws.http.RepeatableInputStreamRequestEntity.writeTo(RepeatableInputStreamRequestEntity.java:160)
	at org.apache.http.impl.DefaultBHttpClientConnection.sendRequestEntity(DefaultBHttpClientConnection.java:158)
	at org.apache.http.impl.conn.CPoolProxy.sendRequestEntity(CPoolProxy.java:162)
	at org.apache.http.protocol.HttpRequestExecutor.doSendRequest(HttpRequestExecutor.java:237)
	at com.amazonaws.http.protocol.SdkHttpRequestExecutor.doSendRequest(SdkHttpRequestExecutor.java:63)
	at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:122)
	at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:271)
	at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:184)
	at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:184)
	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:55)
	at com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1186)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1035)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:747)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:721)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:704)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:672)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:654)
	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:518)
	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4185)
	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4132)
	at com.amazonaws.services.s3.AmazonS3Client.doUploadPart(AmazonS3Client.java:3172)
	at com.amazonaws.services.s3.AmazonS3Client.uploadPart(AmazonS3Client.java:3157)
	at io.confluent.connect.s3.storage.S3OutputStream$MultipartUpload.uploadPart(S3OutputStream.java:199)
	at io.confluent.connect.s3.storage.S3OutputStream.uploadPart(S3OutputStream.java:113)
	... 25 more
[2017-05-16 07:30:30,349] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask)

Add support for Avro compression.

Hi guys, it would be nice to be able to configure the S3 connector to compress Avro data when using the AvroRecordWriterProvider. The compression codecs supported by Avro include snappy and deflate.
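
For reference, the underlying Avro API already supports codecs; here is a self-contained sketch of that API (any config name such a feature might expose, e.g. "avro.codec", is hypothetical here):

import java.io.File;

import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

// Write a deflate-compressed Avro file; snappy would be CodecFactory.snappyCodec().
public class AvroCompressionDemo {
    public static void main(String[] args) throws Exception {
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Event\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}");
        try (DataFileWriter<GenericRecord> writer =
                     new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema))) {
            writer.setCodec(CodecFactory.deflateCodec(6));   // must be set before create()
            writer.create(schema, new File("/tmp/events.avro"));
            GenericRecord record = new GenericData.Record(schema);
            record.put("id", "1");
            writer.append(record);
        }
    }
}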

Support AWS KMS and SSE-C

Currently the only option for server-side encryption is AWS SSE-S3; it would be great to support KMS and SSE-C.

Export data with json format to s3

Why is the data exported to S3 an escaped JSON string rather than plain JSON?
My config:
name=s3-sink
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=1
topics=nginx-topic,lite-nginx-topic

s3.region=us-west-2
s3.bucket.name=kafka-bak-s3-us-west-2
flush.size=100000
rotate.schedule.interval.ms=3600000

storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.json.JsonFormat
schema.generator.class=io.confluent.connect.storage.hive.schema.TimeBasedSchemaGenerator
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner

schema.compatibility=NONE
partition.duration.ms=60000
path.format=YYYY/MM/dd
locale=en
timezone=UTC

The data on s3:
"{\"id\":\"6baa2977ae5d121efb4f163e963db177\",\"host\":\"10.10.0.246\",\"name\":\"nginx\",\"family\":\"webapp-m\",\"task\":\"webapp-m:84\",\"cid\":\"722472e97ad1a43a248f57b69f1edc1ecb9a4b4ccf9d2826fa9d74687241de1b\",\"timestamp\":\"2017-09-02T15:45:23.290902089Z\",\"log_type\":1,\"raw\":\"\",\"log\":{\"body_bytes_sent\":\"43\",\"http_cookie\":\"\",\"http_referrer\":\"\",\"http_user_agent\":\"Mozilla/5.0 (Linux; Android 4.4.2; SM-G386T Build/KOT49H) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/30.0.0.0 Mobile Safari/537.36\",\"level\":\"info\",\"process\":\"7\",\"remote_addr\":\"\",\"remote_user\":\"\",\"request_length\":\"1722\",\"request_method\":\"POST\",\"request_time\":\"0.023\",\"status\":\"200\",\"time\":\"1504367123.290\",\"time_iso8601\":\"2017-09-02T15:45:23+00:00\",\"upstream_cache_status\":\"\"}}"

I want the data on s3:
{
    "id": "6baa2977ae5d121efb4f163e963db177",
    "host": "10.10.0.246",
    "name": "nginx",
    "family": "webapp-m",
    "task": "webapp-m:84",
    "cid": "722472e97ad1a43a248f57b69f1edc1ecb9a4b4ccf9d2826fa9d74687241de1b",
    "timestamp": "2017-09-02T15:45:23.290902089Z",
    "log_type": 1,
    "raw": "",
    "log": {
        "body_bytes_sent": "43",
        "http_cookie": "",
        "http_referrer": "",
        "http_user_agent": "Mozilla/5.0 (Linux; Android 4.4.2; SM-G386T Build/KOT49H) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/30.0.0.0 Mobile Safari/537.36",
        "level": "info",
        "process": "7",
        "remote_addr": "",
        "remote_user": "",
        "request_length": "1722",
        "request_method": "POST",
        "request_time": "0.023",
        "status": "200",
        "time": "1504367123.290",
        "time_iso8601": "2017-09-02T15:45:23+00:00",
        "upstream_cache_status": ""
    }
}

Add compression options for JSON

Avro compression is present. It'd be great to add it for the rest of the formats.

Creating the issue to link to the PR that is currently in progress here:
#98

Using jdbc sink connector to write into different schemas

Hi,

I am using the JDBC sink connector to insert data into a different schema,
but it keeps saying the table name is missing.

Below is the property that I use:
"table.name.format": "schema.application"

If I do not give the schema name, then it inserts into the public schema.

Am I doing anything wrong here? Is there a way to specify the schema name?

Document local development flow

In order to be able to run a simple mvn test on the master branch here, it's necessary to clone and build a cascade of dependencies. It would be great to have an easier path, but if that's the intended workflow, I'd love to see it documented, either in the README here or in some central location for the Confluent tools and then linked in the README.

Does it support Parquet format?

We have Kafka events that need to be written to S3 in Parquet format. I would just like to know whether this connector supports that format.

Does it support generating a user-specific key for the S3 folder structure?

The way it generates a key is like this:

{bucket_name}/{topics.dir}/{topic_name}/{partitioning}

OR

{bucket_name}/topics/{topic_name}/{partitioning} (if the topics.dir config is not supplied)

Is there a way to generate the key without the topic name? Also, is there any way to shuffle those keys?

Thanks,
Ismail

Exceptions do not explain the error when encryption is not enabled (for encrypted S3 bucket)

I was trying to use the Connector to ingest data into an S3 bucket which is encrypted.

I received the following cryptic error:

{
    "name": "s3-sink-connector",
    "connector": {
        "state": "RUNNING",
        "worker_id": "10.153.19.140:8787"
    },
    "tasks": [
        {
            "state": "FAILED",
            "trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:457)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:251)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\n",
            "id": 0,
            "worker_id": "10.153.19.140:8787"
        }
    ]
}

I then guessed (by complete chance) that it might be due to the fact that the bucket is encrypted. I then found the solution for enabling encryption in #57. The docs around this are non-existent.

Any recommendations on how you'd like this to be done? I'd also be happy to provide a PR of docs if you provide guidance.

Exception in thread "main" java.lang.NoClassDefFoundError: io/confluent/connect/storage/StorageSinkConnectorConfig

I'm new to Kafka Connect, and currently I want to set up a connection between S3 and my own Kafka cluster (not using the Confluent distribution).

I tried to install the Confluent S3 connector this way:

Copy the folder confluent-3.3.0/share/java/kafka-connect-s3 to my plugin.path, which I specified in worker.properties.

Then I launch my Connect worker in distributed mode, and it shows the error below:

INFO Loading plugin from: /home/ec2-user/kafka-connect-plugins/kafka-connect-s3 (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:176)
Exception in thread "main" java.lang.NoClassDefFoundError: io/confluent/connect/storage/StorageSinkConnectorConfig
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:54)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.getDeclaredConstructors0(Native Method)
at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
at java.lang.Class.getConstructor0(Class.java:3075)
at java.lang.Class.newInstance(Class.java:412)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getPluginDesc(DelegatingClassLoader.java:245)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:224)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:198)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.registerPlugin(DelegatingClassLoader.java:190)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:150)
at org.apache.kafka.connect.runtime.isolation.Plugins.(Plugins.java:47)
at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:63)
Caused by: java.lang.ClassNotFoundException: io.confluent.connect.storage.StorageSinkConnectorConfig
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:62)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 22 more

It looks like the worker can find the plugin folder, but some parts seem to be missing. Am I misunderstanding the way to install a Kafka connector?

S3 Connector `close()` NullPointerException

I am receiving the following NullPointerException:

java.lang.NullPointerException
	at io.confluent.connect.s3.S3SinkTask.close(S3SinkTask.java:189)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:302)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:435)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:147)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

My config is the following:

{
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "s3.region": "us-east-1",
  "partition.duration.ms": "900000",
  "flush.size": "3",
  "schema.compatibility": "NONE",
  "topics": "events_topic",
  "tasks.max": "1",
  "s3.part.size": "5242880",
  "timezone": "UTC",
  "locale": "en",
  "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
  "hive.integration": "false",
  "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
  "schema.generator.class": "io.confluent.connect.storage.hive.schema.TimeBasedSchemaGenerator",
  "name": "EventsS3",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "s3.bucket.name": "my-bucket",
  "path.format": "'events'/YYYY/MM/DD/HH/"
}

Can you please advise how I can correct this error?

Thanks,

Rotate.interval.ms is not committing

I'm using the TimeBasedPartitioner to sink to S3, with rotate.interval.ms around 1 minute and partition.duration.ms around 5 minutes, but I can't get a full snapshot of the data.
I tried changing flush.size, and I do get more data if I choose the right divisor, but I can never get all the data. It seems some records are stuck in memory until every single partition is filled to flush.size, and I don't want that to take too long (my data comes in batches on a daily basis).

Am I doing it wrong? Is there another way to force a commit/flush?

Invocation Target Exception

Getting this on the Confluent 3.2.0 Kafka Connect image. Any idea what could be wrong?

[2017-05-08 07:03:00,831] ERROR Task s3-sink-test-1-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: java.lang.reflect.InvocationTargetException
	at io.confluent.connect.storage.StorageFactory.createStorage(StorageFactory.java:47)
	at io.confluent.connect.s3.S3SinkTask.start(S3SinkTask.java:94)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:231)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:145)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.reflect.InvocationTargetException
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at io.confluent.connect.storage.StorageFactory.createStorage(StorageFactory.java:45)
	... 10 more
Caused by: java.lang.NoSuchMethodError: com.amazonaws.ClientConfiguration.withUserAgentPrefix(Ljava/lang/String;)Lcom/amazonaws/ClientConfiguration;
	at io.confluent.connect.s3.storage.S3Storage.newS3Client(S3Storage.java:63)
	at io.confluent.connect.s3.storage.S3Storage.<init>(S3Storage.java:59)
	... 15 more

My config:

name=s3-sink-test-1
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=3
topics=mytopic

s3.bucket.name=my-kafka-connect-s3-dev
s3.region=ap-southeast-2
s3.part.size=5242880
flush.size=100

storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.avro.AvroFormat
topics.dir=topics

schema.generator.class=io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner

schema.compatibility=NONE
partition.duration.ms=30000
path.format=path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH/

Add support to sync data from Kafka to S3 as it is.

Hi guys,

From what I understand, it is not possible to sync schemaless data from Kafka to S3 right now. Would it be possible to support syncing data as-is from Kafka to S3 when Hive integration is not activated?

We have a use case at my company where we want to sync plain Strings to S3, so this feature would be awesome to have. It would also be cool to be able to compress the synced data with Snappy or gzip.

Retry on AmazonS3Exception when uploading S3 files

Just encountered this exception from S3:

[2017-08-06 06:22:41,094] ERROR Multipart upload failed to complete for bucket 'xxx' key 'xxxxxxxxxxxxxx' (io.confluent.connect.s3.storage.S3OutputStream)
[2017-08-06 06:22:41,094] ERROR Task tracerbullet-2-S3-staging-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask)
org.apache.kafka.connect.errors.DataException: Multipart upload failed to complete.
        at io.confluent.connect.s3.storage.S3OutputStream.commit(S3OutputStream.java:138)
        at com.stitchfix.datahighway.sfs3connector.SFCSVRecordWriterProvider$1.commit(SFCSVRecordWriterProvider.java:86)
        at io.confluent.connect.s3.TopicPartitionWriter.commitFile(TopicPartitionWriter.java:419)
        at io.confluent.connect.s3.TopicPartitionWriter.commitFiles(TopicPartitionWriter.java:399)
        at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:209)
        at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:173)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:429)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: We encountered an internal error. Please try again. (Service: null; Status Code: 0; Error Code: InternalError; Request ID: D923FC82C75E2B6E), S3 Extended Request ID: xsRu3H8odRxZiQi+n38Ah0yUiUp8pTWsc9FeVNTyCf8GxDMQYFPiu7vptFbmwznPsM+PY9O/fug=
        at com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser$CompleteMultipartUploadHandler.doEndElement(XmlResponsesSaxParser.java:1773)
        at com.amazonaws.services.s3.model.transform.AbstractHandler.endElement(AbstractHandler.java:52)
        at org.apache.xerces.parsers.AbstractSAXParser.endElement(Unknown Source)
        at org.apache.xerces.impl.XMLNSDocumentScannerImpl.scanEndElement(Unknown Source)
        at org.apache.xerces.impl.XMLDocumentFragmentScannerImpl$FragmentContentDispatcher.dispatch(Unknown Source)
        at org.apache.xerces.impl.XMLDocumentFragmentScannerImpl.scanDocument(Unknown Source)
        at org.apache.xerces.parsers.XML11Configuration.parse(Unknown Source)
        at org.apache.xerces.parsers.XML11Configuration.parse(Unknown Source)
        at org.apache.xerces.parsers.XMLParser.parse(Unknown Source)
        at org.apache.xerces.parsers.AbstractSAXParser.parse(Unknown Source)
        at com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser.parseXmlInputStream(XmlResponsesSaxParser.java:142)
        at com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser.parseCompleteMultipartUploadResponse(XmlResponsesSaxParser.java:462)
        at com.amazonaws.services.s3.model.transform.Unmarshallers$CompleteMultipartUploadResultUnmarshaller.unmarshall(Unmarshallers.java:230)
        at com.amazonaws.services.s3.model.transform.Unmarshallers$CompleteMultipartUploadResultUnmarshaller.unmarshall(Unmarshallers.java:227)
        at com.amazonaws.services.s3.internal.S3XmlResponseHandler.handle(S3XmlResponseHandler.java:62)
        at com.amazonaws.services.s3.internal.ResponseHeaderHandlerChain.handle(ResponseHeaderHandlerChain.java:44)
        at com.amazonaws.services.s3.internal.ResponseHeaderHandlerChain.handle(ResponseHeaderHandlerChain.java:30)
        at com.amazonaws.http.response.AwsResponseHandlerAdapter.handle(AwsResponseHandlerAdapter.java:70)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleResponse(AmazonHttpClient.java:1501)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1222)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1035)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:747)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:721)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:704)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:672)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:654)
        at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:518)
        at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4185)
        at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4132)
        at com.amazonaws.services.s3.AmazonS3Client.completeMultipartUpload(AmazonS3Client.java:2933)
        at io.confluent.connect.s3.storage.S3OutputStream$MultipartUpload.complete(S3OutputStream.java:206)
        at io.confluent.connect.s3.storage.S3OutputStream.commit(S3OutputStream.java:134)
        ... 16 more
[2017-08-06 06:22:41,094] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask)

Restarting the connector resolved it. It would be great if the connector retried instead of crashing. Related to this issue as well: #50
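
As a stopgap, newer connector versions expose retry settings for the upload path; a hedged sketch, assuming these properties are available in the version being used:

# number of times to retry a failed part upload before giving up
s3.part.retries=5
# backoff between S3 retries, in milliseconds
s3.retry.backoff.ms=200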

Operational sizing suggestions for running this connector

There should be some documentation on how to run this connector operationally. Draft formula for sizing the heap of a worker that runs this connector:

Heap max size >= s3.part.size * #active partitions on a single node + 100MB

The 100MB is a static allowance for the JVM's own baseline needs. The number of active partitions on a single node is the result of how partitions are assigned to tasks. In a balanced state this should be at most roughly ceil(#partitions / #connect workers). A concrete example:
s3.part.size = 100MB
total number of partitions = 10
number of tasks = 4
number of workers = 4

In this case, if all tasks are evenly balanced across the workers, you would need to set the heap size to 100MB * 3 + 100MB = 400MB. However, if you size it exactly like this and one worker fails, the remaining workers will pick up that worker's tasks, and with them extra partitions, so leave some headroom.

Also note that the S3 connector does not split individual records across parts, so transferring records of 1GB each with s3.part.size = 100MB won't work; such record sizes are rather extreme for regular use anyway. Normally a reasonable part size fits several whole records. A single file containing several records is then uploaded to S3 as a multipart upload, where each part is approximately s3.part.size in size (depending on how many records fit in it), plus, potentially, a smaller final part with the remaining records of the file.

Getting NullPointerException when using rotate.interval.ms

I just upgraded to 3.3.0 and I'm trying out the new rotate.interval.ms config. I'm seeing the following exception when using it on my S3 connectors:

[2017-10-20 23:21:35,233] ERROR Task foo-to-S3-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask)
java.lang.NullPointerException
        at io.confluent.connect.s3.TopicPartitionWriter.rotateOnTime(TopicPartitionWriter.java:288)
        at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:234)
        at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:180)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:464)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:265)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:182)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:150)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)
[2017-10-20 23:21:35,233] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2017-10-20 23:21:35,233] ERROR Task foo-to-S3-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:484)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:265)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:182)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:150)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

The stack trace is pointing to this line: https://github.com/confluentinc/kafka-connect-storage-cloud/blob/v3.3.0/kafka-connect-s3/src/main/java/io/confluent/connect/s3/TopicPartitionWriter.java#L288

My debugging so far has revealed that the recordTimestamp field is null. I'm still debugging, but I wanted to create an issue to track the problem and see if it's a known issue or if there's an obvious solution.
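
One likely factor (an assumption based on the stack trace, not a confirmed root cause): rotate.interval.ms takes its timestamps from the partitioner's timestamp extractor, so a partitioner that does not supply one leaves recordTimestamp null. A sketch that pairs interval-based rotation with a time-based partitioner:

partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
# drive both partitioning and rotation from the Kafka record timestamp
timestamp.extractor=Record
partition.duration.ms=3600000
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
locale=en
timezone=UTC
rotate.interval.ms=60000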

What if you don't want any partitioner?

Hi,

I would like to write data to S3 by just specifying a bucket name and path, without using a partitioner class. The connector uses the DefaultPartitioner by default, which appends partition=0 to the S3 key.

Is there any way to avoid that?

Thanks,
Ismail

java.lang.NoSuchMethodError - Can't find AWS S3 Method

I did a brand-new install of Kafka Connect and I'm seeing this error:

ERROR:

connect-distributed: Caused by: java.lang.NoSuchMethodError: com.amazonaws.services.s3.AmazonS3ClientBuilder.withAccelerateModeEnabled(Ljava/lang/Boolean;)Lcom/amazonaws/services/s3/AmazonS3Builder;

I see the following JARs:
/usr/share/java/kafka-connect-s3/aws-java-sdk-core-1.11.86.jar
/usr/share/java/kafka-connect-s3/aws-java-sdk-kms-1.11.86.jar
/usr/share/java/kafka-connect-s3/aws-java-sdk-s3-1.11.86.jar

Connector Config:
'{
"name": "s3-sink",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"tasks.max": "4",
"type.name": "kafka-connect",
"topics": "rk-test",
"flush.size": "3",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"s3.bucket.name": "kafka-sink-test",
"s3.region": "us-west-1",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
"s3.wan.mode":"false"
}
}'
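
A NoSuchMethodError on an AWS SDK class usually means an older aws-java-sdk jar from another plugin on the same classpath is being loaded ahead of the 1.11.86 jars listed above; that is an educated guess, not a confirmed diagnosis. On Connect versions that support classloader isolation, enabling plugin.path in the worker config (the path shown is an assumption about the install layout) keeps each connector's dependencies separate:

# worker configuration (e.g. connect-distributed.properties)
plugin.path=/usr/share/java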

Add kafka-connect-storage-cloud in confluent maven repository

Hey, I'm trying to use kafka-connect-storage-cloud as an external library, so I can extend the connector. But I cannot download the jar with my gradle configuration.

I'm guessing Gradle can fetch the pom for kafka-connect-storage-cloud, since it downloads some of its dependencies (like io.confluent:kafka-connect-storage-core:3.2.2), but the kafka-connect-storage-cloud jar itself cannot be downloaded.

Is this expected?

N.B.: Here is my build.gradle

Thank you :)

apply plugin: 'java'

sourceCompatibility = 1.8

allprojects {
    repositories {
        jcenter()
        maven { url 'http://packages.confluent.io/maven/' }
        maven { url 'http://repo.pentaho.org/artifactory/repo' }
    }
}

repositories {
    mavenCentral()
}

dependencies {
    compile group: 'io.confluent', name: 'kafka-connect-storage-cloud', version: '3.2.2'
    testCompile group: 'junit', name: 'junit', version: '4.12'
}

Here is a sample of the output I get from Gradle (meaning I think I configured your repository properly):

$gradle build
Starting a Gradle Daemon, 1 busy and 3 stopped Daemons could not be reused, use --status for details
:compileJava
Download http://packages.confluent.io/maven/io/confluent/kafka-connect-storage-core/3.2.2/kafka-connect-storage-core-3.2.2.jar
Download http://packages.confluent.io/maven/io/confluent/kafka-connect-storage-common/3.2.2/kafka-connect-storage-common-3.2.2.jar
Download http://packages.confluent.io/maven/io/confluent/kafka-connect-storage-format/3.2.2/kafka-connect-storage-format-3.2.2.jar
Download http://packages.confluent.io/maven/io/confluent/kafka-connect-storage-hive/3.2.2/kafka-connect-storage-hive-3.2.2.jar
Download http://packages.confluent.io/maven/io/confluent/kafka-connect-storage-partitioner/3.2.2/kafka-connect-storage-partitioner-3.2.2.jar
Download https://jcenter.bintray.com/com/fasterxml/jackson/core/jackson-databind/2.8.5/jackson-databind-2.8.5.jar
Download http://packages.confluent.io/maven/io/confluent/kafka-connect-avro-converter/3.2.2/kafka-connect-avro-converter-3.2.2.jar

rotate.interval.ms configuration not taken into account in the s3 connector

Hi all,

I have noticed that the property rotate.interval.ms doesn't seem to be taken into account in my S3 connector tasks. This configuration, along with rotate.schedule.interval.ms and shutdown.timeout.ms, appears to be blacklisted in the S3SinkConnectorConfig class.
What is the reason for this, and would it be possible to use these configuration properties in the S3 connector?

It is really important for us to be able to rotate files based on a time interval. Depending on our traffic, the data ingestion rate can be low, and the connector sometimes doesn't receive enough messages to trigger a file upload.

AvroSource -> JsonFormat does not output Decimals correctly

If a field uses the Decimal logical type, its underlying primitive type is bytes, and it seems this connector does not attempt to reconstruct a BigDecimal from those bytes; instead it outputs them as a string that is not usable.

Example:

{ "currency_value":"BfXhAA==" }

Steps to reproduce

  1. Take Debezium as a source and create a table with a decimal field. Make sure the data is stored as Avro within Kafka Connect.
  2. Use the S3 sink with the output format "format.class": "io.confluent.connect.s3.format.json.JsonFormat".

small avro file after schema migration

Hello,
I'm using the Confluent S3 connector with this config:

  "name": "s3-connector-events-v2",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "s3.region":"eu-west-1",
    "s3.bucket.name":"avrodata",
    "s3.part.size":5242880,
    "flush.size":220000,
    "storage.class":"io.confluent.connect.s3.storage.S3Storage",
    "format.class":"io.confluent.connect.s3.format.avro.AvroFormat",
    "schema.generator.class":"io.confluent.connect.storage.hive.schema.TimeBasedSchemaGenerator",
    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "partitioner.class":"io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "partition.duration.ms":600000,
    "locale":"fr",
    "timezone":"UTC",
    "topics.dir":"kafka-connect-v2",
    "path.format":"'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
    "tasks.max": "1",
    "topics": "avro-events",
    "name": "s3-connector-event-v2"
  }
}

Last month this task produced and stored ~100MB files.
Today I deployed a new Avro schema on this topic.
Now I get 3-5KB files (1-3 items per file).
It looks as if flush.size is no longer being honored.

I don't know if this is an issue or a config mistake, but any help is welcome.

Thanks in advance.
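
One possible explanation (an assumption, since the logs are not shown): with the default schema.compatibility=NONE, every record whose schema differs from the previous one forces the current file to be committed, so a topic containing a mix of old- and new-schema records yields many tiny files. Declaring the schema change compatible lets the connector project records onto a single schema and keep filling files up to flush.size; a minimal sketch:

# treat the new schema as a backward-compatible evolution of the old one
schema.compatibility=BACKWARD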

Unable to start Confluent S3 Connector - Unable to load AWS credentials from any provider in the chain

Connect-standalone.properties file:
bootstrap.servers=localhost:9092

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

key.converter.schemas.enable=true
value.converter.schemas.enable=true

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

Quickstart-s3.properties:
name=s3-sink
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=1
topics=test

s3.bucket.name=dev-kafka
s3.part.size=5242880
flush.size=3

storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.avro.AvroFormat
schema.generator.class=io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner

schema.compatibility=NONE

connect-avro-standalone.properties:

bootstrap.servers=localhost:9092

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

key.converter.schemas.enable=true
value.converter.schemas.enable=true

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets

Error output:

Warsames-MacBook-Pro:~ Warsame$ $CONFLUENT_HOME/bin/connect-standalone $CONFLUENT_HOME/etc/schema-registry/connect-avro-standalone.properties $CONFLUENT_HOME/etc/kafka-connect-s3/quickstart-s3.properties
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/share/confluent-3.2.0/share/java/kafka-serde-tools/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/share/confluent-3.2.0/share/java/kafka-connect-elasticsearch/slf4j-simple-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/share/confluent-3.2.0/share/java/kafka-connect-hdfs/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/share/confluent-3.2.0/share/java/kafka-connect-s3/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/share/confluent-3.2.0/share/java/kafka-connect-storage-common/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/share/confluent-3.2.0/share/java/kafka/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
[2017-04-30 18:15:01,722] INFO StandaloneConfig values:
access.control.allow.methods =
access.control.allow.origin =
bootstrap.servers = [localhost:9092]
internal.key.converter = class org.apache.kafka.connect.json.JsonConverter
internal.value.converter = class org.apache.kafka.connect.json.JsonConverter
key.converter = class io.confluent.connect.avro.AvroConverter
offset.flush.interval.ms = 60000
offset.flush.timeout.ms = 5000
offset.storage.file.filename = /tmp/connect.offsets
rest.advertised.host.name = null
rest.advertised.port = null
rest.host.name = null
rest.port = 8083
task.shutdown.graceful.timeout.ms = 5000
value.converter = class io.confluent.connect.avro.AvroConverter
(org.apache.kafka.connect.runtime.standalone.StandaloneConfig:180)
[2017-04-30 18:15:01,831] INFO Logging initialized @5534ms (org.eclipse.jetty.util.log:186)
[2017-04-30 18:15:07,099] INFO AvroConverterConfig values:
schema.registry.url = [http://localhost:8081]
max.schemas.per.subject = 1000
(io.confluent.connect.avro.AvroConverterConfig:169)
[2017-04-30 18:15:07,223] INFO AvroDataConfig values:
schemas.cache.config = 1000
enhanced.avro.schema.support = false
connect.meta.data = true
(io.confluent.connect.avro.AvroDataConfig:169)
[2017-04-30 18:15:07,225] INFO AvroConverterConfig values:
schema.registry.url = [http://localhost:8081]
max.schemas.per.subject = 1000
(io.confluent.connect.avro.AvroConverterConfig:169)
[2017-04-30 18:15:07,225] INFO AvroDataConfig values:
schemas.cache.config = 1000
enhanced.avro.schema.support = false
connect.meta.data = true
(io.confluent.connect.avro.AvroDataConfig:169)
[2017-04-30 18:15:07,238] INFO Kafka Connect starting (org.apache.kafka.connect.runtime.Connect:50)
[2017-04-30 18:15:07,238] INFO Herder starting (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:72)
[2017-04-30 18:15:07,239] INFO Worker starting (org.apache.kafka.connect.runtime.Worker:119)
[2017-04-30 18:15:07,239] INFO Starting FileOffsetBackingStore with file /tmp/connect.offsets (org.apache.kafka.connect.storage.FileOffsetBackingStore:60)
[2017-04-30 18:15:07,244] INFO Worker started (org.apache.kafka.connect.runtime.Worker:124)
[2017-04-30 18:15:07,244] INFO Herder started (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:74)
[2017-04-30 18:15:07,244] INFO Starting REST server (org.apache.kafka.connect.runtime.rest.RestServer:98)
[2017-04-30 18:15:07,389] INFO jetty-9.2.15.v20160210 (org.eclipse.jetty.server.Server:327)
Apr 30, 2017 6:15:08 PM org.glassfish.jersey.internal.Errors logErrors
WARNING: The following warnings have been detected: WARNING: The (sub)resource method listConnectors in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method createConnector in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method listConnectorPlugins in org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource contains empty path annotation.
WARNING: The (sub)resource method serverInfo in org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty path annotation.

[2017-04-30 18:15:08,040] INFO Started o.e.j.s.ServletContextHandler@54336c81{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:744)
[2017-04-30 18:15:08,061] INFO Started ServerConnector@5d332969{HTTP/1.1}{0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector:266)
[2017-04-30 18:15:08,063] INFO Started @11768ms (org.eclipse.jetty.server.Server:379)
[2017-04-30 18:15:16,710] INFO Reflections took 9403 ms to scan 560 urls, producing 12993 keys and 85570 values (org.reflections.Reflections:229)
[2017-04-30 18:15:18,071] INFO REST server listening at http://192.168.6.7:8083/, advertising URL http://192.168.6.7:8083/ (org.apache.kafka.connect.runtime.rest.RestServer:150)
[2017-04-30 18:15:18,071] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:56)
[2017-04-30 18:15:18,176] INFO ConnectorConfig values:
connector.class = io.confluent.connect.s3.S3SinkConnector
key.converter = null
name = s3-sink
tasks.max = 1
transforms = null
value.converter = null
(org.apache.kafka.connect.runtime.ConnectorConfig:180)
[2017-04-30 18:15:18,176] INFO Creating connector s3-sink of type io.confluent.connect.s3.S3SinkConnector (org.apache.kafka.connect.runtime.Worker:178)
[2017-04-30 18:15:18,178] INFO Instantiated connector s3-sink with version 3.2.0 of type class io.confluent.connect.s3.S3SinkConnector (org.apache.kafka.connect.runtime.Worker:181)
[2017-04-30 18:15:18,179] INFO S3SinkConnectorConfig values:
filename.offset.zero.pad.width = 10
flush.size = 3
format.class = class io.confluent.connect.s3.format.avro.AvroFormat
retry.backoff.ms = 5000
rotate.interval.ms = -1
rotate.schedule.interval.ms = -1
s3.bucket.name = dev-kafka
s3.credentials.provider.class = class com.amazonaws.auth.DefaultAWSCredentialsProviderChain
s3.part.size = 5242880
s3.region = us-west-2
s3.ssea.name =
s3.wan.mode = false
schema.cache.size = 1000
shutdown.timeout.ms = 3000
(io.confluent.connect.s3.S3SinkConnectorConfig:180)
[2017-04-30 18:15:18,179] INFO StorageCommonConfig values:
directory.delim = /
file.delim = +
storage.class = class io.confluent.connect.s3.storage.S3Storage
store.url = null
topics.dir = topics
(io.confluent.connect.storage.common.StorageCommonConfig:180)
[2017-04-30 18:15:18,180] INFO HiveConfig values:
hive.conf.dir =
hive.database = default
hive.home =
hive.integration = false
hive.metastore.uris =
schema.compatibility = NONE
(io.confluent.connect.storage.hive.HiveConfig:180)
[2017-04-30 18:15:18,180] INFO PartitionerConfig values:
locale =
partition.duration.ms = -1
partition.field.name =
partitioner.class = class io.confluent.connect.storage.partitioner.DefaultPartitioner
path.format =
schema.generator.class = class io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
timezone =
(io.confluent.connect.storage.partitioner.PartitionerConfig:180)
[2017-04-30 18:15:18,180] INFO Starting S3 connector s3-sink (io.confluent.connect.s3.S3SinkConnector:61)
[2017-04-30 18:15:18,182] INFO Finished creating connector s3-sink (org.apache.kafka.connect.runtime.Worker:194)
[2017-04-30 18:15:18,182] INFO SourceConnectorConfig values:
connector.class = io.confluent.connect.s3.S3SinkConnector
key.converter = null
name = s3-sink
tasks.max = 1
transforms = null
value.converter = null
(org.apache.kafka.connect.runtime.SourceConnectorConfig:180)
[2017-04-30 18:15:18,184] INFO Creating task s3-sink-0 (org.apache.kafka.connect.runtime.Worker:305)
[2017-04-30 18:15:18,184] INFO ConnectorConfig values:
connector.class = io.confluent.connect.s3.S3SinkConnector
key.converter = null
name = s3-sink
tasks.max = 1
transforms = null
value.converter = null
(org.apache.kafka.connect.runtime.ConnectorConfig:180)
[2017-04-30 18:15:18,185] INFO TaskConfig values:
task.class = class io.confluent.connect.s3.S3SinkTask
(org.apache.kafka.connect.runtime.TaskConfig:180)
[2017-04-30 18:15:18,186] INFO Instantiated task s3-sink-0 with version 3.2.0 of type io.confluent.connect.s3.S3SinkTask (org.apache.kafka.connect.runtime.Worker:317)
[2017-04-30 18:15:18,201] INFO ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = connect-s3-sink
heartbeat.interval.ms = 3000
interceptor.classes = null
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
(org.apache.kafka.clients.consumer.ConsumerConfig:180)
[2017-04-30 18:15:18,207] INFO ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.id = consumer-1
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = connect-s3-sink
heartbeat.interval.ms = 3000
interceptor.classes = null
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
(org.apache.kafka.clients.consumer.ConsumerConfig:180)
[2017-04-30 18:15:18,262] INFO Kafka version : 0.10.2.0-cp1 (org.apache.kafka.common.utils.AppInfoParser:83)
[2017-04-30 18:15:18,262] INFO Kafka commitId : 64c9b42f3319cdc9 (org.apache.kafka.common.utils.AppInfoParser:84)
[2017-04-30 18:15:18,264] INFO Created connector s3-sink (org.apache.kafka.connect.cli.ConnectStandalone:90)
[2017-04-30 18:15:18,265] INFO S3SinkConnectorConfig values:
filename.offset.zero.pad.width = 10
flush.size = 3
format.class = class io.confluent.connect.s3.format.avro.AvroFormat
retry.backoff.ms = 5000
rotate.interval.ms = -1
rotate.schedule.interval.ms = -1
s3.bucket.name = dev-kafka
s3.credentials.provider.class = class com.amazonaws.auth.DefaultAWSCredentialsProviderChain
s3.part.size = 5242880
s3.region = us-west-2
s3.ssea.name =
s3.wan.mode = false
schema.cache.size = 1000
shutdown.timeout.ms = 3000
(io.confluent.connect.s3.S3SinkConnectorConfig:180)
[2017-04-30 18:15:18,266] INFO StorageCommonConfig values:
directory.delim = /
file.delim = +
storage.class = class io.confluent.connect.s3.storage.S3Storage
store.url = null
topics.dir = topics
(io.confluent.connect.storage.common.StorageCommonConfig:180)
[2017-04-30 18:15:18,266] INFO HiveConfig values:
hive.conf.dir =
hive.database = default
hive.home =
hive.integration = false
hive.metastore.uris =
schema.compatibility = NONE
(io.confluent.connect.storage.hive.HiveConfig:180)
[2017-04-30 18:15:18,266] INFO PartitionerConfig values:
locale =
partition.duration.ms = -1
partition.field.name =
partitioner.class = class io.confluent.connect.storage.partitioner.DefaultPartitioner
path.format =
schema.generator.class = class io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
timezone =
(io.confluent.connect.storage.partitioner.PartitionerConfig:180)
[2017-04-30 18:15:20,630] ERROR Task s3-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:141)
org.apache.kafka.connect.errors.ConnectException: com.amazonaws.SdkClientException: Unable to load AWS credentials from any provider in the chain
at io.confluent.connect.s3.S3SinkTask.start(S3SinkTask.java:108)
at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:231)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:145)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.amazonaws.SdkClientException: Unable to load AWS credentials from any provider in the chain
at com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:131)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.getCredentialsFromContext(AmazonHttpClient.java:1115)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.runBeforeRequestHandlers(AmazonHttpClient.java:764)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:728)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:721)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:704)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:672)
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:654)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:518)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4185)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4132)
at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1302)
at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:1259)
at io.confluent.connect.s3.storage.S3Storage.bucketExists(S3Storage.java:110)
at io.confluent.connect.s3.S3SinkTask.start(S3SinkTask.java:95)
... 9 more
[2017-04-30 18:15:20,633] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:142)
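
For anyone hitting the same error: the DefaultAWSCredentialsProviderChain looks for credentials in the AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY environment variables, Java system properties, the ~/.aws/credentials file, and (on EC2) the instance profile. A minimal sketch of a ~/.aws/credentials file for the user running the Connect worker, with placeholder values:

[default]
aws_access_key_id = <your-access-key-id>
aws_secret_access_key = <your-secret-access-key>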

S3 sink crashes on null-valued tombstone messages.

If we try to use the Confluent S3 sink (with the Avro Converter) to save a topic that uses null-valued messages as tombstones for log compaction, we get exceptions like the following. These exceptions kill the task, and repeat whenever we try to restart it.

[2017-05-03 14:31:56,348] INFO Opening record writer for: topics/my_instance.my_database.my_table/partition=1/my_instance.my_database.my_table+1+0001400000.avro (io.confluent.connect.s3.format.avro.AvroRecordWriterProvider:66)
[2017-05-03 14:31:56,422] ERROR Task s3-my-instance-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:449)
org.apache.kafka.connect.errors.ConnectException: org.apache.kafka.connect.errors.SchemaProjectorException: Switch between schema-based and schema-less data is not supported
at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:213)
at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:163)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:429)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.SchemaProjectorException: Switch between schema-based and schema-less data is not supported
at io.confluent.connect.storage.schema.StorageSchemaCompatibility.validateAndCheck(StorageSchemaCompatibility.java:75)
at io.confluent.connect.storage.schema.StorageSchemaCompatibility.shouldChangeSchema(StorageSchemaCompatibility.java:91)
at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:184)
... 12 more
[2017-05-03 14:31:56,422] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:450)
[2017-05-03 14:31:56,422] INFO WorkerSinkTask{id=s3-data-dev-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:272)
[2017-05-03 14:31:56,429] ERROR Task s3-data-dev-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:141)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:451)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
[2017-05-03 14:31:56,429] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:142)

For the particular streams I'm dealing with it would be safe for the sink to simply ignore the tombstone messages, but the bigger problem seems to be that there is no way for connectors (neither sources nor sinks) to distinguish tombstones (which should be ignored) from null-valued data (which should be persisted).

In the meantime, there should arguably be a configurable option to ignore messages with null values.

Related issue on third-party source connector:
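
For readers hitting this today: later versions of the connector added exactly such an option (hedged, since availability depends on the connector version in use):

# skip records with null values (tombstones) instead of failing the task
behavior.on.null.values=ignore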

s3 ORC format

Are there any plans to support writing ORC format to S3?
Any suggestions?

S3 Object Tagging

Supporting S3 Object Tagging would be helpful for organization, bookkeeping and monitoring.

Adding a configuration option for S3 Object Tags would be preferred. Another option would be to allow additional HTTP header configuration (enabling users to set tags via the x-amz-tagging header).

Thanks,
Brandt
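
As a note for current readers: recent connector releases added an opt-in tagging option (hedged; whether it exists depends on the version in use) that tags each uploaded object with metadata such as the offsets of the records it contains:

s3.object.tagging=true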

S3 Object tagging documentation
