Flume real-time data collection question: taildir?

Flume 1.7.0 added the taildir source, which can monitor files under a directory in real time at the line level (it tails newly appended lines), whereas the spooldir source only detects new files dropped into a directory (file level). So for real-time collection with Flume, wouldn't it be better to use the taildir source directly?
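For comparison, a minimal sketch of the two source types (the agent name, paths, and file names here are placeholders, not taken from the course material):

# taildir: tails appended lines and records read offsets in a position file
agent.sources.t1.type = TAILDIR
agent.sources.t1.positionFile = /tmp/taildir_position.json
agent.sources.t1.filegroups = g1
agent.sources.t1.filegroups.g1 = /var/log/app/access.log

# spooldir: only picks up complete files newly dropped into the directory
agent.sources.s1.type = spooldir
agent.sources.s1.spoolDir = /var/log/app/spool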
But the example of real-time collection with Flume in Teacher Dong's course still uses the exec source.
Teacher, could you give us an example? Say Flume is collecting data that has to serve both offline and real-time analysis:
the same data needs to be delivered to an HDFS sink (offline analysis) and a Kafka sink (real-time analysis) at the same time.
Is there anything wrong with a design like the one below? (One source, two channels, two sinks: sink k1 writes to the HDFS sink and sink k2 writes to the Kafka sink. No selector is used; is a selector required?)
# define agent
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2
# define the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.positionFile = /app/my.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /app/openresty/nginx/logs/access.log
a1.sources.r1.headers.f1.headerKey1 = my
a1.sources.r1.fileHeader = true
#define the file channel c1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /app/flume/checkpoint/my/
a1.channels.c1.dataDirs = /app/flume/data/my/
a1.channels.c1.capacity = 200000000
a1.channels.c1.transactionCapacity = 6000
a1.channels.c1.checkpointInterval = 60000
#define the memory channel c2
a1.channels.c2.type = memory 
a1.channels.c2.keep-alive=300
a1.channels.c2.byteCapacity=0
a1.channels.c2.capacity = 200000
a1.channels.c2.transactionCapacity = 1000
# define the hdfs sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hdfscluster/apps/flume/gslb/%Y-%m-%d/%k/
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.encoding = UTF-8
a1.sinks.k1.hdfs.minBlockReplicas = 1
a1.sinks.k1.hdfs.filePrefix=a1_%Y%m%d_%H
a1.sinks.k1.hdfs.inUsePrefix = .
a1.sinks.k1.hdfs.inUseSuffix = .temp
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.callTimeout = 60000
a1.sinks.k1.hdfs.idleTimeout = 600
## define the kafka sink
a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k2.metadata.broker.list=10.0.15.203:9092,10.0.15.204:9092,10.0.15.206:9092,10.0.15.207:9092,10.0.15.208:9092
a1.sinks.k2.serializer.class=kafka.serializer.StringEncoder
a1.sinks.k2.key.serializer.class=kafka.serializer.StringEncoder
a1.sinks.k2.request.required.acks=0
a1.sinks.k2.max.message.size=8000000
a1.sinks.k2.producer.type=sync
a1.sinks.k2.custom.encoding=UTF-8
a1.sinks.k2.custom.topic.name=mytopic
# bind the source to both channels, and each sink to its own channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
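(A side note on the Kafka sink: properties such as metadata.broker.list, serializer.class, and custom.topic.name look like they belong to a custom or older sink build. If the stock org.apache.flume.sink.kafka.KafkaSink shipped with Flume 1.7 is used, my understanding is that the broker list and topic are set roughly as below; the addresses and topic name simply reuse the ones above:)

a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k2.kafka.bootstrap.servers = 10.0.15.203:9092,10.0.15.204:9092,10.0.15.206:9092,10.0.15.207:9092,10.0.15.208:9092
a1.sinks.k2.kafka.topic = mytopic
# other producer settings are passed through with the kafka.producer. prefix
a1.sinks.k2.kafka.producer.acks = 1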

fish - Hadooper

Upvoted by: jhg22

If you need to detect modifications to files and want something more reliable than the tail command, taildir is a good choice. Channel c1 in your configuration is set to file; what is that choice based on? A stricter reliability requirement? How to design this depends on the scenario. Here there is only a single tier of Flume agents; in a real deployment you can put an intermediate layer of agents between the machines that generate the logs and the machines that store the data, to decouple the sending side from the consuming side. If one source's data is to be copied to two different channels, you need to set the selector to replicating.
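For example, on top of the configuration in the question, the replicating selector would look roughly like this (replicating is also Flume's default selector type, so this mainly makes the intent explicit):

a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2
# optionally mark c2 as best-effort so a full memory channel does not block the source
# a1.sources.r1.selector.optional = c2

And a rough sketch of the intermediate tier mentioned above, assuming an avro sink/source hop (host name and port are placeholders): the agent on the log machine forwards events over avro, and an aggregating agent on the storage side receives them and fans out to the HDFS and Kafka sinks:

# on the log-producing machine: forward to the aggregator
a1.sinks.k3.type = avro
a1.sinks.k3.hostname = aggregator-host
a1.sinks.k3.port = 4545

# on the aggregating machine: receive events, then fan out downstream
a2.sources.r1.type = avro
a2.sources.r1.bind = 0.0.0.0
a2.sources.r1.port = 4545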

jhg22


Thank you, I learned a lot. The file channel is configured to make sure no data is lost.
