Hadoop MapReduce

Hadoop MapReduce：从一大堆数中找出最大的数，类似 SQL 的 SELECT MAX(NUMBER) FROM TABLE。这里写了一个简单的 MapReduce 程序来实现该功能。程序会先生成测试数据，并在生成时计算出其中的最大值；待 MapReduce 跑完后，你可以到输出路径查看结果，并与该最大值进行对比。具体请查看代码：
 
Java代码
  1. package com.guoyun.hadoop.mapreduce.study;  
  2.   
  3. import java.io.IOException;  
  4. import java.util.StringTokenizer;  
  5.   
  6. import org.apache.hadoop.conf.Configuration;  
  7. import org.apache.hadoop.fs.FileSystem;  
  8. import org.apache.hadoop.fs.Path;  
  9. import org.apache.hadoop.io.LongWritable;  
  10. import org.apache.hadoop.io.Text;  
  11. import org.apache.hadoop.mapreduce.Job;  
  12. import org.apache.hadoop.mapreduce.Mapper;  
  13. import org.apache.hadoop.mapreduce.Reducer;  
  14. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  15. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  16. import org.slf4j.Logger;  
  17. import org.slf4j.LoggerFactory;  
  18.   
  19. /** 
  20.  * 获得最大的数,类似SQL:SELECT MAX(NUMBER) FROM TABLE 
  21.  * 注意这里只有一列 
  22.  * 相比 @GetMaxValueMapReduceImproveTest 这里速度会更慢 
  23.  *  
  24.  */  
  25. public class GetMaxValueMapReduceTest extends MyMapReduceSIngleColumnTest{  
  26.   public static final Logger log=LoggerFactory.getLogger(GetMaxValueMapReduceTest.class);  
  27.     
  28.   
  29.   public GetMaxValueMapReduceTest(int dataLength) throws Exception {  
  30.     super(dataLength);  
  31.     // TODO Auto-generated constructor stub  
  32.   }  
  33.   
  34.   public GetMaxValueMapReduceTest(long dataLength, String inputPath,  
  35.       String outputPath) throws Exception {  
  36.     super(dataLength, inputPath, outputPath);  
  37.     // TODO Auto-generated constructor stub  
  38.   }  
  39.   
  40.   public GetMaxValueMapReduceTest(String outputPath) {  
  41.     super(outputPath);  
  42.     // TODO Auto-generated constructor stub  
  43.   }  
  44.   
  45.   /** 
  46.    * Map,to get the source datas 
  47.    */  
  48.   public static class MyMapper extends Mapper<LongWritable,Text,Text,LongWritable>{  
  49.     private final Text writeKey=new Text("K");  
  50.     private LongWritable writeValue=new LongWritable(0);  
  51.       
  52.     @Override  
  53.     protected void map(LongWritable key, Text value, Context context)  
  54.         throws IOException, InterruptedException {  
  55.       log.debug("begin to map");  
  56.       StringTokenizer tokenizer=null;  
  57.       String lineValue=null;  
  58.         
  59.         
  60.       tokenizer=new StringTokenizer(value.toString().trim());  
  61.       while(tokenizer.hasMoreTokens()){  
  62.         lineValue=tokenizer.nextToken().trim();  
  63.         if(lineValue.equals("")){  
  64.           continue;  
  65.         }  
  66.         try {  
  67.           writeValue.set(Long.parseLong(lineValue));  
  68.           context.write(writeKey, writeValue);  
  69.         } catch (NumberFormatException e) {  
  70.           continue;  
  71.         }  
  72.           
  73.       }  
  74.     }  
  75.   }  
  76.     
  77.   /** 
  78.    * Reduce,to get the max value 
  79.    */  
  80.   public static class MyReducer   
  81.     extends Reducer<Text,LongWritable,Text,LongWritable>{  
  82.     private final Text maxValueKey=new Text("maxValue");  
  83.           
  84.     @Override  
  85.     public void reduce(Text key, Iterable<LongWritable> values,Context context)  
  86.         throws IOException, InterruptedException {  
  87.       log.debug("begin to reduce");  
  88.       long maxValue=Long.MIN_VALUE;  
  89.       for(LongWritable value:values){  
  90.         if(value.get()>maxValue){  
  91.           maxValue=value.get();  
  92.         }  
  93.       }  
  94.       context.write(maxValueKey, new LongWritable(maxValue));  
  95.     }  
  96.       
  97.       
  98.   }  
  99.     
  100.   /** 
  101.    * @param args 
  102.    */  
  103.   public static void main(String[] args) {  
  104.     MyMapReduceTest mapReduceTest=null;  
  105.     Configuration conf=null;  
  106.     Job job=null;  
  107.     FileSystem fs=null;  
  108.     Path inputPath=null;  
  109.     Path outputPath=null;  
  110.     long begin=0;  
  111.     String output="testDatas/mapreduce/MROutput_SingleColumn_getMax";  
  112.       
  113.       
  114.     try {  
  115.       mapReduceTest=new GetMaxValueMapReduceTest(10000000);  
  116.         
  117.       inputPath=new Path(mapReduceTest.getInputPath());  
  118.       outputPath=new Path(mapReduceTest.getOutputPath());  
  119.         
  120.       conf=new Configuration();  
  121.       job=new Job(conf,"getMaxValue");  
  122.         
  123.       fs=FileSystem.getLocal(conf);  
  124.       if(fs.exists(outputPath)){  
  125.         if(!fs.delete(outputPath,true)){  
  126.           System.err.println("Delete output file:"+mapReduceTest.getOutputPath()+" failed!");  
  127.           return;  
  128.         }  
  129.       }  
  130.         
  131.         
  132.       job.setJarByClass(GetMaxValueMapReduceTest.class);  
  133.       job.setMapOutputKeyClass(Text.class);  
  134.       job.setMapOutputValueClass(LongWritable.class);  
  135.       job.setOutputKeyClass(Text.class);  
  136.       job.setOutputValueClass(LongWritable.class);  
  137.       job.setMapperClass(MyMapper.class);  
  138.       job.setReducerClass(MyReducer.class);  
  139.         
  140.       job.setNumReduceTasks(2);  
  141.         
  142.       FileInputFormat.addInputPath(job, inputPath);  
  143.       FileOutputFormat.setOutputPath(job, outputPath);  
  144.         
  145.        
  146.       begin=System.currentTimeMillis();  
  147.       job.waitForCompletion(true);  
  148.         
  149.       System.out.println("===================================================");  
  150.       if(mapReduceTest.isGenerateDatas()){  
  151.         System.out.println("The maxValue is:"+mapReduceTest.getMaxValue());  
  152.         System.out.println("The minValue is:"+mapReduceTest.getMinValue());  
  153.       }  
  154.       System.out.println("Spend time:"+(System.currentTimeMillis()-begin));  
  155.       // Spend time:18908  
  156.         
  157.     } catch (Exception e) {  
  158.       // TODO Auto-generated catch block  
  159.       e.printStackTrace();  
  160.     }  
  161.   }  
  162.   
  163. }  

0 个评论

要回复文章请先登录注册