dariobalinzo / kafka-connect-elasticsearch-source
Kafka Connect Elasticsearch Source
License: Apache License 2.0
Hi,
I wrote earlier about adding HTTPS support. I tried again today with version 1.1 and I'm still getting an error. I compared the code against https://github.com/inspectorioinc/kafka-connect-elasticsearch-source/ and didn't see anything obvious.
However, I did notice that if I enter an invalid host I get the same error.
Config:
{
"connector.class": "com.github.dariobalinzo.ElasticSourceConnector",
"es.host": "xxx",
"es.scheme": "https",
"tasks.max": "1",
"incrementing.field.name": "@timestamp",
"connection.attempts": "1",
"es.port": "9243",
"mode": "timestamp",
"topic.prefix": "essource_",
"es.password": "xxx",
"name": "essource",
"es.user": "xxx",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"index.prefix": "test_"
}
Call stack:
[2020-12-21 19:40:29,753] ERROR error in searching index names (com.github.dariobalinzo.elastic.ElasticRepository)
[2020-12-21 19:40:29,753] ERROR [Worker clientId=connect-1, groupId=compose-connect-group] Failed to reconfigure connector's tasks, retrying after backoff: (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
java.lang.RuntimeException: org.elasticsearch.client.ResponseException: method [GET], host [https://xxx:9243], URI [_cat/indices], status line [HTTP/1.1 400 Bad Request]
400 Bad Request
at com.github.dariobalinzo.elastic.ElasticRepository.catIndices(ElasticRepository.java:106)
at com.github.dariobalinzo.ElasticSourceConnector.taskConfigs(ElasticSourceConnector.java:98)
at org.apache.kafka.connect.runtime.Worker.connectorTaskConfigs(Worker.java:373)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnector(DistributedHerder.java:1420)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnectorTasksWithRetry(DistributedHerder.java:1358)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1000(DistributedHerder.java:127)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$15$1.call(DistributedHerder.java:1375)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$15$1.call(DistributedHerder.java:1372)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:365)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:294)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Hi there, I am facing a "no mapping found" error while using this connector.
I have checked in Kibana by sorting the hits on the timestamp field, and there I get a positive result.
In this case I don't know where I made a mistake, so I am in need of help. Any solution or knowledge is welcome.
Passed config:
{
"name": "elastic_source2",
"config": {
"connector.class": "com.github.dariobalinzo.ElasticSourceConnector",
"tasks.max": "1",
"es.host": "03dc4cd17baf46e190268130c6cb7d1d.us-central1.gcp.cloud.es.io",
"es.scheme": "https",
"es.port": "9243",
"es.user": "elastic",
"es.password": "uqBE5g4ZIJA9Tp****2leGPq",
"index.name": "kibana_sample_data_flights",
"topic.prefix": "es_",
"incrementing.field.name": "timestamp"
}
}
Error logs:
[2022-05-10 10:25:45,189] INFO [elastic_source2|task-0] Stopping task elastic_source2-0 (org.apache.kafka.connect.runtime.Worker:829)
[2022-05-10 10:25:45,723] ERROR [elastic_source2|task-0] error (com.github.dariobalinzo.task.ElasticSourceTask:217)
ElasticsearchStatusException[Elasticsearch exception [type=search_phase_execution_exception, reason=all shards failed]]
at org.elasticsearch.rest.BytesRestResponse.errorFromXContent(BytesRestResponse.java:178)
at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:2484)
at org.elasticsearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:2461)
at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:2184)
at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:2137)
at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:2105)
at org.elasticsearch.client.RestHighLevelClient.search(RestHighLevelClient.java:1367)
at com.github.dariobalinzo.elastic.ElasticRepository.executeSearch(ElasticRepository.java:176)
at com.github.dariobalinzo.elastic.ElasticRepository.searchAfter(ElasticRepository.java:90)
at com.github.dariobalinzo.task.ElasticSourceTask.poll(ElasticSourceTask.java:205)
at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:291)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:248)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:186)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:241)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Suppressed: org.elasticsearch.client.ResponseException: method [POST], host [https://03dc4cd17baf46e190268130c6cb7d1d.us-central1.gcp.cloud.es.io:9243], URI [/.ds-logs-enterprise_search.api-default-2022.05.09-000001/_search?typed_keys=true&max_concurrent_shard_requests=5&search_type=query_then_fetch&batched_reduce_size=512], status line [HTTP/1.1 400 Bad Request]
{"error":{"root_cause":[{"type":"query_shard_exception","reason":"No mapping found for [timestamp] in order to sort on","index_uuid":"Wx0FJ-RFQFOTqp_wJykMfA","index":".ds-logs-enterprise_search.api-default-2022.05.09-000001"}],"type":"search_phase_execution_exception","reason":"all shards failed","phase":"query","grouped":true,"failed_shards":[{"shard":0,"index":".ds-logs-enterprise_search.api-default-2022.05.09-000001","node":"-Vr1NH-xQ-miv45oOjwkXw","reason":{"type":"query_shard_exception","reason":"No mapping found for [timestamp] in order to sort on","index_uuid":"Wx0FJ-RFQFOTqp_wJykMfA","index":".ds-logs-enterprise_search.api-default-2022.05.09-000001"}}]},"status":400}
at org.elasticsearch.client.RestClient.convertResponse(RestClient.java:331)
at org.elasticsearch.client.RestClient.performRequest(RestClient.java:301)
at org.elasticsearch.client.RestClient.performRequest(RestClient.java:276)
at org.elasticsearch.client.RestHighLevelClient.performClientRequest(RestHighLevelClient.java:2699)
at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:2171)
... 15 more
[2022-05-10 10:25:45,725] INFO [elastic_source2|task-0] WorkerSourceTask{id=elastic_source2-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:505)
[2022-05-10 10:25:45,728] INFO [elastic_source2|task-0] [Producer clientId=connector-producer-elastic_source2-0] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:1208)
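A hedged guess at a fix, based on the error above: the sort on `timestamp` is failing against the `.ds-logs-enterprise_search...` data stream, which has no such field, so the connector appears to be scanning more indices than the intended `kibana_sample_data_flights`. The config also uses `index.name`, while the other configurations in these issues use `index.prefix`. A sketch of the adjusted config (password redacted; whether `index.prefix` is the right key for this connector version is an assumption):

```json
{
  "name": "elastic_source2",
  "config": {
    "connector.class": "com.github.dariobalinzo.ElasticSourceConnector",
    "tasks.max": "1",
    "es.host": "03dc4cd17baf46e190268130c6cb7d1d.us-central1.gcp.cloud.es.io",
    "es.scheme": "https",
    "es.port": "9243",
    "es.user": "elastic",
    "es.password": "********",
    "index.prefix": "kibana_sample_data_flights",
    "topic.prefix": "es_",
    "incrementing.field.name": "timestamp"
  }
}
```

With the prefix restricted, only indices whose mapping actually contains the sort field should be queried.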
Hi, I am getting the error below when I start the connector.
Please help with this. The worker is running, but I can't see the expected topic in Kafka.
[2021-05-12 16:30:42,611] INFO ElasticSourceConnectorConfig values:
batch.max.rows = 10000
connection.attempts = 3
connection.backoff.ms = 10000
es.host = "myhost.uat.com"
es.password = "myEspass"
es.port = "9200"
es.scheme = http
es.user = "elastic"
fieldname_converter = avro
filters.json_cast = null
filters.whitelist = null
incrementing.field.name = "@timestamp_log"
index.prefix = "my_index-*"
mode =
poll.interval.ms = 5000
topic.prefix = "myprefix_"
(com.github.dariobalinzo.ElasticSourceConnectorConfig:347)
[2021-05-12 16:30:42,611] ERROR WorkerConnector{id=elasticsearch-source} Error while starting connector (org.apache.kafka.connect.runtime.WorkerConnector:119)
java.lang.NumberFormatException: For input string: ""9200""
at java.base/java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.base/java.lang.Integer.parseInt(Integer.java:638)
at java.base/java.lang.Integer.parseInt(Integer.java:770)
at com.github.dariobalinzo.ElasticSourceConnector.start(ElasticSourceConnector.java:63)
at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:111)
at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:136)
at org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:196)
at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:253)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.startConnector(StandaloneHerder.java:293)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:209)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:115)
[2021-05-12 16:30:42,614] INFO Finished creating connector elasticsearch-source (org.apache.kafka.connect.runtime.Worker:269)
[2021-05-12 16:30:42,615] INFO Skipping update of connector elasticsearch-source since it is not running
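The stack trace points at `Integer.parseInt` receiving the literal string `"9200"` including the quote characters: the config dump above shows every value wrapped in quotes. In a Java `.properties` file (unlike JSON), values must not be quoted. A minimal reproduction of the failure and the fix:

```java
public class ParsePortDemo {
    public static void main(String[] args) {
        // es.port = "9200" in a .properties file yields the six characters "9200"
        // (including the quotes), which Integer.parseInt rejects.
        String quoted = "\"9200\"";
        try {
            Integer.parseInt(quoted);
            System.out.println("unexpected: parsed quoted value");
        } catch (NumberFormatException e) {
            System.out.println("NumberFormatException: " + e.getMessage());
        }
        // es.port=9200 (no quotes) parses cleanly.
        int port = Integer.parseInt("9200");
        System.out.println("parsed port: " + port);
    }
}
```

So the likely fix is simply to drop the quotes around every value in the standalone `.properties` config.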
I'm updating Elasticsearch and the Kafka Connect API to the latest versions.
I'm planning to refactor the code and tests to improve code quality and reduce bugs.
Hey, I get an error in the log when I try to use the Elasticsearch source connector:
[2022-08-19 11:53:46,179] ERROR [elastic-source|worker] error in searching index names (com.github.dariobalinzo.elastic.ElasticRepository:193)
[2022-08-19 11:53:46,180] ERROR [elastic-source|worker] Error while trying to get updated topics list, ignoring and waiting for next table poll interval (com.github.dariobalinzo.elastic.ElasticIndexMonitorThread:92)
This link, https://localhost:9200/_cat/indices?v, returns:
health status index uuid pri rep docs.count docs.deleted store.size pri.store.size
green open kibana_sample_data_ecommerce tRcxyLrsSKuWcmOx3ZNKew 1 0 4675 0 4mb 4mb
yellow open index-elastic CvedHZsxTB6aGeP9cUVedw 1 1 0 0 225b 225b
yellow open test R9t_ToRpQGyi_tTw6MdkSA 1 1 15 0 15.5kb 15.5kb
And this is my connector configuration:
{
"name": "elastic-source",
"config": {
"name": "elastic-source",
"connector.class": "com.github.dariobalinzo.ElasticSourceConnector",
"tasks.max": "1",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"es.host": "127.0.0.1",
"es.port": "9200",
"es.user": "elastic",
"es.password": "password",
"index.prefix": "test",
"topic.prefix": "test"
}
}
As we can see, the test index already exists, so what's wrong with my approach?
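Two hedged observations about the configuration above: the cluster answers on https://localhost:9200 (per the `_cat/indices` link), but the config never sets `es.scheme`, so the connector presumably defaults to plain http; and no `incrementing.field.name` is configured, unlike the other configs in these issues. A sketch with both added (`@timestamp` is a placeholder; use a field that actually exists in the `test` index mapping):

```json
{
  "name": "elastic-source",
  "config": {
    "name": "elastic-source",
    "connector.class": "com.github.dariobalinzo.ElasticSourceConnector",
    "tasks.max": "1",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "es.host": "127.0.0.1",
    "es.port": "9200",
    "es.scheme": "https",
    "es.user": "elastic",
    "es.password": "password",
    "index.prefix": "test",
    "topic.prefix": "test",
    "incrementing.field.name": "@timestamp"
  }
}
```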
Hi and thank you for this connector,
I have 3 Elasticsearch indexes that share the same prefix: test_1, test_2, test_3.
One of the indexes (log_test3) is not being sent to Kafka, while the other two are fine (its schema is different).
There are no errors in the connector logs, and it is not clear what the issue is; the same configuration worked fine before on Elasticsearch version 6.
Also, is there a way to prioritise some indexes to be collected first?
Are there any limitations on how many indexes with the same prefix can be collected by one connector?
This is my connector conf:
"connector.class": "com.github.dariobalinzo.ElasticSourceConnector",
"tasks.max": "2",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"errors.retry.timeout": "-1",
"errors.log.enable": true,
"errors.log.include.messages": true,
"es.host": "192.168.1.20",
"es.port": "9200",
"connection.attempts": "5",
"connection.backoff.ms": "10000",
"index.prefix": "logs_",
"incrementing.field.name": "@timestamp",
"topic.prefix": "topic_"
Thank you.
Hello,
I have successfully used this plugin to get records from Elasticsearch, and I am trying to sink the topics into S3, but it is very, very slow. I have Debezium CDC, and sinking from those topics is extremely fast.
Has anyone else experienced this? Any ideas?
Thank you
Any solutions/suggestions on how we can avoid the plaintext password in the JSON files?
Hello @DarioBalinzo,
After some time, the connector stops working for me with the following error.
[2021-02-28 21:36:03,823] INFO fetching from nuvla-nuvlabox-status (com.github.dariobalinzo.task.ElasticSourceTask:166)
[2021-02-28 21:36:03,823] INFO found last value 2021-02-28T21:35:46.489Z (com.github.dariobalinzo.task.ElasticSourceTask:168)
[2021-02-28 21:36:03,827] ERROR error (com.github.dariobalinzo.task.ElasticSourceTask:180)
org.apache.kafka.connect.errors.DataException: Invalid Java object for schema type INT32: class java.lang.Long for field: "bytes-transmitted"
at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:245)
at org.apache.kafka.connect.data.Struct.put(Struct.java:216)
at org.apache.kafka.connect.data.Struct.put(Struct.java:203)
at com.github.dariobalinzo.schema.StructConverter.convertDocumentStruct(StructConverter.java:47)
at com.github.dariobalinzo.schema.StructConverter.convertListOfObject(StructConverter.java:114)
at com.github.dariobalinzo.schema.StructConverter.lambda$convertListToAvroArray$0(StructConverter.java:81)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
at com.github.dariobalinzo.schema.StructConverter.convertListToAvroArray(StructConverter.java:82)
at com.github.dariobalinzo.schema.StructConverter.convertDocumentStruct(StructConverter.java:49)
at com.github.dariobalinzo.schema.StructConverter.covertMapToAvroStruct(StructConverter.java:96)
at com.github.dariobalinzo.schema.StructConverter.convertDocumentStruct(StructConverter.java:51)
at com.github.dariobalinzo.schema.StructConverter.convert(StructConverter.java:37)
at com.github.dariobalinzo.task.ElasticSourceTask.parseResult(ElasticSourceTask.java:214)
at com.github.dariobalinzo.task.ElasticSourceTask.poll(ElasticSourceTask.java:170)
at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:289)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:256)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
The bytes-transmitted value is always growing, and eventually the connector chokes (on the 2313987073 below), as the Java int32 max value is 2,147,483,647.
"net-stats" : [ {
"interface" : "docker_gwbridge",
"bytes-transmitted" : 0,
"bytes-received" : 0
}, {
"interface" : "eth0",
"bytes-transmitted" : 2313987073,
"bytes-received" : 139486370
}, {
"interface" : "veth3a85f64",
"bytes-transmitted" : 37243658,
"bytes-received" : 20884091
}, {
"interface" : "veth23d5664",
"bytes-transmitted" : 73770,
"bytes-received" : 7149
}, {
"interface" : "veth480948a",
"bytes-transmitted" : 1702,
"bytes-received" : 0
}, {
"interface" : "veth4bdbfca",
"bytes-transmitted" : 47246,
"bytes-received" : 68340
}, {
"interface" : "vethdd78a5f",
"bytes-transmitted" : 1772,
"bytes-received" : 0
}, {
"interface" : "veth9ba3f40",
"bytes-transmitted" : 350,
"bytes-received" : 0
}, {
"interface" : "docker0",
"bytes-transmitted" : 0,
"bytes-received" : 0
}, {
"interface" : "vethc58514b",
"bytes-transmitted" : 2128,
"bytes-received" : 0
}, {
"interface" : "br-4788132ed466",
"bytes-transmitted" : 35697531,
"bytes-received" : 15309643
}, {
"interface" : "veth9ab8cd6",
"bytes-transmitted" : 9663155,
"bytes-received" : 2041434
}, {
"interface" : "veth5fb3fb4",
"bytes-transmitted" : 6890675,
"bytes-received" : 11141372
}, {
"interface" : "lo",
"bytes-transmitted" : 18483607,
"bytes-received" : 18483607
} ]
I think the conversion should treat all int values as longs. Or alternatively, provide a way to define this through a schema?
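The mismatch is easy to confirm in plain Java: the observed counter no longer fits in a 32-bit int, so a Connect schema of INT32 can never hold it, while INT64 (a Java long) can:

```java
public class CounterWidthDemo {
    public static void main(String[] args) {
        long bytesTransmitted = 2_313_987_073L; // the eth0 value from the document above
        // Integer.MAX_VALUE is 2,147,483,647 — the counter has already passed it.
        System.out.println(bytesTransmitted > Integer.MAX_VALUE); // true
        // A long (Connect INT64) has plenty of headroom for a byte counter.
        System.out.println(Long.MAX_VALUE - bytesTransmitted > 0); // true
    }
}
```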
At the moment I'm not using net-stats at all, so as a workaround I will now try filters.json_cast: net-stats (hoping it will not parse it at all).
At the same time, you have the filters.whitelist parameter. Would it be possible to have a filters.blacklist?
Thank you.
When using it together with the Elasticsearch sink connector (https://github.com/confluentinc/kafka-connect-elasticsearch), I am facing this error:
[2021-10-20 17:42:45,631] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:121)
java.lang.NoSuchMethodError: 'org.elasticsearch.client.Response org.elasticsearch.client.RestClient.performRequest(java.lang.String, java.lang.String, org.apache.http.Header[])'
at com.github.dariobalinzo.elastic.ElasticRepository.catIndices(ElasticRepository.java:103)
at com.github.dariobalinzo.ElasticSourceConnector.taskConfigs(ElasticSourceConnector.java:98)
at org.apache.kafka.connect.runtime.Worker.connectorTaskConfigs(Worker.java:323)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.recomputeTaskConfigs(StandaloneHerder.java:305)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.updateConnectorTasks(StandaloneHerder.java:331)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:216)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:115)
I think this is due to a version mismatch between the two connectors' dependencies, as the RestClient API changed between these versions:
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>6.2.3</version>
</dependency>
and
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.9.3</version>
</dependency>
Can we update this dependency?
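A NoSuchMethodError across two connectors usually means their jars ended up on one classloader. Kafka Connect gives each subdirectory of plugin.path its own isolated classloader, so keeping each connector, together with its own elasticsearch client jar, in a separate folder may avoid the clash even without aligning the versions. A sketch of the layout (paths are hypothetical):

```
# connect worker config
plugin.path=/usr/share/java/plugins

# directory layout
/usr/share/java/plugins/
  kafka-connect-elasticsearch-source/   # this connector + its 6.2.3 client jar
  kafka-connect-elasticsearch-sink/     # Confluent sink + its 7.9.3 client jar
```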
Testcontainers cannot pull Docker images anymore when running inside GitHub Actions; this might be related to the Docker Hub rate limiter.
It is still possible to run the tests locally, but the CI pipeline will be broken.
The connector fails to process JSON messages like the one below; it fails on statusHistoryRanking (an empty array):
{
  "statusHistory": [
    {
      "date": "2021-10-19",
      "status": "IN PASS"
    }
  ],
  "statusHistoryRanking": []
}
Error Message:
ERROR (com.github.dariobalinzo.task.ElasticSourceTask:202) error
java.lang.RuntimeException: error in converting list: type not supported
at com.github.dariobalinzo.schema.SchemaConverter.convertListSchema(SchemaConverter.java:127)
at com.github.dariobalinzo.schema.SchemaConverter.convertDocumentSchema(SchemaConverter.java:70)
at com.github.dariobalinzo.schema.SchemaConverter.convertMapSchema(SchemaConverter.java:89)
at com.github.dariobalinzo.schema.SchemaConverter.convertDocumentSchema(SchemaConverter.java:74)
at com.github.dariobalinzo.schema.SchemaConverter.convert(SchemaConverter.java:42)
at com.github.dariobalinzo.task.ElasticSourceTask.parseResult(ElasticSourceTask.java:245)
at com.github.dariobalinzo.task.ElasticSourceTask.poll(ElasticSourceTask.java:192)
at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:268)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:241)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Proposed solution: guard both places that take the head of a list, checking that the list is non-empty and the head is not null:
SchemaConverter.convertDocumentSchema, before Object head = ((List<?>) value).get(0);
StructConverter.convertListToAvroArray, before Object head = value.get(0);
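A minimal sketch of the proposed guard (the helper name is hypothetical; the real code lives in SchemaConverter and StructConverter): return null for an empty list instead of dereferencing get(0), so the converter can fall back rather than throw:

```java
import java.util.List;

public class ListHeadGuard {
    // Hypothetical helper mirroring the two call sites above:
    // an empty list yields null instead of an exception,
    // letting the caller skip type inference for that field.
    static Object headOrNull(List<?> value) {
        return (value == null || value.isEmpty()) ? null : value.get(0);
    }

    public static void main(String[] args) {
        System.out.println(headOrNull(List.of()));         // null, no exception
        System.out.println(headOrNull(List.of("a", "b"))); // a
    }
}
```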
I've been using this repo (thanks again, by the way) and it worked very well until today, when suddenly the console was flooded with this error:
[2021-07-16 14:33:03,515] ERROR error (com.github.dariobalinzo.task.ElasticSourceTask:180)
org.apache.kafka.connect.errors.DataException: Invalid Java object for schema type STRUCT: class java.lang.String for field: "old"
at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:245)
at org.apache.kafka.connect.data.Struct.put(Struct.java:216)
at org.apache.kafka.connect.data.Struct.put(Struct.java:203)
at com.github.dariobalinzo.schema.StructConverter.convertDocumentStruct(StructConverter.java:47)
at com.github.dariobalinzo.schema.StructConverter.convertListOfObject(StructConverter.java:114)
at com.github.dariobalinzo.schema.StructConverter.lambda$convertListToAvroArray$0(StructConverter.java:81)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
at com.github.dariobalinzo.schema.StructConverter.convertListToAvroArray(StructConverter.java:82)
at com.github.dariobalinzo.schema.StructConverter.convertDocumentStruct(StructConverter.java:49)
at com.github.dariobalinzo.schema.StructConverter.convert(StructConverter.java:37)
at com.github.dariobalinzo.task.ElasticSourceTask.parseResult(ElasticSourceTask.java:214)
at com.github.dariobalinzo.task.ElasticSourceTask.poll(ElasticSourceTask.java:170)
at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:289)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:256)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
At first I thought it was just a few badly formatted messages, but it seems to happen for every single message, and my pipeline is not working anymore.
Any thoughts on what the problem could be?
Thanks in advance
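One hedged workaround, assuming the cause is that the `old` field holds an object in some documents and a plain string in others (so the inferred STRUCT schema rejects the string values): the `filters.json_cast` parameter, which appears in a config dump earlier in these issues, seems intended to carry a field as a raw JSON string instead of a typed struct. A hypothetical fragment (the exact value syntax for this parameter is an assumption):

```json
{
  "filters.json_cast": "old"
}
```

If the field genuinely changed type in the source index, reindexing with a consistent mapping would be the more robust fix.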
Hi,
First, thank you for this amazing connect plugin.
I have a lot of indexes, named by day, like idx-2021.08.30. The first problem I discovered in my environment: it looks like the Elasticsearch connector freezes on some index, and the connector enters a kind of loop:
cp-kafka-connect-connect-es-74df8cf9d6-92whw connect-es-server [2021-08-31 20:44:45,664] INFO index idx-2021.08.31 total messages: 295939 (com.github.dariobalinzo.task.ElasticSourceTask)
cp-kafka-connect-connect-es-74df8cf9d6-92whw connect-es-server [2021-08-31 20:44:56,592] INFO index idx-2021.08.31 total messages: 295939 (com.github.dariobalinzo.task.ElasticSourceTask)
cp-kafka-connect-connect-es-74df8cf9d6-92whw connect-es-server [2021-08-31 20:44:58,016] INFO index idx-2021.08.31 total messages: 295939 (com.github.dariobalinzo.task.ElasticSourceTask)
And this line repeats forever.
The other problem is with new indexes: new indexes are created every day, but the connector only finds a new index after the Connect task is restarted. Also, my indexes have only @timestamp as the incrementing field, yet the topics always have fewer documents than the index.
I would appreciate it very much if you could help me.
Thank you,
Hello,
I'm using the elasticsearch-source:1.3 with success.
I install it with the confluent-hub client by:
confluent-hub install --no-prompt dariobalinzo/kafka-connect-elasticsearch-source:1.3
I need to use the filters.blacklist parameter, which as far as I know is available in the 1.4.2 branch of this repository.
I tried to install it with:
confluent-hub install --no-prompt dariobalinzo/kafka-connect-elasticsearch-source:1.4.2
No success.
Then I cloned the repository, switched to the 1.4.2 branch, and ran the mvn build command: mvn clean package -DskipTests
The build was ok.
I'm placing the generated jar into the plugin.path set in the settings, which is /usr/share/confluent-hub-components, and also into the /etc/kafka-connect/jars/ folder.
If I use elastic-source-connect-1.4-jar-with-dependencies.jar, I get the following error:
kafka kafka-cp-kafka-connect-5f7d74847b-hrbl5 cp-kafka-connect-server [2021-09-29 12:22:58,664] ERROR Stopping due to error (org.apache.kafka.connect.cli.ConnectDistributed)
kafka kafka-cp-kafka-connect-5f7d74847b-hrbl5 cp-kafka-connect-server java.lang.NoSuchFieldError: DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS
kafka kafka-cp-kafka-connect-5f7d74847b-hrbl5 cp-kafka-connect-server at org.apache.kafka.connect.runtime.distributed.DistributedConfig.<clinit>(DistributedConfig.java:245)
kafka kafka-cp-kafka-connect-5f7d74847b-hrbl5 cp-kafka-connect-server at org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:95)
kafka kafka-cp-kafka-connect-5f7d74847b-hrbl5 cp-kafka-connect-server at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:80)
which prevents kafka-connect from starting up.
If I use elastic-source-connect-1.4.jar, I get:
kafka kafka-cp-kafka-connect-68f487696-c6fbt cp-kafka-connect-server [2021-09-29 12:37:56,765] ERROR WorkerConnector{id=elastic-source-20210710} Error while starting connector (org.apache.kafka.connect.runtime.WorkerConnector)
kafka kafka-cp-kafka-connect-68f487696-c6fbt cp-kafka-connect-server java.lang.NoClassDefFoundError: org/apache/http/client/CredentialsProvider
kafka kafka-cp-kafka-connect-68f487696-c6fbt cp-kafka-connect-server at com.github.dariobalinzo.elastic.ElasticConnectionBuilder.build(ElasticConnectionBuilder.java:77)
which also prevents kafka-connect from starting up.
I am using confluentinc/cp-kafka-connect-base:6.1.1 as the base image.
Is there anything I can do to overcome these errors?
Thanks for any clue.
Hello!
Cloudera Kafka Connect ships with log4j:2.10, but the elasticsearch.client dependency of this project uses log4j:2.9. When we try to use this connector, an error like this is raised:
java.util.ServiceConfigurationError: org.apache.logging.log4j.spi.Provider: Provider org.apache.logging.log4j.core.impl.Log4jProvider not a subtype
It seems to be a bug in log4j, ref.: https://issues.apache.org/jira/browse/LOG4J2-2055. I've explicitly added log4j dependencies with version 2.10, like this:
<!-- ... -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.10.0</version>
</dependency>
<!-- ... -->
Tests have passed, but I don't know if you'll consider this a bug, since the behavior occurs in Cloudera's distribution, not in Confluent's.
Any guidance or help would be appreciated :)
Hi there,
I'm currently using this repo (thanks very much, by the way) and I'm hoping to take my JSON documents and use a certain key either to partition them or to send them to different topics. One approach is to write a custom JSON deserialiser that reads from the Elasticsearch source topic, but I was wondering whether the ES source could select documents by a certain key and value, and only pull in the documents that have that key-value pair.
Thanks in advance
The connector pulls data when the source connector is created, but after that it does not pull the latest data from ES.
Configured:
{
  "name": "elastic-source-logs",
  "config": {
    "connector.class": "com.github.dariobalinzo.ElasticSourceConnector",
    "topic.prefix": "logs",
    "es.host": "elastic",
    "tasks.max": "1",
    "incrementing.field.name": "timestamp",
    "name": "elastic-source-logs",
    "es.port": "9200",
    "index.prefix": "logs",
    "poll.interval.ms": "10000",
    "timestamp.delay.interval.ms": "3000"
  }
}
This pulls data from ES only once; after that it does not pull new changes from ES again.
Is there a configuration parameter to configure the pull interval?
Would it be possible to produce a jar with dependencies along with the release?
Hi,
I've noticed that sometimes records can be missed by the connector. Based on my experiments, I think it has to do with batched writes and the frequency of polling.
I'm using the Kafka message offset as the incrementing field.
I've adjusted the polling interval from the default 5 seconds to 30 minutes in some cases. So far this has helped quite a bit.
Just wondering if anyone else has come across this or has a different strategy?
Inspired by a fork made by https://github.com/inspectorioinc/kafka-connect-elasticsearch-source, I would like to add an option to convert a possibly complex nested object to a JSON string instead of an Avro sub-object.
The default behavior should remain the Avro sub-object; the JSON-string casting should be optional.
Hi,
First of all thank you for your amazing job.
I have a question: right now I'm trying to ingest multiple indexes from ES into one topic on Kafka, and I wonder if that is possible. As long as I use a single value in index.prefix, everything works perfectly, but as soon as I try to add an extra index, like index.prefix = my_awesome_index, my_beauty_index (using YAML), nothing comes from ES.
So my question is: do I have to add another connector, or simply duplicate the line?
I'm getting the following errors in the task config step. The connection test works before getting to this point.
[2019-08-05 14:59:45,929] ERROR error in searching index names (com.github.dariobalinzo.ElasticSourceConnector)
[2019-08-05 14:59:45,929] ERROR [Worker clientId=connect-1, groupId=data-dev] Failed to reconfigure connector's tasks, retrying after backoff: (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
java.lang.RuntimeException: org.elasticsearch.client.ResponseException: method [GET], host [http://elastic-dev.data:80], URI [_cat/indices], status line [HTTP/1.1 400 BAD_REQUEST]
We managed to connect to the elasticsearch but we are getting a nullpointerexception, what could be causing that? Below is our kafka (strimzi) configuration and the error message
class: com.github.dariobalinzo.ElasticSourceConnector config: poll.interval.ms: 1000 es.port: >- ${file:/opt/kafka/external-configuration/elasticsearch-creds/elasticsearch-creds:port} es.scheme: https fieldname_converter: nop es.tls.truststore.location: >- ${file:/opt/kafka/external-configuration/elasticsearch-creds/elasticsearch-creds:truststore-location} es.host: >- ${file:/opt/kafka/external-configuration/elasticsearch-creds/elasticsearch-creds:host} es.user: >- ${file:/opt/kafka/external-configuration/elasticsearch-creds/elasticsearch-creds:user} elastic.security.protocol: SSL es.tls.truststore.password: >- ${file:/opt/kafka/external-configuration/elasticsearch-creds/elasticsearch-creds:truststore-password} index.prefix: ca-dict-sanctions-stg errors.log.enable: true topic.prefix: '' incrementing.field.name: status.currentUpdatedOn es.password: >- ${file:/opt/kafka/external-configuration/elasticsearch-creds/elasticsearch-creds:password} tasksMax: 1
"_source" : {
  "field" : "Country",
  "operator" : "Equals",
  "value" : "CYP",
  "action" : "Block",
  "replace" : "",
  "carrierBank" : [ ],
  "status" : {
    "currentStatus" : "Active",
    "currentUpdatedOn" : "2021-07-01T12:20:04.453+0300",
    "currentNote" : null,
    "previousStatus" : null,
    "previousUpdatedOn" : null,
    "previousNote" : null,
    "daysSinceLastUpdate" : 0
  },
  "metadata" : {
    "creationDate" : "2021-07-01T12:20:04.453+0300",
    "createdBy" : "Userlevel4",
    "lastEditedBy" : "Userlevel4",
    "lastEditedDate" : "2021-07-01T12:20:04.454+0300"
  }
}
2021-07-01 11:12:57,262 INFO fetching from ca-dict-sanctions-stg (com.github.dariobalinzo.task.ElasticSourceTask) [task-thread-ca-dict-sanctions-stg-0]
2021-07-01 11:12:57,795 INFO found last value Cursor{primaryCursor='null', secondaryCursor='null'} (com.github.dariobalinzo.task.ElasticSourceTask) [task-thread-ca-dict-sanctions-stg-0]
2021-07-01 11:12:57,797 ERROR error (com.github.dariobalinzo.task.ElasticSourceTask) [task-thread-ca-dict-sanctions-stg-0]
java.lang.NullPointerException
at com.github.dariobalinzo.elastic.ElasticRepository.searchAfter(ElasticRepository.java:97)
at com.github.dariobalinzo.task.ElasticSourceTask.poll(ElasticSourceTask.java:203)
at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:270)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:237)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Inspired by a fork (https://github.com/inspectorioinc/kafka-connect-elasticsearch-source), I would like to add the possibility of selecting only a subset of fields from Elasticsearch documents when creating a new Kafka message.
Is it possible to add configuration parameters to set the partition/replica values for the topics created by the connector?
At the moment the connector creates them with 1/1.
I have ksqlDB streams reading from the topics; they expect partition/replica configurations other than 1/1.
We are able to insert data into a sink (Postgres) using kafka-connect-elasticsearch-source from a data source (Elasticsearch).
I have a use case where a document is deleted from Elasticsearch, and the same deletion should be reflected in the sink (Postgres).
I have a problem with one field. My ES mapping has a long field; I have other fields of the same type, but the problem happens only for this field.
Example ES mapping:
},
"redirectEnd" : {
"type" : "long"
},
"redirectStart" : {
"type" : "long"
},
"requestStart" : {
"type" : "float"
},
"resourceType" : {
"type" : "keyword"
},
"responseEnd" : {
"type" : "float"
},
"responseStart" : {
"type" : "float"
},
"secureConnectionStart" : {
"type" : "float"
},
"startTime" : {
"type" : "float"
},
"transferSize" : {
"type" : "long"
},
"workerStart" : {
"type" : "long"
}
Example ES values:
{
"connectEnd" : 4898.4000000059605,
"connectStart" : 4898.4000000059605,
"encodedBodySize" : 21504,
"decodedBodySize" : 140288,
"transferSize" : 8192,
"domainLookupEnd" : 4898.4000000059605,
"domainLookupStart" : 4898.4000000059605,
"duration" : 3224,
"entryType" : "resource",
"fetchStart" : 4898.4000000059605,
"initiatorType" : "script",
"resourceType" : "js",
Thanks.
Hi,
I'm wondering if there's a way to support null values. We have documents like
{"a": "value", "b": "value"} and {"a": null, "b": "value2"}. For now I can modify the source to generate {"b": "value2"}, but I was wondering if there's a different way.
Exception thrown:
[2021-02-24 22:46:40,733] ERROR error (com.github.dariobalinzo.task.ElasticSourceTask)
java.lang.RuntimeException: type not supported XXXX
at com.github.dariobalinzo.schema.SchemaConverter.convertDocumentSchema(SchemaConverter.java:76)
at com.github.dariobalinzo.schema.SchemaConverter.convert(SchemaConverter.java:42)
at com.github.dariobalinzo.task.ElasticSourceTask.parseResult(ElasticSourceTask.java:237)
ERROR error in searching index names (com.github.dariobalinzo.ElasticSourceConnector) (DistributedHerder-connect-1-1)
ERROR [Worker clientId=connect-1, groupId=connect-cluster] Failed to reconfigure connector's tasks
I see that the ES connector code removes dashes from attribute names: in my case current-time
turns into currenttime,
etc. Is this intentional? When working with Kafka I can have keys and JSON attributes with dashes, and in ksqlDB column names with dashes are fine too (they just need quoting in KSQL queries).
Why does the ES connector remove them?
Thank you.
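A likely reason for the stripping is Avro's naming rules, which only allow letters, digits, and underscores in field names, so current-time cannot be a valid Avro field name. Other configs in this thread show a fieldname_converter option whose nop value appears to skip the renaming. A hedged sketch (the connector name is a placeholder, and whether nop preserves dashes end-to-end depends on your value converter — check the README):

```json
{
  "name": "es-source-raw-names",
  "config": {
    "connector.class": "com.github.dariobalinzo.ElasticSourceConnector",
    "fieldname_converter": "nop"
  }
}
```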
Hi there.
I've been using this plugin for a while; it's great, thanks!
I'm having a problem with nested fields built from arbitrary (different every time) JSON in Elasticsearch. What I really want is to load that JSON in as a string using the json_cast feature, but it isn't working for me. The blacklist and whitelist filters both work well and use the exact same syntax to specify fields, so I'm not sure what's wrong.
Really appreciate your responses to my multiple issues so thank you.
We are getting a "missing build flavor [oss]" error when trying to connect to ES 7.2 with build_flavor oss, but there is no issue with ES 7.2 build_flavor default.
[2022-03-24 16:49:06,640] ERROR error (com.github.dariobalinzo.task.ElasticSourceTask:217)
ElasticsearchException[Invalid or missing build flavor [oss]]
at org.elasticsearch.client.RestHighLevelClient.performClientRequest(RestHighLevelClient.java:2701)
at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:2171)
at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:2137)
at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:2105)
at org.elasticsearch.client.RestHighLevelClient.search(RestHighLevelClient.java:1367)
at com.github.dariobalinzo.elastic.ElasticRepository.executeSearch(ElasticRepository.java:176)
at com.github.dariobalinzo.elastic.ElasticRepository.searchAfter(ElasticRepository.java:90)
at com.github.dariobalinzo.task.ElasticSourceTask.poll(ElasticSourceTask.java:205)
at org.apache.kafka.connect.runtime.Wor
Hey Dario!
I finally managed to run the Elasticsearch source connector successfully with Apache Kafka.
{
"name": "elastic-source",
"connector": {
"state": "RUNNING",
"worker_id": "127.0.1.1:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "127.0.1.1:8083"
}
],
"type": "source"
}
However, when I run the connector with this elasticsearch-source.properties in distributed mode:
name=elastic-source
connector.class=com.github.dariobalinzo.ElasticSourceConnector
tasks.max=1
es.host=127.0.0.1
es.port=1750
index.prefix=products
topic.prefix=es_
topic=elastic-events
I get an error like this:
[2022-09-20 17:45:38,127] INFO [elastic-source|task-0] fetching from products (com.github.dariobalinzo.task.ElasticSourceTask:201)
[2022-09-20 17:45:38,128] INFO [elastic-source|task-0] found last value Cursor{primaryCursor='null', secondaryCursor='null'} (com.github.dariobalinzo.task.ElasticSourceTask:203)
[2022-09-20 17:45:38,129] WARN [elastic-source|task-0] request [POST http://localhost:1750/products/_search?typed_keys=true&max_concurrent_shard_requests=5&search_type=query_then_fetch&batched_reduce_size=512] returned 1 warnings: [299 Elasticsearch-7.15.0-79d65f6e357953a5b3cbcc5e2c7c21073d89aa29 "Elasticsearch built-in security features are not enabled. Without authentication, your cluster could be accessible to anyone. See https://www.elastic.co/guide/en/elasticsearch/reference/7.15/security-minimal-setup.html to enable security."] (org.elasticsearch.client.RestClient:72)
[2022-09-20 17:45:38,129] ERROR [elastic-source|task-0] error (com.github.dariobalinzo.task.ElasticSourceTask:217)
ElasticsearchStatusException[Elasticsearch exception [type=search_phase_execution_exception, reason=all shards failed]]
at org.elasticsearch.rest.BytesRestResponse.errorFromXContent(BytesRestResponse.java:178)
at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:2484)
at org.elasticsearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:2461)
at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:2184)
at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:2137)
at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:2105)
at org.elasticsearch.client.RestHighLevelClient.search(RestHighLevelClient.java:1367)
at com.github.dariobalinzo.elastic.ElasticRepository.executeSearch(ElasticRepository.java:176)
at com.github.dariobalinzo.elastic.ElasticRepository.searchAfter(ElasticRepository.java:90)
at com.github.dariobalinzo.task.ElasticSourceTask.poll(ElasticSourceTask.java:205)
at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:305)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:249)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Suppressed: org.elasticsearch.client.ResponseException: method [POST], host [http://localhost:1750], URI [/products/_search?typed_keys=true&max_concurrent_shard_requests=5&search_type=query_then_fetch&batched_reduce_size=512], status line [HTTP/1.1 400 Bad Request]
and this:
{"error":{"root_cause":[{"type":"query_shard_exception","reason":"No mapping found for [@timestamp] in order to sort on","index_uuid":"b575v16yTXmq5o2sk77zbA","index":"products"}],"type":"search_phase_execution_exception","reason":"all shards failed","phase":"can_match","grouped":true,"failed_shards":[{"shard":0,"index":"products","node":"asmTRFlgThS7kBU6yyCJzA","reason":{"type":"query_shard_exception","reason":"No mapping found for [@timestamp] in order to sort on","index_uuid":"b575v16yTXmq5o2sk77zbA","index":"products"}}]},"status":400}
at org.elasticsearch.client.RestClient.convertResponse(RestClient.java:331)
at org.elasticsearch.client.RestClient.performRequest(RestClient.java:301)
at org.elasticsearch.client.RestClient.performRequest(RestClient.java:276)
at org.elasticsearch.client.RestHighLevelClient.performClientRequest(RestHighLevelClient.java:2699)
at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:2171)
... 15 more
The full list of topics is:
__consumer_offsets
connect-configs
connect-offset
connect-offsets
connect-status
elastic-events
In your tutorial, all the products* indices are sent to Kafka using es_ as the topic prefix.
So my question is: am I doing things right here? What's wrong?
How can I know whether all the data I send to Elasticsearch is read by Kafka?
thanks
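The 400 in the suppressed exception above looks like the real clue: the products index has no @timestamp field mapped, so Elasticsearch cannot sort on it. A sketch of the properties file with the cursor pointed at a date field that actually exists in the products mapping (last_update is a placeholder; substitute a real field from your index):

```properties
name=elastic-source
connector.class=com.github.dariobalinzo.ElasticSourceConnector
tasks.max=1
es.host=127.0.0.1
es.port=1750
index.prefix=products
topic.prefix=es_
incrementing.field.name=last_update
```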
Hi @DarioBalinzo,
I was wondering if there's a way to access the Elasticsearch _id field, e.g.:
{
"_index": "xxx",
"_id": 1,
"_source": {
...
}
Hi, I'm wondering if there's a good way to reset the incrementing field? For example I want to run the connector on all the data since the beginning of time again.
Hi Dario,
I tried the plugin with the Open Distro Elasticsearch distribution, and I added a username and password to my connector config; however, I am getting this result once I register the connector:
kafka-connect | (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:354)
kafka-connect | [2021-09-26 05:19:29,894] ERROR [-es-source|worker] error in searching index names (com.github.dariobalinzo.elastic.ElasticRepository:190)
kafka-connect | [2021-09-26 05:19:29,895] ERROR [Worker clientId=connect-1, groupId=kafka-connect] Failed to reconfigure connector's tasks, retrying after backoff: (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1376)
kafka-connect | java.lang.RuntimeException: org.
Registration conf:
curl -X PUT http://192.168.1.100:8083/connectors/es-source/config -H "Content-Type: application/json" -d '{
"connector.class": "com.github.dariobalinzo.ElasticSourceConnector",
"tasks.max": "2",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"errors.retry.timeout": "-1",
"errors.log.enable": true,
"errors.log.include.messages": true,
"es.host": "192.168.1.50",
"es.port": "9200",
"es.user": "admin",
"es.password": "temppass",
"connection.attempts": "5",
"connection.backoff.ms": "10000",
"index.prefix": "log",
"incrementing.field.name": "@timestamp",
"topic.prefix": "My_"
}'
Thank you in advance
Hi,
When I try to run the following config, I get this exception over and over again:
{
"name": "elastic-source-10",
"config": {
"connector.class": "com.github.dariobalinzo.ElasticSourceConnector",
"tasks.max": "1",
"es.host": "elasticsearch",
"es.port": "9200",
"index.prefix": "index-1",
"topic.prefix": "elastic-"
}
}
Here is the exception stack trace
2021-06-21 21:39:34,699 INFO || Initializing: org.apache.kafka.connect.runtime.TransformationChain{} [org.apache.kafka.connect.runtime.Worker]
2021-06-21 21:39:34,699 INFO || ProducerConfig values:
acks = all
batch.size = 16384
bootstrap.servers = [kafka:9092]
buffer.memory = 33554432
client.dns.lookup = default
client.id = connector-producer-elastic-source-10-0
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 2147483647
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 0
max.block.ms = 9223372036854775807
max.in.flight.requests.per.connection = 1
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 2147483647
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
[org.apache.kafka.clients.producer.ProducerConfig]
2021-06-21 21:39:34,703 INFO || Kafka version: 2.4.0 [org.apache.kafka.common.utils.AppInfoParser]
2021-06-21 21:39:34,703 INFO || Kafka commitId: 77a89fcf8d7fa018 [org.apache.kafka.common.utils.AppInfoParser]
2021-06-21 21:39:34,703 INFO || Kafka startTimeMs: 1624311574703 [org.apache.kafka.common.utils.AppInfoParser]
2021-06-21 21:39:34,717 INFO || [Worker clientId=connect-1, groupId=1] Finished starting connectors and tasks [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2021-06-21 21:39:34,717 INFO || ElasticSourceTaskConfig values:
batch.max.rows = 10000
connection.attempts = 3
connection.backoff.ms = 10000
es.host = elasticsearch
es.indices = index-1
es.password = null
es.port = 9200
es.scheme = http
es.user = null
fieldname_converter = avro
filters.json_cast = null
filters.whitelist = null
incrementing.field.name =
index.prefix = index-1
mode =
poll.interval.ms = 5000
topic.prefix = elastic-
[com.github.dariobalinzo.task.ElasticSourceTaskConfig]
2021-06-21 21:39:34,717 INFO || elastic auth disabled [com.github.dariobalinzo.elastic.ElasticConnection]
2021-06-21 21:39:34,719 INFO || WorkerSourceTask{id=elastic-source-10-0} Source task finished initialization and start [org.apache.kafka.connect.runtime.WorkerSourceTask]
2021-06-21 21:39:34,719 INFO || fetching from index-1 [com.github.dariobalinzo.task.ElasticSourceTask]
2021-06-21 21:39:34,818 INFO || [Producer clientId=connector-producer-elastic-source-10-0] Cluster ID: v38SOhq8SU-h4JvAGoDgTg [org.apache.kafka.clients.Metadata]
2021-06-21 21:39:34,882 INFO || found last value null [com.github.dariobalinzo.task.ElasticSourceTask]
2021-06-21 21:39:34,887 ERROR || error [com.github.dariobalinzo.task.ElasticSourceTask]
ElasticsearchStatusException[Elasticsearch exception [type=search_phase_execution_exception, reason=all shards failed]]
at org.elasticsearch.rest.BytesRestResponse.errorFromXContent(BytesRestResponse.java:177)
at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:618)
at org.elasticsearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:594)
at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:501)
at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:474)
at org.elasticsearch.client.RestHighLevelClient.search(RestHighLevelClient.java:391)
at com.github.dariobalinzo.elastic.ElasticRepository.executeSearch(ElasticRepository.java:89)
at com.github.dariobalinzo.elastic.ElasticRepository.searchAfter(ElasticRepository.java:72)
at com.github.dariobalinzo.task.ElasticSourceTask.poll(ElasticSourceTask.java:169)
at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:265)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:232)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Suppressed: org.elasticsearch.client.ResponseException: method [POST], host [http://elasticsearch:9200], URI [/index-1/_search?typed_keys=true&ignore_unavailable=false&expand_wildcards=open&allow_no_indices=true&search_type=query_then_fetch&batched_reduce_size=512], status line [HTTP/1.1 400 Bad Request]
{"error":{"root_cause":[{"type":"query_shard_exception","reason":"No mapping found for [] in order to sort on","index_uuid":"vwbDQ_nlQYqwRxxqe-S9Fg","index":"index-1"}],"type":"search_phase_execution_exception","reason":"all shards failed","phase":"query","grouped":true,"failed_shards":[{"shard":0,"index":"index-1","node":"jTlMT-4gS2Gu66gtHyuG9g","reason":{"type":"query_shard_exception","reason":"No mapping found for [] in order to sort on","index_uuid":"vwbDQ_nlQYqwRxxqe-S9Fg","index":"index-1"}}]},"status":400}
at org.elasticsearch.client.RestClient$1.completed(RestClient.java:357)
at org.elasticsearch.client.RestClient$1.completed(RestClient.java:346)
at org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:119)
at org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:177)
at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:436)
at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:326)
at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
... 1 more
I also sent the request manually, and there is no problem with it because I get a response successfully.
After that, I checked the connector status: it is RUNNING, but I think it should be FAILED.
Do you have any idea what could be causing the problem?
Thanks 🙂
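One observation from the logs above: the task config shows incrementing.field.name as blank, and the suppressed 400 complains about sorting on [] (an empty field name), which matches — the submitted config never set a cursor field. A sketch with the cursor filled in; timestamp here is a placeholder for a date field that actually exists in index-1:

```json
{
  "name": "elastic-source-10",
  "config": {
    "connector.class": "com.github.dariobalinzo.ElasticSourceConnector",
    "tasks.max": "1",
    "es.host": "elasticsearch",
    "es.port": "9200",
    "index.prefix": "index-1",
    "topic.prefix": "elastic-",
    "incrementing.field.name": "timestamp"
  }
}
```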
Hello @DarioBalinzo ,
I have two indexes in ES, nuvla-nuvlabox and nuvla-nuvlabox-status, and with the configuration below I noticed I'm getting messages twice in es_nuvla-nuvlabox-status. I added "elastic-source-nuvlabox" much later due to new requirements.
[root@31e4e221f046 appuser]# cat /etc/kafka/elastic-source-nuvlabox.json
{ "name": "elastic-source-nuvlabox",
"config": {"connector.class":"com.github.dariobalinzo.ElasticSourceConnector",
"tasks.max": "1",
"es.host" : "es",
"es.port" : "9200",
"index.prefix" : "nuvla-nuvlabox",
"topic.prefix" : "es_",
"incrementing.field.name" : "updated",
"fieldname_converter": "nop"
}
}
[root@31e4e221f046 appuser]# cat /etc/kafka/elastic-source-nuvlabox-status.json
{ "name": "elastic-source-nuvlabox-status",
"config": {"connector.class":"com.github.dariobalinzo.ElasticSourceConnector",
"tasks.max": "1",
"es.host" : "es",
"es.port" : "9200",
"index.prefix" : "nuvla-nuvlabox-status",
"topic.prefix" : "es_",
"incrementing.field.name" : "updated",
"fieldname_converter": "nop"
}
}
[root@31e4e221f046 appuser]#
I understand this can be fixed by removing the elastic-source-nuvlabox-status connector. However, what will happen if I add, e.g., an nuvla-nuvlabox-telem index? I don't want it to end up in Kafka as es_nuvla-nuvlabox-telem.
Maybe adding an optional index.name (or comma-separated index.names) parameter for strictly defining the index(es) would be a solution?
Thank you.
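The proposed option does not exist today; a hypothetical sketch of what it might look like (index.names and its comma-separated semantics are the suggestion here, not a current config key):

```json
{
  "name": "elastic-source-nuvlabox",
  "config": {
    "connector.class": "com.github.dariobalinzo.ElasticSourceConnector",
    "index.names": "nuvla-nuvlabox,nuvla-nuvlabox-status",
    "topic.prefix": "es_",
    "incrementing.field.name": "updated",
    "fieldname_converter": "nop"
  }
}
```

With an explicit list, a newly created nuvla-nuvlabox-telem index would simply not be picked up.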
How can I set a configuration
to avoid this error? Note that the Elasticsearch REST client defaults this buffer limit to 100 MB (https://github.com/elastic/elasticsearch/blob/master/client/rest/src/main/java/org/elasticsearch/client/HttpAsyncResponseConsumerFactory.java).
(com.github.dariobalinzo.task.ElasticSourceTask:202)
org.apache.http.ContentTooLongException: entity content is too long [167942814] for the configured buffer limit [104857600]
at org.elasticsearch.client.HeapBufferedAsyncResponseConsumer.onEntityEnclosed(HeapBufferedAsyncResponseConsumer.java:76)
at org.apache.http.nio.protocol.AbstractAsyncResponseConsumer.responseReceived(AbstractAsyncResponseConsumer.java:131)
I tried lowering the configuration:
"batch.max.rows": 1000,
but I'm still hitting the error. Any ideas?
Thanks!
The JSON document in the ES index is rather complex and contains several lists. I can paste it here if you think this can be useful.
[2020-12-10 13:11:53,542] ERROR error (com.github.dariobalinzo.task.ElasticSourceTask:152)
java.lang.NullPointerException
at com.github.dariobalinzo.schema.StructConverter.convertListOfObject(StructConverter.java:101)
at com.github.dariobalinzo.schema.StructConverter.lambda$convertListToAvroArray$1(StructConverter.java:74)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
at com.github.dariobalinzo.schema.StructConverter.convertListToAvroArray(StructConverter.java:75)
at com.github.dariobalinzo.schema.StructConverter.convertDocumentStruct(StructConverter.java:42)
at com.github.dariobalinzo.schema.StructConverter.covertMapToAvroStruct(StructConverter.java:89)
at com.github.dariobalinzo.schema.StructConverter.convertDocumentStruct(StructConverter.java:44)
at com.github.dariobalinzo.schema.StructConverter.convert(StructConverter.java:30)
at com.github.dariobalinzo.task.ElasticSourceTask.parseResult(ElasticSourceTask.java:179)
at com.github.dariobalinzo.task.ElasticSourceTask.poll(ElasticSourceTask.java:142)
at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:289)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:256)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
I've got an error: apparently the connector is changing my URL.
I set a host and port, but on the command line it gives me this:
java.net.ConnectException: Timeout connecting to [<es.host>/<ip_address>:<es.port>]
Could you please help me?
Hi @DarioBalinzo,
I am wondering if pagination of results is supported when the number of results > batch size. I seem to be seeing the following:
When I activate this connector with the configuration below:
{
"name": "global-es-source",
"config": {
"connector.class":"com.github.dariobalinzo.ElasticSourceConnector",
"tasks.max": "1",
"es.host" : "10.0.129.171;10.0.128.237;10.0.128.64",
"es.port" : "9200",
"es.user" : "xxxx",
"es.password" : "xxxx",
"es.scheme" : "http",
"incrementing.field.name" : "@timestamp",
"topic.prefix" : "global-es-",
"type.name": "source"
}
}
When I check the status of this connector, I see the error below:
java.lang.NoSuchFieldError: INSTANCE
at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.<init>(PoolingNHttpClientConnectionManager.java:591)
at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.<init>(PoolingNHttpClientConnectionManager.java:163)
at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.<init>(PoolingNHttpClientConnectionManager.java:147)
at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.<init>(PoolingNHttpClientConnectionManager.java:119)
at org.apache.http.impl.nio.client.HttpAsyncClientBuilder.build(HttpAsyncClientBuilder.java:668)
at org.elasticsearch.client.RestClientBuilder.run(RestClientBuilder.java:218)
at org.elasticsearch.client.RestClientBuilder.run(RestClientBuilder.java:215)
at java.security.AccessController.doPrivileged(Native Method)
at org.elasticsearch.client.RestClientBuilder.createHttpClient(RestClientBuilder.java:215)
at org.elasticsearch.client.RestClientBuilder.access$000(RestClientBuilder.java:42)
at org.elasticsearch.client.RestClientBuilder.run(RestClientBuilder.java:187)
at org.elasticsearch.client.RestClientBuilder.run(RestClientBuilder.java:184)
at java.security.AccessController.doPrivileged(Native Method)
at org.elasticsearch.client.RestClientBuilder.build(RestClientBuilder.java:184)
at org.elasticsearch.client.RestHighLevelClient.<init>(RestHighLevelClient.java:200)
at org.elasticsearch.client.RestHighLevelClient.<init>(RestHighLevelClient.java:192)
at com.github.dariobalinzo.elastic.ElasticConnection.createConnectionUsingAuth(ElasticConnection.java:73)
at com.github.dariobalinzo.elastic.ElasticConnection.<init>(ElasticConnection.java:49)
at com.github.dariobalinzo.elastic.ElasticConnectionBuilder.build(ElasticConnectionBuilder.java:60)
at com.github.dariobalinzo.ElasticSourceConnector.start(ElasticSourceConnector.java:85)
at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:111)
at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:136)
at org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:196)
at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:242)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:908)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$300(DistributedHerder.java:110)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$5.call(DistributedHerder.java:924)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$5.call(DistributedHerder.java:920)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Any idea about this issue? Thank you for your advice.
Dear @DarioBalinzo,
I was having a look at your repo and find it really useful, thank you!
Playing around with it, I could not find any way to configure a specific search query to limit the retrieved documents. Am I overlooking something, or is this not possible at the moment?
Hi there,
I am having a problem when using the filters.whitelist configuration for a nested value.
Here is the value part of my JSON document:
"resPayload": [
"{ \"timestamp\":1611882445, \"status\":200, \"errors\":{ \"identity_photo\":\"invalid\" }, \"data\":{ \"name\":true, \"birthdate\":true, \"birthplace\":true, \"address\":\"M*MP*NG\",\"selfie_photo\":80}}"
]
When I use this configuration, I get the entire value of the resPayload field:
"filters.whitelist" : "apiName;resPayload",
but when I do this, I only get the value of the apiName field and nothing from resPayload:
"filters.whitelist" : "apiName;resPayload.status",
How do I get only the field values I need from resPayload?
Can anyone help me?
I really appreciate your response.
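Note that resPayload is an array containing a JSON-encoded string, not a nested object, so a whitelist path like resPayload.status has nothing to match against. Other configs in this thread show a filters.json_cast option; a hedged sketch (whether json_cast accepts this exact value, and whether it decodes string payloads into fields, is an assumption — check the README):

```json
{
  "filters.whitelist": "apiName;resPayload",
  "filters.json_cast": "resPayload"
}
```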
ES document mapping:
PUT /test-websitesources
{
"mappings": {
"source": {
"properties": {
"sourceId": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"weblinks": {
"type":"nested",
"properties" : {
"id" : {
"type" : "text"
},
"name" : {
"type" : "text"
}
}
},
"updated_date": {
"type": "date"
},
"createdDate": {
"type": "date"
}
}
}
}
}
Valid Scenario:
The Kafka connector successfully pulls this data, and I can view it under the topic:
"source": {
"sourceId": "s1",
"weblinks": {
"id": "street",
"name": "www.street.com"
},
"updated_date": "2021-04-18T17:54:49.643073Z",
"createdDate": "2020-02-18T20:32:41.1775655Z"
}
Failing Scenario (passing the weblinks field as an array):
"source": {
"sourceId": "s1",
"weblinks": [{
"id": "afr",
"name": "www.afr.com"
}
],
"updated_date": "2021-04-17T17:54:49.643073Z",
"createdDate": "2020-02-18T20:32:41.1775655Z"
}
It fails with the exception below:
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
Caused by: org.apache.avro.SchemaParseException: Empty name
at org.apache.avro.Schema.validateName(Schema.java:1523)
The connector should be able to use more than one node when connecting to an ES cluster.
Hi!
I'm having this error when using your connector:
java.lang.NullPointerException at com.github.dariobalinzo.task.ElasticSourceTask.fetchLastOffset(ElasticSourceTask.java:215)
This is the connector creation request:
{
"name": "el-source",
"config": {
"connector.class": "com.github.dariobalinzo.ElasticSourceConnector",
"tasks.max": "1",
"es.host": "host.docker.internal",
"es.port": "9201",
"index.prefix": "persons",
"topic.prefix": "persons",
"incrementing.field.name": "_id"
}
}
I have tried different values for the field "incrementing.field.name" with no luck.
Any idea?
Thanks!