Hbase利用mapreduce计算avg max min

{{{public class QueryAvgMaxMin {
public static class QueryMapper extends TableMapper{
public void map(ImmutableBytesWritable row , Result value , Context context) throws IOException, InterruptedException{
String val = new String(value.getValue(Bytes.toBytes("rating_info"), Bytes.toBytes("rating")));
ImmutableBytesWritable rating = new ImmutableBytesWritable(Bytes.toBytes(Integer.parseInt(val)));
context.write(row, rating);
}
}
public static class QueryReducer extends TableReducer{
public void reduce(ImmutableBytesWritable key, Iterable value,
Context context) throws IOException, InterruptedException{
int sum = 0;
int cnt =0;
int maxValue = Integer.MIN_VALUE;
int minValue = Integer.MAX_VALUE;
for(ImmutableBytesWritable val :value){
sum += Bytes.toInt(val.get());
cnt++;
maxValue=Math.max(maxValue,Bytes.toInt(val.get()));
minValue=Math.min(minValue,Bytes.toInt(val.get()));
}
int avg = (int)sum/cnt;
Put put = new Put(Bytes.toBytes(key.get()));
put.add(Bytes.toBytes("count"), Bytes.toBytes("avg"), Bytes.toBytes(String.valueOf(avg)));
put.add(Bytes.toBytes("count"), Bytes.toBytes("max"), Bytes.toBytes(String.valueOf(maxValue)));
put.add(Bytes.toBytes("count"), Bytes.toBytes("min"), Bytes.toBytes(String.valueOf(minValue)));
context.write(null ,put);
}
}
public static void main(String args) throws IOException, ClassNotFoundException, InterruptedException{
Configuration conf = new Configuration();
conf.set("mapreduce.task.timeout", "1800000");
conf.set("dfs.client.socket-timeout", "1800000");
conf.set("dfs.datanode.socket.write.timeout", "1800000");
Job job = new Job(conf , "statistic");
job.setJarByClass(QueryAvgMaxMin.class);
Scan scan = new Scan();
scan.setCaching(500);
scan.setCacheBlocks(false);
FilterList list = new FilterList(FilterList.Operator.MUST_PASS_ALL);
Filter sexFilter = new SingleColumnValueFilter(Bytes.toBytes("user_info"),Bytes.toBytes("userSex"),
CompareOp.EQUAL,Bytes.toBytes("M"));
list.addFilter(sexFilter);
Filter ageFilter1 =new SingleColumnValueFilter(Bytes.toBytes("user_info"),Bytes.toBytes("userAge"),
CompareOp.GREATER_OR_EQUAL,Bytes.toBytes("20"));
Filter ageFilter2 =new SingleColumnValueFilter(Bytes.toBytes("user_info"),Bytes.toBytes("userAge"),
CompareOp.LESS_OR_EQUAL,Bytes.toBytes("40"));
list.addFilter(ageFilter1);
list.addFilter(ageFilter2);
scan.setFilter(list);
TableMapReduceUtil.initTableMapperJob("movielens", scan, QueryMapper.class,
ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
TableMapReduceUtil.initTableReducerJob("movies_count", QueryReducer.class, job);
job.waitForCompletion(true);
}
}}}}代码如上
但是运行之后结果显示出一大堆,对每个rowkey 做了一次这个操作,而不是对总体进行计算,是我的代码有问题么?


123123.png


 
已邀请:

wangwensheng - 大数据工程师@腾讯

赞同来自: mopishv0

结果贴出来看下啊

mopishv0 - 高级开发工程师@美团

你这个是把扫描出来的movielens表的**每一条**记录交给map函数处理。
map函数对于**每一条**记录写入到context中。
reduce对于每一个map输出的key(也就是rowkey)进行了一次处理,处理结果通过Put写入到movies_count中,所以每个rowkey都做了一次处理,是符合你的代码逻辑的。
 

Eric_Jiang - 我是小象的搬运工!!!

结果已贴,我是需要吧reduce 改成一般的reduce操作么
楼主你好,我也遇到类似问题,只不过我的输出是在HDFS文件系统中,想问一下你的这个问题解决了吗?求助
谢谢,应该怎么改呢?是把value作为key吗?@Eric_Jiang:
楼主你好,可能是刚接触mapreduce,还是不太明白map该怎么变,这是map,该怎么修改key呢,求指导,谢谢


IMG_20160512_093641.jpg


 

mopishv0 - 高级开发工程师@美团

当时楼主是把每一个map输出的key做了处理,其实如果是全部数据,reduce本身不应该区分输出key的,也就是说,要么map输出相同key,要么reduce用static处理所有输入的数据,在结束的时候输出结果

要回复问题请先登录注册