How can Flume append a whole day's data into a single XXX.txt file?
**The Flume configuration is:**
a1.sinks = k1
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = master
a1.sources.r1.port = 4141
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path =/home/hadoop/flume/%y%m%d
a1.sinks.k1.hdfs.filePrefix = test_
a1.sinks.k1.hdfs.fileSuffix=.log
#a1.sinks.k1.hdfs.round = true
#a1.sinks.k1.hdfs.roundValue = 10
#a1.sinks.k1.hdfs.roundUnit = second
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=Text
a1.sinks.k1.hdfs.rollInterval=0
a1.sinks.k1.hdfs.rollSize=10240000
a1.sinks.k1.hdfs.rollCount=0
a1.sinks.k1.hdfs.idleTimeout=60
**The Java test code is:**
import java.nio.charset.Charset;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

public class FlumeDemo {
    private String hostname;
    private int port;
    private RpcClient client;

    public static void main(String[] args) {
        FlumeDemo rpcClient = new FlumeDemo("master", 4141);
        String data = "this is my test data!这是我的测试日志数据!";
        for (int i = 0; i < 10; i++) {
            rpcClient.sendMessage(data);
        }
        rpcClient.cleanUp();
    }

    public FlumeDemo(String hostname, int port) {
        this.hostname = hostname;
        this.port = port;
        // The default instance is an Avro RPC client bound to the agent's avro source.
        this.client = RpcClientFactory.getDefaultInstance(hostname, port);
    }

    public void sendMessage(String data) {
        // Wrap the string as a Flume event and append it over the RPC connection.
        Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));
        try {
            client.append(event);
        } catch (EventDeliveryException e) {
            e.printStackTrace();
        }
    }

    public void cleanUp() {
        client.close();
    }
}
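A side note on the %y%m%d escape in hdfs.path: with useLocalTimeStamp = true the sink stamps events with the agent's local clock, but the client can also attach its own "timestamp" header and the sink will use that to resolve the path. Below is a minimal sketch of such a sender method, assuming the same client field as above; the method name and header map are illustrative and not part of the original code, and it additionally needs java.util.HashMap / java.util.Map imports.

    // Hypothetical variant of sendMessage() that sets an explicit "timestamp" header,
    // which the HDFS sink uses to expand %y%m%d in hdfs.path.
    public void sendWithTimestamp(String data) {
        Map<String, String> headers = new HashMap<String, String>();
        headers.put("timestamp", String.valueOf(System.currentTimeMillis()));
        Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"), headers);
        try {
            client.append(event);
        } catch (EventDeliveryException e) {
            e.printStackTrace();
        }
    }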
I'd like one log file to be generated per day, with the whole day's data in that single file. In other words, every time the Java side produces log data, it should be appended to the existing file under the HDFS path. For example, today's directory is 20170710 and it should contain only one file, say test_123456789.txt, and everything produced over the whole day goes into that one file. How can I achieve this?

With the current test code, every time I run the Java program, Flume on the Linux side creates a new log file for that data, which doesn't match what I need. I've only just started with Flume and there's a lot I don't understand well yet, so any advice would be much appreciated!
4 replies
jane3von
a1.sinks = k1
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = master
a1.sources.r1.port = 4141
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path =/home/hadoop/flume/%y%m%d
a1.sinks.k1.hdfs.filePrefix = test_
a1.sinks.k1.hdfs.fileSuffix=.log
#a1.sinks.k1.hdfs.round = true
#a1.sinks.k1.hdfs.roundValue = 10
#a1.sinks.k1.hdfs.roundUnit = second
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=Text
a1.sinks.k1.hdfs.rollInterval=0
a1.sinks.k1.hdfs.rollSize=10240000
a1.sinks.k1.hdfs.rollCount=0
a1.sinks.k1.hdfs.idleTimeout=0
a1.sinks.k1.hdfs.batchSize=100
**a1.sinks.k1.hdfs.minBlockReplicas=1**
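If I read this reply correctly (hedging here, this is my understanding of the HDFS sink rather than the poster's own explanation), the properties that matter for keeping one file open per day are hdfs.idleTimeout = 0, so the file is no longer closed after a period with no events (the original idleTimeout = 60 would close it about a minute after each Java run, forcing the next run onto a new file), and hdfs.minBlockReplicas = 1, which stops the sink from rolling to a new file whenever HDFS reports the current block as under-replicated. Since the reply also sets hdfs.batchSize = 100, the client side could send events in batches instead of one RPC per event. Below is a minimal sketch on top of the FlumeDemo class above; the buffering logic and method names are illustrative, not from the thread, and it additionally needs java.util.ArrayList / java.util.List imports.

    // Hypothetical batched sender: buffer events locally and ship them in one RPC call.
    private final List<Event> buffer = new ArrayList<Event>();

    public void sendBatched(String data) {
        buffer.add(EventBuilder.withBody(data, Charset.forName("UTF-8")));
        if (buffer.size() >= 100) {   // roughly aligned with hdfs.batchSize
            flushBatch();
        }
    }

    public void flushBatch() {
        if (buffer.isEmpty()) {
            return;
        }
        try {
            client.appendBatch(buffer);   // one Avro RPC for the whole batch
        } catch (EventDeliveryException e) {
            e.printStackTrace();
        }
        buffer.clear();
    }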