
kafka-connect-zeebe's Introduction

Community Extension | Lifecycle: Incubating | Compatible with: Camunda Platform 8

⚠️ This is not the official Kafka connector for Camunda 8. The Kafka Producer and Kafka Consumer connectors are found here. This project uses Kafka Connect from Confluent.

kafka-connect-zeebe

This Kafka Connect connector for Zeebe can do two things:

  • Send messages to a Kafka topic when a workflow instance reaches a specific activity. Note that a message is, more precisely, a Kafka record, which is also often called an event. This is a source in Kafka Connect terms.

  • Consume messages from a Kafka topic and correlate them to a workflow. This is a Kafka Connect sink.

It can work with Camunda Platform 8 SaaS or self-managed.

Overview

See this blog post for some background on the implementation.

Examples and walk-through

Examples

The following video walks you through an example connecting to Camunda Platform 8 - SaaS:

Walkthrough

Installation and quickstart

You will find information on how to build the connector and how to run Kafka and Zeebe to get started quickly here:

Installation

Connectors

The plugin comes with two connectors, a source and a sink connector.

The source connector activates Zeebe jobs, publishes them as Kafka records, and completes them once they have been committed to Kafka.

Sink connector

In a workflow model you can wait for certain events by name (extracted from the payload by messageNameJsonPath):

Overview

The sink connector consumes Kafka records and publishes messages constructed from those records to Zeebe. This uses the Zeebe Message Correlation features. So for example if no matching workflow instance is found, the message is buffered for its time-to-live (TTL) and then discarded. You could simply ingest all messages from a Kafka topic and check if they correlate to something in Zeebe.

Configuration

In order to communicate with the Zeebe workflow engine, the connector has to create a Zeebe client.

Camunda SaaS Properties

If you want to connect to Camunda SaaS, you can use these properties:

  • zeebe.client.cloud.clusterId: Cluster ID you want to connect to. The Cluster must run on the public Camunda Cloud
  • zeebe.client.cloud.region: If you don't connect to the default region (bru-2) you can specify the region here
  • zeebe.client.cloud.clientId: Client ID for the connection. Ideally, create dedicated client credentials for this communication using the Camunda SaaS Console.
  • zeebe.client.cloud.clientSecret: The client secret required for the connection
  • zeebe.client.requestTimeout: timeout in milliseconds for requests to the Zeebe broker; defaults to 10000 (or 10 seconds)

If you want to connect to an endpoint other than the public SaaS endpoint, you can further specify:

  • zeebe.client.cloud.token.audience: The address for which the authorization server token should be valid
  • zeebe.client.cloud.authorization.server.url: The URL of the authorization server from which the access token will be requested (by default, configured for Camunda SaaS)
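Putting these together, a minimal sketch of the connection-related part of a connector configuration for Camunda SaaS could look like this (cluster ID, client ID, and secret are placeholders):

{
  "zeebe.client.cloud.clusterId": "your-cluster-id",
  "zeebe.client.cloud.region": "bru-2",
  "zeebe.client.cloud.clientId": "your-client-id",
  "zeebe.client.cloud.clientSecret": "your-client-secret",
  "zeebe.client.requestTimeout": "10000"
}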

Zeebe Broker Properties

If you want to connect to a Zeebe broker hosted yourself (e.g. running on localhost), use these properties:

  • zeebe.client.broker.gateway-address: the Zeebe gateway address, specified as host:port; defaults to localhost:26500
  • zeebe.client.requestTimeout: timeout in milliseconds for requests to the Zeebe broker; defaults to 10000 (or 10 seconds)
  • zeebe.client.security.plaintext: disable secure connections to the gateway for local development setups
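The equivalent sketch for a local, self-managed broker, using the documented defaults for host and port:

{
  "zeebe.client.broker.gateway-address": "localhost:26500",
  "zeebe.client.security.plaintext": true,
  "zeebe.client.requestTimeout": "10000"
}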

Common Configuration

The Zeebe client and job workers can be configured via system properties understood by the Zeebe Java client. Other typical properties are:

  • zeebe.client.worker.maxJobsActive: the maximum number of jobs that the worker can activate in a single request; defaults to 100
  • zeebe.client.job.worker: the worker name; defaults to kafka-connector
  • zeebe.client.job.timeout: how long before a job activated by the worker is made activatable again to others, in milliseconds; defaults to 5000 (or 5 seconds)
  • job.types: a comma-separated list of job types that should be consumed by the connector; defaults to kafka
  • job.header.topics: the custom service task header that specifies the topics to which the record should be published; defaults to kafka-topic

You can find sample properties for the source connector here.
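As an illustration only (all values are made up), the job-related part of a source connector configuration could combine these common settings like this:

{
  "zeebe.client.worker.maxJobsActive": "50",
  "zeebe.client.job.worker": "my-kafka-connector",
  "zeebe.client.job.timeout": "10000",
  "job.types": "sendKafkaMessage",
  "job.header.topics": "kafka-topic"
}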

Sink

The connector supports only JSON data (no Avro). It uses JSONPath to extract certain properties from this JSON data:

  • message.path.correlationKey: JSONPath query to use to extract the correlation key from the record; defaults to $.correlationKey
  • message.path.messageName: JSONPath query to use to extract the message name from the record; defaults to $.messageName
  • message.path.timeToLive: JSONPath query to use to extract the time to live from the record; defaults to $.timeToLive
  • message.path.variables: JSONPath query to use to extract the variables from the record; defaults to $.variables

You can find sample properties for the sink connector here.
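To make the defaults concrete, a record value like the following sketch (all values invented) would yield the message name, correlation key, TTL, and variables without any extra configuration:

{
  "messageName": "payment-received",
  "correlationKey": "order-4711",
  "timeToLive": 60000,
  "variables": {
    "amount": 99.95,
    "currency": "EUR"
  }
}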

Source

Similar to receiving messages, the process can also create records. In your BPMN process model, you can add a service task with a configurable task type, which will create a record on the configured Kafka topic:

Overview

Under the hood, the connector creates one job worker that publishes records to Kafka. The record value is a JSON representation of the job itself; the record key is the job key.
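As an abridged sketch (fields taken from the example record shown in the issues below, values shortened), a record value produced by the source connector looks roughly like this:

{
  "key": 2251799813685264,
  "type": "ping",
  "customHeaders": { "topic": "pong" },
  "bpmnProcessId": "ping-pong",
  "elementId": "ServiceTask_1sx6rbe",
  "worker": "kafka-connector",
  "retries": 3,
  "variablesAsMap": { "foo": "bar", "name": "pong" }
}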

Filtering Variables

You can filter the variables being sent to Kafka by adding a configuration option "job.variables" to your source properties. It must contain a comma-separated list of variables to pass to Kafka.

If this property is not present, then all variables in the scope will be sent to Kafka by default.

{
  "name": "ping",
  "config": {
    ...
    "job.variables": "a, b, andSomeVariableC",
    ...
  }
}
Configuring Error Handling of Kafka Connect, e.g. Logging or Dead Letter Queues

Kafka Connect allows you to configure what happens if a message cannot be processed. A great explanation can be found in Kafka Connect Deep Dive – Error Handling and Dead Letter Queues. This of course also applies to this connector.
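As a sketch of the generic Kafka Connect error-handling options (these are standard Kafka Connect properties, not specific to this connector; the topic name below is a placeholder, and dead letter queues only apply to sink connectors), a sink connector configuration could tolerate and log bad records like this:

{
  "errors.tolerance": "all",
  "errors.log.enable": true,
  "errors.log.include.messages": true,
  "errors.deadletterqueue.topic.name": "zeebe-sink-dlq",
  "errors.deadletterqueue.topic.replication.factor": 1
}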

Remote Debugging During Development

To ease development, you can add this environment variable to kafka-connect: "JAVA_TOOL_OPTIONS": "-agentlib:jdwp=transport=dt_socket,address=*:5005,server=y,suspend=n"

You can then attach a remote debugger on port 5005.

Confluent Hub

This project is set up to be released on Confluent Hub.

When building this project via mvn package, you will find the plugin package as a ZIP file under target/components/packages, e.g. target/components/packages/zeebe-io-kafka-connect-zeebe-1.0.0.zip, which can then be installed onto Confluent Hub.

kafka-connect-zeebe's People

Contributors

aanodin, actions-user, berndruecker, celanthe, chaima-mnsr, cmur2, dependabot[bot], falko, imgbotapp, jwulf, menski, npepinpe, parikshit-hooda, renovate[bot], vmaleze, xomiamoore, zeebe-bors[bot]


kafka-connect-zeebe's Issues

Examples should use different string values

I recently helped a user through his use case with Kafka/Connect/Zeebe, and one of the hurdles he faced with the examples is that we reuse the same strings everywhere, which for someone unfamiliar might imply a connection between them. For example, the sink connector name is "pong", the message name is "pong", the workflow name is "pong", etc. We should use different names to illustrate that they have nothing to do with each other (except in the places where they do!).

Allow BPMN message name to be hard coded in Sink connector

In some situations, records on one Kafka topic should always be sent into Zeebe with the same message name. Currently, you have to extract the message name from the data of the record.

Current config

    "message.path.messageName": "$.variablesAsMap.messageName",
    "message.path.correlationKey": "$.variablesAsMap.key",
    "message.path.variables": "$.variablesAsMap"

I want to do this

    "message.path.messageName": "messageName",
    "message.path.correlationKey": "$.variablesAsMap.key",
    "message.path.variables": "$.variablesAsMap"

Or maybe we need to rename the attribute, e.g.

    "message.path.messageNameExpression": "$.variablesAsMap.messageName",     
    "message.path.messageName": "messageName",

Which I think is in line with what we do in other places in Zeebe.

And I actually wondered why the attribute is called "message.path" - what's the "path" in it? @npepinpe - probably you know?

Make source variables filtering more powerful

According to the documentation, the "job.variables" property should contain the name of a custom header that is defined in the source task and that contains a comma-separated list of variables that should be included in the produced Kafka message.

In the current implementation (0.32.0), the value put in "job.variables" is used as the actual list of variable names, not as a reference to the header containing the list of variables.

The idempotency exception from a retry needs to be ignored in the Sink

When a message redelivery is triggered, the idempotent start functionality in Zeebe throws an exception - which keeps the Sink from going on (it is completely blocked now).

So I think we should do two things:

  • This specific exception can be ignored and should probably be logged as INFO or WARN. If the retry creates it, the record has already been processed and all is good. The log should just make sure somebody that uses it incorrectly gets a hint.

  • Discuss if we really want to stop the whole Sink whenever there is an exception - or if we log an error and keep moving on.

{"name":"pong","connector":{"state":"RUNNING","worker_id":"connect:8083"},"tasks":[{"id":0,"state":"FAILED","worker_id":"connect:8083","trace":"org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.connect.errors.ConnectException: java.util.concurrent.CompletionException: io.grpc.StatusRuntimeException: ALREADY_EXISTS: Command rejected with code 'PUBLISH': Expected to publish a new message with id 'pong:0:170249', but a message with that id was already published\n\tat io.zeebe.kafka.connect.sink.ZeebeSinkTask.put(ZeebeSinkTask.java:65)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)\n\t... 10 more\nCaused by: java.util.concurrent.CompletionException: io.grpc.StatusRuntimeException: ALREADY_EXISTS: Command rejected with code 'PUBLISH': Expected to publish a new message with id 'pong:0:170249', but a message with that id was already published\n\tat java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)\n\tat java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)\n\tat java.util.concurrent.CompletableFuture.biRelay(CompletableFuture.java:1284)\n\tat java.util.concurrent.CompletableFuture$BiRelay.tryFire(CompletableFuture.java:1270)\n\tat java.util.concurrent.CompletableFuture$CoCompletion.tryFire(CompletableFuture.java:1020)\n\tat java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)\n\tat java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)\n\tat io.zeebe.client.impl.ZeebeClientFutureImpl.onError(ZeebeClientFutureImpl.java:80)\n\tat io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:442)\n\tat io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)\n\tat io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)\n\tat io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)\n\tat io.grpc.internal.CensusStatsModule$StatsClientInterceptor$1$1.onClose(CensusStatsModule.java:700)\n\tat io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)\n\tat io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)\n\tat io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)\n\tat io.grpc.internal.CensusTracingModule$TracingClientInterceptor$1$1.onClose(CensusTracingModule.java:399)\n\tat 
io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:507)\n\tat io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:66)\n\tat io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:627)\n\tat io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$700(ClientCallImpl.java:515)\n\tat io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:686)\n\tat io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:675)\n\tat io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)\n\tat io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)\n\t... 3 more\nCaused by: io.grpc.StatusRuntimeException: ALREADY_EXISTS: Command rejected with code 'PUBLISH': Expected to publish a new message with id 'pong:0:170249', but a message with that id was already published\n\tat io.grpc.Status.asRuntimeException(Status.java:533)\n\t... 20 more\n"}],"type":"sink"}

Updating sample ping-pong process to use FEEL expression in correlation key fails process

Upgrade your Zeebe installation to 0.23.1 and retry the sample ping-pong process.

Deployment will fail with the following error:

Command rejected with code 'CREATE': Expected to deploy new resources, but encountered the following errors:
'kafka-test-process': - Element: Message_19mpeg2 > extensionElements > subscription
    - ERROR: Expected expression but found static value 'key'. An expression must start with '=' (e.g. '=key').

However, if you correct the error by adding = in front of the Subscription Correlation Key for the Kafka Sink (From Kafka), the process will no longer execute to completion. The error encountered by the sink is:

com.jayway.jsonpath.PathNotFoundException: No results for path: $['variablesAsMap']['key']\n\tat io.zeebe.kafka.connect.sink.ZeebeSinkTask.put(ZeebeSinkTask.java:66)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)\n\t... 10 more\nCaused by: com.jayway.jsonpath.PathNotFoundException: No results for path: $['variablesAsMap']['key']\n\tat com.jayway.jsonpath.internal.path.EvaluationContextImpl.getValue(EvaluationContextImpl.java:133)\n\tat com.jayway.jsonpath.JsonPath.read(JsonPath.java:187)\n\tat com.jayway.jsonpath.internal.JsonContext.read(JsonContext.java:102)\n\tat io.zeebe.kafka.connect.sink.message.JsonRecordParser.parse(JsonRecordParser.java:56)\n\tat io.zeebe.kafka.connect.sink.ZeebeSinkTask.preparePublishRequest(ZeebeSinkTask.java:95)

connect service does not respond after docker-compose up

mvn clean install -DskipTests worked fine for me. However, when I try to run docker-compose up the connect service doesn’t start properly and I’m getting an empty response from the server when running any command like this:

curl localhost:8083/

Here are some logs which may help:

kafka_1 | [2020-01-22 11:36:36,591] INFO Opening socket connection to server zookeeper/172.24.0.2:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
kafka_1 | [2020-01-22 11:36:37,837] WARN Attempting to send response via channel for which there is no open connection, connection id 172.24.0.6:19092-172.24.0.7:51372-21 (kafka.network.Processor)
kafka_1 | [2020-01-22 11:36:36,629] WARN Attempting to send response via channel for which there is no open connection, connection id 172.24.0.6:19092-172.24.0.9:46752-21 (kafka.network.Processor)
kafka_1 | [2020-01-22 11:36:44,019] WARN Attempting to send response via channel for which there is no open connection, connection id 172.24.0.6:19092-172.24.0.9:46802-22 (kafka.network.Processor)
kafka_1 | [2020-01-22 11:36:44,153] INFO Socket connection established to zookeeper/172.24.0.2:2181, initiating session (org.apache.zookeeper.ClientCnxn)
kafka_1 | [2020-01-22 11:36:48,600] WARN Client session timed out, have not heard from server in 6127ms for sessionid 0x16fcd000c600001 (org.apache.zookeeper.ClientCnxn)
kafka_1 | [2020-01-22 11:36:51,094] INFO Client session timed out, have not heard from server in 6127ms for sessionid 0x16fcd000c600001, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
zookeeper_1 | 2020-01-22 11:36:43,543 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@192] - Accepted socket connection from /172.24.0.6:34712
zookeeper_1 | 2020-01-22 11:36:57,792 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@921] - Client attempting to renew session 0x16fcd000c600001 at /172.24.0.6:34712
kafka_1 | [2020-01-22 11:36:58,993] INFO [Controller id=1] Processing automatic preferred replica leader election (kafka.controller.KafkaController)
zookeeper_1 | 2020-01-22 11:37:00,263 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@666] - Invalid session 0x16fcd000c600001 for client /172.24.0.6:34712, probably expired
schema-registry_1 | [2020-01-22 11:37:05,846] INFO [Schema registry clientId=sr-1, groupId=schema-registry] Discovered group coordinator kafka:19092 (id: 2147483646 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
zookeeper_1 | 2020-01-22 11:37:01,439 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1008] - Closed socket connection for client /172.24.0.6:34712 which had sessionid 0x16fcd000c600001
schema-registry_1 | [2020-01-22 11:37:18,580] INFO [Schema registry clientId=sr-1, groupId=schema-registry] Group coordinator kafka:19092 (id: 2147483646 rack: null) is unavailable or invalid, will attempt rediscovery (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
kafka_1 | [2020-01-22 11:37:19,794] WARN Attempting to send response via channel for which there is no open connection, connection id 172.24.0.6:19092-172.24.0.9:46808-22 (kafka.network.Processor)

Open questions

There are a few open questions/issues regarding the connector and how it should eventually work. I'm just using this issue to list them for future reference.

SourceTask#poll

From the Java doc:

Poll this source task for new records. If no data is currently available, this method should block but return control to the caller regularly (by returning {@code null}) in order for the task to transition to the {@code PAUSED} state if requested to do so.

The current implementation does not block if no data is available; it simply returns null. It seems this causes it to be polled again immediately; not sure if this is fine or not. It doesn't affect Zeebe at any rate, but maybe we should look into blocking for some time before returning if no data is available. Also not sure when the connector is transitioned to PAUSED?

EDIT: I think not blocking is causing connect to essentially be in a busy loop polling my connector. The connect process is using 100% of a core even though there are no jobs to get from Zeebe.

The latest implementation blocks up to 5 seconds for some data to be available, which indeed brings the CPU down to 4-5% while "idle", as opposed to >= 100%.

Schema Support

Seems most connectors support schemas to some extent, but I couldn't quite figure out how to support people who use schemas and also those who don't, except with an explicit configuration that specifies whether or not to use a schema, which we could define as "use schema OR fall back to JSON"? That doesn't seem like a great UX though...

I think we can sort of do source schema easily though, since we know the Zeebe schema, and then users just need to configure their converter to decide how it'll be printed out. To spit out JSON they can just use the Kafka JsonConverter (connect-json module) with schemas disabled.

Where I'm not sure is with sink schemas; like I said, I don't know a good way to support both people who want schema-less and people who want schemas. Maybe it's a non-issue and most Connect users actually use schemas?

Job Queue/Time out

The current source task implementation has a background thread (a Zeebe job worker) which polls for jobs and dumps them into a queue, which is drained by the task's poll() method. Since I'm not sure when Kafka actually calls poll(), it's possible (?) that a job is activated, put into the queue, and then by the time it is removed from the queue Zeebe has timed it out and it cannot be completed. This could result in us publishing the job twice or more to the Kafka topic, though since we use the job key as the record key, it's possible for users to "deduplicate" these records fairly easily. Still not a great UX.

Failing invalid jobs

I had the idea that instead of exploding when a job is invalid (e.g. it does not have the minimum expected headers, such as the target topic), we rather fail it and raise an incident instead of throwing an exception. Not sure if this makes sense for a Kafka user though; they might be looking for errors in the Connect logs/Control Center panel.

Minor issue as well, we don't block for the fail request, but that's probably not super important.

Source target topics

I changed the functionality so that the target topic for the source connector is part of the job headers. I think this is more flexible, but I'm willing to have this challenged. It can now also be a list of topics.

One option to keep previous functionality is to make it possible to configure a default topic on the connector, and override this with job headers. I'm not too keen on this though as it can lead to unexpected behaviour (e.g. "Oops I forgot we had a default topic!"), but that might also be a non-issue since it'd have to explicitly be set by a user.

Following the Microservices-Orchestration Example leads to error Expected to find an object with property ['orderId'] in path $ but found 'java.lang.String'

Following the exact How-To in Microservices-Orchestration using docker-compose local-kafka and camunda cloud leads to an error and causes the Connector-Sink to degrade with the error:

Expected to find an object with property ['orderId'] in path $ but found 'java.lang.String'

I unsuccessfully tried setting key.converter.schemas.enable=false and value.converter.schemas.enable=true, which I believe is what has been done here

Support messages in Avro format

Context:

Now there are two possibilities:

  1. Convert messages from/to JSON/Avro when they come in/go out. Maybe transforms (#18) can do this and there is no need to make changes in this connector
  2. Extend the connector to be more flexible on this, for example in the sink it might be the https://github.com/camunda-community-hub/kafka-connect-zeebe/blob/master/src/main/java/io/zeebe/kafka/connect/sink/message/JsonRecordParser.java#L49 and in the source around
    private SourceRecord transformJob(final ActivatedJob job) {

We should first explore if "transforms" do the trick (in this case adding some documentation might be sufficient).

I encountered a concurrency issue

Hi experts,

I had the following exception during a load test with all default configuration:

[2021-08-18 16:59:23,139] ERROR WorkerSourceTask{id=approvedTickets-0} Exception thrown while calling task.commit() (org.apache.kafka.connect.runtime.WorkerSourceTask:586)
java.util.ConcurrentModificationException
	at java.util.LinkedList$LLSpliterator.forEachRemaining(LinkedList.java:1239)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
	at java.util.stream.ForEachOps$ForEachOp$OfLong.evaluateSequential(ForEachOps.java:210)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.LongPipeline.forEach(LongPipeline.java:385)
	at io.zeebe.kafka.connect.source.ZeebeSourceTask.commit(ZeebeSourceTask.java:114)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.commitSourceTask(WorkerSourceTask.java:584)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:577)
	at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.commit(SourceTaskOffsetCommitter.java:113)
	at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.access$000(SourceTaskOffsetCommitter.java:47)
	at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter$1.run(SourceTaskOffsetCommitter.java:86)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Could you please explain this issue, and how I could fix it?

Thanks

Is Zeebe the right fit for me?

Hi, I have a Kafka cluster and a Kafka Connect cluster running on AKS, using Kubernetes and Strimzi. I want my Kafka Connect cluster to be able to receive and orchestrate a lot of requests for reading RSS feeds. So I want a way to dynamically instantiate connectors that read a desired RSS feed, and also something that ensures the state of the Kafka connectors is the desired one: if a connector stops working it should be restarted, and if it were deleted it would be recreated, so that the desired system state is met. I have been thinking about doing this by making the service that sends the RSS feed reading requests responsible for the connectors' states via the Kafka Connect REST API, but lots of doubts arise. I do not know whether Zeebe could help me in any way with this, since after reading the docs I have not understood whether it suits me. Thanks!

Use Kafka Connect "Single Message Transforms" instead of custom JsonPath

We currently use some JsonPath ourselves to configure where to read certain data from, see e.g. https://github.com/zeebe-io/kafka-connect-zeebe/blob/master/examples/ping-pong/sink.json#L13

That also means we only support Json, not Avro

It would be more Kafka Connect style if we used the "transforms" (https://docs.confluent.io/current/connect/transforms/index.html), as in the example in https://www.confluent.io/blog/kafka-connect-deep-dive-jdbc-source-connector

Then we could simply define the expected message format and provide some sample transforms to get there.
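For reference, a generic SMT configuration in a connector definition looks like the sketch below; the transform name and field are hypothetical and only illustrate the mechanism (ExtractField is one of the built-in Kafka Connect transforms), not a design decision for this connector:

{
  "transforms": "unwrap",
  "transforms.unwrap.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
  "transforms.unwrap.field": "payload"
}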

Source connector worker task is getting stuck and not publishing any records

The worker task gets stuck and does not produce any records to the Kafka topic if there is a bad/malformed record present in the pipeline.

Steps to reproduce:

  1. Try creating a process instance of a BPMN process without providing the Kafka topic header (or provide a topic which doesn't exist in Kafka).
  2. Create instances of other BPMN processes where topics are provided and exist in Kafka.
  3. The following error is reported in the Kafka Connect log, and the connector stops processing any records from Zeebe.

06:34:01.044 [task-thread-go-source-0] o.a.k.c.r.WorkerTask ERROR - WorkerSourceTask{id=go-source-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted org.apache.kafka.connect.errors.ConnectException: Unrecoverable exception from producer send callback
at org.apache.kafka.connect.runtime.WorkerSourceTask.maybeThrowProducerSendException(WorkerSourceTask.java:266)
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:320)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:248)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.errors.TimeoutException: Topic ota-topics not present in metadata after 30000 ms.

06:34:30.085 [SourceTaskOffsetCommitter-1] o.a.k.c.r.WorkerSourceTask ERROR - WorkerSourceTask{id=go-source-0} Failed to flush, timed out while waiting for producer to flush outstanding 1 messages
06:34:30.085 [SourceTaskOffsetCommitter-1] o.a.k.c.r.SourceTaskOffsetCommitter ERROR - WorkerSourceTask{id=go-source-0} Failed to commit offsets
06:35:35.086 [SourceTaskOffsetCommitter-1] o.a.k.c.r.WorkerSourceTask ERROR - WorkerSourceTask{id=go-source-0} Failed to flush, timed out while waiting for producer to flush outstanding 1 messages
06:35:35.086 [SourceTaskOffsetCommitter-1] o.a.k.c.r.SourceTaskOffsetCommitter ERROR - WorkerSourceTask{id=go-source-0} Failed to commit offsets
06:36:40.086 [SourceTaskOffsetCommitter-1] o.a.k.c.r.WorkerSourceTask ERROR - WorkerSourceTask{id=go-source-0} Failed to flush, timed out while waiting for producer to flush outstanding 1 messages

Expectation:
zeebe-kafka-connect should throw an error for any bad/malformed records but should continue processing other records which are valid, skipping all bad records from the pipeline/buffer.

SMT vs JSONPath

At the moment we use JSONPath to extract properties from a sink record when transforming it into a Zeebe message.

From what I understand, for our sink connector, we should instead specify the format we want, and users should use Single Message Transformations to map their records to our expected format. That's in theory, no idea in practice if users actually make use of this.

Sink Connector is blocked if backpressure kicked in once

If we get a backpressure exception, the connector needs to retry instead of just giving up (blocking all further work).

Comparable to #22

{"name":"pong","connector":{"state":"RUNNING","worker_id":"connect:8083"},"tasks":[{"id":0,"state":"FAILED","worker_id":"connect:8083","trace":"org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.connect.errors.ConnectException: java.util.concurrent.CompletionException: io.grpc.StatusRuntimeException: RESOURCE_EXHAUSTED: Reached maximum capacity of requests handled\n\tat io.zeebe.kafka.connect.sink.ZeebeSinkTask.put(ZeebeSinkTask.java:65)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)\n\t... 10 more\nCaused by: java.util.concurrent.CompletionException: io.grpc.StatusRuntimeException: RESOURCE_EXHAUSTED: Reached maximum capacity of requests handled\n\tat java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)\n\tat java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)\n\tat java.util.concurrent.CompletableFuture.biRelay(CompletableFuture.java:1286)\n\tat java.util.concurrent.CompletableFuture.andTree(CompletableFuture.java:1307)\n\tat java.util.concurrent.CompletableFuture.andTree(CompletableFuture.java:1305)\n\tat java.util.concurrent.CompletableFuture.andTree(CompletableFuture.java:1305)\n\tat java.util.concurrent.CompletableFuture.andTree(CompletableFuture.java:1303)\n\tat java.util.concurrent.CompletableFuture.andTree(CompletableFuture.java:1305)\n\tat java.util.concurrent.CompletableFuture.andTree(CompletableFuture.java:1303)\n\tat java.util.concurrent.CompletableFuture.andTree(CompletableFuture.java:1303)\n\tat java.util.concurrent.CompletableFuture.andTree(CompletableFuture.java:1303)\n\tat java.util.concurrent.CompletableFuture.andTree(CompletableFuture.java:1303)\n\tat java.util.concurrent.CompletableFuture.allOf(CompletableFuture.java:2225)\n\tat io.zeebe.kafka.connect.sink.ZeebeSinkTask.publishMessages(ZeebeSinkTask.java:88)\n\tat io.zeebe.kafka.connect.sink.ZeebeSinkTask.lambda$put$0(ZeebeSinkTask.java:57)\n\tat io.zeebe.kafka.connect.util.ManagedClient.withClient(ManagedClient.java:52)\n\tat io.zeebe.kafka.connect.sink.ZeebeSinkTask.put(ZeebeSinkTask.java:57)\n\t... 
11 more\nCaused by: io.grpc.StatusRuntimeException: RESOURCE_EXHAUSTED: Reached maximum capacity of requests handled\n\tat io.grpc.Status.asRuntimeException(Status.java:533)\n\tat io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:442)\n\tat io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)\n\tat io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)\n\tat io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)\n\tat io.grpc.internal.CensusStatsModule$StatsClientInterceptor$1$1.onClose(CensusStatsModule.java:700)\n\tat io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)\n\tat io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)\n\tat io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)\n\tat io.grpc.internal.CensusTracingModule$TracingClientInterceptor$1$1.onClose(CensusTracingModule.java:399)\n\tat io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:507)\n\tat io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:66)\n\tat io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:627)\n\tat io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$700(ClientCallImpl.java:515)\n\tat io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:686)\n\tat io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:675)\n\tat io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)\n\tat io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)\n\t... 3 more\n"}],"type":"sink"}

Update to Zeebe 1.0

As Zeebe v1 is finally out and had some breaking changes, I was wondering: is this compatible with Zeebe v1?

If so, what needs to be done?

Sink connector is stuck without any log messages if Zeebe is unavailable

Hi team!

While testing locally, it seems that the connector gets stuck if we provide an invalid value for connecting to Zeebe. If I deploy a connector locally without having Zeebe running, I can observe that the thread is blocked indefinitely on https://github.com/zeebe-io/kafka-connect-zeebe/blob/master/src/main/java/io/zeebe/kafka/connect/sink/ZeebeSinkTask.java#L58.

My connector configuration is:

{
  "name": "zeebe",
  "config": {
    "connector.class": "io.zeebe.kafka.connect.ZeebeSinkConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "topics": "kafka-to-zeebe",
    "zeebe.client.broker.contactPoint": "127.0.0.1:26500",
    "zeebe.client.security.plaintext": true,
    "message.path.messageName": "$.eventType",
    "message.path.correlationKey": "$.orderId",
    "message.path.variables": "$.['amount', 'orderId']"
  }
}

If I enable DEBUG log, I can only see:

ESC[36mconnect            |ESC[0m [2020-05-05 15:29:36,527] DEBUG Evaluating path: $['eventType'] (com.jayway.jsonpath.internal.path.CompiledPath)
ESC[36mconnect            |ESC[0m [2020-05-05 15:29:36,527] DEBUG Evaluating path: $['timeToLive'] (com.jayway.jsonpath.internal.path.CompiledPath)
ESC[36mconnect            |ESC[0m [2020-05-05 15:29:36,527] TRACE No timeToLive found, ignoring (io.zeebe.kafka.connect.sink.message.JsonRecordParser)
ESC[36mconnect            |ESC[0m [2020-05-05 15:29:36,527] DEBUG Evaluating path: $['amount','orderId'] (com.jayway.jsonpath.internal.path.CompiledPath)
ESC[36mconnect            |ESC[0m [2020-05-05 15:29:36,528] DEBUG Publishing message io.zeebe.kafka.connect.sink.message.Message@1573292e (io.zeebe.kafka.connect.sink.ZeebeSinkTask)

While looking at the threads, I can see that the thread of the connector is indeed stuck:

(screenshot of the blocked connector thread omitted)

I understand that the root cause is my invalid configuration, but the fact that there is no log at all, even in DEBUG or TRACE, makes it really hard to debug. Having to open VisualVM to understand that there is a typo in the configuration is fun, but time-consuming ;)

Feature request: publish messages in bulk to Zeebe

Currently, the sink connector consumes messages in batches but publishes messages to Zeebe in multiple threads. https://github.com/camunda-community-hub/kafka-connect-zeebe/blob/master/src/main/java/io/zeebe/kafka/connect/sink/ZeebeSinkTask.java#L102

For example, if it consumes 10 records in a batch, it will fire 10 parallel threads to publish the messages. This creates a bottleneck for improving performance. Because of this, we cannot run multiple tasks of the sink connector, since it starts throwing resource exhausted errors.
I am not sure why the zeebe-java-client does not have bulk message publish support. This is really needed in my use case. We have a very high load of 100K RPM (activation of process instances). This feature would help to minimise the resource exhausted exceptions, because we would publish messages in bulk for the consumed Kafka records. This would improve the performance.

Zeebe Sink Connector changes to "Degraded" if an empty message is published.

Hi, while testing the Zeebe connector for Kafka, I found something that was causing me some trouble and may be some kind of bug or undesired behavior.

If an empty message is published to the Kafka topic, the sink connector will consume it and raise the exception:

Running on Kubernetes from GCP (GKE):

  • Zeebe [camunda/zeebe] 0.24.1
  • Connector [kafka-connect-zeebe] 0.24.0
  • Kafka [confluentinc/cp-enterprise-kafka] 5.3.0
org.apache.kafka.connect.errors.ConnectException: java.lang.IllegalArgumentException: json string can not be null or empty
	at io.zeebe.kafka.connect.sink.ZeebeSinkTask.put(ZeebeSinkTask.java:65)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: json string can not be null or empty
	at com.jayway.jsonpath.internal.Utils.notEmpty(Utils.java:386)
	at com.jayway.jsonpath.internal.ParseContextImpl.parse(ParseContextImpl.java:36)
	at com.jayway.jsonpath.JsonPath.parse(JsonPath.java:599)
	at io.zeebe.kafka.connect.sink.message.JsonRecordParser.parseDocument(JsonRecordParser.java:79)
	at io.zeebe.kafka.connect.sink.message.JsonRecordParser.parse(JsonRecordParser.java:53)
	at io.zeebe.kafka.connect.sink.ZeebeSinkTask.preparePublishRequest(ZeebeSinkTask.java:94)
	at io.zeebe.kafka.connect.sink.ZeebeSinkTask.lambda$publishMessages$1(ZeebeSinkTask.java:84)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438)
	at io.zeebe.kafka.connect.sink.ZeebeSinkTask.publishMessages(ZeebeSinkTask.java:87)
	at io.zeebe.kafka.connect.sink.ZeebeSinkTask.lambda$put$0(ZeebeSinkTask.java:57)
	at io.zeebe.kafka.connect.util.ManagedClient.withClient(ManagedClient.java:53)
	at io.zeebe.kafka.connect.sink.ZeebeSinkTask.put(ZeebeSinkTask.java:57)

Followed by:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: java.lang.IllegalArgumentException: json string can not be null or empty
	at io.zeebe.kafka.connect.sink.ZeebeSinkTask.put(ZeebeSinkTask.java:65)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
	... 10 more
Caused by: java.lang.IllegalArgumentException: json string can not be null or empty
	at com.jayway.jsonpath.internal.Utils.notEmpty(Utils.java:386)
	at com.jayway.jsonpath.internal.ParseContextImpl.parse(ParseContextImpl.java:36)
	at com.jayway.jsonpath.JsonPath.parse(JsonPath.java:599)
	at io.zeebe.kafka.connect.sink.message.JsonRecordParser.parseDocument(JsonRecordParser.java:79)
	at io.zeebe.kafka.connect.sink.message.JsonRecordParser.parse(JsonRecordParser.java:53)
	at io.zeebe.kafka.connect.sink.ZeebeSinkTask.preparePublishRequest(ZeebeSinkTask.java:94)
	at io.zeebe.kafka.connect.sink.ZeebeSinkTask.lambda$publishMessages$1(ZeebeSinkTask.java:84)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438)
	at io.zeebe.kafka.connect.sink.ZeebeSinkTask.publishMessages(ZeebeSinkTask.java:87)
	at io.zeebe.kafka.connect.sink.ZeebeSinkTask.lambda$put$0(ZeebeSinkTask.java:57)
	at io.zeebe.kafka.connect.util.ManagedClient.withClient(ManagedClient.java:53)
	at io.zeebe.kafka.connect.sink.ZeebeSinkTask.put(ZeebeSinkTask.java:57)

The logs are from 'Kafka Connect'. The connector does not recover; it stays as "Degraded" (status from Confluent Control Center) and a fresh deploy is needed.

Instability on correlation key definition in sink configuration

I ran the simple ping-pong example without issue most of the day. Then I attempted to model my own process based on the ping-pong example but could not get the sink to pick up the message with my own correlation key. The error I have been getting all day is:

com.jayway.jsonpath.PathNotFoundException: No results for path: $['correlationKey']\n\tat com.jayway.jsonpath.internal.path.EvaluationContextImpl.getValue

It does not matter what key I define. And yes, my workflow is defining the message correlation key as a FEEL expression =key

I even went back and modified the original ping-pong example, and the sink can no longer correlate messages to the workflow. I wonder if this is related to #35, as the record is certainly over-escaped.

Here is the Kafka message received by the sink connector:

[{"topic":"pong-topic","partition":0,"offset":0,"timestamp":1588651176053,"timestampType":"CREATE_TIME","headers":[],"key":"4503599627370509","value":"\"{\\\"key\\\":4503599627370509,\\\"type\\\":\\\"ping-eventType\\\",\\\"customHeaders\\\":{\\\"topic\\\":\\\"pong-topic\\\"},\\\"workflowInstanceKey\\\":4503599627370501,\\\"bpmnProcessId\\\":\\\"kafka-connector-test\\\",\\\"workflowDefinitionVersion\\\":1,\\\"workflowKey\\\":2251799813685252,\\\"elementId\\\":\\\"kafka-source\\\",\\\"elementInstanceKey\\\":4503599627370508,\\\"worker\\\":\\\"kafka-connector\\\",\\\"retries\\\":3,\\\"deadline\\\":1588651180659,\\\"variables\\\":\\\"{\\\\\\\"key\\\\\\\":\\\\\\\"12345\\\\\\\",\\\\\\\"payload\\\\\\\":{\\\\\\\"foo\\\\\\\":\\\\\\\"bar\\\\\\\"},\\\\\\\"eventType\\\\\\\":\\\\\\\"ping-eventType\\\\\\\"}\\\",\\\"variablesAsMap\\\":{\\\"key\\\":\\\"12345\\\",\\\"payload\\\":{\\\"foo\\\":\\\"bar\\\"},\\\"eventType\\\":\\\"ping-eventType\\\"}}\"","__confluent_index":0}]

ClassCastException: class io.netty.channel.epoll.EpollSocketChannel

Using Confluent Platform (CP) 7.0.1, I get the following error with the connector (tested with 0.32.0 and 0.40.0):

[2022-06-23 07:44:54,564] ERROR [zeebe-sink|task-2] WorkerSourceTask{id=zeebe-sink-2} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:206)
java.lang.ExceptionInInitializerError
at io.grpc.netty.NettyChannelBuilder.<clinit>(NettyChannelBuilder.java:83)
at io.camunda.zeebe.client.impl.ZeebeClientImpl.buildChannel(ZeebeClientImpl.java:127)
at io.camunda.zeebe.client.impl.ZeebeClientImpl.<init>(ZeebeClientImpl.java:83)
at io.camunda.zeebe.client.impl.ZeebeClientBuilderImpl.build(ZeebeClientBuilderImpl.java:300)
at io.zeebe.kafka.connect.util.ZeebeClientHelper.buildClient(ZeebeClientHelper.java:62)
at io.zeebe.kafka.connect.source.ZeebeSourceTask.start(ZeebeSourceTask.java:52)
at org.apache.kafka.connect.runtime.WorkerSourceTask.initializeAndStart(WorkerSourceTask.java:229)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:197)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:254)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassCastException: class io.netty.channel.epoll.EpollSocketChannel
at java.base/java.lang.Class.asSubclass(Class.java:3640)
at io.grpc.netty.Utils.epollChannelType(Utils.java:322)
at io.grpc.netty.Utils.<clinit>(Utils.java:114)
... 14 more
[25.480s][info][class,load  ] org.apache.kafka.connect.runtime.WorkerTask$$Lambda$865/0x0000000100955c40 source: org.apache.kafka.connect.runtime.WorkerTask
[2022-06-23 07:44:54,566] ERROR [zeebe-sink|task-0] WorkerSourceTask{id=zeebe-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:206)
java.lang.NoClassDefFoundError: Could not initialize class io.grpc.netty.NettyChannelBuilder
at io.camunda.zeebe.client.impl.ZeebeClientImpl.buildChannel(ZeebeClientImpl.java:127)
at io.camunda.zeebe.client.impl.ZeebeClientImpl.<init>(ZeebeClientImpl.java:83)
at io.camunda.zeebe.client.impl.ZeebeClientBuilderImpl.build(ZeebeClientBuilderImpl.java:300)
at io.zeebe.kafka.connect.util.ZeebeClientHelper.buildClient(ZeebeClientHelper.java:62)
at io.zeebe.kafka.connect.source.ZeebeSourceTask.start(ZeebeSourceTask.java:52)
at org.apache.kafka.connect.runtime.WorkerSourceTask.initializeAndStart(WorkerSourceTask.java:229)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:197)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:254)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)

The problem is that the connector depends on netty-transport-classes-epoll-4.1.73.Final.jar but it is not included in the connector (at least on the version shipped on Confluent Hub).

Since the connector does not ship with this dependency, it behaves differently depending on the deployments:

In CP 7.0.1, there is a package io.netty.channel.epoll, but it is from a different jar (so not working):

io.netty.channel.epoll.Epoll source: file:/usr/share/java/confluent-security/connect/netty-all-4.1.68.Final.jar

In CP 7.1.1, it is present there:

io.netty.channel.epoll.Epoll source: file:/usr/share/java/confluent-security/connect/netty-transport-classes-epoll-4.1.73.Final.jar

After adding the missing netty-transport-classes-epoll-4.1.73.Final.jar to the lib folder, the connector works fine.
Can you ship this dependency directly in the connector?

How to connect zeebe to AWS MSK

Hi Team,

I am looking to connect Zeebe to my Kafka implementation, which is in AWS MSK.

But I am not sure how to connect to it. Is there a way in the docker-compose.yaml file where I can set the broker endpoints of my MSK cluster?

version: '2.1'

services:
  zeebe:
    image: camunda/zeebe:0.21.1
    hostname: zeebe
    ports:
      - "26500:26500"
      - "9600:9600"
    volumes:
      - ./zeebe.cfg.toml:/usr/local/zeebe/conf/zeebe.cfg.toml
    environment:
      ZEEBE_INSECURE_CONNECTION: "true"

  operate:
    image: camunda/operate:1.1.0
    ports:
      - "8080:8080"
    volumes:
      - ./operate.yml:/usr/local/operate/config/application.yml

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch-oss:6.8.3
    ports:
      - "9200:9200"
    environment:
      - discovery.type=single-node
      - cluster.name=elasticsearch
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"

  zookeeper:
    image: zookeeper:3.4.9
    restart: unless-stopped
    hostname: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOO_PORT: 2181
      ZOO_SERVERS: server.1=zookeeper:2888:3888

  kafka:
    image: confluentinc/cp-enterprise-kafka:5.3.0
    hostname: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_METRIC_REPORTERS: "io.confluent.metrics.reporter.ConfluentMetricsReporter"
      KAFKA_CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: "kafka:19092"
      KAFKA_CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: "zookeeper:2181"
    depends_on:
      - zookeeper

  schema-registry:
    image: confluentinc/cp-schema-registry:5.3.0
    hostname: schema-registry
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka:19092
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
    depends_on:
      - zookeeper
      - kafka

  connect:
    image: confluentinc/cp-kafka-connect:5.3.0
    hostname: connect
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "kafka:19092"
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: "connect-group"
      CONNECT_CONFIG_STORAGE_TOPIC: "connect-configs"
      CONNECT_OFFSET_STORAGE_TOPIC: "connect-offsets"
      CONNECT_STATUS_STORAGE_TOPIC: "connect-status"
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "connect"
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR,io.zeebe.kafka.connect=TRACE,io.zeebe.client=WARN"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_PLUGIN_PATH: '/usr/share/java,/etc/kafka-connect/jars'
    volumes:
      - ./connectors:/etc/kafka-connect/jars/
    depends_on:
      - schema-registry
      - kafka
      - zeebe

  control-center:
    image: confluentinc/cp-enterprise-control-center:5.3.0
    hostname: control-center
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_ZOOKEEPER_CONNECT: "zookeeper:2181"
      CONTROL_CENTER_BOOTSTRAP_SERVERS: "kafka:19092"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_CONNECT_CLUSTER: "connect:8083"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
    depends_on:
      - zookeeper
      - schema-registry
      - kafka
      - connect

Send valid JSON from Source

When sending messages from a Zeebe workflow with the example ping-pong configs, messages are overly escaped to the point that they aren't valid JSON anymore:

{\"key\":2251799813685264,
\"type\":\"ping\",
\"customHeaders\":{\"topic\":\"pong\"},
\"workflowInstanceKey\":2251799813685256,
\"bpmnProcessId\":\"ping-pong\",
\"workflowDefinitionVersion\":1,
\"workflowKey\":2251799813685249,
\"elementId\":\"ServiceTask_1sx6rbe\",
\"elementInstanceKey\":2251799813685263,
\"worker\":\"kafka-connector\",
\"retries\":3,\"deadline\":1580056465256,
\"variables\":\"{\\\"foo\\\":\\\"bar\\\",\\\"key\\\":123456,\\\"name\\\":\\\"pong\\\"}\",
\"variablesAsMap\":{\"foo\":\"bar\",\"key\":123456,\"name\":\"pong\"}}

While this works for the round trip, other consumers that expect proper JSON fail on these messages.
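
Until the source connector emits plain JSON, a consumer has to undo the extra encoding itself. A minimal sketch using Jackson (not part of this connector; the record shape follows the sample above, where the variables field is itself a JSON-encoded string):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ParseJobRecord {
  public static void main(String[] args) throws Exception {
    // Shape of the record value produced by the example source config: the
    // "variables" field is a JSON document encoded as a string.
    String recordValue =
        "{\"type\":\"ping\","
            + "\"variables\":\"{\\\"foo\\\":\\\"bar\\\",\\\"name\\\":\\\"pong\\\"}\"}";

    ObjectMapper mapper = new ObjectMapper();
    JsonNode job = mapper.readTree(recordValue);

    // A consumer expecting proper JSON must parse the variables string a second time.
    JsonNode variables = mapper.readTree(job.get("variables").asText());
    System.out.println(variables.get("name").asText()); // prints "pong"
  }
}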

Possibly not closing all resources for SourceTask

Current behaviour is modelled on the Kafka JDBC connector, which essentially keeps a running flag that is set to false on stop. Resources are closed once poll is called while running is false, as opposed to in stop, because a poll that is already in flight may still need those resources (stop is called from another thread).

However I'm not 100% sure that the following is not possible:

  1. poll() is called
  2. stop() is called and running is set to false, but poll is already running
  3. poll() returns with some jobs (so nothing closed)

If poll() isn't called again then resources aren't closed. Should verify this.
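
One way to close this gap would be to make the clean-up idempotent and run it from both stop() and the next poll(), whichever happens last; closing during an in-flight poll is acceptable here because the task is shutting down anyway. A rough sketch only, with illustrative names rather than the actual ZeebeSourceTask code:

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class SafeCloseSourceTask extends SourceTask {
  private final AtomicBoolean running = new AtomicBoolean(false);
  private final AtomicBoolean closed = new AtomicBoolean(false);
  private AutoCloseable client; // stands in for the Zeebe client / job poller

  @Override
  public void start(Map<String, String> props) {
    running.set(true);
    // client = ... create resources here
  }

  @Override
  public List<SourceRecord> poll() {
    if (!running.get()) {
      closeResources(); // covers a stop() that raced with an in-flight poll()
      return null;
    }
    return Collections.emptyList(); // a real task would activate and return jobs here
  }

  @Override
  public void stop() {
    running.set(false);
    closeResources(); // idempotent, so a second close from poll() is harmless
  }

  private void closeResources() {
    if (closed.compareAndSet(false, true) && client != null) {
      try {
        client.close();
      } catch (Exception e) {
        // log and ignore during shutdown
      }
    }
  }

  @Override
  public String version() {
    return "sketch";
  }
}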

Failed to deploy artifacts: Could not transfer artifact, status: 401 Unauthorized

This is weird: it still worked on January 16th, but not today, and there weren't any specific changes I am aware of (I tried something today but rolled it back, as it was not the problem): https://github.com/camunda-community-hub/kafka-connect-zeebe/runs/4957329187?check_suite_focus=true

the target URL seems to be correct: https://oss.sonatype.org/content/repositories/snapshots/io/zeebe/kafka-connect-zeebe/0.32.1-SNAPSHOT/

@celanthe: Any idea? Any credentials removed recently?

Consider not using BlockingQueue but just Lock/Conditions in SourceTask

The SourceTask#poll API specifies that we should block to wait for data, but that the call should be interruptible when stop is called. At the moment it's not: we simply block for up to 5 seconds until there is data, and if none is available we return control to the caller (which, unless stopping, will just call poll again).

So it could be a little nicer to allow poll to be interrupted by using a combination of ReentrantLock and Condition instead of the BlockingQueue, as suggested by @Zelldon.
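
A rough sketch of that idea, with illustrative names: poll() waits on a Condition that is signalled both when jobs arrive and when stop() is called, so the wait can end early instead of always running out the full timeout.

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class InterruptiblePollBuffer<T> {
  private final ReentrantLock lock = new ReentrantLock();
  private final Condition hasWorkOrStopped = lock.newCondition();
  private final Queue<T> buffer = new ArrayDeque<>();
  private volatile boolean stopped = false;

  public void add(T job) {
    lock.lock();
    try {
      buffer.add(job);
      hasWorkOrStopped.signalAll();
    } finally {
      lock.unlock();
    }
  }

  public void stop() {
    lock.lock();
    try {
      stopped = true;
      hasWorkOrStopped.signalAll(); // wakes up a poll() that is currently waiting
    } finally {
      lock.unlock();
    }
  }

  /** Returns buffered jobs, or null if the timeout elapsed or the task was stopped. */
  public Queue<T> poll(long timeoutMillis) throws InterruptedException {
    lock.lock();
    try {
      long nanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
      while (buffer.isEmpty() && !stopped && nanos > 0) {
        nanos = hasWorkOrStopped.awaitNanos(nanos);
      }
      if (buffer.isEmpty()) {
        return null;
      }
      Queue<T> drained = new ArrayDeque<>(buffer);
      buffer.clear();
      return drained;
    } finally {
      lock.unlock();
    }
  }
}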

Add integration tests

We currently have very little testing going on, except for the tests provided by menski (:muscle:)

It shouldn't be too difficult to add integration tests using TestContainers, and it would greatly help to ensure other people can easily contribute to this.
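
A rough sketch of what such a test could look like with Testcontainers and JUnit 5; the image tags and assertions are placeholders, and a real test would also start a Connect worker and register the source and sink connectors:

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;

import org.junit.jupiter.api.Test;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@Testcontainers
class ConnectorIntegrationTest {

  @Container
  static final KafkaContainer KAFKA =
      new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.0.0"));

  @Container
  static final GenericContainer<?> ZEEBE =
      new GenericContainer<>(DockerImageName.parse("camunda/zeebe:1.2.4"))
          .withExposedPorts(26500);

  @Test
  void containersAreReachable() {
    // A real integration test would start an embedded Connect worker, configure the
    // source/sink connectors against KAFKA.getBootstrapServers() and the mapped
    // Zeebe gateway port, deploy a process, and assert that records flow end to end.
    String zeebeGateway = ZEEBE.getHost() + ":" + ZEEBE.getMappedPort(26500);
    assertNotNull(KAFKA.getBootstrapServers());
    assertFalse(zeebeGateway.isEmpty());
  }
}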

Publish on Confluent Hub

One of our goals is to publish the connector to Confluent Hub. I think we're almost there, but some things are missing or I'm not sure they are complete. Here are the official guidelines. At the moment, running mvn package produces a zip file in the target folder with the correct archive format: manifest, logos, docs, JARs, etc. But I think there are some open issues:

  1. Logos: are these the Camunda approved ones?
  2. Package metadata and description: is the below correct? Who to ask?
<configuration>
  <title>Kafka Connect Zeebe</title>
  <documentationUrl>https://github.com/zeebe-io/kafka-connect-zeebe/</documentationUrl>
  <description>Write description</description>
  <logo>logos/zeebe.png</logo>

  <supportProviderName>Camunda Services GmbH</supportProviderName>
  <supportSummary>Camunda supports the Zeebe source and sink connectors as a community
    project.
  </supportSummary>
  <supportUrl>https://github.com/zeebe-io/kafka-connect-zeebe/</supportUrl>
  <supportLogo>logos/camunda.png</supportLogo>

  <ownerUsername>zeebe-io</ownerUsername>
  <ownerType>organization</ownerType>
  <ownerName>Camunda Services GmbH</ownerName>
  <ownerUrl>https://zeebe.io/</ownerUrl>
  <ownerLogo>logos/zeebe.png</ownerLogo>

  <!-- TODO later
  <dockerNamespace>camunda</dockerNamespace>
  <dockerName>cp-kafka-connect</dockerName>
  <dockerTag>${project.version}</dockerTag>
  -->

  <componentTypes>
    <componentType>sink</componentType>
    <componentType>source</componentType>
  </componentTypes>

  <tags>
    <tag>Zeebe</tag>
    <tag>zeebe</tag>
    <tag>camunda</tag>
    <tag>Camunda</tag>
    <tag>workflows</tag>
    <tag>orchestration</tag>
  </tags>

  <requirements>
    <requirement>Zeebe ${version.zeebe}</requirement>
  </requirements>

  <deliveryGuarantee>
    <deliveryGuarantee>exactlyOnce</deliveryGuarantee>
  </deliveryGuarantee>

  <confluentControlCenterIntegration>true</confluentControlCenterIntegration>
</configuration>
  3. Licenses: it seems most connectors also package the licenses of all their dependencies, since these get distributed as well. I haven't done this yet, so we probably need to do something there.

Improve user feedback on configuration mistakes

The only issue I've found with it so far is that the sink can sometimes crash and not recover if you misconfigure the sink.json file.
For example: I configured it to look for a variable named foo, but forgot to add it to the payload from the C# worker.
That crashed the sink and took me a while to debug.
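
One way to improve this would be to fail with a message that names the missing JSON path and the offending record, instead of an unexplained task crash. A hypothetical sketch using the Jayway JsonPath and Kafka Connect APIs; class and method names here are illustrative, not the connector's actual code:

import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.PathNotFoundException;
import org.apache.kafka.connect.errors.ConnectException;

public final class JsonPathExtractor {

  public static String extractOrFail(String json, String path) {
    try {
      Object value = JsonPath.read(json, path);
      if (value == null) {
        throw new ConnectException(
            "JSON path '" + path + "' resolved to null in record: " + json);
      }
      return value.toString();
    } catch (PathNotFoundException e) {
      throw new ConnectException(
          "Record does not contain the configured JSON path '" + path + "': " + json, e);
    }
  }
}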

NPE on ZeebeSourceTask.java:114

java.lang.NullPointerException
connect_1 | at io.zeebe.kafka.connect.source.ZeebeSourceTask.commit(ZeebeSourceTask.java:114)
connect_1 | at org.apache.kafka.connect.runtime.WorkerSourceTask.commitSourceTask(WorkerSourceTask.java:507)
connect_1 | at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:448)
connect_1 | at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.commit(SourceTaskOffsetCommitter.java:111)
connect_1 | at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.access$000(SourceTaskOffsetCommitter.java:46)
connect_1 | at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter$1.run(SourceTaskOffsetCommitter.java:84)
connect_1 | at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
connect_1 | at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
connect_1 | at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
connect_1 | at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
connect_1 | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
connect_1 | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
connect_1 | at java.lang.Thread.run(Thread.java:748)

{
  "name": "flow-retail-source",
  "config": {
    "connector.class": "io.zeebe.kafka.connect.ZeebeSourceConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",

    "zeebe.client.gateway.address": "zeebe_network:26500",
    "zeebe.client.requestTimeout": "10000",
    "zeebe.client.security.plaintext": true,
    "__zeebe.client.cloud.clusterId": "5be4da01-1f35-4deb-8681-592c7001d1bd",
    "__zeebe.client.cloud.clientId": "8Yni-2iVjOzUMsai_xQrnoY-y2EGlN_H",
    "__zeebe.client.cloud.clientSecret": "RH65GZm1N4SygpLEHiqPcPkd80fz_sF2LNZfrAsC6ttIoBy288bkAexscf1PG_PV",

    "zeebe.client.job.worker": "kafka-connector",
    "zeebe.client.worker.maxJobsActive": "100",
    "zeebe.client.job.pollinterval": "2000",
    "zeebe.client.job.timeout": "5000",
    "job.types": "sendMessage",
    "job.header.topics": "kafka-topic"
  }
}

Newest connector version on Confluent Hub

Hello everyone,

I saw that the Zeebe Connector can also be installed via Confluent Hub using the confluent-hub install command. There it says that the last version available is 0.22.0.
Since I assume that version 0.22.0 is targeted at Zeebe below 1.0.0, would it be possible to update the Connector to version 0.32.0?

Thanks in advance 😄

Ability to work with Kafka headers

It would be great if we had the ability to pull Kafka headers out as variables on the sink side, and to pack variables into headers on the source side. I'm not sure I saw a way to do this with the current implementation of the connector; if there is a way, it would be great to know what that technique looks like.
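
For the sink side, the building blocks are already in the Kafka Connect API: SinkRecord exposes the record headers, so they could be copied into the variables sent along with the Zeebe message. A hypothetical sketch of that direction only, not something the connector does today:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.sink.SinkRecord;

public final class HeaderVariables {

  // Copies the Kafka record headers into a variables map that could be attached
  // to the published Zeebe message and thus become process variables on correlation.
  public static Map<String, Object> headersAsVariables(SinkRecord record) {
    Map<String, Object> variables = new HashMap<>();
    for (Header header : record.headers()) {
      variables.put(header.key(), header.value());
    }
    return variables;
  }
}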

Expected to find an object with property ['orderId'] in path $ but found 'java.lang.String'.

Hi, it's the same error as in this issue

  1. Create an instance: create instance --variables "{"orderId": 1}" order
  2. Manually push the message {"eventType": "OrderPaid", "orderId": 1, "amount": 4000} to the topic payment-confirm

Afterwards the sink connector becomes degraded with the following log message:
Caused by: com.jayway.jsonpath.PathNotFoundException: Expected to find an object with property ['orderId'] in path $ but found 'java.lang.String'.
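
This message means that JsonPath was handed a plain string rather than a JSON object at the document root, so $['orderId'] cannot be resolved; that typically happens when the record value reaching the sink is a JSON-encoded string (double-encoded) or otherwise not a JSON object. A minimal reproduction with Jayway JsonPath, for illustration only:

import com.jayway.jsonpath.JsonPath;

public class JsonPathStringVsObject {
  public static void main(String[] args) {
    String objectValue = "{\"eventType\":\"OrderPaid\",\"orderId\":1,\"amount\":4000}";
    String stringValue = "\"{\\\"orderId\\\": 1}\""; // a JSON string, e.g. from double encoding

    Object ok = JsonPath.read(objectValue, "$.orderId");
    System.out.println(ok); // prints 1

    // Throws PathNotFoundException: Expected to find an object with property
    // ['orderId'] in path $ but found 'java.lang.String'.
    JsonPath.read(stringValue, "$.orderId");
  }
}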

NullPointerException io.zeebe.kafka.connect.source.ZeebeSourceTask.commit

Hi
I have this issue with kafka-connect-zeebe-0.32.1-SNAPSHOT.jar

This is my docker-compose

version: "3"

services:
    zeebe:
        image: camunda/zeebe:${CAMUNDA_CLOUD_VERSION:-1.2.4}
        container_name: zeebe
        environment:
            - ZEEBE_BROKER_EXPORTERS_ELASTICSEARCH_CLASSNAME=io.camunda.zeebe.exporter.ElasticsearchExporter
            - ZEEBE_BROKER_EXPORTERS_ELASTICSEARCH_ARGS_URL=http://elasticsearch:9200
            - ZEEBE_BROKER_EXPORTERS_ELASTICSEARCH_ARGS_BULK_SIZE=1
        ports:
            - 26500:26500
        volumes:
            - zeebe:/usr/local/zeebe/data
        networks:
            - camunda-cloud
        depends_on:
            - elasticsearch

    operate:
        image: camunda/operate:${CAMUNDA_CLOUD_VERSION:-1.2.4}
        container_name: operate
        environment:
            - CAMUNDA_OPERATE_ZEEBE_GATEWAYADDRESS=zeebe:26500
            - CAMUNDA_OPERATE_ELASTICSEARCH_URL=http://elasticsearch:9200
            - CAMUNDA_OPERATE_ZEEBEELASTICSEARCH_URL=http://elasticsearch:9200
        ports:
            - 8080:8080
        networks:
            - camunda-cloud
        depends_on:
            - elasticsearch

    tasklist:
        image: camunda/tasklist:${CAMUNDA_CLOUD_VERSION:-1.2.4}
        container_name: tasklist
        environment:
            - CAMUNDA_TASKLIST_ZEEBE_GATEWAYADDRESS=zeebe:26500
            - CAMUNDA_TASKLIST_ELASTICSEARCH_URL=http://elasticsearch:9200
            - CAMUNDA_TASKLIST_ZEEBEELASTICSEARCH_URL=http://elasticsearch:9200
        ports:
            - 8081:8080
        networks:
            - camunda-cloud
        depends_on:
            - elasticsearch

    elasticsearch:
        image: docker.elastic.co/elasticsearch/elasticsearch:${ELASTIC_VERSION:-7.14.1}
        container_name: elasticsearch
        environment:
            - cluster.name=camunda-cloud
            - discovery.type=single-node
            - bootstrap.memory_lock=true
            - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
        ulimits:
            memlock:
                soft: -1
                hard: -1
        volumes:
            - elastic:/usr/share/elasticsearch/data
        networks:
            - camunda-cloud

    zookeeper:
        image: confluentinc/cp-zookeeper:latest
        restart: unless-stopped
        hostname: zookeeper
        environment:
            ZOOKEEPER_CLIENT_PORT: 2181
            ZOOKEEPER_TICK_TIME: 2000
        ports:
            - 22181:2181

    kafka:
        image: confluentinc/cp-server:7.0.0
        hostname: kafka
        container_name: kafka
        depends_on:
            - zookeeper
        ports:
            - "9092:9092"
            - "9101:9101"
        environment:
            KAFKA_BROKER_ID: 1
            KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
            KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
            KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
            KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
            KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
            KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
            KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
            KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
            KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
            KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
            KAFKA_JMX_PORT: 9101
            KAFKA_JMX_HOSTNAME: localhost
            KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8085
            CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka:29092
            CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
            CONFLUENT_METRICS_ENABLE: 'true'
            CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'


    schema-registry:
        image: confluentinc/cp-schema-registry:7.0.0
        hostname: schema-registry
        container_name: schema-registry
        depends_on:
            - kafka
        ports:
            - "8085:8081"
        environment:
            SCHEMA_REGISTRY_HOST_NAME: schema-registry
            SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'kafka:29092'
            SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

    connect:
        image: cnfldemos/cp-server-connect-datagen:0.5.0-6.2.0
        hostname: connect
        container_name: connect
        depends_on:
            - kafka
            - schema-registry
        ports:
            - "8083:8083"
        environment:
            CONNECT_BOOTSTRAP_SERVERS: 'kafka:29092'
            CONNECT_REST_ADVERTISED_HOST_NAME: connect
            CONNECT_REST_PORT: 8083
            CONNECT_GROUP_ID: compose-connect-group
            CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
            CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
            CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
            CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
            CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
            CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
            CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
            CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
            CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
            CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.0.0.jar
            CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
            CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
            CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
            CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
            CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
            CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
            CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
            CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=INFO,org.reflections=ERROR,io.zeebe.kafka.connect=TRACE,io.zeebe.client=INFO"
            CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components,/etc/kafka-connect/jars'
        volumes:
            - ./connectors:/etc/kafka-connect/jars/
            
    control-center:
        image: confluentinc/cp-enterprise-control-center:7.0.0
        hostname: control-center
        container_name: control-center
        depends_on:
            - zookeeper
            - kafka
            - schema-registry
            - connect
        ports:
            - "9021:9021"
        environment:
            CONTROL_CENTER_BOOTSTRAP_SERVERS: 'kafka:29092'
            CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
            CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
            CONTROL_CENTER_REPLICATION_FACTOR: 1
            CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
            CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
            CONFLUENT_METRICS_TOPIC_REPLICATION: 1
            PORT: 9021
            
volumes:
    zeebe:
        driver: local
    elastic:
        driver: local

networks:
    camunda-cloud:
        driver: bridge

When I add the ping connector, Control Center shows:
State: FAILED task id: connect:8083

In console I see:

 [2021-11-21 11:55:36,373] ERROR WorkerSourceTask{id=ping-0} Exception thrown while calling task.commit() (org.apache.kafka.connect.runtime.WorkerSourceTask)
 java.lang.NullPointerException
 	at io.zeebe.kafka.connect.source.ZeebeSourceTask.commit(ZeebeSourceTask.java:114)
 	at org.apache.kafka.connect.runtime.WorkerSourceTask.commitSourceTask(WorkerSourceTask.java:571)
 	at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:515)
 	at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.commit(SourceTaskOffsetCommitter.java:110)
 	at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.lambda$schedule$0(SourceTaskOffsetCommitter.java:84)
 	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
 	at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
 	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
 	at java.base/java.lang.Thread.run(Thread.java:829)

Originally posted by @lrealdi in #57 (comment)

Connection refused when Kafka Connect hits Zeebe

I am using the same docker-compose file on my Mac and I get the error below from Kafka Connect while creating the source connector

Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:26500
2020-11-02T06:44:12.537124900Z Caused by: java.net.ConnectException: Connection refused
2020-11-02T06:44:12.537149800Z at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
2020-11-02T06:44:12.537174400Z at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
2020-11-02T06:44:12.537199000Z at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330)
2020-11-02T06:44:12.537237900Z at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
2020-11-02T06:44:12.537264900Z at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
2020-11-02T06:44:12.537290000Z at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
2020-11-02T06:44:12.537309000Z at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
2020-11-02T06:44:12.537325900Z at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
2020-11-02T06:44:12.537392500Z at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
2020-11-02T06:44:12.537435100Z at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
2020-11-02T06:44:12.537466600Z at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
2020-11-02T06:44:12.537497100Z at java.lang.Thread.run(Thread.java:748)
