flume自带kafkaSink收不到消息

flume配置文件
agent1.sources=source1
agent1.sinks=sink1
agent1.channels=channel1


#配置source1
agent1.sources.source1.type=spooldir
agent1.sources.source1.spoolDir=/usr/appsoft/data/flumetest
agent1.sources.source1.channels=channel1
agent1.sources.source1.inputCharset = GBK
agent.sources.s.basenameHeader = true


#配置sink1
agent1.sinks.sink1.channel = channel1
agent1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.sink1.metadata.broker.list=127.0.0.1:9092
agent1.sinks.sink1.custom.partition.key=kafkaPartition
agent1.sinks.sink1.custom.topic.name=kafkaTest
agent1.sinks.sink1.requiredAcks=1
agent1.sinks.sink1.batchSize = 20

#配置channel1
agent1.channels.channel1.type=file
agent1.channels.channel1.checkpointDir=/usr/appsoft/data/flumetest_tmp123
agent1.channels.channel1.dataDirs=/usr/appsoft/data/flumetest_tmp

kafka中消费topic是kafkaTest的消息获取不到,kafka的日志没有报错,flume的控制台上也没有报错,flume的日志没有找到在哪里。我下过一个flume kafka的plugin,用这个插件的sink可以正常收到消息。flume用的1.6,kafka用的2.9.1。这个问题是什么原因呢,和版本有关系吗?

hahawei

赞同来自: fish

问题解决了,是这个配置的问题。这个配置是用的kafka插件的kafkasink的配置,自带的配置不是这样写的。感谢茂源老师的耐心指导。 agent1.sinks.sink1.metadata.broker.list=127.0.0.1:9092 agent1.sinks.sink1.custom.topic.name=kafkaTest 附flume自带的kafkasink配置 agent1.sinks.sink1.brokerList = 127.0.0.1:9092 agent1.sinks.sink1.topic=kafkaTest

fish - Hadooper

赞同来自: tmac_liu

jar -tf flume-ng-kafka-sink-1.6.0.jar | fgrep KafkaSink,你就能确定这里面有没有KafkaSink了。

fish - Hadooper

赞同来自:

如果是apache版本的flume,log在软件包的logs目录下面。 如果是cdh的,在/var/log/flume-ng中。   channel所配的/usr/appsoft/data/flumetest_tmp中是否有数据?   另外,kafka的2.9.1是什么个版本?

fish - Hadooper

赞同来自:

在flume的conf目录中是否有log4j.properties文件? 如果有,做类似如下的配置
flume.root.logger=INFO,LOGFILE
flume.log.dir=/var/log/flume-ng
flume.log.file=flume.log

# Define the root logger to the system property "flume.root.logger".
log4j.rootLogger=${flume.root.logger}


# Stock log4j rolling file appender
# Default log rotation configuration
log4j.appender.LOGFILE=org.apache.log4j.RollingFileAppender
log4j.appender.LOGFILE.MaxFileSize=100MB
log4j.appender.LOGFILE.MaxBackupIndex=10
log4j.appender.LOGFILE.File=${flume.log.dir}/${flume.log.file}
log4j.appender.LOGFILE.layout=org.apache.log4j.PatternLayout
log4j.appender.LOGFILE.layout.ConversionPattern=%d{dd MMM yyyy HH:mm:ss,SSS} %-5p [%t] (%C.%M:%L) %x - %m%n
根据需要修改flume.log.dir   同时,你执行一下ps ax | fgrep <flume-agent-pid>看看任务执行的完整路径是什么?  

fish - Hadooper

赞同来自:

ps出来的任务命令看,flume启动时定义了 -Dflume.root.logger=INFO,console,所以log会打印到控制台而不是文件。   不过从你控制台的log看不出什么问题。如果将flume sink换成logger,能在控制台看到接收的数据么?

tmac_liu

赞同来自:

请问一下各位,flume1.6 y用自带的,你们是否有碰到这个问题 org.apache.flume.FlumeException: Unable to load sink type: org.apache.flume.sink.kafka.KafkaSink, class: org.apache.flume.sink.kafka.KafkaSink   为什么会提示找不到自带的 org.apache.flume.sink.kafka.KafkaSink 呢

fish - Hadooper

赞同来自:

flume的lib目录下有类似flume-ng-kafka-sink-xxxx.jar这样的文件么? ll | fgrep kafka 看看。

tmac_liu

赞同来自:

flume-ng-kafka-sink-1.6.0.jar   有这个文件,我之前以为是版本下载的问题,我后来自己去编译flume 1.6  还是只要的问题。

tmac_liu

赞同来自:

这个问题困扰我很久了,我用其他编译插件倒是可以使用,但是是1.5 版本 我想用1.6版本自带的 这真的奇怪了,

tmac_liu

赞同来自:

非常感谢你 fish Hadooper jar -tf flume-ng-kafka-sink-1.6.0.jar | fgrep KafkaSink   我用了你的方法后,我就知道我的问题所在了,非常感谢。

Paris凌

赞同来自:

请问一下,我现在也遇到同样的问题,你是怎么解决的。

要回复问题请先登录注册