Flume NG flume-hdfs-sink source code analysis

Notes on the sink
C1: HDFSEventSink
0. HDFSEventSink also implements the Configurable interface; its configure() method processes the sink's own configuration settings.
0.1 Read the configuration parameters from the context;
0.2 Set the compression codec:
      codeC = getCodec(codecName);
      // TODO : set proper compression type
      compType = CompressionType.BLOCK;
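0.2.1 getCodec()
(1) Call CompressionCodecFactory.getCodecClasses(conf) to obtain the list of supported codec classes, codecs;
(2) Use codecMatches(cls, codecName) to test each class against the codec name codecName and find the matching codec class;
(3) Instantiate it: codec = cls.newInstance().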
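The three getCodec() steps above can be sketched without a Hadoop dependency. This is a minimal illustration, not the Flume source: Codec, GzipCodec, BZip2Codec and CodecLookup are hypothetical stand-ins for the Hadoop types, and the matching rule in codecMatches (simple class name, with or without the "Codec" suffix, ignoring case) is an assumption. Returning null for an unknown name is also a simplification.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Hadoop's CompressionCodec interface.
interface Codec {
    String extension();
}

// Hypothetical codec classes used only for this sketch.
class GzipCodec implements Codec {
    public String extension() { return ".gz"; }
}

class BZip2Codec implements Codec {
    public String extension() { return ".bz2"; }
}

class CodecLookup {
    // Step (1): stand-in for CompressionCodecFactory.getCodecClasses(conf).
    static List<Class<? extends Codec>> getCodecClasses() {
        List<Class<? extends Codec>> classes = new ArrayList<>();
        classes.add(GzipCodec.class);
        classes.add(BZip2Codec.class);
        return classes;
    }

    // Step (2): assumed rule, match "GzipCodec" or "gzip" regardless of case.
    static boolean codecMatches(Class<? extends Codec> cls, String codecName) {
        String simpleName = cls.getSimpleName();
        if (simpleName.equalsIgnoreCase(codecName)) {
            return true;
        }
        if (simpleName.endsWith("Codec")) {
            String prefix = simpleName.substring(0, simpleName.length() - "Codec".length());
            return prefix.equalsIgnoreCase(codecName);
        }
        return false;
    }

    // Step (3): instantiate the first matching class; null if nothing matches.
    static Codec getCodec(String codecName) {
        for (Class<? extends Codec> cls : getCodecClasses()) {
            if (codecMatches(cls, codecName)) {
                try {
                    return cls.getDeclaredConstructor().newInstance();
                } catch (ReflectiveOperationException e) {
                    throw new IllegalArgumentException("Cannot instantiate codec " + codecName, e);
                }
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Codec codec = getCodec("gzip");
        System.out.println(codec.getClass().getSimpleName() + " " + codec.extension());
    }
}
```

The sketch uses getDeclaredConstructor().newInstance() rather than Class.newInstance(), which is deprecated in recent Java versions; the lookup logic is otherwise the same.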
0.3 Set writeFormat
If writeFormat == null,
then the format is set according to the file type: if fileType is DataStreamType or CompStreamType, set
1. HDFSEventSink.start() initializes the sink and brings it to a state where it can forward events to their next destination.
2. HDFSEventSink.process(), from the Sink interface, does the core processing: it takes events from the channel and forwards them.
3. HDFSEventSink.stop() performs the necessary cleanup.
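The configure/start/process/stop lifecycle described above can be illustrated with a small stand-alone sketch. Everything here is hypothetical: SketchSink is not a Flume class, a plain Queue stands in for the channel, a List stands in for the HDFS output, and the "batchSize" key is an assumed configuration name. Transactions and error handling are left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical, simplified sink following configure -> start -> process -> stop.
class SketchSink {
    enum Status { READY, BACKOFF }

    private final Queue<String> channel;                    // stand-in for a channel
    private final List<String> written = new ArrayList<>(); // stand-in for HDFS output
    private int batchSize = 1;
    private boolean started = false;

    SketchSink(Queue<String> channel) {
        this.channel = channel;
    }

    // Step 0: read the sink's own settings from the context.
    void configure(Map<String, String> context) {
        batchSize = Integer.parseInt(context.getOrDefault("batchSize", "1"));
    }

    // Step 1: bring the sink to a state where it can forward events.
    void start() {
        started = true;
    }

    // Step 2: take up to batchSize events from the channel and forward them.
    Status process() {
        if (!started) {
            throw new IllegalStateException("sink not started");
        }
        int taken = 0;
        while (taken < batchSize) {
            String event = channel.poll();
            if (event == null) {
                break; // channel is empty
            }
            written.add(event);
            taken++;
        }
        // Assumed convention: back off when nothing was available.
        return taken == 0 ? Status.BACKOFF : Status.READY;
    }

    // Step 3: clean up.
    void stop() {
        started = false;
    }

    List<String> written() {
        return written;
    }

    public static void main(String[] args) {
        Queue<String> channel = new ArrayDeque<>();
        channel.add("event-1");
        channel.add("event-2");
        Map<String, String> context = new HashMap<>();
        context.put("batchSize", "2");

        SketchSink sink = new SketchSink(channel);
        sink.configure(context);
        sink.start();
        System.out.println(sink.process() + " " + sink.written());
        sink.stop();
    }
}
```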

HDFSEventSink calls the following classes:
(2) HDFSFormatterFactory
(2.1) HDFSWritableFormatter
(2.2) HDFSTextFormatter
(3) HDFSWriterFactory
(3.1) HDFSSequenceFile
(3.2) HDFSDataStream
(3.3) HDFSCompressedDataStream
(4) BucketWriter
(5) HDFSWriter
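The relationship between the writer factory (3) and its three writers (3.1 to 3.3) can be pictured as a dispatch on the configured file type. The sketch below is an assumption-laden illustration, not the Flume code: SketchWriter, the three writer classes and SketchWriterFactory are hypothetical names, the file type strings are assumed values, and the real factory's error handling may differ.

```java
// Hypothetical stand-in for the common writer interface.
interface SketchWriter {
    String describe();
}

class SketchSequenceFile implements SketchWriter {
    public String describe() { return "binary sequence file"; }
}

class SketchDataStream implements SketchWriter {
    public String describe() { return "uncompressed text stream"; }
}

class SketchCompressedDataStream implements SketchWriter {
    public String describe() { return "compressed text stream"; }
}

class SketchWriterFactory {
    // Assumed file type names; treat them as illustrative constants.
    static final String SEQUENCE_FILE_TYPE = "SequenceFile";
    static final String DATA_STREAM_TYPE = "DataStream";
    static final String COMP_STREAM_TYPE = "CompressedStream";

    // Pick a writer implementation from the configured file type.
    static SketchWriter getWriter(String fileType) {
        if (SEQUENCE_FILE_TYPE.equalsIgnoreCase(fileType)) {
            return new SketchSequenceFile();
        } else if (DATA_STREAM_TYPE.equalsIgnoreCase(fileType)) {
            return new SketchDataStream();
        } else if (COMP_STREAM_TYPE.equalsIgnoreCase(fileType)) {
            return new SketchCompressedDataStream();
        }
        throw new IllegalArgumentException("File type " + fileType + " not supported");
    }

    public static void main(String[] args) {
        System.out.println(getWriter("DataStream").describe());
    }
}
```

Keeping the choice inside one factory method means the sink only depends on the writer interface and never names a concrete writer class.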
