
kafka-connect-oss's Introduction

Kafka Connect OSS

You can use this connector as a sink to upload data from Kafka topics to OSS in JSON, Avro, or Parquet format.

This connector periodically polls data from Kafka and in turn uploads it to OSS. You can configure a partitioner to split data into chunks, and each chunk of data is represented as an OSS object. The key name encodes the topic, the Kafka partition, and the start offset of this data chunk. If no partitioner is specified in the configuration, the default partitioner, which preserves Kafka partitioning, is used. The size of each data chunk is determined by the number of records written to OSS and by schema compatibility.
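For example, with the time-based partitioner used in the Quick Start below, the chunk from topic test, Kafka partition 0, starting at offset 154000 is uploaded as the object

oss://hadoop-oss-test/topics/test/2019-06-12-16/test+0+0000154000.parquet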

License

This project is licensed under the Apache License, Version 2.0, so you are largely free to use it as you wish within the terms of that license.

Limitations

Kafka 0.11.0.0 or above (0.11.x, 1.x, 2.x) is required. Earlier versions (0.10.x or below) are not supported at this time.

Install OSS Connector

Compilation and Deployment

Execute commands below:

git clone https://github.com/aliyun/kafka-connect-oss.git
cd kafka-connect-oss
mvn clean install -DskipTests

The generated jar (kafka-connect-oss-5.2.0.jar) will be in the target directory; copy it to $KAFKA_HOME/libs on each node.
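For example, on a node where the repository was built (the path is relative to the repository root):

cp target/kafka-connect-oss-5.2.0.jar $KAFKA_HOME/libs/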

Set up Hadoop configuration

This connector depends on the Hadoop OSS filesystem implementation to upload data to OSS, so we need to add a core-site.xml configuration file to the $KAFKA_HOME/config directory:

[root@master ~]# cd $KAFKA_HOME
[root@master kafka_2.11-0.11.0.0]# ls config
connect-console-sink.properties    connect-file-sink.properties    connect-standalone.properties  log4j.properties     tools-log4j.properties
connect-console-source.properties  connect-file-source.properties  consumer.properties            producer.properties  zookeeper.properties
connect-distributed.properties     connect-log4j.properties        core-site.xml                  server.properties

Then, we should add $KAFKA_HOME/config to the CLASSPATH by changing $KAFKA_HOME/bin/kafka-run-class.sh before we start the Kafka Connect cluster.

Add this line to $KAFKA_HOME/bin/kafka-run-class.sh:

[root@master kafka_2.11-0.11.0.0]# vim $KAFKA_HOME/bin/kafka-run-class.sh
CLASSPATH=$CLASSPATH:$base_dir/config

We can see the diff (bin/kafka-run-class.sh.bak is the original file):

[root@master kafka_2.11-0.11.0.0]# diff -y bin/kafka-run-class.sh bin/kafka-run-class.sh.bak
CLASSPATH=$CLASSPATH:$base_dir/config	                      <
# Launch mode							  # Launch mode
if [ "x$DAEMON_MODE" = "xtrue" ]; then				  if [ "x$DAEMON_MODE" = "xtrue" ]; then
  nohup $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $K	    nohup $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $K
else						                  else
  exec $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KA	    exec $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KA
fi							          fi

You should add the configurations below to core-site.xml:

  • fs.oss.endpoint (e.g. oss-cn-zhangjiakou-internal.aliyuncs.com): the OSS endpoint to connect to; see the list of OSS endpoints
  • fs.oss.accessKeyId: access key ID
  • fs.oss.accessKeySecret: access key secret
  • fs.oss.impl (org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem): Hadoop OSS filesystem implementation
  • fs.oss.buffer.dir (e.g. /tmp/oss): comma-separated list of directories used to buffer OSS data before uploading to Aliyun OSS
  • fs.oss.connection.secure.enabled (e.g. false): whether to connect to OSS over SSL; true by default
  • fs.oss.connection.maximum (e.g. 2048): number of simultaneous connections to OSS

You can find the details here
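Putting these together, a minimal core-site.xml might look like the sketch below (the endpoint is only an example and the credentials are placeholders; adjust them for your region and account):

<configuration>
  <property>
    <name>fs.oss.endpoint</name>
    <value>oss-cn-zhangjiakou-internal.aliyuncs.com</value>
  </property>
  <property>
    <name>fs.oss.accessKeyId</name>
    <value>YOUR_ACCESS_KEY_ID</value>
  </property>
  <property>
    <name>fs.oss.accessKeySecret</name>
    <value>YOUR_ACCESS_KEY_SECRET</value>
  </property>
  <property>
    <name>fs.oss.impl</name>
    <value>org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem</value>
  </property>
  <property>
    <name>fs.oss.buffer.dir</name>
    <value>/tmp/oss</value>
  </property>
</configuration>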

Set up the Kafka Connect cluster

You should set up a Kafka Connect cluster; please refer to the official documentation: https://docs.confluent.io/current/connect/userguide.html#distributed-worker-configuration.

You should configure bootstrap.servers and group.id in $KAFKA_HOME/config/connect-distributed.properties on each node. Every node in the Kafka Connect cluster should have the same group.id. Then execute the command below on each node:

cd $KAFKA_HOME && nohup bin/connect-distributed.sh config/connect-distributed.properties &
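For reference, the relevant lines of connect-distributed.properties might look like this (broker addresses and the group name are placeholders):

bootstrap.servers=192.168.0.172:9092,192.168.0.173:9092,192.168.0.174:9092
group.id=oss-connect-cluster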

Other configurations

In order to upload data to OSS, you should add the oss.bucket configuration when you post connector tasks (see Quick Start below).

In order to use ParquetJsonFormat, you should add the oss.parquet.protobuf.schema.class configuration when you post connector tasks (see Quick Start below). oss.parquet.protobuf.schema.class is a comma-separated list: the first value is the Kafka topic name, and the second value is the schema class for that topic.
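For example, the Quick Start below posts a connector config that contains this fragment:

"oss.bucket": "hadoop-oss-test",
"oss.parquet.protobuf.schema.class": "test,com.aliyun.oss.connect.kafka.format.parquet.Parquet$Demo"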

Documentation

Formats supported

In order to use these formats, you should configure value.converter in $KAFKA_HOME/config/connect-distributed.properties and format.class when you post connector tasks to the Kafka Connect cluster.

  • Byte array: format.class = com.aliyun.oss.connect.kafka.format.bytearray.ByteArrayFormat, value.converter = org.apache.kafka.connect.converters.ByteArrayConverter
  • Avro: format.class = com.aliyun.oss.connect.kafka.format.avro.AvroFormat, value.converter = io.confluent.connect.avro.AvroConverter
  • Json: format.class = com.aliyun.oss.connect.kafka.format.json.JsonFormat, value.converter = org.apache.kafka.connect.json.JsonConverter
  • Avro to Parquet: format.class = com.aliyun.oss.connect.kafka.format.parquet.ParquetAvroFormat, value.converter = io.confluent.connect.avro.AvroConverter
  • Json to Parquet: format.class = com.aliyun.oss.connect.kafka.format.parquet.ParquetJsonFormat, value.converter = org.apache.kafka.connect.storage.StringConverter
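For example, to write Json output you would pair the worker-side converter with the connector-side format class from the table above:

# $KAFKA_HOME/config/connect-distributed.properties
value.converter=org.apache.kafka.connect.json.JsonConverter

# fragment of the connector config posted to the REST API
"format.class": "com.aliyun.oss.connect.kafka.format.json.JsonFormat"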

Details of ParquetJsonFormat

In order to convert JSON input to the Parquet format, the connector needs the schema of the JSON input, so you should define your schema in the Google Protocol Buffers language.

We have an example in src/main/proto/parquet.proto

There are two ways to provide schema classes:

  • You can simply put your schema proto file under src/main/proto/ and compile it together with this repo (recommended).
  • You can compile the proto file yourself, package it, and put the final jar on the CLASSPATH ($KAFKA_HOME/libs).
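For illustration only, a schema comparable to the bundled example might look like the sketch below. The package and outer class match the Parquet$Demo class used in the Quick Start, but the syntax version and fields are hypothetical; the authoritative example is src/main/proto/parquet.proto in this repo.

syntax = "proto3";

option java_package = "com.aliyun.oss.connect.kafka.format.parquet";
option java_outer_classname = "Parquet";

// Hypothetical fields; see src/main/proto/parquet.proto for the real example.
message Demo {
  int64 id = 1;
  string payload = 2;
}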

Exactly-once delivery

The OSS connector is able to provide exactly-once semantics to consumers of the objects it exports to OSS, under the condition that the connector is supplied with a deterministic partitioner.

Currently, out of the available partitioners, the default and field partitioners are always deterministic. TimeBasedPartitioner can be deterministic with some configurations, discussed below. This implies that, when any of these partitioners is used, splitting of files always happens at the same offsets for a given set of Kafka records. These partitioners take into account flush.size and schema.compatibility to decide when to roll and save a new file to OSS. The connector always delivers files in OSS that contain the same records, even under the presence of failures. If a connector task fails before an upload completes, the file does not become visible to OSS. If, on the other hand, a failure occurs after the upload has completed but before the corresponding offset is committed to Kafka by the connector, then a re-upload will take place. However, such a re-upload is transparent to the user of the OSS bucket, who at any time will have access to the same records made eventually available by successful uploads to OSS.

To guarantee exactly-once semantics with the TimeBasedPartitioner, the connector must be configured to use a deterministic implementation of TimestampExtractor and a deterministic rotation strategy. The deterministic timestamp extractors are Kafka record timestamps (timestamp.extractor=Record) or record fields (timestamp.extractor=RecordField). The deterministic rotation strategy configuration is rotate.interval.ms (setting rotate.schedule.interval.ms is non-deterministic and will invalidate exactly-once guarantees).
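The Quick Start below uses exactly such a deterministic combination; the relevant fragment of its connector config is:

"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"timestamp.extractor": "Record",
"partition.duration.ms": "10000",
"path.format": "YYYY-MM-dd-HH",
"rotate.interval.ms": "30000"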

Schema Evolution

The OSS connector supports schema evolution and reacts to schema changes of data according to the schema.compatibility configuration.

  • NO Compatibility: the default. In this case the connector ensures that each file written to OSS has the proper schema. When the connector observes a schema change in the data, it commits the current set of files for the affected topic partitions and writes the data with the new schema to new files.

  • BACKWARD Compatibility: if a schema is evolved in a backward compatible way, we can always use the latest schema to query all the data uniformly. For example, removing fields is a backward compatible change to a schema, since when we encounter records written with the old schema that contain these fields we can just ignore them. Adding a field with a default value is also backward compatible (see the example after this section).

    If BACKWARD is specified in schema.compatibility, the connector keeps track of the latest schema used in writing data to OSS, and if a data record with a schema version larger than the current latest schema arrives, the connector commits the current set of files and writes the data record with the new schema to new files. For data records arriving later with a schema of an earlier version, the connector projects the record to the latest schema before writing to the same set of files in OSS.

  • FORWARD Compatibility: If a schema is evolved in a forward compatible way, we can always use the oldest schema to query all the data uniformly. Removing a field that had a default value is forward compatible, since the old schema will use the default value when the field is missing.

    If FORWARD is specified in schema.compatibility, the connector projects the data to the oldest schema before writing to the same set of files in OSS.

  • Full Compatibility: Full compatibility means that old data can be read with the new schema and new data can also be read with the old schema.

    If FULL is specified in the schema.compatibility, the connector performs the same action as BACKWARD.

Schema evolution in the OSS connector works in the same way as in the HDFS connector.
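As a minimal illustration of a backward compatible change (these Avro schemas are hypothetical, not from this repo), adding a field with a default value lets the latest schema still read records written with the old one:

Old schema:
{"type": "record", "name": "Demo", "fields": [{"name": "id", "type": "long"}]}

New schema (the added field has a default, so records written with the old schema can be projected to it):
{"type": "record", "name": "Demo", "fields": [
  {"name": "id", "type": "long"},
  {"name": "source", "type": "string", "default": ""}
]}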

Pluggable Partitioner

The connector comes out of the box with partitioners that support default partitioning based on Kafka partitions, field partitioning, and time-based partitioning in days or hours. You may implement your own partitioners by extending the Partitioner class. Additionally, you can customize time based partitioning by extending the TimeBasedPartitioner class.
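Below is a minimal sketch of a custom partitioner. It assumes the Partitioner/DefaultPartitioner classes from the kafka-connect-storage-common library this connector builds against (5.x); the package, class name, and encoding chosen here are hypothetical, so verify the interface in the version you actually use.

package com.example.connect.oss;

import io.confluent.connect.storage.partitioner.DefaultPartitioner;
import org.apache.kafka.connect.sink.SinkRecord;

// Encodes each Kafka partition as "p-<n>" instead of the default "partition=<n>".
public class SimplePrefixPartitioner<T> extends DefaultPartitioner<T> {
  @Override
  public String encodePartition(SinkRecord sinkRecord) {
    return "p-" + sinkRecord.kafkaPartition();
  }
}

Such a class would then be referenced via "partitioner.class": "com.example.connect.oss.SimplePrefixPartitioner" in the connector config, with the compiled jar placed on the workers' CLASSPATH ($KAFKA_HOME/libs).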

Quick start (using ParquetJsonFormat as an example)

First, we create a topic for testing.

[root@master ~]# cd $KAFKA_HOME
[root@master kafka_2.11-0.11.0.0]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 10 --topic test

Assume we have a 4-node cluster (192.168.0.172, 192.168.0.173, 192.168.0.174, 192.168.0.175), and we post a connector task:

curl -i -X POST \
   -H "Accept:application/json" \
   -H "Content-Type:application/json" \
   -d '{
     "name":"oss-sink",
     "config": {
        "name":"oss-sink",
        "topics":"test",
        "oss.parquet.protobuf.schema.class":"test,com.aliyun.oss.connect.kafka.format.parquet.Parquet$Demo",
        "connector.class":"com.aliyun.oss.connect.kafka.OSSSinkConnector",
        "format.class":"com.aliyun.oss.connect.kafka.format.parquet.ParquetJsonFormat",
        "flush.size":"10000",
        "tasks.max":"4",
        "storage.class":"com.aliyun.oss.connect.kafka.storage.OSSStorage",
        "partitioner.class":"io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
        "timestamp.extractor":"Record",
        "oss.bucket":"hadoop-oss-test",
        "partition.duration.ms":"10000",
        "path.format":"YYYY-MM-dd-HH",
        "locale":"US",
        "timezone":"Asia/Shanghai",
        "rotate.interval.ms":"30000"
        }}' \
    'http://slave01:8083/connectors'

The test schema class (com.aliyun.oss.connect.kafka.format.parquet.Parquet$Demo) has already been compiled into the jar (kafka-connect-oss-5.2.0.jar). The POST returns a response like:

HTTP/1.1 201 Created
Date: Wed, 12 Jun 2019 16:40:19 GMT
Location: http://slave01:8083/connectors/oss-sink
Content-Type: application/json
Content-Length: 836
Server: Jetty(9.2.15.v20160210)
{
	"name": "oss-sink",
	"config": {
		"name": "oss-sink",
		"topics": "test",
		"oss.parquet.protobuf.schema.class": "test,com.aliyun.oss.connect.kafka.format.parquet.Parquet$Demo",
		"connector.class": "com.aliyun.oss.connect.kafka.OSSSinkConnector",
		"format.class": "com.aliyun.oss.connect.kafka.format.parquet.ParquetJsonFormat",
		"flush.size": "10000",
		"tasks.max": "4",
		"storage.class": "com.aliyun.oss.connect.kafka.storage.OSSStorage",
		"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
		"timestamp.extractor": "Record",
		"oss.bucket": "hadoop-oss-test",
		"partition.duration.ms": "10000",
		"path.format": "YYYY-MM-dd-HH",
		"locale": "US",
		"timezone": "Asia/Shanghai",
		"rotate.interval.ms": "30000"
	},
	"tasks": [{
		"connector": "oss-sink",
		"task": 0
	}, {
		"connector": "oss-sink",
		"task": 1
	}, {
		"connector": "oss-sink",
		"task": 2
	}, {
		"connector": "oss-sink",
		"task": 3
	}]
}

Then, we can check the status of this connector task:

 curl -i -X GET \
   -H "Accept:application/json" \
   -H "Content-Type:application/json" \
 'http://slave01:8083/connectors/oss-sink/status'
HTTP/1.1 200 OK
Date: Wed, 12 Jun 2019 16:41:18 GMT
Content-Type: application/json
Content-Length: 334
Server: Jetty(9.2.15.v20160210)
{
	"name": "oss-sink",
	"connector": {
		"state": "RUNNING",
		"worker_id": "192.168.0.174:8083"
	},
	"tasks": [{
		"state": "RUNNING",
		"id": 0,
		"worker_id": "192.168.0.173:8083"
	}, {
		"state": "RUNNING",
		"id": 1,
		"worker_id": "192.168.0.172:8083"
	}, {
		"state": "RUNNING",
		"id": 2,
		"worker_id": "192.168.0.175:8083"
	}, {
		"state": "RUNNING",
		"id": 3,
		"worker_id": "192.168.0.174:8083"
	}]
}

Then, we can write JSON data to the Kafka topic and see the results in the logs (you can also check the OSS console):

[2019-06-12 16:50:11,806] INFO Start to commit file oss://hadoop-oss-test/topics/test/2019-06-12-16/test+0+0000154000.parquet (com.aliyun.oss.connect.kafka.format.parquet.ParquetJsonRecordWriterProvider:131)
[2019-06-12 16:50:11,806] INFO Flushing mem columnStore to file. allocated memory: 1299286 (org.apache.parquet.hadoop.InternalParquetRecordWriter:165)
[2019-06-12 16:50:11,877] INFO File oss://hadoop-oss-test/topics/test/2019-06-12-16/test+0+0000154000.parquet committed (com.aliyun.oss.connect.kafka.format.parquet.ParquetJsonRecordWriterProvider:133)
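To produce test data, you can use the console producer; each line is one JSON record whose fields must match your schema class (the fields below are hypothetical and assume the Demo sketch shown earlier):

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
{"id": 1, "payload": "hello"}
{"id": 2, "payload": "world"}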


kafka-connect-oss's Issues

Failed to find any class that implements Connector and which name matches OSSSinkConnector

The Kafka OSS connector has already been added under the plugin directory, and the output log shows that the plugin was loaded, but submitting a task reports an error.

Task startup log:
[2019-12-13 07:53:26,629] INFO Added plugin 'com.aliyun.oss.connect.kafka.OSSSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
[2019-12-13 07:53:27,787] INFO Registered loader: sun.misc.Launcher$AppClassLoader@764c12b6 (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
[2019-12-13 07:53:27,790] INFO Added aliases 'OSSSinkConnector' and 'OSSSink' to plugin 'com.aliyun.oss.connect.kafka.OSSSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)

Error message (attached as a screenshot in the original issue).

Repo URL outdated (conjars.org), Unable to generate jar file, connection timed out

Hello, I am having an issue while following the steps to generate the jar file. The build fails because conjars.org is no longer a valid URL and needs to be switched to conjars.wensel.net. Can you please direct me to where I can manually change this URL and make a quick fix?

[INFO] ------------------< com.aliyun.oss:kafka-connect-oss >------------------
[INFO] Building Kafka Connect OSS 5.2.0
[INFO] --------------------------------[ jar ]---------------------------------
Downloading from conjars: http://conjars.org/repo/org/pentaho/pentaho-aggdesigner-algorithm/5.1.5-jhyde/pentaho-aggdesigner-algorithm-5.1.5-jhyde.pom
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  02:10 min
[INFO] Finished at: 2023-04-19T17:40:46+08:00
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal on project kafka-connect-oss: Could not resolve dependencies for project com.aliyun.oss:kafka-connect-oss:jar:5.2.0: Failed to collect dependencies at io.confluent:kafka-connect-storage-format:jar:5.2.0 -> io.confluent:kafka-connect-storage-hive:jar:5.2.0 -> org.apache.hive:hive-exec:jar:core:1.2.2 -> org.apache.calcite:calcite-core:jar:1.2.0-incubating -> org.pentaho:pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde: Failed to read artifact descriptor for org.pentaho:pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde: Could not transfer artifact org.pentaho:pentaho-aggdesigner-algorithm:pom:5.1.5-jhyde from/to conjars (http://conjars.org/repo): Transfer failed for http://conjars.org/repo/org/pentaho/pentaho-aggdesigner-algorithm/5.1.5-jhyde/pentaho-aggdesigner-algorithm-5.1.5-jhyde.pom: Connect to conjars.org:80 [conjars.org/54.235.127.59] failed: Connection timed out -> [Help 1]
[ERROR] 
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR] 
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/DependencyResolutionException

Docker Compatibility?

Hi, I just wanted to know how to make this compatible with Docker.

I've loaded the compiled .jar file correctly, but it seems the Dockerized Kafka Connect instance cannot access core-site.xml properly.

I think this is because I get the following error, most likely caused by the instance not being able to read the fs.oss.impl property:

[2020-06-10 04:00:11,573] ERROR WorkerSinkTask{id=oss-sink-1} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: org.apache.kafka.connect.errors.ConnectException: java.lang.reflect.InvocationTargetException
	at com.aliyun.oss.connect.kafka.OSSSinkTask.start(OSSSinkTask.java:112)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:305)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: java.lang.reflect.InvocationTargetException
	at io.confluent.connect.storage.StorageFactory.createStorage(StorageFactory.java:55)
	at com.aliyun.oss.connect.kafka.OSSSinkTask.start(OSSSinkTask.java:104)
	... 9 more
Caused by: java.lang.reflect.InvocationTargetException
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at io.confluent.connect.storage.StorageFactory.createStorage(StorageFactory.java:50)
	... 10 more
Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "oss"
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3281)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3301)
	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3352)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3320)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:479)
	at com.aliyun.oss.connect.kafka.storage.OSSStorage.<init>(OSSStorage.java:50)
	... 15 more

Thanks!

Filesystem not recognized

Please find the pom.xml and core-site.xml files here:
https://gist.github.com/viscory/cff52c7e6e11bfbcea1c39cbb324fec2
https://gist.github.com/viscory/3f4e3b52e32b5c18917034e74d4240b1

The particular error I am facing is:
https://gist.github.com/viscory/f0f9653211c46fca6a2024af901efcf5
which can be summarized to: https://gist.github.com/viscory/f0f9653211c46fca6a2024af901efcf5

I have experimented with multiple different versions, but have not come closer to a solution.

Please provide guidance on solving this.

OSS sink crashes repeatedly when having to overwrite files

When the oss-sink crashes, it reads topics again from the beginning of the queue and therefore tries to overwrite files. However, it fails when querying metadata about existing files on OSS. Perhaps an update of Hadoop is in order.

[2020-12-18 13:54:47,759] ERROR WorkerSinkTask{id=oss-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: java.lang.NullPointerException (org.apache.kafka.connect.runtime.WorkerSinkTask)
org.apache.kafka.connect.errors.ConnectException: java.lang.NullPointerException
	at com.aliyun.oss.connect.kafka.storage.OSSStorage.create(OSSStorage.java:88)
	at com.aliyun.oss.connect.kafka.format.json.JsonRecordWriterProvider$1.<init>(JsonRecordWriterProvider.java:64)
	at com.aliyun.oss.connect.kafka.format.json.JsonRecordWriterProvider.getRecordWriter(JsonRecordWriterProvider.java:63)
	at com.aliyun.oss.connect.kafka.format.json.JsonRecordWriterProvider.getRecordWriter(JsonRecordWriterProvider.java:39)
	at com.aliyun.oss.connect.kafka.storage.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:260)
	at com.aliyun.oss.connect.kafka.storage.TopicPartitionWriter.checkRotationOrAppend(TopicPartitionWriter.java:229)
	at com.aliyun.oss.connect.kafka.storage.TopicPartitionWriter.executeState(TopicPartitionWriter.java:199)
	at com.aliyun.oss.connect.kafka.storage.TopicPartitionWriter.write(TopicPartitionWriter.java:165)
	at com.aliyun.oss.connect.kafka.OSSSinkTask.put(OSSSinkTask.java:173)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
	at org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem.getFileStatus(AliyunOSSFileSystem.java:287)
	at org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem.create(AliyunOSSFileSystem.java:117)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1118)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1098)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:987)
	at com.aliyun.oss.connect.kafka.storage.OSSStorage.create(OSSStorage.java:86)
	... 19 more
[2020-12-18 13:54:47,761] ERROR WorkerSinkTask{id=oss-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:568)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: java.lang.NullPointerException
	at com.aliyun.oss.connect.kafka.storage.OSSStorage.create(OSSStorage.java:88)
	at com.aliyun.oss.connect.kafka.format.json.JsonRecordWriterProvider$1.<init>(JsonRecordWriterProvider.java:64)
	at com.aliyun.oss.connect.kafka.format.json.JsonRecordWriterProvider.getRecordWriter(JsonRecordWriterProvider.java:63)
	at com.aliyun.oss.connect.kafka.format.json.JsonRecordWriterProvider.getRecordWriter(JsonRecordWriterProvider.java:39)
	at com.aliyun.oss.connect.kafka.storage.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:260)
	at com.aliyun.oss.connect.kafka.storage.TopicPartitionWriter.checkRotationOrAppend(TopicPartitionWriter.java:229)
	at com.aliyun.oss.connect.kafka.storage.TopicPartitionWriter.executeState(TopicPartitionWriter.java:199)
	at com.aliyun.oss.connect.kafka.storage.TopicPartitionWriter.write(TopicPartitionWriter.java:165)
	at com.aliyun.oss.connect.kafka.OSSSinkTask.put(OSSSinkTask.java:173)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546)
	... 10 more
Caused by: java.lang.NullPointerException
	at org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem.getFileStatus(AliyunOSSFileSystem.java:287)
	at org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem.create(AliyunOSSFileSystem.java:117)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1118)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1098)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:987)
	at com.aliyun.oss.connect.kafka.storage.OSSStorage.create(OSSStorage.java:86)
	... 19 more

No FileSystem for scheme "oss"

I am trying to persist Kafka messages to Aliyun OSS using the Confluent suite. I put my compiled kafka-connect-oss-5.5.1.jar in the $CONFLUENT_HOME/share/java/kafka-connect-oss folder and created a new connector, then found the following exceptions in Kafka Connect. What do they mean, and how can I fix this? Thanks!

org.apache.kafka.connect.errors.ConnectException: org.apache.kafka.connect.errors.ConnectException: java.lang.reflect.InvocationTargetException
at com.aliyun.oss.connect.kafka.OSSSinkTask.start(OSSSinkTask.java:112)
at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:304)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:195)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: java.lang.reflect.InvocationTargetException
at io.confluent.connect.storage.StorageFactory.createStorage(StorageFactory.java:55)
at com.aliyun.oss.connect.kafka.OSSSinkTask.start(OSSSinkTask.java:104)
... 9 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at io.confluent.connect.storage.StorageFactory.createStorage(StorageFactory.java:50)
... 10 more
Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "oss"
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3390)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3411)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:158)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3474)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3442)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:524)
at com.aliyun.oss.connect.kafka.storage.OSSStorage.<init>(OSSStorage.java:50)
... 15 more

Json to Parquet with "timestamp.extractor":"RecordField",

Here is the config of my connector:
{
"name":"oss-sink-test",
"config": {
"name":"oss-sink-test",
"topics":"test",
"oss.parquet.protobuf.schema.class":"test, com.aliyun.oss.connect.kafka.format.parquet.BLog$testmessage",
"connector.class":"com.aliyun.oss.connect.kafka.OSSSinkConnector",
"format.class":"com.aliyun.oss.connect.kafka.format.parquet.ParquetJsonFormat",
"flush.size":"10000",
"tasks.max":"1",
"storage.class":"com.aliyun.oss.connect.kafka.storage.OSSStorage",
"partitioner.class":"io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"timestamp.extractor":"RecordField",
"timestamp.field":"timestamp",
"oss.bucket":"",
"partition.duration.ms":"10000",
"path.format":"/'dt'=YYYY-MM-dd/'hour'=HH",
"locale":"US",
"timezone":"",
"rotate.interval.ms":"30000"
}
}

However, I got the following error:
[2020-06-10 17:33:20,688] ERROR Value is not of Struct or Map type. (io.confluent.connect.storage.partitioner.TimeBasedPartitioner:322)
[2020-06-10 17:33:20,688] ERROR WorkerSinkTask{id=oss-sink-test-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:559)
io.confluent.connect.storage.errors.PartitionException: Error encoding partition.
at io.confluent.connect.storage.partitioner.TimeBasedPartitioner$RecordFieldTimestampExtractor.extract(TimeBasedPartitioner.java:323)
at com.aliyun.oss.connect.kafka.storage.TopicPartitionWriter.executeState(TopicPartitionWriter.java:187)
at com.aliyun.oss.connect.kafka.storage.TopicPartitionWriter.write(TopicPartitionWriter.java:165)
at com.aliyun.oss.connect.kafka.OSSSinkTask.put(OSSSinkTask.java:173)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2020-06-10 17:33:20,689] ERROR WorkerSinkTask{id=oss-sink-test-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:561)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: io.confluent.connect.storage.errors.PartitionException: Error encoding partition.
at io.confluent.connect.storage.partitioner.TimeBasedPartitioner$RecordFieldTimestampExtractor.extract(TimeBasedPartitioner.java:323)
at com.aliyun.oss.connect.kafka.storage.TopicPartitionWriter.executeState(TopicPartitionWriter.java:187)
at com.aliyun.oss.connect.kafka.storage.TopicPartitionWriter.write(TopicPartitionWriter.java:165)
at com.aliyun.oss.connect.kafka.OSSSinkTask.put(OSSSinkTask.java:173)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)

How can I upload my data (JSON to Parquet) to the partition based on the field "timestamp"?
Thanks.
