Flume 1.8 throws a NullPointerException on startup

The configuration file is as follows:
a1.sources=r1
a1.channels=c1 c2

# configure source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/module/flume1.8/test/log_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/logs/app.+
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1 c2

#interceptor
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = com.ruki.flume.interceptor.LogETLInterceptor$Builder
a1.sources.r1.interceptors.i2.type = com.ruki.flume.interceptor.LogTypeInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = topic
a1.sources.r1.selector.mapping.topic_start = c1
a1.sources.r1.selector.mapping.topic_event = c2

# configure channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.channels.c1.kafka.topic = topic_start
a1.channels.c1.parseAsFlumeEvent = false
a1.channels.c1.kafka.consumer.group.id = flume-consumer
a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.channels.c2.kafka.topic = topic_event
a1.channels.c2.parseAsFlumeEvent = false
a1.channels.c2.kafka.consumer.group.id = flume-consumer
 
The error output is as follows:
 
20/03/14 20:17:19 INFO utils.AppInfoParser: Kafka version : 0.9.0.1
20/03/14 20:17:19 INFO utils.AppInfoParser: Kafka commitId : 23c69d62a0cabf06
20/03/14 20:17:19 INFO utils.AppInfoParser: Kafka version : 0.9.0.1
20/03/14 20:17:19 INFO utils.AppInfoParser: Kafka commitId : 23c69d62a0cabf06
20/03/14 20:17:19 INFO kafka.KafkaChannel: Topic = topic_event
20/03/14 20:17:19 INFO kafka.KafkaChannel: Topic = topic_start
20/03/14 20:17:19 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c2 started
20/03/14 20:17:19 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
20/03/14 20:17:19 INFO node.Application: Starting Source r1
20/03/14 20:17:19 INFO taildir.TaildirSource: r1 TaildirSource source starting with directory: {f1=/tmp/logs/.*log.*}
        ssl.truststore.location = null
        ssl.keystore.password = null
        ssl.keymanager.algorithm = SunX509
        metrics.sample.window.ms = 30000
        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
        send.buffer.bytes = 131072
        linger.ms = 0

20/03/14 20:17:19 INFO utils.AppInfoParser: Kafka version : 0.9.0.1
20/03/14 20:17:19 INFO utils.AppInfoParser: Kafka commitId : 23c69d62a0cabf06
20/03/14 20:17:19 INFO utils.AppInfoParser: Kafka version : 0.9.0.1
20/03/14 20:17:19 INFO utils.AppInfoParser: Kafka commitId : 23c69d62a0cabf06
20/03/14 20:17:19 INFO kafka.KafkaChannel: Topic = topic_event
20/03/14 20:17:19 INFO kafka.KafkaChannel: Topic = topic_start
20/03/14 20:17:19 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c2: Successfully registered new MBean.
20/03/14 20:17:19 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c2 started
20/03/14 20:17:19 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
20/03/14 20:17:19 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
20/03/14 20:17:19 INFO node.Application: Starting Source r1
20/03/14 20:17:19 INFO taildir.TaildirSource: r1 TaildirSource source starting with directory: {f1=/tmp/logs/.*log.*}
20/03/14 20:17:19 INFO taildir.ReliableTaildirEventReader: taildirCache: [{filegroup='f1', filePattern='/tmp/logs/.*log.*', cached=true}]
20/03/14 20:17:19 INFO taildir.ReliableTaildirEventReader: headerTable: {}
20/03/14 20:17:19 INFO taildir.ReliableTaildirEventReader: Opening file: /tmp/logs/app-2020-03-12.log, inode: 1569837, pos: 0
20/03/14 20:17:19 INFO taildir.ReliableTaildirEventReader: Updating position from position file: /opt/module/flume1.8/test/log_position.json
20/03/14 20:17:19 INFO taildir.TailFile: Updated position, file: /tmp/logs/app-2020-03-12.log, inode: 1569837, pos: 0
20/03/14 20:17:19 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
20/03/14 20:17:19 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
20/03/14 20:17:19 ERROR taildir.TaildirSource: Unable to tail files
java.lang.NullPointerException
        at org.apache.flume.channel.MultiplexingChannelSelector.getRequiredChannels(MultiplexingChannelSelector.java:56)
        at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:157)
        at org.apache.flume.source.taildir.TaildirSource.tailFileProcess(TaildirSource.java:263)
        at org.apache.flume.source.taildir.TaildirSource.process(TaildirSource.java:226)
        at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133)
        at java.lang.Thread.run(Thread.java:748)
20/03/14 20:17:21 INFO taildir.ReliableTaildirEventReader: Last read was never committed - resetting position
 
 

文泽路小男孩OuO


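Solved. The problem was in the configured interceptors: they were faulty, so every event was filtered out at the interceptor stage. When Flume then tried to assign a channel to an event, the event was null, which caused the NPE (the one thrown from MultiplexingChannelSelector.getRequiredChannels in the stack trace above).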
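
For anyone hitting the same trace, here is a minimal sketch of the pattern that avoids it. It is not the original LogETLInterceptor or LogTypeInterceptor code, which was never posted. To keep it runnable without the Flume jar, SimpleEvent is a hypothetical stand-in for org.apache.flume.Event, and both the "blank line is invalid" rule and the "contains start" routing rule are made-up assumptions. The point it illustrates: the single-event method may return null to drop an event, but the batch method should skip those nulls and return a list with no null entries (empty if everything was dropped), so the multiplexing selector never reads headers from a null event.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class NullSafeInterceptorSketch {

    /** Hypothetical stand-in for org.apache.flume.Event: headers plus a body. */
    static final class SimpleEvent {
        final Map<String, String> headers = new HashMap<>();
        final byte[] body;

        SimpleEvent(String text) {
            this.body = text.getBytes(StandardCharsets.UTF_8);
        }
    }

    /**
     * Single-event hook. Returns null to drop an invalid event,
     * otherwise sets the "topic" header that the selector routes on.
     */
    static SimpleEvent intercept(SimpleEvent event) {
        String text = new String(event.body, StandardCharsets.UTF_8).trim();
        if (text.isEmpty()) {
            return null; // assumed validity rule: blank lines are dropped
        }
        // Assumed routing rule: lines mentioning "start" go to topic_start.
        String topic = text.contains("start") ? "topic_start" : "topic_event";
        event.headers.put("topic", topic);
        return event;
    }

    /**
     * Batch hook. Dropped events are skipped rather than added as null,
     * so the returned list never contains a null entry.
     */
    static List<SimpleEvent> intercept(List<SimpleEvent> events) {
        List<SimpleEvent> kept = new ArrayList<>();
        for (SimpleEvent event : events) {
            SimpleEvent result = intercept(event);
            if (result != null) {
                kept.add(result);
            }
        }
        return kept;
    }
}
```

In a real Flume interceptor the same loop would sit in intercept(List<Event>), with the null check applied to the result of intercept(Event).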
