MapReduce

MapReduce

执行MapReduce后一直挂起

defineconst 回复了问题 3 人关注 3 个回复 2420 次浏览 2019-10-16 09:35 来自相关话题

docker环境下搭建hadoop伪分布式集群时,Namenode如何连接宿主机mysql服务

回复

easlife 发起了问题 2 人关注 0 个回复 1804 次浏览 2018-12-28 21:54 来自相关话题

MapReduce

macg 回复了问题 3 人关注 2 个回复 1424 次浏览 2018-09-08 22:44 来自相关话题

hadoopHA hadoop 2.9.1部署 mapreduce可以用wordcount,有计算结果,但是web界面无记录

回复

zhoujian16 发起了问题 1 人关注 0 个回复 1447 次浏览 2018-08-04 08:40 来自相关话题

关于mapreducer 对象传输

fish 回复了问题 2 人关注 3 个回复 1217 次浏览 2018-02-11 19:51 来自相关话题

Mapreduce作业报错Stack trace: ExitCodeException exitCode=255:

fish 回复了问题 2 人关注 1 个回复 4603 次浏览 2018-01-22 16:36 来自相关话题

hadoop 运行mapreduce,发现任务状态一直是Accepted

小熊BN 回复了问题 4 人关注 12 个回复 8640 次浏览 2018-01-13 10:29 来自相关话题

hadoop运行mapreduce任务失败

fish 回复了问题 2 人关注 1 个回复 4568 次浏览 2017-05-02 18:19 来自相关话题

如何区分一条SQL用几个MR?

Dong 回复了问题 2 人关注 1 个回复 1934 次浏览 2017-04-19 19:37 来自相关话题

mapreduce出现Too Many fetch failures.Failing the attempt错误之后,任务回滚了,造成这个错误的原因主要有哪些?

fish 回复了问题 2 人关注 2 个回复 5735 次浏览 2017-03-20 18:09 来自相关话题

mapreduce输出文件大小

fish 回复了问题 2 人关注 3 个回复 3806 次浏览 2017-03-17 17:06 来自相关话题

mapreduce执行进度,从80%退回到0又开始执行,为什么??

fish 回复了问题 2 人关注 2 个回复 4659 次浏览 2017-03-17 10:20 来自相关话题

在reducer阶段怎么获得这次批处理处理的总的数据量?

wangxiaolei 回复了问题 2 人关注 1 个回复 1516 次浏览 2017-03-09 11:37 来自相关话题

mapreduce 设置的reduce个数过多job.setNumReduceTasks(numReduceTasks);会出现空文件,为什么?

fish 回复了问题 4 人关注 3 个回复 3821 次浏览 2017-03-02 18:47 来自相关话题

提交MR作业时,报:WordCount$TokenizerMapper not found

回复

Mathings 回复了问题 1 人关注 1 个回复 3331 次浏览 2017-01-24 19:11 来自相关话题

关于mapreduce程序的jar包调用

wangxiaolei 回复了问题 2 人关注 8 个回复 2206 次浏览 2016-08-25 22:43 来自相关话题

Hadoop 运行MapReduce 自带的例子wordcount报错

IT_Angel 回复了问题 3 人关注 14 个回复 4055 次浏览 2016-08-25 20:01 来自相关话题

hadoop运行MapReduce WordCount$TokenizerMapper classNotFound

wangxiaolei 回复了问题 2 人关注 1 个回复 3253 次浏览 2016-08-18 20:59 来自相关话题

io.sort.mb*io.sort.record.percent

fish 回复了问题 3 人关注 4 个回复 2373 次浏览 2016-08-12 17:25 来自相关话题

执行MapReduce后一直挂起

回复

defineconst 回复了问题 3 人关注 3 个回复 2420 次浏览 2019-10-16 09:35 来自相关话题

docker环境下搭建hadoop伪分布式集群时,Namenode如何连接宿主机mysql服务

回复

easlife 发起了问题 2 人关注 0 个回复 1804 次浏览 2018-12-28 21:54 来自相关话题

MapReduce

回复

macg 回复了问题 3 人关注 2 个回复 1424 次浏览 2018-09-08 22:44 来自相关话题

hadoopHA hadoop 2.9.1部署 mapreduce可以用wordcount,有计算结果,但是web界面无记录

回复

zhoujian16 发起了问题 1 人关注 0 个回复 1447 次浏览 2018-08-04 08:40 来自相关话题

关于mapreducer 对象传输

回复

fish 回复了问题 2 人关注 3 个回复 1217 次浏览 2018-02-11 19:51 来自相关话题

Mapreduce作业报错Stack trace: ExitCodeException exitCode=255:

回复

fish 回复了问题 2 人关注 1 个回复 4603 次浏览 2018-01-22 16:36 来自相关话题

hadoop 运行mapreduce,发现任务状态一直是Accepted

回复

小熊BN 回复了问题 4 人关注 12 个回复 8640 次浏览 2018-01-13 10:29 来自相关话题

hadoop运行mapreduce任务失败

回复

fish 回复了问题 2 人关注 1 个回复 4568 次浏览 2017-05-02 18:19 来自相关话题

如何区分一条SQL用几个MR?

回复

Dong 回复了问题 2 人关注 1 个回复 1934 次浏览 2017-04-19 19:37 来自相关话题

mapreduce输出文件大小

回复

fish 回复了问题 2 人关注 3 个回复 3806 次浏览 2017-03-17 17:06 来自相关话题

mapreduce执行进度,从80%退回到0又开始执行,为什么??

回复

fish 回复了问题 2 人关注 2 个回复 4659 次浏览 2017-03-17 10:20 来自相关话题

在reducer阶段怎么获得这次批处理处理的总的数据量?

回复

wangxiaolei 回复了问题 2 人关注 1 个回复 1516 次浏览 2017-03-09 11:37 来自相关话题

mapreduce 设置的reduce个数过多job.setNumReduceTasks(numReduceTasks);会出现空文件,为什么?

回复

fish 回复了问题 4 人关注 3 个回复 3821 次浏览 2017-03-02 18:47 来自相关话题

提交MR作业时,报:WordCount$TokenizerMapper not found

回复

Mathings 回复了问题 1 人关注 1 个回复 3331 次浏览 2017-01-24 19:11 来自相关话题

关于mapreduce程序的jar包调用

回复

wangxiaolei 回复了问题 2 人关注 8 个回复 2206 次浏览 2016-08-25 22:43 来自相关话题

Hadoop 运行MapReduce 自带的例子wordcount报错

回复

IT_Angel 回复了问题 3 人关注 14 个回复 4055 次浏览 2016-08-25 20:01 来自相关话题

hadoop运行MapReduce WordCount$TokenizerMapper classNotFound

回复

wangxiaolei 回复了问题 2 人关注 1 个回复 3253 次浏览 2016-08-18 20:59 来自相关话题

io.sort.mb*io.sort.record.percent

回复

fish 回复了问题 3 人关注 4 个回复 2373 次浏览 2016-08-12 17:25 来自相关话题

MapReduce作业中Map\Reduce Tasks 数目的指定

唐半张 发表了文章 0 个评论 2027 次浏览 2015-10-09 09:33 来自相关话题

MapReduce作业中Map\Reduce Tasks 数目的指定 1 、MapReduce作业中Map Task 数目的指定 :1 )  MapReduce从HDFS中读取Input文件,通过Inputform ...查看全部
MapReduce作业中Map\Reduce Tasks 数目的指定
1 、MapReduce作业中Map Task 数目的指定 :1 )  MapReduce从HDFS中读取Input文件,通过Inputformat对文件进行Split分割,得到Splits数目。 Split是MapReduce中最小的计算单元,一个Split文件对应一个Map Task。
2)源码解读:
file:///C:/Users/Ganymede/AppData/Local/Temp/Wiz/2c35b4e7-1282-47b2-a041-351f4f09d78e_4_files/1ab53ab8-c2a3-4341-b9d6-67ea5cb3a342.jpg
由源码可知:
maxSize = mapred.max.split.size.  //默认 long max
minSize  = mapred.min.split.size.  //默认 1

splitSize 计算公式:
splitSize = max(minSize , min(maxSize,blockSize)) (默认= 64M)


3) 当执行wordcount时:
(1)一个输入文件如果小于64M,默认情况则保存在hdfs上的一个block中,对应一个Split文件,所以将产和一个Map Task。
(2)如果输入一个文件为150M,默认情况则保存在hdfs上的三个block,对应三个Split文件,所以将产和三个Map Task。
(3)如果有输入三个文件都小于64M,默认情况下会保存在三个不同的block中,所以对应三个Split文件,也将产和三个Map Task。


4)用户可自行指定block与split的关系,HDFS中的一个block,一个Split也可以对应多个block。Split与block的关系是一对多的关系。


5)总结MapReduce作业中Map Task 数目是由:
(1)输入文件的个数与大小
(2)hadoop设置mapred.max.split.size大小决定的


6)修改mapred.max.split.size
设置文件 mapred-site.xml 
< ?xml version="1.0"?>
< ?xml-stylesheet type="text/xsl" href="https://wenda.chinahadoop.cn/configuration.xsl"?>
< configuration>
        
                   mapred.job.tracker
                    hadoop.main:9001
         
         
                   mapred.max.split.size
                    300
         
< /configuration>


运行示例:
有三个输入文件:
file:///C:/Users/Ganymede/AppData/Local/Temp/Wiz/2c35b4e7-1282-47b2-a041-351f4f09d78e_4_files/97b2c8aa-b64b-43d2-b3ca-fe6edaa5de0e.png


根据公式,splitsize = splitSize = max(1, min(300b,64*1024*1024))  = 300
a.txt 的 splits数   1473 /300 = 5
b.txt 的 splits数   25 / 300 = 1
c.txt 的 splits数   40 / 300 = 1




2、MapReduce作业中Reduce Task 数目的指定 :
1)JobClient类中submitJobInternal方法中指定   int reduces = jobCopy.getNumReduceTasks();

2) 而JobConf类中,public int getNumReduceTasks() { return getInt("mapred.reduce.tasks", 1); }
因此,Reduce Task 数目是由mapred.reduce.tasks,如果不指定默认为1。


file:///C:/Users/Ganymede/AppData/Local/Temp/Wiz/2c35b4e7-1282-47b2-a041-351f4f09d78e_4_files/59b8a73e-3cc0-4ec5-bad3-4d417d5d9d7a.png


3) reduce task的数目,决定了输出文件的个数。
统计wordcount可以有两个以上的reduce,如果是排序的mapreduce,两个或者以上的reduce的结果都是不合理的。

Yarn(MR2)上的应用汇总

唐半张 发表了文章 0 个评论 1745 次浏览 2015-10-08 10:40 来自相关话题

Yarn做为hadoop下一代集群资源管理和调度平台, 其上能支持多种计算框架, 本文就简要介绍一下这些计算框架. 1 ...查看全部
Yarn做为hadoop下一代集群资源管理和调度平台, 其上能支持多种计算框架, 本文就简要介绍一下这些计算框架.
1.       MapReduce

首先是大家熟悉的mapreduce, 在MR2之前, hadoop包括HDFS和mapreduce, 做为hadoop上唯一的分布式计算框架, 其优点是用户可以很方便的编写分布式计算程序, 并支持许多的应用, 如hive, mahout, pig等. 但是其缺点是无法充分利用集群资源, 不支持DAG, 迭代式计算等. 为了解决这些问题, yahoo提出了Yarn (next generation mapreduce), 一个分布式集群集群资源管理和调度平台. 这样除了mapreduce外, 还可以支持各种计算框架.

2.       Spark

Spark是一种与mapreduce相似的开源计算框架, 不同之处在于Spark在某些工作负载方面表现更优, 因为它使用了内存分布式数据集, 另外除了提供交互式查询外, 它还可以优化迭代工作负载.

3.       Apache HAMA

Apache Hama 是一个运行在HDFS上的BSP(Bulk Synchronous Parallel大容量同步并行) 计算框架, 主要针对大规模科学计算,如矩阵, 图像, 网络算法等.当前它有一下功能:

作业提交和管理接口
单节点上运行多个任务
输入/输出格式化
备份恢复
支持通过Apache Whirr运行在云端
支持与Yarn一起运行
4.       Apache Giraph

图像处理平台上运行这大型算法(如page rank, shared connections, personalization-based popularity 等)已经很流行, Giraph采用BSP模型(bulk-synchronous parallel model),可用于等迭代类算法。

5.       Open MPI

这是一个高性能计算函数库,通常在HPC(High Performance Computing)中采用,与MapReduce相比,其性能更高,用户可控性更强,但编程复杂,容错性差,可以说,各有所长,在实际应用中,针对不同 该应用会采用MPI或者MapReduce。

6.       Apache HBase

HBase是一个hadoop数据库, 其特点是分布式,可扩展的,存储大数据。当有需要随机,实时读写的大数据时, 使用HBase很适合.

MapReduce\Tez\Storm\Spark四个框架的异同

唐半张 发表了文章 0 个评论 1914 次浏览 2015-10-08 10:38 来自相关话题

1) MapReduce:是一种离线计算框架,将一个算法抽象成Map和Reduce两个阶段进行 处理,非常适合数据密集型计算。 2) Spark:MapReduce计算框架不适合迭代计算和交互式计算,MapR ...查看全部
1) MapReduce:是一种离线计算框架,将一个算法抽象成Map和Reduce两个阶段进行

处理,非常适合数据密集型计算。

2) Spark:MapReduce计算框架不适合迭代计算和交互式计算,MapReduce是一种磁盘

计算框架,而Spark则是一种内存计算框架,它将数据尽可能放到内存中以提高迭代

应用和交互式应用的计算效率。

3) Storm:MapReduce也不适合进行流式计算、实时分析,比如广告点击计算等,而

Storm则更擅长这种计算、它在实时性要远远好于MapReduce计算框架。

4)Tez: 运行在YARN之上支持DAG作业的计算框架,对MapReduce数据处理的归纳。它

把Map/Reduce过程拆分成若干个子过程,同时可以把多个Map/Reduce任务组合成一个

较大的DAG任务,减少了Map/Reduce之间的文件存储。同时合理组合其子过程,也可

以减少任务的运行时间。

MapReduce – 用户编程接口

唐半张 发表了文章 0 个评论 1601 次浏览 2015-09-30 10:08 来自相关话题

MapReduce – 用户编程接口 下面将着重谈下MapReduce框架中用户经常使用的一些接口或类的详细内容。了解 ...查看全部
MapReduce – 用户编程接口
下面将着重谈下MapReduce框架中用户经常使用的一些接口或类的详细内容。了解这些会极大帮助你实现、配置和优化MR任务。当然javadoc中对每个class或接口都进行了更全面的陈述,这里只是一个指引教程。

首先来看下Mapper和Reducer接口,通常MR应用都要实现这两个接口来提供map和reduce方法,这些是MRJob的核心部分。
 
Mapper 将输入的kv对映射成中间数据kv对集合。Maps 将输入记录转变为中间记录,其中被转化后的记录不必和输入记录类型相同。一个给定的输入对可以映射为0或者多个输出对。

在MRJob执行过程中,MapReduce框架根据提前指定的InputFormat(输入格式对象)产生InputSplit(输入分片),而每个InputSplit将会由一个map任务处理。

总起来讲,Mapper实现类通过JobConfigurable.configure(JobConf)方法传入JobConf对象来初始化,然后在每个map任务中调用map(WritableComparable,Writable,OutputCollector,Reporter)方法处理InputSplit的每个kv对。MR应用可以覆盖Closeable.close方法去处理一些必须的清理工作。

输出对不一定和输入对类型相同。一个给定的输入对可能映射成0或者很多的输出对。输出对是框架通过调用OutputCollector.colect(WritableComparable,Writable)得到。

MR应用可以使用Reporter汇报进度,设置应用层级的状态信息,更新计数器或者只是显示应用处于运行状态等。

所有和给定的输出key关联的中间数据都会随后被框架分组处理,并传给Reducer处理以产生最终的输出。用户可以通过JobConf.setOutputKeyComparatorClass(Class)指定一个Comparator控制分组处理过程。

Mapper输出都被排序后根据Reducer数量进行分区,分区数量等于reduce任务数量。用户可以通过实现自定义的Partitioner来控制哪些keys(记录)到哪个Reducer中去。

此外,用户还可以指定一个Combiner,调用JobConf.setCombinerClass(Class)来实现。这个可以来对map输出做本地的聚合,有助于减少从mapper到reducer的数据量。

经过排序的中间输出数据通常以一种简单的格式(key-len,key,value-len,value)存储。应用可以决定是否或者怎样被压缩以及压缩格式,可以通过JobConf来指定.

Map数

通常map数由输入数据总大小决定,也就是所有输入文件的blocks数目决定。

每个节点并行的运行的map数正常在10到100个。由于Map任务初始化本身需要一段时间所以map运行时间至少在1分钟为好。

如此,如果有10T的数据文件,每个block大小128M,最大使用为82000map数,除非使用setNumMapTasks(int)(这个方法仅仅对MR框架提供一个建议值)将map数值设置到更高。

Reducer

Reducer 根据key将中间数据集合处理合并为更小的数据结果集。
用户可以通过JobConf.setNumReduceTasks(int)设置作业的reducer数目。

整体而言,Reducer实现类通过JobConfigurable.configure(JobConf)方法将JobConf对象传入,并为Job设置和初始化Reducer。MR框架调用 reduce(WritableComparable, Iterator, OutputCollector, Reporter) 来处理以key被分组的输入数据。应用可以覆盖Closeable.close()处理必要的清理操作。

Reducer由三个主要阶段组成:shuffle,sort,reduce。

shuffle

输入到Reducer的输入数据是Mapper已经排过序的数据.在shuffle阶段,框架根据partition算法获取相关的mapper地址,并通过Http协议将数据由reducer拉取到reducer机器上处理。

sort

框架在这个阶段会根据key对reducer的输入进行分组(因为不同的mapper输出的数据中可能含有相同的key)。
shuffle和sort是同时进行的,同时reducer仍然在拉取map的输出。

Secondary Sort

如果对中间数据key进行分组的规则和在处理化简阶段前对key分组规则不一致时,可以通过 JobConf.setOutputValueGroupingComparator(Class)设置一个Comparator。因为中间数据的分组策略是通过 JobConf.setOutputKeyComparatorClass(Class) 设置的,可以控制中间数据根据哪些key进行分组。而JobConf.setOutputValueGroupingComparator(Class)则可用于在数据连接情况下对值进行二次排序。

Reduce(化简)

这个阶段框架循环调用 reduce(WritableComparable, Iterator, OutputCollector, Reporter) 方法处理被分组的每个kv对。
reduce 任务一般通过 OutputCollector.collect(WritableComparable, Writable)将输出数据写入文件系统FileSystem。
应用可以使用Reporter汇报作业执行进度、设置应用层级的状态信息并更新计数器(Counter),或者只是提示作业在运行。
注意,Reducer的输出不会进行排序。

Reducer数目

合适的reducer数目可以这样估算:
(节点数目mapred.tasktracker.reduce.tasks.maximum)乘以0.95 或 乘以1.75。
因子为0.95时,当所有map任务完成时所有reducer可以立即启动,并开始从map机器上拉取数据。因子为1.75时,最快的一些节点将完成第一轮reduce处理,此时框架开始启动第二轮reduce任务,这样可以达到比较好的作业负载均衡。

提高reduce数目会增加框架的运行负担,但有利于提升作业的负载均衡并降低失败的成本。

上述的因子使用最好在作业执行时框架仍然有reduce槽为前提,毕竟框架还需要对作业进行可能的推测执行和失败任务的处理。

不使用Reducer

如果不需要进行化简处理,可以将reduce数目设为0。

这种情况下,map的输出会直接写入到文件系统。输出路径通过setOutputPath(Path)指定。框架在写入数据到文件系统之前不再对map结果进行排序。

Partitioner

Partitioner对数据按照key进行分区,从而控制map的输出传输到哪个reducer上。默认的Partitioner算法是hash(哈希。分区数目由作业的reducer数目决定。
HashPartitioner 是默认的Partitioner。

Reporter

Reporter为MR应用提供了进度报告、应用状态信息设置,和计数器(Counter)更新等功能.

Mapper和Reducer实现可以使用Reporter汇报进度或者提示作业在正常运行。在一些场景下,应用在处理一些特殊的kv对时耗费了过 多时间,这个可能会因为框架假定任务超时而强制停止了这些作业。为避免该情况,可以设置mapred.task.timeout 为一个比较高的值或者将其设置为0以避免超时发生。

应用也可以使用Reporter来更新计数(Counter)。

OutputCollector

OutputCollector是MR框架提供的通用工具来收集Mapper或者Reducer输出数据(中间数据或者最终结果数据)。

Hadoop MapReduce提供了一些经常使用的mapper、reducer和partioner的实现来。这些工具可以点击这里进行学习。

combiner函数作用和用法

唐半张 发表了文章 0 个评论 1885 次浏览 2015-09-30 10:05 来自相关话题

很多时候MapReduce程序受限于集群上可用的带宽,所以它会尽力最小化需要在map和reduce任务之间传输的中间数据。Hadoop允许用户声明一个combiner function来处理map的输出 ...查看全部
很多时候MapReduce程序受限于集群上可用的带宽,所以它会尽力最小化需要在map和reduce任务之间传输的中间数据。Hadoop允许用户声明一个combiner function来处理map的输出,同时把自己对map的处理结果作为reduce的输入。因为combiner function本身只是一种优化,hadoop并不保证对于某个map输出,这个方法会被调用多少次。换句话说,不管combiner function被调用多少次,对应的reduce输出结果都应该是一样的。
 
 
下面例子来加以说明,假设1950年的天气数据读取是由两个map完成的,其中第一个map的输出如下: 
(1950, 0)
(1950, 20)
(1950, 10)


第二个map的输出为:
(1950, 25)
(1950, 15)


而reduce得到的输入为:(1950, [0, 20, 10, 25, 15]), 输出为:(1950, 25)


由于25是集合中的最大值,我们可以使用一个类似于reduce function的combiner function来找出每个map输出中的最大值,这样的话,reduce的输入就变成了:
(1950, [20, 25])


各个funciton 对温度值的处理过程可以表示如下:max(0, 20, 10, 25, 15) = max(max(0, 20, 10), max(25, 15)) = max(20, 25) = 25


注意:并不是所有的函数都拥有这个属性的(有这个属性的函数我们称之为commutative和associative),例如,如果我们要计算平均温度,就不能这样使用combiner function,因为mean(0, 20, 10, 25, 15) = 14,而mean(mean(0, 20, 10), mean(25, 15)) = mean(10, 20) = 15


combiner function并不能取代reduce function(因为仍然需要reduce function处理来自不同map的带有相同key的记录)。但是他可以帮助减少需要在map和reduce之间传输的数据,就为这一点combiner function就值得考虑使用。

Shell写Wordcount程序

唐半张 发表了文章 0 个评论 1816 次浏览 2015-09-30 10:02 来自相关话题

Mapper: #! /bin/sh ...查看全部
Mapper:
#! /bin/sh
while read LINE;do
   for word in $LINE
   do
        echo "$word 1"
   done
done


Reducer:

#! /bin/sh
count=0
started=0
word="" 
while read LINE;do
  newword=`echo $LINE | cut -d ' ' -f 1`
  if [ "x" == x"$newword" ];then
        continue
  fi    
  if [ "$word" != "$newword" ];then
        [ $started -ne 0 ] && echo -e "$word\t$count"
        word=$newword 
        count=1
        started=1
  else 
        count=$(( $count + 1 ))
  fi
done


测试:cat test | sh mapper.sh | sort | sh reducer.sh

但是用haoop jar 执行,报一下错误:
13/12/08 00:24:07 ERROR streaming.StreamJob: Job not successful. Error: # of failed Map Tasks exceeded allowed limit. FailedCount: 1. LastFailedTask: task_201312072336_0008_m_000000
13/12/08 00:24:07 INFO streaming.StreamJob: killJob...
Streaming Command Failed!

MapReduce,组合式,迭代式,链式

唐半张 发表了文章 0 个评论 1476 次浏览 2015-09-30 09:58 来自相关话题

1.迭代式mapreduce     一些复杂的任务难以用一次mapreduce处理完成,需要多次mapreduce才能完成任务,例如Pagrank,Kmeans算法都需要多次的迭代,关于mapreduce迭代在m ...查看全部
1.迭代式mapreduce
    一些复杂的任务难以用一次mapreduce处理完成,需要多次mapreduce才能完成任务,例如Pagrank,Kmeans算法都需要多次的迭代,关于mapreduce迭代在mahout中运用较多。有兴趣的可以参考一下mahout的源码。
在map/reduce迭代过程中,思想还是比较简单,就像类似for循环一样,前一个mapreduce的输出结果,作为下一个mapreduce的输入,任务完成后中间结果都可以删除。如代码所以:Configuration conf1 = new Configuration();
Job job1 = new Job(conf1,"job1");
.....
FileInputFormat.addInputPath(job1,InputPaht1);
FileOutputFromat.setOoutputPath(job1,Outpath1);
job1.waitForCompletion(true);
//sub Mapreduce
Configuration conf2 = new Configuration();
Job job2 = new Job(conf1,"job1");
.....
FileInputFormat.addInputPath(job2,Outpath1);
FileOutputFromat.setOoutputPath(job2,Outpath2);
job2.waitForCompletion(true);
//sub Mapreduce
Configuration conf3 = new Configuration();
Job job3 = new Job(conf1,"job1");
.....
FileInputFormat.addInputPath(job3,Outpath2);
FileOutputFromat.setOoutputPath(job3,Outpath3);
job3.waitForCompletion(true);
.....
下面列举一个mahout怎样运用mapreduce迭代的,下面的代码快就是mahout中kmeans的算法的代码,在main函数中用一个while循环来做mapreduce的迭代,其中:runIteration()是一次mapreduce的过程。
但个人感觉现在的mapreduce迭代设计不太满意的地方。
1. 每次迭代,如果所有Job(task)重复创建,代价将非常高。
2.每次迭代,数据都写入本地和读取本地,I/O和网络传输的代价比较大。
好像Twister和Haloop的模型能过比较好的解决这些问题,但他们抽象度不够高,支持的计算有限。
期待着下个版本hadoop更好的支持迭代算法。//main function
while (!converged && iteration <= maxIterations) {
      log.info("K-Means Iteration {}", iteration);
      // point the output to a new directory per iteration
      Path clustersOut = new Path(output, AbstractCluster.CLUSTERS_DIR + iteration);
      converged = runIteration(conf, input, clustersIn, clustersOut, measure.getClass().getName(), delta);
      // now point the input to the old output directory
      clustersIn = clustersOut;
      iteration++;
}


  private static boolean runIteration(Configuration conf,
                                      Path input,
                                      Path clustersIn,
                                      Path clustersOut,
                                      String measureClass,
                                      String convergenceDelta)
    throws IOException, InterruptedException, ClassNotFoundException {


    conf.set(KMeansConfigKeys.CLUSTER_PATH_KEY, clustersIn.toString());
    conf.set(KMeansConfigKeys.DISTANCE_MEASURE_KEY, measureClass);
    conf.set(KMeansConfigKeys.CLUSTER_CONVERGENCE_KEY, convergenceDelta);


    Job job = new Job(conf, "KMeans Driver running runIteration over clustersIn: " + clustersIn);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(ClusterObservations.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Cluster.class);


    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    job.setMapperClass(KMeansMapper.class);
    job.setCombinerClass(KMeansCombiner.class);
    job.setReducerClass(KMeansReducer.class);


    FileInputFormat.addInputPath(job, input);
    FileOutputFormat.setOutputPath(job, clustersOut);


    job.setJarByClass(KMeansDriver.class);
    HadoopUtil.delete(conf, clustersOut);
    if (!job.waitForCompletion(true)) {
      throw new InterruptedException("K-Means Iteration failed processing " + clustersIn);
    }
    FileSystem fs = FileSystem.get(clustersOut.toUri(), conf);


    return isConverged(clustersOut, conf, fs);
  }

2.依赖关系组合式MapReduce
我们可以设想一下MapReduce有3个子任务job1,job2,job3构成,其中job1和job2相互独立,job3要在job1和job2完成之后才执行。这种关系就叫复杂数据依赖关系的组合时mapreduce。hadoop为这种组合关系提供了一种执行和控制机制,hadoop通过job和jobControl类提供具体的编程方法。Job除了维护子任务的配置信息,还维护子任务的依赖关系,而jobControl控制整个作业流程,把所有的子任务作业加入到JobControl中,执行JobControl的run()方法即可运行程序。
下面给出伪代码:Configuration job1conf = new Configuration();
Job job1 = new Job(job1conf,"Job1");
.........//job1 其他设置
Configuration job2conf = new Configuration();
Job job2 = new Job(job2conf,"Job2");
.........//job2 其他设置
Configuration job3conf = new Configuration();
Job job3 = new Job(job3conf,"Job3");
.........//job3 其他设置
job3.addDepending(job1);//设置job3和job1的依赖关系
job3.addDepending(job2);
JobControl JC = new JobControl("123");
JC.addJob(job1);//把三个job加入到jobcontorl中
JC.addJob(job2);
JC.addJob(job3);
JC.run();

3.链式MapReduce
首先看一下例子,来说明为什么要有链式MapReduce,假设在统计单词是,会出现这样的词,make,made,making等,他们都属于一个词,在单词累加的时候,都归于一个词。解决的方法为用一个单独的Mapreduce任务可以实现,单增加了多个Mapreduce作业,将增加整个作业处理的周期,还增加了I/O操作,因而处理效率不高。
一个较好的办法就是在核心的MapReduce之外,增加一个辅助的Map过程,然后将这个辅助的Map过程和核心的Mapreudce过程合并为一个链式的Mapreduce,从而完成整个作业。hadoop提供了专门的链式ChainMapper和ChainReducer来处理链式任务,ChainMapper允许一个Map任务中添加多个Map的子任务,ChainReducer可以在Reducer执行之后,在加入多个Map的子任务。其调用形式如下:ChainMapper.addMapper(...);
    ChainReducer.addMapper(...);
    //addMapper()调用的方法形式如下:
    public static void addMapper(JOb job,
            Class mclass,
            Class inputKeyClass,
            Class inputValueClass,
            Class outputKeyClass,
            Class outputValueClass,
            Configuration conf
    ){
    }
其中,ChainReducer专门提供了一个setRreducer()方法来设置整个作业唯一的Reducer。
note:这些Mapper和Reducer之间传递的键和值都必须保持一致。
下面举个例子:用ChainMapper把Map1加如并执行,然后用ChainReducer把Reduce和Map2加入到Reduce过程中。代码如下:Map1.class 要实现map方法public void function throws IOException {
        Configuration conf = new Configuration();
        Job job = new Job(conf);
        job.setJobName("ChianJOb");
        // 在ChainMapper里面添加Map1
        Configuration map1conf = new Configuration(false);
        ChainMapper.addMapper(job, Map1.class, LongWritable.class, Text.class,
                Text.class, Text.class, true, map1conf);
        // 在ChainReduce中加入Reducer,Map2;
        Configuration reduceConf = new Configuration(false);
        ChainReducer.setReducer(job, Reduce.class, LongWritable.class,
                Text.class, Text.class, Text.class, true, map1conf);
        Configuration map2Conf = new Configuration();
        ChainReducer.addMapper(job, Map2.class, LongWritable.class, Text.class,
                Text.class, Text.class, true, map1conf);
        job.waitForCompletion(true);
    }
 

Window开发MapReduce环境

唐半张 发表了文章 0 个评论 1500 次浏览 2015-09-30 09:45 来自相关话题

同样适用于linux 1.首先创建Java项目   2.简单起见,导入所有jar包 在项目上 右键->属性   选择ja ...查看全部
同样适用于linux
1.首先创建Java项目
 
2.简单起见,导入所有jar包
在项目上 右键->属性
 
选择jar包位置如下图:
 
3.创建配置文件夹
 
4.将core-site.xml,hdfs-site.xml,mapred-site.xml放入到此文件夹中。
5.右键->属性 进行如下操作,添加刚刚创建的文件夹
 
基本上配置完成,需要主要的是,写代码的过程中要写入的namenode和jobtracker地址(修改为你自己的)
在你的代码中,加入如下代码就可以了
  1. Configuration conf = new Configuration();
  2. conf.set("fs.default.name","192.1.1.2:9000");
  3. conf.set("mapred.job.tracker","192.1.1.2:9001");

mapreduce 多种输入

唐半张 发表了文章 0 个评论 1419 次浏览 2015-09-29 11:18 来自相关话题

1.多路径输入 1)FileInputFormat.addInputPath 多次调用加载不同路径 FileInputFormat.addInputPath(job, new Path("hdfs://master:9000/cs/pa ...查看全部
1.多路径输入
1)FileInputFormat.addInputPath 多次调用加载不同路径
FileInputFormat.addInputPath(job, new Path("hdfs://master:9000/cs/path1"));
FileInputFormat.addInputPath(job, new Path("hdfs://master:9000/cs/path2"));
 
2)FileInputFormat.addInputPaths一次调用加载 多路径字符串用逗号隔开
FileInputFormat.addInputPaths(job, "hdfs://master:9000/cs/path1,hdfs:/master:9000/cs/path2");
2.多种输入
MultipleInputs可以加载不同路径的输入文件,并且每个路径可用不同的maper
MultipleInputs.addInputPath(job, new Path("hdfs://master:9000/cs/path1"), TextInputFormat.class,MultiTypeFileInput1Mapper.class);
MultipleInputs.addInputPath(job, new Path("hdfs://master:9000/cs/path3"), TextInputFormat.class,MultiTypeFileInput3Mapper.class);

在Eclipse下开发并运行Hadoop Map/Reduce程序

唐半张 发表了文章 0 个评论 1849 次浏览 2015-09-29 10:53 来自相关话题

在Eclipse下开发并运行Hadoop  Map/Reduce程序 在eclipse3.7中配置hadoop1.0.1插件 ...查看全部
在Eclipse下开发并运行Hadoop  Map/Reduce程序
在eclipse3.7中配置hadoop1.0.1插件
1.安装插件
准备程序:
A.        eclipse-3.7 :下载:eclipse-java-indigo-SR2-linux-gtk.tar.gz并解压缩
B.hadoop-eclipse-plugin-1.0.1 (http://download.csdn.net/detail/xiaoping8411/4216102)
将hadoop-eclipse-plugin-1.0.1复制到eclipse/plugins目录下,重启eclipse。
2.打开MapReduce视图
Window -> Open Perspective -> Other 选择Map/Reduce,图标是个蓝色的象。 
3.添加一个MapReduce环境
在eclipse下端,控制台旁边会多一个Tab,叫“Map/Reduce Locations”,在下面空白的地方点右键,选择“New Hadoop location...”
在弹出的对话框中填写如下内容:
Location name(取个名字):如HadoopLocation
Map/Reduce Master(Job Tracker的IP和端口,根据mapred-site.xml中配置的mapred.job.tracker来填写) 
DFS Master(Name Node的IP和端口,根据core-site.xml中配置的fs.default.name来填写)
User name: 用户名(默认操作系统用户名,这个没什么用),比如root

4.使用eclipse对HDFS内容进行修改
经过上一步骤,左侧“Project Explorer”中应该会出现配置好的HDFS,点击右键,可以进行新建文件夹、删除文件夹、上传文件、下载文件、删除文件等操作。
注意:每一次操作完在eclipse中不能马上显示变化,必须得刷新一下。

5.创建MapReduce工程
A.配置Hadoop路径
Window -> Preferences 选择 “Hadoop Map/Reduce”,点击“Browse...”选择Hadoop文件夹的路径。

这个步骤与运行环境无关,只是在新建工程的时候能将hadoop根目录和lib目录下的所有jar包自动导入。
B.创建工程
File -> New -> Project 选择“Map/Reduce Project”,然后输入项目名称,创建项目。插件会自动把hadoop根目录和lib目录下的所有jar包导入。
C.创建Mapper或者Reducer
File -> New -> Mapper 创建Mapper,自动继承mapred包里面的MapReduceBase并实现Mapper接口。 
注意:这个插件自动继承的是mapred包里旧版的类和接口,新版的Mapper得自己写。
Reducer同理。
6.在eclipse中运行WordCount程序
A.导入WordCount
从网上下载hadoop自带的hadoop-examples-1.0.1.jar中获取WordCount程序的源代码,
创建一个项目把该代码放进去
B.配置运行参数
Run As -> Open Run Dialog... 选择WordCount程序,在Arguments中配置运行参数:/mapreduce/wordcount/input    /mapreduce/wordcount/output/
分别表示HDFS下的输入目录和输出目录,其中输入目录中有几个文本文件,输出目录必须不存在。必须在第3步建立的HadoopLocation下建立mapreduce/wordcount/input目录,并将file1.txt, file2.txt拷到该目录下。

C.运行
Run As -> Run on Hadoop 选择之前配置好的MapReduce运行环境,点击“Finish”运行。

控制台会输出相关的运行信息。

D.查看运行结果
在输出目录/mapreduce/wordcount/output/1中,可以看见WordCount程序的输出文件。除此之外,还可以看见一个logs文件夹,里面会有运行的日志。

新旧 Hadoop MapReduce 框架比对(配置)

唐半张 发表了文章 0 个评论 2227 次浏览 2015-09-29 10:26 来自相关话题

[size=0.76em]让我们来对新旧 MapReduce 框架做详细的分析和对比,可以看到有以下几点显著变化: [size=0.76em]首先客户端不变,其调用 API 及接口大部分保持兼容,这也是为了对开发使用者透明化,使其不必对原有代码做大 ...查看全部
[size=0.76em]让我们来对新旧 MapReduce 框架做详细的分析和对比,可以看到有以下几点显著变化:
[size=0.76em]首先客户端不变,其调用 API 及接口大部分保持兼容,这也是为了对开发使用者透明化,使其不必对原有代码做大的改变 ( 详见 2.3 Demo 代码开发及详解),但是原框架中核心的 JobTracker 和 TaskTracker 不见了,取而代之的是 ResourceManager, ApplicationMaster 与 NodeManager 三个部分。
[size=0.76em]我们来详细解释这三个部分,首先 ResourceManager 是一个中心的服务,它做的事情是调度、启动每一个 Job 所属的 ApplicationMaster、另外监控 ApplicationMaster 的存在情况。细心的读者会发现:Job 里面所在的 task 的监控、重启等等内容不见了。这就是 AppMst 存在的原因。ResourceManager 负责作业与资源的调度。接收 JobSubmitter 提交的作业,按照作业的上下文 (Context) 信息,以及从 NodeManager 收集来的状态信息,启动调度过程,分配一个 Container 作为 App Mstr
[size=0.76em]NodeManager 功能比较专一,就是负责 Container 状态的维护,并向 RM 保持心跳。
[size=0.76em]ApplicationMaster 负责一个 Job 生命周期内的所有工作,类似老的框架中 JobTracker。但注意每一个 Job(不是每一种)都有一个 ApplicationMaster,它可以运行在 ResourceManager 以外的机器上。
[size=0.76em]Yarn 框架相对于老的 MapReduce 框架什么优势呢?我们可以看到:
  • 这个设计大大减小了 JobTracker(也就是现在的 ResourceManager)的资源消耗,并且让监测每一个 Job 子任务 (tasks) 状态的程序分布式化了,更安全、更优美。
  • 在新的 Yarn 中,ApplicationMaster 是一个可变更的部分,用户可以对不同的编程模型写自己的 AppMst,让更多类型的编程模型能够跑在 Hadoop 集群中,可以参考 hadoop Yarn 官方配置模板中的 mapred-site.xml 配置。
  • 对于资源的表示以内存为单位 ( 在目前版本的 Yarn 中,没有考虑 cpu 的占用 ),比之前以剩余 slot 数目更合理。
  • 老的框架中,JobTracker 一个很大的负担就是监控 job 下的 tasks 的运行状况,现在,这个部分就扔给 ApplicationMaster 做了,而 ResourceManager 中有一个模块叫做 ApplicationsMasters( 注意不是 ApplicationMaster),它是监测 ApplicationMaster 的运行状况,如果出问题,会将其在其他机器上重启。
  • Container 是 Yarn 为了将来作资源隔离而提出的一个框架。这一点应该借鉴了 Mesos 的工作,目前是一个框架,仅仅提供 java 虚拟机内存的隔离 ,hadoop 团队的设计思路应该后续能支持更多的资源调度和控制 , 既然资源表示成内存量,那就没有了之前的 map slot/reduce slot 分开造成集群资源闲置的尴尬情况。

[size=0.76em]新的 Yarn 框架相对旧 MapRduce 框架而言,其配置文件 , 启停脚本及全局变量等也发生了一些变化,主要的改变如下:
[size=0.76em]
 改变项原框架中新框架中(Yarn)备注配置文件位置${hadoop_home_dir}/conf${hadoop_home_dir}/etc/hadoop/Yarn 框架也兼容老的 ${hadoop_home_dir}/conf 位置配置,启动时会检测是否存在老的 conf 目录,如果存在将加载 conf 目录下的配置,否则加载 etc 下配置启停脚本${hadoop_home_dir}/bin/start(stop)-all.sh${hadoop_home_dir}/sbin/start(stop)-dfs.sh
${hadoop_home_dir}/bin/start(stop)-all.sh新的 Yarn 框架中启动分布式文件系统和启动 Yarn 分离,启动 / 停止分布式文件系统的命令位于 ${hadoop_home_dir}/sbin 目录下,启动 / 停止 Yarn 框架位于 ${hadoop_home_dir}/bin/ 目录下JAVA_HOME 全局变量${hadoop_home_dir}/bin/start-all.sh 中${hadoop_home_dir}/etc/hadoop/hadoop-env.sh
${hadoop_home_dir}/etc/hadoop/Yarn-env.shYarn 框架中由于启动 hdfs 分布式文件系统和启动 MapReduce 框架分离,JAVA_HOME 需要在 hadoop-env.sh 和 Yarn-env.sh 中分别配置HADOOP_LOG_DIR 全局变量不需要配置${hadoop_home_dir}/etc/hadoop/hadoop-env.sh老框架在 LOG,conf,tmp 目录等均默认为脚本启动的当前目录下的 log,conf,tmp 子目录 
Yarn 新框架中 Log 默认创建在 Hadoop 用户的 home 目录下的 log 子目录,因此最好在 ${hadoop_home_dir}/etc/hadoop/hadoop-env.sh 配置 HADOOP_LOG_DIR,否则有可能会因为你启动 hadoop 的用户的 .bashrc 或者 .bash_profile 中指定了其他的 PATH 变量而造成日志位置混乱,而该位置没有访问权限的话启动过程中会报错 
[size=0.76em]由于新的 Yarn 框架与原 Hadoop MapReduce 框架相比变化较大,核心的配置文件中很多项在新框架中已经废弃,而新框架中新增了很多其他配置项,看下表所示会更加清晰:
[size=0.76em]
[table]
[tr][td]配置文件[/td][td]配置项[/td][td]Hadoop 0.20.X 配置[/td][td]Hadoop 0.23.X 配置[/td][td]说明[/td][/tr]
[tr][td]core-site.xml[/td][td]系统默认分布式文件 URI[/td][td]fs.default.name[/td][td]fs.defaultFS[/td][td][/td][/tr]
[tr][td]hdfs-site.xml[/td][td]DFS name node 存放 name table 的目录[/td][td]dfs.name.dir[/td][td]dfs.namenode.name.dir[/td][td]新框架中 name node 分成 dfs.namenode.name.dir( 存放 naname table 和 [/td][/tr]
[tr][td][/td][td]DFS data node 存放数据 block 的目录[/td][td]dfs.data.dir[/td][td]dfs.datanode.data.dir[/td][td]新框架中 DataNode 增加更多细节配置,位于 dfs.datanode. 配置项下,如[/td][/tr]
[tr][td][/td][td]分布式文件系统数据块复制数[/td][td]dfs.replication[/td][td]dfs.replication[/td][td]新框架与老框架一致,值建议配置为与分布式 cluster 中实际的 DataNode 主机数一致[/td][/tr]
[tr][td]mapred-site.xml[/td][td]Job 监控地址及端口[/td][td]mapred.job.tracker[/td][td]无[/td][td]新框架中已改为 Yarn-site.xml 中的 resouceManager 及 nodeManager 具体配置项,新框架中历史 job 的查询已从 Job tracker 剥离,归入单独的[/td][/tr]
[tr][td][/td][td]第三方 MapReduce 框架[/td][td]无[/td][td]mapreduce.framework.name[/td][td]新框架支持第三方 MapReduce 开发框架以支持如 SmartTalk/DGSG 等非 Yarn 架构,注意通常情况下这个配置的值都设置为 Yarn,如果没有配置这项,那么提交的 Yarn job 只会运行在 locale 模式,而不是分布式模式。[/td][/tr]
[tr][td][/td][td][/td][td][/td][td][/td][td][/td][/tr]
[tr][td]Yarn-site.xml[/td][td]The address of the applications manager interface in the RM[/td][td]无[/td][td]Yarn.resourcemanager.address[/td][td]新框架中 NodeManager 与 RM 通信的接口地址[/td][/tr]
[tr][td][/td][td]The address of the scheduler interface[/td][td]无[/td][td]Yarn.resourcemanager.scheduler.address[/td][td]同上,NodeManger 需要知道 RM 主机的 scheduler 调度服务接口地址[/td][/tr]
[tr][td][/td][td]The address of the RM web application[/td][td]无[/td][td]Yarn.resourcemanager.webapp.address[/td][td]新框架中各个 task 的资源调度及运行状况通过通过该 web 界面访问[/td][/tr]
[tr][td][/td][td]The address of the resource tracker interface[/td][td]无[/td][td]Yarn.resourcemanager.resource-tracker.address

MapReduce——WordCount问题总结

唐半张 发表了文章 0 个评论 2497 次浏览 2015-09-25 11:02 来自相关话题

我是单节点模拟并发模式 下面把整了一下午WordCount的问题总结一下,我是自己实现了一个。 ...查看全部
我是单节点模拟并发模式

下面把整了一下午WordCount的问题总结一下,我是自己实现了一个。

将源码打成jar包

问题1:
命令:xxx@xxx-ubuntu:~/Hadoop/hadoop-0.20.2$ bin/hadoop jar wordcount.jar WordCount input output
报错:
11/12/21 15:07:06 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
11/12/21 15:07:06 WARN mapred.JobClient: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
Exception in thread "main" org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory input already exists
解决方案:这个主要是WordCount类没有找到,要加上包名,eg:bin/hadoop jar wordcount.jar org.mypackage.WordCount input output

问题2:(同样的命令,加上包名)
INFO mapred.JobClient: Task Id : attempt_201112211459_0003_m_000000_0, Status : FAILED
java.lang.RuntimeException: java.lang.ClassNotFoundException: cn.edu.fudan.util.WordCount$Map
解决方案:需要在源码中加入一句话——job.setJarByClass(WordCount.class);

问题3:输出的文件夹不能存在,否则报错
Exception in thread "main" org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory output already exists
解决方案:命令——bin/hadoop fs -rmr output

上源码,多半是看别人的,呵呵
package org.mypackage;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCount {
public static class Map extends Mapper {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(LongWritable key, Text value, Context context) 
throws IOException, InterruptedException{
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
context.write(word, one);
}
}
}
public static class Reduce extends Reducer {

@Override
public void reduce(Text key, Iterable values, Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
int sum = 0;
for (IntWritable val : values)
sum += val.get();
context.write(key, new IntWritable(sum));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();

Job job = new Job(conf, "wordcount");
job.setJarByClass(WordCount.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);

job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);

FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

job.waitForCompletion(true);

}
}

分布式计算框架有哪些

唐半张 发表了文章 0 个评论 2263 次浏览 2015-09-25 10:59 来自相关话题

在SIP项目设计的过程中,对于它庞大的日志在开始时就考虑使用任务分解的多线程处理模式来分析统计,在我从前写的文章《Tiger Concurrent Practice --日志分析并行分解设计与实现》中有 ...查看全部
在SIP项目设计的过程中,对于它庞大的日志在开始时就考虑使用任务分解的多线程处理模式来分析统计,在我从前写的文章《Tiger Concurrent Practice --日志分析并行分解设计与实现》中有所提到。但是由于统计的内容暂时还是十分简单,所以就采用Memcache作为计数器,结合MySQL就完成了访问控制以及统计的工作。然而未来,对于海量日志分析的工作,还是需要有所准备。现在最火的技术词汇莫过于“云计算”,在Open API日益盛行的今天,互联网应用的数据将会越来越有价值,如何去分析这些数据,挖掘其内在价值,就需要分布式计算来支撑海量数据的分析工作。
回过头来看,早先那种多线程,多任务分解的日志分析设计,其实是分布式计算的一个单机版缩略,如何将这种单机的工作进行分拆,变成协同工作的集群,其实就是分布式计算框架设计所涉及的。在去年参加BEA大会的时候,BEA和VMWare合作采用虚拟机来构建集群,无非就是希望使得计算机硬件能够类似于应用程序中资源池的资源,使用者无需关心资源的分配情况,从而最大化了硬件资源的使用价值。分布式计算也是如此,具体的计算任务交由哪一台机器执行,执行后由谁来汇总,这都由分布式框架的Master来抉择,而使用者只需简单地将待分析内容提供给分布式计算系统作为输入,就可以得到分布式计算后的结果。
Hadoop是Apache开源组织的一个分布式计算开源框架,在很多大型网站上都已经得到了应用,如亚马逊、Facebook和Yahoo等等。对于我来说,最近的一个使用点就是服务集成平台的日志分析。服务集成平台的日志量将会很大,而这也正好符合了分布式计算的适用场景(日志分析和索引建立就是两大应用场景)。
当前没有正式确定使用,所以也是自己业余摸索,后续所写的相关内容,都是一个新手的学习过程,难免会有一些错误,只是希望记录下来可以分享给更多志同道合的朋友。什么是Hadoop?
搞什么东西之前,第一步是要知道What(是什么),然后是Why(为什么),最后才是How(怎么做)。但很多开发的朋友在做了多年项目以后,都习惯是先How,然后What,最后才是Why,这样只会让自己变得浮躁,同时往往会将技术误用于不适合的场景。
 
Hadoop框架中最核心的设计就是:MapReduce和HDFS。MapReduce的思想是由Google的一篇论文所提及而被广为流传的,简单的一句话解释MapReduce就是“任务的分解与结果的汇总”。HDFS是Hadoop分布式文件系统(Hadoop Distributed File System)的缩写,为分布式计算存储提供了底层支持。
MapReduce从它名字上来看就大致可以看出个缘由,两个动词Map和Reduce,“Map(展开)”就是将一个任务分解成为多个任务,“Reduce”就是将分解后多任务处理的结果汇总起来,得出最后的分析结果。这不是什么新思想,其实在前面提到的多线程,多任务的设计就可以找到这种思想的影子。不论是现实社会,还是在程序设计中,一项工作往往可以被拆分成为多个任务,任务之间的关系可以分为两种:一种是不相关的任务,可以并行执行;另一种是任务之间有相互的依赖,先后顺序不能够颠倒,这类任务是无法并行处理的。回到大学时期,教授上课时让大家去分析关键路径,无非就是找最省时的任务分解执行方式。在分布式系统中,机器集群就可以看作硬件资源池,将并行的任务拆分,然后交由每一个空闲机器资源去处理,能够极大地提高计算效率,同时这种资源无关性,对于计算集群的扩展无疑提供了最好的设计保证。(其实我一直认为Hadoop的卡通图标不应该是一个小象,应该是蚂蚁,分布式计算就好比蚂蚁吃大象,廉价的机器群可以匹敌任何高性能的计算机,纵向扩展的曲线始终敌不过横向扩展的斜线)。任务分解处理以后,那就需要将处理以后的结果再汇总起来,这就是Reduce要做的工作。
图1:MapReduce结构示意图
上图就是MapReduce大致的结构图,在Map前还可能会对输入的数据有Split(分割)的过程,保证任务并行效率,在Map之后还会有Shuffle(混合)的过程,对于提高Reduce的效率以及减小数据传输的压力有很大的帮助。后面会具体提及这些部分的细节。
HDFS是分布式计算的存储基石,Hadoop的分布式文件系统和其他分布式文件系统有很多类似的特质。分布式文件系统基本的几个特点:
  1. 对于整个集群有单一的命名空间。
  2. 数据一致性。适合一次写入多次读取的模型,客户端在文件没有被成功创建之前无法看到文件存在。
  3. 文件会被分割成多个文件块,每个文件块被分配存储到数据节点上,而且根据配置会由复制文件块来保证数据的安全性。


图2:HDFS结构示意图
上图中展现了整个HDFS三个重要角色:NameNode、DataNode和Client。NameNode可以看作是分布式文件系统中的管理者,主要负责管理文件系统的命名空间、集群配置信息和存储块的复制等。NameNode会将文件系统的Meta-data存储在内存中,这些信息主要包括了文件信息、每一个文件对应的文件块的信息和每一个文件块在DataNode的信息等。DataNode是文件存储的基本单元,它将Block存储在本地文件系统中,保存了Block的Meta-data,同时周期性地将所有存在的Block信息发送给NameNode。Client就是需要获取分布式文件系统文件的应用程序。这里通过三个操作来说明他们之间的交互关系。
文件写入:
  1. Client向NameNode发起文件写入的请求。
  2. NameNode根据文件大小和文件块配置情况,返回给Client它所管理部分DataNode的信息。
  3. Client将文件划分为多个Block,根据DataNode的地址信息,按顺序写入到每一个DataNode块中。

文件读取:
  1. Client向NameNode发起文件读取的请求。
  2. NameNode返回文件存储的DataNode的信息。
  3. Client读取文件信息。

文件Block复制:
  1. NameNode发现部分文件的Block不符合最小复制数或者部分DataNode失效。
  2. 通知DataNode相互复制Block。
  3. DataNode开始直接相互复制。

最后再说一下HDFS的几个设计特点(对于框架设计值得借鉴):
  1. Block的放置:默认不配置。一个Block会有三份备份,一份放在NameNode指定的DataNode,另一份放在与指定DataNode非同一Rack上的DataNode,最后一份放在与指定DataNode同一Rack上的DataNode上。备份无非就是为了数据安全,考虑同一Rack的失败情况以及不同Rack之间数据拷贝性能问题就采用这种配置方式。
  2. 心跳检测DataNode的健康状况,如果发现问题就采取数据备份的方式来保证数据的安全性。
  3. 数据复制(场景为DataNode失败、需要平衡DataNode的存储利用率和需要平衡DataNode数据交互压力等情况):这里先说一下,使用HDFS的balancer命令,可以配置一个Threshold来平衡每一个DataNode磁盘利用率。例如设置了Threshold为10%,那么执行balancer命令的时候,首先统计所有DataNode的磁盘利用率的均值,然后判断如果某一个DataNode的磁盘利用率超过这个均值Threshold以上,那么将会把这个DataNode的block转移到磁盘利用率低的DataNode,这对于新节点的加入来说十分有用。
  4. 数据交验:采用CRC32作数据交验。在文件Block写入的时候除了写入数据还会写入交验信息,在读取的时候需要交验后再读入。
  5. NameNode是单点:如果失败的话,任务处理信息将会纪录在本地文件系统和远端的文件系统中。
  6. 数据管道性的写入:当客户端要写入文件到DataNode上,首先客户端读取一个Block然后写到第一个DataNode上,然后由第一个DataNode传递到备份的DataNode上,一直到所有需要写入这个Block的NataNode都成功写入,客户端才会继续开始写下一个Block。
  7. 安全模式:在分布式文件系统启动的时候,开始的时候会有安全模式,当分布式文件系统处于安全模式的情况下,文件系统中的内容不允许修改也不允许删除,直到安全模式结束。安全模式主要是为了系统启动的时候检查各个DataNode上数据块的有效性,同时根据策略必要的复制或者删除部分数据块。运行期通过命令也可以进入安全模式。在实践过程中,系统启动的时候去修改和删除文件也会有安全模式不允许修改的出错提示,只需要等待一会儿即可。

下面综合MapReduce和HDFS来看Hadoop的结构:

图3:Hadoop结构示意图
在Hadoop的系统中,会有一台Master,主要负责NameNode的工作以及JobTracker的工作。JobTracker的主要职责就是启动、跟踪和调度各个Slave的任务执行。还会有多台Slave,每一台Slave通常具有DataNode的功能并负责TaskTracker的工作。TaskTracker根据应用要求来结合本地数据执行Map任务以及Reduce任务。
说到这里,就要提到分布式计算最重要的一个设计点:Moving Computation is Cheaper than Moving Data。就是在分布式处理中,移动数据的代价总是高于转移计算的代价。简单来说就是分而治之的工作,需要将数据也分而存储,本地任务处理本地数据然后归总,这样才会保证分布式计算的高效性。为什么要选择Hadoop?
说完了What,简单地说一下Why。官方网站已经给了很多的说明,这里就大致说一下其优点及使用的场景(没有不好的工具,只用不适用的工具,因此选择好场景才能够真正发挥分布式计算的作用):
  1. 可扩展:不论是存储的可扩展还是计算的可扩展都是Hadoop的设计根本。
  2. 经济:框架可以运行在任何普通的PC上。
  3. 可靠:分布式文件系统的备份恢复机制以及MapReduce的任务监控保证了分布式处理的可靠性。
  4. 高效:分布式文件系统的高效数据交互实现以及MapReduce结合Local Data处理的模式,为高效处理海量的信息作了基础准备。

使用场景:个人觉得最适合的就是海量数据的分析,其实Google最早提出MapReduce也就是为了海量数据分析。同时HDFS最早是为了搜索引擎实现而开发的,后来才被用于分布式计算框架中。海量数据被分割于多个节点,然后由每一个节点并行计算,将得出的结果归并到输出。同时第一阶段的输出又可以作为下一阶段计算的输入,因此可以想象到一个树状结构的分布式计算图,在不同阶段都有不同产出,同时并行和串行结合的计算也可以很好地在分布式集群的资源下得以高效的处理。

hadoop2.5 mapreduce编程实例

唐半张 发表了文章 1 个评论 2179 次浏览 2015-09-25 10:48 来自相关话题

2.5 hadoop2.5 mapreduce编程实例 由于本书以“作业生命周期”为线索对Hadoop MapReduce架构设计和实现原理进行解析,因而在深入剖析各个MapReduce实现细节之前整体了解一个作业 ...查看全部
2.5 hadoop2.5 mapreduce编程实例
由于本书以“作业生命周期”为线索对Hadoop MapReduce架构设计和实现原理进行解析,因而在深入剖析各个MapReduce实现细节之前整体了解一个作业的生命周期显得非常重要。为此,本节主要讲解Hadoop MapReduce作业的生命周期,即作业从提交到运行结束经历的整个过程。本节只是概要性地介绍MapReduce作业的生命周期,可看作后续几章的内容导读。作业生命周期中具体各个阶段的深入剖析将在后续的章节中进行。

MapReduce中的二次排序

夕阳丶一抹红颜 发表了文章 1 个评论 2329 次浏览 2015-09-22 11:44 来自相关话题

在MapReduce操作时,我们知道传递的会按照key的大小进行排序,最后 ...查看全部
MapReduce操作时,我们知道传递的会按照key的大小进行排序,最后输出的结果是按照key排过序的。有的时候我们在key排序的基础上,对value也进行排序。这种需求就是二次排序。
(1)Mapper任务会接收输入分片,然后不断的调用map函数,对记录进行处理。处理完毕后,转换为新的输出。
(2)对map函数输出的调用分区函数,对数据进行分区。不同分区的数据会被送到不同的Reducer任务中。
(3)对于不同分区的数据,会按照key进行排序,这里的key必须实现WritableComparable接口。该接口实现了Comparable接口,因此可以进行比较排序。
(4)对于排序后的,会按照key进行分组。如果key相同,那么相同key的就被分到一个组中。最终,每个分组会调用一次reduce函数。
(5)排序、分组后的数据会被送到Reducer节点。
在MapReduce的体系结构中,我们没有看到对value的排序操作。怎么实现对value的排序哪?这就需要我们变通的去实现这个需求。
变通手段:我们可以把key和value联合起来作为新的key,记作newkey。这时,newkey含有两个字段,假设分别是k,v。这里的k和v是原来的key和value。原来的value还是不变。这样,value就同时在newkey和value的位置。我们再实现newkey的比较规则,先按照key排序,在key相同的基础上再按照value排序。在分组时,再按照原来的key进行分组,就不会影响原有的分组逻辑了。最后在输出的时候,只把原有的key、value输出,就可以变通的实现了二次排序的需求。
下面看个例子,结合着理解。
假设有以下输入数据,这是两列整数,要求先按照第一列整数大小排序,如果第一列相同,按照第二列整数大小排序。
20 21 50 51 50 52 50 53 50 54 60 51 60 53 60 52 60 56 60 57 70 58 60 61 70 54 70 55 70 56 70 57 70 58
分析一下, 这是一个典型的二次排序问题。
我们先对现在第一列和第二列整数创建一个新的类,作为newkey,代码如下
/** * 把第一列整数和第二列作为类的属性,并且实现WritableComparable接口 */ public static class IntPair implements WritableComparable { private int first = 0; private int second = 0; public void set(int left, int right) { first = left; second = right; } public int getFirst() { return first; } public int getSecond() { return second; } @Override public void readFields(DataInput in) throws IOException { first = in.readInt(); second = in.readInt(); } @Override public void write(DataOutput out) throws IOException { out.writeInt(first); out.writeInt(second); } @Override public int hashCode() { return first+"".hashCode() + second+"".hashCode(); } @Override public boolean equals(Object right) { if (right instanceof IntPair) { IntPair r = (IntPair) right; return r.first == first && r.second == second; } else { return false; } } //这里的代码是关键,因为对key排序时,调用的就是这个compareTo方法 @Override public int compareTo(IntPair o) { if (first != o.first) { return first - o.first; } else if (second != o.second) { return second - o.second; } else { return 0; } } }
一定要注意上面的compareTo方法,先按照first比较,再按照second比较。在以后调用的时候,key就是first,value就是second。
下面看一下分组比较函数,代码如下
/** * 在分组比较的时候,只比较原来的key,而不是组合key。 */ public static class GroupingComparator implements RawComparator { @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { return WritableComparator.compareBytes(b1, s1, Integer.SIZE/8, b2, s2, Integer.SIZE/8); } @Override public int compare(IntPair o1, IntPair o2) { int first1 = o1.getFirst(); int first2 = o2.getFirst(); return first1 - first2; } }
一定要注意上面代码中,虽然泛型是IntPair,但是比较的始终是第一个字段,而不是所有的字段。因为要按照原有的key进行分组啊。
如果以上的代码明白,再看一下自定义的Mapper类和Reducer类吧
public static class MapClass extends Mapper { private final IntPair key = new IntPair(); private final IntWritable value = new IntWritable(); @Override public void map(LongWritable inKey, Text inValue, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(inValue.toString()); int left = 0; int right = 0; if (itr.hasMoreTokens()) { left = Integer.parseInt(itr.nextToken()); if (itr.hasMoreTokens()) { right = Integer.parseInt(itr.nextToken()); } key.set(left, right); value.set(right); context.write(key, value); } } }public static class Reduce extends Reducer { private static final Text SEPARATOR = new Text("------------------------------------------------"); private final Text first = new Text(); @Override public void reduce(IntPair key, Iterable values, Context context) throws IOException, InterruptedException { context.write(SEPARATOR, null); first.set(Integer.toString(key.getFirst())); for(IntWritable value: values) { context.write(first, value); } } }
在map函数中,要注意k2是由哪几个字段组成的;在reduce函数中,要注意输出的k3是IntPair中的第一个字段,而不是所有字段。
好了,看一下驱动代码吧,如下
public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); final FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop2:9000"), conf); fileSystem.delete(new Path(OUTPUT_PATH), true); Job job = new Job(conf, "secondary sort"); job.setJarByClass(SecondarySortApp.class); job.setMapperClass(MapClass.class); job.setReducerClass(Reduce.class); job.setGroupingComparatorClass(GroupingComparator.class); job.setMapOutputKeyClass(IntPair.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(INPUT_PATH)); FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH)); System.exit(job.waitForCompletion(true) ? 0 : 1); }
以上驱动代码中,重大变化是设置了分组比较函数。好了,看看执行结果吧
------------------------------------------------ 20 21 ------------------------------------------------ 50 51 50 52 50 53 50 54 ------------------------------------------------ 60 51 60 52 60 53 60 56 60 57 60 61 ------------------------------------------------ 70 54 70 55 70 56 70 57 70 58 70 58
看看,是不是我们想要的结果啊!!

Hadoop MapReduce

夕阳丶一抹红颜 发表了文章 0 个评论 1876 次浏览 2015-09-22 11:31 来自相关话题

Hadoop MapReduce 从一大堆数中找出最大的数,类似SQL的SELECT MAX(NUMBER) FROM TABLE .这里写了个简单的MapReduce,实现了该功能.我这里会生成测试数据,同时在生成的时候会计 ...查看全部
Hadoop MapReduce 从一大堆数中找出最大的数,类似SQL的SELECT MAX(NUMBER) FROM TABLE .这里写了个简单的MapReduce,实现了该功能.我这里会生成测试数据,同时在生成的时候会计算出最大值.待MapReduce跑玩后,你可以去输出路径查看并进行对比.具体请查看代码:
 
Java代码
  1. package com.guoyun.hadoop.mapreduce.study;  
  2.   
  3. import java.io.IOException;  
  4. import java.util.StringTokenizer;  
  5.   
  6. import org.apache.hadoop.conf.Configuration;  
  7. import org.apache.hadoop.fs.FileSystem;  
  8. import org.apache.hadoop.fs.Path;  
  9. import org.apache.hadoop.io.LongWritable;  
  10. import org.apache.hadoop.io.Text;  
  11. import org.apache.hadoop.mapreduce.Job;  
  12. import org.apache.hadoop.mapreduce.Mapper;  
  13. import org.apache.hadoop.mapreduce.Reducer;  
  14. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  15. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  16. import org.slf4j.Logger;  
  17. import org.slf4j.LoggerFactory;  
  18.   
  19. /** 
  20.  * 获得最大的数,类似SQL:SELECT MAX(NUMBER) FROM TABLE 
  21.  * 注意这里只有一列 
  22.  * 相比 @GetMaxValueMapReduceImproveTest 这里速度会更慢 
  23.  *  
  24.  */  
  25. public class GetMaxValueMapReduceTest extends MyMapReduceSIngleColumnTest{  
  26.   public static final Logger log=LoggerFactory.getLogger(GetMaxValueMapReduceTest.class);  
  27.     
  28.   
  29.   public GetMaxValueMapReduceTest(int dataLength) throws Exception {  
  30.     super(dataLength);  
  31.     // TODO Auto-generated constructor stub  
  32.   }  
  33.   
  34.   public GetMaxValueMapReduceTest(long dataLength, String inputPath,  
  35.       String outputPath) throws Exception {  
  36.     super(dataLength, inputPath, outputPath);  
  37.     // TODO Auto-generated constructor stub  
  38.   }  
  39.   
  40.   public GetMaxValueMapReduceTest(String outputPath) {  
  41.     super(outputPath);  
  42.     // TODO Auto-generated constructor stub  
  43.   }  
  44.   
  45.   /** 
  46.    * Map,to get the source datas 
  47.    */  
  48.   public static class MyMapper extends Mapper{  
  49.     private final Text writeKey=new Text("K");  
  50.     private LongWritable writeValue=new LongWritable(0);  
  51.       
  52.     @Override  
  53.     protected void map(LongWritable key, Text value, Context context)  
  54.         throws IOException, InterruptedException {  
  55.       log.debug("begin to map");  
  56.       StringTokenizer tokenizer=null;  
  57.       String lineValue=null;  
  58.         
  59.         
  60.       tokenizer=new StringTokenizer(value.toString().trim());  
  61.       while(tokenizer.hasMoreTokens()){  
  62.         lineValue=tokenizer.nextToken().trim();  
  63.         if(lineValue.equals("")){  
  64.           continue;  
  65.         }  
  66.         try {  
  67.           writeValue.set(Long.parseLong(lineValue));  
  68.           context.write(writeKey, writeValue);  
  69.         } catch (NumberFormatException e) {  
  70.           continue;  
  71.         }  
  72.           
  73.       }  
  74.     }  
  75.   }  
  76.     
  77.   /** 
  78.    * Reduce,to get the max value 
  79.    */  
  80.   public static class MyReducer   
  81.     extends Reducer{  
  82.     private final Text maxValueKey=new Text("maxValue");  
  83.           
  84.     @Override  
  85.     public void reduce(Text key, Iterable values,Context context)  
  86.         throws IOException, InterruptedException {  
  87.       log.debug("begin to reduce");  
  88.       long maxValue=Long.MIN_VALUE;  
  89.       for(LongWritable value:values){  
  90.         if(value.get()>maxValue){  
  91.           maxValue=value.get();  
  92.         }  
  93.       }  
  94.       context.write(maxValueKey, new LongWritable(maxValue));  
  95.     }  
  96.       
  97.       
  98.   }  
  99.     
  100.   /** 
  101.    * @param args 
  102.    */  
  103.   public static void main(String[] args) {  
  104.     MyMapReduceTest mapReduceTest=null;  
  105.     Configuration conf=null;  
  106.     Job job=null;  
  107.     FileSystem fs=null;  
  108.     Path inputPath=null;  
  109.     Path outputPath=null;  
  110.     long begin=0;  
  111.     String output="testDatas/mapreduce/MROutput_SingleColumn_getMax";  
  112.       
  113.       
  114.     try {  
  115.       mapReduceTest=new GetMaxValueMapReduceTest(10000000);  
  116.         
  117.       inputPath=new Path(mapReduceTest.getInputPath());  
  118.       outputPath=new Path(mapReduceTest.getOutputPath());  
  119.         
  120.       conf=new Configuration();  
  121.       job=new Job(conf,"getMaxValue");  
  122.         
  123.       fs=FileSystem.getLocal(conf);  
  124.       if(fs.exists(outputPath)){  
  125.         if(!fs.delete(outputPath,true)){  
  126.           System.err.println("Delete output file:"+mapReduceTest.getOutputPath()+" failed!");  
  127.           return;  
  128.         }  
  129.       }  
  130.         
  131.         
  132.       job.setJarByClass(GetMaxValueMapReduceTest.class);  
  133.       job.setMapOutputKeyClass(Text.class);  
  134.       job.setMapOutputValueClass(LongWritable.class);  
  135.       job.setOutputKeyClass(Text.class);  
  136.       job.setOutputValueClass(LongWritable.class);  
  137.       job.setMapperClass(MyMapper.class);  
  138.       job.setReducerClass(MyReducer.class);  
  139.         
  140.       job.setNumReduceTasks(2);  
  141.         
  142.       FileInputFormat.addInputPath(job, inputPath);  
  143.       FileOutputFormat.setOutputPath(job, outputPath);  
  144.         
  145.        
  146.       begin=System.currentTimeMillis();  
  147.       job.waitForCompletion(true);  
  148.         
  149.       System.out.println("===================================================");  
  150.       if(mapReduceTest.isGenerateDatas()){  
  151.         System.out.println("The maxValue is:"+mapReduceTest.getMaxValue());  
  152.         System.out.println("The minValue is:"+mapReduceTest.getMinValue());  
  153.       }  
  154.       System.out.println("Spend time:"+(System.currentTimeMillis()-begin));  
  155.       // Spend time:18908  
  156.         
  157.     } catch (Exception e) {  
  158.       // TODO Auto-generated catch block  
  159.       e.printStackTrace();  
  160.     }  
  161.   }  
  162.   
  163. }  

hadoop mapreduce 参数

夕阳丶一抹红颜 发表了文章 0 个评论 1750 次浏览 2015-09-22 09:42 来自相关话题

下面介绍hadoop mapreduce 参数主要的六个类,只有了解了这六个类的作用,才能在编写程序中知道哪个类是要自己实现,哪些类可以调用默认的类,才能真正的做到游刃有余。 1. InputFormat类。该类的 ...查看全部
下面介绍hadoop mapreduce 参数主要的六个类,只有了解了这六个类的作用,才能在编写程序中知道哪个类是要自己实现,哪些类可以调用默认的类,才能真正的做到游刃有余。
1. InputFormat类。该类的作用是将输入的文件和数据分割成许多小的split文件,并将split的每个行通过LineRecorderReader解析成,通过job.setInputFromatClass()函数来设置,默认的情况为类TextInputFormat,其中Key默认为字符偏移量,value是该行的值。
2.Map类。根据输入的对生成中间结果,默认的情况下使用Mapper类,该类将输入的对原封不动的作为中间按结果输出,通过job.setMapperClass()实现。实现Map函数。
3.Combine类。实现combine函数,该类的主要功能是合并相同的key键,通过job.setCombinerClass()方法设置,默认为null,不合并中间结果。实现map函数
4.Partitioner类。 该该主要在Shuffle过程中按照Key值将中间结果分成R份,其中每份都有一个Reduce去负责,可以通过job.setPartitionerClass()方法进行设置,默认的使用hashPartitioner类。实现getPartition函数
5.Reducer类。 将中间结果合并,得到中间结果。通过job.setReduceCalss()方法进行设置,默认使用Reducer类,实现reduce方法。
6.OutPutFormat类,该类负责输出结果的格式。可以通过job.setOutputFormatClass()方法进行设置。默认使用TextOUtputFormat类,得到对。
note:hadoop主要是上面的六个类进行mapreduce操作,使用默认的类,处理的数据和文本的能力很有限,具体的项目中,用户通过改写这六个类(重载六个类),完成项目的需求。说实话,我刚开始学的时候,我怀疑过Mapreudce处理数据功能,随着学习深入,真的很钦佩mapreduce的设计,基本就二个函数,通过重载,可以完成所有你想完成的工作。

hadoop mapreduce 参数

夕阳丶一抹红颜 发表了文章 0 个评论 1877 次浏览 2015-09-22 09:35 来自相关话题

下面介绍MapReduce的主要的六个类,只有了解了这六个类的作用,才能在编写程序中知道哪个类是要自己实现,哪些类可以调用默认的类,才能真正的做到游刃有余,那么关于 ...查看全部
下面介绍MapReduce的主要的六个类,只有了解了这六个类的作用,才能在编写程序中知道哪个类是要自己实现,哪些类可以调用默认的类,才能真正的做到游刃有余,那么关于hadoop mapreduce 参数就在下文了。
1. InputFormat类。该类的作用是将输入的文件和数据分割成许多小的split文件,并将split的每个行通过LineRecorderReader解析成,通过job.setInputFromatClass()函数来设置,默认的情况为类TextInputFormat,其中Key默认为字符偏移量,value是该行的值。
 2.Map类。根据输入的对生成中间结果,默认的情况下使用Mapper类,该类将输入的对原封不动的作为中间按结果输出,通过job.setMapperClass()实现。实现Map函数。
 3.Combine类。实现combine函数,该类的主要功能是合并相同的key键,通过job.setCombinerClass()方法设置,默认为null,不合并中间结果。实现map函数
 4.Partitioner类。 该该主要在Shuffle过程中按照Key值将中间结果分成R份,其中每份都有一个Reduce去负责,可以通过job.setPartitionerClass()方法进行设置,默认的使用hashPartitioner类。实现getPartition函数
 5.Reducer类。 将中间结果合并,得到中间结果。通过job.setReduceCalss()方法进行设置,默认使用Reducer类,实现reduce方法。
 6.OutPutFormat类,该类负责输出结果的格式。可以通过job.setOutputFormatClass()方法进行设置。默认使用TextOUtputFormat类,得到对。
note:hadoop主要是上面的六个类进行mapreduce操作,使用默认的类,处理的数据和文本的能力很有限,具体的项目中,用户通过改写这六个类(重载六个类),完成项目的需求。说实话,我刚开始学的时候,我怀疑过Mapreudce处理数据功能,随着学习深入,真的很钦佩mapreduce的设计,基本就二个函数,通过重载,可以完成所有你想完成的工作。

map-reduce 优化

cenyuhai 发表了文章 0 个评论 1530 次浏览 2015-09-11 14:40 来自相关话题

map阶段优化 参数:io.sort.mb(default 100) 当map task开始运算,并产生中间数据时,其产生的中间结果并非直接就简单的写入磁盘。 ...查看全部
map阶段优化
参数:io.sort.mb(default 100)
当map task开始运算,并产生中间数据时,其产生的中间结果并非直接就简单的写入磁盘。
而是会利用到了内存buffer来进行已经产生的部分结果的缓存,
并在内存buffer中进行一些预排序来优化整个map的性能。
每一个map都会对应存在一个内存buffer,map会将已经产生的部分结果先写入到该buffer中,
这个buffer默认是100MB大小,
但是这个大小是可以根据job提交时的参数设定来调整的,
当map的产生数据非常大时,并且把io.sort.mb调大,
那么map在整个计算过程中spill的次数就势必会降低,
map task对磁盘的操作就会变少,
如果map tasks的瓶颈在磁盘上,这样调整就会大大提高map的计算性能。
 
参数:io.sort.spill.percent(default 0.80,也就是80%)
map在运行过程中,不停的向该buffer中写入已有的计算结果,
但是该buffer并不一定能将全部的map输出缓存下来,
当map输出超出一定阈值(比如100M),那么map就必须将该buffer中的数据写入到磁盘中去,
这个过程在mapreduce中叫做spill。
map并不是要等到将该buffer全部写满时才进行spill,
因为如果全部写满了再去写spill,势必会造成map的计算部分等待buffer释放空间的情况。
所以,map其实是当buffer被写满到一定程度(比如80%)时,就开始进行spill。
这个阈值也是由一个job的配置参数来控制,
这个参数同样也是影响spill频繁程度,进而影响map task运行周期对磁盘的读写频率的。
但非特殊情况下,通常不需要人为的调整。调整io.sort.mb对用户来说更加方便。
 
参数:io.sort.factor
当map task的计算部分全部完成后,如果map有输出,就会生成一个或者多个spill文件,这些文件就是map的输出结果。
map在正常退出之前,需要将这些spill合并(merge)成一个,所以map在结束之前还有一个merge的过程。
merge的过程中,有一个参数可以调整这个过程的行为,该参数为:io.sort.factor。
该参数默认为10。它表示当merge spill文件时,最多能有多少并行的stream向merge文件中写入。
比如如果map产生的数据非常的大,产生的spill文件大于10,而io.sort.factor使用的是默认的10,
那么当map计算完成做merge时,就没有办法一次将所有的spill文件merge成一个,而是会分多次,每次最多10个stream。
这也就是说,当map的中间结果非常大,调大io.sort.factor,
有利于减少merge次数,进而减少map对磁盘的读写频率,有可能达到优化作业的目的。
 
参数:min.num.spill.for.combine(default 3)
当job指定了combiner的时候,我们都知道map介绍后会在map端根据combiner定义的函数将map结果进行合并。
运行combiner函数的时机有可能会是merge完成之前,或者之后,这个时机可以由一个参数控制,
即min.num.spill.for.combine(default 3),当job中设定了combiner,并且spill数最少有3个的时候,
那么combiner函数就会在merge产生结果文件之前运行。
通过这样的方式,就可以在spill非常多需要merge,并且很多数据需要做conbine的时候,
减少写入到磁盘文件的数据数量,同样是为了减少对磁盘的读写频率,有可能达到优化作业的目的。
 
参数:mapred.compress.map.output(default false)
减少中间结果读写进出磁盘的方法不止这些,还有就是压缩。
也就是说map的中间,无论是spill的时候,还是最后merge产生的结果文件,都是可以压缩的。
压缩的好处在于,通过压缩减少写入读出磁盘的数据量。
对中间结果非常大,磁盘速度成为map执行瓶颈的job,尤其有用。
控制map中间结果是否使用压缩的参数为:mapred.compress.map.output(true/false)。
将这个参数设置为true时,那么map在写中间结果时,就会将数据压缩后再写入磁盘,读结果时也会采用先解压后读取数据。
这样做的后果就是:写入磁盘的中间结果数据量会变少,但是cpu会消耗一些用来压缩和解压。
所以这种方式通常适合job中间结果非常大,瓶颈不在cpu,而是在磁盘的读写的情况。
说的直白一些就是用cpu换IO。
根据观察,通常大部分的作业cpu都不是瓶颈,除非运算逻辑异常复杂。所以对中间结果采用压缩通常来说是有收益的。
 
参数:mapred.map.output.compression.codec( default org.apache.hadoop.io.compress.DefaultCodec)
当采用map中间结果压缩的情况下,用户还可以选择压缩时采用哪种压缩格式进行压缩,
现在hadoop支持的压缩格式有:GzipCodec,LzoCodec,BZip2Codec,LzmaCodec等压缩格式。
通常来说,想要达到比较平衡的cpu和磁盘压缩比,LzoCodec比较适合。但也要取决于job的具体情况。
用户若想要自行选择中间结果的压缩算法,
可以设置配置参数:mapred.map.output.compression.codec=org.apache.hadoop.io.compress.DefaultCodec或者其他用户自行选择的压缩方式
 
reduce阶段优化
reduce的运行是分成三个阶段的。分别为copy->sort->reduce。
由于job的每一个map都会根据reduce(n)数将数据分成map 输出结果分成n个partition,
所以map的中间结果中是有可能包含每一个reduce需要处理的部分数据的。
所以,为了优化reduce的执行时间,hadoop中是等job的第一个map结束后,
所有的reduce就开始尝试从完成的map中下载该reduce对应的partition部分数据。
这个过程就是通常所说的shuffle,也就是copy过程。
 
参数:mapred.reduce.parallel.copies(default 5)
说明:每个reduce并行下载map结果的最大线程数
Reduce task在做shuffle时,实际上就是从不同的已经完成的map上去下载属于自己这个reduce的部分数据,
由于map通常有许多个,所以对一个reduce来说,下载也可以是并行的从多个map下载,这个并行度是可以调整的,
调整参数为:mapred.reduce.parallel.copies(default 5)。
默认情况下,每个只会有5个并行的下载线程在从map下数据,如果一个时间段内job完成的map有100个或者更多,
那么reduce也最多只能同时下载5个map的数据,
所以这个参数比较适合map很多并且完成的比较快的job的情况下调大,有利于reduce更快的获取属于自己部分的数据。
 
参数:mapred.reduce.copy.backoff(default 300秒)
说明:reduce下载线程最大等待时间(秒)
reduce的每一个下载线程在下载某个map数据的时候,有可能因为那个map中间结果所在机器发生错误,
或者中间结果的文件丢失,或者网络瞬断等等情况,这样reduce的下载就有可能失败,
所以reduce的下载线程并不会无休止的等待下去,当一定时间后下载仍然失败,那么下载线程就会放弃这次下载,
并在随后尝试从另外的地方下载(因为这段时间map可能重跑)。
所以reduce下载线程的这个最大的下载时间段是可以调整的,
调整参数为:mapred.reduce.copy.backoff(default 300秒)。
如果集群环境的网络本身是瓶颈,那么用户可以通过调大这个参数来避免reduce下载线程被误判为失败的情况。
不过在网络环境比较好的情况下,没有必要调整。通常来说专业的集群网络不应该有太大问题,所以这个参数需要调整的情况不多。
 
参数:io.sort.factor
Reduce将map结果下载到本地时,同样也是需要进行merge的,所以io.sort.factor的配置选项同样会影响reduce进行merge时的行为,
该参数的详细介绍上文已经提到,当发现reduce在shuffle阶段iowait非常的高的时候,就有可能通过调大这个参数来加大一次merge时的并发吞吐,优化reduce效率。
参数:mapred.job.shuffle.input.buffer.percent(default 0.7)
说明:用来缓存shuffle数据的reduce task heap百分比
Reduce在shuffle阶段对下载来的map数据,并不是立刻就写入磁盘的,而是会先缓存在内存中,然后当使用内存达到一定量的时候才刷入磁盘。
这个内存大小的控制就不像map一样可以通过io.sort.mb来设定了,而是通过另外一个参数来设置:mapred.job.shuffle.input.buffer.percent(default 0.7),
这个参数其实是一个百分比,意思是说,shuffile在reduce内存中的数据最多使用内存量为:0.7 × maxHeap of reduce task。
也就是说,如果该reduce task的最大heap使用量(通常通过mapred.child.java.opts来设置,比如设置为-Xmx1024m)的一定比例用来缓存数据。
默认情况下,reduce会使用其heapsize的70%来在内存中缓存数据。
如果reduce的heap由于业务原因调整的比较大,相应的缓存大小也会变大,这也是为什么reduce用来做缓存的参数是一个百分比,而不是一个固定的值了。
 
参数:mapred.job.shuffle.merge.percent(default 0.66)
说明:缓存的内存中多少百分比后开始做merge操作
假设mapred.job.shuffle.input.buffer.percent为0.7,reduce task的max heapsize为1G,
那么用来做下载数据缓存的内存就为大概700MB左右,这700M的内存,跟map端一样,
也不是要等到全部写满才会往磁盘刷的,而是当这700M中被使用到了一定的限度(通常是一个百分比),就会开始往磁盘刷。
这个限度阈值也是可以通过job参数来设定的,设定参数为:mapred.job.shuffle.merge.percent(default 0.66)。
如果下载速度很快,很容易就把内存缓存撑大,那么调整一下这个参数有可能会对reduce的性能有所帮助。
 
参数:mapred.job.reduce.input.buffer.percent(default 0.0)
说明:sort完成后reduce计算阶段用来缓解数据的百分比
当reduce将所有的map上对应自己partition的数据下载完成后,就会开始真正的reduce计算阶段
(中间有个sort阶段通常时间非常短,几秒钟就完成了,因为整个下载阶段就已经是边下载边sort,然后边merge的)。
当reduce task真正进入reduce函数的计算阶段的时候,有一个参数也是可以调整reduce的计算行为。
也就是:mapred.job.reduce.input.buffer.percent(default 0.0)。
由于reduce计算时肯定也是需要消耗内存的,而在读取reduce需要的数据时,同样是需要内存作为buffer,
这个参数是控制,需要多少的内存百分比来作为reduce读已经sort好的数据的buffer百分比。
默认情况下为0,也就是说,默认情况下,reduce是全部从磁盘开始读处理数据。
如果这个参数大于0,那么就会有一定量的数据被缓存在内存并输送给reduce,
当reduce计算逻辑消耗内存很小时,可以分一部分内存用来缓存数据,反正reduce的内存闲着也是闲着。
 
 
转载自http://blog.sina.com.cn/s/blog_7673d4a50101bn5p.html
 

Hbase 学习(八) 使用MapReduce

cenyuhai 发表了文章 0 个评论 1551 次浏览 2015-09-11 14:34 来自相关话题

在hbase的demo里面有个IndexBuilder的例子,它就是使用了MapReduce来操作hbase的,例子也比较简单,但是只包括了Mapper。 另外网上还有另外一个例子,也是说明这个的,这个例子更为全面一点,包括了Mapper和Redu ...查看全部
在hbase的demo里面有个IndexBuilder的例子,它就是使用了MapReduce来操作hbase的,例子也比较简单,但是只包括了Mapper。
另外网上还有另外一个例子,也是说明这个的,这个例子更为全面一点,包括了Mapper和Reducer。
http://www.cnblogs.com/chenli0513/archive/2012/01/06/2314886.html
这里就不说什么了,很简单,一看就懂。