kafka-connect-elasticsearch-source's Introduction

Kafka-connect-elasticsearch-source

Kafka Connect Elasticsearch Source fetches data from Elasticsearch and sends it to Kafka. The connector fetches only new data, using a strictly incrementing or temporal field (such as a timestamp or an incrementing id). It supports dynamic schemas and nested objects and arrays.
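The incremental fetch described above can be illustrated with a small in-memory simulation. This is only a sketch of the idea, not the connector's code: the `fetch_new` helper, the sample documents and the `id` field are all hypothetical, and a plain Python list stands in for an Elasticsearch index.

```python
from typing import Any, Dict, List, Optional


def fetch_new(docs: List[Dict[str, Any]], field: str,
              last_seen: Optional[Any], batch_size: int) -> List[Dict[str, Any]]:
    """Return up to batch_size documents whose `field` is strictly greater
    than last_seen, sorted ascending by that field."""
    candidates = [d for d in docs if last_seen is None or d[field] > last_seen]
    candidates.sort(key=lambda d: d[field])
    return candidates[:batch_size]


# Hypothetical index content; "id" plays the role of the incrementing field.
index = [{"id": 3, "msg": "c"}, {"id": 1, "msg": "a"}, {"id": 2, "msg": "b"}]

cursor = None                                  # nothing read yet
first = fetch_new(index, "id", cursor, batch_size=2)
cursor = first[-1]["id"]                       # remember the last value seen

index.append({"id": 4, "msg": "d"})            # a new document arrives
second = fetch_new(index, "id", cursor, batch_size=2)
```

Each poll returns only documents past the stored cursor, which is why the field has to be strictly incrementing: a document written later with a smaller value would never be picked up.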

Requirements:

  • Elasticsearch 6.x and 7.x
  • Java >= 8
  • Maven

Output data serialization format:

The connector uses Kafka Connect schemas and structs, which are agnostic to the serialization format the user chooses (e.g. Avro or JSON).

Bugs or new Ideas?

Installation:

Compile the project with:

mvn clean package -DskipTests

You can also compile and run both the unit and integration tests (Docker is mandatory) with:

mvn clean package

Copy the jar with dependencies from the target folder into the Connect classpath (e.g. /usr/share/java/kafka-connect-elasticsearch), or set the plugin.path parameter appropriately.

Example

Using Kafka Connect in distributed mode, a sample config file that fetches the my_awesome_index* indices and produces output topics with the es_ prefix:

{
  "name": "elastic-source",
  "config": {
    "connector.class": "com.github.dariobalinzo.ElasticSourceConnector",
    "tasks.max": "1",
    "es.host": "localhost",
    "es.port": "9200",
    "index.prefix": "my_awesome_index",
    "topic.prefix": "es_",
    "incrementing.field.name": "@timestamp"
  }
}

To start the connector with curl:

curl -X POST -H "Content-Type: application/json" --data @config.json http://localhost:8083/connectors | jq

To check the status:

curl localhost:8083/connectors/elastic-source/status | jq

To stop the connector:

curl -X DELETE localhost:8083/connectors/elastic-source | jq
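The three curl calls above can also be prepared from a script. The following is a minimal sketch using only the Python standard library; the helper names (`create_request`, `status_request`, `delete_request`, `send`) are hypothetical, and the worker address is assumed to be the same http://localhost:8083 used in the curl examples. The requests are only built here; nothing is sent until `send` is called.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # assumption: same worker as the curl examples


def create_request(connector: dict) -> urllib.request.Request:
    """POST /connectors with the JSON connector definition as the body."""
    return urllib.request.Request(
        url=f"{CONNECT_URL}/connectors",
        data=json.dumps(connector).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def status_request(name: str) -> urllib.request.Request:
    """GET /connectors/<name>/status."""
    return urllib.request.Request(
        url=f"{CONNECT_URL}/connectors/{name}/status", method="GET")


def delete_request(name: str) -> urllib.request.Request:
    """DELETE /connectors/<name>."""
    return urllib.request.Request(
        url=f"{CONNECT_URL}/connectors/{name}", method="DELETE")


def send(request: urllib.request.Request) -> str:
    """Send a prepared request and return the response body as text."""
    with urllib.request.urlopen(request) as response:
        return response.read().decode("utf-8")


connector = {
    "name": "elastic-source",
    "config": {
        "connector.class": "com.github.dariobalinzo.ElasticSourceConnector",
        "tasks.max": "1",
        "es.host": "localhost",
        "es.port": "9200",
        "index.prefix": "my_awesome_index",
        "topic.prefix": "es_",
        "incrementing.field.name": "@timestamp",
    },
}

start = create_request(connector)   # call send(start) against a running worker
```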

Documentation

Elasticsearch Configuration

es.host Elasticsearch host. Multiple hosts can be specified using ; as separator (host1;host2;host3).

  • Type: string
  • Importance: high
  • Dependents: index.prefix

es.port ElasticSearch port

  • Type: string
  • Importance: high
  • Dependents: index.prefix

es.scheme ElasticSearch scheme (http/https)

  • Type: string
  • Importance: medium
  • Default: http
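To make the es.host, es.port and es.scheme settings concrete, here is a small sketch of how a semicolon-separated host list could be expanded into node URLs. The `build_node_urls` helper is hypothetical, and it assumes that every host shares the single configured port and scheme; the connector's own handling is not shown in this document.

```python
from typing import List


def build_node_urls(es_host: str, es_port: str, es_scheme: str = "http") -> List[str]:
    """Split a ';'-separated host list and build one URL per host.

    Assumption: all hosts use the same port and scheme.
    """
    hosts = [h.strip() for h in es_host.split(";") if h.strip()]
    return [f"{es_scheme}://{host}:{es_port}" for host in hosts]


urls = build_node_urls("host1;host2;host3", "9200")
```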

es.user Elasticsearch username

  • Type: string
  • Default: null
  • Importance: high

es.password Elasticsearch password

  • Type: password
  • Default: null
  • Importance: high

incrementing.field.name The name of the strictly incrementing field to use to detect new records.

  • Type: any
  • Importance: high

incrementing.secondary.field.name If the main incrementing field may contain duplicates, this field is used as a secondary sort field to avoid data loss when paginating (available from version 1.4).

  • Type: any
  • Importance: low
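Why a secondary sort field matters can be shown with a small simulation. This is a sketch under stated assumptions, not the connector's query logic: the documents, the `ts` and `id` fields and all helper names are hypothetical, and a Python list stands in for the index. With a page size of 2 and three documents sharing the same `ts`, a cursor on `ts` alone skips two of them, while a cursor on the pair (`ts`, `id`) does not.

```python
from typing import Any, Callable, Dict, List, Optional

Doc = Dict[str, Any]


def page_primary_only(docs: List[Doc], field: str, after: Optional[Any], size: int) -> List[Doc]:
    """One page sorted by the primary field, strictly after the cursor."""
    hits = sorted((d for d in docs if after is None or d[field] > after),
                  key=lambda d: d[field])
    return hits[:size]


def page_with_secondary(docs: List[Doc], primary: str, secondary: str,
                        after: Optional[tuple], size: int) -> List[Doc]:
    """One page sorted by (primary, secondary), strictly after the cursor pair."""
    def key(d: Doc) -> tuple:
        return (d[primary], d[secondary])
    hits = sorted((d for d in docs if after is None or key(d) > after), key=key)
    return hits[:size]


def drain(fetch_page: Callable[[Any], List[Doc]], cursor_of: Callable[[Doc], Any]) -> List[str]:
    """Page until empty, returning the ids seen in order."""
    seen, cursor = [], None
    while True:
        page = fetch_page(cursor)
        if not page:
            return seen
        seen.extend(d["id"] for d in page)
        cursor = cursor_of(page[-1])


docs = [{"id": "a", "ts": 1}, {"id": "b", "ts": 2}, {"id": "c", "ts": 2},
        {"id": "d", "ts": 2}, {"id": "e", "ts": 3}]

primary_only = drain(lambda c: page_primary_only(docs, "ts", c, 2),
                     lambda d: d["ts"])
with_secondary = drain(lambda c: page_with_secondary(docs, "ts", "id", c, 2),
                       lambda d: (d["ts"], d["id"]))
```

In this run `primary_only` misses documents c and d, because the cursor jumps past every document with `ts` equal to 2 once the first page ends on that value.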

es.tls.truststore.location Elastic ssl truststore location

  • Type: string
  • Importance: medium

es.tls.truststore.password Elastic ssl truststore password

  • Type: string
  • Default: ""
  • Importance: medium

es.tls.keystore.location Elasticsearch keystore location

  • Type: string
  • Importance: medium

es.tls.keystore.password Elasticsearch keystore password

  • Type: string
  • Default: ""
  • Importance: medium

connection.attempts Maximum number of attempts to retrieve a valid Elasticsearch connection.

  • Type: int
  • Default: 3
  • Importance: low

connection.backoff.ms Backoff time in milliseconds between connection attempts.

  • Type: long
  • Default: 10000
  • Importance: low
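The interplay of connection.attempts and connection.backoff.ms can be sketched as a simple retry loop. The `connect_with_retries` helper below is hypothetical and only illustrates the semantics implied by the two descriptions above (a maximum number of attempts, with a fixed pause between them); the `sleep` parameter exists so the loop can be exercised without actually waiting.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def connect_with_retries(connect: Callable[[], T], attempts: int = 3,
                         backoff_ms: int = 10000,
                         sleep: Callable[[float], None] = time.sleep) -> T:
    """Call connect() up to `attempts` times, pausing backoff_ms between tries."""
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            return connect()
        except ConnectionError as error:
            last_error = error
            if attempt < attempts:          # no pause after the final failure
                sleep(backoff_ms / 1000.0)
    raise RuntimeError(f"no valid connection after {attempts} attempts") from last_error


# Hypothetical flaky connection: fails twice, then succeeds.
calls = {"n": 0}


def flaky_connect() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("node not reachable")
    return "connected"


pauses = []
result = connect_with_retries(flaky_connect, attempts=3, backoff_ms=10, sleep=pauses.append)
```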

index.prefix Indices prefix to include in copying. Periodically, new indices are discovered if they match the pattern.

  • Type: string
  • Default: ""
  • Importance: medium

index.names Comma-separated list of Elasticsearch indices (e.g. es1,es2,es3).

  • Type: string
  • Default: null
  • Importance: medium
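The difference between index.prefix and index.names can be sketched as follows. The `select_indices` helper is hypothetical, and letting index.names take precedence when both are set is an assumption made for this illustration; the index names are taken from one of the issues listed further down.

```python
from typing import Iterable, List, Optional


def select_indices(available: Iterable[str], index_prefix: str = "",
                   index_names: Optional[str] = None) -> List[str]:
    """Pick indices by exact name when index_names is set, else by prefix.

    Assumption: index_names wins over index_prefix when both are given.
    """
    if index_names:
        wanted = {name.strip() for name in index_names.split(",") if name.strip()}
        return sorted(i for i in available if i in wanted)
    return sorted(i for i in available if i.startswith(index_prefix))


available = ["nuvla-nuvlabox", "nuvla-nuvlabox-status", "other-index"]

by_prefix = select_indices(available, index_prefix="nuvla-nuvlabox")
by_name = select_indices(available, index_names="nuvla-nuvlabox")
```

A prefix matches every index that starts with it, so `by_prefix` contains both nuvla indices, while the explicit list selects exactly one.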

Connector Configuration

poll.interval.ms Frequency in ms to poll for new data in each index.

  • Type: int
  • Default: 5000
  • Importance: high

batch.max.rows Maximum number of documents to include in a single batch when polling for new data.

  • Type: int
  • Default: 10000
  • Importance: low

topic.prefix Prefix to prepend to index names to generate the name of the Kafka topic to publish data to.

  • Type: string
  • Importance: high

filters.whitelist Whitelist filter for extracting a subset of fields from Elasticsearch JSON documents. The whitelist filter supports nested fields. To provide multiple fields use ; as separator (e.g. customer;order.qty;order.price).

  • Type: string
  • Importance: medium
  • Default: null

filters.blacklist Blacklist filter for excluding a subset of fields from Elasticsearch JSON documents. The blacklist filter supports nested fields. To provide multiple fields use ; as separator (e.g. customer;order.qty;order.price).

  • Type: string
  • Importance: medium
  • Default: null
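The whitelist and blacklist semantics on nested fields can be sketched on plain dictionaries. The helpers below (`parse_fields`, `apply_whitelist`, `apply_blacklist`) are hypothetical and handle only nested objects addressed by dotted paths; how the connector treats arrays of objects is not covered by this sketch.

```python
import copy
from typing import Any, Dict, List

Doc = Dict[str, Any]


def parse_fields(setting: str) -> List[str]:
    """Split a ';'-separated filter setting into dotted field paths."""
    return [f.strip() for f in setting.split(";") if f.strip()]


def apply_whitelist(doc: Doc, fields: List[str]) -> Doc:
    """Keep only the listed (possibly nested) fields."""
    result: Doc = {}
    for path in fields:
        keys = path.split(".")
        source: Any = doc
        for key in keys:
            if not isinstance(source, dict) or key not in source:
                break                       # path not present in this document
            source = source[key]
        else:
            target = result
            for key in keys[:-1]:
                target = target.setdefault(key, {})
            target[keys[-1]] = copy.deepcopy(source)
    return result


def apply_blacklist(doc: Doc, fields: List[str]) -> Doc:
    """Drop the listed (possibly nested) fields, keep everything else."""
    result = copy.deepcopy(doc)
    for path in fields:
        keys = path.split(".")
        target: Any = result
        for key in keys[:-1]:
            target = target.get(key) if isinstance(target, dict) else None
            if target is None:
                break                       # parent object missing
        else:
            if isinstance(target, dict):
                target.pop(keys[-1], None)
    return result


doc = {"customer": "acme", "order": {"qty": 2, "price": 9.5, "note": "x"}, "internal": True}

kept = apply_whitelist(doc, parse_fields("customer;order.qty;order.price"))
dropped = apply_blacklist(doc, parse_fields("order.note;internal"))
```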

filters.json_cast This filter casts nested fields to JSON strings, instead of parsing them recursively into a Kafka Connect schema. The json-cast filter supports nested fields. To provide multiple fields use ; as separator (e.g. customer;order.qty;order.price).

  • Type: string
  • Importance: medium
  • Default: null
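The effect of a JSON cast can be illustrated on a plain dictionary: the selected field is replaced by its serialized JSON text, so a downstream schema sees a string instead of a nested structure. The `apply_json_cast` helper is hypothetical, and the choice of `sort_keys=True` is only there to make the output deterministic in this sketch.

```python
import copy
import json
from typing import Any, Dict, List

Doc = Dict[str, Any]


def apply_json_cast(doc: Doc, fields: List[str]) -> Doc:
    """Replace each listed (possibly nested) field by its JSON string form."""
    result = copy.deepcopy(doc)
    for path in fields:
        keys = path.split(".")
        parent: Any = result
        for key in keys[:-1]:
            parent = parent.get(key) if isinstance(parent, dict) else None
            if parent is None:
                break                       # parent object missing
        else:
            if isinstance(parent, dict) and keys[-1] in parent:
                parent[keys[-1]] = json.dumps(parent[keys[-1]], sort_keys=True)
    return result


# Hypothetical document with an arbitrary nested payload.
doc = {"id": 1, "payload": {"b": [1, 2], "a": {"x": None}}}
cast = apply_json_cast(doc, ["payload"])
```

Fields whose shape differs from document to document are the natural candidates for this filter, since a string field keeps the schema stable.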

fieldname_converter Configures which field name converter is used (allowed values: avro or nop). By default, the avro field name converter renames JSON fields that do not respect the Avro naming specification (https://avro.apache.org/docs/current/spec.html#names) so that they can be serialized correctly. To disable field name conversion, set this parameter to nop.

  • Type: string
  • Importance: medium
  • Default: avro
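A rough sketch of what an Avro-style field name converter might do: the Avro specification requires names to start with a letter or underscore and to contain only letters, digits and underscores. The `to_avro_name` function below is hypothetical; removing invalid characters and prefixing an underscore when the first character is not allowed are assumptions made for illustration, not a description of the connector's exact rules.

```python
import re

_INVALID_CHARS = re.compile(r"[^A-Za-z0-9_]")
_VALID_START = re.compile(r"[A-Za-z_]")


def to_avro_name(field: str) -> str:
    """Drop characters Avro names do not allow; ensure a valid first character.

    Assumption: invalid characters are removed rather than replaced.
    """
    cleaned = _INVALID_CHARS.sub("", field)
    if not cleaned or not _VALID_START.match(cleaned):
        cleaned = "_" + cleaned
    return cleaned


# The two allowed values of fieldname_converter, as plain functions.
CONVERTERS = {
    "avro": to_avro_name,
    "nop": lambda name: name,   # leaves the field name untouched
}

converted = CONVERTERS["avro"]("current-time")
untouched = CONVERTERS["nop"]("current-time")
```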

kafka-connect-elasticsearch-source's People

Contributors

danny02, dariobalinzo, dependabot[bot], feed3r, ilosamart

kafka-connect-elasticsearch-source's Issues

Rest client version conflicts when used with confluent elastic search sink connector

When using it together with the Confluent Elasticsearch sink connector (https://github.com/confluentinc/kafka-connect-elasticsearch), I get this error:

[2021-10-20 17:42:45,631] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:121)
java.lang.NoSuchMethodError: 'org.elasticsearch.client.Response org.elasticsearch.client.RestClient.performRequest(java.lang.String, java.lang.String, org.apache.http.Header[])'
        at com.github.dariobalinzo.elastic.ElasticRepository.catIndices(ElasticRepository.java:103)
        at com.github.dariobalinzo.ElasticSourceConnector.taskConfigs(ElasticSourceConnector.java:98)
        at org.apache.kafka.connect.runtime.Worker.connectorTaskConfigs(Worker.java:323)
        at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.recomputeTaskConfigs(StandaloneHerder.java:305)
        at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.updateConnectorTasks(StandaloneHerder.java:331)
        at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:216)
        at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:115)

I think this is due to a version mismatch between the two connectors' dependencies, as RestClient changed between these versions.

        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>6.2.3</version>
        </dependency>

and

        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>7.9.3</version>
        </dependency>

Can we update this dependency?

No mapping found for [] in order to sort on

Hi,

When I try to run the following config, I get the same exception over and over again:

{
    "name": "elastic-source-10",
    "config": {
        "connector.class": "com.github.dariobalinzo.ElasticSourceConnector",
        "tasks.max": "1",
        "es.host": "elasticsearch",
        "es.port": "9200",
        "index.prefix": "index-1",
        "topic.prefix": "elastic-"
    }
}

Here is the exception stack trace

2021-06-21 21:39:34,699 INFO   ||  Initializing: org.apache.kafka.connect.runtime.TransformationChain{}   [org.apache.kafka.connect.runtime.Worker]
2021-06-21 21:39:34,699 INFO   ||  ProducerConfig values:
	acks = all
	batch.size = 16384
	bootstrap.servers = [kafka:9092]
	buffer.memory = 33554432
	client.dns.lookup = default
	client.id = connector-producer-elastic-source-10-0
	compression.type = none
	connections.max.idle.ms = 540000
	delivery.timeout.ms = 2147483647
	enable.idempotence = false
	interceptor.classes = []
	key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
	linger.ms = 0
	max.block.ms = 9223372036854775807
	max.in.flight.requests.per.connection = 1
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 2147483647
	retries = 2147483647
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
   [org.apache.kafka.clients.producer.ProducerConfig]
2021-06-21 21:39:34,703 INFO   ||  Kafka version: 2.4.0   [org.apache.kafka.common.utils.AppInfoParser]
2021-06-21 21:39:34,703 INFO   ||  Kafka commitId: 77a89fcf8d7fa018   [org.apache.kafka.common.utils.AppInfoParser]
2021-06-21 21:39:34,703 INFO   ||  Kafka startTimeMs: 1624311574703   [org.apache.kafka.common.utils.AppInfoParser]
2021-06-21 21:39:34,717 INFO   ||  [Worker clientId=connect-1, groupId=1] Finished starting connectors and tasks   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2021-06-21 21:39:34,717 INFO   ||  ElasticSourceTaskConfig values:
	batch.max.rows = 10000
	connection.attempts = 3
	connection.backoff.ms = 10000
	es.host = elasticsearch
	es.indices = index-1
	es.password = null
	es.port = 9200
	es.scheme = http
	es.user = null
	fieldname_converter = avro
	filters.json_cast = null
	filters.whitelist = null
	incrementing.field.name =
	index.prefix = index-1
	mode =
	poll.interval.ms = 5000
	topic.prefix = elastic-
   [com.github.dariobalinzo.task.ElasticSourceTaskConfig]
2021-06-21 21:39:34,717 INFO   ||  elastic auth disabled   [com.github.dariobalinzo.elastic.ElasticConnection]
2021-06-21 21:39:34,719 INFO   ||  WorkerSourceTask{id=elastic-source-10-0} Source task finished initialization and start   [org.apache.kafka.connect.runtime.WorkerSourceTask]
2021-06-21 21:39:34,719 INFO   ||  fetching from index-1   [com.github.dariobalinzo.task.ElasticSourceTask]
2021-06-21 21:39:34,818 INFO   ||  [Producer clientId=connector-producer-elastic-source-10-0] Cluster ID: v38SOhq8SU-h4JvAGoDgTg   [org.apache.kafka.clients.Metadata]
2021-06-21 21:39:34,882 INFO   ||  found last value null   [com.github.dariobalinzo.task.ElasticSourceTask]
2021-06-21 21:39:34,887 ERROR  ||  error   [com.github.dariobalinzo.task.ElasticSourceTask]
ElasticsearchStatusException[Elasticsearch exception [type=search_phase_execution_exception, reason=all shards failed]]
	at org.elasticsearch.rest.BytesRestResponse.errorFromXContent(BytesRestResponse.java:177)
	at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:618)
	at org.elasticsearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:594)
	at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:501)
	at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:474)
	at org.elasticsearch.client.RestHighLevelClient.search(RestHighLevelClient.java:391)
	at com.github.dariobalinzo.elastic.ElasticRepository.executeSearch(ElasticRepository.java:89)
	at com.github.dariobalinzo.elastic.ElasticRepository.searchAfter(ElasticRepository.java:72)
	at com.github.dariobalinzo.task.ElasticSourceTask.poll(ElasticSourceTask.java:169)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:265)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:232)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
	Suppressed: org.elasticsearch.client.ResponseException: method [POST], host [http://elasticsearch:9200], URI [/index-1/_search?typed_keys=true&ignore_unavailable=false&expand_wildcards=open&allow_no_indices=true&search_type=query_then_fetch&batched_reduce_size=512], status line [HTTP/1.1 400 Bad Request]
{"error":{"root_cause":[{"type":"query_shard_exception","reason":"No mapping found for [] in order to sort on","index_uuid":"vwbDQ_nlQYqwRxxqe-S9Fg","index":"index-1"}],"type":"search_phase_execution_exception","reason":"all shards failed","phase":"query","grouped":true,"failed_shards":[{"shard":0,"index":"index-1","node":"jTlMT-4gS2Gu66gtHyuG9g","reason":{"type":"query_shard_exception","reason":"No mapping found for [] in order to sort on","index_uuid":"vwbDQ_nlQYqwRxxqe-S9Fg","index":"index-1"}}]},"status":400}
		at org.elasticsearch.client.RestClient$1.completed(RestClient.java:357)
		at org.elasticsearch.client.RestClient$1.completed(RestClient.java:346)
		at org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:119)
		at org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:177)
		at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:436)
		at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:326)
		at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
		at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
		at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
		at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
		at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
		at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
		at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
		at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
		at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
		at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
		... 1 more

I also sent the request manually and there was no problem: I got a successful response.
After that, I checked the connector status and it is in the RUNNING state, but I think it should be in the FAILED state.

Do you have any idea what could be causing the problem?

Thanks 🖖
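One plausible reading of the log above is that the task config shows an empty incrementing.field.name and Elasticsearch then complains about sorting on []. A pre-flight check before submitting a connector config can catch this kind of omission. The sketch below is hypothetical: `missing_settings` is not part of the connector, and the list of required keys is an assumption based on the settings documented with high importance and no default.

```python
from typing import Dict, List, Sequence

# Assumption: settings documented as high importance without a default value.
REQUIRED = ("es.host", "es.port", "topic.prefix", "incrementing.field.name")


def missing_settings(config: Dict[str, str],
                     required: Sequence[str] = REQUIRED) -> List[str]:
    """Return the required keys that are absent or blank in the config."""
    return [key for key in required if not str(config.get(key) or "").strip()]


config = {
    "connector.class": "com.github.dariobalinzo.ElasticSourceConnector",
    "tasks.max": "1",
    "es.host": "elasticsearch",
    "es.port": "9200",
    "index.prefix": "index-1",
    "topic.prefix": "elastic-",
}

problems = missing_settings(config)
```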

bad request on searching index names

I'm getting the following errors in the task config. The connection test works before getting to this point.

[2019-08-05 14:59:45,929] ERROR error in searching index names (com.github.dariobalinzo.ElasticSourceConnector)
[2019-08-05 14:59:45,929] ERROR [Worker clientId=connect-1, groupId=data-dev] Failed to reconfigure connector's tasks, retrying after backoff: (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
java.lang.RuntimeException: org.elasticsearch.client.ResponseException: method [GET], host [http://elastic-dev.data:80], URI [_cat/indices], status line [HTTP/1.1 400 BAD_REQUEST]

Errors using kafka-connect-elasticsearch-source:1.4.2

Hello,

I'm using the elasticsearch-source:1.3 with success.
I install it with the confluent-hub client by:

confluent-hub install --no-prompt dariobalinzo/kafka-connect-elasticsearch-source:1.3

I need to use filters.blacklist, which as far as I know is available in the 1.4.2 branch of this repository.

I tried to install it:

confluent-hub install --no-prompt dariobalinzo/kafka-connect-elasticsearch-source:1.4.2

No success.

Then I cloned the repository, switched to the 1.4.2 branch and ran the Maven build command: mvn clean package -DskipTests
The build was OK.

I'm placing the generated jar into the plugin.path set in the settings, which is /usr/share/confluent-hub-components, and also into the /etc/kafka-connect/jars/ folder.
If I use the elastic-source-connect-1.4-jar-with-dependencies.jar, I get the following error:

kafka kafka-cp-kafka-connect-5f7d74847b-hrbl5 cp-kafka-connect-server [2021-09-29 12:22:58,664] ERROR Stopping due to error (org.apache.kafka.connect.cli.ConnectDistributed)
kafka kafka-cp-kafka-connect-5f7d74847b-hrbl5 cp-kafka-connect-server java.lang.NoSuchFieldError: DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS
kafka kafka-cp-kafka-connect-5f7d74847b-hrbl5 cp-kafka-connect-server  	at org.apache.kafka.connect.runtime.distributed.DistributedConfig.<clinit>(DistributedConfig.java:245)
kafka kafka-cp-kafka-connect-5f7d74847b-hrbl5 cp-kafka-connect-server  	at org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:95)
kafka kafka-cp-kafka-connect-5f7d74847b-hrbl5 cp-kafka-connect-server  	at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:80)

which prevents kafka-connect from starting up.

If I use the elastic-source-connect-1.4.jar, I get:

kafka kafka-cp-kafka-connect-68f487696-c6fbt cp-kafka-connect-server [2021-09-29 12:37:56,765] ERROR WorkerConnector{id=elastic-source-20210710} Error while starting connector (org.apache.kafka.connect.runtime.WorkerConnector)
kafka kafka-cp-kafka-connect-68f487696-c6fbt cp-kafka-connect-server java.lang.NoClassDefFoundError: org/apache/http/client/CredentialsProvider
kafka kafka-cp-kafka-connect-68f487696-c6fbt cp-kafka-connect-server   	at com.github.dariobalinzo.elastic.ElasticConnectionBuilder.build(ElasticConnectionBuilder.java:77)

which also prevents kafka-connect from starting up.

The base image is confluentinc/cp-kafka-connect-base:6.1.1.

Is there anything to do to overcome these errors?

Thanks for any clue

question: '-' removed from the attributes of the JSON value

Hi @DarioBalinzo

I see that the ES connector code removes dashes from attribute names: in my case current-time turns into currenttime, etc. Is this intentional? When working with Kafka I can have keys and attributes of JSON values with dashes, and in ksqlDB column names with dashes are fine too (they just need to be quoted in KSQL queries).
Why does the ES connector remove them?
Thank you.

Nested fields

Hi there.

Been using this plugin for a while, it's great! Thanks!

Having a problem with nested fields that are constructed from arbitrary (different every time) JSON in elasticsearch, what I really want is to load in this json as a string using the json_cast feature, which isn't working for me. The blacklist and whitelist filters both work well and use the exact same syntax to specify fields so I'm not sure what's wrong...

Really appreciate your responses to my multiple issues so thank you.

Missing records?

Hi,

I've noticed sometimes records can be missed by the connector. I think it has to do with batched writes and the frequency of polling based on my experiments.

I'm using the Kafka message offset as the incrementing field.

I've adjusted the polling interval from the default 5 seconds to 30 minutes in some cases. So far this has helped quite a bit.

Just wondering if anyone else has come across this or has a different strategy?

ERROR [elastic-source|task-0] error (com.github.dariobalinzo.task.ElasticSourceTask:217)

Hey Dario!
I finally managed to successfully run the Elasticsearch source connector with Apache Kafka.

{
  "name": "elastic-source",
  "connector": {
    "state": "RUNNING",
    "worker_id": "127.0.1.1:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "127.0.1.1:8083"
    }
  ],
  "type": "source"
}

However, when I run the elasticsearch-source.properties as distributed:

name=elastic-source
connector.class=com.github.dariobalinzo.ElasticSourceConnector
tasks.max=1
es.host=127.0.0.1
es.port=1750
index.prefix=products
topic.prefix=es_
topic=elastic-events

I get an error like this:

[2022-09-20 17:45:38,127] INFO [elastic-source|task-0] fetching from products (com.github.dariobalinzo.task.ElasticSourceTask:201)
[2022-09-20 17:45:38,128] INFO [elastic-source|task-0] found last value Cursor{primaryCursor='null', secondaryCursor='null'} (com.github.dariobalinzo.task.ElasticSourceTask:203)
[2022-09-20 17:45:38,129] WARN [elastic-source|task-0] request [POST http://localhost:1750/products/_search?typed_keys=true&max_concurrent_shard_requests=5&search_type=query_then_fetch&batched_reduce_size=512] returned 1 warnings: [299 Elasticsearch-7.15.0-79d65f6e357953a5b3cbcc5e2c7c21073d89aa29 "Elasticsearch built-in security features are not enabled. Without authentication, your cluster could be accessible to anyone. See https://www.elastic.co/guide/en/elasticsearch/reference/7.15/security-minimal-setup.html to enable security."] (org.elasticsearch.client.RestClient:72)
[2022-09-20 17:45:38,129] ERROR [elastic-source|task-0] error (com.github.dariobalinzo.task.ElasticSourceTask:217)
ElasticsearchStatusException[Elasticsearch exception [type=search_phase_execution_exception, reason=all shards failed]]
	at org.elasticsearch.rest.BytesRestResponse.errorFromXContent(BytesRestResponse.java:178)
	at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:2484)
	at org.elasticsearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:2461)
	at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:2184)
	at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:2137)
	at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:2105)
	at org.elasticsearch.client.RestHighLevelClient.search(RestHighLevelClient.java:1367)
	at com.github.dariobalinzo.elastic.ElasticRepository.executeSearch(ElasticRepository.java:176)
	at com.github.dariobalinzo.elastic.ElasticRepository.searchAfter(ElasticRepository.java:90)
	at com.github.dariobalinzo.task.ElasticSourceTask.poll(ElasticSourceTask.java:205)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:305)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:249)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
	Suppressed: org.elasticsearch.client.ResponseException: method [POST], host [http://localhost:1750], URI [/products/_search?typed_keys=true&max_concurrent_shard_requests=5&search_type=query_then_fetch&batched_reduce_size=512], status line [HTTP/1.1 400 Bad Request]

and this:

{"error":{"root_cause":[{"type":"query_shard_exception","reason":"No mapping found for [@timestamp] in order to sort on","index_uuid":"b575v16yTXmq5o2sk77zbA","index":"products"}],"type":"search_phase_execution_exception","reason":"all shards failed","phase":"can_match","grouped":true,"failed_shards":[{"shard":0,"index":"products","node":"asmTRFlgThS7kBU6yyCJzA","reason":{"type":"query_shard_exception","reason":"No mapping found for [@timestamp] in order to sort on","index_uuid":"b575v16yTXmq5o2sk77zbA","index":"products"}}]},"status":400}
		at org.elasticsearch.client.RestClient.convertResponse(RestClient.java:331)
		at org.elasticsearch.client.RestClient.performRequest(RestClient.java:301)
		at org.elasticsearch.client.RestClient.performRequest(RestClient.java:276)
		at org.elasticsearch.client.RestHighLevelClient.performClientRequest(RestHighLevelClient.java:2699)
		at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:2171)
		... 15 more

The full list of topics is:

__consumer_offsets
connect-configs
connect-offset
connect-offsets
connect-status
elastic-events

According to your tutorial, all the products* indices should be sent to Kafka using the es_ string as a topic prefix.
So my question is: am I doing things right here? What's wrong?
How can I know whether all the data I pass to Elasticsearch is read by Kafka?
Thanks

connector failing to process empty array, throws error

The connector fails to process the JSON message below; it fails on statusHistoryRanking:
{
  "statusHistory": [
    {
      "date": "2021-10-19",
      "status": "IN PASS"
    }
  ],
  "statusHistoryRanking": []
}

Error Message:
ERROR (com.github.dariobalinzo.task.ElasticSourceTask:202) error
java.lang.RuntimeException: error in converting list: type not supported
at com.github.dariobalinzo.schema.SchemaConverter.convertListSchema(SchemaConverter.java:127)
at com.github.dariobalinzo.schema.SchemaConverter.convertDocumentSchema(SchemaConverter.java:70)
at com.github.dariobalinzo.schema.SchemaConverter.convertMapSchema(SchemaConverter.java:89)
at com.github.dariobalinzo.schema.SchemaConverter.convertDocumentSchema(SchemaConverter.java:74)
at com.github.dariobalinzo.schema.SchemaConverter.convert(SchemaConverter.java:42)
at com.github.dariobalinzo.task.ElasticSourceTask.parseResult(ElasticSourceTask.java:245)
at com.github.dariobalinzo.task.ElasticSourceTask.poll(ElasticSourceTask.java:192)
at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:268)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:241)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)

Solution:
In SchemaConverter.convertDocumentSchema, check that head is not null:
Object head = ((List<?>) value).get(0);
In StructConverter.convertListToAvroArray, check that head is not null:
Object head = value.get(0);
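The guard proposed above can be illustrated outside the connector. The following is a sketch in Python rather than the project's Java, with a hypothetical `element_type_name` function and made-up type labels: it infers a list's element type from its first item and returns None when the list is empty or starts with a null, so the caller can decide how to handle the field instead of failing.

```python
from typing import Any, List, Optional


def element_type_name(values: List[Any]) -> Optional[str]:
    """Name the element type of a list from its first item.

    Returns None when the type cannot be inferred (empty list or null head).
    """
    if not values or values[0] is None:
        return None
    head = values[0]
    if isinstance(head, bool):      # check bool before int: bool is an int subclass
        return "boolean"
    if isinstance(head, int):
        return "int64"
    if isinstance(head, float):
        return "float64"
    if isinstance(head, str):
        return "string"
    if isinstance(head, dict):
        return "struct"
    raise TypeError(f"type not supported: {type(head).__name__}")


document = {
    "statusHistory": [{"date": "2021-10-19", "status": "IN PASS"}],
    "statusHistoryRanking": [],
}

types = {name: element_type_name(value) for name, value in document.items()}
```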

null values

Hi,

I'm wondering if there's a way to support null values? We have some documents like this:
{"a": "value", "b":"value"}, {"a":null, "b":"value2"}. For now I can modify the source to generate {"b":"value2"}, but I was wondering if there's a different way.

Exception thrown:

[2021-02-24 22:46:40,733] ERROR error (com.github.dariobalinzo.task.ElasticSourceTask)
java.lang.RuntimeException: type not supported XXXX
at com.github.dariobalinzo.schema.SchemaConverter.convertDocumentSchema(SchemaConverter.java:76)
at com.github.dariobalinzo.schema.SchemaConverter.convert(SchemaConverter.java:42)
at com.github.dariobalinzo.task.ElasticSourceTask.parseResult(ElasticSourceTask.java:237)

Same error as issue 3

ERROR error in searching index names (com.github.dariobalinzo.ElasticSourceConnector) (DistributedHerder-connect-1-1)
Error [Worker clientID=connect-1, groupID=connect-cluster] failed to reconfigure connector's tasks

avoid wide matching of indexes with index.prefix

Hello @DarioBalinzo ,

I have two indexes in ES, nuvla-nuvlabox and nuvla-nuvlabox-status, and with the configuration below I noticed that I'm getting messages twice in es_nuvla-nuvlabox-status. I added "elastic-source-nuvlabox" much later due to new requirements.

[root@31e4e221f046 appuser]# cat /etc/kafka/elastic-source-nuvlabox.json
{   "name": "elastic-source-nuvlabox",
    "config": {"connector.class":"com.github.dariobalinzo.ElasticSourceConnector",
               "tasks.max": "1",
               "es.host" : "es",
               "es.port" : "9200",
               "index.prefix" : "nuvla-nuvlabox",
               "topic.prefix" : "es_",
               "incrementing.field.name" : "updated",
               "fieldname_converter": "nop"
    }
}
[root@31e4e221f046 appuser]# cat /etc/kafka/elastic-source-nuvlabox-status.json
{   "name": "elastic-source-nuvlabox-status",
    "config": {"connector.class":"com.github.dariobalinzo.ElasticSourceConnector",
               "tasks.max": "1",
               "es.host" : "es",
               "es.port" : "9200",
               "index.prefix" : "nuvla-nuvlabox-status",
               "topic.prefix" : "es_",
               "incrementing.field.name" : "updated",
               "fieldname_converter": "nop"
    }
}
[root@31e4e221f046 appuser]#

I understand this can be fixed by removing elastic-source-nuvlabox-status connector. However, what will happen if I add e.g. nuvla-nuvlabox-telem index? I don't want it to end up in Kafka as es_nuvla-nuvlabox-telem.

Maybe adding an optional index.name (or comma-separated index.names) parameter for strictly defining the index or indices would be a solution?

Thank you.

Problem using Connector on Cloudera Kafka Connect

Hello!

Cloudera Kafka Connect ships with log4j:2.10, but the elasticsearch.client, which is a dependency of this project, uses log4j:2.9. When we try to use this connector, an error like this is raised:

java.util.ServiceConfigurationError: org.apache.logging.log4j.spi.Provider: Provider org.apache.logging.log4j.core.impl.Log4jProvider not a subtype

It seems to be a bug in log4j, ref.: https://issues.apache.org/jira/browse/LOG4J2-2055 . I've explicitly added log4j dependencies with version 2.10, like this:

        <!-- ... -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.10.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>2.10.0</version>
        </dependency>
        <!-- ... -->

Tests have passed, but I don't know if you'll consider this a bug, since it's a behavior occurring in Cloudera's distribution, not in Confluent's.

Any guidance or help would be appreciated :)

Data Exception

I've been using this repo (thanks again btw) and it worked very well until today when suddenly the console is flooded with this error:

[2021-07-16 14:33:03,515] ERROR error (com.github.dariobalinzo.task.ElasticSourceTask:180)
org.apache.kafka.connect.errors.DataException: Invalid Java object for schema type STRUCT: class java.lang.String for field: "old"
at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:245)
at org.apache.kafka.connect.data.Struct.put(Struct.java:216)
at org.apache.kafka.connect.data.Struct.put(Struct.java:203)
at com.github.dariobalinzo.schema.StructConverter.convertDocumentStruct(StructConverter.java:47)
at com.github.dariobalinzo.schema.StructConverter.convertListOfObject(StructConverter.java:114)
at com.github.dariobalinzo.schema.StructConverter.lambda$convertListToAvroArray$0(StructConverter.java:81)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
at com.github.dariobalinzo.schema.StructConverter.convertListToAvroArray(StructConverter.java:82)
at com.github.dariobalinzo.schema.StructConverter.convertDocumentStruct(StructConverter.java:49)
at com.github.dariobalinzo.schema.StructConverter.convert(StructConverter.java:37)
at com.github.dariobalinzo.task.ElasticSourceTask.parseResult(ElasticSourceTask.java:214)
at com.github.dariobalinzo.task.ElasticSourceTask.poll(ElasticSourceTask.java:170)
at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:289)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:256)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

At first I thought it was just a few badly formatted messages but it seems to be for every single message and my pipeline is not working anymore.

Any thoughts on what the problem could be?

Thanks in advance

partition/replica configuration

Is it possible to add configuration parameters to set partition/replica value for the topics created by the connector?
At the moment the connector creates 1/1.

I have ksqldb streams reading from the topics. They expect partition/replica configurations other than 1/1.

NumberFormatException for es.host while starting Elasticsearch source connector

Hi
I am getting the error below when I start the connector.
Please help with this. It is running, but I can't see any of the expected topics in Kafka.

[2021-05-12 16:30:42,611] INFO ElasticSourceConnectorConfig values:
batch.max.rows = 10000
connection.attempts = 3
connection.backoff.ms = 10000
es.host = "myhost.uat.com"
es.password = "myEspass"
es.port = "9200"
es.scheme = http
es.user = "elastic"
fieldname_converter = avro
filters.json_cast = null
filters.whitelist = null
incrementing.field.name = "@timestamp_log"
index.prefix = "my_index-*"
mode =
poll.interval.ms = 5000
topic.prefix = "myprefix_"
(com.github.dariobalinzo.ElasticSourceConnectorConfig:347)
[2021-05-12 16:30:42,611] ERROR WorkerConnector{id=elasticsearch-source} Error while starting connector (org.apache.kafka.connect.runtime.WorkerConnector:119)
java.lang.NumberFormatException: For input string: ""9200""
at java.base/java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.base/java.lang.Integer.parseInt(Integer.java:638)
at java.base/java.lang.Integer.parseInt(Integer.java:770)
at com.github.dariobalinzo.ElasticSourceConnector.start(ElasticSourceConnector.java:63)
at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:111)
at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:136)
at org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:196)
at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:253)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.startConnector(StandaloneHerder.java:293)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:209)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:115)
[2021-05-12 16:30:42,614] INFO Finished creating connector elasticsearch-source (org.apache.kafka.connect.runtime.Worker:269)
[2021-05-12 16:30:42,615] INFO Skipping update of connector elasticsearch-source since it is not running
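
The log above prints the configured values with their quotation marks included (es.port = "9200"), and the exception reports the input string as ""9200"", which suggests the quotes are being read as part of the value. The sketch below shows that failure mode in Python rather than in the connector's Java. `parse_port` is a hypothetical helper, and `int()` only stands in for `Integer.parseInt`.

```python
# Hypothetical helper: models an integer parse of a port value.
# Python's int() is used here as a stand-in for Java's Integer.parseInt.

def parse_port(raw):
    """Parse a port number, rejecting values that contain non-digit characters."""
    return int(raw)


quoted = '"9200"'   # value with literal quotation marks, as shown in the log
plain = "9200"      # the same value without quotation marks

try:
    parse_port(quoted)
    quoted_parses = True
except ValueError:
    # The literal quotes make the string an invalid integer.
    quoted_parses = False

port = parse_port(plain)
```

In a Java .properties file, quotation marks are not stripped from values, so entries there are usually written bare, for example es.port=9200.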

No errors but data not pulled out from Elastic

Hi and thank you for this connector,

I have 3 Elasticsearch indexes that share the same prefix: test_1, test_2, test_3.
One index is not being sent to Kafka (log_test3), while the other two are fine (the schema is different).
There are no errors in the connector logs, so it is not clear what the issue is. The same configuration worked fine before on Elasticsearch version 6.
Also, is there a way to prioritise some indexes so that they are collected first?
Are there any limitations on the number of indexes with the same prefix that one connector can collect?

This is my connector conf:

    "connector.class": "com.github.dariobalinzo.ElasticSourceConnector",
    "tasks.max": "2",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "errors.retry.timeout": "-1",
    "errors.log.enable": true,
    "errors.log.include.messages": true,
    "es.host": "192.168.1.20",
    "es.port": "9200",
    "connection.attempts": "5",
    "connection.backoff.ms": "10000",
    "index.prefix": "logs_",
    "incrementing.field.name": "@timestamp",
    "topic.prefix": "topic_"

Thank you.

Detect new indexes and other issues

Hi,

First, thank you for this amazing connect plugin.

I have a lot of indexes, named by day, like idx-2021.08.30. The first problem I discovered in my environment is that the Elasticsearch connector seems to freeze on some index and enters a kind of loop:

cp-kafka-connect-connect-es-74df8cf9d6-92whw connect-es-server [2021-08-31 20:44:45,664] INFO index idx-2021.08.31 total messages: 295939  (com.github.dariobalinzo.task.ElasticSourceTask)
cp-kafka-connect-connect-es-74df8cf9d6-92whw connect-es-server [2021-08-31 20:44:56,592] INFO index idx-2021.08.31 total messages: 295939  (com.github.dariobalinzo.task.ElasticSourceTask)
cp-kafka-connect-connect-es-74df8cf9d6-92whw connect-es-server [2021-08-31 20:44:58,016] INFO index idx-2021.08.31 total messages: 295939  (com.github.dariobalinzo.task.ElasticSourceTask)

And this line repeats indefinitely.

The other problem is with new indexes. New indexes are created every day, but the connector finds a new index only after the Connect task is restarted. Also, my indexes have only @timestamp as an incremental field, and the topics always contain fewer documents than the indexes.

I would appreciate it very much if you could help me.

Thank you,

Resetting the incrementing field?

Hi, I'm wondering if there's a good way to reset the incrementing field? For example I want to run the connector on all the data since the beginning of time again.

No mapping found for [timestamp] in order to sort on

Hi there, I am facing a "no mapping found" error while using this connector.
I checked in Kibana by sorting the hits on the timestamp field, and that worked.
I don't know where I made a mistake, so I need some help. Any solution or pointers are welcome.

Passed config:

{
    "name": "elastic_source2",
    "config": {
        "connector.class": "com.github.dariobalinzo.ElasticSourceConnector",
        "tasks.max": "1",
        "es.host": "03dc4cd17baf46e190268130c6cb7d1d.us-central1.gcp.cloud.es.io",
        "es.scheme": "https",
        "es.port": "9243",
        "es.user": "elastic",
        "es.password": "uqBE5g4ZIJA9Tp****2leGPq",
        "index.name": "kibana_sample_data_flights",
        "topic.prefix": "es_",
        "incrementing.field.name": "timestamp"
    }
}

Error logs:


[2022-05-10 10:25:45,189] INFO [elastic_source2|task-0] Stopping task elastic_source2-0 (org.apache.kafka.connect.runtime.Worker:829)
[2022-05-10 10:25:45,723] ERROR [elastic_source2|task-0] error (com.github.dariobalinzo.task.ElasticSourceTask:217)
ElasticsearchStatusException[Elasticsearch exception [type=search_phase_execution_exception, reason=all shards failed]]
	at org.elasticsearch.rest.BytesRestResponse.errorFromXContent(BytesRestResponse.java:178)
	at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:2484)
	at org.elasticsearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:2461)
	at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:2184)
	at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:2137)
	at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:2105)
	at org.elasticsearch.client.RestHighLevelClient.search(RestHighLevelClient.java:1367)
	at com.github.dariobalinzo.elastic.ElasticRepository.executeSearch(ElasticRepository.java:176)
	at com.github.dariobalinzo.elastic.ElasticRepository.searchAfter(ElasticRepository.java:90)
	at com.github.dariobalinzo.task.ElasticSourceTask.poll(ElasticSourceTask.java:205)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:291)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:248)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:186)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:241)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
	Suppressed: org.elasticsearch.client.ResponseException: method [POST], host [https://03dc4cd17baf46e190268130c6cb7d1d.us-central1.gcp.cloud.es.io:9243], URI [/.ds-logs-enterprise_search.api-default-2022.05.09-000001/_search?typed_keys=true&max_concurrent_shard_requests=5&search_type=query_then_fetch&batched_reduce_size=512], status line [HTTP/1.1 400 Bad Request]
{"error":{"root_cause":[{"type":"query_shard_exception","reason":"No mapping found for [timestamp] in order to sort on","index_uuid":"Wx0FJ-RFQFOTqp_wJykMfA","index":".ds-logs-enterprise_search.api-default-2022.05.09-000001"}],"type":"search_phase_execution_exception","reason":"all shards failed","phase":"query","grouped":true,"failed_shards":[{"shard":0,"index":".ds-logs-enterprise_search.api-default-2022.05.09-000001","node":"-Vr1NH-xQ-miv45oOjwkXw","reason":{"type":"query_shard_exception","reason":"No mapping found for [timestamp] in order to sort on","index_uuid":"Wx0FJ-RFQFOTqp_wJykMfA","index":".ds-logs-enterprise_search.api-default-2022.05.09-000001"}}]},"status":400}
		at org.elasticsearch.client.RestClient.convertResponse(RestClient.java:331)
		at org.elasticsearch.client.RestClient.performRequest(RestClient.java:301)
		at org.elasticsearch.client.RestClient.performRequest(RestClient.java:276)
		at org.elasticsearch.client.RestHighLevelClient.performClientRequest(RestHighLevelClient.java:2699)
		at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:2171)
		... 15 more
[2022-05-10 10:25:45,725] INFO [elastic_source2|task-0] WorkerSourceTask{id=elastic_source2-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:505)
[2022-05-10 10:25:45,728] INFO [elastic_source2|task-0] [Producer clientId=connector-producer-elastic_source2-0] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:1208)

java.lang.NoSuchFieldError: INSTANCE

When I activate this connector with the configuration below:
{
"name": "global-es-source",
"config": {
"connector.class":"com.github.dariobalinzo.ElasticSourceConnector",
"tasks.max": "1",
"es.host" : "10.0.129.171;10.0.128.237;10.0.128.64",
"es.port" : "9200",
"es.user" : "xxxx",
"es.password" : "xxxx",
"es.scheme" : "http",
"incrementing.field.name" : "@timestamp",
"topic.prefix" : "global-es-",
"type.name": "source"
}
}
When I check the status of this connector, I find the error below:
java.lang.NoSuchFieldError: INSTANCE
at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.<init>(PoolingNHttpClientConnectionManager.java:591)
at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.<init>(PoolingNHttpClientConnectionManager.java:163)
at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.<init>(PoolingNHttpClientConnectionManager.java:147)
at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.<init>(PoolingNHttpClientConnectionManager.java:119)
at org.apache.http.impl.nio.client.HttpAsyncClientBuilder.build(HttpAsyncClientBuilder.java:668)
at org.elasticsearch.client.RestClientBuilder.run(RestClientBuilder.java:218)
at org.elasticsearch.client.RestClientBuilder.run(RestClientBuilder.java:215)
at java.security.AccessController.doPrivileged(Native Method)
at org.elasticsearch.client.RestClientBuilder.createHttpClient(RestClientBuilder.java:215)
at org.elasticsearch.client.RestClientBuilder.access$000(RestClientBuilder.java:42)
at org.elasticsearch.client.RestClientBuilder.run(RestClientBuilder.java:187)
at org.elasticsearch.client.RestClientBuilder.run(RestClientBuilder.java:184)
at java.security.AccessController.doPrivileged(Native Method)
at org.elasticsearch.client.RestClientBuilder.build(RestClientBuilder.java:184)
at org.elasticsearch.client.RestHighLevelClient.<init>(RestHighLevelClient.java:200)
at org.elasticsearch.client.RestHighLevelClient.<init>(RestHighLevelClient.java:192)
at com.github.dariobalinzo.elastic.ElasticConnection.createConnectionUsingAuth(ElasticConnection.java:73)
at com.github.dariobalinzo.elastic.ElasticConnection.<init>(ElasticConnection.java:49)
at com.github.dariobalinzo.elastic.ElasticConnectionBuilder.build(ElasticConnectionBuilder.java:60)
at com.github.dariobalinzo.ElasticSourceConnector.start(ElasticSourceConnector.java:85)
at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:111)
at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:136)
at org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:196)
at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:242)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:908)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access300(DistributedHerder.java:110)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder5.call(DistributedHerder.java:924)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder5.call(DistributedHerder.java:920)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Any idea of this issue? Thank you for advice.

NPE in com.github.dariobalinzo.schema.StructConverter.convertListOfObject

The JSON document in the ES index is rather complex and contains several lists. I can paste it here if you think this can be useful.

[2020-12-10 13:11:53,542] ERROR error (com.github.dariobalinzo.task.ElasticSourceTask:152)
java.lang.NullPointerException
        at com.github.dariobalinzo.schema.StructConverter.convertListOfObject(StructConverter.java:101)
        at com.github.dariobalinzo.schema.StructConverter.lambda$convertListToAvroArray$1(StructConverter.java:74)
        at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
        at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
        at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
        at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
        at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
        at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
        at com.github.dariobalinzo.schema.StructConverter.convertListToAvroArray(StructConverter.java:75)
        at com.github.dariobalinzo.schema.StructConverter.convertDocumentStruct(StructConverter.java:42)
        at com.github.dariobalinzo.schema.StructConverter.covertMapToAvroStruct(StructConverter.java:89)
        at com.github.dariobalinzo.schema.StructConverter.convertDocumentStruct(StructConverter.java:44)
        at com.github.dariobalinzo.schema.StructConverter.convert(StructConverter.java:30)
        at com.github.dariobalinzo.task.ElasticSourceTask.parseResult(ElasticSourceTask.java:179)
        at com.github.dariobalinzo.task.ElasticSourceTask.poll(ElasticSourceTask.java:142)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:289)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:256)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)

adds an ip address to my host url

I've got an error because apparently it's changing my url.
I set a port and host but in the command line it gives me this:

java.net.ConnectException: Timeout connecting to [<es.host>/<ip_address>:<es.port>]

Could you please help me?

Issue using https

Hi,

I wrote earlier about adding https support. I just tried today with version 1.1 and I'm still getting an error. I looked at the code against https://github.com/inspectorioinc/kafka-connect-elasticsearch-source/ and I didn't see anything obvious.

However I did notice that if I enter an invalid host I get the same error.

Config:
{
"connector.class": "com.github.dariobalinzo.ElasticSourceConnector",
"es.host": "xxx",
"es.scheme": "https",
"tasks.max": "1",
"incrementing.field.name": "@timestamp",
"connection.attempts": "1",
"es.port": "9243",
"mode": "timestamp",
"topic.prefix": "essource_",
"es.password": "xxx",
"name": "essource",
"es.user": "xxx",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"index.prefix": "test_"
}

Call stack:

[2020-12-21 19:40:29,753] ERROR error in searching index names (com.github.dariobalinzo.elastic.ElasticRepository)
[2020-12-21 19:40:29,753] ERROR [Worker clientId=connect-1, groupId=compose-connect-group] Failed to reconfigure connector's tasks, retrying after backoff: (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
java.lang.RuntimeException: org.elasticsearch.client.ResponseException: method [GET], host [https://xxx:9243], URI [_cat/indices], status line [HTTP/1.1 400 Bad Request]
400 Bad Request
at com.github.dariobalinzo.elastic.ElasticRepository.catIndices(ElasticRepository.java:106)
at com.github.dariobalinzo.ElasticSourceConnector.taskConfigs(ElasticSourceConnector.java:98)
at org.apache.kafka.connect.runtime.Worker.connectorTaskConfigs(Worker.java:373)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnector(DistributedHerder.java:1420)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnectorTasksWithRetry(DistributedHerder.java:1358)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1000(DistributedHerder.java:127)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$15$1.call(DistributedHerder.java:1375)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$15$1.call(DistributedHerder.java:1372)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:365)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:294)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)

Pulling data on create of source connector after it is not pulling latest data from ES

Configured :
{
"name": "elastic-source-logs",
"config": {
"connector.class": "com.github.dariobalinzo.ElasticSourceConnector",
"topic.prefix": "logs",
"es.host": "elastic",
"tasks.max": "1",
"incrementing.field.name": "timestamp",
"name": "elastic-source-logs",
"es.port": "9200",
"index.prefix": "logs",
"poll.interval.ms": "10000",
"timestamp.delay.interval.ms": "3000"
}}

This pulls data from ES only once; after that, it does not pull new changes from ES.

Is there any configuration parameter to set the pull interval?

Does this connector support elastic OpenDistro

Hi Dario,

I tried the plugin with the Open Distro version of Elasticsearch and added a username and password to my connector configuration. However, I get this result once I register the connector:

kafka-connect    |  (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:354)
kafka-connect    | [2021-09-26 05:19:29,894] ERROR [-es-source|worker] error in searching index names (com.github.dariobalinzo.elastic.ElasticRepository:190)
kafka-connect    | [2021-09-26 05:19:29,895] ERROR [Worker clientId=connect-1, groupId=kafka-connect] Failed to reconfigure connector's tasks, retrying after backoff: (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1376)
kafka-connect    | java.lang.RuntimeException: org.

Registration conf:
curl -X PUT http://192.168.1.100:8083/connectors/es-source/config -H "Content-Type: application/json" -d '{
"connector.class": "com.github.dariobalinzo.ElasticSourceConnector",
"tasks.max": "2",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"errors.retry.timeout": "-1",
"errors.log.enable": true,
"errors.log.include.messages": true,
"es.host": "192.168.1.50",
"es.port": "9200",
"es.user": "admin",
"es.password": "temppass",
"connection.attempts": "5",
"connection.backoff.ms": "10000",
"index.prefix": "log",
"incrementing.field.name": "@timestamp",
"topic.prefix": "My_"
}'

Thank you in advance

Found last value Cursor{primaryCursor='null', secondaryCursor='null'} java.lang.NullPointerException

We managed to connect to Elasticsearch, but we are getting a NullPointerException. What could be causing that? Below are our Kafka (Strimzi) configuration, a sample document, and the error message.

class: com.github.dariobalinzo.ElasticSourceConnector
config:
  poll.interval.ms: 1000
  es.port: >-
    ${file:/opt/kafka/external-configuration/elasticsearch-creds/elasticsearch-creds:port}
  es.scheme: https
  fieldname_converter: nop
  es.tls.truststore.location: >-
    ${file:/opt/kafka/external-configuration/elasticsearch-creds/elasticsearch-creds:truststore-location}
  es.host: >-
    ${file:/opt/kafka/external-configuration/elasticsearch-creds/elasticsearch-creds:host}
  es.user: >-
    ${file:/opt/kafka/external-configuration/elasticsearch-creds/elasticsearch-creds:user}
  elastic.security.protocol: SSL
  es.tls.truststore.password: >-
    ${file:/opt/kafka/external-configuration/elasticsearch-creds/elasticsearch-creds:truststore-password}
  index.prefix: ca-dict-sanctions-stg
  errors.log.enable: true
  topic.prefix: ''
  incrementing.field.name: status.currentUpdatedOn
  es.password: >-
    ${file:/opt/kafka/external-configuration/elasticsearch-creds/elasticsearch-creds:password}
tasksMax: 1

"_source" : {
  "field" : "Country",
  "operator" : "Equals",
  "value" : "CYP",
  "action" : "Block",
  "replace" : "",
  "carrierBank" : [ ],
  "status" : {
    "currentStatus" : "Active",
    "currentUpdatedOn" : "2021-07-01T12:20:04.453+0300",
    "currentNote" : null,
    "previousStatus" : null,
    "previousUpdatedOn" : null,
    "previousNote" : null,
    "daysSinceLastUpdate" : 0
  },
  "metadata" : {
    "creationDate" : "2021-07-01T12:20:04.453+0300",
    "createdBy" : "Userlevel4",
    "lastEditedBy" : "Userlevel4",
    "lastEditedDate" : "2021-07-01T12:20:04.454+0300"
  }
}

2021-07-01 11:12:57,262 INFO fetching from ca-dict-sanctions-stg (com.github.dariobalinzo.task.ElasticSourceTask) [task-thread-ca-dict-sanctions-stg-0]
2021-07-01 11:12:57,795 INFO found last value Cursor{primaryCursor='null', secondaryCursor='null'} (com.github.dariobalinzo.task.ElasticSourceTask) [task-thread-ca-dict-sanctions-stg-0]
2021-07-01 11:12:57,797 ERROR error (com.github.dariobalinzo.task.ElasticSourceTask) [task-thread-ca-dict-sanctions-stg-0]
java.lang.NullPointerException
at com.github.dariobalinzo.elastic.ElasticRepository.searchAfter(ElasticRepository.java:97)
at com.github.dariobalinzo.task.ElasticSourceTask.poll(ElasticSourceTask.java:203)
at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:270)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:237)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)

using filter.whitelist for nested value

Hi There,

I am having a problem when using the filters.whitelist configuration for a nested value.

The relevant part of my JSON document:

"resPayload": [
  "{ \"timestamp\":1611882445, \"status\":200, \"errors\":{ \"identity_photo\":\"invalid\" }, \"data\":{ \"name\":true, \"birthdate\":true, \"birthplace\":true, \"address\":\"M*MP*NG\",\"selfie_photo\":80}}"
]

When I use this configuration, I get the whole value of the resPayload field:
"filters.whitelist" : "apiName;resPayload",

But when I do this, I only get the apiName field and nothing from the resPayload field:
"filters.whitelist" : "apiName;resPayload.status",

How do I get only the field values I need from resPayload?
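
One thing visible in the sample above is that resPayload is a list containing a single string, and that string happens to hold JSON. A path such as resPayload.status would therefore have no nested field to select unless the string is decoded first. The sketch below only illustrates that shape in Python. It is not the connector's filter logic, and it makes no assumption about whether the connector can decode such strings.

```python
import json

# The resPayload value from the document above: a list with one element,
# and that element is a JSON document encoded as a string.
res_payload = [
    '{ "timestamp":1611882445, "status":200, "errors":{ "identity_photo":"invalid" }, '
    '"data":{ "name":true, "birthdate":true, "birthplace":true, '
    '"address":"M*MP*NG","selfie_photo":80}}'
]

first = res_payload[0]

# A string has no "status" key to select; it must be decoded first.
is_plain_string = isinstance(first, str)

# After decoding, the inner fields become addressable.
decoded = json.loads(first)
status = decoded["status"]
selfie_photo = decoded["data"]["selfie_photo"]
```

If the inner fields are needed downstream, one option under these assumptions is to keep the whole resPayload field in the whitelist and decode the string in the consumer.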

Can anyone help me?
I'd really appreciate your response.

insert multi index on the same topic

Hi,
First of all, thank you for your amazing work.
I have a question. Right now I'm trying to send multiple indexes from ES to one topic in Kafka.
I wonder if it's possible to do that. As long as I use a single value for "index.prefix", everything works perfectly. As soon as I try to add an extra index, like index.prefix= my_awesome_index, my_beauty_index (using yaml),
nothing comes from ES.
So my question is: do I have to add another connector, or simply duplicate the line?

int32 vs long, and filters.blacklist

Hello @DarioBalinzo,

After some time, the connector stops working for me with the following error.

[2021-02-28 21:36:03,823] INFO fetching from nuvla-nuvlabox-status (com.github.dariobalinzo.task.ElasticSourceTask:166)
[2021-02-28 21:36:03,823] INFO found last value 2021-02-28T21:35:46.489Z (com.github.dariobalinzo.task.ElasticSourceTask:168)
[2021-02-28 21:36:03,827] ERROR error (com.github.dariobalinzo.task.ElasticSourceTask:180)
org.apache.kafka.connect.errors.DataException: Invalid Java object for schema type INT32: class java.lang.Long for field: "bytes-transmitted"
        at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:245)
        at org.apache.kafka.connect.data.Struct.put(Struct.java:216)
        at org.apache.kafka.connect.data.Struct.put(Struct.java:203)
        at com.github.dariobalinzo.schema.StructConverter.convertDocumentStruct(StructConverter.java:47)
        at com.github.dariobalinzo.schema.StructConverter.convertListOfObject(StructConverter.java:114)
        at com.github.dariobalinzo.schema.StructConverter.lambda$convertListToAvroArray$0(StructConverter.java:81)
        at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
        at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
        at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
        at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
        at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
        at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
        at com.github.dariobalinzo.schema.StructConverter.convertListToAvroArray(StructConverter.java:82)
        at com.github.dariobalinzo.schema.StructConverter.convertDocumentStruct(StructConverter.java:49)
        at com.github.dariobalinzo.schema.StructConverter.covertMapToAvroStruct(StructConverter.java:96)
        at com.github.dariobalinzo.schema.StructConverter.convertDocumentStruct(StructConverter.java:51)
        at com.github.dariobalinzo.schema.StructConverter.convert(StructConverter.java:37)
        at com.github.dariobalinzo.task.ElasticSourceTask.parseResult(ElasticSourceTask.java:214)
        at com.github.dariobalinzo.task.ElasticSourceTask.poll(ElasticSourceTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:289)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:256)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)

The bytes-transmitted value is always growing. Eventually the connector chokes (see 2313987073 below), because the maximum value of a Java int32 is 2,147,483,647.

    "net-stats" : [ {
      "interface" : "docker_gwbridge",
      "bytes-transmitted" : 0,
      "bytes-received" : 0
    }, {
      "interface" : "eth0",
      "bytes-transmitted" : 2313987073,
      "bytes-received" : 139486370
    }, {
      "interface" : "veth3a85f64",
      "bytes-transmitted" : 37243658,
      "bytes-received" : 20884091
    }, {
      "interface" : "veth23d5664",
      "bytes-transmitted" : 73770,
      "bytes-received" : 7149
    }, {
      "interface" : "veth480948a",
      "bytes-transmitted" : 1702,
      "bytes-received" : 0
    }, {
      "interface" : "veth4bdbfca",
      "bytes-transmitted" : 47246,
      "bytes-received" : 68340
    }, {
      "interface" : "vethdd78a5f",
      "bytes-transmitted" : 1772,
      "bytes-received" : 0
    }, {
      "interface" : "veth9ba3f40",
      "bytes-transmitted" : 350,
      "bytes-received" : 0
    }, {
      "interface" : "docker0",
      "bytes-transmitted" : 0,
      "bytes-received" : 0
    }, {
      "interface" : "vethc58514b",
      "bytes-transmitted" : 2128,
      "bytes-received" : 0
    }, {
      "interface" : "br-4788132ed466",
      "bytes-transmitted" : 35697531,
      "bytes-received" : 15309643
    }, {
      "interface" : "veth9ab8cd6",
      "bytes-transmitted" : 9663155,
      "bytes-received" : 2041434
    }, {
      "interface" : "veth5fb3fb4",
      "bytes-transmitted" : 6890675,
      "bytes-received" : 11141372
    }, {
      "interface" : "lo",
      "bytes-transmitted" : 18483607,
      "bytes-received" : 18483607
    } ]

I think the conversion should treat all int values as longs. Or alternatively, provide a way to define this through a schema?
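
A minimal sketch of the first suggestion, widening every integral value to a long before any schema is derived. `LongWidening` and `widen` are hypothetical names, not part of the connector, and the sketch assumes the document has already been parsed into plain Java maps and lists, with small numbers arriving as Integer and large ones as Long.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper, not connector code: widens integral values to Long. */
final class LongWidening {

    /** Recursively copies a parsed JSON value, turning Integer/Short/Byte into Long. */
    static Object widen(Object value) {
        if (value instanceof Integer || value instanceof Short || value instanceof Byte) {
            return ((Number) value).longValue(); // boxed as Long
        }
        if (value instanceof Map) {
            Map<String, Object> copy = new LinkedHashMap<>();
            for (Map.Entry<?, ?> e : ((Map<?, ?>) value).entrySet()) {
                copy.put(String.valueOf(e.getKey()), widen(e.getValue()));
            }
            return copy;
        }
        if (value instanceof List) {
            List<Object> copy = new ArrayList<>();
            for (Object item : (List<?>) value) {
                copy.add(widen(item));
            }
            return copy;
        }
        return value; // Long, Double, String, Boolean, null: unchanged
    }

    public static void main(String[] args) {
        Map<String, Object> lo = new LinkedHashMap<>();
        lo.put("interface", "lo");
        lo.put("bytes-transmitted", 18483607);      // fits in an int
        Map<String, Object> eth0 = new LinkedHashMap<>();
        eth0.put("interface", "eth0");
        eth0.put("bytes-transmitted", 2313987073L); // exceeds Integer.MAX_VALUE
        List<Object> netStats = new ArrayList<>();
        netStats.add(lo);
        netStats.add(eth0);

        // After widening, both entries carry the same Java type for the field.
        System.out.println(widen(netStats));
    }
}
```

With this kind of normalization every element of the net-stats array would expose bytes-transmitted as a Long, so a schema derived from the first element would also fit the later ones.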

At the moment I'm not using net-stats at all. So, as a workaround, I will now try setting filters.json_cast to net-stats (hoping it will then not be parsed at all).

Also, there is a filters.whitelist parameter. Would it be possible to have a filters.blacklist as well?

Thank you.

Security

Are there any solutions or suggestions for avoiding the plaintext password in the JSON files?
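
One option that Kafka Connect itself provides is externalized secrets through config providers. The fragment below is a sketch, assuming the worker configuration can be edited; the file path and the property key are placeholders chosen for illustration.

```properties
# Worker configuration (for example connect-distributed.properties)
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider

# /opt/secrets/es.properties (hypothetical path, readable only by the Connect worker)
es.password=changeme

# The connector JSON then references the secret instead of embedding it:
#   "es.password": "${file:/opt/secrets/es.properties:es.password}"
```

The placeholder is resolved by the worker when the connector starts, so the JSON file sent to the REST API no longer contains the password itself.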

Batch size question

Hi @DarioBalinzo,

I am wondering if pagination of results is supported when the number of results > batch size. I seem to be seeing the following:

  • batch size of 10 000 (default)
  • 10 001 records with the same timestamp
  • 10 000 records are read into the topic
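
One possible explanation can be sketched as a small simulation. It assumes that the cursor is the incrementing field alone, compared with "strictly greater than"; the source does not confirm that this is what the connector does. `CursorPagingDemo` and its methods are hypothetical and do not call Elasticsearch. The second method uses a (timestamp, id) pair as cursor, similar in spirit to passing two sort values to Elasticsearch's search_after.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical simulation; it does not call Elasticsearch or the connector. */
final class CursorPagingDemo {

    static final class Doc {
        final long timestamp;
        final int id;
        Doc(long timestamp, int id) { this.timestamp = timestamp; this.id = id; }
    }

    /** Total order: timestamp first, id as tie-breaker. */
    static final Comparator<Doc> ORDER =
            Comparator.comparingLong((Doc d) -> d.timestamp).thenComparingInt(d -> d.id);

    /** Pages with "timestamp > last seen timestamp" only; returns how many docs were read. */
    static int readWithTimestampOnly(List<Doc> index, int batchSize) {
        List<Doc> sorted = index.stream().sorted(ORDER).collect(Collectors.toList());
        int read = 0;
        long cursor = Long.MIN_VALUE;
        while (true) {
            final long after = cursor;
            List<Doc> page = sorted.stream()
                    .filter(d -> d.timestamp > after)
                    .limit(batchSize)
                    .collect(Collectors.toList());
            if (page.isEmpty()) return read;
            read += page.size();
            cursor = page.get(page.size() - 1).timestamp;
        }
    }

    /** Pages with the pair (timestamp, id) as cursor, so ties on timestamp are not skipped. */
    static int readWithTieBreaker(List<Doc> index, int batchSize) {
        List<Doc> sorted = index.stream().sorted(ORDER).collect(Collectors.toList());
        int read = 0;
        Doc cursor = null;
        while (true) {
            final Doc after = cursor;
            List<Doc> page = sorted.stream()
                    .filter(d -> after == null || ORDER.compare(d, after) > 0)
                    .limit(batchSize)
                    .collect(Collectors.toList());
            if (page.isEmpty()) return read;
            read += page.size();
            cursor = page.get(page.size() - 1);
        }
    }

    public static void main(String[] args) {
        List<Doc> index = new ArrayList<>();
        for (int id = 0; id < 10_001; id++) {
            index.add(new Doc(1_000L, id)); // 10 001 docs sharing one timestamp
        }
        System.out.println(readWithTimestampOnly(index, 10_000)); // prints 10000
        System.out.println(readWithTieBreaker(index, 10_000));    // prints 10001
    }
}
```

Under that assumption, the last record is never returned because no later page can ask for "the same timestamp, but after record 10 000" unless a second, unique field breaks the tie.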

org.apache.kafka.connect.errors.DataException: Invalid Java object for schema with type FLOAT64: class java.lang.Long for field:

Please help, I have a problem with this field. My ES mapping defines it as a long field. Other fields have the same type, but the problem happens only with this one.

Example ES mapping:

},
"redirectEnd" : {
"type" : "long"
},
"redirectStart" : {
"type" : "long"
},
"requestStart" : {
"type" : "float"
},
"resourceType" : {
"type" : "keyword"
},
"responseEnd" : {
"type" : "float"
},
"responseStart" : {
"type" : "float"
},
"secureConnectionStart" : {
"type" : "float"
},
"startTime" : {
"type" : "float"
},
"transferSize" : {
"type" : "long"
},
"workerStart" : {
"type" : "long"
}

Example ES values:

{
"connectEnd" : 4898.4000000059605,
"connectStart" : 4898.4000000059605,
"encodedBodySize" : 21504,
"decodedBodySize" : 140288,
"transferSize" : 8192,
"domainLookupEnd" : 4898.4000000059605,
"domainLookupStart" : 4898.4000000059605,
"duration" : 3224,
"entryType" : "resource",
"fetchStart" : 4898.4000000059605,
"initiatorType" : "script",
"resourceType" : "js",

Thanks.
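
The error message says that a Long value reached a field whose schema type is FLOAT64. A plausible cause, stated here as an assumption, is that the schema was inferred from a document where the field held a decimal value, while another document carries an integral value for the same field. The sketch below shows one way to coerce numbers to a single declared type; `NumericCoercion` and `NumericType` are hypothetical names, not connector code.

```java
/** Hypothetical helper, not connector code: coerces numeric values to one declared type. */
final class NumericCoercion {

    enum NumericType { INT64, FLOAT64 }

    /** Converts any Number to the Java type a field of the given type is expected to hold. */
    static Object coerce(Object value, NumericType type) {
        if (!(value instanceof Number)) {
            return value; // leave strings, booleans and nulls alone
        }
        Number n = (Number) value;
        if (type == NumericType.FLOAT64) {
            return n.doubleValue(); // boxed as Double
        }
        return n.longValue();       // boxed as Long
    }

    public static void main(String[] args) {
        // The same field can arrive as 4898.4... in one document and as 3224 in another;
        // a JSON parser then yields Double for the first and Integer or Long for the second.
        System.out.println(coerce(4898.4000000059605, NumericType.FLOAT64));
        System.out.println(coerce(3224L, NumericType.FLOAT64));
        System.out.println(coerce(8192, NumericType.INT64));
    }
}
```

An if/else is used instead of a conditional expression on purpose: `cond ? doubleValue : longValue` would promote both branches to double.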

Elasticsearch api 7.x

I'm updating the Elasticsearch and Kafka Connect APIs to their latest versions.

I'm planning to refactor the code and tests to improve code quality and reduce bugs.

Configure elastic search queries

Dear @DarioBalinzo,

I was having a look at your repo and found it really useful, thank you!
Playing around with it, I could not find any way to configure a specific search query to limit the retrieved documents. Am I overlooking something, or is this not possible at the moment?

Entity content too long: com.github.dariobalinzo.task.ElasticSourceTask:202

How can I set a configuration to avoid this error? Note that the Elasticsearch REST client defaults this buffer limit to 100 MB (https://github.com/elastic/elasticsearch/blob/master/client/rest/src/main/java/org/elasticsearch/client/HttpAsyncResponseConsumerFactory.java).

(com.github.dariobalinzo.task.ElasticSourceTask:202)
org.apache.http.ContentTooLongException: entity content is too long [167942814] for the configured buffer limit [104857600]
at org.elasticsearch.client.HeapBufferedAsyncResponseConsumer.onEntityEnclosed(HeapBufferedAsyncResponseConsumer.java:76)
at org.apache.http.nio.protocol.AbstractAsyncResponseConsumer.responseReceived(AbstractAsyncResponseConsumer.java:131)

I tried lowering the configuration:
"batch.max.rows": 1000,

but I am still hitting the error.

Any help or ideas? Thanks.

missing build flavor oss

We are getting the error "missing build flavor oss" when trying to connect to ES 7.2 with build_flavor oss, but there is no issue with ES 7.2 with build_flavor default.

[2022-03-24 16:49:06,640] ERROR error (com.github.dariobalinzo.task.ElasticSourceTask:217)
ElasticsearchException[Invalid or missing build flavor [oss]]
at org.elasticsearch.client.RestHighLevelClient.performClientRequest(RestHighLevelClient.java:2701)
at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:2171)
at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:2137)
at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:2105)
at org.elasticsearch.client.RestHighLevelClient.search(RestHighLevelClient.java:1367)
at com.github.dariobalinzo.elastic.ElasticRepository.executeSearch(ElasticRepository.java:176)
at com.github.dariobalinzo.elastic.ElasticRepository.searchAfter(ElasticRepository.java:90)
at com.github.dariobalinzo.task.ElasticSourceTask.poll(ElasticSourceTask.java:205)
at org.apache.kafka.connect.runtime.Wor

Schema Exception when a complex property (type 'nested') is included in an ES document

ES document mapping:
PUT /test-websitesources
{
  "mappings": {
    "source": {
      "properties": {
        "sourceId": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "weblinks": {
          "type": "nested",
          "properties": {
            "id": {
              "type": "text"
            },
            "name": {
              "type": "text"
            }
          }
        },
        "updated_date": {
          "type": "date"
        },
        "createdDate": {
          "type": "date"
        }
      }
    }
  }
}

Valid Scenario:
The Kafka connector successfully pulls this data, and I can view it in the topics.
"source": {
  "sourceId": "s1",
  "weblinks": {
    "id": "street",
    "name": "www.street.com"
  },
  "updated_date": "2021-04-18T17:54:49.643073Z",
  "createdDate": "2020-02-18T20:32:41.1775655Z"
}

Failing Scenario (passing the weblinks field as an array):
"source": {
  "sourceId": "s1",
  "weblinks": [
    {
      "id": "afr",
      "name": "www.afr.com"
    }
  ],
  "updated_date": "2021-04-17T17:54:49.643073Z",
  "createdDate": "2020-02-18T20:32:41.1775655Z"
}
This fails with the exception below.

Exception:
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)

Caused by: org.apache.avro.SchemaParseException: Empty name
at org.apache.avro.Schema.validateName(Schema.java:1523)

java.lang.NullPointerException at com.github.dariobalinzo.task.ElasticSourceTask.fetchLastOffset(ElasticSourceTask.java:215)

Hi!
I'm having this error when using your connector:

java.lang.NullPointerException at com.github.dariobalinzo.task.ElasticSourceTask.fetchLastOffset(ElasticSourceTask.java:215)

This is the connector creation request:
{
  "name": "el-source",
  "config": {
    "connector.class": "com.github.dariobalinzo.ElasticSourceConnector",
    "tasks.max": "1",
    "es.host": "host.docker.internal",
    "es.port": "9201",
    "index.prefix": "persons",
    "topic.prefix": "persons",
    "incrementing.field.name": "_id"
  }
}

I have tried different values for the field "incrementing.field.name" with no luck.
Any idea?
Thanks!

Source is fine, but Sink is very slow

Hello,

I have successfully used this plugin to get records from Elasticsearch, and I am now trying to sink the topics into S3, but it is very slow. I also run Debezium CDC, and sinking from those topics is extremely fast.

Has anyone else experienced this? Any ideas?

Thank you

partition elasticsearch records by values

Hi there,

I'm currently using this repo (thanks very much by the way) and I'm hoping to take my JSON documents and use a certain key to either partition them or send them to different topics. One approach is to write a custom JSON deserialiser that reads from the elasticsearch source topic, but I was wondering if you could select a key with a certain value with ES source and only pull those documents in that had that key value pair.

Thanks in advance

source connector elasticsearch failed

Hey, I get some errors in the log when I try to use the Elasticsearch source connector:

[2022-08-19 11:53:46,179] ERROR [elastic-source|worker] error in searching index names (com.github.dariobalinzo.elastic.ElasticRepository:193)
[2022-08-19 11:53:46,180] ERROR [elastic-source|worker] Error while trying to get updated topics list, ignoring and waiting for next table poll interval (com.github.dariobalinzo.elastic.ElasticIndexMonitorThread:92)

This link, https://localhost:9200/_cat/indices?v, returns:

health status index                        uuid                   pri rep docs.count docs.deleted store.size pri.store.size
green  open   kibana_sample_data_ecommerce tRcxyLrsSKuWcmOx3ZNKew   1   0       4675            0        4mb            4mb
yellow open   index-elastic                CvedHZsxTB6aGeP9cUVedw   1   1          0            0       225b           225b
yellow open   test                         R9t_ToRpQGyi_tTw6MdkSA   1   1         15            0     15.5kb         15.5kb

And these are my connector properties:

{
  "name": "elastic-source",
  "config": {
    "name": "elastic-source",
    "connector.class": "com.github.dariobalinzo.ElasticSourceConnector",
    "tasks.max": "1",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "es.host": "127.0.0.1",
    "es.port": "9200",
    "es.user": "elastic",
    "es.password": "password",
    "index.prefix": "test",
    "topic.prefix": "test"
  }
}

As we can see, the test index already exists, so what's wrong with my approach?
