Flume agent throws an error when collecting the SogouQ.reduced data

The configuration is as follows:

1. On SYH-2, conf/flume_sogoulog.conf

a1.sources = r1
a1.channels = c1
a1.sinks = k1

# configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir=/root/data/sogoulog
a1.sources.r1.channels = c1

# configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# configure the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = SYH-1
a1.sinks.k1.port = 5554
a1.sinks.k1.channel = c1

2. On SYH-1, create the flume_sogouhdfs configuration file, conf/flume_sogouhdfs.conf

a1.sources = r1
a1.channels = c1
a1.sinks = k1


# configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = SYH-1
a1.sources.r1.port = 5554
a1.sources.r1.channels = c1

# configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# configure the sink
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path = hdfs://SYH-1:8020/user/root/logs/%y-%m-%d/%H%M%S
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=Text
a1.sinks.k1.hdfs.round=true
a1.sinks.k1.hdfs.roundValue=5
a1.sinks.k1.hdfs.roundUnit=second
a1.sinks.k1.hdfs.useLocalTimeStamp=true
a1.sinks.k1.hdfs.filePrefix=events-
a1.sinks.k1.channel = c1

Start flume_sogouhdfs on SYH-1:

cd /usr/lib/flume-ng/

bin/flume-ng agent -c conf -f conf/flume_sogouhdfs.conf -n a1 -Dflume.root.logger=INFO,console

Start flume_sogoulog on SYH-2:

cd /usr/lib/flume-ng/

bin/flume-ng agent -c conf -f conf/flume_sogoulog.conf -n a1 -Dflume.root.logger=INFO,console

On SYH-2:

cd /root/data/sogoulog

echo "111" > 111 works without any problem, and the data is delivered to HDFS normally.

 

But when the SogouQ.reduced file is placed in /root/data/sogoulog, the error below is thrown. Is this related to character encoding, and how can it be fixed?

2015-07-27 21:08:23,783 (pool-4-thread-1) [ERROR - org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:256)] FATAL: Spool Directory source r1: { spoolDir: /root/data/sogoulog }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
java.nio.charset.MalformedInputException: Input length = 1
        at java.nio.charset.CoderResult.throwException(CoderResult.java:277)
        at org.apache.flume.serialization.ResettableFileInputStream.readChar(ResettableFileInputStream.java:195)
        at org.apache.flume.serialization.LineDeserializer.readLine(LineDeserializer.java:133)
        at org.apache.flume.serialization.LineDeserializer.readEvent(LineDeserializer.java:71)
        at org.apache.flume.serialization.LineDeserializer.readEvents(LineDeserializer.java:90)
        at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:252)
        at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:228)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)
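The MalformedInputException means the spooling directory source is decoding the file's bytes with the wrong charset (it defaults to UTF-8). A minimal Python sketch of the same failure, assuming SogouQ.reduced is GBK/GB18030-encoded (the sample text below is illustrative, not taken from the file):

```python
# Bytes as they would appear in a GB18030-encoded file (sample text is illustrative).
sample = "搜索".encode("gb18030")

try:
    # Flume's LineDeserializer decodes with inputCharset, which defaults to UTF-8.
    sample.decode("utf-8")
    print("valid UTF-8")
except UnicodeDecodeError:
    # Python's analogue of the MalformedInputException in the stack trace above.
    print("not valid UTF-8")

# Decoding with the matching charset succeeds.
print(sample.decode("gb18030"))
```

Decoding with the right charset round-trips cleanly, which is why telling the source the file's actual encoding (or re-encoding the file) resolves the error.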

 

zp0824 - Study hard and make progress every day


This is a two-tier Flume agent setup. Try cutting the pipeline down to a single agent, simplifying the architecture, and see whether the problem persists.
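As a sketch, that single-agent simplification could run on SYH-1 alone, combining the spooldir source and HDFS sink from the two configs above:

```properties
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/data/sogoulog
a1.sources.r1.channels = c1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://SYH-1:8020/user/root/logs/%y-%m-%d/%H%M%S
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.channel = c1
```

If the same MalformedInputException appears with one agent, the avro hop between SYH-2 and SYH-1 can be ruled out and the problem is localized to the spooldir source reading the file.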

zp0824 - Study hard and make progress every day


If it is related to character encoding, you would need to implement your own source that can parse files in the special encoding.

zp0824 - Study hard and make progress every day


Alternatively, read the Flume source code, find the configuration option that can parse GBK-encoded input, and set it accordingly. (In this case the data being read in is GBK-encoded.)

夕阳丶一抹红颜


I just ran into this problem. It comes from this source configuration:

a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/data/sogoulog
a1.sources.r1.channels = c1

Add one more property:

a1.sources.r1.inputCharset = GB18030

According to the official documentation (http://flume.apache.org/FlumeU ... ource), the default value of inputCharset is UTF-8.

夕阳丶一抹红颜


I tried it 10 minutes ago; it works.
