Camus errors when reading messages from Kafka

My Kafka version is 0.8.2. I found that Camus's pom.xml pulls in Kafka 0.8.1, and the project does not compile against Kafka 0.8.2.
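(For context, the change this points at is aligning the Kafka dependency declared in Camus's pom.xml with the 0.8.2 broker. A rough sketch of the kind of dependency entry involved; the exact artifactId and Scala suffix used in the Camus pom may differ, so treat the coordinates below as an illustration rather than the actual pom contents:)

<!-- illustration only: align the Kafka client with the 0.8.2 broker;
     check the artifactId / Scala suffix actually used in the Camus pom -->
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.10</artifactId>
  <version>0.8.2.1</version>
</dependency>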

15/07/30 16:02:37 INFO kafka.CamusJob: Physical memory (bytes) snapshot:        3106086912
15/07/30 16:02:37 INFO kafka.CamusJob: Virtual memory (bytes) snapshot: 16543186944
15/07/30 16:02:37 INFO kafka.CamusJob: Total committed heap usage (bytes):      683745280
15/07/30 16:02:37 INFO compress.CodecPool: Got brand-new decompressor [.deflate]
15/07/30 16:02:37 ERROR kafka.CamusJob: Errors encountered during job run:
15/07/30 16:02:37 ERROR kafka.CamusJob: Errors from file [hdfs://node-1:8020/user/kafka/exec/2015-07-30-16-00-12/errors-m-00000]
15/07/30 16:02:37 ERROR kafka.CamusJob: Error for EtlKey [topic=mytopic2 partition=0leaderId= server= service= beginOffset=3447528 offset=3447529 msgSize=17 server= checksum=4272954815 time=1438272080148 message.size=17]: java.io.IOException: java.lang.ClassCastException: com.linkedin.camus.etl.kafka.common.KafkaMessage cannot be cast to [B
        at com.linkedin.camus.etl.kafka.mapred.EtlRecordReader.getWrappedRecord(EtlRecordReader.java:152)
        at com.linkedin.camus.etl.kafka.mapred.EtlRecordReader.nextKeyValue(EtlRecordReader.java:292)
        at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:556)
        at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:80)
        at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.nextKeyValue(WrappedMapper.java:91)
        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Caused by: java.lang.ClassCastException: com.linkedin.camus.etl.kafka.common.KafkaMessage cannot be cast to [B
        at com.linkedin.camus.etl.kafka.coders.StringMessageDecoder.decode(StringMessageDecoder.java:16)
        at com.linkedin.camus.etl.kafka.mapred.EtlRecordReader.getWrappedRecord(EtlRecordReader.java:142)
        ... 12 more

15/07/30 16:02:37 ERROR kafka.CamusJob: Error for EtlKey [topic=mytopic2 partition=0leaderId= server= service= beginOffset=3447529 offset=3447530 msgSize=17 server= checksum=4272954815 time=1438272080602 message.size=17]: java.io.IOException: java.lang.ClassCastException: com.linkedin.camus.etl.kafka.common.KafkaMessage cannot be cast to [B

15/07/30 16:02:37 ERROR kafka.CamusJob: Error for EtlKey [topic=mytopic2 partition=0leaderId= server= service= beginOffset=3447530 offset=3447531 msgSize=17 server= checksum=2523389571 time=1438272080612 message.size=17]: java.io.IOException: java.lang.ClassCastException: com.linkedin.camus.etl.kafka.common.KafkaMessage cannot be cast to [B

15/07/30 16:02:37 ERROR kafka.CamusJob: Error for EtlKey [topic=mytopic2 partition=0leaderId= server= service= beginOffset=3447531 offset=3447532 msgSize=17 server= checksum=2950894309 time=1438272080638 message.size=17]: java.io.IOException: java.lang.ClassCastException: com.linkedin.camus.etl.kafka.common.KafkaMessage cannot be cast to [B

15/07/30 16:02:37 ERROR kafka.CamusJob: Error for EtlKey [topic=mytopic2 partition=0leaderId= server= service= beginOffset=3447532 offset=3447533 msgSize=17 server= checksum=4272954815 time=1438272080650 message.size=17]: java.io.IOException: java.lang.ClassCastException: com.linkedin.camus.etl.kafka.common.KafkaMessage cannot be cast to [B

15/07/30 16:02:37 ERROR kafka.CamusJob: job failed: 100.0% messages skipped due to other, maximum allowed is 0.1%
Exception in thread "main" java.lang.RuntimeException: job failed: 100.0% messages skipped due to other, maximum allowed is 0.1%
        at com.linkedin.camus.etl.kafka.CamusJob.checkIfTooManySkippedMsg(CamusJob.java:467)
        at com.linkedin.camus.etl.kafka.CamusJob.checkIfTooManySkippedMsg(CamusJob.java:453)
        at com.linkedin.camus.etl.kafka.CamusJob.run(CamusJob.java:372)
        at com.linkedin.camus.etl.kafka.CamusJob.run(CamusJob.java:235)
        at com.linkedin.camus.etl.kafka.CamusJob.run(CamusJob.java:691)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
        at com.linkedin.camus.etl.kafka.CamusJob.main(CamusJob.java:646)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
        at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
[root@node-1 target]# 

封尘 - post-80s IT guy


My Camus config file is as follows:

# Almost all properties have decent default properties. When in doubt, comment out the property.
#
# The job name.
camus.job.name=Camus Job

# final top-level data output directory, sub-directory will be dynamically created for each topic pulled
etl.destination.path=/user/kafka/topics
# HDFS location where you want to keep execution files, i.e. offsets, error logs, and count files
etl.execution.base.path=/user/kafka/exec
# where completed Camus job output directories are kept, usually a sub-dir in the base.path
etl.execution.history.path=/user/kafka/camus/exec/history

fs.default.name=hdfs://node-1:8020

# Concrete implementation of the Encoder class to use (used by Kafka Audit, and thus optional for now)
#camus.message.encoder.class=com.linkedin.camus.etl.kafka.coders.DummyKafkaMessageEncoder

# Concrete implementation of the Decoder class to use.
# Out of the box options are:
#  com.linkedin.camus.etl.kafka.coders.JsonStringMessageDecoder - Reads JSON events, and tries to extract timestamp.
#  com.linkedin.camus.etl.kafka.coders.KafkaAvroMessageDecoder - Reads Avro events using a schema from a configured schema repository.
#  com.linkedin.camus.etl.kafka.coders.LatestSchemaKafkaAvroMessageDecoder - Same, but converts event to latest schema for current topic.
## camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.LatestSchemaKafkaAvroMessageDecoder
camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.StringMessageDecoder

etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.StringRecordWriterProvider

# Decoder class can also be set on a per topic basis.
#camus.message.decoder.class.<topic-name>=com.your.custom.MessageDecoder

# Used by avro-based Decoders (KafkaAvroMessageDecoder and LatestSchemaKafkaAvroMessageDecoder) to use as their schema registry.
# Out of the box options are:
# com.linkedin.camus.schemaregistry.FileSchemaRegistry
# com.linkedin.camus.schemaregistry.MemorySchemaRegistry
# com.linkedin.camus.schemaregistry.AvroRestSchemaRegistry
# com.linkedin.camus.example.schemaregistry.DummySchemaRegistry
kafka.message.coder.schema.registry.class=com.linkedin.camus.example.DummySchemaRegistry

# Used by JsonStringMessageDecoder when extracting the timestamp
# Choose the field that holds the time stamp (default "timestamp")
#camus.message.timestamp.field=time

# What format is the timestamp in? Out of the box options are:
# "unix" or "unix_seconds": The value will be read as a long containing the seconds since epoc
# "unix_milliseconds": The value will be read as a long containing the milliseconds since epoc
# "ISO-8601": Timestamps will be fed directly into org.joda.time.DateTime constructor, which reads ISO-8601
# All other values will be fed into the java.text.SimpleDateFormat constructor, which will be used to parse the timestamps
# Default is "[dd/MMM/yyyy:HH:mm:ss Z]"
#camus.message.timestamp.format=yyyy-MM-dd_HH:mm:ss
#camus.message.timestamp.format=ISO-8601

# Used by the committer to arrange .avro files into a partitioned scheme. This will be the default partitioner for all
# topic that do not have a partitioner specified.
# Out of the box options are (for all options see the source for configuration options):
# com.linkedin.camus.etl.kafka.partitioner.HourlyPartitioner, groups files in hourly directories
# com.linkedin.camus.etl.kafka.partitioner.DailyPartitioner, groups files in daily directories
# com.linkedin.camus.etl.kafka.partitioner.TimeBasedPartitioner, groups files in very configurable directories
# com.linkedin.camus.etl.kafka.partitioner.DefaultPartitioner, like HourlyPartitioner but less configurable
# com.linkedin.camus.etl.kafka.partitioner.TopicGroupingPartitioner
#etl.partitioner.class=com.linkedin.camus.etl.kafka.partitioner.HourlyPartitioner

# Partitioners can also be set on a per-topic basis. (Note though that configuration is currently not per-topic.)
#etl.partitioner.class.<topic-name>=com.your.custom.CustomPartitioner

# all files in this dir will be added to the distributed cache and placed on the classpath for hadoop tasks
# hdfs.default.classpath.dir=

# max hadoop tasks to use, each task can pull multiple topic partitions
mapred.map.tasks=30
# max historical time that will be pulled from each partition based on event timestamp
kafka.max.pull.hrs=1
# events with a timestamp older than this will be discarded.
kafka.max.historical.days=3
# Max minutes for each mapper to pull messages (-1 means no limit)
kafka.max.pull.minutes.per.task=-1

# if whitelist has values, only whitelisted topic are pulled. Nothing on the blacklist is pulled
kafka.blacklist.topics=
kafka.whitelist.topics=mytopic2
log4j.configuration=true

# Name of the client as seen by kafka
kafka.client.name=camus
# The Kafka brokers to connect to, format: kafka.brokers=host1:port,host2:port,host3:port
kafka.brokers=node-2:9092,node-3:9093,node-4:9094
# Fetch request parameters:
#kafka.fetch.buffer.size=
#kafka.fetch.request.correlationid=
#kafka.fetch.request.max.wait=
#kafka.fetch.request.min.bytes=
#kafka.timeout.value=

#Stops the mapper from getting inundated with Decoder exceptions for the same topic
#Default value is set to 10
max.decoder.exceptions.to.print=5

#Controls the submitting of counts to Kafka
#Default value set to true
post.tracking.counts.to.kafka=false
monitoring.event.class=class.that.generates.record.to.submit.counts.to.kafka

# everything below this point can be ignored for the time being, will provide more documentation down the road
##########################
etl.run.tracking.post=false
kafka.monitor.tier=
etl.counts.path=
kafka.monitor.time.granularity=10

#etl.hourly=hourly
etl.daily=daily

# Should we ignore events that cannot be decoded (exception thrown by MessageDecoder)?
# `false` will fail the job, `true` will silently drop the event.
etl.ignore.schema.errors=false

# configure output compression for deflate or snappy. Defaults to deflate
mapred.output.compress=false
etl.output.codec=gzip
etl.deflate.level=6
#etl.output.codec=snappy

etl.default.timezone=America/Los_Angeles
etl.output.file.time.partition.mins=60
etl.keep.count.files=false
etl.execution.history.max.of.quota=.8

# Configures a customer reporter which extends BaseReporter to send etl data
#etl.reporter.class

mapred.map.max.attempts=1

kafka.client.buffer.size=20971520
kafka.client.so.timeout=60000

#zookeeper.session.timeout=
#zookeeper.connection.timeout=
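For reference, the ClassCastException in the log above is thrown inside StringMessageDecoder, which tries to cast the incoming message object (com.linkedin.camus.etl.kafka.common.KafkaMessage) to byte[] ([B), so every record ends up skipped. A workaround often suggested for this combination is to plug in a custom decoder that reads the payload off the message object instead of expecting a raw byte[]. A minimal sketch, assuming this Camus build exposes com.linkedin.camus.coders.Message with a getPayload() accessor (the package and class names below are made up and may need adjusting to your tree):

package com.example.camus.coders;  // hypothetical package, adjust to your project

import java.nio.charset.StandardCharsets;

import com.linkedin.camus.coders.CamusWrapper;
import com.linkedin.camus.coders.Message;
import com.linkedin.camus.coders.MessageDecoder;

// Sketch: treat the Kafka payload as a UTF-8 string and stamp each record with the
// current time, avoiding the KafkaMessage -> byte[] cast that fails above.
public class PayloadStringMessageDecoder extends MessageDecoder<Message, String> {

    @Override
    public CamusWrapper<String> decode(Message message) {
        // Assumption: getPayload() returns the raw message bytes in this Camus version.
        String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
        return new CamusWrapper<String>(payload, System.currentTimeMillis());
    }
}

It would then be wired in via camus.message.decoder.class=com.example.camus.coders.PayloadStringMessageDecoder (again, a hypothetical name used only for this sketch).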

封尘 - post-80s IT guy


Teacher, I implemented the Decoder following the link you gave, and I'm still running into this error.

zp0824 - study hard and improve every day


The Kafka bundled with Camus should be version 0.8.0. Is this error happening at runtime?

封尘 - post-80s IT guy


I can't read any data out of Kafka.

zp0824 - study hard and improve every day


There are API differences between 0.8.0 and 0.8.2, so mismatched versions will cause problems.

封尘 - post-80s IT guy


OK, thank you, teacher. I'll give Gobblin a try.
