About passing objects between Mapper and Reducer in MapReduce

static class UserAgeMaxCoreMapper extends Mapper<LongWritable, Text, Text, UserBean>{
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            // Split the line on the delimiter; field layout: name age gender score
            String[] fields = line.split("\t");
            String gender = fields[2];
            String name = fields[0];
            String age = fields[1];
            String score = fields[3];
            String nameAgeScore = fields[0] + "\t" + fields[1] + "\t" + fields[3];
            context.write(new Text(gender), new UserBean(gender, age, score, name));
        }
    }
    static  class UserAgeMaxCoreReducer extends Reducer<Text, UserBean, NullWritable, Text>{
        @Override
        protected void reduce(Text key, Iterable<UserBean> values, Context context) throws IOException, InterruptedException {
            long maxScore = 0;
            UserBean retUserBean = null;
            for (UserBean userBean: values){
//                if (userBean.getScore() == null){
//                    continue;
//                }
                int score = Integer.parseInt(userBean.getScore());
                if (score > maxScore){
                    retUserBean = new UserBean(userBean.getGender(), userBean.getAge(), userBean.getScore(),userBean.getName());
                    maxScore = score;
                }
            }
            context.write(NullWritable.get(), new Text(retUserBean.getName() + "\t" + retUserBean.getAge() + "\t" + retUserBean.getGender() + "\t" + retUserBean.getScore()));
        }
    }
In the code above, for `int score = Integer.parseInt(userBean.getScore());` the value of `userBean.getScore()` is always null. The value was set in the Mapper, so why is it null by the time it reaches the Reducer? Error message:

Error: java.lang.NumberFormatException: null
        at java.lang.Integer.parseInt(Integer.java:542)
        at java.lang.Integer.parseInt(Integer.java:615)
        at bigdata.mr.MrUserAgeMaxCoreApp$UserAgeMaxCoreReducer.reduce(MrUserAgeMaxCoreApp.java:80)
        at bigdata.mr.MrUserAgeMaxCoreApp$UserAgeMaxCoreReducer.reduce(MrUserAgeMaxCoreApp.java:33)
        at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
        at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:627)
        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

fish - Hadooper


What kind of class is UserBean in your code? In Hadoop, only objects of a class that implements Writable can be passed between the Map and Reduce phases.

向日葵ral


UserBean does implement Writable:

package bigdata.mr.bean;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class UserBean implements Writable{
    private String name;
    private String age;
    private String gender;
    private String score;
    public UserBean(){};
    public UserBean(String name, String age, String score){
        this.name = name;
        this.age = age;
        this.score = score;
    }
    public UserBean(String gender, String age, String score, String  name){
        this.gender = gender;
        this.age = age;
        this.score = score;
        this.name = name;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getGender() {
        return gender;
    }

    public void setGender(String gender) {
        this.gender = gender;
    }

    public String getAge() {
        return age;
    }

    public void setAge(String age) {
        this.age = age;
    }

    public String getScore() {
        return score;
    }

    public void setScore(String score) {
        this.score = score;
    }

    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeChars(name);
        dataOutput.writeChars(gender);
        dataOutput.writeChars(score);
        dataOutput.writeChars(age);
    }

    public void readFields(DataInput dataInput) throws IOException {
        name = dataInput.readLine();
        gender = dataInput.readLine();
        score = dataInput.readLine();
        age = dataInput.readLine();
    }
}

fish - Hadooper


Why are you using readLine in readFields? For int and other primitive types, please refer to the example at https://hadoop.apache.org/docs/current/api/org/apache/hadoop/io/Writable.html. For string fields, use org.apache.hadoop.io.Text instead of String. The following snippet may also be a useful reference:
 public static class TwoFieldKey implements WritableComparable<TwoFieldKey> {

    private IntWritable index;
    private IntWritable content;

    public TwoFieldKey() {
      index = new IntWritable();
      content = new IntWritable();
    }

    public TwoFieldKey(int index, int content) {
      this.index = new IntWritable(index);
      this.content = new IntWritable(content);
    }

    public int getIndex() {
      return index.get();
    }

    public int getContent() {
      return content.get();
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      index.readFields(in);
      content.readFields(in);
    }

    @Override
    public void write(DataOutput out) throws IOException {
      index.write(out);
      content.write(out);
    }
    
    ...
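
Following the advice above, one way the serialization in UserBean could be rewritten is sketched below (an illustrative sketch, not code from the thread): each field is held as an org.apache.hadoop.io.Text, and write()/readFields() delegate to those Text objects, so whatever the Mapper serializes is read back field for field in the Reducer. The original pairing fails because writeChars emits raw two-byte characters with no line terminator, so readLine in readFields cannot find field boundaries and returns null for the later fields, which is what triggers the NumberFormatException.

package bigdata.mr.bean;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Sketch of UserBean with symmetric serialization: every field written in
// write() is read back in the same order and format in readFields().
public class UserBean implements Writable {
    private Text name = new Text();
    private Text age = new Text();
    private Text gender = new Text();
    private Text score = new Text();

    // No-arg constructor is required so Hadoop can instantiate the bean
    // before calling readFields().
    public UserBean() {}

    public UserBean(String gender, String age, String score, String name) {
        this.gender.set(gender);
        this.age.set(age);
        this.score.set(score);
        this.name.set(name);
    }

    public String getName()   { return name.toString(); }
    public String getAge()    { return age.toString(); }
    public String getGender() { return gender.toString(); }
    public String getScore()  { return score.toString(); }

    @Override
    public void write(DataOutput out) throws IOException {
        // Text writes a length prefix followed by UTF-8 bytes,
        // so readFields can recover each field exactly.
        name.write(out);
        gender.write(out);
        score.write(out);
        age.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Must mirror write(): same fields, same order.
        name.readFields(in);
        gender.readFields(in);
        score.readFields(in);
        age.readFields(in);
    }
}

With a symmetric write/readFields pair like this, getScore() in the Reducer returns the value that was set in the Mapper. An alternative with the same effect would be to keep the String fields and use writeUTF/readUTF in write/readFields.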
