kafka-connect-mqtt's People

Contributors

johanvandevenne

kafka-connect-mqtt's Issues

Deploying as a Kubernetes component

Hello,
Have you tried deploying it as a Kubernetes pod?
I tried and ran into some problems. I had to define an "entrypoint" script, which looks like this:

# shellcheck disable=SC1083

# Wait until the REST server is available
while [ "$(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors)" -ne 200 ]; do
  sleep 5
done

# Configure the connector
curl -i -X PUT -H "Content-Type:application/json" \
  http://localhost:8083/connectors/lysr-source-connector/config \
  -d "{
  \"connector.class\": \"ai.lysr.mqttconnector.LysrSourceMqttConnector\",
  \"tasks.max\": \"1\",
  \"topics.regex\": \"^[0-9]*$\",
  \"key.converter\": \"org.apache.kafka.connect.storage.StringConverter\",
  \"value.converter\": \"org.apache.kafka.connect.storage.StringConverter\",
  \"mqtt.broker\": \"$MQTT_BROKER_URL\",
  \"mqtt.clientID\": \"$MQTT_CLIENT_ID\",
  \"mqtt.topic\": \"$MQTT_TOPIC\",
  \"mqtt.userName\": \"$MQTT_USERNAME\",
  \"mqtt.password\": \"$MQTT_PASSWORD\"
}"

That script configures the connector. The Dockerfile is the following:

FROM confluentinc/cp-kafka-connect-base:7.0.1
ENV CONNECT_PLUGIN_PATH=/app
WORKDIR /app
COPY build/app .

But I get this error:
===> Configuring ...
CONNECT_KEY_CONVERTER is required.
Command [/usr/local/bin/dub ensure CONNECT_KEY_CONVERTER] FAILED !

Any idea?
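One possible reading of this error, offered as a sketch rather than a confirmed diagnosis: the "===> Configuring ..." step of the Confluent image appears to check that certain CONNECT_* environment variables are set before the worker starts, and the Dockerfile above sets only CONNECT_PLUGIN_PATH. The function below is a hypothetical preflight check that could run at the top of the entrypoint script. The variable list is an assumption based on Confluent's documented required settings for the Kafka Connect image and should be verified against the 7.0.1 documentation.

```shell
# Assumed list of variables the image requires; verify against the Confluent docs.
REQUIRED_CONNECT_VARS="CONNECT_BOOTSTRAP_SERVERS CONNECT_GROUP_ID
CONNECT_CONFIG_STORAGE_TOPIC CONNECT_OFFSET_STORAGE_TOPIC
CONNECT_STATUS_STORAGE_TOPIC CONNECT_KEY_CONVERTER
CONNECT_VALUE_CONVERTER CONNECT_REST_ADVERTISED_HOST_NAME"

# Hypothetical helper: print the name of every required variable that is
# unset or empty. Exit status is 0 when nothing is missing, 1 otherwise.
# The body runs in a subshell, so it does not leak variables to the caller.
missing_connect_vars() (
  status=0
  for name in $REQUIRED_CONNECT_VARS; do
    # Look up the variable whose name is stored in $name.
    eval "value=\${$name:-}"
    if [ -z "$value" ]; then
      echo "$name"
      status=1
    fi
  done
  exit "$status"
)
```

In a Kubernetes deployment these variables would typically be supplied through the pod's env section, for example CONNECT_KEY_CONVERTER=org.apache.kafka.connect.storage.StringConverter to match the converter used in the connector config above.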

Not an error, but a small mismatch between source code and README regarding username

A small issue with very low priority.

In the README, the MQTT username is declared as "mqtt.username", but in the source code it is defined as "mqtt.userName". If you configure the connector manually, this is usually not a blocker. In my case, I have written a small tool that automatically provisions AsyncAPI bindings into the related infrastructure (Kafka and Consul), using a mixture of the Confluent MQTT sink and source connectors (with the regex pattern) and this one (because it solves the problem of switching the topic name between Kafka and MQTT in a sink configuration). I built a workaround in the provisioner that fixes the difference for me, so this is not really a problem.
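For anyone hitting the same mismatch, a workaround of this kind can be sketched as a small filter. This is not the provisioner's actual code; the function name is hypothetical, and it assumes a properties-style config that uses "=" as the separator.

```shell
# Hypothetical helper: rewrite the README spelling (mqtt.username) to the
# spelling the source code is reported to use (mqtt.userName).
# Reads properties lines on stdin and writes the result to stdout.
# Only "key=value" lines are handled; other lines pass through unchanged.
normalize_mqtt_username_key() {
  sed -E 's/^([[:space:]]*)mqtt\.username([[:space:]]*=)/\1mqtt.userName\2/'
}
```

It could be used as normalize_mqtt_username_key < connector.properties > connector.fixed.properties, where both file names are placeholders.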

Best Regards

No error, but no MQTT message detected by Kafka

MQTT broker = nanomq, mosquitto
Kafka version = 2.6.2

I'm testing your kafka-connect-mqtt in a local Ubuntu environment.

  1. Clone this repository.
  2. Change directory into kafka-connect-mqtt and run mvn clean install.
  3. Copy the folder /target/kafka-connect-mqtt-1.1.0-package/kafka-connect-mqtt to my Kafka Connect plugin path.
  4. The connector has been installed successfully:

[{"class":"be.jovacon.kafka.connect.MQTTSinkConnector","type":"sink","version":"1.1.0"},{"class":"be.jovacon.kafka.connect.MQTTSourceConnector","type":"source","version":"1.1.0"},{"class":"org.apache.kafka.connect.file.FileStreamSinkConnector","type":"sink","version":"2.6.2"},{"class":"org.apache.kafka.connect.file.FileStreamSourceConnector","type":"source","version":"2.6.2"},{"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"1"},{"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"1"},{"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"1"}]

  5. Configure the source connector. I created connect-mqtt-source.properties in $KAFKA_HOME/config.

connect-mqtt-source.properties

name=mqtt-source-connector
connector.class=be.jovacon.kafka.connect.MQTTSourceConnector
mqtt.topic=test
kafka.topic=mqtt.test
mqtt.clientID=cid
mqtt.broker=tcp://127.0.0.1:1883
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter.schemas.enable=false

  6. Restart Kafka Connect:
    $ bin/connect-standalone.sh config/connect-standalone.properties config/connect-mqtt-source.properties

  7. Result:
    [2022-11-23 10:21:20,740] INFO Kafka Connect standalone worker initializing ... (org.apache.kafka.connect.cli.ConnectStandalone:69)
    [2022-11-23 10:21:20,746] INFO WorkerInfo values:
    jvm.args = -Xms256M, -Xmx2G, -XX:+UseG1GC, -XX:MaxGCPauseMillis=20, -XX:InitiatingHeapOccupancyPercent=35, -XX:+ExplicitGCInvokesConcurrent, -XX:MaxInlineLevel=15, -Djava.awt.headless=true, -Dcom.sun.management.jmxremote, -Dcom.sun.management.jmxremote.authenticate=false, -Dcom.sun.management.jmxremote.ssl=false, -Dkafka.logs.dir=/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../logs, -Dlog4j.configuration=file:bin/../config/connect-log4j.properties
    jvm.spec = Ubuntu, OpenJDK 64-Bit Server VM, 11.0.17, 11.0.17+8-post-Ubuntu-1ubuntu222.04
    jvm.classpath = /home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/activation-1.1.1.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/aopalliance-repackaged-2.6.1.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/argparse4j-0.7.0.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/audience-annotations-0.5.0.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/commons-cli-1.4.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/commons-lang3-3.8.1.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/connect-api-2.6.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/connect-basic-auth-extension-2.6.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/connect-file-2.6.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/connect-json-2.6.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/connect-mirror-2.6.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/connect-mirror-client-2.6.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/connect-runtime-2.6.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/connect-transforms-2.6.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/connect-utils-0.4.156.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/freemarker-2.3.28.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/hk2-api-2.6.1.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/hk2-locator-2.6.1.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/hk2-utils-2.6.1.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jackson-annotations-2.10.5.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jackson-core-2.10.5.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jackson-databind-2.10.5.1.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jackson-dataformat-csv-2.10.5.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jackson-datatype-jdk8-2.10.5.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jackson-jaxrs-base-2.10.5.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jackson-jaxrs-json-provider-2.10.5.jar:/home/gwjeon/kafka/kafka_2.12-2.6.
2/bin/../libs/jackson-module-jaxb-annotations-2.10.5.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jackson-module-paranamer-2.10.5.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jackson-module-scala_2.12-2.10.5.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jakarta.activation-api-1.2.1.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jakarta.annotation-api-1.3.5.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jakarta.inject-2.6.1.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jakarta.validation-api-2.0.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jakarta.ws.rs-api-2.1.6.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jakarta.xml.bind-api-2.3.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/javassist-3.25.0-GA.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/javassist-3.26.0-GA.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/javax.servlet-api-3.1.0.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/javax.ws.rs-api-2.1.1.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jaxb-api-2.3.0.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jersey-client-2.31.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jersey-common-2.31.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jersey-container-servlet-2.31.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jersey-container-servlet-core-2.31.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jersey-hk2-2.31.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jersey-media-jaxb-2.31.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jersey-server-2.31.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jetty-client-9.4.38.v20210224.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jetty-continuation-9.4.38.v20210224.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jetty-http-9.4.38.v20210224.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jetty-io-9.4.38.v20210224.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jetty-security-9.4.38.v20210224.jar:/home
/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jetty-server-9.4.38.v20210224.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jetty-servlet-9.4.38.v20210224.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jetty-servlets-9.4.38.v20210224.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jetty-util-9.4.38.v20210224.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jetty-util-ajax-9.4.38.v20210224.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jopt-simple-5.0.4.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/kafka-clients-2.6.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/kafka-connect-mqtt-1.1.0.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/kafka-log4j-appender-2.6.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/kafka-streams-2.6.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/kafka-streams-examples-2.6.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/kafka-streams-scala_2.12-2.6.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/kafka-streams-test-utils-2.6.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/kafka-tools-2.6.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/kafka_2.12-2.6.2-sources.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/kafka_2.12-2.6.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/log4j-1.2.17.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/log4j-api-2.11.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/log4j-core-2.11.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/lz4-java-1.7.1.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/maven-artifact-3.6.3.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/metrics-core-2.2.0.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/netty-buffer-4.1.59.Final.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/netty-codec-4.1.59.Final.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/netty-common-4.1.59.Final.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/netty-handler-4.1.59.Final.jar:/home/gwjeon/kafka/kafka_2.12-2
.6.2/bin/../libs/netty-resolver-4.1.59.Final.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/netty-transport-4.1.59.Final.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/netty-transport-native-epoll-4.1.59.Final.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/netty-transport-native-unix-common-4.1.59.Final.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/org.eclipse.paho.client.mqttv3-1.2.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/osgi-resource-locator-1.0.3.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/paranamer-2.8.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/plexus-utils-3.2.1.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/reflections-0.9.12.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/rocksdbjni-5.18.4.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/scala-collection-compat_2.12-2.1.6.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/scala-java8-compat_2.12-0.9.1.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/scala-library-2.12.11.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/scala-logging_2.12-3.9.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/scala-reflect-2.12.11.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/slf4j-api-1.7.30.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/slf4j-log4j12-1.7.30.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/snappy-java-1.1.7.3.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/zookeeper-3.5.9.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/zookeeper-jute-3.5.9.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/zstd-jni-1.4.4-7.jar
    os.spec = Linux, amd64, 5.15.74.2-microsoft-standard-WSL2
    os.vcpus = 16
    (org.apache.kafka.connect.runtime.WorkerInfo:71)
    [2022-11-23 10:21:20,747] INFO Scanning for plugin classes. This might take a moment ... (org.apache.kafka.connect.cli.ConnectStandalone:78)
    [2022-11-23 10:21:20,756] INFO Loading plugin from: /home/gwjeon/kafka/kafka_2.12-2.6.2/plugins/kafka-connect-mqtt/freemarker-2.3.28.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:246)
    [2022-11-23 10:21:20,914] INFO Registered loader: PluginClassLoader{pluginLocation=file:/home/gwjeon/kafka/kafka_2.12-2.6.2/plugins/kafka-connect-mqtt/freemarker-2.3.28.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:269)
    [2022-11-23 10:21:20,915] INFO Added plugin 'org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-11-23 10:21:20,915] INFO Added plugin 'org.apache.kafka.connect.connector.policy.PrincipalConnectorClientConfigOverridePolicy' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-11-23 10:21:20,915] INFO Added plugin 'org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-11-23 10:21:20,916] INFO Loading plugin from: /home/gwjeon/kafka/kafka_2.12-2.6.2/plugins/kafka-connect-mqtt/org.eclipse.paho.client.mqttv3-1.2.2.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:246)
    [2022-11-23 10:21:20,992] INFO Registered loader: PluginClassLoader{pluginLocation=file:/home/gwjeon/kafka/kafka_2.12-2.6.2/plugins/kafka-connect-mqtt/org.eclipse.paho.client.mqttv3-1.2.2.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:269)
    [2022-11-23 10:21:20,993] INFO Loading plugin from: /home/gwjeon/kafka/kafka_2.12-2.6.2/plugins/kafka-connect-mqtt/log4j-api-2.11.2.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:246)
    [2022-11-23 10:21:21,011] INFO Registered loader: PluginClassLoader{pluginLocation=file:/home/gwjeon/kafka/kafka_2.12-2.6.2/plugins/kafka-connect-mqtt/log4j-api-2.11.2.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:269)
    [2022-11-23 10:21:21,012] INFO Loading plugin from: /home/gwjeon/kafka/kafka_2.12-2.6.2/plugins/kafka-connect-mqtt/kafka-connect-mqtt-1.1.0.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:246)
    [2022-11-23 10:21:21,026] INFO Registered loader: PluginClassLoader{pluginLocation=file:/home/gwjeon/kafka/kafka_2.12-2.6.2/plugins/kafka-connect-mqtt/kafka-connect-mqtt-1.1.0.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:269)
    [2022-11-23 10:21:21,026] INFO Added plugin 'be.jovacon.kafka.connect.MQTTSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-11-23 10:21:21,026] INFO Added plugin 'be.jovacon.kafka.connect.MQTTSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-11-23 10:21:21,028] INFO Loading plugin from: /home/gwjeon/kafka/kafka_2.12-2.6.2/plugins/kafka-connect-mqtt/log4j-core-2.11.2.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:246)
    [2022-11-23 10:21:21,111] INFO Registered loader: PluginClassLoader{pluginLocation=file:/home/gwjeon/kafka/kafka_2.12-2.6.2/plugins/kafka-connect-mqtt/log4j-core-2.11.2.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:269)
    [2022-11-23 10:21:21,111] INFO Loading plugin from: /home/gwjeon/kafka/kafka_2.12-2.6.2/plugins/kafka-connect-mqtt/connect-utils-0.4.156.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:246)
    [2022-11-23 10:21:21,122] INFO Registered loader: PluginClassLoader{pluginLocation=file:/home/gwjeon/kafka/kafka_2.12-2.6.2/plugins/kafka-connect-mqtt/connect-utils-0.4.156.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:269)
    [2022-11-23 10:21:21,740] INFO Registered loader: jdk.internal.loader.ClassLoaders$AppClassLoader@3d4eac69 (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:269)
    [2022-11-23 10:21:21,740] INFO Added plugin 'org.apache.kafka.connect.mirror.MirrorSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-11-23 10:21:21,740] INFO Added plugin 'org.apache.kafka.connect.file.FileStreamSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-11-23 10:21:21,740] INFO Added plugin 'org.apache.kafka.connect.tools.SchemaSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-11-23 10:21:21,740] INFO Added plugin 'org.apache.kafka.connect.tools.MockSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-11-23 10:21:21,740] INFO Added plugin 'org.apache.kafka.connect.file.FileStreamSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-11-23 10:21:21,740] INFO Added plugin 'org.apache.kafka.connect.mirror.MirrorCheckpointConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-11-23 10:21:21,740] INFO Added plugin 'org.apache.kafka.connect.tools.VerifiableSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-11-23 10:21:21,741] INFO Added plugin 'org.apache.kafka.connect.tools.VerifiableSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-11-23 10:21:21,741] INFO Added plugin 'org.apache.kafka.connect.tools.MockSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-11-23 10:21:21,741] INFO Added plugin 'org.apache.kafka.connect.mirror.MirrorHeartbeatConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-11-23 10:21:21,741] INFO Added plugin 'org.apache.kafka.connect.tools.MockConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-11-23 10:21:21,741] INFO Added plugin 'org.apache.kafka.connect.converters.FloatConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-11-23 10:21:21,741] INFO Added plugin 'org.apache.kafka.connect.converters.DoubleConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-11-23 10:21:21,741] INFO Added plugin 'org.apache.kafka.connect.converters.ByteArrayConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-11-23 10:21:21,741] INFO Added plugin 'org.apache.kafka.connect.converters.LongConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-11-23 10:21:21,741] INFO Added plugin 'org.apache.kafka.connect.converters.IntegerConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-11-23 10:21:21,741] INFO Added plugin 'org.apache.kafka.connect.json.JsonConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-11-23 10:21:21,741] INFO Added plugin 'org.apache.kafka.connect.storage.StringConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-11-23 10:21:21,741] INFO Added plugin 'org.apache.kafka.connect.converters.ShortConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-11-23 10:21:21,742] INFO Added plugin 'org.apache.kafka.connect.storage.SimpleHeaderConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-11-23 10:21:21,742] INFO Added plugin 'org.apache.kafka.connect.transforms.ReplaceField$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-11-23 10:21:21,742] INFO Added plugin 'org.apache.kafka.connect.transforms.SetSchemaMetadata$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-11-23 10:21:21,742] INFO Added plugin 'org.apache.kafka.connect.transforms.ReplaceField$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-11-23 10:21:21,742] INFO Added plugin 'org.apache.kafka.connect.transforms.Filter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-11-23 10:21:21,742] INFO Added plugin 'org.apache.kafka.connect.transforms.InsertField$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-11-23 10:21:21,742] INFO Added plugin 'org.apache.kafka.connect.transforms.TimestampConverter$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-11-23 10:21:21,742] INFO Added plugin 'org.apache.kafka.connect.transforms.MaskField$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-11-23 10:21:21,742] INFO Added plugin 'org.apache.kafka.connect.transforms.TimestampRouter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-11-23 10:21:21,742] INFO Added plugin 'org.apache.kafka.connect.transforms.RegexRouter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-11-23 10:21:21,743] INFO Added plugin 'org.apache.kafka.connect.transforms.HoistField$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-11-23 10:21:21,743] INFO Added plugin 'org.apache.kafka.connect.transforms.ValueToKey' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-11-23 10:21:21,743] INFO Added plugin 'org.apache.kafka.connect.transforms.MaskField$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-11-23 10:21:21,743] INFO Added plugin 'org.apache.kafka.connect.transforms.Cast$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-11-23 10:21:21,743] INFO Added plugin 'org.apache.kafka.connect.transforms.Cast$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-11-23 10:21:21,743] INFO Added plugin 'org.apache.kafka.connect.runtime.PredicatedTransformation' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-11-23 10:21:21,743] INFO Added plugin 'org.apache.kafka.connect.transforms.ExtractField$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-11-23 10:21:21,743] INFO Added plugin 'org.apache.kafka.connect.transforms.Flatten$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-11-23 10:21:21,743] INFO Added plugin 'org.apache.kafka.connect.transforms.InsertField$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-11-23 10:21:21,743] INFO Added plugin 'org.apache.kafka.connect.transforms.Flatten$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-11-23 10:21:21,743] INFO Added plugin 'org.apache.kafka.connect.transforms.SetSchemaMetadata$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-11-23 10:21:21,743] INFO Added plugin 'org.apache.kafka.connect.transforms.ExtractField$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-11-23 10:21:21,743] INFO Added plugin 'org.apache.kafka.connect.transforms.TimestampConverter$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-11-23 10:21:21,744] INFO Added plugin 'org.apache.kafka.connect.transforms.HoistField$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-11-23 10:21:21,745] INFO Added plugin 'org.apache.kafka.connect.transforms.predicates.HasHeaderKey' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-11-23 10:21:21,745] INFO Added plugin 'org.apache.kafka.connect.transforms.predicates.RecordIsTombstone' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-11-23 10:21:21,745] INFO Added plugin 'org.apache.kafka.connect.transforms.predicates.TopicNameMatches' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-11-23 10:21:21,745] INFO Added plugin 'org.apache.kafka.common.config.provider.FileConfigProvider' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-11-23 10:21:21,745] INFO Added plugin 'org.apache.kafka.connect.rest.basic.auth.extension.BasicAuthSecurityRestExtension' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
    [2022-11-23 10:21:21,746] INFO Added aliases 'MQTTSinkConnector' and 'MQTTSink' to plugin 'be.jovacon.kafka.connect.MQTTSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
    [2022-11-23 10:21:21,746] INFO Added aliases 'MQTTSourceConnector' and 'MQTTSource' to plugin 'be.jovacon.kafka.connect.MQTTSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
    [2022-11-23 10:21:21,746] INFO Added aliases 'FileStreamSinkConnector' and 'FileStreamSink' to plugin 'org.apache.kafka.connect.file.FileStreamSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
    [2022-11-23 10:21:21,746] INFO Added aliases 'FileStreamSourceConnector' and 'FileStreamSource' to plugin 'org.apache.kafka.connect.file.FileStreamSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
    [2022-11-23 10:21:21,746] INFO Added aliases 'MirrorCheckpointConnector' and 'MirrorCheckpoint' to plugin 'org.apache.kafka.connect.mirror.MirrorCheckpointConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
    [2022-11-23 10:21:21,746] INFO Added aliases 'MirrorHeartbeatConnector' and 'MirrorHeartbeat' to plugin 'org.apache.kafka.connect.mirror.MirrorHeartbeatConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
    [2022-11-23 10:21:21,746] INFO Added aliases 'MirrorSourceConnector' and 'MirrorSource' to plugin 'org.apache.kafka.connect.mirror.MirrorSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
    [2022-11-23 10:21:21,746] INFO Added aliases 'MockConnector' and 'Mock' to plugin 'org.apache.kafka.connect.tools.MockConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
    [2022-11-23 10:21:21,747] INFO Added aliases 'MockSinkConnector' and 'MockSink' to plugin 'org.apache.kafka.connect.tools.MockSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
    [2022-11-23 10:21:21,747] INFO Added aliases 'MockSourceConnector' and 'MockSource' to plugin 'org.apache.kafka.connect.tools.MockSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
    [2022-11-23 10:21:21,747] INFO Added aliases 'SchemaSourceConnector' and 'SchemaSource' to plugin 'org.apache.kafka.connect.tools.SchemaSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
    [2022-11-23 10:21:21,747] INFO Added aliases 'VerifiableSinkConnector' and 'VerifiableSink' to plugin 'org.apache.kafka.connect.tools.VerifiableSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
    [2022-11-23 10:21:21,747] INFO Added aliases 'VerifiableSourceConnector' and 'VerifiableSource' to plugin 'org.apache.kafka.connect.tools.VerifiableSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
    [2022-11-23 10:21:21,747] INFO Added aliases 'ByteArrayConverter' and 'ByteArray' to plugin 'org.apache.kafka.connect.converters.ByteArrayConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
    [2022-11-23 10:21:21,747] INFO Added aliases 'DoubleConverter' and 'Double' to plugin 'org.apache.kafka.connect.converters.DoubleConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
    [2022-11-23 10:21:21,747] INFO Added aliases 'FloatConverter' and 'Float' to plugin 'org.apache.kafka.connect.converters.FloatConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
    [2022-11-23 10:21:21,747] INFO Added aliases 'IntegerConverter' and 'Integer' to plugin 'org.apache.kafka.connect.converters.IntegerConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
    [2022-11-23 10:21:21,747] INFO Added aliases 'LongConverter' and 'Long' to plugin 'org.apache.kafka.connect.converters.LongConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
    [2022-11-23 10:21:21,747] INFO Added aliases 'ShortConverter' and 'Short' to plugin 'org.apache.kafka.connect.converters.ShortConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
    [2022-11-23 10:21:21,747] INFO Added aliases 'JsonConverter' and 'Json' to plugin 'org.apache.kafka.connect.json.JsonConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
    [2022-11-23 10:21:21,747] INFO Added aliases 'StringConverter' and 'String' to plugin 'org.apache.kafka.connect.storage.StringConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
    [2022-11-23 10:21:21,747] INFO Added aliases 'ByteArrayConverter' and 'ByteArray' to plugin 'org.apache.kafka.connect.converters.ByteArrayConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
    [2022-11-23 10:21:21,748] INFO Added aliases 'DoubleConverter' and 'Double' to plugin 'org.apache.kafka.connect.converters.DoubleConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
    [2022-11-23 10:21:21,748] INFO Added aliases 'FloatConverter' and 'Float' to plugin 'org.apache.kafka.connect.converters.FloatConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
    [2022-11-23 10:21:21,748] INFO Added aliases 'IntegerConverter' and 'Integer' to plugin 'org.apache.kafka.connect.converters.IntegerConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
    [2022-11-23 10:21:21,748] INFO Added aliases 'LongConverter' and 'Long' to plugin 'org.apache.kafka.connect.converters.LongConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
    [2022-11-23 10:21:21,748] INFO Added aliases 'ShortConverter' and 'Short' to plugin 'org.apache.kafka.connect.converters.ShortConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
    [2022-11-23 10:21:21,748] INFO Added aliases 'JsonConverter' and 'Json' to plugin 'org.apache.kafka.connect.json.JsonConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
    [2022-11-23 10:21:21,748] INFO Added alias 'SimpleHeaderConverter' to plugin 'org.apache.kafka.connect.storage.SimpleHeaderConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:427)
    [2022-11-23 10:21:21,748] INFO Added aliases 'StringConverter' and 'String' to plugin 'org.apache.kafka.connect.storage.StringConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
    [2022-11-23 10:21:21,748] INFO Added aliases 'PredicatedTransformation' and 'Predicated' to plugin 'org.apache.kafka.connect.runtime.PredicatedTransformation' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
    [2022-11-23 10:21:21,748] INFO Added alias 'Filter' to plugin 'org.apache.kafka.connect.transforms.Filter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:427)
    [2022-11-23 10:21:21,748] INFO Added alias 'RegexRouter' to plugin 'org.apache.kafka.connect.transforms.RegexRouter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:427)
    [2022-11-23 10:21:21,748] INFO Added alias 'TimestampRouter' to plugin 'org.apache.kafka.connect.transforms.TimestampRouter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:427)
    [2022-11-23 10:21:21,749] INFO Added alias 'ValueToKey' to plugin 'org.apache.kafka.connect.transforms.ValueToKey' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:427)
    [2022-11-23 10:21:21,749] INFO Added alias 'HasHeaderKey' to plugin 'org.apache.kafka.connect.transforms.predicates.HasHeaderKey' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:427)
    [2022-11-23 10:21:21,749] INFO Added alias 'RecordIsTombstone' to plugin 'org.apache.kafka.connect.transforms.predicates.RecordIsTombstone' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:427)
    [2022-11-23 10:21:21,749] INFO Added alias 'TopicNameMatches' to plugin 'org.apache.kafka.connect.transforms.predicates.TopicNameMatches' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:427)
    [2022-11-23 10:21:21,749] INFO Added alias 'BasicAuthSecurityRestExtension' to plugin 'org.apache.kafka.connect.rest.basic.auth.extension.BasicAuthSecurityRestExtension' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:427)
    [2022-11-23 10:21:21,749] INFO Added aliases 'AllConnectorClientConfigOverridePolicy' and 'All' to plugin 'org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
    [2022-11-23 10:21:21,749] INFO Added aliases 'NoneConnectorClientConfigOverridePolicy' and 'None' to plugin 'org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
    [2022-11-23 10:21:21,749] INFO Added aliases 'PrincipalConnectorClientConfigOverridePolicy' and 'Principal' to plugin 'org.apache.kafka.connect.connector.policy.PrincipalConnectorClientConfigOverridePolicy' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
    [2022-11-23 10:21:21,761] INFO StandaloneConfig values:
    access.control.allow.methods =
    access.control.allow.origin =
    admin.listeners = null
    bootstrap.servers = [localhost:9092]
    client.dns.lookup = use_all_dns_ips
    config.providers = []
    connector.client.config.override.policy = None
    header.converter = class org.apache.kafka.connect.storage.SimpleHeaderConverter
    internal.key.converter = class org.apache.kafka.connect.json.JsonConverter
    internal.value.converter = class org.apache.kafka.connect.json.JsonConverter
    key.converter = class org.apache.kafka.connect.json.JsonConverter
    listeners = null
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    offset.flush.interval.ms = 10000
    offset.flush.timeout.ms = 5000
    offset.storage.file.filename = /tmp/connect.offsets
    plugin.path = [plugins/kafka-connect-mqtt/]
    response.http.headers.config =
    rest.advertised.host.name = null
    rest.advertised.listener = null
    rest.advertised.port = null
    rest.extension.classes = []
    rest.host.name = null
    rest.port = 8083
    ssl.cipher.suites = null
    ssl.client.auth = none
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    task.shutdown.graceful.timeout.ms = 5000
    topic.creation.enable = true
    topic.tracking.allow.reset = true
    topic.tracking.enable = true
    value.converter = class org.apache.kafka.connect.json.JsonConverter
    (org.apache.kafka.connect.runtime.standalone.StandaloneConfig:354)
    [2022-11-23 10:21:21,761] INFO Creating Kafka admin client (org.apache.kafka.connect.util.ConnectUtils:49)
    [2022-11-23 10:21:21,764] INFO AdminClientConfig values:
    bootstrap.servers = [localhost:9092]
    client.dns.lookup = use_all_dns_ips
    client.id =
    connections.max.idle.ms = 300000
    default.api.timeout.ms = 60000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 2147483647
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    (org.apache.kafka.clients.admin.AdminClientConfig:354)
    [2022-11-23 10:21:21,809] WARN The configuration 'offset.flush.interval.ms' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362)
    [2022-11-23 10:21:21,809] WARN The configuration 'key.converter.schemas.enable' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362)
    [2022-11-23 10:21:21,809] WARN The configuration 'offset.storage.file.filename' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362)
    [2022-11-23 10:21:21,809] WARN The configuration 'value.converter.schemas.enable' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362)
    [2022-11-23 10:21:21,809] WARN The configuration 'plugin.path' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362)
    [2022-11-23 10:21:21,809] WARN The configuration 'value.converter' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362)
    [2022-11-23 10:21:21,809] WARN The configuration 'key.converter' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362)
    [2022-11-23 10:21:21,810] INFO Kafka version: 2.6.2 (org.apache.kafka.common.utils.AppInfoParser:117)
    [2022-11-23 10:21:21,810] INFO Kafka commitId: da65af02e5856e34 (org.apache.kafka.common.utils.AppInfoParser:118)
    [2022-11-23 10:21:21,810] INFO Kafka startTimeMs: 1669166481809 (org.apache.kafka.common.utils.AppInfoParser:119)
    [2022-11-23 10:21:22,013] INFO Kafka cluster ID: GFsZxHhuTKS1t3X54RH32g (org.apache.kafka.connect.util.ConnectUtils:65)
    [2022-11-23 10:21:22,026] INFO Logging initialized @1614ms to org.eclipse.jetty.util.log.Slf4jLog (org.eclipse.jetty.util.log:169)
    [2022-11-23 10:21:22,055] INFO Added connector for http://:8083 (org.apache.kafka.connect.runtime.rest.RestServer:132)
    [2022-11-23 10:21:22,056] INFO Initializing REST server (org.apache.kafka.connect.runtime.rest.RestServer:204)
    [2022-11-23 10:21:22,062] INFO jetty-9.4.38.v20210224; built: 2021-02-24T20:25:07.675Z; git: 288f3cc74549e8a913bf363250b0744f2695b8e6; jvm 11.0.17+8-post-Ubuntu-1ubuntu222.04 (org.eclipse.jetty.server.Server:375)
    [2022-11-23 10:21:22,085] INFO Started http_8083@5649ec46{HTTP/1.1, (http/1.1)}{0.0.0.0:8083} (org.eclipse.jetty.server.AbstractConnector:331)
    [2022-11-23 10:21:22,086] INFO Started @1674ms (org.eclipse.jetty.server.Server:415)
    [2022-11-23 10:21:22,105] INFO Advertised URI: http://127.0.1.1:8083/ (org.apache.kafka.connect.runtime.rest.RestServer:371)
    [2022-11-23 10:21:22,105] INFO REST server listening at http://127.0.1.1:8083/, advertising URL http://127.0.1.1:8083/ (org.apache.kafka.connect.runtime.rest.RestServer:219)
    [2022-11-23 10:21:22,106] INFO Advertised URI: http://127.0.1.1:8083/ (org.apache.kafka.connect.runtime.rest.RestServer:371)
    [2022-11-23 10:21:22,106] INFO REST admin endpoints at http://127.0.1.1:8083/ (org.apache.kafka.connect.runtime.rest.RestServer:220)
    [2022-11-23 10:21:22,106] INFO Advertised URI: http://127.0.1.1:8083/ (org.apache.kafka.connect.runtime.rest.RestServer:371)
    [2022-11-23 10:21:22,106] INFO Setting up None Policy for ConnectorClientConfigOverride. This will disallow any client configuration to be overridden (org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy:45)
    [2022-11-23 10:21:22,112] INFO Creating Kafka admin client (org.apache.kafka.connect.util.ConnectUtils:49)
    [2022-11-23 10:21:22,113] INFO AdminClientConfig values:
    bootstrap.servers = [localhost:9092]
    client.dns.lookup = use_all_dns_ips
    client.id =
    connections.max.idle.ms = 300000
    default.api.timeout.ms = 60000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 2147483647
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    (org.apache.kafka.clients.admin.AdminClientConfig:354)
    [2022-11-23 10:21:22,118] WARN The configuration 'offset.flush.interval.ms' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362)
    [2022-11-23 10:21:22,119] WARN The configuration 'key.converter.schemas.enable' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362)
    [2022-11-23 10:21:22,119] WARN The configuration 'offset.storage.file.filename' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362)
    [2022-11-23 10:21:22,119] WARN The configuration 'value.converter.schemas.enable' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362)
    [2022-11-23 10:21:22,119] WARN The configuration 'plugin.path' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362)
    [2022-11-23 10:21:22,119] WARN The configuration 'value.converter' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362)
    [2022-11-23 10:21:22,119] WARN The configuration 'key.converter' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362)
    [2022-11-23 10:21:22,119] INFO Kafka version: 2.6.2 (org.apache.kafka.common.utils.AppInfoParser:117)
    [2022-11-23 10:21:22,120] INFO Kafka commitId: da65af02e5856e34 (org.apache.kafka.common.utils.AppInfoParser:118)
    [2022-11-23 10:21:22,120] INFO Kafka startTimeMs: 1669166482119 (org.apache.kafka.common.utils.AppInfoParser:119)
    [2022-11-23 10:21:22,132] INFO Kafka cluster ID: GFsZxHhuTKS1t3X54RH32g (org.apache.kafka.connect.util.ConnectUtils:65)
    [2022-11-23 10:21:22,137] INFO Kafka version: 2.6.2 (org.apache.kafka.common.utils.AppInfoParser:117)
    [2022-11-23 10:21:22,137] INFO Kafka commitId: da65af02e5856e34 (org.apache.kafka.common.utils.AppInfoParser:118)
    [2022-11-23 10:21:22,137] INFO Kafka startTimeMs: 1669166482137 (org.apache.kafka.common.utils.AppInfoParser:119)
    [2022-11-23 10:21:22,210] INFO JsonConverterConfig values:
    converter.type = key
    decimal.format = BASE64
    schemas.cache.size = 1000
    schemas.enable = false
    (org.apache.kafka.connect.json.JsonConverterConfig:354)
    [2022-11-23 10:21:22,211] INFO JsonConverterConfig values:
    converter.type = value
    decimal.format = BASE64
    schemas.cache.size = 1000
    schemas.enable = false
    (org.apache.kafka.connect.json.JsonConverterConfig:354)
    [2022-11-23 10:21:22,217] INFO Kafka Connect standalone worker initialization took 1476ms (org.apache.kafka.connect.cli.ConnectStandalone:100)
    [2022-11-23 10:21:22,217] INFO Kafka Connect starting (org.apache.kafka.connect.runtime.Connect:51)
    [2022-11-23 10:21:22,218] INFO Herder starting (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:94)
    [2022-11-23 10:21:22,218] INFO Worker starting (org.apache.kafka.connect.runtime.Worker:195)
    [2022-11-23 10:21:22,218] INFO Starting FileOffsetBackingStore with file /tmp/connect.offsets (org.apache.kafka.connect.storage.FileOffsetBackingStore:58)
    [2022-11-23 10:21:22,222] INFO Worker started (org.apache.kafka.connect.runtime.Worker:202)
    [2022-11-23 10:21:22,222] INFO Herder started (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:97)
    [2022-11-23 10:21:22,222] INFO Initializing REST resources (org.apache.kafka.connect.runtime.rest.RestServer:224)
    [2022-11-23 10:21:22,248] INFO Adding admin resources to main listener (org.apache.kafka.connect.runtime.rest.RestServer:241)
    [2022-11-23 10:21:22,297] INFO DefaultSessionIdManager workerName=node0 (org.eclipse.jetty.server.session:334)
    [2022-11-23 10:21:22,297] INFO No SessionScavenger set, using defaults (org.eclipse.jetty.server.session:339)
    [2022-11-23 10:21:22,298] INFO node0 Scavenging every 600000ms (org.eclipse.jetty.server.session:132)
    Nov 23, 2022 10:21:22 AM org.glassfish.jersey.internal.inject.Providers checkProviderRuntime
    WARNING: A provider org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource will be ignored.
    Nov 23, 2022 10:21:22 AM org.glassfish.jersey.internal.inject.Providers checkProviderRuntime
    WARNING: A provider org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource will be ignored.
    Nov 23, 2022 10:21:22 AM org.glassfish.jersey.internal.inject.Providers checkProviderRuntime
    WARNING: A provider org.apache.kafka.connect.runtime.rest.resources.LoggingResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider org.apache.kafka.connect.runtime.rest.resources.LoggingResource will be ignored.
    Nov 23, 2022 10:21:22 AM org.glassfish.jersey.internal.inject.Providers checkProviderRuntime
    WARNING: A provider org.apache.kafka.connect.runtime.rest.resources.RootResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider org.apache.kafka.connect.runtime.rest.resources.RootResource will be ignored.
    Nov 23, 2022 10:21:22 AM org.glassfish.jersey.internal.Errors logErrors
    WARNING: The following warnings have been detected: WARNING: The (sub)resource method listLoggers in org.apache.kafka.connect.runtime.rest.resources.LoggingResource contains empty path annotation.
    WARNING: The (sub)resource method createConnector in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
    WARNING: The (sub)resource method listConnectors in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
    WARNING: The (sub)resource method listConnectorPlugins in org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource contains empty path annotation.
    WARNING: The (sub)resource method serverInfo in org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty path annotation.

[2022-11-23 10:21:22,663] INFO Started o.e.j.s.ServletContextHandler@27a7ef08{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:916)
[2022-11-23 10:21:22,663] INFO REST resources initialized; server is started and ready to handle requests (org.apache.kafka.connect.runtime.rest.RestServer:319)
[2022-11-23 10:21:22,663] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:57)
[2022-11-23 10:21:22,672] INFO AbstractConfig values:
(org.apache.kafka.common.config.AbstractConfig:354)
[2022-11-23 10:21:22,686] INFO Creating connector mqtt-source-connector of type be.jovacon.kafka.connect.MQTTSourceConnector (org.apache.kafka.connect.runtime.Worker:274)
[2022-11-23 10:21:22,687] INFO SourceConnectorConfig values:
config.action.reload = restart
connector.class = be.jovacon.kafka.connect.MQTTSourceConnector
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = class org.apache.kafka.connect.storage.StringConverter
name = mqtt-source-connector
predicates = []
tasks.max = 1
topic.creation.groups = []
transforms = []
value.converter = class org.apache.kafka.connect.storage.StringConverter
(org.apache.kafka.connect.runtime.SourceConnectorConfig:354)
[2022-11-23 10:21:22,687] INFO EnrichedConnectorConfig values:
config.action.reload = restart
connector.class = be.jovacon.kafka.connect.MQTTSourceConnector
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = class org.apache.kafka.connect.storage.StringConverter
name = mqtt-source-connector
predicates = []
tasks.max = 1
topic.creation.groups = []
transforms = []
value.converter = class org.apache.kafka.connect.storage.StringConverter
(org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:354)
[2022-11-23 10:21:22,692] INFO Instantiated connector mqtt-source-connector with version 1.1.0 of type class be.jovacon.kafka.connect.MQTTSourceConnector (org.apache.kafka.connect.runtime.Worker:284)
[2022-11-23 10:21:22,693] INFO Finished creating connector mqtt-source-connector (org.apache.kafka.connect.runtime.Worker:310)
[2022-11-23 10:21:22,695] INFO MQTTSourceConnectorConfig values:
kafka.topic = mqtt.test
mqtt.automaticReconnect = true
mqtt.broker = tcp://127.0.0.1:1883
mqtt.cleanSession = true
mqtt.clientID = cid
mqtt.connectionTimeout = 30
mqtt.keepAliveInterval = 60
mqtt.password = [hidden]
mqtt.qos = 1
mqtt.topic = test
mqtt.userName =
(be.jovacon.kafka.connect.config.MQTTSourceConnectorConfig:354)
[2022-11-23 10:21:22,697] INFO SourceConnectorConfig values:
config.action.reload = restart
connector.class = be.jovacon.kafka.connect.MQTTSourceConnector
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = class org.apache.kafka.connect.storage.StringConverter
name = mqtt-source-connector
predicates = []
tasks.max = 1
topic.creation.groups = []
transforms = []
value.converter = class org.apache.kafka.connect.storage.StringConverter
(org.apache.kafka.connect.runtime.SourceConnectorConfig:354)
[2022-11-23 10:21:22,697] INFO EnrichedConnectorConfig values:
config.action.reload = restart
connector.class = be.jovacon.kafka.connect.MQTTSourceConnector
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = class org.apache.kafka.connect.storage.StringConverter
name = mqtt-source-connector
predicates = []
tasks.max = 1
topic.creation.groups = []
transforms = []
value.converter = class org.apache.kafka.connect.storage.StringConverter
(org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:354)
[2022-11-23 10:21:22,698] INFO Creating task mqtt-source-connector-0 (org.apache.kafka.connect.runtime.Worker:509)
[2022-11-23 10:21:22,700] INFO ConnectorConfig values:
config.action.reload = restart
connector.class = be.jovacon.kafka.connect.MQTTSourceConnector
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = class org.apache.kafka.connect.storage.StringConverter
name = mqtt-source-connector
predicates = []
tasks.max = 1
transforms = []
value.converter = class org.apache.kafka.connect.storage.StringConverter
(org.apache.kafka.connect.runtime.ConnectorConfig:354)
[2022-11-23 10:21:22,700] INFO EnrichedConnectorConfig values:
config.action.reload = restart
connector.class = be.jovacon.kafka.connect.MQTTSourceConnector
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = class org.apache.kafka.connect.storage.StringConverter
name = mqtt-source-connector
predicates = []
tasks.max = 1
transforms = []
value.converter = class org.apache.kafka.connect.storage.StringConverter
(org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:354)
[2022-11-23 10:21:22,701] INFO TaskConfig values:
task.class = class be.jovacon.kafka.connect.MQTTSourceTask
(org.apache.kafka.connect.runtime.TaskConfig:354)
[2022-11-23 10:21:22,701] INFO Instantiated task mqtt-source-connector-0 with version 1.1.0 of type be.jovacon.kafka.connect.MQTTSourceTask (org.apache.kafka.connect.runtime.Worker:524)
[2022-11-23 10:21:22,702] INFO StringConverterConfig values:
converter.encoding = UTF8
converter.type = key
(org.apache.kafka.connect.storage.StringConverterConfig:354)
[2022-11-23 10:21:22,702] INFO StringConverterConfig values:
converter.encoding = UTF8
converter.type = value
(org.apache.kafka.connect.storage.StringConverterConfig:354)
[2022-11-23 10:21:22,702] INFO Set up the key converter class org.apache.kafka.connect.storage.StringConverter for task mqtt-source-connector-0 using the connector config (org.apache.kafka.connect.runtime.Worker:539)
[2022-11-23 10:21:22,702] INFO Set up the value converter class org.apache.kafka.connect.storage.StringConverter for task mqtt-source-connector-0 using the connector config (org.apache.kafka.connect.runtime.Worker:545)
[2022-11-23 10:21:22,703] INFO Set up the header converter class org.apache.kafka.connect.storage.SimpleHeaderConverter for task mqtt-source-connector-0 using the worker config (org.apache.kafka.connect.runtime.Worker:550)
[2022-11-23 10:21:22,705] INFO SourceConnectorConfig values:
config.action.reload = restart
connector.class = be.jovacon.kafka.connect.MQTTSourceConnector
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = class org.apache.kafka.connect.storage.StringConverter
name = mqtt-source-connector
predicates = []
tasks.max = 1
topic.creation.groups = []
transforms = []
value.converter = class org.apache.kafka.connect.storage.StringConverter
(org.apache.kafka.connect.runtime.SourceConnectorConfig:354)
[2022-11-23 10:21:22,705] INFO EnrichedConnectorConfig values:
config.action.reload = restart
connector.class = be.jovacon.kafka.connect.MQTTSourceConnector
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = class org.apache.kafka.connect.storage.StringConverter
name = mqtt-source-connector
predicates = []
tasks.max = 1
topic.creation.groups = []
transforms = []
value.converter = class org.apache.kafka.connect.storage.StringConverter
(org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:354)
[2022-11-23 10:21:22,707] INFO Initializing: org.apache.kafka.connect.runtime.TransformationChain{} (org.apache.kafka.connect.runtime.Worker:606)
[2022-11-23 10:21:22,713] INFO ProducerConfig values:
acks = -1
batch.size = 16384
bootstrap.servers = [localhost:9092]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = connector-producer-mqtt-source-connector-0
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 2147483647
enable.idempotence = false
interceptor.classes = []
internal.auto.downgrade.txn.commit = false
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 0
max.block.ms = 9223372036854775807
max.in.flight.requests.per.connection = 1
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 2147483647
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
(org.apache.kafka.clients.producer.ProducerConfig:354)
[2022-11-23 10:21:22,728] WARN The configuration 'metrics.context.connect.kafka.cluster.id' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig:362)
[2022-11-23 10:21:22,728] INFO Kafka version: 2.6.2 (org.apache.kafka.common.utils.AppInfoParser:117)
[2022-11-23 10:21:22,729] INFO Kafka commitId: da65af02e5856e34 (org.apache.kafka.common.utils.AppInfoParser:118)
[2022-11-23 10:21:22,729] INFO Kafka startTimeMs: 1669166482728 (org.apache.kafka.common.utils.AppInfoParser:119)
[2022-11-23 10:21:22,738] INFO MQTTSourceConnectorConfig values:
kafka.topic = mqtt.test
mqtt.automaticReconnect = true
mqtt.broker = tcp://127.0.0.1:1883
mqtt.cleanSession = true
mqtt.clientID = cid
mqtt.connectionTimeout = 30
mqtt.keepAliveInterval = 60
mqtt.password = [hidden]
mqtt.qos = 1
mqtt.topic = test
mqtt.userName =
(be.jovacon.kafka.connect.config.MQTTSourceConnectorConfig:354)
[2022-11-23 10:21:22,740] INFO Created connector mqtt-source-connector (org.apache.kafka.connect.cli.ConnectStandalone:112)
[2022-11-23 10:21:22,741] INFO WorkerSourceTask{id=mqtt-source-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:488)
[2022-11-23 10:21:22,741] INFO WorkerSourceTask{id=mqtt-source-connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:505)
[2022-11-23 10:21:22,742] ERROR WorkerSourceTask{id=mqtt-source-connector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:191)
java.lang.NoClassDefFoundError: com/google/common/base/Preconditions
at com.github.jcustenborder.kafka.connect.utils.data.SourceRecordDequeBuilder.build(SourceRecordDequeBuilder.java:95)
at be.jovacon.kafka.connect.MQTTSourceTask.start(MQTTSourceTask.java:32)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:232)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:239)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: com.google.common.base.Preconditions
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
... 10 more
[2022-11-23 10:21:22,742] INFO [Producer clientId=connector-producer-mqtt-source-connector-0] Cluster ID: GFsZxHhuTKS1t3X54RH32g (org.apache.kafka.clients.Metadata:279)
[2022-11-23 10:21:22,744] ERROR WorkerSourceTask{id=mqtt-source-connector-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:192)
[2022-11-23 10:21:22,744] WARN Could not stop task (org.apache.kafka.connect.runtime.WorkerSourceTask:175)
java.lang.NullPointerException
at be.jovacon.kafka.connect.MQTTSourceTask.stop(MQTTSourceTask.java:87)
at org.apache.kafka.connect.runtime.WorkerSourceTask.close(WorkerSourceTask.java:173)
at org.apache.kafka.connect.runtime.WorkerTask.doClose(WorkerTask.java:168)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:195)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:239)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
[2022-11-23 10:21:22,745] INFO [Producer clientId=connector-producer-mqtt-source-connector-0] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:1189

  1. Published an MQTT message to the MQTT broker (localhost:1883) on topic test, but no message arrives on the Kafka topic mqtt.test.

I believe I followed the README closely. Is there a step I missed or overlooked along the way?

License

Hi,

I'd like to use your module for a project.
Can you tell me what the license is?

Thanks in advance and best regards

unable to connect MQTT source connector with confluent kafka v5.4

Objective: to set up the MQTT source connector with Confluent Kafka v5.4
Steps Performed:

  1. git clone https://github.com/johanvandevenne/kafka-connect-mqtt.git
  2. cd kafka-connect-mqtt
  3. mvn clean install
  4. copied the jar from /kafka-connect-mqtt/target to the plugins directory. The path of this directory is mentioned in the worker configuration.
  5. executing http://:8083/connector-plugins gives the expected output.
  6. curl -d @./mqtt-source-connector-1.json -H "Content-Type: application/json" -X POST http://localhost:8083/connectors

Expected: There should be no error logs in Kafka Connect and the connector should be available for use. But after the 6th step, the following error appears in the Kafka Connect logs:

[2020-06-01 09:42:38,755] ERROR WorkerSourceTask{id=johan-mqtt-source-connector-1-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
org.apache.kafka.connect.errors.ConnectException: Not authorized to connect (5)
        at be.jovacon.kafka.connect.MQTTSourceTask.start(MQTTSourceTask.java:53)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:208)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: Not authorized to connect (5)
        at org.eclipse.paho.client.mqttv3.internal.ExceptionHelper.createMqttException(ExceptionHelper.java:28)
        at org.eclipse.paho.client.mqttv3.internal.ClientState.notifyReceivedAck(ClientState.java:1040)
        at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:151)
        ... 1 more
[2020-06-01 09:42:38,756] ERROR WorkerSourceTask{id=johan-mqtt-source-connector-1-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:180)

The error shows a connection failure between the MQTT broker and Kafka Connect, although I have entered the correct MQTT broker credentials. I verified them by running the mosquitto pub/sub commands from the VM on which my Confluent Platform is running. Any idea why I am getting this error?
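
The log above says the task will not recover until it is manually restarted. The following is a minimal sketch, not a confirmed fix for the authorization error, of how the response of the Kafka Connect REST endpoint `GET /connectors/<name>/status` could be summarized to see which tasks failed and why. It assumes the worker is reachable at `http://localhost:8083` (the address used in the curl command above) and that the connector is named `johan-mqtt-source-connector-1` (taken from the task id in the log). `failed_tasks` and `fetch_status` are illustrative helper names, not part of the connector.

```python
import json
from urllib.request import urlopen


def failed_tasks(status):
    """Return (task_id, first_trace_line) for every task whose state is FAILED."""
    failures = []
    for task in status.get("tasks", []):
        if task.get("state") == "FAILED":
            trace = task.get("trace", "")
            # The first line of the trace usually names the exception and message.
            first_line = trace.splitlines()[0] if trace else ""
            failures.append((task.get("id"), first_line))
    return failures


def fetch_status(base_url, connector_name):
    """Fetch /connectors/<name>/status from a Kafka Connect worker (assumed reachable)."""
    with urlopen(f"{base_url}/connectors/{connector_name}/status") as response:
        return json.load(response)
```

Calling `failed_tasks(fetch_status("http://localhost:8083", "johan-mqtt-source-connector-1"))` would list the failed tasks. Once the cause is fixed, a failed task can be restarted through `POST /connectors/<name>/tasks/<id>/restart`.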

Instructions for using without REST API

Could you please explain the procedure when the REST API is not available? (That requirement is not listed in the prerequisites, which only mention things such as Java >8, Maven or Git.)

Furthermore, at the end of the build, the folder /target/kafka-connect-mqtt-1.0-0-package/share/kafka-connect-mqtt does not exist.
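
On running without the REST API: Kafka Connect's standalone launcher takes a worker properties file followed by one or more connector properties files on the command line, so the JSON payload normally sent to the REST API has to be expressed as a `.properties` file instead. The following is a minimal sketch of that conversion, assuming the payload has the usual `{"name": ..., "config": {...}}` shape. `to_properties` is an illustrative helper name, and whether this project's README documents standalone mode is not confirmed here.

```python
import json


def to_properties(payload):
    """Flatten a REST-style {"name": ..., "config": {...}} payload into .properties text."""
    lines = [f"name={payload['name']}"]
    for key, value in payload["config"].items():
        if isinstance(value, bool):
            value = "true" if value else "false"  # JSON booleans become lowercase text
        # Backslashes must be doubled in Java .properties values.
        # Keys are assumed to contain no spaces, ':' or '=' (true for the keys shown here).
        text = str(value).replace("\\", "\\\\")
        lines.append(f"{key}={text}")
    return "\n".join(lines) + "\n"


# Example payload using keys that appear elsewhere on this page.
rest_payload = json.loads("""
{
  "name": "mqtt-source-connector-1",
  "config": {
    "connector.class": "be.jovacon.kafka.connect.MQTTSourceConnector",
    "mqtt.topic": "my_mqtt_topic",
    "kafka.topic": "my_kafka_topic",
    "key.converter.schemas.enable": false
  }
}
""")
print(to_properties(rest_payload), end="")
```

The resulting text can be saved to a file and passed as the connector properties argument of the standalone launcher, after the worker properties file.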

license

Hi Johan,
can you provide the license type under which your code is being licensed?
Thank you
Dom

Getting Error while starting connector

Hello All,

I am trying to use this connector plugin to connect to a 'mqtts' protocol broker.

I have built the required plugin directory and copied it to the connector plugins directory. Then I passed all the details through a .properties file like this:

name=mqtt-source-connector
connector.class=be.jovacon.kafka.connect.MQTTSourceConnector
mqtt.topic=test
kafka.topic=test
mqtt.clientID=<client-id>
mqtt.broker=mqtts://<mqtt-broker-address>:8883
mqtt.username=<username>
mqtt.password=<password>
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter.schemas.enable=false

I am getting this error :

[2020-12-08 09:52:10,884] ERROR WorkerSourceTask{id=mqtt-source-connector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:187)
java.lang.NoClassDefFoundError: com/google/common/base/Preconditions
	at com.github.jcustenborder.kafka.connect.utils.data.SourceRecordDequeBuilder.build(SourceRecordDequeBuilder.java:95)
	at be.jovacon.kafka.connect.MQTTSourceTask.start(MQTTSourceTask.java:32)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:232)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.google.common.base.Preconditions
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	... 10 more
[2020-12-08 09:52:10,924] ERROR WorkerSourceTask{id=mqtt-source-connector-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:188)
[2020-12-08 09:52:10,924] WARN Could not stop task (org.apache.kafka.connect.runtime.WorkerSourceTask:223)
java.lang.NullPointerException
	at be.jovacon.kafka.connect.MQTTSourceTask.stop(MQTTSourceTask.java:87)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.tryStop(WorkerSourceTask.java:220)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.close(WorkerSourceTask.java:170)
	at org.apache.kafka.connect.runtime.WorkerTask.doClose(WorkerTask.java:164)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:191)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

I think because of this error I am unable to connect to the mqtt broker. Is it possible that the plugin wasn't built correctly? I was able to see the plugin using

http://<kafkaconnect>:8083/connector-plugins
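
One way to check whether the plugin directory actually ships the class named in the `NoClassDefFoundError` is to scan its jars. `com.google.common.base.Preconditions` belongs to Guava, so if no jar under the plugin directory contains it, the plugin class loader has nothing to load it from. The following is a minimal sketch; `jars_containing` is an illustrative helper name, the plugin directory path is whatever your worker's `plugin.path` points at, and a missing jar is only one possible cause of the error.

```python
import zipfile
from pathlib import Path


def jars_containing(plugin_dir, class_name):
    """Return the jars under plugin_dir that contain the given fully qualified class."""
    entry = class_name.replace(".", "/") + ".class"
    hits = []
    for jar in sorted(Path(plugin_dir).rglob("*.jar")):
        try:
            with zipfile.ZipFile(jar) as archive:
                if entry in archive.namelist():
                    hits.append(jar)
        except zipfile.BadZipFile:
            continue  # skip files that are not valid archives
    return hits
```

For example, `jars_containing("/path/to/plugins", "com.google.common.base.Preconditions")` returns an empty list when no jar in that (hypothetical) directory bundles the class, which would suggest that only the connector jar was copied, without its dependencies.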

Getting error when MQTT source connector is created

Objective: to set up the MQTT source connector with Confluent Kafka v5.4
Steps Performed:

  1. git clone https://github.com/johanvandevenne/kafka-connect-mqtt.git
  2. cd kafka-connect-mqtt
  3. mvn clean install
  4. copied the jar from /kafka-connect-mqtt/target to the plugins directory. The path of this directory is mentioned in the worker configuration.
  5. executing http://:8083/connector-plugins gives the expected output.
  6. curl -d @./mqtt-source-connector-1.json -H "Content-Type: application/json" -X POST http://localhost:8083/connectors

Expected: There should be no error logs in Kafka Connect and the connector should be available for use. But after the 6th step, the following error appears in the Kafka Connect logs:

[2020-06-01 08:01:31,181] ERROR [Worker clientId=connect-1, groupId=connect-cluster] Failed to reconfigure connector's tasks, retrying after backoff: (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1236)
java.lang.NoClassDefFoundError: be/jovacon/kafka/connect/MQTTSourceTask
        at be.jovacon.kafka.connect.MQTTSourceConnector.taskClass(MQTTSourceConnector.java:27)
        at org.apache.kafka.connect.runtime.Worker.connectorTaskConfigs(Worker.java:322)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnector(DistributedHerder.java:1287)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnectorTasksWithRetry(DistributedHerder.java:1225)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1000(DistributedHerder.java:125)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder$17$1.call(DistributedHerder.java:1242)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder$17$1.call(DistributedHerder.java:1239)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:342)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:282)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

The following is the connector config JSON file:

{ "name": "mqtt-source-connector-1",
    "config":
    {
      "connector.class":"be.jovacon.kafka.connect.MQTTSourceConnector",
      "mqtt.topic":"my_mqtt_topic",
      "kafka.topic":"my_kafka_topic",
      "mqtt.clientID":"my_client_id",
      "mqtt.broker":"tcp://broker-ip:1883",
      "key.converter":"org.apache.kafka.connect.storage.StringConverter",
      "key.converter.schemas.enable":false,
      "value.converter":"org.apache.kafka.connect.storage.StringConverter",
      "value.converter.schemas.enable":false,
      "mqtt.username":"username",
      "mqtt.password":"password"
    }
}

How can I fix the above issue?
