connect-standalone.properties:
bootstrap.servers=localhost:9092
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
quickstart-s3.properties:
name=s3-sink
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=1
topics=test
s3.bucket.name=dev-kafka
s3.part.size=5242880
flush.size=3
storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.avro.AvroFormat
schema.generator.class=io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
schema.compatibility=NONE
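One detail worth double-checking in this file: no `s3.region` is set, so the connector falls back to its default (the startup log further down shows `s3.region = us-west-2`). If the `dev-kafka` bucket lives in a different region, setting it explicitly avoids a second failure once the credentials problem is fixed. The region value here is a placeholder, not taken from the post:

```properties
# Hypothetical example: only needed if the bucket is not in the default region
s3.region=eu-west-1
```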
connect-avro-standalone.properties:
bootstrap.servers=localhost:9092
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
Error output:
Warsames-MacBook-Pro:~ Warsame$ $CONFLUENT_HOME/bin/connect-standalone $CONFLUENT_HOME/etc/schema-registry/connect-avro-standalone.properties $CONFLUENT_HOME/etc/kafka-connect-s3/quickstart-s3.properties
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/share/confluent-3.2.0/share/java/kafka-serde-tools/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/share/confluent-3.2.0/share/java/kafka-connect-elasticsearch/slf4j-simple-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/share/confluent-3.2.0/share/java/kafka-connect-hdfs/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/share/confluent-3.2.0/share/java/kafka-connect-s3/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/share/confluent-3.2.0/share/java/kafka-connect-storage-common/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/share/confluent-3.2.0/share/java/kafka/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
[2017-04-30 18:15:01,722] INFO StandaloneConfig values:
access.control.allow.methods =
access.control.allow.origin =
bootstrap.servers = [localhost:9092]
internal.key.converter = class org.apache.kafka.connect.json.JsonConverter
internal.value.converter = class org.apache.kafka.connect.json.JsonConverter
key.converter = class io.confluent.connect.avro.AvroConverter
offset.flush.interval.ms = 60000
offset.flush.timeout.ms = 5000
offset.storage.file.filename = /tmp/connect.offsets
rest.advertised.host.name = null
rest.advertised.port = null
rest.host.name = null
rest.port = 8083
task.shutdown.graceful.timeout.ms = 5000
value.converter = class io.confluent.connect.avro.AvroConverter
(org.apache.kafka.connect.runtime.standalone.StandaloneConfig:180)
[2017-04-30 18:15:01,831] INFO Logging initialized @5534ms (org.eclipse.jetty.util.log:186)
[2017-04-30 18:15:07,099] INFO AvroConverterConfig values:
schema.registry.url = [http://localhost:8081]
max.schemas.per.subject = 1000
(io.confluent.connect.avro.AvroConverterConfig:169)
[2017-04-30 18:15:07,223] INFO AvroDataConfig values:
schemas.cache.config = 1000
enhanced.avro.schema.support = false
connect.meta.data = true
(io.confluent.connect.avro.AvroDataConfig:169)
[2017-04-30 18:15:07,225] INFO AvroConverterConfig values:
schema.registry.url = [http://localhost:8081]
max.schemas.per.subject = 1000
(io.confluent.connect.avro.AvroConverterConfig:169)
[2017-04-30 18:15:07,225] INFO AvroDataConfig values:
schemas.cache.config = 1000
enhanced.avro.schema.support = false
connect.meta.data = true
(io.confluent.connect.avro.AvroDataConfig:169)
[2017-04-30 18:15:07,238] INFO Kafka Connect starting (org.apache.kafka.connect.runtime.Connect:50)
[2017-04-30 18:15:07,238] INFO Herder starting (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:72)
[2017-04-30 18:15:07,239] INFO Worker starting (org.apache.kafka.connect.runtime.Worker:119)
[2017-04-30 18:15:07,239] INFO Starting FileOffsetBackingStore with file /tmp/connect.offsets (org.apache.kafka.connect.storage.FileOffsetBackingStore:60)
[2017-04-30 18:15:07,244] INFO Worker started (org.apache.kafka.connect.runtime.Worker:124)
[2017-04-30 18:15:07,244] INFO Herder started (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:74)
[2017-04-30 18:15:07,244] INFO Starting REST server (org.apache.kafka.connect.runtime.rest.RestServer:98)
[2017-04-30 18:15:07,389] INFO jetty-9.2.15.v20160210 (org.eclipse.jetty.server.Server:327)
Apr 30, 2017 6:15:08 PM org.glassfish.jersey.internal.Errors logErrors
WARNING: The following warnings have been detected: WARNING: The (sub)resource method listConnectors in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method createConnector in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method listConnectorPlugins in org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource contains empty path annotation.
WARNING: The (sub)resource method serverInfo in org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty path annotation.
[2017-04-30 18:15:08,040] INFO Started o.e.j.s.ServletContextHandler@54336c81{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:744)
[2017-04-30 18:15:08,061] INFO Started ServerConnector@5d332969{HTTP/1.1}{0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector:266)
[2017-04-30 18:15:08,063] INFO Started @11768ms (org.eclipse.jetty.server.Server:379)
[2017-04-30 18:15:16,710] INFO Reflections took 9403 ms to scan 560 urls, producing 12993 keys and 85570 values (org.reflections.Reflections:229)
[2017-04-30 18:15:18,071] INFO REST server listening at http://192.168.6.7:8083/, advertising URL http://192.168.6.7:8083/ (org.apache.kafka.connect.runtime.rest.RestServer:150)
[2017-04-30 18:15:18,071] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:56)
[2017-04-30 18:15:18,176] INFO ConnectorConfig values:
connector.class = io.confluent.connect.s3.S3SinkConnector
key.converter = null
name = s3-sink
tasks.max = 1
transforms = null
value.converter = null
(org.apache.kafka.connect.runtime.ConnectorConfig:180)
[2017-04-30 18:15:18,176] INFO Creating connector s3-sink of type io.confluent.connect.s3.S3SinkConnector (org.apache.kafka.connect.runtime.Worker:178)
[2017-04-30 18:15:18,178] INFO Instantiated connector s3-sink with version 3.2.0 of type class io.confluent.connect.s3.S3SinkConnector (org.apache.kafka.connect.runtime.Worker:181)
[2017-04-30 18:15:18,179] INFO S3SinkConnectorConfig values:
filename.offset.zero.pad.width = 10
flush.size = 3
format.class = class io.confluent.connect.s3.format.avro.AvroFormat
retry.backoff.ms = 5000
rotate.interval.ms = -1
rotate.schedule.interval.ms = -1
s3.bucket.name = dev-kafka
s3.credentials.provider.class = class com.amazonaws.auth.DefaultAWSCredentialsProviderChain
s3.part.size = 5242880
s3.region = us-west-2
s3.ssea.name =
s3.wan.mode = false
schema.cache.size = 1000
shutdown.timeout.ms = 3000
(io.confluent.connect.s3.S3SinkConnectorConfig:180)
[2017-04-30 18:15:18,179] INFO StorageCommonConfig values:
directory.delim = /
file.delim = +
storage.class = class io.confluent.connect.s3.storage.S3Storage
store.url = null
topics.dir = topics
(io.confluent.connect.storage.common.StorageCommonConfig:180)
[2017-04-30 18:15:18,180] INFO HiveConfig values:
hive.conf.dir =
hive.database = default
hive.home =
hive.integration = false
hive.metastore.uris =
schema.compatibility = NONE
(io.confluent.connect.storage.hive.HiveConfig:180)
[2017-04-30 18:15:18,180] INFO PartitionerConfig values:
locale =
partition.duration.ms = -1
partition.field.name =
partitioner.class = class io.confluent.connect.storage.partitioner.DefaultPartitioner
path.format =
schema.generator.class = class io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
timezone =
(io.confluent.connect.storage.partitioner.PartitionerConfig:180)
[2017-04-30 18:15:18,180] INFO Starting S3 connector s3-sink (io.confluent.connect.s3.S3SinkConnector:61)
[2017-04-30 18:15:18,182] INFO Finished creating connector s3-sink (org.apache.kafka.connect.runtime.Worker:194)
[2017-04-30 18:15:18,182] INFO SourceConnectorConfig values:
connector.class = io.confluent.connect.s3.S3SinkConnector
key.converter = null
name = s3-sink
tasks.max = 1
transforms = null
value.converter = null
(org.apache.kafka.connect.runtime.SourceConnectorConfig:180)
[2017-04-30 18:15:18,184] INFO Creating task s3-sink-0 (org.apache.kafka.connect.runtime.Worker:305)
[2017-04-30 18:15:18,184] INFO ConnectorConfig values:
connector.class = io.confluent.connect.s3.S3SinkConnector
key.converter = null
name = s3-sink
tasks.max = 1
transforms = null
value.converter = null
(org.apache.kafka.connect.runtime.ConnectorConfig:180)
[2017-04-30 18:15:18,185] INFO TaskConfig values:
task.class = class io.confluent.connect.s3.S3SinkTask
(org.apache.kafka.connect.runtime.TaskConfig:180)
[2017-04-30 18:15:18,186] INFO Instantiated task s3-sink-0 with version 3.2.0 of type io.confluent.connect.s3.S3SinkTask (org.apache.kafka.connect.runtime.Worker:317)
[2017-04-30 18:15:18,201] INFO ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = connect-s3-sink
heartbeat.interval.ms = 3000
interceptor.classes = null
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
(org.apache.kafka.clients.consumer.ConsumerConfig:180)
[2017-04-30 18:15:18,207] INFO ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.id = consumer-1
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = connect-s3-sink
heartbeat.interval.ms = 3000
interceptor.classes = null
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
(org.apache.kafka.clients.consumer.ConsumerConfig:180)
[2017-04-30 18:15:18,262] INFO Kafka version : 0.10.2.0-cp1 (org.apache.kafka.common.utils.AppInfoParser:83)
[2017-04-30 18:15:18,262] INFO Kafka commitId : 64c9b42f3319cdc9 (org.apache.kafka.common.utils.AppInfoParser:84)
[2017-04-30 18:15:18,264] INFO Created connector s3-sink (org.apache.kafka.connect.cli.ConnectStandalone:90)
[2017-04-30 18:15:18,265] INFO S3SinkConnectorConfig values:
filename.offset.zero.pad.width = 10
flush.size = 3
format.class = class io.confluent.connect.s3.format.avro.AvroFormat
retry.backoff.ms = 5000
rotate.interval.ms = -1
rotate.schedule.interval.ms = -1
s3.bucket.name = dev-kafka
s3.credentials.provider.class = class com.amazonaws.auth.DefaultAWSCredentialsProviderChain
s3.part.size = 5242880
s3.region = us-west-2
s3.ssea.name =
s3.wan.mode = false
schema.cache.size = 1000
shutdown.timeout.ms = 3000
(io.confluent.connect.s3.S3SinkConnectorConfig:180)
[2017-04-30 18:15:18,266] INFO StorageCommonConfig values:
directory.delim = /
file.delim = +
storage.class = class io.confluent.connect.s3.storage.S3Storage
store.url = null
topics.dir = topics
(io.confluent.connect.storage.common.StorageCommonConfig:180)
[2017-04-30 18:15:18,266] INFO HiveConfig values:
hive.conf.dir =
hive.database = default
hive.home =
hive.integration = false
hive.metastore.uris =
schema.compatibility = NONE
(io.confluent.connect.storage.hive.HiveConfig:180)
[2017-04-30 18:15:18,266] INFO PartitionerConfig values:
locale =
partition.duration.ms = -1
partition.field.name =
partitioner.class = class io.confluent.connect.storage.partitioner.DefaultPartitioner
path.format =
schema.generator.class = class io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
timezone =
(io.confluent.connect.storage.partitioner.PartitionerConfig:180)
[2017-04-30 18:15:20,630] ERROR Task s3-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:141)
org.apache.kafka.connect.errors.ConnectException: com.amazonaws.SdkClientException: Unable to load AWS credentials from any provider in the chain
at io.confluent.connect.s3.S3SinkTask.start(S3SinkTask.java:108)
at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:231)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:145)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.amazonaws.SdkClientException: Unable to load AWS credentials from any provider in the chain
at com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:131)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.getCredentialsFromContext(AmazonHttpClient.java:1115)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.runBeforeRequestHandlers(AmazonHttpClient.java:764)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:728)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:721)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:704)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:672)
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:654)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:518)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4185)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4132)
at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1302)
at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:1259)
at io.confluent.connect.s3.storage.S3Storage.bucketExists(S3Storage.java:110)
at io.confluent.connect.s3.S3SinkTask.start(S3SinkTask.java:95)
... 9 more
[2017-04-30 18:15:20,633] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:142)
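The root cause is in the trace: `com.amazonaws.auth.DefaultAWSCredentialsProviderChain` (the connector's default `s3.credentials.provider.class`, visible in the config dump above) found no AWS credentials anywhere it looks. The chain checks, in order: environment variables, Java system properties, the shared credentials file (`~/.aws/credentials`), and EC2 instance profile metadata. A minimal sketch of the environment-variable route, run in the same shell before restarting the worker (the key values below are placeholders, not real credentials):

```shell
# DefaultAWSCredentialsProviderChain checks environment variables first.
# Substitute real IAM credentials with write access to the dev-kafka bucket.
export AWS_ACCESS_KEY_ID="AKIAEXAMPLE"
export AWS_SECRET_ACCESS_KEY="examplesecret"
```

Alternatively, put the same keys under a `[default]` profile in `~/.aws/credentials` (picked up automatically by the chain), or run the worker on an EC2 instance with an instance profile attached; then relaunch `connect-standalone` with the same two property files as before.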