flume日志数据写入kafka中,启动flume报错

你好,老师
     flume日志数据写入kafka中,在启动flume服务时,查看日志文件报错,可以帮忙看看嘛?比较急, 困扰我快2天了, 谢谢。@ fish,可以帮忙看一下,非常感谢
fume的配置信息如下
# Flume agent config
agent.sources = s1                                                                                                                  
agent.channels = c1                                                                                                                 
agent.sinks = k1                                                                                                                    
                                                                                                                                      
agent.sources.s1.type=exec                                                                                                          
agent.sources.s1.command=tail -F /usr/app/flumelogs/kafka.log                                                                                
agent.sources.s1.channels=c1                                                                                                        
agent.channels.c1.type=memory                                                                                                       
agent.channels.c1.capacity=10000                                                                                                    
agent.channels.c1.transactionCapacity=100                                                                                           
                                                                                                                                      
#设置Kafka接收器                                                                                                                    
agent.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink                                                                          
#设置Kafka的broker地址和端口号                                                                                                      
agent.sinks.k1.brokerList=192.168.62.156:9092                                                                                               
#设置Kafka的Topic                                                                                                                   
agent.sinks.k1.topic=kafkatest1                                                                                                      
#设置序列化方式                                                                                                                     
agent.sinks.k1.serializer.class=kafka.serializer.StringEncoder                                                                      
                                                                                                                                      
agent.sinks.k1.channel=c1
 
首先 建立kakfa的Topicname
bin/kafka-topics.sh --create --topic Flumelog --replication-factor 1 --partitions 2 --zookeeper hadoop11:2181
 
然后启动flume
启动Flume的配置文件

flume-ng agent -n agent1 -c conf -f ./conf/agent/flume.conf -Dflume.root.logger=DEBUG,console >./flume1.log 2>&1 &
 
启动flume报错关键信息如下
kafka.common.KafkaException: fetching topic metadata for topics [Set(kafkatest1)] from broker [ArrayBuffer(id:0,host:192.168.62.156,port:9092)] failed
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
        at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
        at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
        at kafka.utils.Utils$.swallow(Utils.scala:172)
        at kafka.utils.Logging$class.swallowError(Logging.scala:106)
        at kafka.utils.Utils$.swallowError(Utils.scala:45)
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
        at kafka.producer.Producer.send(Producer.scala:76)
        at kafka.javaapi.producer.Producer.send(Producer.scala:42)
        at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:125)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:97)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:74)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:73)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:114)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
        ... 12 more
q2017-03-18 08:48:17,255 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:132)] Failed to publish events
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
        at kafka.producer.Producer.send(Producer.scala:76)
        at kafka.javaapi.producer.Producer.send(Producer.scala:42)
        at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:125)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
2017-03-18 08:48:17,255 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to publish events
        at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:142)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
Caused by: kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
        at kafka.producer.Producer.send(Producer.scala:76)
        at kafka.javaapi.producer.Producer.send(Producer.scala:42)
        at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:125)
        ... 3 more
 
 
 

wangxiaolei

赞同来自:

有个错误提示[ArrayBuffer(id:0,host:192.168.62.156,port:9092)] failed 看kafka服务是否启动成功的,再查看下topic的信息。

fish - Hadooper

赞同来自:

看起来好像Kafka中创建的topic名字跟Flume消费的Topic名字对不上?

要回复问题请先登录注册