Spark Streaming, Kafka receiver, "Failed to get records for ... after polling for 512"

环境:spark2.1.0,kafka0.10,spark使用yarn-cluster模式运行,引用
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.1.0</version>
    <scope>provided</scope>
</dependency>
作为spark与kafka的依赖包,代码中使用DirectStream建立kafka流,程序尝试过每间隔(1s,10s)从kafka中取消息处理,处理完毕(每一batch)将offsets信息存在zk中(这块自己实现)。 运行程序的时候连续出现如下异常信息:  
[ WARN ]  Lost task 0.0 in stage 8528.0 (TID 775294, hdyst02sh): java.lang.AssertionError: assertion failed: Failed to get records for spark-executor-LC10_L_CAPPAP_LogFile-txt_FLM_sz LC10_L_CAPPAP_LogFile-txt_FLM_sz 1 1691427 after polling for 512
        at scala.Predef$.assert(Predef.scala:170)
        at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
        at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
        at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
官方说这不是一个bug,https://issues.apache.org/jira/browse/SPARK-19275​
WX20170405-112820.png
  看了kafka的gc信息
WechatIMG37.jpeg
其中一台broker其余类似。   尝试使用参数--conf spark.streaming.kafka.consumer.poll.ms=4096来解决,参数生效,但出现"Failed to get records for ... after polling for 4096"   问题: 造成这个问题产生的根本原因是?  当发生这个问题时数据是丢失了吗?还是会尝试在拉一次? 有什么解决办法(生产上现在一直出这个问题,急T T)

Alfred - 挖数据的

赞同来自:

题主给的链接错了,网页链接多个,号. 附上正确的Issue地址  https://issues.apache.org/jira/browse/SPARK-19275   从网络方面考虑的话: Issue里提到了三个变量 
  1. default Kafka heartbeat session timeout (30 seconds)
  2. session.timeout.ms
  3. request.timeout.ms
个人理解, 是由于batch的timeout超出Kafka的心跳监听时长. 猜测可以考虑将这三个设置成相同间隔或接近值.   从Offset考虑: Instability issues with Spark 2.0.1 and Kafka 0.10 如果有多个consumer的话,提交offset的时候不知道有没有考虑异步更新offset导致offset不一致的问题.  上文链接里提到需要用consumer.commitSync(newOffsets.asJava)进行提交.   个人理解,希望对题主有帮助.      

liying_sn

赞同来自:

new SparkConf().setAppName("XX").setMaster("local")
  .set("spark.streaming.kafka.consumer.poll.ms", "60000");亲试有效

要回复问题请先登录注册