Giter VIP home page Giter VIP logo

kafka-elasticsearch-standalone-consumer's Introduction

Welcome to the kafka-elasticsearch-standalone-consumer wiki!

Illustration of kafka-elasticsearch-standalone-consumer usage

The consumer is positioned in the middle.

Introduction

Kafka Standalone Consumer will read the messages from Kafka, processes and index them in ElasticSearch.

Easily Scaleable & Extendable !

As described in the illustration above, here is how the StandAlone Consumer works:

  • Kafka has a topic named, say Topic_1

  • Lets say, Topic_1 has 5 partitions.

  • Now, there is a needed to read, process the messages from Kafka and ElasticSearch

  • In order to do that, have 5 Config Files and start 5 instances of this Standalone Consumer by tying each config file to the respective Consumer Instance.

  • Now, we will have 5 Consumer Standalone Daemons running, listening & processing messages from each partition of Topic_1 in Kafka.

  • When there is a new partitions(say 6th partition) in the same Topic_1, then start a new Consumer Daemon instance pointing to the new partition(i.e: 6th partition)

  • This way, there is a clear way of subscribing and processing messages from multiple partitions across multiple topics using this Stand alone Consumer.

How to use ?

1. Download the code. Let's say, $CONSUMER_HOME contains the code.

2. From the $CONSUMER_HOME, build the maven project. - this step will create the JAR file of the Consumer and the dependency files in the $CONSUMER_HOME/bin directory

mvn clean package

3. Create a config file for the Consumer Instance and provide all necessary properties. - Use the existing Config file $CONSUMER_HOME/config/kafkaESConsumer.properties as template.

cp $CONSUMER_HOME/config/kafkaESConsumer.properties $CONSUMER_HOME/config/<consumerGroupName><topicName><PartitionNum>.properties

vi $CONSUMER_HOME/config/<consumerGroupName><topicName><PartitionNum>.properties - Edit & provide the correct config details.

The details & guide for each property in the config file is given in the property file itself.

It is IMPORTANT to SPECIFY 1 UNIQUE LOG PROPERTY FILE(using the below property) FOR EACH CONSUMER INSTANCE in the respective config file to have logging happen in separate log file for each consumer instance.

#Log property file for the consumer instance. One instance of consumer should have 1 log file.
logPropertyFile=log4j<consumerGroupName><topicName><PartitionNum>.properties

4. Start the Consumer as follows:

cd $CONSUMER_HOME/scripts

vi consumerNew.sh

Provide the value for all the below variables:

# Setup variables
#Set the full path of top directory of this kafka consumer
base_dir=
JAVA_HOME=
#User as which the Consumer Daemon has to be run
USER=

./consumerNew.sh -p start -c $CONSUMER_HOME/config/<consumerGroupName><topicName><PartitionNum>.properties

# ' -p ' - Can take either start | stop | restart

# ' -c ' - the config file for the consumer instance with path 
# (e.g: '$CONSUMER_HOME/config/<consumerGroupName><topicName><PartitionNum>.properties')

5. Confirm the successful start of the Consumer by looking into:

The below log file contains INFO during starting, restarting & stopping the Consumer Instance.

#'$consumerGroupName,$topic,$partition' - properties as defined in the consumer instances's config file (i.e: '<consumerGroupName><topicName><PartitionNum>.properties' in this example

vi $CONSUMER_HOME/processLogs/<$consumerGroupName>_<$topic>_<$partition>.out

The below log file contains ERROR during starting, restarting & stopping the Consumer Instance.

#'$consumerGroupName,$topic,$partition' - properties as defined in the consumer instances's config file (i.e: '<consumerGroupName><topicName><PartitionNum>.properties' in this example

vi $CONSUMER_HOME/processLogs/<$consumerGroupName>_<$topic>_<$partition>.err

6. Monitor the processing in the log file defined by the following property in the Consumer's Respective Config file.

(i.e: in this example, $CONSUMER_HOME/config/<consumerGroupName><topicName><PartitionNum>.properties)

Property/Config name: logPropertyFile

7. To Stop the Consumer Instance:

cd $CONSUMER_HOME/scripts

./consumerNew.sh -p stop -c $CONSUMER_HOME/config/<consumerGroupName><topicName><PartitionNum>.properties

8. To Restart the Consumer Instance:

cd $CONSUMER_HOME/scripts

./consumerNew.sh -p restart -c $CONSUMER_HOME/config/<consumerGroupName><topicName><PartitionNum>.properties

Versions:

Kafka Version: 0.8.1

ElasticSearch: > 1.0.0 (Works great for 1.3.4 as well in my Production)

Scala Version for Kafka Build: 2.10.0

Configuring the Consumer Instance:

The details of each config property can be seen in the template file (below)

Config File with details about each property

Message Handler Class

  • If the message in your Kafka has to handled in Raw UTF-8 text, then you can use message handler class org.elasticsearch.kafka.consumer.messageHandlers.RawMessageStringHandler

  • You can code your own Message Handler class by extending the abstract class org.elasticsearch.kafka.consumer.MessageHandler and implementing the methods: transformMessage() & prepareForPostToElasticSearch().

  • Also, if the message in your Kafka has to handled in Raw UTF-8 text and you just want to change the way the raw message is transformed(into your desired format), then extend the org.elasticsearch.kafka.consumer.messageHandlers.RawMessageStringHandler class and override/implement the transformMessage() method alone. An example can be found here: org.elasticsearch.kafka.consumer.messageHandlers.AccessLogMessageHandler

  • Usually, its effective to Index the message in JSON format in ElasticSearch. This can be done using a Mapper Class and transforming the message from Kafka by overriding/implementing the transformMessage() method. An example can be found here: org.elasticsearch.kafka.consumer.messageHandlers.AccessLogMessageHandler

  • Do remember to set the newly created message handler class in the messageHandlerClass config property of the consumer instance.

License

kafka-elasticsearch-standalone-consumer

Licensed under the Apache License, Version 2.0 (the "License"); you may
not use this file except in compliance with the License. You may obtain
a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.

Contributors

kafka-elasticsearch-standalone-consumer's People

Contributors

reachkrishnaraj avatar

Watchers

 avatar  avatar

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.