flume-ng 写入hdfs上后出现一个文件被分割了很多个小文件在hdfs上


agent.sources = s1
agent.channels = c1
agent.sinks = k1

# For each one of the sources, the type is defined
agent.sources.s1.type = spooldir

# The channel can be defined as follows.
# agent.sources.s1.channels = c1

agent.sources.s1.spoolDir=/home/holden/flume_logDir/logfile
agent.sources.s1.channels=c1
agent.sources.s1.fileHeader = false


agent.channels.c1.type=file
agent.channels.c1.checkpointDir=/home/holden/flume/data/logbak.backup
agent.channels.c1.dataDirs=/home/holden/flume/data/logbak


agent.sinks.k1.type=hdfs
agent.sinks.k1.hdfs.path=hdfs://master:8020/flume/data
agent.sinks.k1.hdfs.fileType=DataStream
agent.sinks.k1.hdfs.writeFormat=TEXT
agent.sinks.K1.hdfs.filePrefix=%Y-%m-%d
agent.sinks.k1.hdfs.rollInterval=1
agent.sinks.k1.channel=c1
flume conf如上,附近是hdfs数据,这里我只上传了一个xx.txt文件,大小:8308字节
我想知道为何被分成了这么多小文件并且这些小文件是flume配置文件设置问题还是,能不能告诉我控制文件大小设置的参数
 
 
QQ截图20151124152709.png

fish - Hadooper

赞同来自: holdenLi

不是,block size只影响hdfs文件存储的分块,用户从文件名看不出block。   hdfs sink中跟产生文件数量相关的配置hdfs.rollSize、hdfs.rollInterval。 你正好配置了hdfs.rollInterval等于1,每秒滚动一个新文件,文件数当然多啦。

奔跑的大象

赞同来自: fish

10M大小滚动一个文件,具体10240地方自己改
 tier1.sinks.sink1.type = hdfs
 tier1.sinks.sink1.hdfs.path = /data/flume/%{topic}/%Y-%m-%d
 tier1.sinks.sink1.hdfs.filePrefix = upusers
 tier1.sinks.sink1.hdfs.rollInterval = 0
 tier1.sinks.sink1.hdfs.rollSize = 10240
 tier1.sinks.sink1.hdfs.rollCount = 0
 tier1.sinks.sink1.hdfs.idleTimeout=0
 tier1.sinks.sink1.hdfs.fileType = DataStream
 tier1.sinks.sink1.channel = channel1

holdenLi

赞同来自:

我这个问题是不是由于hdfs的block size大小的原因导致的

holdenLi

赞同来自:

谢谢,老师,那么在正常的生产环境中应该怎么配置  

fish - Hadooper

赞同来自:

根据需求。 一般会配rollSize,根据实际数据产生的数量、磁盘空间的大小、处理文件的逻辑需求决定。

holdenLi

赞同来自:

例如:rollSize=1024 这个是不是说采集的文件在没有达到1024就不会上传到hdfs上呀,并且一般我们的怎么设置滚动文件时间,rolllnteravl这个一般设置多少比较合适

fish - Hadooper

赞同来自:

不是,rollSize=1024表示文件大小达到1024之后,滚动产生一个新文件。   一般可以把滚动策略设置得简单些,比如文件到达多大,滚动产生一个新的文件,或者,间隔时间多长,产生一个新的文件。策略不一定要同时打开。   建议看看flume中BucketWriter.java的源代码。
// if time-based rolling is enabled, schedule the roll
    if (rollInterval > 0) {
      Callable<Void> action = new Callable<Void>() {
        public Void call() throws Exception {
          LOG.debug("Rolling file ({}): Roll scheduled after {} sec elapsed.",
              bucketPath, rollInterval);
          try {
            // Roll the file and remove reference from sfWriters map.
            close(true);
          } catch(Throwable t) {
            LOG.error("Unexpected error", t);
          }
          return null;
        }
      };
      timedRollFuture = timedRollerPool.schedule(action, rollInterval,
          TimeUnit.SECONDS);
    }
 
    if ((rollCount > 0) && (rollCount <= eventCounter)) {
      LOG.debug("rolling: rollCount: {}, events: {}", rollCount, eventCounter);
      doRotate = true;
    }

    if ((rollSize > 0) && (rollSize <= processSize)) {
      LOG.debug("rolling: rollSize: {}, bytes: {}", rollSize, processSize);
      doRotate = true;
    }

要回复问题请先登录注册