MapReduce Import and Export with HBase 0.98: Code Examples

In Hadoop 2.6.0 | HBase 0.98.13 MapReduce Development Environment Setup (http://www.bcmeng.com/hbasemr/) we already set up a working MapReduce development environment for Hadoop 2.6.0 and HBase 0.98.13. Below I (Xiao Meng) walk through the actual code.

1: MapReduce import from HDFS into HBase 0.98, code example:

Source file format (one line of an Apache-style access log):

175.44.30.93 - - [29/Sep/2013:00:10:15 +0800] "GET /structure/heap HTTP/1.1" 301 406 "-" "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1;)"
Java code:

package hbase;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class HbaseMr {

    public static class MapperClass extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split the log line on spaces: field 0 is the client IP,
            // field 3 is "[29/Sep/2013:00:10:15", field 6 is the requested path.
            String[] strs = value.toString().split(" ");
            // Row key: IP + "-" + timestamp (with the leading '[' stripped).
            String rowkey = strs[0] + "-" + strs[3].substring(1);

            byte[] row = Bytes.toBytes(rowkey);
            byte[] family = Bytes.toBytes("info");
            byte[] qualifier = Bytes.toBytes("url");
            byte[] values = Bytes.toBytes(strs[6]);
            Put put = new Put(row);
            put.add(family, qualifier, values);

            context.write(new ImmutableBytesWritable(row), put);
        }
    }

    public static void main(String[] args) throws Exception {

        Configuration conf = HBaseConfiguration.create();
        // Tell TableOutputFormat which table the Puts go to.
        conf.set(TableOutputFormat.OUTPUT_TABLE, "access-log");
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        Job job = Job.getInstance(conf, "Hbase_Mr");
        // Map-only job: every Put is written straight to HBase, no reduce phase.
        job.setNumReduceTasks(0);
        job.setJarByClass(HbaseMr.class);
        job.setMapperClass(MapperClass.class);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Put.class);
        job.setOutputFormatClass(TableOutputFormat.class);

        // First remaining argument is the HDFS input path.
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
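Note that TableOutputFormat writes into an existing table, so access-log with the column family info must be created beforehand, for example with create 'access-log', 'info' in the HBase shell. Below is a minimal sketch doing the same from Java with the 0.98 client API; the class name CreateAccessLogTable is my own placeholder, the table and family names come from the job above.

package hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class CreateAccessLogTable {

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // HBaseAdmin is the 0.98-era admin client (replaced by Admin in 1.0+).
        HBaseAdmin admin = new HBaseAdmin(conf);
        try {
            if (!admin.tableExists("access-log")) {
                HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("access-log"));
                desc.addFamily(new HColumnDescriptor("info"));
                admin.createTable(desc);
            }
        } finally {
            admin.close();
        }
    }
}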
The rows imported into HBase look like this:

(screenshot: a scan of the access-log table after the import)

2: MapReduce from HBase to HBase, code example:

This job counts, per IP, the total number of paths accessed in the table shown above. The code:

package hbase;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class HbaseMR {

    // Source table written by the import job above.
    final static String inTable = "access-log";
    final static String inFamily = "info";
    final static String inColumn = "url";

    // Destination table for the per-IP totals.
    final static String outTable = "total-access";
    final static String outFamily = "url";
    final static String outColumn = "count";

    public static class Mapper extends TableMapper<Text, Text> {

        @Override
        public void map(ImmutableBytesWritable row, Result values, Context context)
                throws IOException, InterruptedException {
            byte[] b = values.getValue(Bytes.toBytes(inFamily), Bytes.toBytes(inColumn));
            if (b != null) {
                String v = new String(b);
                // Row keys have the form "ip-timestamp"; take the IP part.
                String r = new String(values.getRow());
                String ip = r.split("-")[0];
                context.write(new Text(ip), new Text(v));
            }
        }
    }

    public static class Reducer extends TableReducer<Text, Text, Text> {

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Count how many URLs this IP accessed (duplicates included).
            int count = 0;
            for (Text val : values) {
                count++;
            }
            String sum = String.valueOf(count);

            Put put = new Put(Bytes.toBytes(key.toString()));
            put.add(Bytes.toBytes(outFamily), Bytes.toBytes(outColumn), Bytes.toBytes(sum));
            context.write(key, put);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = HBaseConfiguration.create();

        Job job = Job.getInstance(conf, "HbaseMR");
        job.setJarByClass(HbaseMR.class);

        Scan scan = new Scan();
        // Commonly recommended for MapReduce scans: fetch rows in batches and
        // don't pollute the block cache with a one-off full-table scan.
        scan.setCaching(500);
        scan.setCacheBlocks(false);

        TableMapReduceUtil.initTableMapperJob(inTable, scan, Mapper.class,
                Text.class, Text.class, job);
        TableMapReduceUtil.initTableReducerJob(outTable, Reducer.class, job);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
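To spot-check a single row of total-access outside MapReduce, here is a minimal read sketch with the 0.98 client API; the class name VerifyTotalAccess is my own placeholder, and the sample IP is the one from the log line above.

package hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class VerifyTotalAccess {

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // HTable is the 0.98-era table client (replaced by Table in 1.0+).
        HTable table = new HTable(conf, "total-access");
        try {
            // Row key in total-access is the bare IP.
            Get get = new Get(Bytes.toBytes("175.44.30.93"));
            Result result = table.get(get);
            byte[] count = result.getValue(Bytes.toBytes("url"), Bytes.toBytes("count"));
            System.out.println(count == null ? "no row" : Bytes.toString(count));
        } finally {
            table.close();
        }
    }
}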
The new table looks like this:

(screenshot: a scan of the total-access table)

3: Deduplicating during a MapReduce import into HBase:

In the HBase import above we did not account for the same IP accessing the same path more than once, so the per-IP counts contain duplicates and we should deduplicate.

The standard MapReduce deduplication approach:

After the shuffle, each distinct key reaches the reducers exactly once, so simply emitting the reduce-side key is enough to deduplicate.

Concretely, the map output key is the map input value, and the map output value is null. The reduce input is then exactly the map output; when the reducer receives a <key, value-list> pair (the value-list being all nulls), it copies the key straight through to the output key with a null value. A complete sketch of this pattern follows the two snippets below.
Map-side output:

context.write(value, NullWritable.get());

Reduce-side output:

context.write(key, NullWritable.get());
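Put together, a minimal self-contained sketch of this HDFS-to-HDFS dedup pattern; the class name Dedup and the argument handling are my own placeholders, not part of the original jobs above.

package hbase;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Dedup {

    public static class DedupMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Emit the whole line as the key; duplicates collapse in the shuffle.
            context.write(value, NullWritable.get());
        }
    }

    public static class DedupReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
        @Override
        public void reduce(Text key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            // Each distinct line arrives exactly once; just emit it.
            context.write(key, NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "dedup");
        job.setJarByClass(Dedup.class);
        job.setMapperClass(DedupMapper.class);
        job.setReducerClass(DedupReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Since the reducer just echoes its key, it can also serve as a combiner (job.setCombinerClass(DedupReducer.class)) to shrink shuffle traffic.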
When importing into HBase with MapReduce we could likewise run such a dedup job first and import afterwards, but that takes two jobs.

Instead, we can deduplicate directly inside the reduce task above, using a HashSet. The modified code is as follows:

// Requires: import java.util.HashSet; at the top of the file.
public static class Reducer extends TableReducer<Text, Text, Text> {

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Hadoop reuses the same Text instance across iterations of the values
        // iterator, so store a copy (here the String) rather than the Text
        // object itself; adding val directly would give wrong counts.
        HashSet<String> set = new HashSet<String>();
        for (Text val : values) {
            set.add(val.toString());
        }
        String sum = String.valueOf(set.size());

        Put put = new Put(Bytes.toBytes(key.toString()));
        put.add(Bytes.toBytes(outFamily), Bytes.toBytes(outColumn), Bytes.toBytes(sum));
        context.write(key, put);
    }
}
Against the same data source, the result is now:

(screenshot: total-access counts after deduplication)

As you can see, the counts have clearly dropped.
