Flume sends messages to KafkaSink, but KafkaSink does not receive them

This problem has puzzled me for a long time. Any advice would be appreciated!

Flume configuration file

agg.sources=r_dcc
agg.sinks=k_comm
agg.channels=c_comm

agg.sources.r_dcc.type=exec
agg.sources.r_dcc.command=tail -F /home/ocs2/travelOcs/logs/dcc.log
#agg.sources.r_dcc.selector.type = replicating
agg.sources.r_dcc.channels=c_comm

agg.sinks.k_comm.type=org.apache.flume.sink.kafka.KafkaSink
agg.sinks.k_comm.topic=test
agg.sinks.k_comm.brokerList=192.168.2.99:9092
agg.sinks.k_comm.requiredAcks=1
agg.sinks.k_comm.batchSize=20
agg.sinks.k_comm.channel=c_comm

#agg.channels.c_comm.type=memory
agg.channels.c_comm.capacity=10000
agg.channels.c_comm.transactionCapacity=1000

agg.channels.c_comm.type=org.apache.flume.channel.kafka.KafkaChannel
agg.channels.c_comm.brokerList=192.168.2.99:9092
agg.channels.c_comm.topic=test
agg.channels.c_comm.zookeeperConnect=192.168.2.99:2181


Error messages

28 七月 2016 17:15:52,485 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:160) - Unable to deliver event. Exception follows.
java.lang.IllegalStateException: close() called when transaction is OPEN - you must either commit or rollback first
at com.google.common.base.Preconditions.checkState(Preconditions.java:172)
at org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179)
at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:153)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:744)
28 七月 2016 17:15:57,486 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.kafka.KafkaSink.process:139) - Failed to publish events
java.lang.IllegalStateException: begin() called when transaction is OPEN!
at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
at org.apache.flume.channel.BasicTransactionSemantics.begin(BasicTransactionSemantics.java:131)
at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:93)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:744)
28 七月 2016 17:15:57,486 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.kafka.KafkaSink.process:146) - Transaction rollback failed
java.lang.IllegalStateException: value is absent
at com.google.common.base.Optional$Absent.get(Optional.java:263)
at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doRollback(KafkaChannel.java:387)
at org.apache.flume.channel.BasicTransactionSemantics.rollback(BasicTransactionSemantics.java:168)
at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:143)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:744)
28 七月 2016 17:15:57,487 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:160) - Unable to deliver event. Exception follows.
java.lang.IllegalStateException: value is absent
at com.google.common.base.Optional$Absent.get(Optional.java:263)
at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doRollback(KafkaChannel.java:387)
at org.apache.flume.channel.BasicTransactionSemantics.rollback(BasicTransactionSemantics.java:168)
at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:143)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:744)

fish - Hadooper

First change the channel type, agg.channels.c_comm.type, to memory. Does the problem still occur?
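
A minimal sketch of that change, assuming the rest of the posted configuration stays as it is. Only the channel block is swapped, and the KafkaChannel lines are commented out rather than deleted so they can be restored afterwards:

```properties
# Use an in-memory channel for this test
agg.channels.c_comm.type=memory
agg.channels.c_comm.capacity=10000
agg.channels.c_comm.transactionCapacity=1000

# KafkaChannel settings from the original post, disabled for the test
#agg.channels.c_comm.type=org.apache.flume.channel.kafka.KafkaChannel
#agg.channels.c_comm.brokerList=192.168.2.99:9092
#agg.channels.c_comm.topic=test
#agg.channels.c_comm.zookeeperConnect=192.168.2.99:2181
```

If the errors stop with this channel, that would suggest the KafkaChannel setup, rather than the sink itself, is where to look next.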

fish - Hadooper

Keep c_comm as the memory type and change k_comm.type to logger. Can you see the messages printed on the console?
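
Roughly, that test could look like the sketch below. It assumes the source section is left unchanged, and the file name agg.conf in the launch command is only a placeholder, not something from the original post:

```properties
# Memory channel, as in the previous suggestion
agg.channels.c_comm.type=memory
agg.channels.c_comm.capacity=10000
agg.channels.c_comm.transactionCapacity=1000

# Replace the Kafka sink with the logger sink for this test
agg.sinks.k_comm.type=logger
agg.sinks.k_comm.channel=c_comm

# Original Kafka sink settings, disabled for the test
#agg.sinks.k_comm.type=org.apache.flume.sink.kafka.KafkaSink
#agg.sinks.k_comm.topic=test
#agg.sinks.k_comm.brokerList=192.168.2.99:9092
#agg.sinks.k_comm.requiredAcks=1
#agg.sinks.k_comm.batchSize=20

# Assumed launch command (agg.conf is a hypothetical file name):
#   bin/flume-ng agent --conf conf --conf-file agg.conf --name agg -Dflume.root.logger=INFO,console
```

The -Dflume.root.logger=INFO,console option sends the agent's log output, including the events written by the logger sink, to the terminal. If events show up there, the exec source and the channel are delivering data, which narrows the problem down to the Kafka side.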

依风逗春

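A question for the original poster: how did you resolve java.lang.IllegalStateException: begin() called when transaction is OPEN! ? I am on Flume 1.6 with kafka_2.10_0.8.2.2, and this has been bothering me for several days. How can it be fixed?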
