Input type exception in the IP-count job

From the exception, it looks like an IntWritable was passed where a Text was expected. Debugging shows the exception is thrown at context.write(one, ip). I checked the argument that is supposed to be a Text: it is ip, and as far as I can tell it is being passed correctly.
The source code is attached.
The error is as follows:
java.lang.Exception: java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, received org.apache.hadoop.io.IntWritable
        at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
        at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, received org.apache.hadoop.io.IntWritable
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1072)
        at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:715)
        at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
        at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
        at IpCount$TokenizerMapper.map(IpCount.java:30)
        at IpCount$TokenizerMapper.map(IpCount.java:1)
        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
        at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
15/08/31 23:53:28 INFO mapreduce.Job: Job job_local607312391_0001 failed with state FAILED due to: NA
15/08/31 23:53:28 INFO mapreduce.Job: Counters: 0

Dong - Hulu

Upvoted by: 天热不下雨, fish

Add the following lines in the main function:

    job.setMapOutputKeyClass(IntWritable.class);
    job.setMapOutputValueClass(Text.class);

Also, your reduce has a bug: don't cast values to a HashSet and then take its size. You must iterate over the values one by one; the reason why will become clear as your understanding of MapReduce deepens.
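
A rough sketch of what the reducer might look like after that change, reusing the names from the original IpCount code (the class name here is just illustrative, and the driver additionally gets the two setMapOutput*Class calls shown above):

import java.io.IOException;
import java.util.HashSet;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Sketch: walk the Iterable instead of casting it to a HashSet. In the original
// code this would replace the body of IpCount.IntSumReducer.
public class DistinctIpReducer
    extends Reducer<IntWritable, Text, Text, IntWritable> {

  private final IntWritable result = new IntWritable();
  private final Text ipcount = new Text("ipcount");

  @Override
  public void reduce(IntWritable key, Iterable<Text> values, Context context)
      throws IOException, InterruptedException {
    // Hadoop reuses the Text object across iterations, so copy each value
    // out as a String before storing it in the set.
    HashSet<String> distinct = new HashSet<String>();
    for (Text ip : values) {
      distinct.add(ip.toString());
    }
    result.set(distinct.size());
    context.write(ipcount, result);
  }
}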

fish - Hadooper

Upvoted by: 天热不下雨

I saw the exception stack in the thread where you replied to Dong. The cause of this later problem is: always remember that a combiner is like a pipe. It must not change the <key, value> types flowing between map and reduce; in other words, the combiner's input <key, value> types must be exactly the same as its output <key, value> types. If a class doesn't satisfy that, it must not be used as a combiner.
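
To make that constraint concrete: this job's map output is <IntWritable, Text>, so any combiner used here would also have to consume and emit <IntWritable, Text>. A purely illustrative sketch (the class name and the deduplication logic are hypothetical, not part of the original code):

import java.io.IOException;
import java.util.HashSet;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Illustrative only: a combiner for this job must read and write <IntWritable, Text>.
// This one merely drops duplicate IPs within a single map task's output; the real
// counting still happens in the reducer, so running it zero or more times is safe.
public class DistinctIpCombiner
    extends Reducer<IntWritable, Text, IntWritable, Text> {

  private final Text out = new Text();

  @Override
  public void reduce(IntWritable key, Iterable<Text> values, Context context)
      throws IOException, InterruptedException {
    HashSet<String> seen = new HashSet<String>();
    for (Text ip : values) {
      if (seen.add(ip.toString())) {   // emit each distinct IP once
        out.set(ip.toString());
        context.write(key, out);
      }
    }
  }
}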

天热不下雨

Upvoted by:

import java.io.IOException;
import java.util.HashSet;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class IpCount {

  public static class TokenizerMapper
        extends Mapper<Object, Text, IntWritable, Text> {

    private final static IntWritable one = new IntWritable(1);
    private Text ip = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      ip.set(itr.nextToken());
      System.out.print(ip.toString());
      context.write(one, ip);
    }
  }

  public static class IntSumReducer
        extends Reducer<IntWritable, Text, Text, IntWritable> {

    private IntWritable result = new IntWritable();
    private Text ipcount = new Text("ipcount");
    private HashSet hs = new HashSet();

    public void reduce(IntWritable key, Iterable<Text> values,
                       Context context
                       ) throws IOException, InterruptedException {
      hs = (HashSet) values;
      int sum = hs.size();
      result.set(sum);
      context.write(ipcount, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
      System.err.println("Usage: ipcount <in> [<in>...] <out>");
      System.exit(2);
    }
    Job job = Job.getInstance(conf, "ip count");
    job.setJarByClass(IpCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    for (int i = 0; i < otherArgs.length - 1; ++i) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }
    FileOutputFormat.setOutputPath(job,
      new Path(otherArgs[otherArgs.length - 1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
