johanvandevenne / kafka-connect-mqtt Goto Github PK
View Code? Open in Web Editor NEWKafka Connect MQTT Connector
Kafka Connect MQTT Connector
Hello,
I was wondering if you tried deploying it as kubernetes pod ?
I was trying it and encountered some problems, I had to define an "entrypoint" script which looks like this :
# shellcheck disable=SC1083
# Wait until the REST server is available
while [ "$(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors)" -ne 200 ]; do
sleep 5
done
# Configure the connector
curl -i -X PUT -H "Content-Type:application/json"
http://localhost:8083/connectors/lysr-source-connector/config \
-d "{
\"connector.class\": \"ai.lysr.mqttconnector.LysrSourceMqttConnector\",
\"tasks.max\": \"1\",
\"topics.regex\": \"^[0-9]*$\",
\"key.converter\": \"org.apache.kafka.connect.storage.StringConverter\",
\"value.converter\": \"org.apache.kafka.connect.storage.StringConverter\",
\"mqtt.broker\": \"$MQTT_BROKER_URL\",
\"mqtt.clientID\": \"$MQTT_CLIENT_ID\",
\"mqtt.topic\": \"$MQTT_TOPIC\",
\"mqtt.userName\": \"$MQTT_USERNAME\",
\"mqtt.password\": \"$MQTT_PASSWORD\"
}"
To configure the connector and the dockerfile is the following:
FROM confluentinc/cp-kafka-connect-base:7.0.1
ENV CONNECT_PLUGIN_PATH=/app
WORKDIR /app
COPY build/app .
But I get the error :
===> Configuring ...
CONNECT_KEY_CONVERTER is required.
Command [/usr/local/bin/dub ensure CONNECT_KEY_CONVERTER] FAILED !
Any idea ?
A small issue with very low priority.
Within the readme, the MQTT username is declared as "mqtt.username" in sourcecode it is defined as "mqtt.userName". If you configure the connector manually it is usually not a blocker. In my case i have written a small tool to provide AsyncAPI Bindings automatically into the related infrastructure (kafka & consul) with a mixture between the confluent mqtt sink and source connector (using the regex pattern) and this one here (because it solves a problem to switch topic name between kafka and mqtt in a sink configuration). I built a workaround within the provisioner that fixes the difference for me, so that is really a problem.
Best Regards
MQTT Broker = nanomq, mosquito
kafka version = 2.6.2
I'm testing your kafka-connect-mqtt in local ubuntu environment.
[{"class":"be.jovacon.kafka.connect.MQTTSinkConnector","type":"sink","version":"1.1.0"},{"class":"be.jovacon.kafka.connect.MQTTSourceConnector","type":"source","version":"1.1.0"},{"class":"org.apache.kafka.connect.file.FileStreamSinkConnector","type":"sink","version":"2.6.2"},{"class":"org.apache.kafka.connect.file.FileStreamSourceConnector","type":"source","version":"2.6.2"},{"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"1"},{"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"1"},{"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"1"}]
name=mqtt-source-connector
connector.class=be.jovacon.kafka.connect.MQTTSourceConnector
mqtt.topic=test
kafka.topic=mqtt.test
mqtt.clientID=cid
mqtt.broker=tcp://127.0.0.1:1883
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter.schemas.enable=false
restart kafka connect
$ bin/connect-standalone.sh config/connect-standalone.properties config/connect-mqtt-source.properties
result
[2022-11-23 10:21:20,740] INFO Kafka Connect standalone worker initializing ... (org.apache.kafka.connect.cli.ConnectStandalone:69)
[2022-11-23 10:21:20,746] INFO WorkerInfo values:
jvm.args = -Xms256M, -Xmx2G, -XX:+UseG1GC, -XX:MaxGCPauseMillis=20, -XX:InitiatingHeapOccupancyPercent=35, -XX:+ExplicitGCInvokesConcurrent, -XX:MaxInlineLevel=15, -Djava.awt.headless=true, -Dcom.sun.management.jmxremote, -Dcom.sun.management.jmxremote.authenticate=false, -Dcom.sun.management.jmxremote.ssl=false, -Dkafka.logs.dir=/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../logs, -Dlog4j.configuration=file:bin/../config/connect-log4j.properties
jvm.spec = Ubuntu, OpenJDK 64-Bit Server VM, 11.0.17, 11.0.17+8-post-Ubuntu-1ubuntu222.04
jvm.classpath = /home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/activation-1.1.1.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/aopalliance-repackaged-2.6.1.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/argparse4j-0.7.0.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/audience-annotations-0.5.0.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/commons-cli-1.4.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/commons-lang3-3.8.1.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/connect-api-2.6.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/connect-basic-auth-extension-2.6.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/connect-file-2.6.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/connect-json-2.6.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/connect-mirror-2.6.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/connect-mirror-client-2.6.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/connect-runtime-2.6.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/connect-transforms-2.6.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/connect-utils-0.4.156.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/freemarker-2.3.28.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/hk2-api-2.6.1.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/hk2-locator-2.6.1.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/hk2-utils-2.6.1.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jackson-annotations-2.10.5.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jackson-core-2.10.5.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jackson-databind-2.10.5.1.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jackson-dataformat-csv-2.10.5.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jackson-datatype-jdk8-2.10.5.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jackson-jaxrs-base-2.10.5.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jackson-jaxrs-json-provider-2.10.5.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jackson-module-jaxb-annotations-2.10.5.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jackson-module-paranamer-2.10.5.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jackson-module-scala_2.12-2.10.5.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jakarta.activation-api-1.2.1.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jakarta.annotation-api-1.3.5.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jakarta.inject-2.6.1.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jakarta.validation-api-2.0.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jakarta.ws.rs-api-2.1.6.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jakarta.xml.bind-api-2.3.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/javassist-3.25.0-GA.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/javassist-3.26.0-GA.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/javax.servlet-api-3.1.0.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/javax.ws.rs-api-2.1.1.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jaxb-api-2.3.0.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jersey-client-2.31.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jersey-common-2.31.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jersey-container-servlet-2.31.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jersey-container-servlet-core-2.31.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jersey-hk2-2.31.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jersey-media-jaxb-2.31.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jersey-server-2.31.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jetty-client-9.4.38.v20210224.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jetty-continuation-9.4.38.v20210224.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jetty-http-9.4.38.v20210224.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jetty-io-9.4.38.v20210224.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jetty-security-9.4.38.v20210224.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jetty-server-9.4.38.v20210224.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jetty-servlet-9.4.38.v20210224.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jetty-servlets-9.4.38.v20210224.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jetty-util-9.4.38.v20210224.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jetty-util-ajax-9.4.38.v20210224.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/jopt-simple-5.0.4.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/kafka-clients-2.6.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/kafka-connect-mqtt-1.1.0.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/kafka-log4j-appender-2.6.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/kafka-streams-2.6.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/kafka-streams-examples-2.6.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/kafka-streams-scala_2.12-2.6.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/kafka-streams-test-utils-2.6.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/kafka-tools-2.6.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/kafka_2.12-2.6.2-sources.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/kafka_2.12-2.6.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/log4j-1.2.17.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/log4j-api-2.11.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/log4j-core-2.11.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/lz4-java-1.7.1.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/maven-artifact-3.6.3.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/metrics-core-2.2.0.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/netty-buffer-4.1.59.Final.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/netty-codec-4.1.59.Final.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/netty-common-4.1.59.Final.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/netty-handler-4.1.59.Final.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/netty-resolver-4.1.59.Final.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/netty-transport-4.1.59.Final.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/netty-transport-native-epoll-4.1.59.Final.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/netty-transport-native-unix-common-4.1.59.Final.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/org.eclipse.paho.client.mqttv3-1.2.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/osgi-resource-locator-1.0.3.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/paranamer-2.8.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/plexus-utils-3.2.1.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/reflections-0.9.12.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/rocksdbjni-5.18.4.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/scala-collection-compat_2.12-2.1.6.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/scala-java8-compat_2.12-0.9.1.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/scala-library-2.12.11.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/scala-logging_2.12-3.9.2.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/scala-reflect-2.12.11.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/slf4j-api-1.7.30.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/slf4j-log4j12-1.7.30.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/snappy-java-1.1.7.3.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/zookeeper-3.5.9.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/zookeeper-jute-3.5.9.jar:/home/gwjeon/kafka/kafka_2.12-2.6.2/bin/../libs/zstd-jni-1.4.4-7.jar
os.spec = Linux, amd64, 5.15.74.2-microsoft-standard-WSL2
os.vcpus = 16
(org.apache.kafka.connect.runtime.WorkerInfo:71)
[2022-11-23 10:21:20,747] INFO Scanning for plugin classes. This might take a moment ... (org.apache.kafka.connect.cli.ConnectStandalone:78)
[2022-11-23 10:21:20,756] INFO Loading plugin from: /home/gwjeon/kafka/kafka_2.12-2.6.2/plugins/kafka-connect-mqtt/freemarker-2.3.28.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:246)
[2022-11-23 10:21:20,914] INFO Registered loader: PluginClassLoader{pluginLocation=file:/home/gwjeon/kafka/kafka_2.12-2.6.2/plugins/kafka-connect-mqtt/freemarker-2.3.28.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:269)
[2022-11-23 10:21:20,915] INFO Added plugin 'org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
[2022-11-23 10:21:20,915] INFO Added plugin 'org.apache.kafka.connect.connector.policy.PrincipalConnectorClientConfigOverridePolicy' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
[2022-11-23 10:21:20,915] INFO Added plugin 'org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
[2022-11-23 10:21:20,916] INFO Loading plugin from: /home/gwjeon/kafka/kafka_2.12-2.6.2/plugins/kafka-connect-mqtt/org.eclipse.paho.client.mqttv3-1.2.2.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:246)
[2022-11-23 10:21:20,992] INFO Registered loader: PluginClassLoader{pluginLocation=file:/home/gwjeon/kafka/kafka_2.12-2.6.2/plugins/kafka-connect-mqtt/org.eclipse.paho.client.mqttv3-1.2.2.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:269)
[2022-11-23 10:21:20,993] INFO Loading plugin from: /home/gwjeon/kafka/kafka_2.12-2.6.2/plugins/kafka-connect-mqtt/log4j-api-2.11.2.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:246)
[2022-11-23 10:21:21,011] INFO Registered loader: PluginClassLoader{pluginLocation=file:/home/gwjeon/kafka/kafka_2.12-2.6.2/plugins/kafka-connect-mqtt/log4j-api-2.11.2.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:269)
[2022-11-23 10:21:21,012] INFO Loading plugin from: /home/gwjeon/kafka/kafka_2.12-2.6.2/plugins/kafka-connect-mqtt/kafka-connect-mqtt-1.1.0.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:246)
[2022-11-23 10:21:21,026] INFO Registered loader: PluginClassLoader{pluginLocation=file:/home/gwjeon/kafka/kafka_2.12-2.6.2/plugins/kafka-connect-mqtt/kafka-connect-mqtt-1.1.0.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:269)
[2022-11-23 10:21:21,026] INFO Added plugin 'be.jovacon.kafka.connect.MQTTSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
[2022-11-23 10:21:21,026] INFO Added plugin 'be.jovacon.kafka.connect.MQTTSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
[2022-11-23 10:21:21,028] INFO Loading plugin from: /home/gwjeon/kafka/kafka_2.12-2.6.2/plugins/kafka-connect-mqtt/log4j-core-2.11.2.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:246)
[2022-11-23 10:21:21,111] INFO Registered loader: PluginClassLoader{pluginLocation=file:/home/gwjeon/kafka/kafka_2.12-2.6.2/plugins/kafka-connect-mqtt/log4j-core-2.11.2.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:269)
[2022-11-23 10:21:21,111] INFO Loading plugin from: /home/gwjeon/kafka/kafka_2.12-2.6.2/plugins/kafka-connect-mqtt/connect-utils-0.4.156.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:246)
[2022-11-23 10:21:21,122] INFO Registered loader: PluginClassLoader{pluginLocation=file:/home/gwjeon/kafka/kafka_2.12-2.6.2/plugins/kafka-connect-mqtt/connect-utils-0.4.156.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:269)
[2022-11-23 10:21:21,740] INFO Registered loader: jdk.internal.loader.ClassLoaders$AppClassLoader@3d4eac69 (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:269)
[2022-11-23 10:21:21,740] INFO Added plugin 'org.apache.kafka.connect.mirror.MirrorSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
[2022-11-23 10:21:21,740] INFO Added plugin 'org.apache.kafka.connect.file.FileStreamSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
[2022-11-23 10:21:21,740] INFO Added plugin 'org.apache.kafka.connect.tools.SchemaSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
[2022-11-23 10:21:21,740] INFO Added plugin 'org.apache.kafka.connect.tools.MockSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
[2022-11-23 10:21:21,740] INFO Added plugin 'org.apache.kafka.connect.file.FileStreamSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
[2022-11-23 10:21:21,740] INFO Added plugin 'org.apache.kafka.connect.mirror.MirrorCheckpointConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
[2022-11-23 10:21:21,740] INFO Added plugin 'org.apache.kafka.connect.tools.VerifiableSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
[2022-11-23 10:21:21,741] INFO Added plugin 'org.apache.kafka.connect.tools.VerifiableSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
[2022-11-23 10:21:21,741] INFO Added plugin 'org.apache.kafka.connect.tools.MockSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
[2022-11-23 10:21:21,741] INFO Added plugin 'org.apache.kafka.connect.mirror.MirrorHeartbeatConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
[2022-11-23 10:21:21,741] INFO Added plugin 'org.apache.kafka.connect.tools.MockConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
[2022-11-23 10:21:21,741] INFO Added plugin 'org.apache.kafka.connect.converters.FloatConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
[2022-11-23 10:21:21,741] INFO Added plugin 'org.apache.kafka.connect.converters.DoubleConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
[2022-11-23 10:21:21,741] INFO Added plugin 'org.apache.kafka.connect.converters.ByteArrayConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
[2022-11-23 10:21:21,741] INFO Added plugin 'org.apache.kafka.connect.converters.LongConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
[2022-11-23 10:21:21,741] INFO Added plugin 'org.apache.kafka.connect.converters.IntegerConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
[2022-11-23 10:21:21,741] INFO Added plugin 'org.apache.kafka.connect.json.JsonConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
[2022-11-23 10:21:21,741] INFO Added plugin 'org.apache.kafka.connect.storage.StringConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
[2022-11-23 10:21:21,741] INFO Added plugin 'org.apache.kafka.connect.converters.ShortConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
[2022-11-23 10:21:21,742] INFO Added plugin 'org.apache.kafka.connect.storage.SimpleHeaderConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
[2022-11-23 10:21:21,742] INFO Added plugin 'org.apache.kafka.connect.transforms.ReplaceField$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
[2022-11-23 10:21:21,742] INFO Added plugin 'org.apache.kafka.connect.transforms.SetSchemaMetadata$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
[2022-11-23 10:21:21,742] INFO Added plugin 'org.apache.kafka.connect.transforms.ReplaceField$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
[2022-11-23 10:21:21,742] INFO Added plugin 'org.apache.kafka.connect.transforms.Filter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
[2022-11-23 10:21:21,742] INFO Added plugin 'org.apache.kafka.connect.transforms.InsertField$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
[2022-11-23 10:21:21,742] INFO Added plugin 'org.apache.kafka.connect.transforms.TimestampConverter$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
[2022-11-23 10:21:21,742] INFO Added plugin 'org.apache.kafka.connect.transforms.MaskField$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
[2022-11-23 10:21:21,742] INFO Added plugin 'org.apache.kafka.connect.transforms.TimestampRouter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
[2022-11-23 10:21:21,742] INFO Added plugin 'org.apache.kafka.connect.transforms.RegexRouter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
[2022-11-23 10:21:21,743] INFO Added plugin 'org.apache.kafka.connect.transforms.HoistField$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
[2022-11-23 10:21:21,743] INFO Added plugin 'org.apache.kafka.connect.transforms.ValueToKey' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
[2022-11-23 10:21:21,743] INFO Added plugin 'org.apache.kafka.connect.transforms.MaskField$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
[2022-11-23 10:21:21,743] INFO Added plugin 'org.apache.kafka.connect.transforms.Cast$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
[2022-11-23 10:21:21,743] INFO Added plugin 'org.apache.kafka.connect.transforms.Cast$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
[2022-11-23 10:21:21,743] INFO Added plugin 'org.apache.kafka.connect.runtime.PredicatedTransformation' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
[2022-11-23 10:21:21,743] INFO Added plugin 'org.apache.kafka.connect.transforms.ExtractField$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
[2022-11-23 10:21:21,743] INFO Added plugin 'org.apache.kafka.connect.transforms.Flatten$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
[2022-11-23 10:21:21,743] INFO Added plugin 'org.apache.kafka.connect.transforms.InsertField$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
[2022-11-23 10:21:21,743] INFO Added plugin 'org.apache.kafka.connect.transforms.Flatten$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
[2022-11-23 10:21:21,743] INFO Added plugin 'org.apache.kafka.connect.transforms.SetSchemaMetadata$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
[2022-11-23 10:21:21,743] INFO Added plugin 'org.apache.kafka.connect.transforms.ExtractField$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
[2022-11-23 10:21:21,743] INFO Added plugin 'org.apache.kafka.connect.transforms.TimestampConverter$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
[2022-11-23 10:21:21,744] INFO Added plugin 'org.apache.kafka.connect.transforms.HoistField$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
[2022-11-23 10:21:21,745] INFO Added plugin 'org.apache.kafka.connect.transforms.predicates.HasHeaderKey' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
[2022-11-23 10:21:21,745] INFO Added plugin 'org.apache.kafka.connect.transforms.predicates.RecordIsTombstone' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
[2022-11-23 10:21:21,745] INFO Added plugin 'org.apache.kafka.connect.transforms.predicates.TopicNameMatches' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
[2022-11-23 10:21:21,745] INFO Added plugin 'org.apache.kafka.common.config.provider.FileConfigProvider' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
[2022-11-23 10:21:21,745] INFO Added plugin 'org.apache.kafka.connect.rest.basic.auth.extension.BasicAuthSecurityRestExtension' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
[2022-11-23 10:21:21,746] INFO Added aliases 'MQTTSinkConnector' and 'MQTTSink' to plugin 'be.jovacon.kafka.connect.MQTTSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
[2022-11-23 10:21:21,746] INFO Added aliases 'MQTTSourceConnector' and 'MQTTSource' to plugin 'be.jovacon.kafka.connect.MQTTSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
[2022-11-23 10:21:21,746] INFO Added aliases 'FileStreamSinkConnector' and 'FileStreamSink' to plugin 'org.apache.kafka.connect.file.FileStreamSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
[2022-11-23 10:21:21,746] INFO Added aliases 'FileStreamSourceConnector' and 'FileStreamSource' to plugin 'org.apache.kafka.connect.file.FileStreamSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
[2022-11-23 10:21:21,746] INFO Added aliases 'MirrorCheckpointConnector' and 'MirrorCheckpoint' to plugin 'org.apache.kafka.connect.mirror.MirrorCheckpointConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
[2022-11-23 10:21:21,746] INFO Added aliases 'MirrorHeartbeatConnector' and 'MirrorHeartbeat' to plugin 'org.apache.kafka.connect.mirror.MirrorHeartbeatConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
[2022-11-23 10:21:21,746] INFO Added aliases 'MirrorSourceConnector' and 'MirrorSource' to plugin 'org.apache.kafka.connect.mirror.MirrorSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
[2022-11-23 10:21:21,746] INFO Added aliases 'MockConnector' and 'Mock' to plugin 'org.apache.kafka.connect.tools.MockConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
[2022-11-23 10:21:21,747] INFO Added aliases 'MockSinkConnector' and 'MockSink' to plugin 'org.apache.kafka.connect.tools.MockSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
[2022-11-23 10:21:21,747] INFO Added aliases 'MockSourceConnector' and 'MockSource' to plugin 'org.apache.kafka.connect.tools.MockSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
[2022-11-23 10:21:21,747] INFO Added aliases 'SchemaSourceConnector' and 'SchemaSource' to plugin 'org.apache.kafka.connect.tools.SchemaSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
[2022-11-23 10:21:21,747] INFO Added aliases 'VerifiableSinkConnector' and 'VerifiableSink' to plugin 'org.apache.kafka.connect.tools.VerifiableSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
[2022-11-23 10:21:21,747] INFO Added aliases 'VerifiableSourceConnector' and 'VerifiableSource' to plugin 'org.apache.kafka.connect.tools.VerifiableSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
[2022-11-23 10:21:21,747] INFO Added aliases 'ByteArrayConverter' and 'ByteArray' to plugin 'org.apache.kafka.connect.converters.ByteArrayConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
[2022-11-23 10:21:21,747] INFO Added aliases 'DoubleConverter' and 'Double' to plugin 'org.apache.kafka.connect.converters.DoubleConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
[2022-11-23 10:21:21,747] INFO Added aliases 'FloatConverter' and 'Float' to plugin 'org.apache.kafka.connect.converters.FloatConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
[2022-11-23 10:21:21,747] INFO Added aliases 'IntegerConverter' and 'Integer' to plugin 'org.apache.kafka.connect.converters.IntegerConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
[2022-11-23 10:21:21,747] INFO Added aliases 'LongConverter' and 'Long' to plugin 'org.apache.kafka.connect.converters.LongConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
[2022-11-23 10:21:21,747] INFO Added aliases 'ShortConverter' and 'Short' to plugin 'org.apache.kafka.connect.converters.ShortConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
[2022-11-23 10:21:21,747] INFO Added aliases 'JsonConverter' and 'Json' to plugin 'org.apache.kafka.connect.json.JsonConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
[2022-11-23 10:21:21,747] INFO Added aliases 'StringConverter' and 'String' to plugin 'org.apache.kafka.connect.storage.StringConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
[2022-11-23 10:21:21,747] INFO Added aliases 'ByteArrayConverter' and 'ByteArray' to plugin 'org.apache.kafka.connect.converters.ByteArrayConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
[2022-11-23 10:21:21,748] INFO Added aliases 'DoubleConverter' and 'Double' to plugin 'org.apache.kafka.connect.converters.DoubleConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
[2022-11-23 10:21:21,748] INFO Added aliases 'FloatConverter' and 'Float' to plugin 'org.apache.kafka.connect.converters.FloatConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
[2022-11-23 10:21:21,748] INFO Added aliases 'IntegerConverter' and 'Integer' to plugin 'org.apache.kafka.connect.converters.IntegerConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
[2022-11-23 10:21:21,748] INFO Added aliases 'LongConverter' and 'Long' to plugin 'org.apache.kafka.connect.converters.LongConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
[2022-11-23 10:21:21,748] INFO Added aliases 'ShortConverter' and 'Short' to plugin 'org.apache.kafka.connect.converters.ShortConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
[2022-11-23 10:21:21,748] INFO Added aliases 'JsonConverter' and 'Json' to plugin 'org.apache.kafka.connect.json.JsonConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
[2022-11-23 10:21:21,748] INFO Added alias 'SimpleHeaderConverter' to plugin 'org.apache.kafka.connect.storage.SimpleHeaderConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:427)
[2022-11-23 10:21:21,748] INFO Added aliases 'StringConverter' and 'String' to plugin 'org.apache.kafka.connect.storage.StringConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
[2022-11-23 10:21:21,748] INFO Added aliases 'PredicatedTransformation' and 'Predicated' to plugin 'org.apache.kafka.connect.runtime.PredicatedTransformation' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
[2022-11-23 10:21:21,748] INFO Added alias 'Filter' to plugin 'org.apache.kafka.connect.transforms.Filter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:427)
[2022-11-23 10:21:21,748] INFO Added alias 'RegexRouter' to plugin 'org.apache.kafka.connect.transforms.RegexRouter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:427)
[2022-11-23 10:21:21,748] INFO Added alias 'TimestampRouter' to plugin 'org.apache.kafka.connect.transforms.TimestampRouter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:427)
[2022-11-23 10:21:21,749] INFO Added alias 'ValueToKey' to plugin 'org.apache.kafka.connect.transforms.ValueToKey' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:427)
[2022-11-23 10:21:21,749] INFO Added alias 'HasHeaderKey' to plugin 'org.apache.kafka.connect.transforms.predicates.HasHeaderKey' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:427)
[2022-11-23 10:21:21,749] INFO Added alias 'RecordIsTombstone' to plugin 'org.apache.kafka.connect.transforms.predicates.RecordIsTombstone' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:427)
[2022-11-23 10:21:21,749] INFO Added alias 'TopicNameMatches' to plugin 'org.apache.kafka.connect.transforms.predicates.TopicNameMatches' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:427)
[2022-11-23 10:21:21,749] INFO Added alias 'BasicAuthSecurityRestExtension' to plugin 'org.apache.kafka.connect.rest.basic.auth.extension.BasicAuthSecurityRestExtension' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:427)
[2022-11-23 10:21:21,749] INFO Added aliases 'AllConnectorClientConfigOverridePolicy' and 'All' to plugin 'org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
[2022-11-23 10:21:21,749] INFO Added aliases 'NoneConnectorClientConfigOverridePolicy' and 'None' to plugin 'org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
[2022-11-23 10:21:21,749] INFO Added aliases 'PrincipalConnectorClientConfigOverridePolicy' and 'Principal' to plugin 'org.apache.kafka.connect.connector.policy.PrincipalConnectorClientConfigOverridePolicy' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:430)
[2022-11-23 10:21:21,761] INFO StandaloneConfig values:
access.control.allow.methods =
access.control.allow.origin =
admin.listeners = null
bootstrap.servers = [localhost:9092]
client.dns.lookup = use_all_dns_ips
config.providers = []
connector.client.config.override.policy = None
header.converter = class org.apache.kafka.connect.storage.SimpleHeaderConverter
internal.key.converter = class org.apache.kafka.connect.json.JsonConverter
internal.value.converter = class org.apache.kafka.connect.json.JsonConverter
key.converter = class org.apache.kafka.connect.json.JsonConverter
listeners = null
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
offset.flush.interval.ms = 10000
offset.flush.timeout.ms = 5000
offset.storage.file.filename = /tmp/connect.offsets
plugin.path = [plugins/kafka-connect-mqtt/]
response.http.headers.config =
rest.advertised.host.name = null
rest.advertised.listener = null
rest.advertised.port = null
rest.extension.classes = []
rest.host.name = null
rest.port = 8083
ssl.cipher.suites = null
ssl.client.auth = none
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
task.shutdown.graceful.timeout.ms = 5000
topic.creation.enable = true
topic.tracking.allow.reset = true
topic.tracking.enable = true
value.converter = class org.apache.kafka.connect.json.JsonConverter
(org.apache.kafka.connect.runtime.standalone.StandaloneConfig:354)
[2022-11-23 10:21:21,761] INFO Creating Kafka admin client (org.apache.kafka.connect.util.ConnectUtils:49)
[2022-11-23 10:21:21,764] INFO AdminClientConfig values:
bootstrap.servers = [localhost:9092]
client.dns.lookup = use_all_dns_ips
client.id =
connections.max.idle.ms = 300000
default.api.timeout.ms = 60000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
(org.apache.kafka.clients.admin.AdminClientConfig:354)
[2022-11-23 10:21:21,809] WARN The configuration 'offset.flush.interval.ms' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362)
[2022-11-23 10:21:21,809] WARN The configuration 'key.converter.schemas.enable' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362)
[2022-11-23 10:21:21,809] WARN The configuration 'offset.storage.file.filename' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362)
[2022-11-23 10:21:21,809] WARN The configuration 'value.converter.schemas.enable' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362)
[2022-11-23 10:21:21,809] WARN The configuration 'plugin.path' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362)
[2022-11-23 10:21:21,809] WARN The configuration 'value.converter' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362)
[2022-11-23 10:21:21,809] WARN The configuration 'key.converter' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362)
[2022-11-23 10:21:21,810] INFO Kafka version: 2.6.2 (org.apache.kafka.common.utils.AppInfoParser:117)
[2022-11-23 10:21:21,810] INFO Kafka commitId: da65af02e5856e34 (org.apache.kafka.common.utils.AppInfoParser:118)
[2022-11-23 10:21:21,810] INFO Kafka startTimeMs: 1669166481809 (org.apache.kafka.common.utils.AppInfoParser:119)
[2022-11-23 10:21:22,013] INFO Kafka cluster ID: GFsZxHhuTKS1t3X54RH32g (org.apache.kafka.connect.util.ConnectUtils:65)
[2022-11-23 10:21:22,026] INFO Logging initialized @1614ms to org.eclipse.jetty.util.log.Slf4jLog (org.eclipse.jetty.util.log:169)
[2022-11-23 10:21:22,055] INFO Added connector for http://:8083 (org.apache.kafka.connect.runtime.rest.RestServer:132)
[2022-11-23 10:21:22,056] INFO Initializing REST server (org.apache.kafka.connect.runtime.rest.RestServer:204)
[2022-11-23 10:21:22,062] INFO jetty-9.4.38.v20210224; built: 2021-02-24T20:25:07.675Z; git: 288f3cc74549e8a913bf363250b0744f2695b8e6; jvm 11.0.17+8-post-Ubuntu-1ubuntu222.04 (org.eclipse.jetty.server.Server:375)
[2022-11-23 10:21:22,085] INFO Started http_8083@5649ec46{HTTP/1.1, (http/1.1)}{0.0.0.0:8083} (org.eclipse.jetty.server.AbstractConnector:331)
[2022-11-23 10:21:22,086] INFO Started @1674ms (org.eclipse.jetty.server.Server:415)
[2022-11-23 10:21:22,105] INFO Advertised URI: http://127.0.1.1:8083/ (org.apache.kafka.connect.runtime.rest.RestServer:371)
[2022-11-23 10:21:22,105] INFO REST server listening at http://127.0.1.1:8083/, advertising URL http://127.0.1.1:8083/ (org.apache.kafka.connect.runtime.rest.RestServer:219)
[2022-11-23 10:21:22,106] INFO Advertised URI: http://127.0.1.1:8083/ (org.apache.kafka.connect.runtime.rest.RestServer:371)
[2022-11-23 10:21:22,106] INFO REST admin endpoints at http://127.0.1.1:8083/ (org.apache.kafka.connect.runtime.rest.RestServer:220)
[2022-11-23 10:21:22,106] INFO Advertised URI: http://127.0.1.1:8083/ (org.apache.kafka.connect.runtime.rest.RestServer:371)
[2022-11-23 10:21:22,106] INFO Setting up None Policy for ConnectorClientConfigOverride. This will disallow any client configuration to be overridden (org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy:45)
[2022-11-23 10:21:22,112] INFO Creating Kafka admin client (org.apache.kafka.connect.util.ConnectUtils:49)
[2022-11-23 10:21:22,113] INFO AdminClientConfig values:
bootstrap.servers = [localhost:9092]
client.dns.lookup = use_all_dns_ips
client.id =
connections.max.idle.ms = 300000
default.api.timeout.ms = 60000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
(org.apache.kafka.clients.admin.AdminClientConfig:354)
[2022-11-23 10:21:22,118] WARN The configuration 'offset.flush.interval.ms' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362)
[2022-11-23 10:21:22,119] WARN The configuration 'key.converter.schemas.enable' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362)
[2022-11-23 10:21:22,119] WARN The configuration 'offset.storage.file.filename' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362)
[2022-11-23 10:21:22,119] WARN The configuration 'value.converter.schemas.enable' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362)
[2022-11-23 10:21:22,119] WARN The configuration 'plugin.path' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362)
[2022-11-23 10:21:22,119] WARN The configuration 'value.converter' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362)
[2022-11-23 10:21:22,119] WARN The configuration 'key.converter' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362)
[2022-11-23 10:21:22,119] INFO Kafka version: 2.6.2 (org.apache.kafka.common.utils.AppInfoParser:117)
[2022-11-23 10:21:22,120] INFO Kafka commitId: da65af02e5856e34 (org.apache.kafka.common.utils.AppInfoParser:118)
[2022-11-23 10:21:22,120] INFO Kafka startTimeMs: 1669166482119 (org.apache.kafka.common.utils.AppInfoParser:119)
[2022-11-23 10:21:22,132] INFO Kafka cluster ID: GFsZxHhuTKS1t3X54RH32g (org.apache.kafka.connect.util.ConnectUtils:65)
[2022-11-23 10:21:22,137] INFO Kafka version: 2.6.2 (org.apache.kafka.common.utils.AppInfoParser:117)
[2022-11-23 10:21:22,137] INFO Kafka commitId: da65af02e5856e34 (org.apache.kafka.common.utils.AppInfoParser:118)
[2022-11-23 10:21:22,137] INFO Kafka startTimeMs: 1669166482137 (org.apache.kafka.common.utils.AppInfoParser:119)
[2022-11-23 10:21:22,210] INFO JsonConverterConfig values:
converter.type = key
decimal.format = BASE64
schemas.cache.size = 1000
schemas.enable = false
(org.apache.kafka.connect.json.JsonConverterConfig:354)
[2022-11-23 10:21:22,211] INFO JsonConverterConfig values:
converter.type = value
decimal.format = BASE64
schemas.cache.size = 1000
schemas.enable = false
(org.apache.kafka.connect.json.JsonConverterConfig:354)
[2022-11-23 10:21:22,217] INFO Kafka Connect standalone worker initialization took 1476ms (org.apache.kafka.connect.cli.ConnectStandalone:100)
[2022-11-23 10:21:22,217] INFO Kafka Connect starting (org.apache.kafka.connect.runtime.Connect:51)
[2022-11-23 10:21:22,218] INFO Herder starting (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:94)
[2022-11-23 10:21:22,218] INFO Worker starting (org.apache.kafka.connect.runtime.Worker:195)
[2022-11-23 10:21:22,218] INFO Starting FileOffsetBackingStore with file /tmp/connect.offsets (org.apache.kafka.connect.storage.FileOffsetBackingStore:58)
[2022-11-23 10:21:22,222] INFO Worker started (org.apache.kafka.connect.runtime.Worker:202)
[2022-11-23 10:21:22,222] INFO Herder started (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:97)
[2022-11-23 10:21:22,222] INFO Initializing REST resources (org.apache.kafka.connect.runtime.rest.RestServer:224)
[2022-11-23 10:21:22,248] INFO Adding admin resources to main listener (org.apache.kafka.connect.runtime.rest.RestServer:241)
[2022-11-23 10:21:22,297] INFO DefaultSessionIdManager workerName=node0 (org.eclipse.jetty.server.session:334)
[2022-11-23 10:21:22,297] INFO No SessionScavenger set, using defaults (org.eclipse.jetty.server.session:339)
[2022-11-23 10:21:22,298] INFO node0 Scavenging every 600000ms (org.eclipse.jetty.server.session:132)
Nov 23, 2022 10:21:22 AM org.glassfish.jersey.internal.inject.Providers checkProviderRuntime
WARNING: A provider org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource will be ignored.
Nov 23, 2022 10:21:22 AM org.glassfish.jersey.internal.inject.Providers checkProviderRuntime
WARNING: A provider org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource will be ignored.
Nov 23, 2022 10:21:22 AM org.glassfish.jersey.internal.inject.Providers checkProviderRuntime
WARNING: A provider org.apache.kafka.connect.runtime.rest.resources.LoggingResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider org.apache.kafka.connect.runtime.rest.resources.LoggingResource will be ignored.
Nov 23, 2022 10:21:22 AM org.glassfish.jersey.internal.inject.Providers checkProviderRuntime
WARNING: A provider org.apache.kafka.connect.runtime.rest.resources.RootResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider org.apache.kafka.connect.runtime.rest.resources.RootResource will be ignored.
Nov 23, 2022 10:21:22 AM org.glassfish.jersey.internal.Errors logErrors
WARNING: The following warnings have been detected: WARNING: The (sub)resource method listLoggers in org.apache.kafka.connect.runtime.rest.resources.LoggingResource contains empty path annotation.
WARNING: The (sub)resource method createConnector in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method listConnectors in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method listConnectorPlugins in org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource contains empty path annotation.
WARNING: The (sub)resource method serverInfo in org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty path annotation.
[2022-11-23 10:21:22,663] INFO Started o.e.j.s.ServletContextHandler@27a7ef08{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:916)
[2022-11-23 10:21:22,663] INFO REST resources initialized; server is started and ready to handle requests (org.apache.kafka.connect.runtime.rest.RestServer:319)
[2022-11-23 10:21:22,663] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:57)
[2022-11-23 10:21:22,672] INFO AbstractConfig values:
(org.apache.kafka.common.config.AbstractConfig:354)
[2022-11-23 10:21:22,686] INFO Creating connector mqtt-source-connector of type be.jovacon.kafka.connect.MQTTSourceConnector (org.apache.kafka.connect.runtime.Worker:274)
[2022-11-23 10:21:22,687] INFO SourceConnectorConfig values:
config.action.reload = restart
connector.class = be.jovacon.kafka.connect.MQTTSourceConnector
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = class org.apache.kafka.connect.storage.StringConverter
name = mqtt-source-connector
predicates = []
tasks.max = 1
topic.creation.groups = []
transforms = []
value.converter = class org.apache.kafka.connect.storage.StringConverter
(org.apache.kafka.connect.runtime.SourceConnectorConfig:354)
[2022-11-23 10:21:22,687] INFO EnrichedConnectorConfig values:
config.action.reload = restart
connector.class = be.jovacon.kafka.connect.MQTTSourceConnector
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = class org.apache.kafka.connect.storage.StringConverter
name = mqtt-source-connector
predicates = []
tasks.max = 1
topic.creation.groups = []
transforms = []
value.converter = class org.apache.kafka.connect.storage.StringConverter
(org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:354)
[2022-11-23 10:21:22,692] INFO Instantiated connector mqtt-source-connector with version 1.1.0 of type class be.jovacon.kafka.connect.MQTTSourceConnector (org.apache.kafka.connect.runtime.Worker:284)
[2022-11-23 10:21:22,693] INFO Finished creating connector mqtt-source-connector (org.apache.kafka.connect.runtime.Worker:310)
[2022-11-23 10:21:22,695] INFO MQTTSourceConnectorConfig values:
kafka.topic = mqtt.test
mqtt.automaticReconnect = true
mqtt.broker = tcp://127.0.0.1:1883
mqtt.cleanSession = true
mqtt.clientID = cid
mqtt.connectionTimeout = 30
mqtt.keepAliveInterval = 60
mqtt.password = [hidden]
mqtt.qos = 1
mqtt.topic = test
mqtt.userName =
(be.jovacon.kafka.connect.config.MQTTSourceConnectorConfig:354)
[2022-11-23 10:21:22,697] INFO SourceConnectorConfig values:
config.action.reload = restart
connector.class = be.jovacon.kafka.connect.MQTTSourceConnector
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = class org.apache.kafka.connect.storage.StringConverter
name = mqtt-source-connector
predicates = []
tasks.max = 1
topic.creation.groups = []
transforms = []
value.converter = class org.apache.kafka.connect.storage.StringConverter
(org.apache.kafka.connect.runtime.SourceConnectorConfig:354)
[2022-11-23 10:21:22,697] INFO EnrichedConnectorConfig values:
config.action.reload = restart
connector.class = be.jovacon.kafka.connect.MQTTSourceConnector
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = class org.apache.kafka.connect.storage.StringConverter
name = mqtt-source-connector
predicates = []
tasks.max = 1
topic.creation.groups = []
transforms = []
value.converter = class org.apache.kafka.connect.storage.StringConverter
(org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:354)
[2022-11-23 10:21:22,698] INFO Creating task mqtt-source-connector-0 (org.apache.kafka.connect.runtime.Worker:509)
[2022-11-23 10:21:22,700] INFO ConnectorConfig values:
config.action.reload = restart
connector.class = be.jovacon.kafka.connect.MQTTSourceConnector
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = class org.apache.kafka.connect.storage.StringConverter
name = mqtt-source-connector
predicates = []
tasks.max = 1
transforms = []
value.converter = class org.apache.kafka.connect.storage.StringConverter
(org.apache.kafka.connect.runtime.ConnectorConfig:354)
[2022-11-23 10:21:22,700] INFO EnrichedConnectorConfig values:
config.action.reload = restart
connector.class = be.jovacon.kafka.connect.MQTTSourceConnector
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = class org.apache.kafka.connect.storage.StringConverter
name = mqtt-source-connector
predicates = []
tasks.max = 1
transforms = []
value.converter = class org.apache.kafka.connect.storage.StringConverter
(org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:354)
[2022-11-23 10:21:22,701] INFO TaskConfig values:
task.class = class be.jovacon.kafka.connect.MQTTSourceTask
(org.apache.kafka.connect.runtime.TaskConfig:354)
[2022-11-23 10:21:22,701] INFO Instantiated task mqtt-source-connector-0 with version 1.1.0 of type be.jovacon.kafka.connect.MQTTSourceTask (org.apache.kafka.connect.runtime.Worker:524)
[2022-11-23 10:21:22,702] INFO StringConverterConfig values:
converter.encoding = UTF8
converter.type = key
(org.apache.kafka.connect.storage.StringConverterConfig:354)
[2022-11-23 10:21:22,702] INFO StringConverterConfig values:
converter.encoding = UTF8
converter.type = value
(org.apache.kafka.connect.storage.StringConverterConfig:354)
[2022-11-23 10:21:22,702] INFO Set up the key converter class org.apache.kafka.connect.storage.StringConverter for task mqtt-source-connector-0 using the connector config (org.apache.kafka.connect.runtime.Worker:539)
[2022-11-23 10:21:22,702] INFO Set up the value converter class org.apache.kafka.connect.storage.StringConverter for task mqtt-source-connector-0 using the connector config (org.apache.kafka.connect.runtime.Worker:545)
[2022-11-23 10:21:22,703] INFO Set up the header converter class org.apache.kafka.connect.storage.SimpleHeaderConverter for task mqtt-source-connector-0 using the worker config (org.apache.kafka.connect.runtime.Worker:550)
[2022-11-23 10:21:22,705] INFO SourceConnectorConfig values:
config.action.reload = restart
connector.class = be.jovacon.kafka.connect.MQTTSourceConnector
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = class org.apache.kafka.connect.storage.StringConverter
name = mqtt-source-connector
predicates = []
tasks.max = 1
topic.creation.groups = []
transforms = []
value.converter = class org.apache.kafka.connect.storage.StringConverter
(org.apache.kafka.connect.runtime.SourceConnectorConfig:354)
[2022-11-23 10:21:22,705] INFO EnrichedConnectorConfig values:
config.action.reload = restart
connector.class = be.jovacon.kafka.connect.MQTTSourceConnector
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = class org.apache.kafka.connect.storage.StringConverter
name = mqtt-source-connector
predicates = []
tasks.max = 1
topic.creation.groups = []
transforms = []
value.converter = class org.apache.kafka.connect.storage.StringConverter
(org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:354)
[2022-11-23 10:21:22,707] INFO Initializing: org.apache.kafka.connect.runtime.TransformationChain{} (org.apache.kafka.connect.runtime.Worker:606)
[2022-11-23 10:21:22,713] INFO ProducerConfig values:
acks = -1
batch.size = 16384
bootstrap.servers = [localhost:9092]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = connector-producer-mqtt-source-connector-0
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 2147483647
enable.idempotence = false
interceptor.classes = []
internal.auto.downgrade.txn.commit = false
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 0
max.block.ms = 9223372036854775807
max.in.flight.requests.per.connection = 1
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 2147483647
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
(org.apache.kafka.clients.producer.ProducerConfig:354)
[2022-11-23 10:21:22,728] WARN The configuration 'metrics.context.connect.kafka.cluster.id' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig:362)
[2022-11-23 10:21:22,728] INFO Kafka version: 2.6.2 (org.apache.kafka.common.utils.AppInfoParser:117)
[2022-11-23 10:21:22,729] INFO Kafka commitId: da65af02e5856e34 (org.apache.kafka.common.utils.AppInfoParser:118)
[2022-11-23 10:21:22,729] INFO Kafka startTimeMs: 1669166482728 (org.apache.kafka.common.utils.AppInfoParser:119)
[2022-11-23 10:21:22,738] INFO MQTTSourceConnectorConfig values:
kafka.topic = mqtt.test
mqtt.automaticReconnect = true
mqtt.broker = tcp://127.0.0.1:1883
mqtt.cleanSession = true
mqtt.clientID = cid
mqtt.connectionTimeout = 30
mqtt.keepAliveInterval = 60
mqtt.password = [hidden]
mqtt.qos = 1
mqtt.topic = test
mqtt.userName =
(be.jovacon.kafka.connect.config.MQTTSourceConnectorConfig:354)
[2022-11-23 10:21:22,740] INFO Created connector mqtt-source-connector (org.apache.kafka.connect.cli.ConnectStandalone:112)
[2022-11-23 10:21:22,741] INFO WorkerSourceTask{id=mqtt-source-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:488)
[2022-11-23 10:21:22,741] INFO WorkerSourceTask{id=mqtt-source-connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:505)
[2022-11-23 10:21:22,742] ERROR WorkerSourceTask{id=mqtt-source-connector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:191)
java.lang.NoClassDefFoundError: com/google/common/base/Preconditions
at com.github.jcustenborder.kafka.connect.utils.data.SourceRecordDequeBuilder.build(SourceRecordDequeBuilder.java:95)
at be.jovacon.kafka.connect.MQTTSourceTask.start(MQTTSourceTask.java:32)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:232)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:239)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: com.google.common.base.Preconditions
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
... 10 more
[2022-11-23 10:21:22,742] INFO [Producer clientId=connector-producer-mqtt-source-connector-0] Cluster ID: GFsZxHhuTKS1t3X54RH32g (org.apache.kafka.clients.Metadata:279)
[2022-11-23 10:21:22,744] ERROR WorkerSourceTask{id=mqtt-source-connector-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:192)
[2022-11-23 10:21:22,744] WARN Could not stop task (org.apache.kafka.connect.runtime.WorkerSourceTask:175)
java.lang.NullPointerException
at be.jovacon.kafka.connect.MQTTSourceTask.stop(MQTTSourceTask.java:87)
at org.apache.kafka.connect.runtime.WorkerSourceTask.close(WorkerSourceTask.java:173)
at org.apache.kafka.connect.runtime.WorkerTask.doClose(WorkerTask.java:168)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:195)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:239)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
[2022-11-23 10:21:22,745] INFO [Producer clientId=connector-producer-mqtt-source-connector-0] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:1189
I think I did a very good job following README, but is there anything missing or overlooked in the middle?
Hi,
I'd like to use your module for a project.
Can you tell me what the license is?
Thanks in advance and best regards
Objective: to setup MQTT source connector with Confluent kafka v 5.4
Steps Performed:
git clone https://github.com/johanvandevenne/kafka-connect-mqtt.git
cd kafka-connect-mqtt
mvn clean install
copied the jar from /kafka-connect-mqtt/target to the plugins directory. The path of this directory is mentioned in the worker configuration.
executing http://:8083/connector-plugins gives the expected output.
curl -d @./mqtt-source-connector-1.json -H "Content-Type: application/json" -X POST http://localhost:8083/connectors
Expected: There should be no error logs in kafka connect and connector should be available for usage. But after 6th step is performed following is the error received in kafka connect logs :
[2020-06-01 09:42:38,755] ERROR WorkerSourceTask{id=johan-mqtt-source-connector-1-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
org.apache.kafka.connect.errors.ConnectException: Not authorized to connect (5)
at be.jovacon.kafka.connect.MQTTSourceTask.start(MQTTSourceTask.java:53)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:208)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: Not authorized to connect (5)
at org.eclipse.paho.client.mqttv3.internal.ExceptionHelper.createMqttException(ExceptionHelper.java:28)
at org.eclipse.paho.client.mqttv3.internal.ClientState.notifyReceivedAck(ClientState.java:1040)
at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:151)
... 1 more
[2020-06-01 09:42:38,756] ERROR WorkerSourceTask{id=johan-mqtt-source-connector-1-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:180)
The error shows connection failure between mqtt broker and kafka connect. Although, I have entered correct mqtt broker credentials. I verified this by running the mosquitto pub sub commands from the VM on which my confluent platform is running. Any idea why am I getting this error?
Could you, please, explain the procedure in the absence of the REST API? (that needs is not listed in the prerequisites... Such as java >8, maven or git)
Furthermore, at the end of the build, the folder /target/kafka-connect-mqtt-1.0-0-package/share/kafka-connect-mqtt
does not exist.
Hi Johan,
can you provide the license type under which your code is being licensed?
Thank you
Dom
This connector doesn't retrieve the messages that were sent if the client is disconnected and reconnects back with the same clientId, cleanSession=false and Qos >= 1
Hello All,
I am trying to use this connector plugin to connect to a 'mqtts' protocol broker.
I have built the required plugin directory and copied it to the connector plugins directory. Then I have passed all the details through a .properties file like this :
name=mqtt-source-connector
connector.class=be.jovacon.kafka.connect.MQTTSourceConnector
mqtt.topic=test
kafka.topic=test
mqtt.clientID=<client-id>
mqtt.broker=mqtts://<mqtt-broker-address>:8883
mqtt.username=<username>
mqtt.password=<password>
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter.schemas.enable=false
I am getting this error :
[2020-12-08 09:52:10,884] ERROR WorkerSourceTask{id=mqtt-source-connector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:187)
java.lang.NoClassDefFoundError: com/google/common/base/Preconditions
at com.github.jcustenborder.kafka.connect.utils.data.SourceRecordDequeBuilder.build(SourceRecordDequeBuilder.java:95)
at be.jovacon.kafka.connect.MQTTSourceTask.start(MQTTSourceTask.java:32)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:232)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.google.common.base.Preconditions
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 10 more
[2020-12-08 09:52:10,924] ERROR WorkerSourceTask{id=mqtt-source-connector-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:188)
[2020-12-08 09:52:10,924] WARN Could not stop task (org.apache.kafka.connect.runtime.WorkerSourceTask:223)
java.lang.NullPointerException
at be.jovacon.kafka.connect.MQTTSourceTask.stop(MQTTSourceTask.java:87)
at org.apache.kafka.connect.runtime.WorkerSourceTask.tryStop(WorkerSourceTask.java:220)
at org.apache.kafka.connect.runtime.WorkerSourceTask.close(WorkerSourceTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.doClose(WorkerTask.java:164)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:191)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
I think because of this error I am unable to connect to the mqtt broker. Is it possible that the plugin wasn't built correctly? I was able to see the plugin using
http://<kafkaconnect>:8083/connector-plugins
This repository currently uses Log4j 2.11.2, I would like to hopefully get this updated to a patched version without vulnerabilities.
Objective: to setup MQTT source connector with Confluent kafka v 5.4
Steps Performed:
Expected: There should be no error logs in kafka connect and connector should be available for usage. But after 6th step is performed following is the error received in kafka connect logs :
[2020-06-01 08:01:31,181] ERROR [Worker clientId=connect-1, groupId=connect-cluster] Failed to reconfigure connector's tasks, retrying after backoff: (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1236)
java.lang.NoClassDefFoundError: be/jovacon/kafka/connect/MQTTSourceTask
at be.jovacon.kafka.connect.MQTTSourceConnector.taskClass(MQTTSourceConnector.java:27)
at org.apache.kafka.connect.runtime.Worker.connectorTaskConfigs(Worker.java:322)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnector(DistributedHerder.java:1287)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnectorTasksWithRetry(DistributedHerder.java:1225)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1000(DistributedHerder.java:125)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$17$1.call(DistributedHerder.java:1242)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$17$1.call(DistributedHerder.java:1239)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:342)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:282)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
following is connector config json file
{ "name": "mqtt-source-connector-1",
"config":
{
"connector.class":"be.jovacon.kafka.connect.MQTTSourceConnector",
"mqtt.topic":"my_mqtt_topic",
"kafka.topic":"my_kafka_topic",
"mqtt.clientID":"my_client_id",
"mqtt.broker":"tcp://broker-ip:1883",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable":false,
"value.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter.schemas.enable":false,
"mqtt.username":"username",
"mqtt.password":"password"
}
}
````
How can I fix the above issue?
A declarative, efficient, and flexible JavaScript library for building user interfaces.
๐ Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. ๐๐๐
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google โค๏ธ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.