kafka多partition消费问题

flume配置文件如下:
 a1.sources  = source1
a1.channels = channel1
a1.sinks = sink1

a1.sources.source1.type = spooldir
a1.sources.source1.spoolDir = /data/practise/sogouquery/data/generated
a1.sources.source1.fileHeader = true
a1.sources.source1.basenameHeader = true
a1.sources.source1.basenameHeaderKey = key
a1.sources.source1.inputCharset = GBK
a1.sources.source1.trackerDir = /data/practise/sogouquery/data
a1.sources.source1.ignorePattern = ^(.)*\\.tmp$
a1.sources.source1.channels = channel1

a1.channels.channel1.type = memory
a1.channels.channel1.capacity = 10000
a1.channels.channel1.transactionCapacity = 1000

a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sink1.topic = sogotest
a1.sinks.sink1.groupId = sogo_group
a1.sinks.sink1.brokerList = data-1:9092,data-2:9092,data-3:9092,data-4:9092
a1.sinks.sink1.channel = channel1
a1.sinks.sink1.batchSize = 100

# a1.sinks.sink1.type = logger

a1.sources.source1.channels = channel1
a1.sinks.sink1.channel = channel1
/data/practise/sogouquery/data目录下有两个文件,每个文件只有500行(SogouQ.reduced中提取出来的),
[hadoop@data-1 generated]$ wc -l caa cab
  500 caa
  500 cab
 1000 total

topic sogotest是2个partition
kafka写入hdfs代码如下:样例简单修改了下
    public void run() {
Path path = new Path("/kafka/test.txt");
String msg = "";
String dateStr = "";
try {
FSDataOutputStream dos = hdfs.create(path);
ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
int i = 0;
while (it.hasNext()){
byte by[] = (byte[])it.next().message();
msg = new String(by, "UTF-8");
dateStr = CommonUtil.formatDateToString(new Date());
String a[] = msg.split("\t");
msg = (new StringBuilder(String.valueOf(dateStr))).append("\t").append(a[1]).append("\t").append(a[2]).append("\t").append(a[3]).append("\t").append(a[4]).append("\t").append(a[5]).append("\n").toString();
dos.write(msg.getBytes("UTF-8"));
System.out.println("i="+i+", "+msg);
dos.hflush();
i ++;
}
System.out.println("Shutting down Thread: " + m_threadNumber);

dos.close();
hdfs.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
现在问题是,只能消费其中一个partition中的数据,/kafka/test.txt文件中只有500行,对比后发现只是/data/practise/sogouquery/data/generated目录中的其中一个文件内容,剩下的不能消费,不知道原因
使用miniway的kafka-hadoop-consumer方式也是只能消费其中一个文件,不知道哪里出问题了
 

jirimutu

赞同来自:

1、首先看看两个文件是不是都加载到对应的topic里面。只要你在 a1.sources.source1.spoolDir = /data/practise/sogouquery/data/generated目录下面看到两个文件的后缀名都是COMPLETED,那就说明两个文件已经加载好了(注意:文件名不能重复,就是如果你之前已经收集过caa文件,那么不能再收集同名的文件,除非你删除caa.COMPLETED文件)。 2、其次看看topic日志文件,是不是两个partition中都有数据

fish - Hadooper

赞同来自:

只看到500条数据,确定是因为只收到了一个partition的数据么?如何确定原因的? 是否可以先将数据量降低些,方便debug。

fish - Hadooper

赞同来自:

这里提问可以找人回答的,选择问题上的“邀请”可以直接出发邀请邮件让别人上来看。

shoushantou

赞同来自:

老铁,完成代码有吗? 我也在写这个代码,直接消费topic,写了之后文件创建了,文件为空

要回复问题请先登录注册