mariamhakobyan / elasticsearch-river-kafka
Kafka River Plugin for ElasticSearch
License: Apache License 2.0
What should my configuration be to read Avro data? When I use the default, I get an error because the plugin is not able to parse the data.
Hey,
I am planning to use this plugin in my project, where we have thousands of events per second from multiple tenants.
My question is: is a single BulkProcessor enough to serve events from multiple tenants, or should we have one BulkProcessor per tenant?
We have one Kafka partition per tenant.
As of now we are using the Kafka stream API with the number of threads equal to the number of partitions.
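For what it's worth, Elasticsearch's BulkProcessor is documented as thread-safe, so a single shared instance can usually be fed by all partition threads. The sketch below has no ES dependency; the `BulkBatcher` class is a stand-in I made up to illustrate the pattern of one shared, synchronized bulk buffer consumed by N per-partition threads:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Minimal sketch (no ES dependency): one shared, thread-safe "bulk batcher"
// fed by several per-partition consumer threads, mirroring how a single
// thread-safe org.elasticsearch.action.bulk.BulkProcessor can serve events
// from many tenants/partitions.
public class SharedBulkSketch {
    static class BulkBatcher {
        private final int bulkSize;
        private final List<String> buffer = new ArrayList<>();
        final AtomicInteger flushedDocs = new AtomicInteger();

        BulkBatcher(int bulkSize) { this.bulkSize = bulkSize; }

        synchronized void add(String doc) {
            buffer.add(doc);
            if (buffer.size() >= bulkSize) flush();
        }

        synchronized void flush() {
            // In the real plugin this would issue one bulk request to ES.
            flushedDocs.addAndGet(buffer.size());
            buffer.clear();
        }
    }

    public static int run(int partitions, int eventsPerPartition) throws Exception {
        BulkBatcher batcher = new BulkBatcher(100);   // one shared instance
        ExecutorService pool = Executors.newFixedThreadPool(partitions);
        for (int p = 0; p < partitions; p++) {
            final int tenant = p;
            pool.submit(() -> {
                for (int i = 0; i < eventsPerPartition; i++)
                    batcher.add("tenant-" + tenant + "-event-" + i);
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        batcher.flush();                              // drain the remainder
        return batcher.flushedDocs.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run(4, 1000));             // prints 4000
    }
}
```

A per-tenant BulkProcessor mainly buys you isolation (one slow tenant's backpressure doesn't delay the others) at the cost of smaller, less efficient bulks.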
Hey!
Could you please outline how I should use the raw execute action type?
I'm especially curious whether I can create an update-type request using that action... something with both a doc upsert and an update script.
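For reference, if the raw execute action forwards each Kafka message body straight to the ES bulk API (an assumption; check the plugin source to confirm), an update with a script and an upsert would follow the standard ES 1.x bulk NDJSON format, index/type/id values here being placeholders:

```json
{ "update": { "_index": "my-index", "_type": "my-type", "_id": "42" } }
{ "script": "ctx._source.counter += count", "params": { "count": 1 }, "upsert": { "counter": 1 } }
```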
Hi
Does the way the river integrates with Kafka allow for exactly-once semantics? I'm trying to understand whether data could be lost or duplicated while being moved from Kafka into ES.
My guess is that it is impossible for data to be lost.
For example, Spark integrates tightly with Kafka to provide exactly-once semantics: https://databricks.com/blog/2015/03/30/improvements-to-kafka-integration-of-spark-streaming.html
Can river-kafka be configured to consume from a specific <topic, partition> pair, instead of from a topic with multiple partitions across multiple brokers?
Thanks
Peter
Hi,
I did not see a group ID in the Kafka configuration. So how do I install elasticsearch-river-kafka on multiple nodes for HA and scaling purposes? Assume that in AWS I want two Elasticsearch client nodes in two different AZs, each with elasticsearch-river-kafka installed; this way, if I lose one AZ, my system keeps working, and on the other hand we can scale the system just by adding more nodes.
Could we add this configuration item, just like the logstash-kafka-output plugin does?
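A hypothetical shape for such an option (not currently supported by the plugin, per this issue) would pass Kafka's consumer group id through the river config; the `consumer.group.id` key below is invented for illustration, the other keys follow the plugin's README:

```json
{
  "type": "kafka",
  "kafka": {
    "zookeeper.connect": "zk-host:2181",
    "topic": "my-topic",
    "consumer.group.id": "es-river-kafka"
  }
}
```

With a shared group id, Kafka's high-level consumer would rebalance partitions across the river instances automatically.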
Regards,
Autumn Wang.
Hi,
Plugins for 2.0 require an additional plugin-descriptor.properties file.
http://stackoverflow.com/questions/33538903/elasticsearch-2-0-plugin-installation-info
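For ES 2.0 a minimal plugin-descriptor.properties looks roughly like the following; the classname and version values here are placeholders, not taken from this plugin's sources:

```properties
description=Kafka River Plugin for ElasticSearch
version=1.0.0
name=river-kafka
classname=org.elasticsearch.plugin.river.kafka.KafkaRiverPlugin
java.version=1.7
elasticsearch.version=2.0.0
jvm=true
```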
thanks
Some rivers, e.g. the CouchDB ES river, allow a script tag in the river config, which provides a way to define a context condition. Is there a way to do so in the ES-Kafka river plugin? I want to define the parent id with something like this:
"script": "ctx._type = ctx.doc.type; if (ctx._type == 'country') { ctx._parent = ctx.doc.parent_id; }"
Hi ,
I installed the Kafka plugin on Elasticsearch 1.3.7. When I executed the river script, it searched for a ZooKeeper server; after I installed a ZooKeeper server, the issue was gone. Suppose I have an index that will take 80,000 records per day: how can we manage Kafka to work with my index, and how do we know Kafka is working fine? Are there any special commands required to see that Kafka is working correctly?
Please help me with this.
Thanks
phani
Hello, can you help me? I got this error when I deployed the river:
[2014-03-17 11:38:44,200][WARN ][river ] [Infectia] failed to delete river on stop [kafka]/[my_kafka_river]
java.lang.NullPointerException
at org.elasticsearch.river.kafka.KafkaRiver.close(KafkaRiver.java:64)
at org.elasticsearch.river.RiversService.closeRiver(RiversService.java:202)
at org.elasticsearch.river.RiversService$1.run(RiversService.java:106)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
For your information, I use Elasticsearch 1.0.1 and Apache Kafka 0.8.0.
I think it would be better if you could add a configuration option to read messages from the beginning or from the last offset. That way I can make sure all my Kafka messages are consumed if I interrupt the river for some reason.
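Kafka 0.8's high-level consumer already exposes this as auto.offset.reset (smallest = beginning, largest = newest); a river option passing it through might look like the sketch below, where the `auto.offset.reset` key inside the river config is hypothetical:

```json
{
  "type": "kafka",
  "kafka": {
    "zookeeper.connect": "zk-host:2181",
    "topic": "my-topic",
    "auto.offset.reset": "smallest"
  }
}
```

Note that auto.offset.reset only applies when the consumer group has no committed offset yet; otherwise consumption resumes from the committed offset.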
At the moment the IndexDocumentProducer generates an id for every IndexRequest via .id(UUID.randomUUID().toString()).
If no _id is specified, Elasticsearch autogenerates an ID, so there is no reason to generate one in this river.
Another reason the id should not be generated is that it prevents the _id from being set through a document field specified via _id.path.
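For context, _id.path in ES 1.x lets a mapping pull the _id from a field of the indexed document, e.g. (field name `doc_id` is just an example):

```json
{
  "mappings": {
    "status": {
      "_id": { "path": "doc_id" }
    }
  }
}
```

Setting an explicit UUID on the request overrides this mechanism, which is the behavior the pull request removes.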
In pull request #5 I added a RiverConfig option to enable/disable the logging of every message consumed by the KafkaWorker via the logMessage() function.
I love having that when I'm debugging a river, but don't want it there when I'm not debugging.
The other logging messages at the INFO level:
Executing bulk request composed of {} actions
Executed bulk composed of {} actions.
Error executing bulk
Error executing bulk
Consumer is already running, new one will not be started...
Kafka consumer started...
Kafka consumer has stopped...
Nothing to be consumed for now. Consume flag is:
These are also a bit verbose for production, but great for debugging a new river or problems in the data being sent.
I don't know if adding more RiverConfig options is the right way to do it, though. I guess the Java way would be to change them from logger.info to logger.debug? I believe there is a way to dynamically change the log level without restarting the process; that would allow you to turn on debug just for the classes you want to debug, but I'm not sure if that is actually an option.
Are there any standard ways to do this? Any opinions on this one?
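Elasticsearch 1.x does support changing logger levels at runtime via the cluster settings API (PUT _cluster/settings), so demoting these messages to debug and enabling them on demand would work; the exact logger name below is a guess and depends on how the plugin constructs its loggers:

```json
{
  "transient": {
    "logger.river.kafka": "DEBUG"
  }
}
```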
Not sure if this is part of why I'm not seeing data, but it would be interesting to be able to tell the Kafka consumer not to start consuming from the beginning of the queue.
When I have a bunch of stuff in the Kafka queue and the river starts up, it seems to get overwhelmed.
Instead of the log info header appearing only at the top of the message, it starts appearing on every line, and then eventually says:
WARN Reconnect due to socket error: null (kafka.consumer.SimpleConsumer)
and goes back to normal.
Here's an example of the end of it, with the log info headers on every line:
[2014-11-28 08:42:28,560][INFO ][org.elasticsearch.river.kafka.KafkaWorker] "Topic": "ep-client-stats-staging",
[2014-11-28 08:42:28,560][INFO ][org.elasticsearch.river.kafka.KafkaWorker] "HashKey": "00-11-22-33-44-55",
[2014-11-28 08:42:28,560][INFO ][org.elasticsearch.river.kafka.KafkaWorker] "ID": "00-11-22-33-44-55",
[2014-11-28 08:42:28,560][INFO ][org.elasticsearch.river.kafka.KafkaWorker] "When": "2014-11-26T11:52:38.731846801-08:00",
[2014-11-28 08:42:28,560][INFO ][org.elasticsearch.river.kafka.KafkaWorker] "Interval": 10000290099,
[2014-11-28 08:42:28,560][INFO ][org.elasticsearch.river.kafka.KafkaWorker] "Clients": []
[2014-11-28 08:42:28,560][INFO ][org.elasticsearch.river.kafka.KafkaWorker] }
[2014-11-28 08:42:28,560][INFO ][org.elasticsearch.river.kafka.KafkaWorker] [2014-11-27 01:52:48,046] WARN Reconnect due to socket error: null (kafka.consumer.SimpleConsumer)
Then it goes back to normal:
[2014-11-28 08:42:28,560][INFO ][org.elasticsearch.river.kafka.KafkaWorker] {
"Topic": "ep-client-stats-staging",
"HashKey": "00-11-22-33-44-55",
"ID": "00-11-22-33-44-55",
"When": "2014-11-27T08:11:37.569473228Z",
"Interval": 10000572989,
"Clients": [
{
"MAC": "b8-76-3f-01-02-5b",
"Wlan": "wlan0",
"ConnectedTimeSec": 9714,
"InactiveTimeMilliSec": 592,
"RSSI": -40,
"AvgRSSI": -45,
"TxBitRate": 78.0,
"TxBytes": 48237910,
"TxPkts": 85591,
"TxRetries": 18676,
"TxFailed": 1,
"Txbps": 5057,
"Txpps": 2,
"RxBitRate": 144.4,
"RxBytes": 20109918,
"RxPkts": 79654,
"Rxbps": 8051,
"Rxpps": 3,
"ARCStatus": 0
}
]
}
But in any case, even well after 100 samples, I never see them in the Elasticsearch index, as described in issue #3. It would be nice if I could tell the Kafka consumer to just start at the end of the queue.
Hi,
Do you have an example of what structure the Kafka events should have in order to work with this river?
I'm receiving JSON from Kafka, and I have configured a mapping in ES to describe the messages and configured the river to use this type. I see the messages coming in from Kafka in the ES logs, but no docs are put into ES.
Any clues?
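For what it's worth, with the river's message type set to JSON (assuming a message.type-style option as in the plugin's README), each Kafka message should be a single flat JSON document whose fields match your ES mapping, e.g. (field names here are examples):

```json
{ "user": "jonas", "message": "hello from kafka", "timestamp": "2014-11-28T08:42:28Z" }
```

If the messages arrive but nothing is indexed, a mapping conflict or a bulk-request failure logged at a lower level is a common culprit.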
Thanks / Jonas