Job fails with multiple mapper classes when using GenericWritable to let reduce accept values of more than one type;

Can Hadoop use more than one map class?
Code:
public static class MyGenericMapper1 extends Mapper<LongWritable, Text, Text, MultiValueWritable> {
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] splits = value.toString().split("\t");
        for (String str : splits) {
            context.write(new Text(str), new MultiValueWritable(new LongWritable(1)));
        }
    }
}

public static class MyGenericMapper2 extends Mapper<LongWritable, Text, Text, MultiValueWritable> {
    protected void map(LongWritable key2, Text value2, Context context)
            throws IOException, InterruptedException {
        String[] splits = value2.toString().split(",");
        for (String str : splits) {
            context.write(new Text(str), new MultiValueWritable(new Text("1")));
        }
    }
}
I am experimenting with the maps emitting the same key paired with values of different types, but the job always fails and I cannot figure out where the problem is.

fish - Hadooper


Within a single job, setMapperClass takes only one mapper class argument, so only one mapper can be configured. In MapReduce that one mapper processes all of the input; inside its map logic you can check which data source the current record comes from and handle each source differently.
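For illustration, a minimal sketch of that single-mapper approach, assuming the job uses a plain TextInputFormat (so the split can be cast to FileSplit) and that the two input files are named file1.txt (tab-separated) and file2.txt (comma-separated); the class and variable names are made up for the example:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class SourceAwareMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Ask the framework which file the current split was read from.
        String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
        // Assumed layout: file1.txt is tab-separated, everything else comma-separated.
        String delimiter = "file1.txt".equals(fileName) ? "\t" : ",";
        for (String word : value.toString().split(delimiter)) {
            context.write(new Text(word), new LongWritable(1));
        }
    }
}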

龚飞飞 - post-90s IT guy


I did not set the mapper class. Here is the full source code:

package com.gff.hadoop;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.io.GenericWritable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class MultiValueTest {

    private static String INPUT_PATH = "hdfs://hadoop2:9000/input";
    private static String OUTPUT_PATH = "hdfs://hadoop2:9000/output/testMultiValue";

    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();
            FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
            if (fileSystem.exists(new Path(OUTPUT_PATH))) {
                fileSystem.delete(new Path(OUTPUT_PATH), true);
            }
            Job job = new Job(conf, "MultiValueTest");

            // Set the input paths and their input format class (the key point when using GenericWritable)
            MultipleInputs.addInputPath(job, new Path(INPUT_PATH + "/file1.txt"), TextInputFormat.class);
            MultipleInputs.addInputPath(job, new Path(INPUT_PATH + "/file2.txt"), TextInputFormat.class);

            job.setJarByClass(MultiValueTest.class);

            // No mapper class is set (there are two mappers)
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(MultiValueWritable.class);

            job.setPartitionerClass(HashPartitioner.class);
            job.setNumReduceTasks(1);

            job.setReducerClass(MyGenericReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            job.setOutputFormatClass(TextOutputFormat.class);
            FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static class MyGenericMapper1 extends Mapper<LongWritable, Text, Text, MultiValueWritable> {
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] splits = value.toString().split("\t");
            for (String str : splits) {
                context.write(new Text(str), new MultiValueWritable(new LongWritable(1)));
            }
        }
    }

    public static class MyGenericMapper2 extends Mapper<LongWritable, Text, Text, MultiValueWritable> {
        protected void map(LongWritable key2, Text value2, Context context)
                throws IOException, InterruptedException {
            String[] splits = value2.toString().split(",");
            for (String str : splits) {
                context.write(new Text(str), new MultiValueWritable(new Text("1")));
            }
        }
    }

    public static class MyGenericReducer extends Reducer<Text, MultiValueWritable, Text, IntWritable> {
        protected void reduce(Text key3, Iterable<MultiValueWritable> value3, Context context)
                throws IOException, InterruptedException {
            // Count the words from both files; one file carries the count as a number,
            // the other as a string, e.g. gongfeifei 1 and gongfeifei "1"
            int count = 0;
            for (MultiValueWritable mvw : value3) {
                Writable writable = mvw.get();
                if (writable instanceof LongWritable) {
                    count += ((LongWritable) writable).get();
                }
                if (writable instanceof Text) {
                    count += Long.parseLong(((Text) writable).toString());
                }
            }
            context.write(key3, new IntWritable(count));
        }
    }

    public static class MultiValueWritable extends GenericWritable {
        private static Class[] CLASSES = new Class[] {IntWritable.class, Text.class};

        public MultiValueWritable(Writable value) {
            set(value);
        }

        @Override
        protected Class<? extends Writable>[] getTypes() {
            return CLASSES;
        }
    }
}

Packaged the jar and ran it on the cluster:

-rwx--x--x 1 hadoop hadoop 14058 Jun 15 03:43 MultiValueTest.jar
[hadoop@hadoop3 jar]$ hadoop jar Mu*
16/06/15 03:47:54 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/06/15 03:48:19 INFO client.ConfiguredRMFailoverProxyProvider: Failing over to rm2
16/06/15 03:48:24 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
16/06/15 03:48:25 INFO input.FileInputFormat: Total input paths to process : 2
16/06/15 03:48:27 INFO mapreduce.JobSubmitter: number of splits:2
16/06/15 03:48:30 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1465951792308_0005
16/06/15 03:48:43 INFO impl.YarnClientImpl: Submitted application application_1465951792308_0005
16/06/15 03:48:43 INFO mapreduce.Job: The url to track the job: http://hadoop3:8088/proxy/appl ... 0005/
16/06/15 03:48:43 INFO mapreduce.Job: Running job: job_1465951792308_0005
16/06/15 03:49:30 INFO mapreduce.Job: Job job_1465951792308_0005 running in uber mode : true
16/06/15 03:49:30 INFO mapreduce.Job:  map 0% reduce 0%
16/06/15 03:49:34 INFO mapreduce.Job:  map 100% reduce 100%
16/06/15 03:49:51 INFO mapreduce.Job: Job job_1465951792308_0005 failed with state FAILED due to: Task failed task_1465951792308_0005_m_000000
Job failed as tasks failed. failedMaps:1 failedReduces:0
16/06/15 03:49:52 INFO mapreduce.Job: Counters: 15
    Job Counters
        Failed map tasks=2
        Launched map tasks=2
        Other local map tasks=2
        Total time spent by all maps in occupied slots (ms)=4053
        Total time spent by all reduces in occupied slots (ms)=0
        TOTAL_LAUNCHED_UBERTASKS=3
        NUM_UBER_SUBMAPS=2
        NUM_UBER_SUBREDUCES=1
        NUM_FAILED_UBERTASKS=3
        Total time spent by all map tasks (ms)=4053
        Total vcore-seconds taken by all map tasks=4053
        Total megabyte-seconds taken by all map tasks=4150272
    Map-Reduce Framework
        CPU time spent (ms)=0
        Physical memory (bytes) snapshot=0
        Virtual memory (bytes) snapshot=0

龚飞飞 - post-90s IT guy


I am using two files. One file (tab-separated):
gongfeifei    hello
hello    me
The other file (comma-separated):
gongfeifei,hello
hello,me
The two mapper classes process the two files separately, so the same key comes with values of different types: one mapper emits key: Text, value: IntWritable, the other key: Text, value: Text. GenericWritable is then used to adapt between the two value types. This is a hands-on exercise following what the teacher covered on this topic in class today.

fish - Hadooper


I ran your code; the task fails with the exception below. Does it give you any hints?
16/06/15 20:31:25 INFO mapreduce.Job: Task Id : attempt_1463456939802_0017_m_000000_2, Status : FAILED
Error: java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, received org.apache.hadoop.io.LongWritable
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1072)
        at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:715)
        at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
        at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
        at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:124)
        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

fish - Hadooper


A hint: take a closer look at how you call MultipleInputs.addInputPath. Besides the version that takes only three parameters:
 public static void addInputPath(Job job, Path path,
      Class<? extends InputFormat> inputFormatClass)
there is also a four-parameter overload with the same name:
public static void addInputPath(Job job, Path path,
      Class<? extends InputFormat> inputFormatClass,
      Class<? extends Mapper> mapperClass) 
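
Applied to the posted code, the two calls would presumably look something like this (a sketch reusing the paths and mapper classes from the source above):

MultipleInputs.addInputPath(job, new Path(INPUT_PATH + "/file1.txt"),
        TextInputFormat.class, MyGenericMapper1.class);
MultipleInputs.addInputPath(job, new Path(INPUT_PATH + "/file2.txt"),
        TextInputFormat.class, MyGenericMapper2.class);
// The four-argument overload also registers DelegatingMapper as the job's mapper,
// so no separate setMapperClass call is needed.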

fish - Hadooper


This time the error has changed to:
16/06/15 23:38:16 INFO mapreduce.Job: Task Id : attempt_1463456939802_0018_m_000001_2, Status : FAILED
Error: java.lang.RuntimeException: The type of instance is: class org.apache.hadoop.io.LongWritable, which is NOT registered.
        at org.apache.hadoop.io.GenericWritable.set(GenericWritable.java:106)
        at cn.chinahadoop.test.MultiValueTest$MultiValueWritable.<init>(MultiValueTest.java:102)
        at cn.chinahadoop.test.MultiValueTest$MyGenericMapper1.map(MultiValueTest.java:66)
        at cn.chinahadoop.test.MultiValueTest$MyGenericMapper1.map(MultiValueTest.java:61)
        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
        at org.apache.hadoop.mapreduce.lib.input.DelegatingMapper.run(DelegatingMapper.java:55)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Note what the message says: the type that is not registered is LongWritable. In your own MultiValueWritable, CLASSES consists of IntWritable and Text. Isn't that IntWritable a bit off? It should be LongWritable, shouldn't it? Try changing it again. When you hit a problem, try to trace it yourself first; discussing what you discover while tracking it down will probably teach you more.
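
For reference, a minimal sketch of the class after that change (LongWritable registered instead of IntWritable); a no-argument constructor is also added here, since Hadoop instantiates Writable values by reflection when it deserializes them on the reduce side:

public static class MultiValueWritable extends GenericWritable {
    // Register every concrete value type the mappers actually emit.
    private static Class[] CLASSES = new Class[] {LongWritable.class, Text.class};

    // No-arg constructor so the framework can create instances by reflection.
    public MultiValueWritable() {
    }

    public MultiValueWritable(Writable value) {
        set(value);
    }

    @Override
    protected Class<? extends Writable>[] getTypes() {
        return CLASSES;
    }
}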
