Hello, and thanks for creating this tool; it's really awesome.
I downloaded it yesterday and started playing with it.
I ran into an issue when running the tool to read events (JSON) from the Kafka broker and write them to Elasticsearch.
I set everything up on the same box (an EC2 instance): Kafka 0.8.2.1 (the kafka_2.10-0.8.2.1 build), kafka-elasticsearch-standalone-consumer (branch 2.0), and Elasticsearch (2.1.1).
First I created a "test" topic with a single partition:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Then I sent a test message:
[ec2-user@xxxx kafka_2.10-0.8.2.1]$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
[2016-01-12 23:46:44,492] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
{"index":{"_id":"6"}}
{"account_number":6,"balance":5686,"firstname":"Hattie","lastname":"Bond","age":36,"gender":"M","address":"671 Bristol Street","employer":"Netagy","email":"[email protected]","city":"Dante","state":"TN"}
Then I ran kafka-elasticsearch-standalone-consumer (my config is provided below).
HERE IS THE LOG:
23:56:43,607 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Found resource [/home/ec2-user/apps/kafka/logback.xml] at [file:/home/ec2-user/apps/kafka/logback.xml]
23:56:43,708 |-INFO in ch.qos.logback.classic.joran.action.ConfigurationAction - debug attribute not set
23:56:43,729 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - About to instantiate appender of type [ch.qos.logback.core.ConsoleAppender]
23:56:43,734 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - Naming appender as [STDOUT]
23:56:43,863 |-WARN in ch.qos.logback.core.ConsoleAppender[STDOUT] - This appender no longer admits a layout as a sub-component, set an encoder instead.
23:56:43,863 |-WARN in ch.qos.logback.core.ConsoleAppender[STDOUT] - To ensure compatibility, wrapping your layout in LayoutWrappingEncoder.
23:56:43,863 |-WARN in ch.qos.logback.core.ConsoleAppender[STDOUT] - See also http://logback.qos.ch/codes.html#layoutInsteadOfEncoder for details
23:56:43,863 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - About to instantiate appender of type [ch.qos.logback.core.rolling.RollingFileAppender]
23:56:43,867 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - Naming appender as [MAIN_LOGS]
23:56:43,914 |-INFO in c.q.l.core.rolling.TimeBasedRollingPolicy - Will use gz compression
23:56:43,915 |-INFO in c.q.l.core.rolling.TimeBasedRollingPolicy - Will use the pattern /tmp/kafka_es_indexer-%d{yyyy-MM-dd}.%i.log for the active file
23:56:43,918 |-INFO in ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP@63e2203c - The date pattern is 'yyyy-MM-dd' from file name pattern '/tmp/kafka_es_indexer-%d{yyyy-MM-dd}.%i.log.gz'.
23:56:43,919 |-INFO in ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP@63e2203c - Roll-over at midnight.
23:56:43,923 |-INFO in ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP@63e2203c - Setting initial period to Tue Jan 12 23:56:25 UTC 2016
23:56:43,930 |-INFO in ch.qos.logback.core.joran.action.NestedComplexPropertyIA - Assuming default type [ch.qos.logback.classic.encoder.PatternLayoutEncoder] for [encoder] property
23:56:43,945 |-INFO in ch.qos.logback.core.rolling.RollingFileAppender[MAIN_LOGS] - Active log file name: /tmp/kafka_es_indexer.log
23:56:43,945 |-INFO in ch.qos.logback.core.rolling.RollingFileAppender[MAIN_LOGS] - File property is set to [/tmp/kafka_es_indexer.log]
23:56:43,946 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - About to instantiate appender of type [ch.qos.logback.core.rolling.RollingFileAppender]
23:56:43,946 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - Naming appender as [FAILED_EVENTS]
23:56:43,949 |-INFO in c.q.l.core.rolling.TimeBasedRollingPolicy - No compression will be used
23:56:43,949 |-INFO in c.q.l.core.rolling.TimeBasedRollingPolicy - Will use the pattern /tmp/failed_es_events-%d{yyyy-MM-dd}.%i.log for the active file
23:56:43,951 |-INFO in ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP@1efed156 - The date pattern is 'yyyy-MM-dd' from file name pattern '/tmp/failed_es_events-%d{yyyy-MM-dd}.%i.log'.
23:56:43,951 |-INFO in ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP@1efed156 - Roll-over at midnight.
23:56:43,951 |-INFO in ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP@1efed156 - Setting initial period to Tue Jan 12 22:19:15 UTC 2016
23:56:43,956 |-INFO in ch.qos.logback.core.joran.action.NestedComplexPropertyIA - Assuming default type [ch.qos.logback.classic.encoder.PatternLayoutEncoder] for [encoder] property
23:56:43,957 |-INFO in ch.qos.logback.core.rolling.RollingFileAppender[FAILED_EVENTS] - Active log file name: /tmp/failed_es_events.log
23:56:43,957 |-INFO in ch.qos.logback.core.rolling.RollingFileAppender[FAILED_EVENTS] - File property is set to [/tmp/failed_es_events.log]
23:56:43,958 |-INFO in ch.qos.logback.classic.joran.action.LoggerAction - Setting level of logger [org.elasticsearch.kafka.indexer.FailedEventsLogger] to ERROR
23:56:43,958 |-INFO in ch.qos.logback.core.joran.action.AppenderRefAction - Attaching appender named [FAILED_EVENTS] to Logger[org.elasticsearch.kafka.indexer.FailedEventsLogger]
23:56:43,959 |-INFO in ch.qos.logback.classic.joran.action.LoggerAction - Setting level of logger [org.elasticsearch.kafka] to INFO
23:56:43,959 |-INFO in ch.qos.logback.core.joran.action.AppenderRefAction - Attaching appender named [MAIN_LOGS] to Logger[org.elasticsearch.kafka]
23:56:44,334 |-INFO in ch.qos.logback.core.joran.action.AppenderRefAction - Attaching appender named [STDOUT] to Logger[org.elasticsearch.kafka]
23:56:44,334 |-INFO in ch.qos.logback.classic.joran.action.ConfigurationAction - End of configuration.
23:56:44,336 |-INFO in ch.qos.logback.classic.joran.JoranConfigurator@6737fd8f - Registering current configuration as safe fallback point
12/Jan/2016:23:56:44:343 +0000 [main] INFO o.e.kafka.indexer.KafkaIndexerDriver - Initializing Kafka ES Indexer, arguments passed to the Driver:
12/Jan/2016:23:56:44:346 +0000 [main] INFO o.e.kafka.indexer.KafkaIndexerDriver - /home/ec2-user/apps/kafka/kafka-es-indexer.properties
12/Jan/2016:23:56:44:347 +0000 [main] INFO o.e.kafka.indexer.ConsumerConfig - configFile : /home/ec2-user/apps/kafka/kafka-es-indexer.properties
12/Jan/2016:23:56:44:348 +0000 [main] INFO o.e.kafka.indexer.ConsumerConfig - Properties : {esIndexingRetrySleepTimeMs=10000, kafkaZookeeperList=localhost:2181, appStopTimeoutSeconds=10, esIndex=kafkaESIndex, startOffsetFrom=RESTART, esClusterName=sca, kafkaFetchSizeMinBytes=31457280, zkConnectionTimeoutMs=15000, messageEncoding=UTF-8, indexHandlerClass=org.elasticsearch.kafka.indexer.BasicIndexHandler, kafkaBrokersList=localhost:9092, kafkaSimpleConsumerBufferSizeBytes=31457280, zkCuratorRetryDelayMs=2000, consumerSleepBetweenFetchsMs=10, kafkaReinitSleepTimeMs=10000, isPerfReportingEnabled=false, esHostPortList=localhost:9300, topic=test, messageHandlerClass=org.elasticsearch.kafka.indexer.messageHandlers.RawMessageStringHandler, lastPartition=0, kafkaSimpleConsumerSocketTimeoutMs=10000, numberOfReinitTries=2, consumerGroupName=kafka_es_indexer, startOffset=0, firstPartition=0, numberOfEsIndexingRetryAttempts=2, zkCuratorRetryTimes=3, zkSessionTimeoutMs=10000, isDryRun=false, esIndexType=kafkaESType}
12/Jan/2016:23:56:44:349 +0000 [main] INFO o.e.kafka.indexer.ConsumerConfig - Config reading complete !
12/Jan/2016:23:56:44:349 +0000 [main] INFO o.e.kafka.indexer.KafkaIndexerDriver - Created kafka consumer config OK
12/Jan/2016:23:56:44:350 +0000 [main] INFO o.e.k.indexer.jobs.IndexerJobManager - ConsumerJobManager is starting, servicing partitions: [0-0]
12/Jan/2016:23:56:44:351 +0000 [main] INFO o.e.kafka.indexer.KafkaIndexerDriver - Registering KafkfaEsIndexerStatus MBean:
12/Jan/2016:23:56:44:445 +0000 [main] INFO o.e.k.indexer.jobs.IndexerJobManager - Creating IndexerJob for partition=0
12/Jan/2016:23:56:44:450 +0000 [main] INFO o.e.kafka.indexer.jobs.IndexerJob - Initializing ElasticSearch... hostPortList=localhost:9300, esClusterName=sca for partition 0
log4j:WARN No appenders could be found for logger (org.elasticsearch.plugins).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
12/Jan/2016:23:56:45:294 +0000 [main] INFO o.e.kafka.indexer.jobs.IndexerJob - adding [localhost:9300] to TransportClient for partition 0...
12/Jan/2016:23:56:45:474 +0000 [main] INFO o.e.kafka.indexer.jobs.IndexerJob - ElasticSearch Client created and intialized OK for partition 0
12/Jan/2016:23:56:45:474 +0000 [main] INFO o.e.kafka.indexer.jobs.IndexerJob - Initializing Kafka for partition 0...
12/Jan/2016:23:56:45:474 +0000 [main] INFO o.e.kafka.indexer.jobs.IndexerJob - kafkaClientId=kafka_es_indexer_0 for partition 0
12/Jan/2016:23:56:45:476 +0000 [main] INFO o.e.kafka.indexer.KafkaClient - Instantiating KafkaClient
12/Jan/2016:23:56:45:476 +0000 [main] INFO o.e.kafka.indexer.KafkaClient - ### KafkaClient Config: ###
12/Jan/2016:23:56:45:476 +0000 [main] INFO o.e.kafka.indexer.KafkaClient - kafkaZookeeperList: localhost:2181
12/Jan/2016:23:56:45:476 +0000 [main] INFO o.e.kafka.indexer.KafkaClient - kafkaBrokersList: localhost:9092
12/Jan/2016:23:56:45:477 +0000 [main] INFO o.e.kafka.indexer.KafkaClient - kafkaClientId: kafka_es_indexer_0
12/Jan/2016:23:56:45:477 +0000 [main] INFO o.e.kafka.indexer.KafkaClient - topic: test
12/Jan/2016:23:56:45:477 +0000 [main] INFO o.e.kafka.indexer.KafkaClient - partition: 0
12/Jan/2016:23:56:45:515 +0000 [main] INFO o.e.kafka.indexer.KafkaClient - Connected to Kafka Zookeeper successfully
12/Jan/2016:23:56:45:515 +0000 [main] INFO o.e.kafka.indexer.KafkaClient - Looking for Kafka leader broker for partition 0...
12/Jan/2016:23:56:45:515 +0000 [main] INFO o.e.kafka.indexer.KafkaClient - Looking for leader for partition 0 using Kafka Broker=localhost:9092, topic=test
12/Jan/2016:23:56:45:985 +0000 [main] INFO o.e.kafka.indexer.KafkaClient - Found leader for partition 0 using Kafka Broker=localhost:9092, topic=test; leader broker URL: 0.0.0.0:9092
12/Jan/2016:23:56:45:987 +0000 [main] INFO o.e.kafka.indexer.KafkaClient - Found leader: leaderBrokerURL=0.0.0.0:9092
12/Jan/2016:23:56:45:990 +0000 [main] INFO o.e.kafka.indexer.KafkaClient - Initialized Kafka Consumer successfully for partition 0
12/Jan/2016:23:56:45:990 +0000 [main] INFO o.e.kafka.indexer.jobs.IndexerJob - Kafka client created and intialized OK for partition 0
12/Jan/2016:23:56:45:990 +0000 [main] INFO o.e.kafka.indexer.jobs.IndexerJob - MessageHandler Class given in config is org.elasticsearch.kafka.indexer.messageHandlers.RawMessageStringHandler for partition 0
12/Jan/2016:23:56:45:993 +0000 [main] INFO o.e.kafka.indexer.MessageHandler - Created IndexHandler:
12/Jan/2016:23:56:45:994 +0000 [main] INFO o.e.kafka.indexer.MessageHandler - Created Message Handler
12/Jan/2016:23:56:45:994 +0000 [main] INFO o.e.k.i.m.RawMessageStringHandler - Initialized RawMessageStringHandler
12/Jan/2016:23:56:45:995 +0000 [kafka-indexer-consumer-thread-0] INFO o.e.kafka.indexer.jobs.IndexerJob - ******* Starting a new batch of events from Kafka for partition 0 ...
12/Jan/2016:23:56:45:995 +0000 [kafka-indexer-consumer-thread-0] INFO o.e.kafka.indexer.jobs.IndexerJob - **** Computing Kafka offset *** for partition 0
12/Jan/2016:23:56:45:996 +0000 [kafka-indexer-consumer-thread-0] INFO o.e.kafka.indexer.jobs.IndexerJob - startOffsetFrom=RESTART for partition 0
12/Jan/2016:23:56:45:996 +0000 [kafka-indexer-consumer-thread-0] INFO o.e.kafka.indexer.jobs.IndexerJob - Restarting from where the Offset is left for topic test, for partition 0
12/Jan/2016:23:56:46:012 +0000 [kafka-indexer-consumer-thread-0] INFO o.e.kafka.indexer.jobs.IndexerJob - offsetForThisRound is set to the CurrentOffset: 0 for partition 0
12/Jan/2016:23:56:46:038 +0000 [kafka-indexer-consumer-thread-0] INFO o.e.kafka.indexer.jobs.IndexerJob - Resulting offsetForThisRound = 0 for partition 0
12/Jan/2016:23:56:46:071 +0000 [kafka-indexer-consumer-thread-0] INFO o.e.kafka.indexer.MessageHandler - Total # of messages in this batch: 6; # of successfully transformed and added to Index messages: 6; offsetOfNextBatch: 6
12/Jan/2016:23:56:46:071 +0000 [kafka-indexer-consumer-thread-0] INFO o.e.kafka.indexer.jobs.IndexerJob - posting the messages to ElasticSearch for partition 0...
12/Jan/2016:23:56:46:077 +0000 [kafka-indexer-consumer-thread-0] ERROR o.e.kafka.indexer.MessageHandler - Failed to post messages to ElasticSearch: None of the configured nodes are available: []
org.elasticsearch.client.transport.NoNodeAvailableException: None of the configured nodes are available: []
MY CONFIG:
kafkaZookeeperList=localhost:2181
zkSessionTimeoutMs=10000
zkConnectionTimeoutMs=15000
zkCuratorRetryTimes=3
zkCuratorRetryDelayMs=2000
kafkaBrokersList=localhost:9092
consumerGroupName=kafka_es_indexer
topic=test
firstPartition=0
lastPartition=0
startOffsetFrom=RESTART
startOffset=0
kafkaFetchSizeMinBytes=31457280
kafkaSimpleConsumerBufferSizeBytes=31457280
kafkaSimpleConsumerSocketTimeoutMs=10000
appStopTimeoutSeconds=10
numberOfReinitTries=2
kafkaReinitSleepTimeMs=10000
esHostPortList=localhost:9300
esClusterName=sca
esIndex=kafkaESIndex
esIndexType=kafkaESType
messageHandlerClass=org.elasticsearch.kafka.indexer.messageHandlers.RawMessageStringHandler
indexHandlerClass=org.elasticsearch.kafka.indexer.BasicIndexHandler
messageEncoding=UTF-8
isDryRun=false
consumerSleepBetweenFetchsMs=10
numberOfEsIndexingRetryAttempts=2
esIndexingRetrySleepTimeMs=10000
isPerfReportingEnabled=false
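A side note on the two 31457280 values above: my understanding is that the SimpleConsumer buffer should be at least as large as the fetch size, so I set both to the same 30 MiB. A quick check that 31457280 really is 30 MiB:

```shell
# 30 MiB expressed in bytes; should equal kafkaFetchSizeMinBytes and
# kafkaSimpleConsumerBufferSizeBytes in the config above.
echo $((30 * 1024 * 1024))
```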
TROUBLESHOOTING:
I actually verified that I can call the Elasticsearch bulk API manually against a different index, and it works perfectly:
curl -XPOST 'localhost:9200/customer/external/_bulk?pretty' -d '
{"index":{"_id":"1"}}
{"name": "John Doe" }
{"index":{"_id":"2"}}
{"name": "Jane Doe" }
'
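One more check I thought of but have not fully verified: as far as I understand, the ES 2.x transport client drops any node whose cluster_name does not match the client's cluster.name setting, which then surfaces as exactly this NoNodeAvailableException. A sketch of how I could compare the node's reported name against my esClusterName=sca (the JSON below is a made-up sample of the shape `curl -s localhost:9200` returns, not real output from my box):

```shell
# Sketch: extract cluster_name from the node-info response and compare it
# to esClusterName in the indexer config. Sample response for illustration;
# in practice: response=$(curl -s localhost:9200)
response='{"name":"node-1","cluster_name":"sca","version":{"number":"2.1.1"}}'
cluster=$(echo "$response" | sed -n 's/.*"cluster_name":"\([^"]*\)".*/\1/p')
echo "cluster_name=$cluster"   # must match esClusterName=sca exactly
```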
Here is the code where, as far as I can tell, the exception is thrown:
public boolean postToElasticSearch() throws Exception {
    BulkResponse bulkResponse = null;
    BulkItemResponse bulkItemResp = null;
    // Nothing/no messages to post to ElasticSearch
    if (bulkRequestBuilder.numberOfActions() <= 0) {
        logger.warn("No messages to post to ElasticSearch - returning");
        return true;
    }
    try {
        bulkResponse = bulkRequestBuilder.execute().actionGet();
    } catch (ElasticsearchException e) {
        logger.error("Failed to post messages to ElasticSearch: " + e.getMessage(), e);
        throw e;
    }
    ....
The Elasticsearch transport port is 9300, which is what the config uses.
I tried 9200 as well, and it didn't work either.
Please help me look into this issue; I don't think it is a connectivity problem with Elasticsearch, since everything runs on the same box.
Thanks in advance!