Consumer errors in a Kafka cluster

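Environment: 2 hosts, client and master
Kafka is installed on both client and master
zookeeper-server is configured on master
The Kafka config file server.properties is identical on client and master (zookeeper.connect=Host0:2181 is set), except for broker.id
broker.id=1 on client, broker.id=0 on master
Kafka is started on both client and master: bin/kafka-server-start.sh config/server.properties
Checking the terminal output, the log on client looks normal, but master reports a problem, shown below: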
[2016-09-03 10:02:41,078] INFO Reconnect due to socket error: java.nio.channels.ClosedChannelException (kafka.consumer.SimpleConsumer)
[2016-09-03 10:02:42,078] WARN [ReplicaFetcherThread-0-1], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 2100; ClientId: ReplicaFetcherThread-0-1; ReplicaId: 0; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [1,0] -> PartitionFetchInfo(0,1048576),[1,2] -> PartitionFetchInfo(0,1048576),[replicated1,1] -> PartitionFetchInfo(0,1048576). Possible cause: java.nio.channels.ClosedChannelException (kafka.server.ReplicaFetcherThread)
The terminal on master keeps repeating the errors above. What is the cause, and how should I fix it?
 
Create topic 1: bin/kafka-topics.sh --create --zookeeper Host0:2181 --replication-factor 2 --partitions 3 --topic 1
Describe topic 1: bin/kafka-topics.sh --describe --zookeeper Host0:2181 --topic 1
Topic:1    PartitionCount:3    ReplicationFactor:2    Configs:
    Topic: 1    Partition: 0    Leader: 1    Replicas: 1,0    Isr: 1
    Topic: 1    Partition: 1    Leader: 0    Replicas: 0,1    Isr: 0,1
    Topic: 1    Partition: 2    Leader: 1    Replicas: 1,0    Isr: 1
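
In the describe output above, partitions 0 and 2 list replicas 1,0 but only broker 1 in the Isr column, which suggests broker 0 (master) is not keeping up with the leader on broker 1. That would be consistent with the ReplicaFetcherThread errors on master. As a rough sketch, the same check could be scripted as below. The function name `under_replicated` and the regular expression are illustrative assumptions, written only against the 0.8.2-style output shown here.

```python
import re

# Matches the per-partition lines of `kafka-topics.sh --describe`.
# The summary line has "PartitionCount:" rather than "Partition:", so it is skipped.
PARTITION_LINE = re.compile(
    r"Partition:\s*(\d+)\s+Leader:\s*(-?\d+)\s+Replicas:\s*([\d,]+)\s+Isr:\s*([\d,]*)"
)


def under_replicated(describe_output):
    """Return (partition, missing_broker_ids) for every partition whose
    ISR does not contain all of its assigned replicas."""
    results = []
    for line in describe_output.splitlines():
        match = PARTITION_LINE.search(line)
        if not match:
            continue
        partition = int(match.group(1))
        replicas = [int(x) for x in match.group(3).split(",") if x]
        isr = {int(x) for x in match.group(4).split(",") if x}
        missing = [broker for broker in replicas if broker not in isr]
        if missing:
            results.append((partition, missing))
    return results


# The describe output quoted in the question.
SAMPLE = """\
Topic:1    PartitionCount:3    ReplicationFactor:2    Configs:
    Topic: 1    Partition: 0    Leader: 1    Replicas: 1,0    Isr: 1
    Topic: 1    Partition: 1    Leader: 0    Replicas: 0,1    Isr: 0,1
    Topic: 1    Partition: 2    Leader: 1    Replicas: 1,0    Isr: 1
"""

if __name__ == "__main__":
    for partition, missing in under_replicated(SAMPLE):
        print("partition %d: brokers %s are not in the ISR" % (partition, missing))
```

For the output in the question this reports broker 0 as missing from the ISR of partitions 0 and 2.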
 
On client, I ran a test producer and a test consumer, and messages were delivered normally
Test producer: [root@client kafka_2.10-0.8.2.0]# bin/kafka-console-producer.sh --broker-list client:9092 --topic 1
Test consumer: bin/kafka-console-consumer.sh --zookeeper Host0:2181 --from-beginning --topic 1
Messages were delivered normally.
 
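However, when client is the producer and master is the consumer, master receives no messages and reports errors. The commands and the error output are as follows:
Producer on client: bin/kafka-console-producer.sh --broker-list client:9092 --topic 1
Consumer on master: bin/kafka-console-consumer.sh --zookeeper Host0:2181 --from-beginning --topic 1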
 
[2016-09-03 10:03:28,723] WARN Fetching topic metadata with correlation id 76 for topics [Set(1)] from broker [id:1,host:client,port:9092] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
[2016-09-03 10:03:30,731] WARN [console-consumer-17327_Host0-1472867580340-91c6332d-leader-finder-thread], Failed to add leader for partitions [1,0],[1,1],[1,2]; will retry (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
    at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:127)
    at kafka.consumer.SimpleConsumer.earliestOrLatestOffset(SimpleConsumer.scala:166)
    at kafka.consumer.ConsumerFetcherThread.handleOffsetOutOfRange(ConsumerFetcherThread.scala:60)
    at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:177)
    at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:172)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.immutable.Map$Map2.foreach(Map.scala:130)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at kafka.server.AbstractFetcherThread.addPartitions(AbstractFetcherThread.scala:172)
    at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:87)
    at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:77)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.immutable.Map$Map2.foreach(Map.scala:130)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at kafka.server.AbstractFetcherManager.addFetcherForPartitions(AbstractFetcherManager.scala:77)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:95)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
[2016-09-03 10:03:31,943] WARN Fetching topic metadata with correlation id 77 for topics [Set(1)] from broker [id:1,host:client,port:9092] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
[2016-09-03 10:03:33,986] WARN [console-consumer-17327_Host0-1472867580340-91c6332d-leader-finder-thread], Failed to add leader for partitions [1,0],[1,1],[1,2]; will retry (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
    at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:127)
    at kafka.consumer.SimpleConsumer.earliestOrLatestOffset(SimpleConsumer.scala:166)
    at kafka.consumer.ConsumerFetcherThread.handleOffsetOutOfRange(ConsumerFetcherThread.scala:60)
    at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:177)
    at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:172)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.immutable.Map$Map2.foreach(Map.scala:130)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at kafka.server.AbstractFetcherThread.addPartitions(AbstractFetcherThread.scala:172)
    at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:87)
    at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:77)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.immutable.Map$Map2.foreach(Map.scala:130)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at kafka.server.AbstractFetcherManager.addFetcherForPartitions(AbstractFetcherManager.scala:77)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:95)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
How can I resolve the errors above? Thank you.

wangxiaolei

Upvoted by: 张伟

I tested it and it works fine, with Host0 as the consumer.
[Screenshot: 1473066077779.png]
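After logging in, I found that no Kafka service was running on the machine. Start it with bin/kafka-server-start.sh config/server.properties & and be sure to include the trailing "&" so that it runs in the background. Without it, the service stops when the current terminal window is closed. With it, the Kafka service kept running even after I logged out, as you can see when you log in. (If the process still dies on logout in your shell, prefixing the command with nohup is a common safeguard.)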
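
One way to confirm from the other host that a broker is actually listening is to try opening a TCP connection to its port. The sketch below is only a minimal illustration. The helper name `broker_reachable` is an assumption, and the host and port in the comment are the ones that appear in the consumer log (`host:client,port:9092`).

```python
import socket


def broker_reachable(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port can be opened.

    Name resolution failures, refused connections and timeouts are all
    raised as OSError subclasses, so they are reported as "not reachable".
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Example, run on master against the broker on client:
#     broker_reachable("client", 9092)
```

A False result from master while the broker process is running on client would point toward name resolution or firewall problems rather than a stopped service. This only tests the TCP connection, not the Kafka protocol itself.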

wangxiaolei


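The error messages indicate that the connection is closed. Run jps -ml to check whether the Kafka service is currently running. I also recommend adding the background operator & when starting the service, e.g. on both client and master: bin/kafka-server-start.sh config/server.properties & May I log in to your environment?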
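
As a small illustration of the jps -ml check, the sketch below scans jps output for a JVM whose main class is kafka.Kafka, the class that bin/kafka-server-start.sh launches. The function name `kafka_broker_pids` is an assumption, and the sample text with its PIDs is made up for the example, not taken from the poster's machines.

```python
def kafka_broker_pids(jps_output):
    """Return the PIDs of JVMs whose main class is kafka.Kafka.

    Each line of `jps -ml` output is expected to look like:
        <pid> <main class or jar> <arguments...>
    """
    pids = []
    for line in jps_output.splitlines():
        fields = line.split()
        if len(fields) >= 2 and fields[0].isdigit() and fields[1] == "kafka.Kafka":
            pids.append(int(fields[0]))
    return pids


# Hypothetical jps -ml output, for illustration only.
SAMPLE_JPS = """\
2101 org.apache.zookeeper.server.quorum.QuorumPeerMain config/zookeeper.properties
2350 kafka.Kafka config/server.properties
2977 sun.tools.jps.Jps -ml
"""

if __name__ == "__main__":
    pids = kafka_broker_pids(SAMPLE_JPS)
    print("Kafka broker running, PIDs: %s" % pids if pids else "No Kafka broker found")
```

An empty result on either host would mean the broker there is not running, which matches the closed-connection errors in the logs.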
