How can Flume append a full day's data into a single XXX.txt file?

The Flume configuration is:
a1.sinks = k1
a1.sources = r1
a1.channels = c1

a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = master
a1.sources.r1.port = 4141

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000

a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path =/home/hadoop/flume/%y%m%d
a1.sinks.k1.hdfs.filePrefix = test_
a1.sinks.k1.hdfs.fileSuffix=.log
#a1.sinks.k1.hdfs.round = true
#a1.sinks.k1.hdfs.roundValue = 10
#a1.sinks.k1.hdfs.roundUnit = second
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=Text
a1.sinks.k1.hdfs.rollInterval=0
a1.sinks.k1.hdfs.rollSize=10240000
a1.sinks.k1.hdfs.rollCount=0
a1.sinks.k1.hdfs.idleTimeout=60
 
The Java test program is:
import java.nio.charset.Charset;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

public class FlumeDemo {
    private String hostname;
    private int port;
    private RpcClient client;

    public static void main(String[] args) {
        FlumeDemo rpcClient = new FlumeDemo("master", 4141);
        String data = "this is my test data!这是我的测试日志数据!";
        // Send the same test record ten times, then release the client.
        for (int i = 0; i < 10; i++) {
            rpcClient.sendMessage(data);
        }
        rpcClient.cleanUp();
    }

    public FlumeDemo(String hostname, int port) {
        this.hostname = hostname;
        this.port = port;
        // Avro RPC client pointing at the source configured as a1.sources.r1
        this.client = RpcClientFactory.getDefaultInstance(hostname, port);
    }

    public void sendMessage(String data) {
        Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));
        try {
            client.append(event);
        } catch (EventDeliveryException e) {
            e.printStackTrace();
        }
    }

    public void cleanUp() {
        client.close();
    }
}
 
What I want is one log file per day, with the whole day's data in that single file. Each time the Java program produces log data, it should be appended to that file under the HDFS path. For example, today's directory would be 20170710 and it would contain only one file, say test_123456789.txt, and everything produced during the day would go into that one file. How can I achieve this?
With the current test code, every run of the Java program causes Flume on the Linux side to create a new log file for the data, which does not meet my requirement. I have only just started with Flume and do not understand much of it in depth yet, so any advice would be much appreciated!

fish - Hadooper


You can read org.apache.flume.sink.hdfs.HDFSEventSink to work out how to meet your requirement.
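For reference, the file-rolling behavior that HDFSEventSink applies is driven by the hdfs.rollInterval, hdfs.rollSize, hdfs.rollCount and hdfs.idleTimeout settings. A minimal sketch of roll-related settings that would keep a single file per daily directory, assuming a pure time-based roll of 24 hours is acceptable (the 86400-second value is an assumption, not something stated in this thread):

# Roll only on time, once per day; never roll on size or event count,
# and never close a file just because it has been idle.
a1.sinks.k1.hdfs.rollInterval = 86400
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.idleTimeout = 0

Note that an agent restart or an abandoned file after a write failure will still cause the sink to open a new file, so this keeps the file count to a minimum rather than strictly guaranteeing exactly one per day.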

jane3von


The configuration was missing one item, which is why you were not getting a single file per day!

a1.sinks = k1
a1.sources = r1
a1.channels = c1

a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = master
a1.sources.r1.port = 4141

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000

a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path =/home/hadoop/flume/%y%m%d
a1.sinks.k1.hdfs.filePrefix = test_
a1.sinks.k1.hdfs.fileSuffix=.log
#a1.sinks.k1.hdfs.round = true
#a1.sinks.k1.hdfs.roundValue = 10
#a1.sinks.k1.hdfs.roundUnit = second
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=Text
a1.sinks.k1.hdfs.rollInterval=0
a1.sinks.k1.hdfs.rollSize=10240000
a1.sinks.k1.hdfs.rollCount=0
a1.sinks.k1.hdfs.idleTimeout=0
a1.sinks.k1.hdfs.batchSize=100
a1.sinks.k1.hdfs.minBlockReplicas=1

fish - Hadooper


a1.sinks.k1.hdfs.minBlockReplicas only configures the HDFS block replication count; it has nothing to do with how many files are stored.

fish - Hadooper


HDFSEventSink cannot fully achieve what you need; try modifying its code.
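If configuration alone cannot get you there, a custom sink or client that appends to one fixed file per day via the HDFS client API is one direction such a code change could take. Below is a minimal, illustrative sketch only; the class name, the /flume/daily directory and the file-name pattern are made up for the example, and it assumes the cluster allows appends (dfs.support.append):

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Illustrative sketch: append one record to a single per-day HDFS file.
// The path and naming here are assumptions for the example, not from this thread.
public class DailyHdfsAppender {
    public static void appendRecord(String record) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        String day = new SimpleDateFormat("yyMMdd").format(new Date());
        Path file = new Path("/flume/daily/" + day + "/test_" + day + ".log");
        FSDataOutputStream out;
        if (fs.exists(file)) {
            // Requires append support on the cluster.
            out = fs.append(file);
        } else {
            out = fs.create(file);
        }
        out.write((record + "\n").getBytes(StandardCharsets.UTF_8));
        out.close();
    }
}

Opening and closing the stream for every record is simple but slow; a real implementation would keep the stream open and batch writes, which is essentially what HDFSEventSink's BucketWriter does internally.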
