Computing avg, max, and min on HBase with MapReduce
{{{
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;

public class QueryAvgMaxMin {

    public static class QueryMapper extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
        @Override
        public void map(ImmutableBytesWritable row, Result value, Context context)
                throws IOException, InterruptedException {
            // Read the rating column of the current row and emit it keyed by the row key.
            String val = new String(value.getValue(Bytes.toBytes("rating_info"), Bytes.toBytes("rating")));
            ImmutableBytesWritable rating = new ImmutableBytesWritable(Bytes.toBytes(Integer.parseInt(val)));
            context.write(row, rating);
        }
    }

    public static class QueryReducer
            extends TableReducer<ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable> {
        @Override
        public void reduce(ImmutableBytesWritable key, Iterable<ImmutableBytesWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            int cnt = 0;
            int maxValue = Integer.MIN_VALUE;
            int minValue = Integer.MAX_VALUE;
            for (ImmutableBytesWritable val : values) {
                int rating = Bytes.toInt(val.get());
                sum += rating;
                cnt++;
                maxValue = Math.max(maxValue, rating);
                minValue = Math.min(minValue, rating);
            }
            int avg = sum / cnt;
            // Write one result row per reduce key into the output table.
            Put put = new Put(key.get());
            put.add(Bytes.toBytes("count"), Bytes.toBytes("avg"), Bytes.toBytes(String.valueOf(avg)));
            put.add(Bytes.toBytes("count"), Bytes.toBytes("max"), Bytes.toBytes(String.valueOf(maxValue)));
            put.add(Bytes.toBytes("count"), Bytes.toBytes("min"), Bytes.toBytes(String.valueOf(minValue)));
            context.write(null, put);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.set("mapreduce.task.timeout", "1800000");
        conf.set("dfs.client.socket-timeout", "1800000");
        conf.set("dfs.datanode.socket.write.timeout", "1800000");
        Job job = new Job(conf, "statistic");
        job.setJarByClass(QueryAvgMaxMin.class);

        Scan scan = new Scan();
        scan.setCaching(500);
        scan.setCacheBlocks(false);

        // Scan only male users aged 20 to 40 (values are compared as raw byte strings).
        FilterList list = new FilterList(FilterList.Operator.MUST_PASS_ALL);
        Filter sexFilter = new SingleColumnValueFilter(Bytes.toBytes("user_info"), Bytes.toBytes("userSex"),
                CompareOp.EQUAL, Bytes.toBytes("M"));
        list.addFilter(sexFilter);
        Filter ageFilter1 = new SingleColumnValueFilter(Bytes.toBytes("user_info"), Bytes.toBytes("userAge"),
                CompareOp.GREATER_OR_EQUAL, Bytes.toBytes("20"));
        Filter ageFilter2 = new SingleColumnValueFilter(Bytes.toBytes("user_info"), Bytes.toBytes("userAge"),
                CompareOp.LESS_OR_EQUAL, Bytes.toBytes("40"));
        list.addFilter(ageFilter1);
        list.addFilter(ageFilter2);
        scan.setFilter(list);

        TableMapReduceUtil.initTableMapperJob("movielens", scan, QueryMapper.class,
                ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
        TableMapReduceUtil.initTableReducerJob("movies_count", QueryReducer.class, job);
        job.waitForCompletion(true);
    }
}
}}}
The code is above.
But after running it, the output contains a huge number of rows: the statistics are computed once for every rowkey instead of over the whole data set. Is there something wrong with my code?
7 replies
wangwensheng - Big Data Engineer @ Tencent
Upvoted by: mopishv0
mopishv0 - Senior Development Engineer @ Meituan
The map function writes one output to the context for **every** record.
reduce is then invoked once for each key the map emitted (i.e., each rowkey), and the result of each invocation is written to movies_count via a Put. So getting one output row per rowkey is exactly what your code logic says should happen.
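
If the goal is a single avg/max/min over the whole filtered table rather than one per rowkey, a common fix is to have the mapper emit the same constant key for every record, so all ratings arrive in a single reduce call. Below is a minimal sketch of that change against the mapper in the question (the constant key "stat" is my own choice, and it assumes the reducer, imports, and job setup stay as posted):
{{{
public static class QueryMapper extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
    // Every record is emitted under the same fixed key, so the reducer sees all
    // ratings in one reduce() call and the statistics cover the whole scan.
    private static final ImmutableBytesWritable STAT_KEY =
            new ImmutableBytesWritable(Bytes.toBytes("stat"));

    @Override
    public void map(ImmutableBytesWritable row, Result value, Context context)
            throws IOException, InterruptedException {
        String val = new String(value.getValue(Bytes.toBytes("rating_info"), Bytes.toBytes("rating")));
        ImmutableBytesWritable rating = new ImmutableBytesWritable(Bytes.toBytes(Integer.parseInt(val)));
        context.write(STAT_KEY, rating);   // was: context.write(row, rating)
    }
}
}}}
The reducer can stay as it is: it will now run once, and the single Put (whose row key is "stat") ends up holding the overall avg/max/min in movies_count.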