运行Kafka hadoop_consumer后,没有任何消息输出

我运行bootcamp中的kafka hadoop consumer示例时,控制台没有任何输出
kafka consonle-producer控制台内容为:

producer.png

 
kafka hadoop-consumer控制台内容为:

consumer.png

 
请老师帮忙看看是什么原因?

fish - Hadooper

赞同来自: IT_Angel

确定dos.flush被调用了么? 试试hsync是否可以?

fish - Hadooper

赞同来自:

在consumer中多打些log。

IT_Angel

赞同来自:

经过调试,发现运行到 Path path = new Path(hdfsPath); 这行代码就不继续运行了,也没有报错啊,hdfs日志我也看了,也没有什么错误啊!

fish - Hadooper

赞同来自:

代码贴一下。

IT_Angel

赞同来自:

我的hdfs文件目录如下:
hdfs-dir.png

IT_Angel

赞同来自:

package cn.chinahadoop.kafka.hadoop_consumer;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class TestHadoopConsumer {
    private final ConsumerConnector consumer;
    private final String topic;
    private  ExecutorService executor;
    private FileSystem hdfs;
    private String hdfsPath;
    
    public TestHadoopConsumer(String hdfsPath, String a_zookeeper, String a_groupId, String a_topic) {
        consumer = Consumer.createJavaConsumerConnector(
                createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
        this.hdfsPath = hdfsPath;
        try {
			hdfs = FileSystem.get(new HdfsConfiguration());
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		
    }

    public void shutdown() {
        if (consumer != null) consumer.shutdown();
        if (executor != null) executor.shutdown();
    }

    public void run(int a_numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(a_numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        // 启动所有线程
        executor = Executors.newFixedThreadPool(a_numThreads);

        // 开始消费消息
        int threadNumber = 0;
        for (final KafkaStream stream : streams) {
            executor.submit(new SubTaskConsumer(hdfsPath,stream,threadNumber,hdfs));
            threadNumber++;
        }
    }

    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "60000");
        props.put("zookeeper.sync.time.ms", "2000");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset","smallest");
        return new ConsumerConfig(props);
    }

    public static void main(String[] args){
    	
    	if(args.length == 0){
    		ArgumentsLoader.printOptionDescription();
    		return;
    	}
    	
    	ArgumentsLoader argsLoader = new ArgumentsLoader(args);
    	if(!argsLoader.valid()){
    		System.out.println(argsLoader.getValidMessage());
    		return;
    	}
		
    	Map<String,String> paramMap = argsLoader.getParamMaps();
    	
    	//zookeeper 地址,多个可用“,”相隔,如 192.168.0.20,192.168.1.21
    	String hdfsPath = paramMap.get("--path");
        String zooKeeper = paramMap.get("--zookeeper");
        String topic = paramMap.get("--topic");
        String groupId = paramMap.get("--group");
        
        System.out.println("hdfsPath="+paramMap.get("--path")+",zookeeper="+paramMap.get("--zookeeper")
        		+",topic="+paramMap.get("--topic")+",groupId="+paramMap.get("--group"));
        
        int threads = Integer.parseInt("1");
        TestHadoopConsumer example = new TestHadoopConsumer(hdfsPath,zooKeeper, groupId, topic);
        example.run(threads);
    }
}

IT_Angel

赞同来自:

package cn.chinahadoop.kafka.hadoop_consumer;

import java.io.IOException;
import java.util.Timer;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

public class SubTaskConsumer implements Runnable {
    private KafkaStream m_stream;
    private int m_threadNumber;
    private FileSystem hdfs;
    private String hdfsPath;
    private Timer timer = new Timer(true);
    
    public SubTaskConsumer(){
    	
    }
    
    public SubTaskConsumer(String hdfsPath, KafkaStream a_stream, int a_threadNumber,FileSystem fs) {
    	hdfsPath = hdfsPath;
        m_threadNumber = a_threadNumber;
        m_stream = a_stream;
        hdfs = fs;
        System.out.println("Consumer task is running..");
    }

    public void run() {
    	Path path = new Path(hdfsPath);
		// writing
		try {
			FSDataOutputStream dos = hdfs.create(path);
			//ConsumerTimerTask task = new ConsumerTimerTask(dos);
			//timer.schedule(task,0,5000);
			ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
	        while (it.hasNext()){
	        	byte[] by = it.next().message();
	        	dos.write(by);
	            System.out.println("Thread " + m_threadNumber + ": " + new String(by) +"-id:"+Thread.currentThread().getId());
	        }
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}   
    }
}

IT_Angel

赞同来自:

package cn.chinahadoop.kafka.hadoop_consumer;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * 项目名称: test_kafka
 * 类名称: ConfigArgumentsLoader
 * 类描述: 
 * 创建人: 徐福明
 * 创建时间: 2016年10月26日
 * 邮箱:fallangelxfm@aliyun.com
 */
public class ArgumentsLoader {
	
	private Map<String,String> map = new HashMap<String,String>();
	private Set<String> paramKeys = new HashSet<String>();
	private String message = null;
	
	public ArgumentsLoader(String[] args){
		
		initParamKeys();
		
		int index = 0;
		while(index < args.length){
			int keyIndex = index;
			int valueIndex = index + 1;
			if(valueIndex < args.length){
				map.put(args[keyIndex], args[valueIndex]);
			}else{
				map.put(args[keyIndex], null);
			}
			index+=2;
		}
	}
	
	public Map<String,String> getParamMaps(){
		return map;
	}
	
	public boolean valid(){
		for(String key:paramKeys){
			if(!map.containsKey(key)){
				message = "Missing required argument \"["+key+"]\"";
				return false;
			}
		}
		for(String key:paramKeys){
			if(map.get(key) == null){
				message = "Option ['"+key+"'] requires an argument";
				return false;
			}
		}
		return true;
	}
	
	public static void printOptionDescription(){
		System.out.println("Option \t\t\t\t Descirption");
		System.out.println("------ \t\t\t\t -----------");
		System.out.println("--group <group name> \t\t the name of kafka consumer group");
		System.out.println("--path <path> \t\t\t the path of file in the hdfs");
		System.out.println("--topic <topic name> \t\t the topic name of message in kakfa");
		System.out.println("--zookeeper <urls> \t\t the connection string for the zookeeper,format<host:ip>");
	}
	
	public String getValidMessage(){
		return message;
	}
	
	private void initParamKeys(){
		paramKeys.add("--zookeeper");
		paramKeys.add("--path");
		paramKeys.add("--group");
		paramKeys.add("--topic");
	}
}

fish - Hadooper

赞同来自:

SubTaskConsumer中,dos.write之后,如果没有调用flush或者close,内容不会立刻刷写到hdfs中。 每写一条记录就立刻flush,又会影响性能。   请想个办法(比如另起一个线程做flush),把buffer中的内容刷到hdfs中。

IT_Angel

赞同来自:

我知道的,问题是程序还没有走到while循环中啊,一条消息也没从broker中拿到啊!

fish - Hadooper

赞同来自:

jstack看看应用挺在哪里了?或者在new Path调用前后打log,确认你的应用是否真是停在new Path调用了?

fish - Hadooper

赞同来自:

那就看看jstack返回什么?

IT_Angel

赞同来自:

如何查看啊? 运行之后,没发现有多余的进程啊! [root@mycluster-1 hadoop_consumer]# jps 27097 NameNode 27232 DataNode 15568 TestHadoopConsumer 27439 QuorumPeerMain 15757 Jps  

fish - Hadooper

赞同来自:

jstack 15568 

IT_Angel

赞同来自:

jstack-1.png
 

IT_Angel

赞同来自:

2016-10-27 17:56:09 Full thread dump Java HotSpot(TM) 64-Bit Server VM (24.45-b08 mixed mode): "Attach Listener" daemon prio=10 tid=0x00007f355c036800 nid=0x3df7 runnable [0x0000000000000000]    java.lang.Thread.State: RUNNABLE "ConsumerFetcherThread-test_mycluster-1-1477562140534-9a424857-0-0" prio=10 tid=0x00007f354c03a000 nid=0x3dd2 runnable [0x00007f3574dfb000]    java.lang.Thread.State: RUNNABLE     at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)     at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)     at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)     at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)     - locked <0x00000000ddfe0150> (a sun.nio.ch.Util$2)     - locked <0x00000000ddfe0160> (a java.util.Collections$UnmodifiableSet)     - locked <0x00000000ddfe0108> (a sun.nio.ch.EPollSelectorImpl)     at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)     at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:221)     - locked <0x00000000ddfe0170> (a java.lang.Object)     at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)     - locked <0x00000000ddfe0180> (a sun.nio.ch.SocketAdaptor$SocketInputStream)     at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)     - locked <0x00000000ddfe0378> (a java.lang.Object)     at kafka.utils.Utils$.read(Utils.scala:380)     at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)     at kafka.network.Receive$class.readCompletely(Transmission.scala:56)     at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)     at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)     at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:71)     at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)     - locked <0x00000000ddfe04f8> (a java.lang.Object)     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)     at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)     at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)     at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) "DestroyJavaVM" prio=10 tid=0x00007f3578008800 nid=0x3dba waiting on condition [0x0000000000000000]    java.lang.Thread.State: RUNNABLE "pool-5-thread-1" prio=10 tid=0x00007f3578662000 nid=0x3dd1 waiting on condition [0x00007f3574efd000]    java.lang.Thread.State: WAITING (parking)     at sun.misc.Unsafe.park(Native Method)     - parking to wait for  <0x00000000ddfe06b0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)     at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)     at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)     at java.lang.Thread.run(Thread.java:744) "test_mycluster-1-1477562140534-9a424857-leader-finder-thread" prio=10 tid=0x00007f357865e800 nid=0x3dcf waiting on condition [0x00007f357c122000]    java.lang.Thread.State: WAITING (parking)     at sun.misc.Unsafe.park(Native Method)     - parking to wait for  <0x00000000e79f6cb0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)     at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:61)     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) "test_mycluster-1-1477562140534-9a424857_watcher_executor" prio=10 tid=0x00007f357860c800 nid=0x3dce waiting on condition [0x00007f357c424000]    java.lang.Thread.State: TIMED_WAITING (parking)     at sun.misc.Unsafe.park(Native Method)     - parking to wait for  <0x00000000e7f8c018> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)     at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2176)     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:544) "process reaper" daemon prio=10 tid=0x00007f35783a4800 nid=0x3dcc waiting on condition [0x00007f357c45d000]    java.lang.Thread.State: TIMED_WAITING (parking)     at sun.misc.Unsafe.park(Native Method)     - parking to wait for  <0x00000000e7bc23a8> (a java.util.concurrent.SynchronousQueue$TransferStack)     at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)     at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460)     at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:359)     at java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:942)     at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)     at java.lang.Thread.run(Thread.java:744) "kafka-consumer-scheduler-0" daemon prio=10 tid=0x00007f35782ba000 nid=0x3dca waiting on condition [0x00007f357c55e000]    java.lang.Thread.State: TIMED_WAITING (parking)     at sun.misc.Unsafe.park(Native Method)     - parking to wait for  <0x00000000e79936d0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)     at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)     at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1090)     at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:807)     at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)     at java.lang.Thread.run(Thread.java:744) "main-EventThread" daemon prio=10 tid=0x00007f3578299800 nid=0x3dc9 waiting on condition [0x00007f357c65f000]    java.lang.Thread.State: WAITING (parking)     at sun.misc.Unsafe.park(Native Method)     - parking to wait for  <0x00000000e7993858> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)     at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)     at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:494) "main-SendThread(mycluster-1:2181)" daemon prio=10 tid=0x00007f3578297800 nid=0x3dc8 runnable [0x00007f357c760000]    java.lang.Thread.State: RUNNABLE     at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)     at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)     at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)     at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)     - locked <0x00000000e79939f8> (a sun.nio.ch.Util$2)     - locked <0x00000000e7993a08> (a java.util.Collections$UnmodifiableSet)     - locked <0x00000000e79939b0> (a sun.nio.ch.EPollSelectorImpl)     at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)     at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:349)     at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081) "ZkClient-EventThread-12-mycluster-1:2181" daemon prio=10 tid=0x00007f357828c000 nid=0x3dc7 waiting on condition [0x00007f357c861000]    java.lang.Thread.State: WAITING (parking)     at sun.misc.Unsafe.park(Native Method)     - parking to wait for  <0x00000000e7993b88> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)     at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)     at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:67) "metrics-meter-tick-thread-2" daemon prio=10 tid=0x00007f3578278000 nid=0x3dc6 waiting on condition [0x00007f357c997000]    java.lang.Thread.State: WAITING (parking)     at sun.misc.Unsafe.park(Native Method)     - parking to wait for  <0x00000000e77dba18> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)     at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1085)     at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:807)     at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)     at java.lang.Thread.run(Thread.java:744) "metrics-meter-tick-thread-1" daemon prio=10 tid=0x00007f3578273800 nid=0x3dc5 waiting on condition [0x00007f357ca98000]    java.lang.Thread.State: TIMED_WAITING (parking)     at sun.misc.Unsafe.park(Native Method)     - parking to wait for  <0x00000000e77dba18> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)     at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)     at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1090)     at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:807)     at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)     at java.lang.Thread.run(Thread.java:744) "Service Thread" daemon prio=10 tid=0x00007f357808b000 nid=0x3dc1 runnable [0x0000000000000000]    java.lang.Thread.State: RUNNABLE "C2 CompilerThread1" daemon prio=10 tid=0x00007f3578088000 nid=0x3dc0 waiting on condition [0x0000000000000000]    java.lang.Thread.State: RUNNABLE "C2 CompilerThread0" daemon prio=10 tid=0x00007f3578086000 nid=0x3dbf waiting on condition [0x0000000000000000]    java.lang.Thread.State: RUNNABLE "Signal Dispatcher" daemon prio=10 tid=0x00007f357807b800 nid=0x3dbe runnable [0x0000000000000000]    java.lang.Thread.State: RUNNABLE "Finalizer" daemon prio=10 tid=0x00007f3578064800 nid=0x3dbd in Object.wait() [0x00007f357dced000]    java.lang.Thread.State: WAITING (on object monitor)     at java.lang.Object.wait(Native Method)     - waiting on <0x00000000e7623920> (a java.lang.ref.ReferenceQueue$Lock)     at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:135)     - locked <0x00000000e7623920> (a java.lang.ref.ReferenceQueue$Lock)     at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:151)     at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:189) "Reference Handler" daemon prio=10 tid=0x00007f3578060800 nid=0x3dbc in Object.wait() [0x00007f357ddee000]    java.lang.Thread.State: WAITING (on object monitor)     at java.lang.Object.wait(Native Method)     - waiting on <0x00000000e76239b8> (a java.lang.ref.Reference$Lock)     at java.lang.Object.wait(Object.java:503)     at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:133)     - locked <0x00000000e76239b8> (a java.lang.ref.Reference$Lock) "VM Thread" prio=10 tid=0x00007f357805e000 nid=0x3dbb runnable  "VM Periodic Task Thread" prio=10 tid=0x00007f3578096000 nid=0x3dc2 waiting on condition  JNI global references: 127

IT_Angel

赞同来自:

老师,之前的那个问题已经解决了,是我hdfs路径接收的有问题!但是现在又有一个新问题:我在消费kafka的topic时,写了一个Timer定时器,每五秒调用dos.flush()方法提交一次,但是进程在运行过程中,消息并未写入到hdfs文件系统中,当我结束consumer进程时,才写人hdfs文件,请问这是什么原因呢?

IT_Angel

赞同来自:

恩 确定调用了

要回复问题请先登录注册