Giter VIP home page Giter VIP logo

flume-kafka's People

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

flume-kafka's Issues

org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:149

Error info:
2014-08-20 18:55:51,755 (conf-file-poller-0) [ERROR - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:149)] Unhandled error
java.lang.NoSuchMethodError: scala.math.LowPriorityOrderingImplicits.ordered()Lscala/math/Ordering; at kafka.producer.ZKBrokerPartitionInfo$$anonfun$kafka$producer$ZKBrokerPartitionInfo$$getZKTopicPartitionInfo$1.apply(ZKBrokerPartitionInfo.scala:172) at kafka.producer.ZKBrokerPartitionInfo$$anonfun$kafka$producer$ZKBrokerPartitionInfo$$getZKTopicPartitionInfo$1.apply(ZKBrokerPartitionInfo.scala:163)

I use the kafka sink functionality, kafka-0.7.2.jar and flume-kafka-0.2.jar in flume lib path, kafka version is 0.7.2,flume version is 1.5.0

flume configuration:
agent_log.sources = r1
agent_log.sinks = kafka
agent_log.channels = c1

agent_log.sources.r1.type = exec
agent_log.sources.r1.channels = c1
agent_log.sources.r1.command = tail -f /var/log/test.log

agent_log.channels.c1.type = memory
agent_log.channels.c1.capacity = 1000
agent_log.channels.c1.trasactionCapacity = 100

agent_log.sinks.kafka.type = com.vipshop.flume.sink.kafka.KafkaSink
agent_log.sinks.kafka.channel = c1
agent_log.sinks.kafka.zk.connect = kafkaNode:2181
agent_log.sinks.kafka.topic = my-replicated-topic
agent_log.sinks.kafka.batchsize = 200
agent_log.sinks.kafka.producer.type = async
agent_log.sinks.kafka.serializer.class = kafka.serializer.StringEncoder

kakfa 0.8 support

I'd like to ask if 0.8 support is existent for this. From the issues I've read, it sounds like there is a branch that exists that supports 0.8. Is this true?

Zkclient depency

Hello,

It seems that the project is missing the dependency on zkclient which is not included in the pom.xml

By the way when I try the flume sink I get this error (zookeeper connection looks ok):

02 Sep 2013 15:36:45,519 ERROR conf-file-poller-0 - Sink loggerSink has been removed due to an error during configuration
java.lang.NumberFormatException: null
at java.lang.Integer.parseInt(Integer.java:454)
at java.lang.Integer.parseInt(Integer.java:527)
at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:207)
at scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
at kafka.producer.ZKBrokerPartitionInfo$$anonfun$kafka$producer$ZKBrokerPartitionInfo$$getZKTopicPartitionInfo$1$$anonfun$5.apply(ZKBrokerPartitionInfo.scala:167)
at kafka.producer.ZKBrokerPartitionInfo$$anonfun$kafka$producer$ZKBrokerPartitionInfo$$getZKTopicPartitionInfo$1$$anonfun$5.apply(ZKBrokerPartitionInfo.scala:167)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
at scala.collection.Iterator$class.foreach(Iterator.scala:631)
at scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:474)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
at scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:521)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
at scala.collection.JavaConversions$JListWrapper.map(JavaConversions.scala:521)
at kafka.producer.ZKBrokerPartitionInfo$$anonfun$kafka$producer$ZKBrokerPartitionInfo$$getZKTopicPartitionInfo$1.apply(ZKBrokerPartitionInfo.scala:167)
at kafka.producer.ZKBrokerPartitionInfo$$anonfun$kafka$producer$ZKBrokerPartitionInfo$$getZKTopicPartitionInfo$1.apply(ZKBrokerPartitionInfo.scala:163)
at scala.collection.Iterator$class.foreach(Iterator.scala:631)
at scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:474)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
at scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:521)
at kafka.producer.ZKBrokerPartitionInfo.kafka$producer$ZKBrokerPartitionInfo$$getZKTopicPartitionInfo(ZKBrokerPartitionInfo.scala:163)
at kafka.producer.ZKBrokerPartitionInfo.(ZKBrokerPartitionInfo.scala:65)
at kafka.producer.Producer.(Producer.scala:47)
at kafka.javaapi.producer.Producer.(Producer.scala:33)
at kafka.javaapi.producer.Producer.(Producer.scala:40)
at com.vipshop.flume.KafkaUtil.getProducer(KafkaUtil.java:51)
at com.vipshop.flume.sink.kafka.KafkaSink.configure(KafkaSink.java:56)
at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:418)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:103)
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)

ERROR [conf-file-poller-0] (org.apache.flume.node.AbstractConfigurationProvider.loadSinks:432) - Sink kafka has been removed due to an error during configuration

27 Dec 2013 11:30:49,194 ERROR conf-file-poller-0 - Sink kafka has been removed due to an error during configuration
kafka.common.InvalidConfigException: At least one of zk.connect or broker.list must be specified
at kafka.producer.ProducerConfig.(ProducerConfig.scala:51)
at com.vipshop.flume.KafkaUtil.getProducer(KafkaUtil.java:43)
at com.vipshop.flume.sink.kafka.KafkaSink.configure(KafkaSink.java:47)
at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:418)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:103)
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)

has error

2013-11-27 14:45:15,915 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:40)] Creating instance of sink: kafka, type: com.vipshop.flume.sink.kafka.KafkaSink
2013-11-27 14:45:15,920 (conf-file-poller-0) [INFO - com.vipshop.flume.KafkaUtil.getProducer(KafkaUtil.java:41)] { parameters:{topic=topic1, zkconnect=10.11.68.92:2181, batchsize=200, type=com.vipshop.flume.sink.kafka.KafkaSink, serializer.class=kafka.serializer.StringEncoder, producer.type=async, channel=svc_0_chan} }
2013-11-27 14:45:15,922 (conf-file-poller-0) [ERROR - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:145)] Failed to start agent because dependencies were not found in classpath. Error follows.
java.lang.NoClassDefFoundError: kafka/javaapi/producer/Producer
at com.vipshop.flume.KafkaUtil.getProducer(KafkaUtil.java:43)
at com.vipshop.flume.sink.kafka.KafkaSink.configure(KafkaSink.java:47)
at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:418)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:103)
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)
Caused by: java.lang.ClassNotFoundException: kafka.javaapi.producer.Producer
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:423)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:356)
... 14 more

java.lang.NoSuchMethodError: org.apache.flume.ChannelFactory.getClass(Ljava/lang/String;)Ljava/lang/Class;

I'm attempting to use the kafka source functionality.

After building the jar (with dependencies) and copying into the flume directory with all the other source/sink .jar files, I am getting this error when running flume-ng

https://gist.github.com/jconerly/81dffe5c8e9f59760a98

The following is the source configuration:

agent.sources.kafka_source.type       = com.vipshop.flume.source.kafka.KafkaSource
agent.sources.kafka_source.zk.connect = 127.0.0.1:2181
agent.sources.kafka_source.topic      = data
agent.sources.kafka_source.groupid    = data-client
agent.sources.kafka_source.batch.size = 100

I had to modify the pom.xml to include the maven-assembly-plugin so I could build a jar that included all the dependencies. I tried deploying the regular jar first and flume complained of missing dependencies like so:

13/11/26 21:23:27 ERROR node.PollingPropertiesFileConfigurationProvider: Failed to start agent because dependencies were not found in classpath. Error follows.
java.lang.NoClassDefFoundError: kafka/consumer/ConsumerConfig

I am running Flume 1.3.1, and Kafka 0.7.2-incubating

Please advise, I'd love to get this up and running.

Flume agent checking conf file in a loop

Naming the components on the current agent.

TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS

Describing/Configuring the source

TwitterAgent.sources.Twitter.type=flume.mytwittersource.MyTwitterSourceForFlume
TwitterAgent.sources.Twitter.channels=MemChannel
TwitterAgent.sources.Twitter.consumerKey=**********
TwitterAgent.sources.Twitter.consumerSecret=*
TwitterAgent.sources.Twitter.accessToken=
*
TwitterAgent.sources.Twitter.accessTokenSecret=
***
TwitterAgent.sources.Twitter.keywords=docker,intel

Describing/Configuring the sink

TwitterAgent.sinks.HDFS.channel=MemChannel
TwitterAgent.sinks.HDFS.type=hdfs
TwitterAgent.sinks.HDFS.hdfs.path=hdfs://quickstart.cloudera:8020/user/Flume/twitter_data/
TwitterAgent.sinks.HDFS.hdfs.fileType=DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat=Text
TwitterAgent.sinks.HDFS.hdfs.batchSize=1000
TwitterAgent.sinks.HDFS.hdfs.rollSize=0
TwitterAgent.sinks.HDFS.hdfs.rollCount=10000

Describing/Configuring the channel

TwitterAgent.channels.MemChannel.type=memory
TwitterAgent.channels.MemChannel.capacity=10000
TwitterAgent.channels.MemChannel.transactionCapacity=1000

When I execute this file I am getting following message in a loop without fetching any tweets. Can you please helpme to identified what is the error on why it is not fetching any tweets?

(conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:126)] Checking file:conf/flume-twitter.conf for changes

(conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:126)] Checking file:conf/flume-twitter.conf for changes

Thanks in advance

kafka 8.2

I am having trouble loading the sink.

Tried stopping kakfa and removing all topics from disk , did not help.

Flume - version flume-1.4.0

Kafka - kafka_2.8.0

Flume sink config :
applog_agent.sinks = kafka
applog_agent.sinks.kafka.type = com.vipshop.flume.sink.kafka.KafkaSink
applog_agent.sinks.kafka.channel = C1
applog_agent.sinks.kafka.zk.connect = kafkanode:2181
applog_agent.sinks.kafka.topic = all
applog_agent.sinks.kafka.batchsize = 200
applog_agent.sinks.kafka.producer.type = async

applog_agent.sinks.kafka.serializer.class = kafka.serializer.StringEncoder

Plugin lib:
ls -lrt plugins.d/flume-kafka-plugin/libext
total 18820
-rw-r--r-- 1 kafkausr kafkausr 604182 Nov 26 17:21 zookeeper-3.3.4.jar
-rw-r--r-- 1 kafkausr kafkausr 6160791 Nov 26 17:21 scala-library.jar
-rw-r--r-- 1 kafkausr kafkausr 8671416 Nov 26 17:21 scala-compiler.jar
-rw-r--r-- 1 kafkausr kafkausr 2520145 Nov 26 17:21 kafka_2.8.0-0.8.0.jar
-rw-r--r-- 1 kafkausr kafkausr 53244 Nov 26 17:21 jopt-simple-3.2.jar
-rw-r--r-- 1 kafkausr kafkausr 64009 Nov 26 17:21 zkclient-0.3.jar
-rw-r--r-- 1 kafkausr kafkausr 995968 Nov 26 17:21 snappy-java-1.0.4.1.jar
-rw-r--r-- 1 kafkausr kafkausr 82123 Nov 26 17:21 metrics-core-2.2.0.jar
-rw-r--r-- 1 kafkausr kafkausr 4229 Nov 26 17:21 metrics-annotation-2.2.0.jar
ls -lrt plugins.d/flume-kafka-plugin/lib
total 12

-rw-rw-r-- 1 kafkausr kafkausr 6884 Dec 6 06:33 flume-kafka-plugin.jar

Error in Flume log :

06 Dec 2013 11:09:10,520 INFO conf-file-poller-0-SendThread(kafkanode:2181) - Socket connection established to kafkanode/x.x.x.x:2181, initiating session
06 Dec 2013 11:09:10,536 INFO conf-file-poller-0-SendThread(kafkanode:2181) - Session establishment complete on server kafkanode/x.x.x.x:2181, sessionid = 0x42c8a2118b0004, negotiated timeout = 6000
06 Dec 2013 11:09:10,540 INFO conf-file-poller-0-EventThread - zookeeper state changed (SyncConnected)
06 Dec 2013 11:09:10,633 ERROR conf-file-poller-0 - Sink kafka has been removed due to an error during configuration
java.lang.NumberFormatException: For input string: "-1, "port""
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:48)
at java.lang.Integer.parseInt(Integer.java:458)
at java.lang.Integer.parseInt(Integer.java:499)
at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:207)
at scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
at kafka.cluster.Broker$.createBroker(Broker.scala:28)
at kafka.producer.ZKBrokerPartitionInfo$$anonfun$kafka$producer$ZKBrokerPartitionInfo$$getZKBrokerInfo$1.apply(ZKBrokerPartitionInfo.scala:195)
at kafka.producer.ZKBrokerPartitionInfo$$anonfun$kafka$producer$ZKBrokerPartitionInfo$$getZKBrokerInfo$1.apply(ZKBrokerPartitionInfo.scala:193)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
at kafka.producer.ZKBrokerPartitionInfo.kafka$producer$ZKBrokerPartitionInfo$$getZKBrokerInfo(ZKBrokerPartitionInfo.scala:193)
at kafka.producer.ZKBrokerPartitionInfo.(ZKBrokerPartitionInfo.scala:67)
at kafka.producer.Producer.(Producer.scala:47)
at kafka.javaapi.producer.Producer.(Producer.scala:33)
at kafka.javaapi.producer.Producer.(Producer.scala:40)
at com.vipshop.flume.KafkaUtil.getProducer(KafkaUtil.java:43)
at com.vipshop.flume.sink.kafka.KafkaSink.configure(KafkaSink.java:47)
at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:418)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:103)
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)

Build issue

Hello,

I tried to build the project with maven 3.1 but it is missing 2 dependencies:

com.linkedin.kafka:kafka:jar:0.7.2, com.vipshop.scala:scala:jar:2.8.0

I was able to find the com.linkedin.kafka but the other one seems very linked to your own code.

Will it be possible to include this 2 dependencies to allow the build?

Anyways thanks a lot for your piece of code!

Error: java.lang.IllegalStateException: close() called when transaction is OPEN - you must either commit or rollback first

serverA kafka_2.8.0-0.8.0-beta1, serverB flume-ng1.4
server B run:
flume-ng agent -c . -f flume-kafka.conf -n a1 -Dflume.root.logger=INFO,console

flume sink config:
a1.sinks.k1.type = com.vipshop.flume.sink.kafka.KafkaSink
a1.sinks.k1.channel = c1
a1.sinks.k1.zkconnect = 192.168.165.39:2181
a1.sinks.k1.topic = test2
a1.sinks.k1.producer.type = async
a1.sinks.k1.batchsize = 400

a1.sinks.k1.serializer.class = kafka.serializer.StringEncoder

error :

2013-11-21 00:13:25,909 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:173)] Starting Sink k1
2013-11-21 00:13:25,910 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:184)] Starting Source r1
2013-11-21 00:13:25,910 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.source.ExecSource.start(ExecSource.java:163)] Exec source starting with command:tail -F /var/log/maillog
2013-11-21 00:13:25,914 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:110)] Monitoried counter group for type: SOURCE, name: r1, registered successfully.
2013-11-21 00:13:25,914 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:94)] Component type: SOURCE, name: r1 started
2013-11-21 00:13:28,924 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows.
java.lang.IllegalStateException: close() called when transaction is OPEN - you must either commit or rollback first
at com.google.common.base.Preconditions.checkState(Preconditions.java:172)
at org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179)
at com.vipshop.flume.sink.kafka.KafkaSink.process(KafkaSink.java:45)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:722)
2013-11-21 00:13:34,931 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows.
java.lang.IllegalStateException: close() called when transaction is OPEN - you must either commit or rollback first
at com.google.common.base.Preconditions.checkState(Preconditions.java:172)
at org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179)
at com.vipshop.flume.sink.kafka.KafkaSink.process(KafkaSink.java:45)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:722)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.