
kafka-elasticsearch-standalone-consumer

Welcome to the kafka-elasticsearch-standalone-consumer wiki!

Architecture of the kafka-elasticsearch-standalone-consumer [indexer]

This project has moved to the repository below.

Introduction

Kafka Standalone Consumer [Indexer] reads messages from Kafka in batches, processes them, and bulk-indexes them into ElasticSearch.

As described in the illustration above, here is how the indexer works:

  • Kafka has a topic named, say, Topic1

  • Let's say Topic1 has 5 partitions.

  • In the configuration file, kafka-es-indexer.properties, set the firstPartition=0 and lastPartition=4 properties (see the sketch after this list)

  • start the indexer application as described below

  • 5 threads will be started, one consumer thread for each of the partitions

  • when a new partition is added to the Kafka topic, the configuration has to be updated and the indexer application has to be restarted
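For the five-partition example above, the relevant lines of kafka-es-indexer.properties would look like this (property names as used in this README and in the issue reports below; values are illustrative):

	topic=Topic1
	firstPartition=0
	lastPartition=4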

How to use?

Running as a standard Jar

1. Download the code into a $INDEXER_HOME dir.

2. cp $INDEXER_HOME/src/main/resources/kafka-es-indexer.properties.template /your/absolute/path/kafka-es-indexer.properties and update all relevant properties as explained in the comments.

3. cp $INDEXER_HOME/src/main/resources/logback.xml.template /your/absolute/path/logback.xml; specify the directory you want to store logs in, and adjust the max sizes and number of log files as needed.

4. Build/create the app jar (make sure you have Maven installed):

	cd $INDEXER_HOME
 	mvn clean package

The kafka-es-indexer-2.0.jar will be created in $INDEXER_HOME/bin. All dependencies will be placed into $INDEXER_HOME/bin/lib and are linked via the kafka-es-indexer-2.0.jar manifest.

5. Edit your $INDEXER_HOME/run_indexer.sh script: make it executable if needed (chmod a+x $INDEXER_HOME/run_indexer.sh) and update the properties marked with "CHANGE FOR YOUR ENV" comments according to your environment.

6. Run the app [use JDK 1.8]:

	./run_indexer.sh
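Alternatively, since the dependencies are linked via the jar manifest (step 4), the jar can be run directly; the driver takes the properties file path as its argument (as seen in the logs quoted in the Issues section below). A hedged sketch; the -Dlogback.configurationFile property is standard logback, not something this project defines:

	cd $INDEXER_HOME
	java -Dlogback.configurationFile=/your/absolute/path/logback.xml -jar bin/kafka-es-indexer-2.0.jar /your/absolute/path/kafka-es-indexer.properties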

Versions

Kafka Version: 0.8.2.1

ElasticSearch: > 1.5.1

Scala Version for Kafka Build: 2.10.0

Configuration

Indexer app configuration is specified in the kafka-es-indexer.properties file, which should be created from the provided template, kafka-es-indexer.properties.template. All properties are described in the template:

kafka-es-indexer.properties.template
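A minimal sketch of a working configuration (values are illustrative; these property names appear verbatim in the issue reports below):

	kafkaZookeeperList=localhost:2181
	kafkaBrokersList=localhost:9092
	consumerGroupName=kafka_es_indexer
	topic=test
	firstPartition=0
	lastPartition=0
	esHostPortList=localhost:9300
	# esClusterName must match the cluster.name of your ES cluster
	esClusterName=elasticsearch
	esIndex=kafkaESIndex
	esIndexType=kafkaESType
	messageHandlerClass=org.elasticsearch.kafka.indexer.messageHandlers.RawMessageStringHandler
	indexHandlerClass=org.elasticsearch.kafka.indexer.BasicIndexHandler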

Logging properties are specified in the logback.xml file, which should be created from a provided template, logback.xml.template:

logback.xml.template

Message Handler Class

  • org.elasticsearch.kafka.consumer.MessageHandler is an abstract class that already implements most of the functionality for reading data from Kafka and batch-indexing into ElasticSearch. It has one abstract method, transformMessage(), that can be overridden in concrete sub-classes to customize message transformation before posting into ES

  • org.elasticsearch.kafka.consumer.messageHandlers.RawMessageStringHandler is a simple concrete sub-class of the MessageHandler that sends messages into ES as-is, with no additional transformation, using UTF-8 encoding

  • Usually, it is effective to index the message in JSON format in ElasticSearch. This can be done using a mapper class and transforming the message from Kafka by overriding/implementing the transformMessage() method. An example can be found here: org.elasticsearch.kafka.consumer.messageHandlers.AccessLogMessageHandler. A hedged sketch follows this list.

  • Do remember to set the newly created message handler class in the messageHandlerClass property in the kafka-es-indexer.properties file.
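A hedged sketch of a custom handler, assuming a byte[]-in/byte[]-out transformMessage() signature (the exact signature is not shown in this README; mirror the abstract method declared in MessageHandler). The class name and JSON shape below are illustrative:

	import org.elasticsearch.kafka.consumer.MessageHandler;

	// Hypothetical handler that wraps each raw Kafka message into a JSON document
	public class SimpleJsonMessageHandler extends MessageHandler {
	    @Override
	    public byte[] transformMessage(byte[] inputMessage, Long offset) throws Exception {
	        // assumed signature - check org.elasticsearch.kafka.consumer.MessageHandler
	        String raw = new String(inputMessage, "UTF-8");
	        String json = "{\"message\":\"" + raw.replace("\"", "\\\"") + "\"}";
	        return json.getBytes("UTF-8");
	    }
	}

Point the messageHandlerClass property at such a class, as noted above.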

IndexHandler Interface and basic implementation

  • org.elasticsearch.kafka.consumer.IndexHandler is an interface that defines two methods: getIndexName(params) and getIndexType(params).

  • org.elasticsearch.kafka.consumer.BasicIndexHandler is a simple implementation of this interface that returns the indexName and indexType values as configured in the kafka-es-indexer.properties file.

  • One might want to create a custom implementation of IndexHandler if, for example, the index name and type are not static for all incoming messages but depend on the event data, such as a customerId, orderId, etc. In that case, pass all info required to perform the custom index determination logic as a Map of parameters into the getIndexName(params) and getIndexType(params) methods (or pass NULL if no such data is required). A hedged sketch follows this list.

  • Do remember to set the index handler class in the indexHandlerClass property in the kafka-es-indexer.properties file. By default, BasicIndexHandler is used.
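A hedged sketch of a custom implementation (only the two method names come from this README; the Map generic types and the "customerId" key are assumptions for illustration):

	import java.util.Map;
	import org.elasticsearch.kafka.consumer.IndexHandler;

	// Hypothetical handler that routes events into per-customer indexes
	public class CustomerIndexHandler implements IndexHandler {
	    @Override
	    public String getIndexName(Map<String, Object> params) {
	        Object customerId = (params == null) ? null : params.get("customerId"); // assumed key
	        return (customerId == null) ? "kafkaESIndex" : "customer-" + customerId;
	    }

	    @Override
	    public String getIndexType(Map<String, Object> params) {
	        return "kafkaESType"; // static type, as BasicIndexHandler would return
	    }
	}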

License

kafka-elasticsearch-standalone-consumer

Licensed under the Apache License, Version 2.0 (the "License"); you may
not use this file except in compliance with the License. You may obtain
a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.

Contributors

apysh, dhyaneshm, mpopova-yottaa, ppine7, reachkrishnaraj


kafka-elasticsearch-standalone-consumer's Issues

Failed to post messages to ElasticSearch: None of the configured nodes are available: []

Hello guys, thanks for the tool you guys created, it's really awesome.
I just downloaded it yesterday and started to play with it....
I ran into an issue when running this tool to read events (JSON) from the broker and write them to ES.
I set everything up on the same box (an EC2 instance), including Kafka (2.10.0-0.8.2.1), kafka-elasticsearch-standalone-consumer (branch 2.0), and Elasticsearch (2.1.1).

Create a "test" topic, only 1 partition
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

Send message:

[ec2-user@xxxx kafka_2.10-0.8.2.1]$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
[2016-01-12 23:46:44,492] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
{"index":{"_id":"6"}}
{"account_number":6,"balance":5686,"firstname":"Hattie","lastname":"Bond","age":36,"gender":"M","address":"671 Bristol Street","employer":"Netagy","email":"
[email protected]","city":"Dante","state":"TN"}

And then run kafka-elasticsearch-standalone-consumer(config is provided below)
HERE IS THE LOG:
23:56:43,607 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Found resource [/home/ec2-user/apps/kafka/logback.xml] at [file:/home/ec2-user/apps/kafka/logback.xml]
23:56:43,708 |-INFO in ch.qos.logback.classic.joran.action.ConfigurationAction - debug attribute not set
23:56:43,729 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - About to instantiate appender of type [ch.qos.logback.core.ConsoleAppender]
23:56:43,734 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - Naming appender as [STDOUT]
23:56:43,863 |-WARN in ch.qos.logback.core.ConsoleAppender[STDOUT] - This appender no longer admits a layout as a sub-component, set an encoder instead.
23:56:43,863 |-WARN in ch.qos.logback.core.ConsoleAppender[STDOUT] - To ensure compatibility, wrapping your layout in LayoutWrappingEncoder.
23:56:43,863 |-WARN in ch.qos.logback.core.ConsoleAppender[STDOUT] - See also http://logback.qos.ch/codes.html#layoutInsteadOfEncoder for details
23:56:43,863 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - About to instantiate appender of type [ch.qos.logback.core.rolling.RollingFileAppender]
23:56:43,867 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - Naming appender as [MAIN_LOGS]
23:56:43,914 |-INFO in c.q.l.core.rolling.TimeBasedRollingPolicy - Will use gz compression
23:56:43,915 |-INFO in c.q.l.core.rolling.TimeBasedRollingPolicy - Will use the pattern /tmp/kafka_es_indexer-%d{yyyy-MM-dd}.%i.log for the active file
23:56:43,918 |-INFO in ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP@63e2203c - The date pattern is 'yyyy-MM-dd' from file name pattern '/tmp/kafka_es_indexer-%d{yyyy-MM-dd}.%i.log.gz'.
23:56:43,919 |-INFO in ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP@63e2203c - Roll-over at midnight.
23:56:43,923 |-INFO in ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP@63e2203c - Setting initial period to Tue Jan 12 23:56:25 UTC 2016
23:56:43,930 |-INFO in ch.qos.logback.core.joran.action.NestedComplexPropertyIA - Assuming default type [ch.qos.logback.classic.encoder.PatternLayoutEncoder] for [encoder] property
23:56:43,945 |-INFO in ch.qos.logback.core.rolling.RollingFileAppender[MAIN_LOGS] - Active log file name: /tmp/kafka_es_indexer.log
23:56:43,945 |-INFO in ch.qos.logback.core.rolling.RollingFileAppender[MAIN_LOGS] - File property is set to [/tmp/kafka_es_indexer.log]
23:56:43,946 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - About to instantiate appender of type [ch.qos.logback.core.rolling.RollingFileAppender]
23:56:43,946 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - Naming appender as [FAILED_EVENTS]
23:56:43,949 |-INFO in c.q.l.core.rolling.TimeBasedRollingPolicy - No compression will be used
23:56:43,949 |-INFO in c.q.l.core.rolling.TimeBasedRollingPolicy - Will use the pattern /tmp/failed_es_events-%d{yyyy-MM-dd}.%i.log for the active file
23:56:43,951 |-INFO in ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP@1efed156 - The date pattern is 'yyyy-MM-dd' from file name pattern '/tmp/failed_es_events-%d{yyyy-MM-dd}.%i.log'.
23:56:43,951 |-INFO in ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP@1efed156 - Roll-over at midnight.
23:56:43,951 |-INFO in ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP@1efed156 - Setting initial period to Tue Jan 12 22:19:15 UTC 2016
23:56:43,956 |-INFO in ch.qos.logback.core.joran.action.NestedComplexPropertyIA - Assuming default type [ch.qos.logback.classic.encoder.PatternLayoutEncoder] for [encoder] property
23:56:43,957 |-INFO in ch.qos.logback.core.rolling.RollingFileAppender[FAILED_EVENTS] - Active log file name: /tmp/failed_es_events.log
23:56:43,957 |-INFO in ch.qos.logback.core.rolling.RollingFileAppender[FAILED_EVENTS] - File property is set to [/tmp/failed_es_events.log]
23:56:43,958 |-INFO in ch.qos.logback.classic.joran.action.LoggerAction - Setting level of logger [org.elasticsearch.kafka.indexer.FailedEventsLogger] to ERROR
23:56:43,958 |-INFO in ch.qos.logback.core.joran.action.AppenderRefAction - Attaching appender named [FAILED_EVENTS] to Logger[org.elasticsearch.kafka.indexer.FailedEventsLogger]
23:56:43,959 |-INFO in ch.qos.logback.classic.joran.action.LoggerAction - Setting level of logger [org.elasticsearch.kafka] to INFO
23:56:43,959 |-INFO in ch.qos.logback.core.joran.action.AppenderRefAction - Attaching appender named [MAIN_LOGS] to Logger[org.elasticsearch.kafka]
23:56:44,334 |-INFO in ch.qos.logback.core.joran.action.AppenderRefAction - Attaching appender named [STDOUT] to Logger[org.elasticsearch.kafka]
23:56:44,334 |-INFO in ch.qos.logback.classic.joran.action.ConfigurationAction - End of configuration.
23:56:44,336 |-INFO in ch.qos.logback.classic.joran.JoranConfigurator@6737fd8f - Registering current configuration as safe fallback point

12/Jan/2016:23:56:44:343 +0000 [main] INFO o.e.kafka.indexer.KafkaIndexerDriver - Initializing Kafka ES Indexer, arguments passed to the Driver:
12/Jan/2016:23:56:44:346 +0000 [main] INFO o.e.kafka.indexer.KafkaIndexerDriver - /home/ec2-user/apps/kafka/kafka-es-indexer.properties
12/Jan/2016:23:56:44:347 +0000 [main] INFO o.e.kafka.indexer.ConsumerConfig - configFile : /home/ec2-user/apps/kafka/kafka-es-indexer.properties
12/Jan/2016:23:56:44:348 +0000 [main] INFO o.e.kafka.indexer.ConsumerConfig - Properties : {esIndexingRetrySleepTimeMs=10000, kafkaZookeeperList=localhost:2181, appStopTimeoutSeconds=10, esIndex=kafkaESIndex, startOffsetFrom=RESTART, esClusterName=sca, kafkaFetchSizeMinBytes=31457280, zkConnectionTimeoutMs=15000, messageEncoding=UTF-8, indexHandlerClass=org.elasticsearch.kafka.indexer.BasicIndexHandler, kafkaBrokersList=localhost:9092, kafkaSimpleConsumerBufferSizeBytes=31457280, zkCuratorRetryDelayMs=2000, consumerSleepBetweenFetchsMs=10, kafkaReinitSleepTimeMs=10000, isPerfReportingEnabled=false, esHostPortList=localhost:9300, topic=test, messageHandlerClass=org.elasticsearch.kafka.indexer.messageHandlers.RawMessageStringHandler, lastPartition=0, kafkaSimpleConsumerSocketTimeoutMs=10000, numberOfReinitTries=2, consumerGroupName=kafka_es_indexer, startOffset=0, firstPartition=0, numberOfEsIndexingRetryAttempts=2, zkCuratorRetryTimes=3, zkSessionTimeoutMs=10000, isDryRun=false, esIndexType=kafkaESType}
12/Jan/2016:23:56:44:349 +0000 [main] INFO o.e.kafka.indexer.ConsumerConfig - Config reading complete !
12/Jan/2016:23:56:44:349 +0000 [main] INFO o.e.kafka.indexer.KafkaIndexerDriver - Created kafka consumer config OK
12/Jan/2016:23:56:44:350 +0000 [main] INFO o.e.k.indexer.jobs.IndexerJobManager - ConsumerJobManager is starting, servicing partitions: [0-0]
12/Jan/2016:23:56:44:351 +0000 [main] INFO o.e.kafka.indexer.KafkaIndexerDriver - Registering KafkfaEsIndexerStatus MBean:
12/Jan/2016:23:56:44:445 +0000 [main] INFO o.e.k.indexer.jobs.IndexerJobManager - Creating IndexerJob for partition=0
12/Jan/2016:23:56:44:450 +0000 [main] INFO o.e.kafka.indexer.jobs.IndexerJob - Initializing ElasticSearch... hostPortList=localhost:9300, esClusterName=sca for partition 0
log4j:WARN No appenders could be found for logger (org.elasticsearch.plugins).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
12/Jan/2016:23:56:45:294 +0000 [main] INFO o.e.kafka.indexer.jobs.IndexerJob - adding [localhost:9300] to TransportClient for partition 0...
12/Jan/2016:23:56:45:474 +0000 [main] INFO o.e.kafka.indexer.jobs.IndexerJob - ElasticSearch Client created and intialized OK for partition 0
12/Jan/2016:23:56:45:474 +0000 [main] INFO o.e.kafka.indexer.jobs.IndexerJob - Initializing Kafka for partition 0...
12/Jan/2016:23:56:45:474 +0000 [main] INFO o.e.kafka.indexer.jobs.IndexerJob - kafkaClientId=kafka_es_indexer_0 for partition 0
12/Jan/2016:23:56:45:476 +0000 [main] INFO o.e.kafka.indexer.KafkaClient - Instantiating KafkaClient
12/Jan/2016:23:56:45:476 +0000 [main] INFO o.e.kafka.indexer.KafkaClient - ### KafkaClient Config: ###
12/Jan/2016:23:56:45:476 +0000 [main] INFO o.e.kafka.indexer.KafkaClient - kafkaZookeeperList: localhost:2181
12/Jan/2016:23:56:45:476 +0000 [main] INFO o.e.kafka.indexer.KafkaClient - kafkaBrokersList: localhost:9092
12/Jan/2016:23:56:45:477 +0000 [main] INFO o.e.kafka.indexer.KafkaClient - kafkaClientId: kafka_es_indexer_0
12/Jan/2016:23:56:45:477 +0000 [main] INFO o.e.kafka.indexer.KafkaClient - topic: test
12/Jan/2016:23:56:45:477 +0000 [main] INFO o.e.kafka.indexer.KafkaClient - partition: 0
12/Jan/2016:23:56:45:515 +0000 [main] INFO o.e.kafka.indexer.KafkaClient - Connected to Kafka Zookeeper successfully
12/Jan/2016:23:56:45:515 +0000 [main] INFO o.e.kafka.indexer.KafkaClient - Looking for Kafka leader broker for partition 0...
12/Jan/2016:23:56:45:515 +0000 [main] INFO o.e.kafka.indexer.KafkaClient - Looking for leader for partition 0 using Kafka Broker=localhost:9092, topic=test
12/Jan/2016:23:56:45:985 +0000 [main] INFO o.e.kafka.indexer.KafkaClient - Found leader for partition 0 using Kafka Broker=localhost:9092, topic=test; leader broker URL: 0.0.0.0:9092
12/Jan/2016:23:56:45:987 +0000 [main] INFO o.e.kafka.indexer.KafkaClient - Found leader: leaderBrokerURL=0.0.0.0:9092
12/Jan/2016:23:56:45:990 +0000 [main] INFO o.e.kafka.indexer.KafkaClient - Initialized Kafka Consumer successfully for partition 0
12/Jan/2016:23:56:45:990 +0000 [main] INFO o.e.kafka.indexer.jobs.IndexerJob - Kafka client created and intialized OK for partition 0
12/Jan/2016:23:56:45:990 +0000 [main] INFO o.e.kafka.indexer.jobs.IndexerJob - MessageHandler Class given in config is org.elasticsearch.kafka.indexer.messageHandlers.RawMessageStringHandler for partition 0
12/Jan/2016:23:56:45:993 +0000 [main] INFO o.e.kafka.indexer.MessageHandler - Created IndexHandler:
12/Jan/2016:23:56:45:994 +0000 [main] INFO o.e.kafka.indexer.MessageHandler - Created Message Handler
12/Jan/2016:23:56:45:994 +0000 [main] INFO o.e.k.i.m.RawMessageStringHandler - Initialized RawMessageStringHandler
12/Jan/2016:23:56:45:995 +0000 [kafka-indexer-consumer-thread-0] INFO o.e.kafka.indexer.jobs.IndexerJob - ******* Starting a new batch of events from Kafka for partition 0 ...
12/Jan/2016:23:56:45:995 +0000 [kafka-indexer-consumer-thread-0] INFO o.e.kafka.indexer.jobs.IndexerJob - **** Computing Kafka offset *** for partition 0
12/Jan/2016:23:56:45:996 +0000 [kafka-indexer-consumer-thread-0] INFO o.e.kafka.indexer.jobs.IndexerJob - startOffsetFrom=RESTART for partition 0
12/Jan/2016:23:56:45:996 +0000 [kafka-indexer-consumer-thread-0] INFO o.e.kafka.indexer.jobs.IndexerJob - Restarting from where the Offset is left for topic test, for partition 0
12/Jan/2016:23:56:46:012 +0000 [kafka-indexer-consumer-thread-0] INFO o.e.kafka.indexer.jobs.IndexerJob - offsetForThisRound is set to the CurrentOffset: 0 for partition 0
12/Jan/2016:23:56:46:038 +0000 [kafka-indexer-consumer-thread-0] INFO o.e.kafka.indexer.jobs.IndexerJob - Resulting offsetForThisRound = 0 for partition 0
12/Jan/2016:23:56:46:071 +0000 [kafka-indexer-consumer-thread-0] INFO o.e.kafka.indexer.MessageHandler - Total # of messages in this batch: 6; # of successfully transformed and added to Index messages: 6; offsetOfNextBatch: 6
12/Jan/2016:23:56:46:071 +0000 [kafka-indexer-consumer-thread-0] INFO o.e.kafka.indexer.jobs.IndexerJob - posting the messages to ElasticSearch for partition 0...
12/Jan/2016:23:56:46:077 +0000 [kafka-indexer-consumer-thread-0] ERROR o.e.kafka.indexer.MessageHandler - Failed to post messages to ElasticSearch: None of the configured nodes are available: []
org.elasticsearch.client.transport.NoNodeAvailableException: None of the configured nodes are available: []

MY CONFIG:
kafkaZookeeperList=localhost:2181
zkSessionTimeoutMs=10000
zkConnectionTimeoutMs=15000
zkCuratorRetryTimes=3
zkCuratorRetryDelayMs=2000
kafkaBrokersList=localhost:9092
consumerGroupName=kafka_es_indexer
topic=test
firstPartition=0
lastPartition=0
startOffsetFrom=RESTART
startOffset=0
kafkaFetchSizeMinBytes=31457280
kafkaSimpleConsumerBufferSizeBytes=31457280
kafkaSimpleConsumerSocketTimeoutMs=10000
appStopTimeoutSeconds=10
numberOfReinitTries=2
kafkaReinitSleepTimeMs=10000
esHostPortList=localhost:9300
esClusterName=sca
esIndex=kafkaESIndex
esIndexType=kafkaESType
messageHandlerClass=org.elasticsearch.kafka.indexer.messageHandlers.RawMessageStringHandler
indexHandlerClass=org.elasticsearch.kafka.indexer.BasicIndexHandler
messageEncoding=UTF-8
isDryRun=false
consumerSleepBetweenFetchsMs=10
numberOfEsIndexingRetryAttempts=2
esIndexingRetrySleepTimeMs=10000
isPerfReportingEnabled=false

TROUBLESHOOTING:

I actually verified that I can call the ElasticSearch API manually to do a bulk post against a different index, and it works perfectly:

curl -XPOST 'localhost:9200/customer/external/_bulk?pretty' -d '
{"index":{"_id":"1"}}
{"name": "John Doe" }
{"index":{"_id":"2"}}
{"name": "Jane Doe" }
'

Here is the code where, as far as I can tell, the exception is thrown:

public boolean postToElasticSearch() throws Exception {
    BulkResponse bulkResponse = null;
    BulkItemResponse bulkItemResp = null;
    // Nothing/NoMessages to post to ElasticSearch
    if (bulkRequestBuilder.numberOfActions() <= 0) {
        logger.warn("No messages to post to ElasticSearch - returning");
        return true;
    }
    try {
        bulkResponse = bulkRequestBuilder.execute().actionGet();
    } catch (ElasticsearchException e) {
        logger.error("Failed to post messages to ElasticSearch: " + e.getMessage(), e);
        throw e;
    }
    ....

The Elasticsearch transport port is 9300, which is what the config uses.
I tried 9200 as well, and it didn't work either.

Please help look into this issue; I don't think it's a connection issue to Elasticsearch, since they are on the same box.

Thanks in advance!
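A hedged troubleshooting note: NoNodeAvailableException from the TransportClient is most commonly a cluster-name or client/server version mismatch, and the Versions section above lists ElasticSearch > 1.5.1 while this setup runs Elasticsearch 2.1.1 (a 1.x transport client generally cannot join a 2.x cluster; see also the later issue about the new version that supports es2.x). The cluster name and server version can be cross-checked against esClusterName via the standard ES REST root endpoint:

	# prints cluster_name and version.number of the running node
	curl -s localhost:9200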

Probably a new feature?

Hi Guys,
I have a quick question here; sorry, I haven't read through the code...
I am wondering whether this tool can be easily extended to support systems other than Elasticsearch.
It could be something like a plugin model... for example, relaying the events to another message queue such as RabbitMQ or a Kafka queue (I know Kafka Copycat can do that), so that the end user of ElasticSearch can customize the events, such as collecting only certain fields of the received event.

ERROR o.e.kafka.indexer.MessageHandler - Failed Message #1, REST response:TOO_MANY_REQUESTS

Hi team,

Today I found some error messages in the project log (we have 500 partitions defined in Kafka, so there are 500 threads). I tried to find the cause in the code but unfortunately got nothing; would you please give some advice on this? thanks

2016:10:21:05:055 +0800 [kafka-indexer-consumer-thread-165] ERROR o.e.kafka.indexer.MessageHandler - Failed Message #1, REST response:TOO_MANY_REQUESTS; errorMessage:EsRejectedExecutionException[rejected execution (queue capacity 50) on org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction$AsyncShardOperationAction$1@15b1e524]
12/Apr/2016:10:21:04:013 +0800 [kafka-indexer-consumer-thread-197] INFO o.e.kafka.indexer.MessageHandler - Total # of messages in this batch: 2; # of successfully transformed and added to Index messages: 2; offsetOfNextBatch: 238952
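A hedged note on the error above: EsRejectedExecutionException with "queue capacity 50" means the Elasticsearch bulk thread pool queue overflowed, which is plausible with 500 indexer threads bulk-posting concurrently. Typical mitigations are reducing the number of concurrent partitions/threads per instance, or raising the queue size on the ES side, for example in elasticsearch.yml (setting name as used by ES 1.x/2.x):

	# elasticsearch.yml - enlarge the bulk thread pool queue (the ES 1.x default is 50)
	threadpool.bulk.queue_size: 500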

error starting daemon (java.lang.NullPointerException)

hi @reachkrishnaraj! Your project is awesome, but I have a problem. The versions I use are
kafka-0.8.2.1-src & elasticsearch 1.5.0

When I tried to start the server I got this message:
java.lang.NullPointerException
at java.util.Properties$LineReader.readLine(Properties.java:434)
at java.util.Properties.load0(Properties.java:353)
at java.util.Properties.load(Properties.java:341)
at org.elasticsearch.kafka.consumer.ConsumerLogger.doInitLogger(ConsumerLogger.java:17)
at org.elasticsearch.kafka.consumer.daemon.KafkaConsumerDaemon.init(KafkaConsumerDaemon.java:32)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.commons.daemon.support.DaemonLoader.load(DaemonLoader.java:207)
Cannot load daemon
Service exit with a return value of 3

thanks for your help!

my consumerNew.sh config file is:

# Kafka ZooKeeper's IP Address/HostName without port
# default value: localhost, if not specified
zookeeper=localhost

# Kafka Broker's IP Address/HostName
# default value: localhost, if not specified
brokerHost=localhost

# Kafka Broker's Port number
# default value: 9092, if not specified
brokerPort=9092

# Name of the Kafka Consumer Group
# default value: ESKafkaConsumerClient, if not specified
consumerGroupName=testgroup

# Kafka Topic from which the messages have to be processed
# mandatory property, no default value specified.
topic=test

# Partition in the Kafka Topic (defined by the 'topic' property) from which the messages have to be processed.
# One instance of this consumer reads and processes from only 1 partition (of the topic)
# default value: 0, if not specified
partition=0

# Offset option from where the message fetching should happen in Kafka
# Values can be: CUSTOM / OLDEST / LATEST / RESTART.
# CUSTOM: messages from the specified offset (defined by the 'startOffset' property) in Kafka will be read. If 'CUSTOM' is set, then the 'startOffset' property has to be set to an integer value
# OLDEST: messages from the oldest available timestamp in Kafka will be read
# LATEST: messages from the latest available timestamp in Kafka will be read
# RESTART: message reading will happen from the offset where the last cycle of reading by this client stopped
# default value: "OLDEST", if not specified
startOffsetFrom=OLDEST

# Integer value of the offset from where the message processing should happen. Use this property in conjunction with 'startOffsetFrom=CUSTOM'
# mandatory property when 'startOffsetFrom' is set to 'CUSTOM', no default value specified.
startOffset=0

# Fully qualified class name of the concrete message handler class factory
# default value: "org.elasticsearch.kafka.consumer.RawMessageStringHandler", if not specified
# A custom class should extend the org.elasticsearch.kafka.consumer.MessageHandler class
messageHandlerClass=org.elasticsearch.kafka.consumer.messageHandlers.RawMessageStringHandler

esHost=localhost

esHostPortList=localhost:9200

esClusterName=Mandroid

esPort=9200

# IndexName in ElasticSearch to which the processed messages have to be posted/indexed
# default value: "kafkaConsumerIndex", if not specified
esIndex=testP

# IndexType in ElasticSearch to which the processed messages have to be posted/indexed
# default value: "kafka", if not specified
esIndexType=kafka

# Percentage of message processing failure tolerance when posting/indexing to ElasticSearch
# default value: "5", if not specified
# Not used at the moment, can safely ignore
esMsgFailureTolerancePercent=5

logPropertyFile=log4j.testgroup.test.0.properties

# Not used now, reserved for collecting stats
statsdPrefix=

# Not used now, reserved for collecting stats
statsdHost=

# Not used now, reserved for collecting stats
statsdPort=

# Preferred (integer) size (bytes) of messages to be fetched from Kafka in 1 fetch call to Kafka.
# default value: "31457280 (bytes), i.e. (10 * 1024 * 1024 * 3)", if not specified
# Set it to ~4MB and slowly ramp up based on your heap memory.
bulkSize=4000000

# Timeout when fetching messages from Kafka.
# default value: "10000", if not specified
bulkTimeout=

# Preferred message encoding to process the messages before posting them to ElasticSearch.
# default value: "UTF-8", if not specified
messageEncoding=

# Not used now, can be ignored for now.
# Potential usage: handle the messages that failed either when transforming the message or when indexing/posting the message to ElasticSearch.
isGuranteedEsPostMode=

# Dry runs will not post to ElasticSearch and also won't commit the offset to Kafka
isDryRun=true

# Time in seconds for the consumer to sleep between each round of read and post
consumerSleepTime=30

new kafka-elasticsearch-standalone-consumer project has issues about es

Hi,

I am trying to use the new version of kafka-elasticsearch-standalone-consumer, which supports es2.x, but after I build it and run it with gradle, it always fails with the error below. I also tried es2.1 and es2.3, still the same issue. Would you please help advise? thanks

2016-07-03 07:29:23:577 [kafka-es-indexer-thread-1] ERROR o.e.k.i.s.ElasticSearchBatchService - Error posting messages to ElasticSearch: NoNodeAvailableException - ES cluster is unreachable, will try to re-connect after sleeping ...
org.elasticsearch.client.transport.NoNodeAvailableException: None of the configured nodes are available:

Does this project support elasticsearch 2.x?

Hi team,

Is there a plan to support a higher version of elasticsearch? I am trying to use the elasticsearch sql plugin to query the data in ES, but the lower-version plugin can't support advanced queries; any advice? thanks.

Class KafkaIndexerDriver compile error

The KafkaIndexerDriver driver variable should be declared final in the main method; otherwise the compiler reports:

Cannot refer to the non-final local variable driver defined in an enclosing scope
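A minimal sketch of the fix (hedged: only the variable name comes from the issue; the surrounding main() body and the start/stop method names are illustrative). Before Java 8, a local variable captured by an anonymous inner class, such as a shutdown hook, must be declared final:

	public static void main(String[] args) throws Exception {
	    // 'final' lets the anonymous Thread below capture the variable (pre-Java-8 rule)
	    final KafkaIndexerDriver driver = new KafkaIndexerDriver(args);
	    Runtime.getRuntime().addShutdownHook(new Thread() {
	        @Override
	        public void run() {
	            driver.stop(); // hypothetical shutdown method, for illustration only
	        }
	    });
	    driver.start(); // hypothetical start method, for illustration only
	}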

Duplicate Elasticsearch posts

Hi Krishna, thanks for your work!

I notice that when messages are processed through the consumer, they are posted to Elasticsearch multiple times. It seems that once the consumer is started, every message that comes through results in a re-posting of every message received since the consumer started, like a cache that's not being purged.

Could I have misconfigured something? Thanks!
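The symptom described above is consistent with one BulkRequestBuilder being reused across batches without being reset (a hedged guess, not a confirmed diagnosis): actions added for earlier batches stay in the builder and are re-sent with every subsequent execute(). A sketch of the pattern that avoids this, using the standard ES transport client API ('client' and 'batch' are assumed to exist in scope):

	// Create a fresh builder for every batch so earlier actions are not re-sent
	BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
	for (IndexRequestBuilder request : batch) {
	    bulkRequestBuilder.add(request);
	}
	bulkRequestBuilder.execute().actionGet();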

Too many config files

I have 6 topics, each with 3 partitions. That makes 18 config files and processes. Is there a way to reduce this?
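Note that, per the architecture section above, one indexer instance services a whole partition range of a topic via the firstPartition/lastPartition properties, so a topic with 3 partitions needs a single config file rather than three; that would be 6 config files and processes here instead of 18. For example:

	topic=topic1
	firstPartition=0
	lastPartition=2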

Updates for this project

Hi, Krishna,
I've picked up your kafka-elasticsearch-standalone-consumer project as the base for the Kafka-to-ES functionality I need to implement. As part of this work, I would like to contribute changes/updates/fixes back to your original project. I have pushed my initial changes into my fork already: https://github.com/ppine7/kafka-elasticsearch-standalone-consumer,
and I envision many more changes to come, according to the TODO items I've added to the code as well.
If you are OK with adding these changes into the main project, I can create a pull request. If you would rather keep your project as is, I can just keep working on my fork.

Thanks !
Marina

ClassNotFound

Trying to start with
./scripts/consumerNew.sh -p start -c /home/ec2-user/kafka-elasticsearch-standalone-consumer/config/search.valid-tincan.1.properties ; tail -f ~/search_valid_tincan2_0.log

I am getting
[2014-12-09 21:14:50,175][FATAL][org.elasticsearch.kafka.consumer.daemon.KafkaConsumerDaemon] java.lang.ClassNotFoundException: org.elasticsearch.kafka.consumer.RawMessageStringHandler

What jar should this class be in?

Is there any elasticsearch index setting included in kafka-es-indexer.properties?

Hi team,

I have used your project for a while. Recently I tried to search some content via the elasticsearch head plugin and it doesn't return any results; I used a wildcard query on a field. Is there any config for the elasticsearch index? It would be better to specify whether the ES index fields have the index property set or not; then this project would be much more practical for us, since we wouldn't need to update the mapping.

Any advice for this?

Postgresql Kafka Couchbase

Hi Krishna,

I am using an openerp server with Postgresql as the DB. I want the table edit/update/delete/create actions to be logged, these messages sent to Kafka, and Kafka to then send these messages to Couchbase.

Can you guide me or help me achieve this?

Regards,
Vijay
