flume-ng 写文件到kafka,然后dump出文件的问题


dumpfile.png

 
 老师,按照您在作业讲解中,将kafka中的数据写入到hdfs中的逻辑有一个问题,就只有当第二个文件进来的时候,才能将第一个文件标记为已完成,如果仅有一个文件进来,则这个文件永远不会标记为已完成,请问有没有什么更好的解决方案呢?

IT_Angel

赞同来自: 王景隆 fish

老师,我已经成功解决了此问题!我修改了flume-ng的源码,主要是改SpoolDirectorySource与 ReliableSpoolingFileEventReader,当flume读取文件时,将fileSize添加到header中,然后在kafka consumer中累加每次获取的message的length, 判断message的总size是否等于header中fileSize,如果相等说明文件已经接收完成!   注意以下几个细节: 1、如果要在kafka的message header中添加额外信息,必须自定义kafka的message key,格式为{key=test.txt,size=200,...} 参数键值对。 2、如果需要采集二进制文件,则要利用flume-ng的  org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder 3、如果用默认的字符采集方式,则flume采集的文件size要比kafka接收到的Message大一个字节;  如果用二进制文件采集方式,则不会出现这种情况!      

fish - Hadooper

赞同来自:

标记文件完成需要自己制定一个机制,这里的机制是新文件来的时候,标老文件为完成。   确实有你所说的问题,所以需要根据实际需求做些修改,比如仿照Log4j的方式,每隔一段时间(比如一小时、一天),标志一个文件完成。

IT_Angel

赞同来自:

我觉得最好的方式就是,当flume采集文件后,原始文件大小传输到kafka队列的header中,然后kafka收集数据时依据收到的字节长度判断文件是否已经完成,这样接收的文件会在第一时间标记为已完成。但是就是目前还不知道flume如何能将文件大小传输到kafka中?

要回复问题请先登录注册