Hadoop

Hadoop

beeline 连接spark-thriftServer,insert语句报错

回复

啊啊啊吧 发起了问题 1 人关注 0 个回复 1149 次浏览 2020-07-20 15:10 来自相关话题

Sqoop从mysql导入数据HDFS出错

回复

糖分5f1035014acd6 发起了问题 1 人关注 0 个回复 1371 次浏览 2020-07-16 19:12 来自相关话题

Sqoop从mysql导入数据HDFS出错

回复

糖分5f1035014acd6 发起了问题 1 人关注 0 个回复 1265 次浏览 2020-07-16 19:11 来自相关话题

datanode已经启动,但是文件夹data文件夹下没有current,求解答~~

权威 回复了问题 2 人关注 1 个回复 2684 次浏览 2020-01-04 11:28 来自相关话题

yarn是怎么管理节点的?

回复

zxx99 发起了问题 1 人关注 0 个回复 1691 次浏览 2020-01-02 09:40 来自相关话题

hadoop启动,50070,50090端口访问不了

啊啊啊吧 回复了问题 2 人关注 2 个回复 2676 次浏览 2019-11-12 09:32 来自相关话题

hadoop启动,50070,50090端口访问不了

回复

啊啊啊吧 发起了问题 1 人关注 0 个回复 1380 次浏览 2019-11-08 14:36 来自相关话题

我也遇到了关于hadoop运行MapReduce实例出现的问题,请问你解决了吗

回复

昨夜星辰o8u 发起了问题 1 人关注 0 个回复 960 次浏览 2019-11-04 00:46 来自相关话题

我也遇到了关于hadoop运行MapReduce实例出现的问题,请问你解决了吗

回复

昨夜星辰o8u 发起了问题 1 人关注 0 个回复 904 次浏览 2019-11-04 00:46 来自相关话题

3台机器,start-yarn.sh之后,各个服务都成功启动了,但是Master:8088里的active nodes的数量一直为1.

回复

fly321283 发起了问题 1 人关注 0 个回复 2274 次浏览 2019-11-01 16:47 来自相关话题

执行MapReduce后一直挂起

defineconst 回复了问题 3 人关注 3 个回复 2299 次浏览 2019-10-16 09:35 来自相关话题

求看,快被这个问题折磨si了,有哪位大佬知道wordcount哪里的问题?

回复

Ashleyamv 发起了问题 1 人关注 0 个回复 1203 次浏览 2019-10-13 08:44 来自相关话题

几千万个文本(1TB左右)需要根据规则计算做结构化,用什么大数据架构比较合适?

天明ss7 回复了问题 2 人关注 1 个回复 1312 次浏览 2019-09-06 20:23 来自相关话题

启动HA高可用的时候,namenode和datanode启动后马上挂掉。

回复

阿陈 发起了问题 1 人关注 0 个回复 1166 次浏览 2019-05-01 16:03 来自相关话题

sqoop使用create-hive-table命令 是否有办法直接创建分区hive表?

回复

时生君 发起了问题 1 人关注 0 个回复 1785 次浏览 2019-03-10 17:41 来自相关话题

急!请教 hdfs格式化报错 bin/hdfs namenode -format

某余rak 回复了问题 1 人关注 1 个回复 3586 次浏览 2019-01-10 12:02 来自相关话题

docker环境下搭建hadoop伪分布式集群时,Namenode如何连接宿主机mysql服务

回复

easlife 发起了问题 2 人关注 0 个回复 1657 次浏览 2018-12-28 21:54 来自相关话题

hadoop resourcemanager 启动成功 只能启动 8001 8033端口

回复

离别钩 发起了问题 1 人关注 0 个回复 2798 次浏览 2018-12-21 14:31 来自相关话题

条新动态, 点击查看
那谁知道这个“垃圾清理”机制所需要的策略是什么? 可以自己根据策略通过hadoop fs -du,hadoop fs -ls,hadoop fs -rm的组合完成你的工具。
那谁知道这个“垃圾清理”机制所需要的策略是什么? 可以自己根据策略通过hadoop fs -du,hadoop fs -ls,hadoop fs -rm的组合完成你的工具。
跳转到的那张帖子,你仔细看看,就有相应的jdwp配置以及local模式跑任务的配置。 如下截图中加重的文字是关于local模式跑mapreduce的配置。 [attach]399[/attach]  
跳转到的那张帖子,你仔细看看,就有相应的jdwp配置以及local模式跑任务的配置。 如下截图中加重的文字是关于local模式跑mapreduce的配置。 [attach]399[/attach]  
哈哈,我怎么回答呢?喊个宇宙超级无敌第一牛逼的口号好像不太合适...   这里你看到的很多问题都是训练营学生和老师之间的的问答,挺容易判断好不好的吧? 你加1818166这个微信号,问问就知道了。
哈哈,我怎么回答呢?喊个宇宙超级无敌第一牛逼的口号好像不太合适...   这里你看到的很多问题都是训练营学生和老师之间的的问答,挺容易判断好不好的吧? 你加1818166这个微信号,问问就知道了。
问题列表:   1)Hive 的多表连接查询有性能问题吗? 2)Hive 2.0 有啥新的特性? 3)Presto 支持雪花或 星型 数据模型吗?Presto 支持多表连接吗? 性能如何? 4)Apache Hadoop 如何获取补丁?打补丁的过程?例如,一个... 显示全部 »
问题列表:   1)Hive 的多表连接查询有性能问题吗? 2)Hive 2.0 有啥新的特性? 3)Presto 支持雪花或 星型 数据模型吗?Presto 支持多表连接吗? 性能如何? 4)Apache Hadoop 如何获取补丁?打补丁的过程?例如,一个集群有多个节点。每一节点停机升级?如何确定该节点没有JOB运行。    谢谢!!   Lian

hadoop机架感知脚本修改之后需要重启namenode么

link 回复了问题 1 人关注 4 个回复 4451 次浏览 2015-11-23 09:54 来自相关话题

Hadoop与Spark计算模型的比较分析

回复

zp0824 发起了问题 1 人关注 0 个回复 3364 次浏览 2015-09-20 10:37 来自相关话题

beeline 连接spark-thriftServer,insert语句报错

回复

啊啊啊吧 发起了问题 1 人关注 0 个回复 1149 次浏览 2020-07-20 15:10 来自相关话题

Sqoop从mysql导入数据HDFS出错

回复

糖分5f1035014acd6 发起了问题 1 人关注 0 个回复 1371 次浏览 2020-07-16 19:12 来自相关话题

Sqoop从mysql导入数据HDFS出错

回复

糖分5f1035014acd6 发起了问题 1 人关注 0 个回复 1265 次浏览 2020-07-16 19:11 来自相关话题

datanode已经启动,但是文件夹data文件夹下没有current,求解答~~

回复

权威 回复了问题 2 人关注 1 个回复 2684 次浏览 2020-01-04 11:28 来自相关话题

yarn是怎么管理节点的?

回复

zxx99 发起了问题 1 人关注 0 个回复 1691 次浏览 2020-01-02 09:40 来自相关话题

hadoop启动,50070,50090端口访问不了

回复

啊啊啊吧 回复了问题 2 人关注 2 个回复 2676 次浏览 2019-11-12 09:32 来自相关话题

hadoop启动,50070,50090端口访问不了

回复

啊啊啊吧 发起了问题 1 人关注 0 个回复 1380 次浏览 2019-11-08 14:36 来自相关话题

我也遇到了关于hadoop运行MapReduce实例出现的问题,请问你解决了吗

回复

昨夜星辰o8u 发起了问题 1 人关注 0 个回复 960 次浏览 2019-11-04 00:46 来自相关话题

我也遇到了关于hadoop运行MapReduce实例出现的问题,请问你解决了吗

回复

昨夜星辰o8u 发起了问题 1 人关注 0 个回复 904 次浏览 2019-11-04 00:46 来自相关话题

3台机器,start-yarn.sh之后,各个服务都成功启动了,但是Master:8088里的active nodes的数量一直为1.

回复

fly321283 发起了问题 1 人关注 0 个回复 2274 次浏览 2019-11-01 16:47 来自相关话题

执行MapReduce后一直挂起

回复

defineconst 回复了问题 3 人关注 3 个回复 2299 次浏览 2019-10-16 09:35 来自相关话题

求看,快被这个问题折磨si了,有哪位大佬知道wordcount哪里的问题?

回复

Ashleyamv 发起了问题 1 人关注 0 个回复 1203 次浏览 2019-10-13 08:44 来自相关话题

几千万个文本(1TB左右)需要根据规则计算做结构化,用什么大数据架构比较合适?

回复

天明ss7 回复了问题 2 人关注 1 个回复 1312 次浏览 2019-09-06 20:23 来自相关话题

启动HA高可用的时候,namenode和datanode启动后马上挂掉。

回复

阿陈 发起了问题 1 人关注 0 个回复 1166 次浏览 2019-05-01 16:03 来自相关话题

sqoop使用create-hive-table命令 是否有办法直接创建分区hive表?

回复

时生君 发起了问题 1 人关注 0 个回复 1785 次浏览 2019-03-10 17:41 来自相关话题

急!请教 hdfs格式化报错 bin/hdfs namenode -format

回复

某余rak 回复了问题 1 人关注 1 个回复 3586 次浏览 2019-01-10 12:02 来自相关话题

docker环境下搭建hadoop伪分布式集群时,Namenode如何连接宿主机mysql服务

回复

easlife 发起了问题 2 人关注 0 个回复 1657 次浏览 2018-12-28 21:54 来自相关话题

hadoop resourcemanager 启动成功 只能启动 8001 8033端口

回复

离别钩 发起了问题 1 人关注 0 个回复 2798 次浏览 2018-12-21 14:31 来自相关话题

Hadoop 2.0 部署单机HDFS+YARN——示例

wangxiaolei 发表了文章 0 个评论 3402 次浏览 2015-11-25 14:36 来自相关话题

准备Linux 虚拟机环境 1、用Oracle VM VirtualBox 虚拟Linux 系统 虚拟好的Linux 操作系统:ubuntu-14.04-desktop-i386 准备好:hadoop-2.2.0.tar.g ...查看全部
准备Linux 虚拟机环境
1、用Oracle VM VirtualBox 虚拟Linux 系统
虚拟好的Linux 操作系统:ubuntu-14.04-desktop-i386
准备好:hadoop-2.2.0.tar.gz jdk-6u45-linux-i586.bin

图片1.jpg


2、设置hosts 文件
对linux 虚机的hosts 进行设置sudo vi /etc/hosts
加入127.0.0.1 YARN001

图片2.jpg


3、安装jdk 和hadoop
拥有执行权限: chmod +x jdk-6u45-linux-i586.bin
解压JDK:./jdk-6u45-linux-i586.bin
解压成功后,ls 查看下

图片3.jpg


解压hadoop 的安装包:tar -zxvf hadoop-2.2.0.tar.gz
解压成功后,ls 查看下

图片4.jpg


修改hadoop 的配置文件
1、配置文件列表图例展示

图片5.jpg


2、修改hadoop-env.sh 文件
进入hadoop 目录下: cd hadoop-2.2.0/ ,输入命令vi etc/hadoop/hadoop-env.sh
也可以使用ftp 工具传输到本地,用编辑器编辑此文件。
export JAVA_HOME=/home/wangxiaolei/hadoop/jdk1.6.0_45

图片6.jpg


3、修改mapred-site.xml 文件
重命名mapred-site.xml.template 为mapred-site.xml
输入命令vi etc/hadoop/mapred-site.xml
添加下面的配置[code=Xml]

mapreduce.framework.name
yarn

[/code]4、修改core-site.xml 文件
输入命令vi etc/hadoop/core-site.xml[code=Xml]

fs.default.name
hdfs://YARN001:8020

[/code]5、修改core-site.xml 文件
输入命令vi etc/hadoop/hdfs-site.xml
注意:1、单机版副本数dfs.replication 的值默认是3 这里写为1
2、dfs.namenode.name.dir 和dfs.datanode.data.dir 的默认值,
在hadoop 安装目录下的tmp 目录下。
3、这里修改为非tmp 目录,此目录无需存在。
它是在启动hadoop 时目录是自动创建的。[code=Xml]

dfs.replication
1


dfs.namenode.name.dir
/home/wangxiaolei/hadoop/dfs/name


dfs.datanode.data.dir
/home/wangxiaolei/hadoop/dfs/data

[/code]6、修改yarn-site.xml 文件
输入命令vi etc/hadoop/yarn-site.xml[code=Xml]

yarn.nodemanager.aux-services
mapreduce_shuffle

[/code]部署HDFS+YARN
1、格式化NameNode
第一次搭建环境,需要格式化
输入命令bin/hadoop namenode -format
完成后,查看/home/wangxiaolei/hadoop 发现自动创建的目录文件

图片7.jpg


2、启动namenode
输入命令sbin/hadoop-daemon.sh start namenode

图片8.jpg


3、启动datanode
输入命令sbin/hadoop-daemon.sh start datanode

图片9.jpg


4、验证HDFS 是否启动成功
在本机上配置host
192.168.1.122 yarn001
然后在浏览器窗口输入http://yarn001:50070

图片10.jpg


bin/hadoop fs -mkdir /home
bin/hadoop fs -mkdir /home/wangxiaolei

图片11.jpg


5、启动yarn
输入命令sbin/start-yarn.sh

图片12.jpg


6、验证yarn 是否启动成功
6.1 使用jps 查看进程
这里没有配置jdk 的环境变量,所以要指定jps 的存放目录。

图片13.jpg


6.2 在web 界面查看
在本机浏览器中输入http://yarn001:8088

图片14.jpg


6.3 此时就可以跑MapReduce 程序了。
bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.2.0.jar pi 2 100

7、关闭yarn
输入命令sbin/stop-yarn.sh

图片15.jpg


8、关闭HDFS
输入命令sbin/stop-dfs.sh

图片16.jpg


最后使用jps 查看下进程

图片17.jpg


单机版部署HDFS+YARN 顺利完成!

hadoop 硬件配置 指南

唐半张 发表了文章 0 个评论 3222 次浏览 2015-10-10 10:14 来自相关话题

提高我们的客户开始使用Hadoop时的第一个问题是关于选择合适的硬件,为他们的Hadoop集群。这个帖子描述Hadoop的管理员考虑到各种因素。我们鼓励其他人也附和他们的经验生产Hadoop集群配置。虽然Hadoop是设计行业标准的硬件上运行,建议一个理想的集 ...查看全部
提高我们的客户开始使用Hadoop时的第一个问题是关于选择合适的硬件,为他们的Hadoop集群。这个帖子描述Hadoop的管理员考虑到各种因素。我们鼓励其他人也附和他们的经验生产Hadoop集群配置。虽然Hadoop是设计行业标准的硬件上运行,建议一个理想的集群配置是不一样只是提供了硬件规格列表容易。选择硬件提供了一个给定的工作负载的性能和经济的最佳平衡,需要测试和验证。例如,用户IO密集型工作负载将投资在些每核心主轴。在这个帖子 主要写的就是关于hadoop硬件配置! 
 
存储和计算的融合

    在过去十年中,IT组织有标准化的刀片服务器和SAN(存储区域网络),以满足他们的网格和处理密集型工作负载。虽然这种模式使一些标准的应用,如Web服务器,应用服务器,规模较小的结构化数据库和简单的ETL(提取,转换,装载)基础设施的要求有很大的意义已经发生变化的数据量和数量用户已经成长。 Web服务器现在前端使用缓存层,数据库使用大规模并行与本地磁盘,ETL作业正在推动更多的数据比他们可以在本地处理。硬件厂商建立创新体系,以满足这些要求包括存储刀片,SAS(串行连接SCSI)开关,外部SATA阵列和更大容量的机架单元。
Hadoop的目的是基于一种新的方法来存储和处理复杂的数据。海量存储和可靠性进行处理然后移动到刀片的集合,而不是依靠在SAN上,Hadoop的处理大数据量和可靠性,在软件层。 Hadoop的数据分布到集群上,处理平衡,并使用复制,以确保数据的可靠性和容错。因为数据的分布式计算能力的机器上,处理可以直接发送到存储数据的机器。由于每个机器在一个Hadoop集群的存储和处理数据,他们需要进行配置,以满足数据存储和处理要求。


任务压力问题

    MapReduce作业,在几乎所有情况下,将遇到一个瓶颈,从磁盘或从网络(作为IO时限的工作“),或在处理数据读取的数据(CPU绑定)。 IO绑定工作的一个例子是排序,这就需要非常小的加工(简单的比较)和大量的读取和写入磁盘。一个CPU密集型的工作的一个例子是分类,其中一些输入数据处理非常复杂的方式来确定一个本体。
这里有几个例子IO绑定的工作量:
1.索引
2.搜索
3.分组
4.解码/解压缩
5.数据导入和导出
这里有几个CPU密集型工作负载的例子:
1.机器学习
2.复杂的文本挖掘
3.自然语言处理
4.特征提取

   由于我们的客户需要了解他们的工作量,为了充分优化他们的Hadoop的硬件,在开始的时候,我们经常用一个典型的鸡和蛋的问题。最多的团队寻求建立一个Hadoop集群还不知道他们的工作量,往往是组织运行Hadoop的第一份任务的,远超过他们的想像。此外,有些工作负载可能会在无法预料的方式约束。例如,有时理论IO绑定的工作量实际上可能是因为用户的选择压缩的CPU绑定。有时可能会改变一个算法的不同实现MapReduce作业的限制。由于这些原因,是有道理的投资时,团队是不熟悉的工作,他们将运行在一个平衡的Hadoop集群,团队能够基准的MapReduce工作,一旦他们的平衡群集上运行,了解他们的必然。
   它是直接测量现场工作量,并确定将在地方上的Hadoop集群的全面监测的瓶颈。我们建议安装Ganglia的所有Hadoop的机器提供实时统计,有关CPU,磁盘和网络负载。与Ganglia的安装了Hadoop的管理员可以运行自己的MapReduce工作,并检查Ganglia的仪表盘怎么看每一台机器正在执行。了解整个集群节点的更多信息。
   除了建立集群适当的工作量,我们鼓励我们的客户能够与硬件厂商和了解电力和冷却的经济。由于Hadoop的运行几十,几百或上千个节点,一个运营团队,可以节省相当数量的钱,在低功耗的硬件投资。每个硬件厂商将能够提供如何监控电源和冷却的工具和建议。

如何挑选适合你的Hadoop集群的硬件
   在选择机器配置的第一步是要了解你的运营团队已经管理的硬件类型。运营团队往往有关于购买新机器的意见,,他们已经熟悉的硬件工作。 Hadoop是不是唯一的系统,从规模效益的好处。记得使用一个初始的群集到Hadoop时,如果你还不知道你的工作量均衡的硬件规划。
一个基本的Hadoop集群中的节点有四种类型。我们这里指的是为执行特定任务的机器上的一个节点。大部分的机器将作为双方的Datanode的TaskTracker。正如我们所描述的,这些节点存储数据和执行处理功能。我们建议Datanode的/在一个平衡的Hadoop集群的TaskTracker以下规格:
在JBOD(简单磁盘捆绑)配置41TB硬盘
2四核CPU,运行至少2-2.5GHz的
1624GBs的内存(如果你考虑HBase的24-32GBs)
千兆以太网

    namenode负责协调数据存储集群,jobtracker协调数据处理任务。最后的节点类型是secondarynamenode的,它可以为小群的namenode机器上同一位置,将较大的群集namenode节点相同的硬件上运行。我们建议我们的客户购买Power的服务器,为运行的namenodes和jobtrackers的冗余电源和搜查的企业级磁盘。 namenodes也需要更多的RAM,相对集群中的数据块数量。一个好的经验法则是假设的名称节点的分布式文件系统中存储的每个一百万块内存1GB。与100的Datanode在集群名称节点上的RAM32GBs提供充足的成长空间。我们也建议有一个备用机,以取代的情况下,当其中之一失败突然namenode或jobtracker。
    当您希望您的Hadoop集群增长超过20台机器,我们建议初始群集配置,因为它是跨越两个机架,每个机架有机架千兆交换机顶部,这些交换机连接10千兆以太网的互连或核心切换。有两个逻辑机架,运营团队的内部机架的网络要求更好地理解和跨机架的沟通。
    与Hadoop集群可以开始确定工作量和准备这些工作负载基准,以确定CPU和IO瓶颈。经过一段时间的基准和监测,该小组将有一个很好的了解,更多的机器应该如何进行配置。这是常见的有Hadoop集群,特别是因为它们的规模增长。一套机器不适合你的工作量时,将不会是一种浪费。
下面是各种硬件配置不同的工作负载,包括我们原来的“基点”的建议名单:
   轻型处理配置(1U/machine)的:两个四核CPU,8GB内存,4个磁盘驱动器(1TB或2TB)。注意CPU密集型的工作,如自然语言处理涉及加载到RAM的大型模型在数据处理之前,应配置2GB内存每核心,而不是1GB内存每核心。
   平衡计算配置(1U/machine)的两个四核CPU,16到24GB内存,4个磁盘驱动器直接连接使用的主板控制器(1TB或2TB)。这些往往是因为有两个主板和8个驱动器在一个单一的2U机柜的。
   重配置存储(2U/machine):两个四核CPU,16到24GB的内存,和12个磁盘驱动器(1TB或2TB)。这种类型的机器的功耗开始〜200W左右,处于闲置状态,可以去〜350W高活动时。
   计算密集配置(2U/machine):两个四核CPU,48-72GB的内存,8个磁盘驱动器(1TB或2TB)。这些都需要一个大的内存模型和沉重的参考数据缓存的组合时经常使用。

其他硬件方面的考虑
   当我们遇到的应用程序产生大量的中间数据,我们建议一个以太网卡或双通道,太网卡提供2 Gbps的每台机器上的两个端口。另外,对于那些已经转移到10千兆以太网或Infiniband的客户,这些解决方案可以用来解决网络绑定的工作量。可以肯定,你的操作系统和BIOS是兼容的,如果你正在考虑切换到10千兆以太网。
   当计算内存需求,Java使用管理虚拟机到10%的因素。我们建议配置Hadoop的使用严格的堆大小的限制,以避免内存交换到磁盘。交换大大影响MapReduce作业性能,可避免更多的RAM配置的机器。
   同样重要的是优化RAM的内存通道宽度。例如,当使用双通道内存每台机器应该对DIMM配置。随着三通道内存,每台机器应该有三胞胎的DIMM。这意味着一台机器可能18GBs(9x2GB)内存,而不是16GBs(4x4GB)结束。

结论
   Hadoop集群购买相应的硬件要求基准和精心策划,充分理解的工作量。然而,Hadoop集群是常用异构,我们建议与平衡的规格部署开始时的初始硬件。
 

CombineFileInputFormat问题

唐半张 发表了文章 0 个评论 1938 次浏览 2015-10-10 09:54 来自相关话题

遇到CombineFileInputFormat问题应该怎么办呢, 下面就是演示遇到CombineFileInputFormat问题怎么处理的东西! 大家仔细看!   在Eclipse调试第二题代码时遇到如下问题: ...查看全部
遇到CombineFileInputFormat问题应该怎么办呢, 下面就是演示遇到CombineFileInputFormat问题怎么处理的东西! 大家仔细看!
 
在Eclipse调试第二题代码时遇到如下问题:

在Eclipse中开发运行总是遇到文件不存在的问题,如下图:
奇怪的是同样的程序打个jar包,放到集群环境执行没有问题,所以很费解,麻烦知道的同学解答一下,谢谢!

排错过程:
1.复制一份WordCount类代码到本地,通过同样的输入输出路径运行一次,不报错。
2.对比代码后,初步判断是由自定义的MyInputFormat造成的
3.WordCount使用的是默认FileInputFormat,对比getSplits方法发现,将path打印出来


4.根据错误提示跟踪代码,最终发现问题在CombineFileInputFormat重写的getSplits方法,区别就是前面少了些东西,因此在开发环境调试找不到文件

5.修改CombineFileInputFormat代码,问题解决

Hadoop 实战

唐半张 发表了文章 0 个评论 1540 次浏览 2015-10-10 09:49 来自相关话题

Hadoop 的实战   Hadoop 是Google MapReduce的一个Java实现。MapReduce是一种简化的分布式编程模式,让程序自动分布到一个由普通机器组成的超大集群上并发执行。就如同java程序员可以不考虑内存泄露一样, Map ...查看全部
Hadoop 的实战 
 Hadoop 是Google MapReduce的一个Java实现。MapReduce是一种简化的分布式编程模式,让程序自动分布到一个由普通机器组成的超大集群上并发执行。就如同java程序员可以不考虑内存泄露一样, MapReduce的run-time系统会解决输入数据的分布细节,跨越机器集群的程序执行调度,处理机器的失效,并且管理机器之间的通讯请求。这样的模式允许程序员可以不需要有什么并发处理或者分布式系统的经验,就可以处理超大的分布式系统得资源。
一、概论
    作为Hadoop程序员,他要做的事情就是:
    1、定义Mapper,处理输入的Key-Value对,输出中间结果。
    2、定义Reducer,可选,对中间结果进行规约,输出最终结果。
    3、定义InputFormat 和OutputFormat,可选,InputFormat将每行输入文件的内容转换为Java类供Mapper函数使用,不定义时默认为String。
    4、定义main函数,在里面定义一个Job并运行它。
   
    然后的事情就交给系统了。
    1.基本概念:Hadoop的HDFS实现了google的GFS文件系统,NameNode作为文件系统的负责调度运行在master,DataNode运行在每个机器上。同时Hadoop实现了Google的MapReduce,JobTracker作为MapReduce的总调度运行在master,TaskTracker则运行在每个机器上执行Task。

    2.main()函数,创建JobConf,定义Mapper,Reducer,Input/OutputFormat 和输入输出文件目录,最后把Job提交給JobTracker,等待Job结束。

    3.JobTracker,创建一个InputFormat的实例,调用它的getSplits()方法,把输入目录的文件拆分成FileSplist作为Mapper task 的输入,生成Mapper task加入Queue。

    4.TaskTracker 向 JobTracker索求下一个Map/Reduce。
      
      Mapper Task先从InputFormat创建RecordReader,循环读入FileSplits的内容生成Key与Value,传给Mapper函数,处理完后中间结果写成SequenceFile.
      Reducer Task 从运行Mapper的TaskTracker的Jetty上使用http协议获取所需的中间内容(33%),Sort/Merge后(66%),执行Reducer函数,最后按照OutputFormat写入结果目录。
      TaskTracker 每10秒向JobTracker报告一次运行情况,每完成一个Task10秒后,就会向JobTracker索求下一个Task。

二、程序员编写的代码 (可以查看hadoop-examples-0.20.203.0.jar,里面也有一个类grep)
    我们做一个简单的分布式的Grep,简单对输入文件进行逐行的正则匹配,如果符合就将该行打印到输出文件。因为是简单的全部输出,所以我们只要写Mapper函数,不用写Reducer函数,也不用定义Input/Output Format。

         RegMapper类的configure()函数接受由main函数传入的查找字符串,map() 函数进行正则匹配,key是行数,value是文件行的内容,符合的文件行放入中间结果。
        main()函数定义由命令行参数传入的输入输出目录和匹配字符串,Mapper函数为RegMapper类,Reduce函数是什么都不做,直接把中间结果输出到最终结果的的IdentityReducer类,运行Job。

整个代码非常简单,丝毫没有分布式编程的任何细节。
 
三.运行Hadoop程序
       
3.1 local运行模式
       完全不进行任何分布式计算,不动用任何namenode,datanode的做法,适合一开始做调试代码。
       解压hadoop,其中conf目录是配置目录,hadoop的配置文件在hadoop-default.xml,如果要修改配置,不是直接修改该文件,而是修改hadoop-site.xml,将该属性在hadoop-site.xml里重新赋值。
       hadoop-default.xml的默认配置已经是local运行,不用任何修改,配置目录里唯一必须修改的是hadoop-env.sh 里JAVA_HOME的位置。
       将编译好的HadoopGrep与RegMapper.class 放入hadoop/build/classes/demo/hadoop/目录
        或者编译成jar包HadoopGrep.jar放入hadoop/build/classes/demo/hadoop/目录
        找一个比较大的xx.log文件放,然后运行        bin/hadoop demo.hadoop.HadoopGrep  input   /tmp/out  "[a-b]"
         (jar包运行:bin/hadoop jar HadoopGrep.jar  HadoopGrep  input   /tmp/output  "[a-b]" )
         说明:
         input  为xx.log文件所在目录 
         /tmp/output为输出目录 
         "[a-b]"   grep的字符串 
        查看输出目录的结果,查看hadoop/logs/里的运行日志。  
         在重新运行前,先删掉输出目录。
  
  3.2 集群运行模式 
      1 )执行bin/hadoop dfs 可以看到它所支持的文件操作指令。   
      2) 创建目录输入inpu:   
             $ bin/hadoop dfs -mkdir input    
      3)上传文件xx.log到指定目录 input :   
             $ bin/hadoop dfs -put xx.log input       4 )  执行 bin/hadoop demo.hadoop.HadoopGrep input  output
              (jar包运行:bin/hadoop jar HadoopGrep.jar  HadoopGrep  input   /tmp/output  "[a-b]" )
       5 ) 查看输出文件:
           将输出文件从分布式文件系统拷贝到本地文件系统查看:
            $ bin/hadoop fs -get output output
             $ cat output/*
            或者
            在分布式文件系统上查看输出文件:
            $ bin/hadoop fs -cat output/*            重新执行前,运行hadoop/bin/hadoop dfs -rm output删除output目录

       7.运行hadoop/bin/stop-all.sh 结束。
    
四、效率
    经测试,Hadoop并不是万用灵丹,很取决于文件的大小和数量,处理的复杂度以及群集机器的数量,相连的带宽,当以上四者并不大时,hadoop优势并不明显。
    比如,不用hadoop用java写的简单grep函数处理100M的log文件只要4秒,用了hadoop local的方式运行是14秒,用了hadoop单机集群的方式是30秒,用双机集群10M网口的话更慢,慢到不好意思说出来的地步。

Spark与Hadoop计算模型的比较分析

唐半张 发表了文章 0 个评论 1782 次浏览 2015-10-10 09:36 来自相关话题

Spark与Hadoop计算模型的比较分析 Spark是一个通用的并行计算框架,由UCBerkeley的AMP实验室开发。那么Spark和Hadoop有什么不同呢?1.Spark的中间数据放到内存中,对于迭代运算效率比较高。 Spark ...查看全部
Spark与Hadoop计算模型的比较分析
Spark是一个通用的并行计算框架,由UCBerkeley的AMP实验室开发。那么Spark和Hadoop有什么不同呢?1.Spark的中间数据放到内存中,对于迭代运算效率比较高。
Spark aims to extend MapReduce for iterative algorithms, and interactive low latency data mining. One major difference between MapReduce and Sparkis that MapReduce is acyclic. That is, data flows in from a stable source, isprocessed, and flows out to a stable filesystem. Spark allows iterative computation on the same data, which would form a cycle if jobs were visualized.   (旨在延长MapReduce的迭代算法,和互动低延迟数据挖掘的。 MapReduce和Sparkis的一个主要区别,MapReduce是非周期性。也就是说,数据流从一个稳定的来源,加工,流出到一个稳定的文件系统。“Spark允许相同的数据,这将形成一个周期,如果工作是可视化的迭代计算。)
Spark更适合于迭代运算比较多的ML和DM运算。因为在Spark里面,有RDD的概念。
Resilient Distributed Dataset (RDD) serves as an abstraction to rawdata, and some data is kept in memory and cached for later use. This last pointis very important; Spark allows data to be committed in RAM for an approximate20x speedup over MapReduce based on disks. RDDs are immutable and created through parallel transformations such as map, filter, groupBy and reduce.   (弹性分布式数据集(RDD)作为原始数据的抽象,和一些数据保存在内存中缓存供以后使用。最后这点很重要;星火允许在RAM致力于为近似20X基于加速了MapReduce的磁盘上的数据。RDDs是不可改变的,并通过并行转换,如地图,过滤器,GroupBy和减少创建的。)
RDD可以cache到内存中,那么每次对RDD数据集的操作之后的结果,都可以存放到内存中,下一个操作可以直接从内存中输入,省去了MapReduce大量的磁盘IO操作。这对于迭代运算比较常见的机器学习算法来说,效率提升比较大。但是由于Spark目前只是在UC Berkeley的一个研究项目,目前看到的最大规模也就200台机器,没有像Hadoop那样的部署规模,所以,在大规模使用的时候还是要慎重考虑的。
2.Spark比Hadoop更通用。
Spark提供的数据集操作类型有很多种,不像Hadoop只提供了Map和Reduce两种操作。比如map, filter, flatMap,sample, groupByKey, reduceByKey, union, join, cogroup, mapValues, sort,partionBy等多种操作类型,他们把这些操作称为Transformations。同时还提供Count, collect, reduce, lookup, save等多种actions。
这些多种多样的数据集操作类型,给上层应用者提供了方便。各个处理节点之间的通信模型不再像Hadoop那样就是唯一的Data Shuffle一种模式。用户可以命名,物化,控制中间结果的分区等。可以说编程模型比Hadoop更灵活。
不过论文中也提到,Spark不适用那种异步细粒度更新状态的应用,例如web服务的存储或者是增量的web爬虫和索引。就是对于那种增量修改的应用模型,当然不适合把大量数据拿到内存中了。增量改动完了,也就不用了,不需要迭代了。
3.容错性。
从Spark的论文《Resilient Distributed Datasets: AFault-Tolerant Abstraction for In-Memory Cluster Computing》中没看出容错性做的有多好。倒是提到了分布式数据集计算,做checkpoint的两种方式,一个是checkpoint data,一个是logging the updates。貌似Spark采用了后者。但是文中后来又提到,虽然后者看似节省存储空间。但是由于数据处理模型是类似DAG的操作过程,由于图中的某个节点出错,由于lineage chains的依赖复杂性,可能会引起全部计算节点的重新计算,这样成本也不低。他们后来说,是存数据,还是存更新日志,做checkpoint还是由用户说了算吧。相当于什么都没说,又把这个皮球踢给了用户。所以我看就是由用户根据业务类型,衡量是存储数据IO和磁盘空间的代价和重新计算的代价,选择代价较小的一种策略。
4.关于Spark和Hadoop的融合
不知道Apache基金会的人怎么想的,我看Spark还是应该融入到Hadoop生态系统中。从Hadoop 0.23把MapReduce做成了库,看出Hadoop的目标是要支持包括MapReduce在内的更多的并行计算模型,比如MPI,Spark等。毕竟现在Hadoop的单节点CPU利用率并不高,那么假如这种迭代密集型运算是和现有平台的互补。同时,这对资源调度系统就提出了更高的要求。有关资源调度方面,UC Berkeley貌似也在做一个Mesos的东西,还用了Linux container,统一调度Hadoop和其他应用模型。

Hadoop中CombineFileInputFormat详解

唐半张 发表了文章 0 个评论 1991 次浏览 2015-10-10 09:17 来自相关话题

Hadoop中CombineFileInputFormat详解 在MR实践中,会有很多小文件,单个文件产生一个mapper,资源比较浪费,后续没有reduce逻辑的话,会产生很多小文件,文件数量暴涨,对后续的hive job产生影响。 ...查看全部
Hadoop中CombineFileInputFormat详解
在MR实践中,会有很多小文件,单个文件产生一个mapper,资源比较浪费,后续没有reduce逻辑的话,会产生很多小文件,文件数量暴涨,对后续的hive job产生影响。
所以需要在mapper中将多个文件合成一个split作为输入,CombineFileInputFormat满足我们的需求。
CombineFileInputFormat 原理(网上牛人总结):
 
=12pt第一次:将同DN上的所有block生成Split,生成方式:
=12pt1.=12pt循环=12ptnodeToBlocks=12pt,获得每个=12ptDN上有哪些block
=12pt2.=12pt循环这些block列表
=12pt3.=12pt将block从=12ptblockToNodes=12pt中移除,避免同一个=12ptblock被包含在多个split中
=12pt4.=12pt将该block添加到一个有效block的列表中,这个列表主要是保留哪些block已经从=12ptblockToNodes=12pt中被移除了,方便后面恢复到=12ptblockToNodes=12pt中
=12pt5.=12pt向临时变量=12ptcurSplitSize=12pt增加=12ptblock的大小
=12pt6.=12pt判断=12ptcurSplitSize=12pt是否已经超过了设置的=12ptmaxSize
=12pta) =12pt如果超过,执行并添加=12ptsplit信息,并重置=12ptcurSplitSize=12pt和=12ptvalidBlocks
=12ptb) =12pt没有超过,继续循环block列表,跳到第2步
=12pt7.=12pt当前DN上的block列表循环完成,判断剩余的block是否允许被split(剩下的block大小之和是否大于=12pt每个DN的最小split大小=12pt)
=12pta) =12pt如果允许,=12pt执行并添加=12ptsplit信息
=12ptb) =12pt如果不被允许,将这些剩余的block归还=12ptblockToNodes
=12pt8.=12pt重置
=12pt9.=12pt跳到步骤=12pt1
   01.// process all nodes and create splits that are local

02. // to a node.

03. //创建同一个DN上的split

04. for (Iterator
05. List>> iter = nodeToBlocks.entrySet().iterator();

06. iter.hasNext();) {

07.

08. Map.Entry> one = iter.next();

09. nodes.add(one.getKey());

10. List blocksInNode = one.getValue();

11.

12. // for each block, copy it into validBlocks. Delete it from

13. // blockToNodes so that the same block does not appear in

14. // two different splits.

15. for (OneBlockInfo oneblock : blocksInNode) {

16. if (blockToNodes.containsKey(oneblock)) {

17. validBlocks.add(oneblock);

18. blockToNodes.remove(oneblock);

19. curSplitSize += oneblock.length;

20.

21. // if the accumulated split size exceeds the maximum, then

22. // create this split.

23. if (maxSize != 0 && curSplitSize >= maxSize) {

24. // create an input split and add it to the splits array

25. //创建这些block合并后的split,并将其split添加到split列表中

26. addCreatedSplit(job, splits, nodes, validBlocks);

27. //重置

28. curSplitSize = 0;

29. validBlocks.clear();

30. }

31. }

32. }

33. // if there were any blocks left over and their combined size is

34. // larger than minSplitNode, then combine them into one split.

35. // Otherwise add them back to the unprocessed pool. It is likely

36. // that they will be combined with other blocks from the same rack later on.

37. //其实这里的注释已经说的很清楚,我再按照我的理解说一下

38. /**

39. * 这里有几种情况:

40. * 1、在这个DN上还有没有被split的block,

41. * 而且这些block的大小大于了在一个DN上的split最小值(没有达到最大值),

42. * 将把这些block合并成一个split

43. * 2、剩余的block的大小还是没有达到,将剩余的这些block

44. * 归还给blockToNodes,等以后统一处理

45. */

46. if (minSizeNode != 0 && curSplitSize >= minSizeNode) {

47. // create an input split and add it to the splits array

48. addCreatedSplit(job, splits, nodes, validBlocks);

49. } else {

50. for (OneBlockInfo oneblock : validBlocks) {

51. blockToNodes.put(oneblock, oneblock.hosts);

52. }

53. }

54. validBlocks.clear();

55. nodes.clear();

56. curSplitSize = 0;

57. }

第二次:对不再同一个DN上但是在同一个Rack上的block进行合并(只是之前还剩下的block)
  01.// if blocks in a rack are below the specified minimum size, then keep them

02. // in 'overflow'. After the processing of all racks is complete, these overflow

03. // blocks will be combined into splits.

04. ArrayList overflowBlocks = new ArrayList();

05. ArrayList racks = new ArrayList();

06.

07. // Process all racks over and over again until there is no more work to do.

08. //这里处理的就不再是同一个DN上的block

09. //同一个DN上的已经被处理过了(上面的代码),这里是一些

10. //还没有被处理的block

11. while (blockToNodes.size() > 0) {

12.

13. // Create one split for this rack before moving over to the next rack.

14. // Come back to this rack after creating a single split for each of the

15. // remaining racks.

16. // Process one rack location at a time, Combine all possible blocks that

17. // reside on this rack as one split. (constrained by minimum and maximum

18. // split size).

19.

20. // iterate over all racks

21. //创建同机架的split

22. for (Iterator>> iter =

23. rackToBlocks.entrySet().iterator(); iter.hasNext();) {

24.

25. Map.Entry> one = iter.next();

26. racks.add(one.getKey());

27. List blocks = one.getValue();

28.

29. // for each block, copy it into validBlocks. Delete it from

30. // blockToNodes so that the same block does not appear in

31. // two different splits.

32. boolean createdSplit = false;

33. for (OneBlockInfo oneblock : blocks) {

34. //这里很重要,现在的blockToNodes说明的是还有哪些block没有被split

35. if (blockToNodes.containsKey(oneblock)) {

36. validBlocks.add(oneblock);

37. blockToNodes.remove(oneblock);

38. curSplitSize += oneblock.length;

39.

40. // if the accumulated split size exceeds the maximum, then

41. // create this split.

42. if (maxSize != 0 && curSplitSize >= maxSize) {

43. // create an input split and add it to the splits array

44. addCreatedSplit(job, splits, getHosts(racks), validBlocks);

45. createdSplit = true;

46. break;

47. }

48. }

49. }

50.

51. // if we created a split, then just go to the next rack

52. if (createdSplit) {

53. curSplitSize = 0;

54. validBlocks.clear();

55. racks.clear();

56. continue;

57. }

58.

59. //还有没有被split的block

60. //如果这些block的大小大于了同机架的最小split,

61. //则创建split

62. //否则,将这些block留到后面处理

63. if (!validBlocks.isEmpty()) {

64. if (minSizeRack != 0 && curSplitSize >= minSizeRack) {

65. // if there is a mimimum size specified, then create a single split

66. // otherwise, store these blocks into overflow data structure

67. addCreatedSplit(job, splits, getHosts(racks), validBlocks);

68. } else {

69. // There were a few blocks in this rack that remained to be processed.

70. // Keep them in 'overflow' block list. These will be combined later.

71. overflowBlocks.addAll(validBlocks);

72. }

73. }

74. curSplitSize = 0;

75. validBlocks.clear();

76. racks.clear();

77. }

78. }

最后,对于既不在同DN也不在同rack的block进行合并(经过前两步还剩下的block),这里源码就没有什么了,就不再贴了
 
 
 
=12pt源码总结:
 
=12pt合并,经过了3个步骤。同DN----》同rack不同DN-----》不同rack
=12pt将可以合并的block写到同一个split中下面是实践代码:
=12pt原始文件是70M每个的小文件,有些更小,sequence类型,需要自己实现RecordRead(Text就比较简单),key是byteWrite类型,现在需要减少文件个数,每个文件的大小接近block的大小。
=12pt自定义CombineSequenceFileInputFormat:
01.package com.hadoop.combineInput;

02.

03.import java.io.IOException;

04.

05.import org.apache.hadoop.mapreduce.InputSplit;

06.import org.apache.hadoop.mapreduce.RecordReader;

07.import org.apache.hadoop.mapreduce.TaskAttemptContext;

08.import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;

09.import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;

10.import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

11.

12.public class CombineSequenceFileInputFormat extends CombineFileInputFormat {

13. @SuppressWarnings({ "unchecked", "rawtypes" })

14. @Override

15. public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {

16. return new CombineFileRecordReader((CombineFileSplit)split, context, CombineSequenceFileRecordReader.class);

17. }

18.}

实现 CombineSequenceFileRecordReader

[java] view plaincopy在CODE上查看代码片派生到我的代码片

01.package com.hadoop.combineInput;

02.

03.

04.import java.io.IOException;

05.

06.import org.apache.hadoop.mapreduce.InputSplit;

07.import org.apache.hadoop.mapreduce.RecordReader;

08.import org.apache.hadoop.mapreduce.TaskAttemptContext;

09.import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

10.import org.apache.hadoop.mapreduce.lib.input.FileSplit;

11.import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;

12.import org.apache.hadoop.util.ReflectionUtils;

13.

14.

15.public class CombineSequenceFileRecordReader extends RecordReader {

16. private CombineFileSplit split;

17. private TaskAttemptContext context;

18. private int index;

19. private RecordReader rr;

20.

21. @SuppressWarnings("unchecked")

22. public CombineSequenceFileRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index) throws IOException, InterruptedException {

23. this.index = index;

24. this.split = (CombineFileSplit) split;

25. this.context = context;

26.

27. this.rr = ReflectionUtils.newInstance(SequenceFileRecordReader.class, context.getConfiguration());

28. }

29.

30. @SuppressWarnings("unchecked")

31. @Override

32. public void initialize(InputSplit curSplit, TaskAttemptContext curContext) throws IOException, InterruptedException {

33. this.split = (CombineFileSplit) curSplit;

34. this.context = curContext;

35.

36. if (null == rr) {

37. rr = ReflectionUtils.newInstance(SequenceFileRecordReader.class, context.getConfiguration());

38. }

39.

40. FileSplit fileSplit = new FileSplit(this.split.getPath(index),

41. this.split.getOffset(index), this.split.getLength(index),

42. this.split.getLocations());

43.

44. this.rr.initialize(fileSplit, this.context);

45. }

46.

47. @Override

48. public float getProgress() throws IOException, InterruptedException {

49. return rr.getProgress();

50. }

51.

52. @Override

53. public void close() throws IOException {

54. if (null != rr) {

55. rr.close();

56. rr = null;

57. }

58. }

59.

60. @Override

61. public K getCurrentKey()

62. throws IOException, InterruptedException {

63. return rr.getCurrentKey();

64. }

65.

66. @Override

67. public V getCurrentValue()

68. throws IOException, InterruptedException {

69. return rr.getCurrentValue();

70. }

71.

72. @Override

73. public boolean nextKeyValue() throws IOException, InterruptedException {

74. return rr.nextKeyValue();

75. }

76.}

参考资料:http://sourceforge.net/p/openimaj/code/HEAD/tree/trunk/hadoop/core-hadoop/src/main/java/org/openimaj/hadoop/sequencefile/combine/CombineSequenceFileRecordReader.java

main函数比较简单,这里也贴出来下,方便后续自己记忆:

[java] view plaincopy在CODE上查看代码片派生到我的代码片

01.package com.hadoop.combineInput;

02.

03.import java.io.IOException;

04.

05.

06.import org.apache.hadoop.conf.Configuration;

07.import org.apache.hadoop.conf.Configured;

08.import org.apache.hadoop.fs.Path;

09.

10.import org.apache.hadoop.io.BytesWritable;

11.import org.apache.hadoop.io.Text;

12.import org.apache.hadoop.mapreduce.Job;

13.import org.apache.hadoop.mapreduce.Mapper;

14.import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

15.import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

16.import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

17.import org.apache.hadoop.util.Tool;

18.import org.apache.hadoop.util.ToolRunner;

19.

20.public class MergeFiles extends Configured implements Tool {

21. public static class MapClass extends Mapper {

22.

23. public void map(BytesWritable key, Text value, Context context)

24. throws IOException, InterruptedException {

25. context.write(key, value);

26. }

27. } // END: MapClass

28.

29.

30. public int run(String[] args) throws Exception {

31. Configuration conf = new Configuration();

32. conf.set("mapred.max.split.size", "157286400");

33. conf.setBoolean("mapred.output.compress", true);

34. Job job = new Job(conf);

35. job.setJobName("MergeFiles");

36. job.setJarByClass(MergeFiles.class);

37.

38. job.setMapperClass(MapClass.class);

39. job.setInputFormatClass(CombineSequenceFileInputFormat.class);

40. job.setOutputFormatClass(SequenceFileOutputFormat.class);

41. job.setOutputKeyClass(BytesWritable.class);

42. job.setOutputValueClass(Text.class);

43.

44. FileInputFormat.addInputPaths(job, args[0]);

45. FileOutputFormat.setOutputPath(job, new Path(args[1]));

46.

47. job.setNumReduceTasks(0);

48.

49. return job.waitForCompletion(true) ? 0 : 1;

50. } // END: run

51.

52. public static void main(String[] args) throws Exception {

53. int ret = ToolRunner.run(new MergeFiles(), args);

54. System.exit(ret);

55. } // END: main

56.} //

 
 

Hadoop中的三种调度算法

唐半张 发表了文章 0 个评论 2007 次浏览 2015-10-10 09:11 来自相关话题

Hadoop中的三种调度算法  Hadoop 中的调度Hadoop 是一个通用系统,可以对一组分散的节点上的数据进行高性能处理。这样的定义也说明,Hadoop 是一个多任务系统,它可以同时为多个用户、多个作业处理多个数据集。   ...查看全部
Hadoop中的三种调度算法 
Hadoop 中的调度Hadoop 是一个通用系统,可以对一组分散的节点上的数据进行高性能处理。这样的定义也说明,Hadoop 是一个多任务系统,它可以同时为多个用户、多个作业处理多个数据集。
 
Hadoop 中的调度 
 
Hadoop 是一个通用系统,可以对一组分散的节点上的数据进行高性能处理。这样的定义也说明,Hadoop 是一个多任务系统,它可以同时为多个用户、多个作业处理多个数据集。这种多处理的能力也意味着 Hadoop 能以更优的方式将作业映射到资源。
在 2008 年以前,Hadoop 只支持与 JobTracker 逻辑混合在一起的单一调度器。尽管这样的实现对处理 Hadoop 传统批处理作业(如日志挖掘和 Web 索引)已经足够,但却不够灵活,而且无法定制。此外,Hadoop 在批处理模式下运行,作业提交给队列,Hadoop 基础架构只是简单地按照接收顺序对其进行处理。
幸运的是,有人为实现独立于 JobTracker 的调度器提交了一份 bug 报告 (HADOOP-3412)。更重要的是,新的调度器是可插入式的,从而能够使用新的调度算法来帮助优化一些特殊的作业。这项改变的另一巨大优势是增加了调度器的可靠性,从而进一步发掘潜力,以支持更多的调度器来处理 Hadoop 中不断增加的应用程序。
因为这项更改,Hadoop 现在成为一个多用户数据仓库,它支持大量不同类型的处理作业,它通过使用可插入式调度器框架来提供更广泛的控制。该框架可以优化各种不同工作负载(从小型作业到大型作业等一切作业)上的 Hadoop 集群使用方法。脱离 FIFO 调度(按提交时间确定重要性)可以让 Hadoop 集群支持具有不同优先级和性能限制的工作负载。
请注意: 本文假设您对 Hadoop 有一些了解。请参阅  参考资料,获取有关 Hadoop 架构简介以及有关安装、配置和编写 Hadoop 应用程序的 Hadoop 实用系列文章的链接。
 
Hadoop 核心架构
Hadoop 集群包含了相对简单的主从架构(参见  图 1)。NameNode 是一个 Hadoop 集群的总主节点,它负责文件系统命名空间和客户端的访问控制。还有 JobTracker,它的任务是将作业分配给等待的节点。这两项(NameNode 和 JobTracker)就是 Hadoop 架构的主要部分。从属部分包含 TaskTracker,它负责管理作业执行(包括启动和监控作业、获取其输出,以及通知 JobTracker 作业完成)。DataNode 是 Hadoop 集群中的存储节点,它表示分布式文件系统(多个 DataNodes 节点的情况下,是其中一部分)。TaskTracker 和 DataNode 是 Hadoop 集群中的从属部分。图 1. Hadoop 集群的元素
Hadoop中的三种调度算法" action-data="http%3A%2F%2Fwww.ibm.com%2Fdeveloperworks%2Fcn%2Fopensource%2Fos-hadoop-scheduling%2Ffigure1.gif" action-type="show-slide" class="alignCenter" style="max-width: 550px; height: auto; vertical-align: middle; border: 0px none; text-align: center; margin-right: auto; margin-left: auto; display: block;"> 
请注意,Hadoop 非常灵活,可支持单节点集群(所有内容在一个节点)或多节点集群(JobTracker 和 NameNodes 分布在数千个节点)。尽管关于现存大型生产环境的信息非常很少,但是拥有最大型 Hadoop 集群的是 Facebook,它包含 4000 个节点。这些节点分为多种型号(其中一半包含 8 核和 16 核 CPU)。Facebook 集群还支持分散在多个 DataNodes 节点上的 21PB 存储。由于存在着大量资源和大量用户的多种作业的潜力,调度是一项需要推进的重要优化。
 
Hadoop 调度器
自从可插入式调度器实现以来,已开发了多种调度器算法。接下来的章节将会介绍各种算法以及各自适用的情况。
 
FIFO 调度器
集成在 JobTracker 中的原有调度算法被称为  FIFO 。在 FIFO 调度中,JobTracker 从工作队列中拉取作业,最老的作业最先。这种调度方法不会考虑作业的优先级或大小,但很容易实现,而且效率很高。
公平调度
公平共享调度器的核心概念是,随着时间推移平均分配工作,这样每个作业都能平均地共享到资源。结果是只需较少时间执行的作业能够访问 CPU,那些需要更长时间执行的作业中结束得迟。这样的方式可以在 Hadoop 作业之间形成交互,而且可以让 Hadoop 集群对提交的多种类型作业作出更大的响应。公平调度器是由 Facebook 开发出来的。
Hadoop 的实现会创建一组池,将作业放在其中供调度器选择。每个池会分配一组共享以平衡池中作业的资源(更多的共享意味着作业执行所需的资源更多)。默认情况下,所有池的共享相等,但可以进行配置,根据作业类型提供更多或更少的共享。如果需要的话,还可以限制同时活动的作业数,以尽量减少拥堵,让工作及时完成。
为了保证公平,每个用户被分配一个池。在这样的方式下,如果一个用户提交很多作业,那么他分配的集群资源与其他用户一样多(与他提交的工作数无关)。无论分配到池的共享有多少,如果系统未加载,那么作业收到的共享不会被使用(在可用作业之间分配)。
调度器实现会追踪系统中每个作业的计算时间。调度器还会定期检查作业接收到的计算时间和在理想的调度器中应该收到的计算时间的差距。会使用该结果来确定任务的亏空。调度器作业接着会保证亏空最多的任务最先执行。
在 mapred-site.xml 文件中配置公平共享。该文件会定义对公平共享调度器行为的管理。一个 XML 文件(即 mapred.fairscheduler.allocation.file 属性)定义了每个池的共享的分配。为了优化作业大小,您可以设置mapread.fairscheduler.sizebasedweight 将共享分配给作业作为其大小的函数。还有一个类似的属性可以通过调整作业的权重让更小的作业在 5 分钟之后运行得更快 (mapred.fairscheduler.weightadjuster)。您还可以用很多其他的属性来调优节点上的工作负载(例如某个 TaskTracker 能管理的 maps 和 reduces 数目)并确定是否执行抢占。参见  参考资料  中所有可配置参数的链接。
容量调度器
容量调度器的原理与公平调度器有些相似,但也有一些区别。首先,容量调度是用于大型集群,它们有多个独立用户和目标应用程序。由于这个原因,容量调度能提供更大的控制和能力,提供用户之间最小容量保证并在用户之间共享多余的容量。容量调度是由 Yahoo! 开发出来的。
在容量调度中,创建的是队列而不是池,每个队列的 map 和 reduce 插槽数都可以配置。每个队列都会分配一个保证容量(集群的总容量是每个队列容量之和)。
队列处于监控之下;如果某个队列未使用分配的容量,那么这些多余的容量会被临时分配到其他队列中。由于队列可以表示一个人或大型组织,那么所有的可用容量都可以由其他用户重新分配使用。
与公平调度另一个区别是可以调整队列中作业的优先级。一般来说,具有高优先级的作业访问资源比低优先级作业更快。Hadoop 路线图包含了对抢占的支持(临时替换出低优先级作业,让高优先级作业先执行),但该功能尚未实现。
另一个区别是对队列进行严格的访问控制(假设队列绑定到一个人或组织)。这些访问控制是按照每个队列进行定义的。对于将作业提交到队列的能力和查看修改队列中作业的能力都有严格限制。
可在多个 Hadoop 配置文件中配置容量调度器。队列是在 hadoop-site.xml 中定义,在 capacity-scheduler.xml 中配置。可以在 mapred-queue-acls.xml 中配置 ACL。单个的队列属性包括容量百分比(集群中所有的队列容量少于或等于 100)、最大容量(队列多余容量使用的限制)以及队列是否支持优先级。更重要的是,可以在运行时调整队列优先级,从而可以在集群使用过程中改变或避免中断的情况。
 
其他方法
虽然本身不是调度器,但 Hadoop 也支持从大型物理集群内部提供虚拟集群的概念,这称之为  Hadoop On Demand   (HOD)。HOD 方法使用 Torque 资源管理器根据虚拟集群的需要进行节点分配。有了分配的节点,HOD 系统自动准备好配置文件,然后根据虚拟集群中的节点进行初始化。虚拟化之后,就能以相对对立的方式使用 HOD 虚拟集群。
HOD 还具有自适应性,在工作负载变化时会收缩。如果某个时间段内检测到没有运行的作业,HOD 会自动解除虚拟集群的节点分配。这种方式可以保证整个物理集群资产的使用效率达到最高。
HOD 对于在云基础架构中部署 Hadoop 集群来说,是个很有意思的模型。它还有个优势,通过与节点尽量少的共享,从而有了更大的安全性,而且某些情况下,由于节点中多个用户作业之间内部缺乏竞争,性能得到提升。
 
何时使用各个调度器
从以上的讨论中可以看出,这些调度算法各具针对性。如果正在运行一个大型 Hadoop 集群,它具有多个客户端和不同类型、不同优先级的作业,那么容量调度器是最好选择,它可以确保访问,并能重用未使用的容量并调整队列中作业的优先级。
尽管不太复杂,但无论是小型还是大型集群,如果由同一个组织使用,工作负载数量有限,那么公平调度器也能运转得很好。公平调度可以将容量不均匀地分配给池(作业的),但是它较为简单且可配置性较低。公平调度在存在多种作业的情况下非常有用,因为它能为小作业和大作业混合的情况提供更快的响应时间(支持更具交互性的使用模型)。
 
Hadoop 调度的未来开发
既然 Hadoop 调度器是可插入式的,那么您会看到针对某个独特集群部署而开发的新调度器。有两种正在开发的调度器(来自 Hadoop 事项列表),分别是自适应调度器和学习调度器。学习调度器 (MAPREDUCE-1349) 可用来在出现多种工作负载情况下维持利用率水平。目前,此调度器的实现重点关注 CPU 平均负载,网络和磁盘 I/O 的利用率仍处于计划之中。自适应调度器 (MAPREDUCE-1380) 重点关注根据性能和用户定义的业务目标自动调整某个作业。

win7下eclipse配置hadoop开发环境

唐半张 发表了文章 0 个评论 1929 次浏览 2015-10-09 10:28 来自相关话题

Windows下配置hadoop开发环境 1.开发环境操作系统:windows7 64位 开发工具:eclipse3.3 64位 Hadoop版本:0.20.2 2.安装hadoop开发插件将hadoop安装包h ...查看全部
Windows下配置hadoop开发环境
1.开发环境操作系统:windows7 64
开发工具:eclipse3.3 64
Hadoop版本:0.20.2
2.安装hadoop开发插件将hadoop安装包hadoopcontribeclipse-pluginhadoop-0.20.2-eclipse-plugin.jar拷贝到eclipse的插件目录plugins下。
需要注意的是插件版本(及后面开发导入的所有jar包)与运行的hadoop一致,否则可能会出现EOFException异常。(hadoop0.20.2版本只能与eclipse3.3及以下版本可以正常使用
重启eclipse,打开windows->open perspective->other->map/reduce 可以看到map/reduce开发视图。

参数说明如下:
        Location name:任意
        map/reduce master:与mapred-site.xml里面mapred.job.tracker设置一致。
DFS master:与core-site.xml里fs.default.name设置一致。
User name: 服务器上运行hadoop服务的用户名。

        然后是打开“Advanced parameters”设置面板,修改相应参数。上面的参数填写以后,也会反映到这里相应的参数:
主要关注下面几个参数:
fs.defualt.name:                与core-site.xml里fs.default.name设置一致。
mapred.job.tracker:        与mapred-site.xml里面mapred.job.tracker设置一致。
dfs.replication:                与hdfs-site.xml里面的dfs.replication一致。
hadoop.tmp.dir:                与core-site.xml里hadoop.tmp.dir设置一致。(可以不配置)
hadoop.job.ugi:                填写hadoop集群的用户与组名(即启动hadoop使用的用户及其所在用户组)。(必须配置)

点击Finish后可以看到HDFS结构

3.运行程序3.1新建MR项目新建项目选择Map/Reduce Project

MR项目需要配置Hadoop安装路径

选择hadoop解压路径

点击OK完成配置。
输入项目名称点击Finish完成项目创建。
hadoop解压包中hadoop-0.20.2srcexamplesorgapachehadoopexamples路径下的WordCount.java复制到项目中。

修改包路径

配置java运行参数
        Run-->Open Run Dialog

Arguments下增加程序的输入输出地址

上传输入文件
input文件夹中上传测试使用文件,内容如下:
The number of milliseconds before a task will be
terminated if it neither reads an input, writes an output, nor
updates its status string.
Run as-->Run On Hadoop
将得到以下类似输出

13/05/25 10:20:25 WARN conf.Configuration: DEPRECATED: hadoop-site.xml found in the classpath. Usage of hadoop-site.xml is deprecated. Instead use core-site.xml, mapred-site.xml and hdfs-site.xml to override properties of core-default.xml, mapred-default.xml and hdfs-default.xml respectively
13/05/25 10:20:26 INFO input.FileInputFormat: Total input paths to process : 1
13/05/25 10:20:27 INFO mapred.JobClient: Running job: job_201305250948_0006
13/05/25 10:20:28 INFO mapred.JobClient:  map 0% reduce 0%
13/05/25 10:20:44 INFO mapred.JobClient:  map 100% reduce 0%
13/05/25 10:20:56 INFO mapred.JobClient:  map 100% reduce 100%
13/05/25 10:20:58 INFO mapred.JobClient: Job complete: job_201305250948_0006
13/05/25 10:20:58 INFO mapred.JobClient: Counters: 17
13/05/25 10:20:58 INFO mapred.JobClient:   Job Counters 
13/05/25 10:20:58 INFO mapred.JobClient:     Launched reduce tasks=1
13/05/25 10:20:58 INFO mapred.JobClient:     Launched map tasks=1
13/05/25 10:20:58 INFO mapred.JobClient:     Data-local map tasks=1
13/05/25 10:20:58 INFO mapred.JobClient:   FileSystemCounters
13/05/25 10:20:58 INFO mapred.JobClient:     FILE_BYTES_READ=280
13/05/25 10:20:58 INFO mapred.JobClient:     HDFS_BYTES_READ=140
13/05/25 10:20:58 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=592
13/05/25 10:20:58 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=182
13/05/25 10:20:58 INFO mapred.JobClient:   Map-Reduce Framework
13/05/25 10:20:58 INFO mapred.JobClient:     Reduce input groups=23
13/05/25 10:20:58 INFO mapred.JobClient:     Combine output records=23
13/05/25 10:20:58 INFO mapred.JobClient:     Map input records=3
13/05/25 10:20:58 INFO mapred.JobClient:     Reduce shuffle bytes=280
13/05/25 10:20:58 INFO mapred.JobClient:     Reduce output records=23
13/05/25 10:20:58 INFO mapred.JobClient:     Spilled Records=46
13/05/25 10:20:58 INFO mapred.JobClient:     Map output bytes=235
13/05/25 10:20:58 INFO mapred.JobClient:     Combine input records=24
13/05/25 10:20:58 INFO mapred.JobClient:     Map output records=24

4问题4.1DFS local无法显示文件

解决方法:
解决方法是:

1.在“Advanced parameters”设置面板,设置hadoop.job.ugi参数,将hadoop用户加上去。

2.conf.set("hadoop.job.ugi", "hadoop,hadoop"); //设置hadoop server用户名和密码,可以参考这个博客:

[url=http://jimey.com/2009/03/26/windows-environment-eclipse-debugging-environment-hadoop-remote-ubuntu.html]http://jimey.com/2009/03/26/windows-environment-eclipse-debugging-environment-hadoop-remote-ubuntu.html[/url]

我的做法是在服务器端配置,具体是在conf/mapred-site.xml中添加下面的配置:

   hadoop.job.ugi
   root,abc123456
   hadoop user access password


   mapred.system.dir
   /home/hadoop/hadoop-0.20.0/tmp/mapred/system
   

4.2 运行时报java.lang.IllegalArgumentException: XXX... not found.这是因为导入的hadoop lib中缺少相应类,暂时不明白这个类是做什么的,只需要删除
Advanced parameters下Io.comperssion.codecs 值中的对应类即可。

Hadoop2.0 Namenode HA实现方案介绍及汇总

唐半张 发表了文章 0 个评论 2277 次浏览 2015-10-09 10:22 来自相关话题

Hadoop2.0 Namenode HA实现方案介绍及汇总 基于社区最新release的Hadoop2.2.0版本,调研了hadoop HA方面的内容。hadoop2.0主要的新特性(Hadoop2.0稳定版2.2.0新特性剖析): ...查看全部
Hadoop2.0 Namenode HA实现方案介绍及汇总
基于社区最新release的Hadoop2.2.0版本,调研了hadoop HA方面的内容。hadoop2.0主要的新特性(Hadoop2.0稳定版2.2.0新特性剖析):
  1. hdfs snapshots: apache官方对hdfs snapshots说明
  2. namenode federation: namenode在集群规模大了之后会成为性能瓶颈,尤其是内存使用量急剧增大,同时hdfs所有元数据信息的读取和操作都要与namenode通信。而联邦模式解决的就是namenode的可扩展性问题。更多内容可以参看hadoop 2.0 namenode HA实战和federation实践 下图是我画的HA和Federation部署图。每个namesevice映射了HDFS中部分实际路径,可以单独给Client提供服务,也可以由Client通过Client Mount Table来访问若干NS。图中每个NS里有一个active NN和一个standby NN,这部分HA会在下面介绍。每个NS对应了一个Pool,Pool对应的DN是该NS可以访问的DN id的集合。这样做到可扩展,带来的好处有很多,比如后续添加的NS不会影响之前的NS等。联邦部署适合大规模集群,一般规模不大的情况下不需要使用。下面主要介绍HA的内容。


  1. namenode单点故障解决方案。NN现在的HA解决方案主要思路是提供一个保存元数据信息的地方,保证editlog不会丢失。董的这篇HA单点故障解决方案总结中介绍了从解决MRv1的Jobtracker HA,到HDFS HA,再到还未正式发布的YARN RM HA解决方案的异同,各自采用的共享存储系统有所不同,主要原因是HA的解决方案难度取决于Master自身记录信息的多少和信息可重构性。共享存储系统主要有NFS,ZK,BookKeeper,QJM。其中已经发行版本里默认使用的QJM(Quaro Journal Manager)。QJM是Cloudera公司提出的,在QJM出现前,如果在主从切换的这段时间内出现脑裂,破坏HDFS元数据的时候,常见方式是去掉activeNN的写权限来保证最多只有一个active NN。QJM本质上是Paxos算法的实现,通过启动2N+1个JournalNode来写editlog,当其中大于N个Node写成功时候认为本次写成功,且允许容忍N以下个Node挂掉。QJM实现及源码分析可以参考基于QJM的HDFS HA原理及代码分析。QJM和BKJM(借助BookKeeper实现的JM)都是将editlog信息写在磁盘上,这点也是与NFS方案的区别,且NFS相对而言其实更重量级,本身是一个需要独立维护的东西,而QJM是已经实现的默认方案,配置方法在官方里也可以找到,很详细。BKJM正在实现中且长期看好。关于BookKeeper相关的JIRA进展可以参考BookKeeper Option For NN HA。所以总结来说推荐使用QJM和BKJM,且他们的原理比较相似。再给出HDFS JIRA上一份cloudera员工给的Quorum-Journal Design设计文档,地址为https://issues.apache.org/jira/secure/attachment/12547598/qjournal-design.pdf
  2. hdfs symbo links将在2.3.0里发布。类似linux文件系统的软链接。相关资料可以参考理解 Linux 的硬链接与软链接  硬连接和软连接的原理
  3. 其实现在的HA方案,很大程度上参考的是Facebook的AvatarNode的NN HA方案,只是他是手动的。Facebook的AvatarNode是业界较早的Namenode HA方案,它是基于HDFS 0.20实现的,如下图所示。


由于采用的是人工切换,所以实现相对简单。AvatarNode对Namenode进行了封装,处于工作状态的叫Primary Avatar,处于热备状态的叫Standby Avatar(封装了Namenode和SecondaryNameNode),两者通过NFS共享EditLog所在目录。在工作状态下,Primary Avatar中的Namenode实例接收Client的请求并进行处理,Datanode会向Primary和Standby两个同时发送blockReport和心跳,Standby Avatar不断地从共享的EditLog中持续写入的新事务,并推送给它的Namenode实例,此时Standby Avatar内部的Namenode处于安全模式状态,不对外提供服务,但是状态与Primary Avatar中的保持一致。一旦Primary发生故障,管理员进行Failover切换:首先将原来的Primary进程杀死(避免了“Split Brain”和“IO Fencing”问题),然后将原来的Standby设置为Primary,新的Primary会保证回放完成所有的EditLog事务,然后退出安全模式,对外接收服务请求。为了实现对客户端透明,AvatarNode主从采用相同的虚拟IP,切换时将新的Primary设置为该虚拟IP即可。整个流程可在秒~分钟级别完成。可以参考FaceBook 2011年的论文Apache Hadoop Goes Realtime at Facebook 里面专门有一节讲到HA AvatarNode的设计。
在董的博客里还谈到hadoop 2.0尚未解决的问题,提到namenode的热备现在只能是一个,且共享存储系统也只能有一套,本质上还是单点故障,其实是做了一层转移。YARN的HA还没解决。多资源存储可能存在潜在问题。这里关于YARN RM的HA的话,可以继续跟进JIRA上的情况,JIRA地址为https://issues.apache.org/jira/browse/YARN-149,里面有RM HA的设计思路,最新的两篇文档:YARN ResourceManager Automatic Failover 和 RM HA Phase1: Cold Standby 关注这个问题的朋友可以跟进关注一下。
 
总结
本文参考了网上一些资深研究者的博客资料和HDFS JIRA上的一些内容,整理了一下NN HA方面的几种实现方式,也提供了更多细致和详细的内容链接。

hadoop如何设置超时

唐半张 发表了文章 0 个评论 2699 次浏览 2015-10-09 09:39 来自相关话题

1、HDFS写入过程中有两个超时设置: dfs.socket.timeout和 dfs.datanode.socket.write.timeout; 有些地方以为只是需要修改后面 的dfs.datanode.socket.write. ...查看全部
1、HDFS写入过程中有两个超时设置: dfs.socket.timeout和 dfs.datanode.socket.write.timeout;

有些地方以为只是需要修改后面 的dfs.datanode.socket.write.timeout项就可以,其实看报错是READ_TIMEOUT。对应在hbase中的默认值如下:
  // Timeouts for communicating with DataNode for streaming writes/reads
  public static int READ_TIMEOUT = 60 * 1000;   //其实是超过了这个值
  public static int READ_TIMEOUT_EXTENSION = 3 * 1000;
  public static int WRITE_TIMEOUT = 8 * 60 * 1000;
  public static int WRITE_TIMEOUT_EXTENSION = 5 * 1000; //for write pipeline2. 修改配置文件
    所以找出来是超时导致的,所以在hadoop-site.xml[hdfs-site.xml]配置文件中添加如下配置:
   
     dfs.datanode.socket.write.timeout
     3000000
   


   
     dfs.socket.timeout
     3000000
   

hadoop不适合场景

唐半张 发表了文章 0 个评论 1479 次浏览 2015-10-09 09:29 来自相关话题

1: 低延迟数据访问        Hadoop设计的目的是大吞吐量,所以并没有针对低延迟数据访问做一些优化,如果要求低延迟, 可以看看Hbase。 2: 大量的小文件        由于NameNode把文件的MetaDa ...查看全部
1: 低延迟数据访问
       Hadoop设计的目的是大吞吐量,所以并没有针对低延迟数据访问做一些优化,如果要求低延迟, 可以看看Hbase。
2: 大量的小文件
       由于NameNode把文件的MetaData存储在内存中,所以大量的小文件会产生大量的MetaData。这样的话百万级别的文件数目还是可行的,再多的话就有问题了。
3: 多用户写入,任意修改
       Hadoop现在还不支持多人写入,任意修改的功能。也就是说每次写入都会添加在文件末尾。

hadoop1.0 和hadoop2.0 任务处理架构比较

唐半张 发表了文章 0 个评论 1634 次浏览 2015-10-09 09:27 来自相关话题

刚刚看到一篇文章对 hadoop1 和 hadoop  2  做了一个解释 图片不错 拿来看看 Hadoop 1.0     [size=0.76em]从上图中可以清楚的看出原 Map ...查看全部
刚刚看到一篇文章对 hadoop1 和 hadoop  2  做了一个解释 图片不错 拿来看看

Hadoop 1.0

 
 
[size=0.76em]从上图中可以清楚的看出原 MapReduce 程序的流程及设计思路:
  • 首先用户程序 (JobClient) 提交了一个 job,job 的信息会发送到 Job Tracker 中,Job Tracker 是 Map-reduce 框架的中心,他需要与集群中的机器定时通信 (heartbeat), 需要管理哪些程序应该跑在哪些机器上,需要管理所有 job 失败、重启等操作。
  • TaskTracker 是 Map-reduce 集群中每台机器都有的一个部分,他做的事情主要是监视自己所在机器的资源情况。
  • TaskTracker 同时监视当前机器的 tasks 运行状况。TaskTracker 需要把这些信息通过 heartbeat 发送给 JobTracker,JobTracker 会搜集这些信息以给新提交的 job 分配运行在哪些机器上。上图虚线箭头就是表示消息的发送 - 接收的过程。
[size=0.76em]可以看得出原来的 map-reduce 架构是简单明了的,在最初推出的几年,也得到了众多的成功案例,获得业界广泛的支持和肯定,但随着分布式系统集群的规模和其工作负荷的增长,原框架的问题逐渐浮出水面,主要的问题集中如下:
  • JobTracker 是 Map-reduce 的集中处理点,存在单点故障。
  • JobTracker 完成了太多的任务,造成了过多的资源消耗,当 map-reduce job 非常多的时候,会造成很大的内存开销,潜在来说,也增加了 JobTracker fail 的风险,这也是业界普遍总结出老 Hadoop 的 Map-Reduce 只能支持 4000 节点主机的上限。
  • 在 TaskTracker 端,以 map/reduce task 的数目作为资源的表示过于简单,没有考虑到 cpu/ 内存的占用情况,如果两个大内存消耗的 task 被调度到了一块,很容易出现 OOM。
  • 在 TaskTracker 端,把资源强制划分为 map task slot 和 reduce task slot, 如果当系统中只有 map task 或者只有 reduce task 的时候,会造成资源的浪费,也就是前面提过的集群资源利用的问题。
  • 源代码层面分析的时候,会发现代码非常的难读,常常因为一个 class 做了太多的事情,代码量达 3000 多行,,造成 class 的任务不清晰,增加 bug 修复和版本维护的难度。
  • 从操作的角度来看,现在的 Hadoop MapReduce 框架在有任何重要的或者不重要的变化 ( 例如 bug 修复,性能提升和特性化 ) 时,都会强制进行系统级别的升级更新。更糟的是,它不管用户的喜好,强制让分布式集群系统的每一个用户端同时更新。这些更新会让用户为了验证他们之前的应用程序是不是适用新的 Hadoop 版本而浪费大量时间。

 
hadoop2.0:

 
从业界使用分布式系统的变化趋势和 hadoop 框架的长远发展来看,MapReduce 的 JobTracker/TaskTracker 机制需要大规模的调整来修复它在可扩展性,内存消耗,线程模型,可靠性和性能上的缺陷。在过去的几年中,hadoop 开发团队做了一些 bug 的修复,但是最近这些修复的成本越来越高,这表明对原框架做出改变的难度越来越大。
[size=0.76em]为从根本上解决旧 MapReduce 框架的性能瓶颈,促进 Hadoop 框架的更长远发展,从 0.23.0 版本开始,Hadoop 的 MapReduce 框架完全重构,发生了根本的变化。新的 Hadoop MapReduce 框架命名为 MapReduceV2 或者叫 Yarn,
 
[size=0.76em]重构根本的思想是将 JobTracker 两个主要的功能分离成单独的组件,这两个功能是资源管理和任务调度 / 监控。新的资源管理器全局管理所有应用程序计算资源的分配,每一个应用的 ApplicationMaster 负责相应的调度和协调。一个应用程序无非是一个单独的传统的 MapReduce 任务或者是一个 DAG( 有向无环图 ) 任务。ResourceManager 和每一台机器的节点管理服务器能够管理用户在那台机器上的进程并能对计算进行组织。
[size=0.76em]事实上,每一个应用的 ApplicationMaster 是一个详细的框架库,它结合从 ResourceManager 获得的资源和 NodeManager 协同工作来运行和监控任务。
[size=0.76em]上图中 ResourceManager 支持分层级的应用队列,这些队列享有集群一定比例的资源。从某种意义上讲它就是一个纯粹的调度器,它在执行过程中不对应用进行监控和状态跟踪。同样,它也不能重启因应用失败或者硬件错误而运行失败的任务。
[size=0.76em]ResourceManager 是基于应用程序对资源的需求进行调度的 ; 每一个应用程序需要不同类型的资源因此就需要不同的容器。资源包括:内存,CPU,磁盘,网络等等。可以看出,这同现 Mapreduce 固定类型的资源使用模型有显著区别,它给集群的使用带来负面的影响。资源管理器提供一个调度策略的插件,它负责将集群资源分配给多个队列和应用程序。调度插件可以基于现有的能力调度和公平调度模型。
[size=0.76em]上图中 NodeManager 是每一台机器框架的代理,是执行应用程序的容器,监控应用程序的资源使用情况 (CPU,内存,硬盘,网络 ) 并且向调度器汇报。
[size=0.76em]每一个应用的 ApplicationMaster 的职责有:向调度器索要适当的资源容器,运行任务,跟踪应用程序的状态和监控它们的进程,处理任务的失败原因。

基于大数据分析系统Hadoop的13个开源工具

唐半张 发表了文章 0 个评论 2267 次浏览 2015-10-09 09:24 来自相关话题

Hadoop是由Apache基金会开发的一个大数据分布式系统基础架构,最早版本是2003年原Yahoo!DougCutting根据Google发布的学术论文研究而来。用户可以在不了解分布式底层细节的情况下,轻松地在Hadoop上开发和运行处理海量数据的应用程序 ...查看全部
Hadoop是由Apache基金会开发的一个大数据分布式系统基础架构,最早版本是2003年原Yahoo!DougCutting根据Google发布的学术论文研究而来。用户可以在不了解分布式底层细节的情况下,轻松地在Hadoop上开发和运行处理海量数据的应用程序。低成本、高可 靠、高扩展、高有效、高容错等特性让Hadoop成为最流行的大数据分析系统,然而其赖以生存的HDFS和MapReduce组件却让其一度陷入困境——批处理的工作方式让其只适用于离线数据处理,在要求实时性的场景下毫无用武之地。因此,各种基于Hadoop的工具应运而生,本次为大家分享Hadoop生态系统中最常用的13个开源工具,其中包括资源调度、流计算及各种业务针对应用场景。首先,我们看资源管理相关。
资源统一管理/调度系统

在公司和机构中,服务器往往会因为业务逻辑被拆分为多个集群,基于数据密集型的处理框架也是不断涌现,比如支持离线处理的MapReduce、支持在线处理的Storm及Impala、支持迭代计算的Spark及流处理框架S4,它们诞生于不同的实验室,并各有所长。为了减 少管理成本,提升资源的利用率,一个共同的想法产生——让这些框架运行在同一个集群上;因此,就有了当下众多的资源统一管理/调度系统,比如Google的Borg、Apache的YARN、Twitter的Mesos(已贡献给Apache基金会)、腾讯搜搜的Torca、FacebookCorona(开源),本次为大家重点介绍ApacheMesos及YARN:

1.ApacheMesos
    代码托管地址:ApacheSVN
    Mesos提供了高效、跨分布式应用程序和框架的资源隔离和共享,支持Hadoop、MPI、Hypertable、Spark等。
    Mesos是Apache孵化器中的一个开源项目,使用ZooKeeper实现容错复制,使用LinuxContainers来隔离任务, 支持多种资源计划分配(内存和CPU)。提供Java、Python和C++APIs来开发新的并行应用程序,提供基于Web的用户界面来提查看集群状 态。

2.HadoopYARN
    代码托管地址:ApacheSVN
    YARN又被称为MapReduce2.0,借鉴Mesos,YARN提出了资源隔离解决方案Container,但是目前尚未成熟,仅仅提供Java虚拟机内存的隔离。
    对比MapReduce1.x,YARN架构在客户端上并未做太大的改变,在调用API及接口上还保持大部分的兼容,然而在YARN中,开 发人员使用ResourceManager、ApplicationMaster与NodeManager代替了原框架中核心的JobTracker和TaskTracker。其中ResourceManager是一个中心的服务,负责调度、启动每一个Job所属的ApplicationMaster, 另外还监控ApplicationMaster的存在情况;NodeManager负责Container状态的维护,并向RM保持心跳。ApplicationMaster负责一个Job生命周期内的所有工作,类似老的框架中JobTracker。

Hadoop上的实时解决方案

前面我们有说过,在互联网公司中基于业务逻辑需求,企业往往会采用多种计算框架,比如从事搜索业务的公司:网页索引建立用MapReduce,自然语言处理用Spark等。本节为大家分享的则是Storm、Impala、Spark三个框架:

3.ClouderaImpala
    代码托管地址:GitHub
    Impala是由Cloudera开发,一个开源的MassivelyParallelProcessing(MPP)查询引擎。与Hive相同的元数据、SQL语法、ODBC驱动程序和用户接口(HueBeeswax),可以直接在HDFS或HBase上提供快速、交互式SQL查 询。Impala是在Dremel的启发下开发的,第一个版本发布于2012年末。
    Impala不再使用缓慢的Hive+MapReduce批处理,而是通过与商用并行关系数据库中类似的分布式查询引擎(由QueryPlanner、QueryCoordinator和QueryExecEngine三部分组成),可以直接从HDFS或者HBase中用SELECT、JOIN和统计函数查询数据,从而大大降低了延迟。

4.Spark
    代码托管地址:Apache
    Spark是个开源的数据分析集群计算框架,最初由加州大学伯克利分校AMPLab开发,建立于HDFS之上。Spark与Hadoop一样,用于构建大规模、低延时的数据分析应用。Spark采用Scala语言实现,使用Scala作为应用框架。
    Spark采用基于内存的分布式数据集,优化了迭代式的工作负载以及交互式查询。与Hadoop不同的是,Spark和Scala紧密集 成,Scala像管理本地collective对象那样管理分布式数据集。Spark支持分布式数据集上的迭代式任务,实际上可以在Hadoop文件系统 上与Hadoop一起运行(通过YARN、Mesos等实现)。

5.Storm
    代码托管地址:GitHub
    Storm是一个分布式的、容错的实时计算系统,由BackType开发,后被Twitter捕获。Storm属于流处理平台,多用于实时 计算并更新数据库。Storm也可被用于“连续计算”(continuouscomputation),对数据流做连续查询,在计算时就将结果以流的形式 输出给用户。它还可被用于“分布式RPC”,以并行的方式运行昂贵的运算。

Hadoop上的其它解决方案

就像前文说,基于业务对实时的需求,各个实验室发明了Storm、Impala、Spark、Samza等流实时处理工具。而本节我们将分 享的是实验室基于性能、兼容性、数据类型研究的开源解决方案,其中包括Shark、Phoenix、ApacheAccumulo、ApacheDrill、ApacheGiraph、ApacheHama、ApacheTez、ApacheAmbari。

6.Shark
    代码托管地址:GitHub
    Shark,代表了“HiveonSpark”,一个专为Spark打造的大规模数据仓库系统,兼容ApacheHive。无需修改现有的数据或者查询,就可以用100倍的速度执行HiveQL。
    Shark支持Hive查询语言、元存储、序列化格式及自定义函数,与现有Hive部署无缝集成,是一个更快、更强大的替代方案。

7.Phoenix
    代码托管地址:GitHub
    Phoenix是构建在ApacheHBase之上的一个SQL中间层,完全使用Java编写,提供了一个客户端可嵌入的JDBC驱动。Phoenix查询引擎会将SQL查询转换为一个或多个HBasescan,并编排执行以生成标准的JDBC结果集。直接使用HBaseAPI、协同处理 器与自定义过滤器,对于简单查询来说,其性能量级是毫秒,对于百万级别的行数来说,其性能量级是秒。Phoenix完全托管在GitHub之上。
    Phoenix值得关注的特性包括:1,嵌入式的JDBC驱动,实现了大部分的java.sql接口,包括元数据API;2,可以通过多个 行键或是键/值单元对列进行建模;3,DDL支持;4,版本化的模式仓库;5,DML支持;5,通过客户端的批处理实现的有限的事务支持;6,紧跟ANSISQL标准。

8.ApacheAccumulo
    代码托管地址:ApacheSVN
    ApacheAccumulo是一个可靠的、可伸缩的、高性能、排序分布式的键值存储解决方案,基于单元访问控制以及可定制的服务器端处 理。使用GoogleBigTable设计思路,基于ApacheHadoop、Zookeeper和Thrift构建。Accumulo最早由NSA开 发,后被捐献给了Apache基金会。
    对比GoogleBigTable,Accumulo主要提升在基于单元的访问及服务器端的编程机制,后一处修改让Accumulo可以在数据处理过程中任意点修改键值对。

9.ApacheDrill
    代码托管地址:GitHub
    本质上,ApacheDrill是GoogleDremel的开源实现,本质是一个分布式的mpp查询层,支持SQL及一些用于NoSQL和Hadoop数据存储系统上的语言,将有助于Hadoop用户实现更快查询海量数据集的目的。当下Drill还只能算上一个框架,只包含了Drill愿 景中的初始功能。
    Drill的目的在于支持更广泛的数据源、数据格式及查询语言,可以通过对PB字节数据的快速扫描(大约几秒内)完成相关分析,将是一个专为互动分析大型数据集的分布式系统。

10.ApacheGiraph
    代码托管地址:GitHub
    ApacheGiraph是一个可伸缩的分布式迭代图处理系统,灵感来自BSP(bulksynchronousparallel)和Google的Pregel,与它们区别于则是是开源、基于Hadoop的架构等。
    Giraph处理平台适用于运行大规模的逻辑计算,比如页面排行、共享链接、基于个性化排行等。Giraph专注于社交图计算,被Facebook作为其OpenGraph工具的核心,几分钟内处理数万亿次用户及其行为之间的连接。

11.ApacheHama
    代码托管地址:GitHub
    ApacheHama是一个建立在Hadoop上基于BSP(BulkSynchronousParallel)的计算框架,模仿了Google的Pregel。用来处理大规模的科学计算,特别是矩阵和图计算。集群环境中的系统架构由BSPMaster/GroomServer(ComputationEngine)、Zookeeper(DistributedLocking)、HDFS/HBase(StorageSystems)这3大块组成。

12.ApacheTez
    代码托管地址:GitHub
    ApacheTez是基于HadoopYarn之上的DAG(有向无环图,DirectedAcyclicGraph)计算框架。它把Map/Reduce过程拆分成若干个子过程,同时可以把多个Map/Reduce任务组合成一个较大的DAG任务,减少了Map/Reduce之间的文 件存储。同时合理组合其子过程,减少任务的运行时间。由Hortonworks开发并提供主要支持。

13.ApacheAmbari
    代码托管地址:ApacheSVN
    ApacheAmbari是一个供应、管理和监视ApacheHadoop集群的开源框架,它提供一个直观的操作工具和一个健壮的HadoopAPI,可以隐藏复杂的Hadoop操作,使集群操作大大简化,首个版本发布于2012年6月。
    ApacheAmbari现在是一个Apache的顶级项目,早在2011年8月,Hortonworks引进Ambari作为ApacheIncubator项目,制定了Hadoop集群极致简单管理的愿景。在两年多的开发社区显著成长,从一个小团队,成长为Hortonworks各种组织的贡献者。Ambari用户群一直在稳步增长,许多机构依靠Ambari在其大型数据中心大规模部署和管理Hadoop集 群。
    目前ApacheAmbari支持的Hadoop组件包括:HDFS、MapReduce、Hive、HCatalog、HBase、ZooKeeper、Oozie、Pig及Sqoop

解析Hadoop新一代MapReduce框架Yarn

唐半张 发表了文章 0 个评论 1930 次浏览 2015-10-08 10:43 来自相关话题

背景 Yarn是一个分布式的资源管理系统,用以提高分布式的集群环境下的资源利用率,这些资源包括内存、IO、网络、磁盘等。其产生的原因是为了解决原MapReduce框架的不足。最初MapReduce的committer们还可以周期性的在已有的代码上进 ...查看全部
背景
Yarn是一个分布式的资源管理系统,用以提高分布式的集群环境下的资源利用率,这些资源包括内存、IO、网络、磁盘等。其产生的原因是为了解决原MapReduce框架的不足。最初MapReduce的committer们还可以周期性的在已有的代码上进行修改,可是随着代码的增加以及原MapReduce框架设计的不足,在原MapReduce框架上进行修改变得越来越困难,所以MapReduce的committer们决定从架构上重新设计MapReduce,使下一代的MapReduce(MRv2/Yarn)框架具有更好的扩展性、可用性、可靠性、向后兼容性和更高的资源利用率以及能支持除了MapReduce计算框架外的更多的计算框架。
原MapReduce框架的不足

JobTracker是集群事务的集中处理点,存在单点故障JobTracker需要完成的任务太多,既要维护job的状态又要维护job的task的状态,造成过多的资源消耗 在taskTracker端,用map/reduce task作为资源的表示过于简单,没有考虑到CPU、内存等资源情况,当把两个需要消耗大内存的task调度到一起,很容易出现OOM 把资源强制划分为map/reduce slot,当只有map task时,reduce slot不能用;当只有reduce task时,map slot不能用,容易造成资源利用不足。

Yarn架构
Yarn/MRv2最基本的想法是将原JobTracker主要的资源管理和job调度/监视功能分开作为两个单独的守护进程。有一个全局的ResourceManager(RM)和每个Application有一个ApplicationMaster(AM),Application相当于map-reduce job或者DAG jobs。ResourceManager和NodeManager(NM)组成了基本的数据计算框架。ResourceManager协调集群的资源利用,任何client或者运行着的applicatitonMaster想要运行job或者task都得向RM申请一定的资源。ApplicatonMaster是一个框架特殊的库,对于MapReduce框架而言有它自己的AM实现,用户也可以实现自己的AM,在运行的时候,AM会与NM一起来启动和监视tasks。 
ResourceManager
ResourceManager作为资源的协调者有两个主要的组件:Scheduler和ApplicationsManager(AsM)。
Scheduler负责分配最少但满足application运行所需的资源量给Application。Scheduler只是基于资源的使用情况进行调度,并不负责监视/跟踪application的状态,当然也不会处理失败的task。RM使用resource container概念来管理集群的资源,resource container是资源的抽象,每个container包括一定的内存、IO、网络等资源,不过目前的实现只包括内存一种资源。
ApplicationsManager负责处理client提交的job以及协商第一个container以供applicationMaster运行,并且在applicationMaster失败的时候会重新启动applicationMaster。下面阐述RM具体完成的一些功能。
 
  • 资源调度:Scheduler从所有运行着的application收到资源请求后构建一个全局的资源分配计划,然后根据application特殊的限制以及全局的一些限制条件分配资源。
  • 资源监视:Scheduler会周期性的接收来自NM的资源使用率的监控信息,另外applicationMaster可以从Scheduler得到属于它的已完成的container的状态信息。
  • Application提交:
    • client向AsM获得一个applicationIDclient将application定义以及需要的jar包
    • client将application定义以及需要的jar包文件等上传到hdfs的指定目录,由yarn-site.xml的yarn.app.mapreduce.am.staging-dir指定
    • client构造资源请求的对象以及application的提交上下文发送给AsM
    • AsM接收application的提交上下文
    • AsM根据application的信息向Scheduler协商一个Container供applicationMaster运行,然后启动applicationMaster
    • 向该container所属的NM发送launchContainer信息启动该container,也即启动applicationMaster、AsM向client提供运行着的AM的状态信息。
  • AM的生命周期:AsM负责系统中所有AM的生命周期的管理。AsM负责AM的启动,当AM启动后,AM会周期性的向AsM发送heartbeat,默认是1s,AsM据此了解AM的存活情况,并且在AM失败时负责重启AM,若是一定时间过后(默认10分钟)没有收到AM的heartbeat,AsM就认为该AM失败了。
 关于ResourceManager的可用性目前还没有很好的实现,不过Cloudera公司的CDH4.4以后的版本实现了一个简单的高可用性,使用了Hadoop-common项目中HA部分的代码,采用了类似hdfs namenode高可用性的设计,给RM引入了active和standby状态,不过没有与journalnode相对应的角色,只是由zookeeper来负责维护RM的状态,这样的设计只是一个最简单的方案,避免了手动重启RM,离真正的生产可用还有一段距离。NodeManagerNM主要负责启动RM分配给AM的container以及代表AM的container,并且会监视container的运行情况。在启动container的时候,NM会设置一些必要的环境变量以及将container运行所需的jar包、文件等从hdfs下载到本地,也就是所谓的资源本地化;当所有准备工作做好后,才会启动代表该container的脚本将程序启动起来。启动起来后,NM会周期性的监视该container运行占用的资源情况,若是超过了该container所声明的资源量,则会kill掉该container所代表的进程。另外,NM还提供了一个简单的服务以管理它所在机器的本地目录。Applications可以继续访问本地目录即使那台机器上已经没有了属于它的container在运行。例如,Map-Reduce应用程序使用这个服务存储map output并且shuffle它们给相应的reduce task。在NM上还可以扩展自己的服务,yarn提供了一个yarn.nodemanager.aux-services的配置项,通过该配置,用户可以自定义一些服务,例如Map-Reduce的shuffle功能就是采用这种方式实现的。NM在本地为每个运行着的application生成如下的目录结构:Container目录下的目录结构如下: 在启动一个container的时候,NM就执行该container的default_container_executor.sh,该脚本内部会执行launch_container.sh。launch_container.sh会先设置一些环境变量,最后启动执行程序的命令。对于MapReduce而言,启动AM就执行org.apache.hadoop.mapreduce.v2.app.MRAppMaster;启动map/reduce task就执行org.apache.hadoop.mapred.YarnChild。 ApplicationMasterApplicationMaster是一个框架特殊的库,对于Map-Reduce计算模型而言有它自己的ApplicationMaster实现,对于其他的想要运行在yarn上的计算模型而言,必须得实现针对该计算模型的ApplicationMaster用以向RM申请资源运行task,比如运行在yarn上的spark框架也有对应的ApplicationMaster实现,归根结底,yarn是一个资源管理的框架,并不是一个计算框架,要想在yarn上运行应用程序,还得有特定的计算框架的实现。由于yarn是伴随着MRv2一起出现的,所以下面简要概述MRv2在yarn上的运行流程。MRv2运行流程: 
  • MR JobClient向resourceManager(AsM)提交一个job
  • AsM向Scheduler请求一个供MR AM运行的container,然后启动它
  • MR AM启动起来后向AsM注册
  • MR JobClient向AsM获取到MR AM相关的信息,然后直接与MR AM进行通信
  • MR AM计算splits并为所有的map构造资源请求
  • MR AM做一些必要的MR OutputCommitter的准备工作
  • MR AM向RM(Scheduler)发起资源请求,得到一组供map/reduce task运行的container,然后与NM一起对每一个container执行一些必要的任务,包括资源本地化等
  • MR AM 监视运行着的task 直到完成,当task失败时,申请新的container运行失败的task
  • 当每个map/reduce task完成后,MR AM运行MR OutputCommitter的cleanup 代码,也就是进行一些收尾工作
  • 当所有的map/reduce完成后,MR AM运行OutputCommitter的必要的job commit或者abort APIs
  • MR AM退出。


在Yarn上写应用程序
在yarn上写应用程序并不同于我们熟知的MapReduce应用程序,必须牢记yarn只是一个资源管理的框架,并不是一个计算框架,计算框架可以运行在yarn上。我们所能做的就是向RM申请container,然后配合NM一起来启动container。就像MRv2一样,jobclient请求用于MR AM运行的container,设置环境变量和启动命令,然后交由NM去启动MR AM,随后map/reduce task就由MR AM全权负责,当然task的启动也是由MR AM向RM申请container,然后配合NM一起来启动的。所以要想在yarn上运行非特定计算框架的程序,我们就得实现自己的client和applicationMaster。另外我们自定义的AM需要放在各个NM的classpath下,因为AM可能运行在任何NM所在的机器上。

hadoop 三节点集群安装配置详细实例

唐半张 发表了文章 0 个评论 1948 次浏览 2015-10-08 10:43 来自相关话题

hadoop 三节点集群安装配置详细实例 topo节点: 192.168.10.46 Hadoop46 192.168.10.47 Hadoop47 192.168.10.48 Hadoop48 Ha ...查看全部
hadoop 三节点集群安装配置详细实例
topo节点:
192.168.10.46 Hadoop46
192.168.10.47 Hadoop47
192.168.10.48 Hadoop48
Hadoop的守护进程deamons:NameNode/DataNode 和 JobTracker/TaskTracker。其中NameNode/DataNode工作在HDFS层,JobTracker/TaskTracker工作在MapReduce层。
设备列表中Hadoop48是master,担任namenode和jobtracker,46,47为slave,担任datanode和tasktracker。secondary namenode在hadoop 1.03中被废弃,用checkpoint node或backupnode来代替。暂没有配checkpoint node或backupnode。

在各机器建立用户zhouhh,可选自己喜欢的名称,用于管理hadoop。
网络准备
先对每个节点完成单节点设置,见我此前文章:10分钟从无到有搭建hadoop环境并测试mapreduce http://abloz.com/2012/05/22/10-minutes-from-scratch-to-build-hadoop-environment-and-test-mapreduce.html。
从http://labs.renren.com/apache-mirror/hadoop/common/下载最新版本hadoop
wget http://labs.renren.com/apache-mirror/hadoop/common/hadoop-1.0.3/hadoop-1.0.3.tar.gz
然后分发到各机器,并在各机器解压,配置,测试单台设备ok。
[zhouhh@Hadoop48 ~]$ cat /etc/redhat-release
CentOS release 5.5 (Final)
[zhouhh@Hadoop48 ~]$ cat /etc/hosts
# Do not remove the following line, or various programs
# that require network functionality will fail.
127.0.0.1 localhost.localdomain localhost
::1 localhost6.localdomain6 localhost6
192.168.10.46 Hadoop46
192.168.10.47 Hadoop47
192.168.10.48 Hadoop48
[zhouhh@Hadoop48 ~]$ ping Hadoop46
PING Hadoop46 (192.168.10.46) 56(84) bytes of data.
64 bytes from Hadoop46 (192.168.10.46): icmp_seq=1 ttl=64 time=5.25 ms
64 bytes from Hadoop46 (192.168.10.46): icmp_seq=2 ttl=64 time=0.428 ms
— Hadoop46 ping statistics —
2 packets transmitted, 2 received, 0% packet loss, time 1009ms
rtt min/avg/max/mdev = 0.428/2.843/5.259/2.416 ms
[zhouhh@Hadoop48 ~]$ ping Hadoop47
PING Hadoop47 (192.168.10.47) 56(84) bytes of data.
64 bytes from Hadoop47 (192.168.10.47): icmp_seq=1 ttl=64 time=7.08 ms
64 bytes from Hadoop47 (192.168.10.47): icmp_seq=2 ttl=64 time=4.27 ms
— Hadoop47 ping statistics —
2 packets transmitted, 2 received, 0% packet loss, time 1007ms
rtt min/avg/max/mdev = 4.277/5.678/7.080/1.403 ms
[zhouhh@Hadoop48 ~]$ ssh-keygen -t rsa -P “”
[zhouhh@Hadoop48 ~]$ cd .ssh
[zhouhh@Hadoop48 .ssh]$ cat id_rsa.pub >> authorized_keys
由于安全原因,如果各节点的ssh连接不是标准端口,可以配置一个config文件,以方便ssh Hadoop46这样的命令自动连接。
如果是标准端口标准key文件名的话通过hosts的解析就可以用ssh Hadoop46这样的命令自动登录了。
config文件格式:
 [zhouhh@Hadoop48 .ssh]$ vi config
Host Hadoop46
Port 22
HostName 192.168.10.46
IdentityFile ~/.ssh/id_rsa
Host Hadoop47
Port 22
HostName 192.168.10.47
IdentityFile ~/.ssh/id_rsa
Host Hadoop48
Port 22
HostName 192.168.10.48
IdentityFile ~/.ssh/id_rsa
[zhouhh@Hadoop48 ~]$ ssh-copy-id -i .ssh/id_rsa zhouhh@Hadoop46
[zhouhh@Hadoop48 ~]$ ssh-copy-id -i .ssh/id_rsa zhouhh@Hadoop47
测试用key实现无密码登录,都应该成功:
[zhouhh@Hadoop48 ~]$ ssh Hadoop46
[zhouhh@Hadoop48 ~]$ ssh Hadoop47
[zhouhh@Hadoop48 ~]$ ssh Hadoop48
拷贝私钥:
[zhouhh@Hadoop47 .ssh]$ scp zhouhh@Hadoop48:~/.ssh/id_rsa .
[zhouhh@Hadoop47 .ssh]$ scp zhouhh@Hadoop48:~/.ssh/config .
[zhouhh@Hadoop46 .ssh]$ scp zhouhh@Hadoop48:~/.ssh/id_rsa .
[zhouhh@Hadoop46 .ssh]$ scp zhouhh@Hadoop48:~/.ssh/config .
至此,完成了互联互通。
================= [b]下面完成配置[/b] =================
环境变量:
[zhouhh@Hadoop48 ~]$ vi .bashrc
export HADOOP_HOME=/home/zhouhh/hadoop-1.0.3
export HADOOP_HOME_WARN_SUPPRESS=1
unalias fs &> /dev/null
alias fs="hadoop fs"
unalias hls &> /dev/null
alias hls="fs -ls"
export PATH=$PATH:$HADOOP_HOME/bin
[zhouhh@Hadoop48 ~]$ source .bashrc
[zhouhh@Hadoop48 ~]$ cd hadoop-1.0.3
[zhouhh@Hadoop48 hadoop-1.0.3]$ cd conf
[zhouhh@Hadoop48 conf]$ ls
capacity-scheduler.xml fair-scheduler.xml hdfs-default.xml mapred-queue-acls.xml ssl-client.xml.example
configuration.xsl hadoop-env.sh hdfs-site.xml mapred-site.xml ssl-server.xml.example
core-default.xml hadoop-metrics2.properties log4j.properties masters taskcontroller.cfg
core-site.xml hadoop-policy.xml mapred-default.xml slaves
其中几个*default.xml文件是我从相应的src中拷贝过来的,用于配置参考。
配置文件包括环境和配置参数两部分。环境是bin目录下脚本需要的,在hadoop-env.sh 中配置。配置参数在*-site.xml中配置。

masters文件和slaves文件,仅方便用同时管理多台设备的启动和停止,也可以用手动方式来启动:
bin/hadoop-daemon.sh start [namenode | secondarynamenode | datanode | jobtracker | tasktracker]
运行bin/start-dfs.sh,表示是该设备是 NameNode,运行bin/start-mapred.sh表示该设备是 JobTracker。NameNode和JobTracker可以是同一台机器,也可以分开。
bin/start-all.sh, stop-all.sh这两个脚本在1.03中被废弃,被bin/start-dfs.sh ,bin/start-mapred.sh和bin/stop-dfs.sh,bin/stop-mapred.sh所替代。
[zhouhh@Hadoop48 conf]$ vi masters
Hadoop48
[zhouhh@Hadoop48 conf]$ vi slaves
Hadoop46
Hadoop47

只读配置文件:src/core/core-default.xml, src/hdfs/hdfs-default.xml, src/mapred/mapred-default.xml
可以用于配置参考。
这三个文件用于实际配置:conf/core-site.xml, conf/hdfs-site.xml,conf/mapred-site.xml
另外,可以通过配置conf/hadoop-env.sh来控制bin目录下执行脚本的变量
配置core-site.xml
可以参考手册和src/core/core-default.xml
[zhouhh@Hadoop48 conf]$ vi core-site.xml


hadoop.mydata.dir
/home/zhouhh/myhadoop
A base for other directories.${user.name}



hadoop.tmp.dir
/tmp/hadoop-${user.name}
A base for other temporary directories.



fs.default.name
hdfs://Hadoop48:54310
The name of the default file system. A URI whose
scheme and authority determine the FileSystem implementation. The
uri's scheme determines the config property (fs.SCHEME.impl) naming
the FileSystem implementation class. The uri's authority is used to
determine the host, port, etc. for a filesystem.




其中hadoop.mydata.dir 是我自定义的变量,用于作为数据根目录,以后hdfs的dfs.name.dir和dfs.data.dir全配在该分区下面。
这里,config配置文件有几个变量可以用:
${hadoop.home.dir} 和$HADOOP_HOME 一致。${user.name}和用户名一致。
[zhouhh@Hadoop48 conf]$ vi hdfs-site.xml


hadoop.mydata.dir
/home/zhouhh/myhadoop
A base for other directories.${user.name}



hadoop.tmp.dir
/tmp/hadoop-${user.name}
A base for other temporary directories.



fs.default.name
hdfs://Hadoop48:54310
The name of the default file system. A URI whose
scheme and authority determine the FileSystem implementation. The
uri's scheme determines the config property (fs.SCHEME.impl) naming
the FileSystem implementation class. The uri's authority is used to
determine the host, port, etc. for a filesystem.




 

[zhouhh@Hadoop48 conf]$ vi mapred-site.xml


mapred.job.tracker
Hadoop48:54311
The host and port that the MapReduce job tracker runs
at. If "local", then jobs are run in-process as a single map
and reduce task.




mapred.local.dir
${hadoop.tmp.dir}/mapred/local
The local directory where MapReduce stores intermediate
data files. May be a comma-separated list of
directories on different devices in order to spread disk i/o.
Directories that do not exist are ignored.




mapred.system.dir
${hadoop.mydata.dir}/mapred/system
The directory where MapReduce stores control files.



mapred.tasktracker.map.tasks.maximum
2
The maximum number of map tasks that will be run
simultaneously by a task tracker.vary it depending on your hardware




mapred.tasktracker.reduce.tasks.maximum
2
The maximum number of reduce tasks that will be run
simultaneously by a task tracker.vary it depending on your hardware



 

 

配置可能会随实际情况增减。尤其是有时端口冲突,导致datanode或tasktracker起不来,需求增加相应的配置。参考对应的default配置文件和手册完成。
将配置拷贝到47,46两台机器。


[zhouhh@Hadoop48 hadoop-1.0.3]$ ./bin/hadoop namenode -format
12/05/23 17:04:42 INFO namenode.NameNode: STARTUP_MSG:
/************************************************************
STARTUP_MSG: Starting NameNode
STARTUP_MSG: host = Hadoop48/192.168.10.48
STARTUP_MSG: args = [-format]
STARTUP_MSG: version = 1.0.3
STARTUP_MSG: build = https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1.0 -r 1335192; compiled by ‘hortonfo’ on Tue May 8 20:31:25 UTC 2012
************************************************************/
12/05/23 17:04:42 INFO util.GSet: VM type = 64-bit
12/05/23 17:04:42 INFO util.GSet: 2% max memory = 17.77875 MB
12/05/23 17:04:42 INFO util.GSet: capacity = 2^21 = 2097152 entries
12/05/23 17:04:42 INFO util.GSet: recommended=2097152, actual=2097152
12/05/23 17:04:42 INFO namenode.FSNamesystem: fsOwner=zhouhh
12/05/23 17:04:42 INFO namenode.FSNamesystem: supergroup=supergroup
12/05/23 17:04:42 INFO namenode.FSNamesystem: isPermissionEnabled=true
12/05/23 17:04:42 INFO namenode.FSNamesystem: dfs.block.invalidate.limit=100
12/05/23 17:04:42 INFO namenode.FSNamesystem: isAccessTokenEnabled=false accessKeyUpdateInterval=0 min(s), accessTokenLifetime=0 min(s)
12/05/23 17:04:42 INFO namenode.NameNode: Caching file names occuring more than 10 times
12/05/23 17:04:42 INFO common.Storage: Image file of size 112 saved in 0 seconds.
12/05/23 17:04:42 INFO common.Storage: Storage directory /home/zhouhh/myhadoop/dfs/name has been successfully formatted.
12/05/23 17:04:42 INFO namenode.NameNode: SHUTDOWN_MSG:
/************************************************************
SHUTDOWN_MSG: Shutting down NameNode at Hadoop48/192.168.10.48
************************************************************/


因为我前面在.bashrc中加了路径和环境变量,因此,也可以直接用
[zhouhh@Hadoop48 hadoop-1.0.3]$ hadoop namenode -format
该命令格式化hdfs-site.xml里面定义的dfs.name.dir路径,用于保存跟踪和协同DataNode的信息。
[zhouhh@Hadoop48 ~]$ find myhadoop/
myhadoop/
myhadoop/dfs
myhadoop/dfs/name
myhadoop/dfs/name/previous.checkpoint
myhadoop/dfs/name/previous.checkpoint/fstime
myhadoop/dfs/name/previous.checkpoint/edits
myhadoop/dfs/name/previous.checkpoint/fsimage
myhadoop/dfs/name/previous.checkpoint/VERSION
myhadoop/dfs/name/image
myhadoop/dfs/name/image/fsimage
myhadoop/dfs/name/current
myhadoop/dfs/name/current/fstime
myhadoop/dfs/name/current/edits
myhadoop/dfs/name/current/fsimage
myhadoop/dfs/name/current/VERSION
[zhouhh@Hadoop48 hadoop-1.0.3]$ start-dfs.sh
starting namenode, logging to /home/zhouhh/hadoop-1.0.3/libexec/../logs/hadoop-zhouhh-namenode-Hadoop48.out
Hadoop46: Bad owner or permissions on /home/zhouhh/.ssh/config
Hadoop47: Bad owner or permissions on /home/zhouhh/.ssh/config
Hadoop48: Bad owner or permissions on /home/zhouhh/.ssh/config
[zhouhh@Hadoop48 .ssh]$ ls -l
total 20
-rw——- 1 zhouhh zhouhh 794 Apr 13 10:21 authorized_keys
-rw-rw-r– 1 zhouhh zhouhh 288 May 23 10:37 config
原来config文件权限不对
[zhouhh@Hadoop48 .ssh]$ chmod 600 config
[zhouhh@Hadoop48 .ssh]$ ls -l
total 20
-rw——- 1 zhouhh zhouhh 794 Apr 13 10:21 authorized_keys
-rw——- 1 zhouhh zhouhh 288 May 23 10:37 config
[zhouhh@Hadoop48 ~]$ start-dfs.sh
starting namenode, logging to /home/zhouhh/hadoop-1.0.3/libexec/../logs/hadoop-zhouhh-namenode-Hadoop48.out
Hadoop47: bash: line 0: cd: /home/zhouhh/hadoop-1.0.3/libexec/..: No such file or directory
Hadoop47: bash: /home/zhouhh/hadoop-1.0.3/bin/hadoop-daemon.sh: No such file or directory
Hadoop46: starting datanode, logging to /home/zhouhh/hadoop-1.0.3/libexec/../logs/hadoop-zhouhh-datanode-Hadoop46.out
Hadoop48: starting secondarynamenode, logging to /home/zhouhh/hadoop-1.0.3/libexec/../logs/hadoop-zhouhh-secondarynamenode-Hadoop48.out
start-dfs.sh会启动本机NameNode 和 conf/slaves 添加的DataNode
[zhouhh@Hadoop48 ~]$ ssh Hadoop47
Last login: Tue May 22 17:57:01 2012 from hadoop48
[zhouhh@Hadoop47 ~]$
[zhouhh@Hadoop47 hadoop-1.0.3]$ vi conf/hadoop-env.sh
配置$JAVA_HOME为正确的路径。
Hadoop46做同样处理。
[zhouhh@Hadoop48 ~]$ start-dfs.sh
starting namenode, logging to /home/zhouhh/hadoop-1.0.3/libexec/../logs/hadoop-zhouhh-namenode-Hadoop48.out
Hadoop47: starting datanode, logging to /home/zhouhh/hadoop-1.0.3/libexec/../logs/hadoop-zhouhh-datanode-Hadoop47.out
Hadoop46: starting datanode, logging to /home/zhouhh/hadoop-1.0.3/libexec/../logs/hadoop-zhouhh-datanode-Hadoop46.out
Hadoop48: secondarynamenode running as process 23491. Stop it first.
HDFS已经运行成功
排错
[zhouhh@Hadoop47 logs]$ vi hadoop-zhouhh-datanode-Hadoop47.log
2012-05-23 17:17:14,230 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: STARTUP_MSG:
/************************************************************
STARTUP_MSG: Starting DataNode
STARTUP_MSG: host = Hadoop47/192.168.10.47
STARTUP_MSG: args = []
STARTUP_MSG: version = 1.0.3
STARTUP_MSG: build = https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1.0 -r 1335192; compiled by ‘hortonfo’ on Tue May 8 20:31:25 UTC 2012
************************************************************/
2012-05-23 17:17:14,762 INFO org.apache.hadoop.metrics2.impl.MetricsConfig: loaded properties from hadoop-metrics2.properties
2012-05-23 17:17:14,772 INFO org.apache.hadoop.metrics2.impl.MetricsSourceAdapter: MBean for source MetricsSystem,sub=Stats registered.
2012-05-23 17:17:14,772 INFO org.apache.hadoop.metrics2.impl.MetricsSystemImpl: Scheduled snapshot period at 10 second(s).
2012-05-23 17:17:14,772 INFO org.apache.hadoop.metrics2.impl.MetricsSystemImpl: DataNode metrics system started
2012-05-23 17:17:14,907 INFO org.apache.hadoop.metrics2.impl.MetricsSourceAdapter: MBean for source ugi registered.
2012-05-23 17:17:15,064 INFO org.apache.hadoop.util.NativeCodeLoader: Loaded the native-hadoop library
2012-05-23 17:17:15,187 ERROR org.apache.hadoop.hdfs.server.datanode.DataNode: java.lang.IllegalArgumentException: Does not contain a valid host:port authority: file:///
at org.apache.hadoop.net.NetUtils.createSocketAddr(NetUtils.java:162)
at org.apache.hadoop.hdfs.server.namenode.NameNode.getAddress(NameNode.java:198)
at org.apache.hadoop.hdfs.server.namenode.NameNode.getAddress(NameNode.java:228)
at org.apache.hadoop.hdfs.server.namenode.NameNode.getServiceAddress(NameNode.java:222)
at org.apache.hadoop.hdfs.server.datanode.DataNode.startDataNode(DataNode.java:337)
at org.apache.hadoop.hdfs.server.datanode.DataNode.(DataNode.java:299)
at org.apache.hadoop.hdfs.server.datanode.DataNode.makeInstance(DataNode.java:1582)
at org.apache.hadoop.hdfs.server.datanode.DataNode.instantiateDataNode(DataNode.java:1521)
at org.apache.hadoop.hdfs.server.datanode.DataNode.createDataNode(DataNode.java:1539)
at org.apache.hadoop.hdfs.server.datanode.DataNode.secureMain(DataNode.java:1665)
at org.apache.hadoop.hdfs.server.datanode.DataNode.main(DataNode.java:1682)
2012-05-23 17:17:15,187 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: SHUTDOWN_MSG:
/************************************************************
SHUTDOWN_MSG: Shutting down DataNode at Hadoop47/192.168.10.47
************************************************************/
同样,需要配置相关的端口
[zhouhh@Hadoop48 bin]$ start-mapred.sh
[zhouhh@Hadoop48 ~]$ ssh Hadoop46
Last login: Wed May 23 17:33:05 2012 from hadoop47
[zhouhh@Hadoop46 ~]$ cd hadoop-1.0.3/logs
[zhouhh@Hadoop46 logs]$ vi hadoop-zhouhh-datanode-Hadoop46.log
2012-05-23 17:38:46,062 INFO org.apache.hadoop.ipc.Client: Retrying connect to server: Hadoop48/192.168.10.48:54310. Already tried 0 time(s).
2012-05-23 17:38:47,065 INFO org.apache.hadoop.ipc.Client: Retrying connect to server: Hadoop48/192.168.10.48:54310. Already tried 1 time(s).
[zhouhh@Hadoop46 logs]$ vi hadoop-zhouhh-tasktracker-Hadoop46.log
2012-05-23 17:58:13,356 INFO org.apache.hadoop.ipc.Server: IPC Server handler 3 on 54550: starting
2012-05-23 17:58:14,428 INFO org.apache.hadoop.ipc.Client: Retrying connect to server: Hadoop48/192.168.10.48:54311. Already tried 0 time(s).
2012-05-23 17:58:15,430 INFO org.apache.hadoop.ipc.Client: Retrying connect to server: Hadoop48/192.168.10.48:54311. Already tried 1 time(s).
[zhouhh@Hadoop48 conf]$ netstat -antp | grep 54310
(Not all processes could be identified, non-owned process info
will not be shown, you would have to be root to see it all.)
tcp 0 0 192.168.10.48:54310 192.168.20.188:30300 ESTABLISHED 20469/python
[zhouhh@Hadoop48 conf]$ netstat -antp | grep 54311
(Not all processes could be identified, non-owned process info
will not be shown, you would have to be root to see it all.)
tcp 0 0 192.168.10.48:54311 192.168.20.188:30300 TIME_WAIT -
原来端口被占用了,将相关占用端口python程序停掉。
[zhouhh@Hadoop48 hadoop-1.0.3]$ stop-mapred.sh
[zhouhh@Hadoop48 hadoop-1.0.3]$ stop-dfs.sh
[zhouhh@Hadoop48 hadoop-1.0.3]$ start-dfs.sh
starting namenode, logging to /home/zhouhh/hadoop-1.0.3/libexec/../logs/hadoop-zhouhh-namenode-Hadoop48.out
Hadoop47: starting datanode, logging to /home/zhouhh/hadoop-1.0.3/libexec/../logs/hadoop-zhouhh-datanode-Hadoop47.out
Hadoop46: starting datanode, logging to /home/zhouhh/hadoop-1.0.3/libexec/../logs/hadoop-zhouhh-datanode-Hadoop46.out
Hadoop48: starting secondarynamenode, logging to /home/zhouhh/hadoop-1.0.3/libexec/../logs/hadoop-zhouhh-secondarynamenode-Hadoop48.out
[zhouhh@Hadoop48 hadoop-1.0.3]$ netstat -antp | grep 54310
(Not all processes could be identified, non-owned process info
will not be shown, you would have to be root to see it all.)
tcp 0 0 192.168.10.48:54310 0.0.0.0:* LISTEN 24716/java
tcp 0 0 192.168.10.48:51040 192.168.10.48:54310 TIME_WAIT -
tcp 0 0 192.168.10.48:51038 192.168.10.48:54310 TIME_WAIT -
tcp 0 0 192.168.10.48:54310 192.168.10.46:38202 ESTABLISHED 24716/java
[zhouhh@Hadoop48 hadoop-1.0.3]$ start-mapred.sh
starting jobtracker, logging to /home/zhouhh/hadoop-1.0.3/libexec/../logs/hadoop-zhouhh-jobtracker-Hadoop48.out
Hadoop46: starting tasktracker, logging to /home/zhouhh/hadoop-1.0.3/libexec/../logs/hadoop-zhouhh-tasktracker-Hadoop46.out
Hadoop47: starting tasktracker, logging to /home/zhouhh/hadoop-1.0.3/libexec/../logs/hadoop-zhouhh-tasktracker-Hadoop47.out
[zhouhh@Hadoop48 hadoop-1.0.3]$ netstat -antp | grep 54311
(Not all processes could be identified, non-owned process info
will not be shown, you would have to be root to see it all.)
tcp 0 0 192.168.10.48:54311 0.0.0.0:* LISTEN 25238/java
tcp 0 0 192.168.10.48:54311 192.168.10.46:33561 ESTABLISHED 25238/java
tcp 0 0 192.168.10.48:54311 192.168.10.47:55277 ESTABLISHED 25238/java
查看DataNode的log,已经正常。
[zhouhh@Hadoop48 hadoop-1.0.3]$ jps
24716 NameNode
25625 Jps
25238 JobTracker
24909 SecondaryNameNode
[zhouhh@Hadoop46 ~]$ jps
10649 TaskTracker
10352 DataNode
10912 Jps
MapReduce 测试
==========================
[zhouhh@Hadoop48 ~]$ vi test.txt
a b c d
a b c d
aa bb cc dd
ee ff gg hh
由前面.bashrc设置,fs为hadoop dfs的别称
hls为 hadoop -ls的别称
[zhouhh@Hadoop48 hadoop-1.0.3]$ fs -put test.txt test.txt
[zhouhh@Hadoop48 hadoop-1.0.3]$ hls
Found 1 items
-rw-r–r– 3 zhouhh supergroup 40 2012-05-23 19:39 /user/zhouhh/test.txt
执行mapreduce测试wordcount例子:


[zhouhh@Hadoop48 hadoop-1.0.3]$ ./bin/hadoop jar hadoop-examples-1.0.3.jar wordcount /user/zhouhh/test.txt output
12/05/23 19:40:52 INFO input.FileInputFormat: Total input paths to process : 1
12/05/23 19:40:52 INFO util.NativeCodeLoader: Loaded the native-hadoop library
12/05/23 19:40:52 WARN snappy.LoadSnappy: Snappy native library not loaded
12/05/23 19:40:52 INFO mapred.JobClient: Running job: job_201205231824_0001
12/05/23 19:40:53 INFO mapred.JobClient: map 0% reduce 0%
12/05/23 19:41:07 INFO mapred.JobClient: map 100% reduce 0%
12/05/23 19:41:19 INFO mapred.JobClient: map 100% reduce 100%
12/05/23 19:41:24 INFO mapred.JobClient: Job complete: job_201205231824_0001
12/05/23 19:41:24 INFO mapred.JobClient: Counters: 29
12/05/23 19:41:24 INFO mapred.JobClient: Job Counters
12/05/23 19:41:24 INFO mapred.JobClient: Launched reduce tasks=1
12/05/23 19:41:24 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=11561
12/05/23 19:41:24 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
12/05/23 19:41:24 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
12/05/23 19:41:24 INFO mapred.JobClient: Launched map tasks=1
12/05/23 19:41:24 INFO mapred.JobClient: Data-local map tasks=1
12/05/23 19:41:24 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=9934
12/05/23 19:41:24 INFO mapred.JobClient: File Output Format Counters
12/05/23 19:41:24 INFO mapred.JobClient: Bytes Written=56
12/05/23 19:41:24 INFO mapred.JobClient: FileSystemCounters
12/05/23 19:41:24 INFO mapred.JobClient: FILE_BYTES_READ=110
12/05/23 19:41:24 INFO mapred.JobClient: HDFS_BYTES_READ=147
12/05/23 19:41:24 INFO mapred.JobClient: FILE_BYTES_WRITTEN=43581
12/05/23 19:41:24 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=56
12/05/23 19:41:24 INFO mapred.JobClient: File Input Format Counters
12/05/23 19:41:24 INFO mapred.JobClient: Bytes Read=40
12/05/23 19:41:24 INFO mapred.JobClient: Map-Reduce Framework
12/05/23 19:41:24 INFO mapred.JobClient: Map output materialized bytes=110
12/05/23 19:41:24 INFO mapred.JobClient: Map input records=4
12/05/23 19:41:24 INFO mapred.JobClient: Reduce shuffle bytes=110
12/05/23 19:41:24 INFO mapred.JobClient: Spilled Records=24
12/05/23 19:41:24 INFO mapred.JobClient: Map output bytes=104
12/05/23 19:41:24 INFO mapred.JobClient: CPU time spent (ms)=1490
12/05/23 19:41:24 INFO mapred.JobClient: Total committed heap usage (bytes)=194969600
12/05/23 19:41:24 INFO mapred.JobClient: Combine input records=16
12/05/23 19:41:24 INFO mapred.JobClient: SPLIT_RAW_BYTES=107
12/05/23 19:41:24 INFO mapred.JobClient: Reduce input records=12
12/05/23 19:41:24 INFO mapred.JobClient: Reduce input groups=12
12/05/23 19:41:24 INFO mapred.JobClient: Combine output records=12
12/05/23 19:41:24 INFO mapred.JobClient: Physical memory (bytes) snapshot=271958016
12/05/23 19:41:24 INFO mapred.JobClient: Reduce output records=12
12/05/23 19:41:24 INFO mapred.JobClient: Virtual memory (bytes) snapshot=1126625280
12/05/23 19:41:24 INFO mapred.JobClient: Map output records=16


可见,效率不高,但成功了。
[zhouhh@Hadoop48 ~]$ hls
Found 2 items
drwxr-xr-x – zhouhh supergroup 0 2012-05-23 19:41 /user/zhouhh/output
-rw-r–r– 3 zhouhh supergroup 40 2012-05-23 19:39 /user/zhouhh/test.txt
hls所列,实际存在于分布式系统中。
[zhouhh@Hadoop48 ~]$ hadoop dfs -get /user/zhouhh/output .
[zhouhh@Hadoop48 ~]$ cat output/*
cat: output/_logs: Is a directory
a 2
aa 1
b 2
bb 1
c 2
cc 1
d 2
dd 1
ee 1
ff 1
gg 1
hh 1
或直接远程查看:
[zhouhh@Hadoop48 ~]$ hadoop dfs -cat output/*
cat: File does not exist: /user/zhouhh/output/_logs
a 2
aa 1

可见,分布式hadoop配置成功。

YARN应用场景、原理与基本架构

唐半张 发表了文章 0 个评论 2250 次浏览 2015-10-08 10:39 来自相关话题

YARN应用场景、原理与基本架构 (1) YARN概念解释 ResourceManager: 整个集群的资源管理器,负责集群资源的统一管理与调度,包括处理客户端请求,启动和监控ApplicationMaster,监控NodeManage ...查看全部
YARN应用场景、原理与基本架构
(1) YARN概念解释
ResourceManager: 整个集群的资源管理器,负责集群资源的统一管理与调度,包括处理客户端请求,启动和监控ApplicationMaster,监控NodeManager,进行资源的统一调度与分配等。
NodeManager: 为集群中节点所拥有的资源管理器,负责所在节点的资源管理与使用,包括负责所在节点上的资源管理和任务调度,处理来自ResourceManager的命令,处理来自ApplicationMaster的命令等。
资源调度器: 对多种类型资源进行调度的工具。目前支持CPU和内存两种关键资源。可使用FIFO、Fair Scheduler、Capacity Scheduler等资源调度算法。
Container: 对任务运行环境的抽象,描述了一系列信息,包括任务运行资源(节点、内存、CPU)、任务启动命令、任务运行环境等。

(2) YARN带来的好处:
1) 解决Hadoop 1.0中应用JobTracker扩展性受限的问题。
2) 解决Hadoop 1.0中JobTracker单点故障问题。
3) 使Hadoop支持MapReduce之外的框架,充分利用和共享集群的软硬件资源。

(3) MapReduce、Tez、Storm、Spark四个框架的异同:
虽然都运行在Hadoop上面(确切地说,是YARN上面),四个框架分别专注于离线计算、DAG计算、流式计算、内存计算的问题,相应的应用领域有所不同;从原理上分析,它们在YARN之上的实现机制不同。

(4) 技术选型
1) 视频流实时分析:Storm——流式计算框架的实时流处理特性正好适合视频流实时分析。
2) 搜索引擎日志分析(TB级):MapReduce——专长于海量日志数据的离线处理,为此而生。
3) 迭代式的数据挖掘算法,比如K-Means:Tez——DAG计算框架,适合于多个作业之间存在数据依赖关系、形成类似有向无环图结构的计算,比如K-means算法的计算。
4) 使用类SQL语言分析数据:HBase——HBase拥有类似SQL的数据查询语言HQL。

Hadoop YARN中内存和CPU两种资源的调度和隔离

唐半张 发表了文章 0 个评论 1726 次浏览 2015-10-08 10:39 来自相关话题

Hadoop YARN中内存和CPU两种资源的调度和隔离 Hadoop  YARN同时支持内存和CPU两种资源的调度(默认只支持内存,如果想进一步调度CPU,需要自己进行一些配置),本文将介绍YARN是如何对这些资源进行调度和隔离的。 ...查看全部
Hadoop YARN中内存和CPU两种资源的调度和隔离
Hadoop  YARN同时支持内存和CPU两种资源的调度(默认只支持内存,如果想进一步调度CPU,需要自己进行一些配置),本文将介绍YARN是如何对这些资源进行调度和隔离的。
在YARN中,资源管理由ResourceManager和NodeManager共同完成,其中,ResourceManager中的调度器负责 资源的分配,而NodeManager则负责资源的供给和隔离。ResourceManager将某个NodeManager上资源分配给任务(这就是所 谓的“资源调度”)后,NodeManager需按照要求为任务提供相应的资源,甚至保证这些资源应具有独占性,为任务运行提供基础的保证,这就是所谓的 资源隔离。
关于Hadoop YARN资源调度器的详细介绍,可参考我的这篇文章:YARN/MRv2 Resource Manager深入剖析—资源调度器
在正式介绍具体的资源调度和隔离之前,先品味一下内存和CPU这两种资源的特点,这是两种性质不同的资源。内存资源的多少会会决定任务的生死,如果内存不够,任务可能会运行失败;相比之下,CPU资源则不同,它只会决定任务运行的快慢,不会对生死产生影响。
【YARN中内存资源的调度和隔离】
基于以上考虑,YARN允许用户配置每个节点上可用的物理内存资源,注意,这里是“可用的”,因为一个节点上的内存会被若干个服务共享,比如一部分给YARN,一部分给HDFS,一部分给HBase等,YARN配置的只是自己可以使用的,配置参数如下:
(1)yarn.nodemanager.resource.memory-mb
表示该节点上YARN可使用的物理内存总量,默认是8192(MB),注意,如果你的节点内存资源不够8GB,则需要调减小这个值,而YARN不会智能的探测节点的物理内存总量。
(2)yarn.nodemanager.vmem-pmem-ratio
任务每使用1MB物理内存,最多可使用虚拟内存量,默认是2.1。
(3) yarn.nodemanager.pmem-check-enabled
是否启动一个线程检查每个任务正使用的物理内存量,如果任务超出分配值,则直接将其杀掉,默认是true。
(4) yarn.nodemanager.vmem-check-enabled
是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是true。
(5)yarn.scheduler.minimum-allocation-mb
单个任务可申请的最少物理内存量,默认是1024(MB),如果一个任务申请的物理内存量少于该值,则该对应的值改为这个数。
(6)yarn.scheduler.maximum-allocation-mb
单个任务可申请的最多物理内存量,默认是8192(MB)。
默认情况下,YARN采用了线程监控的方法判断任务是否超量使用内存,一旦发现超量,则直接将其杀死。由于Cgroups对内存的控制缺乏灵活性 (即任务任何时刻不能超过内存上限,如果超过,则直接将其杀死或者报OOM),而Java进程在创建瞬间内存将翻倍,之后骤降到正常值,这种情况下,采用 线程监控的方式更加灵活(当发现进程树内存瞬间翻倍超过设定值时,可认为是正常现象,不会将任务杀死),因此YARN未提供Cgroups内存隔离机制。
【YARN中CPU资源的调度和隔离】
在YARN中,CPU资源的组织方式仍在探索中,目前(2.2.0版本)只是一个初步的,非常粗粒度的实现方式,更细粒度的CPU划分方式已经提出来了,正在完善和实现中。
目前的CPU被划分成虚拟CPU(CPU virtual Core),这里的虚拟CPU是YARN自己引入的概念,初衷是,考虑到不同节点的CPU性能可能不同,每个CPU具有的计算能力也是不一样的,比如某个 物理CPU的计算能力可能是另外一个物理CPU的2倍,这时候,你可以通过为第一个物理CPU多配置几个虚拟CPU弥补这种差异。用户提交作业时,可以指 定每个任务需要的虚拟CPU个数。在YARN中,CPU相关配置参数如下:
(1)yarn.nodemanager.resource.cpu-vcores
表示该节点上YARN可使用的虚拟CPU个数,默认是8,注意,目前推荐将该值设值为与物理CPU核数数目相同。如果你的节点CPU核数不够8个,则需要调减小这个值,而YARN不会智能的探测节点的物理CPU总数。
(2) yarn.scheduler.minimum-allocation-vcores
单个任务可申请的最小虚拟CPU个数,默认是1,如果一个任务申请的CPU个数少于该数,则该对应的值改为这个数。
(3)yarn.scheduler.maximum-allocation-vcores
单个任务可申请的最多虚拟CPU个数,默认是32。
默认情况下,YARN是不会对CPU资源进行调度的,你需要配置相应的资源调度器让你支持,具体可参考我的这两篇文章:
(1)Hadoop YARN配置参数剖析(4)—Fair Scheduler相关参数
(2)Hadoop YARN配置参数剖析(5)—Capacity Scheduler相关参数
默认情况下,NodeManager不会对CPU资源进行任何隔离,你可以通过启用Cgroups让你支持CPU隔离。
由于CPU资源的独特性,目前这种CPU分配方式仍然是粗粒度的。举个例子,很多任务可能是IO密集型的,消耗的CPU资源非常少,如果此时你为它 分配一个CPU,则是一种严重浪费,你完全可以让他与其他几个任务公用一个CPU,也就是说,我们需要支持更粒度的CPU表达方式。
借鉴亚马逊EC2中CPU资源的划分方式,即提出了CPU最小单位为EC2 Compute Unit(ECU),一个ECU代表相当于1.0-1.2 GHz 2007 Opteron or 2007 Xeon处理器的处理能力。YARN提出了CPU最小单位YARN Compute Unit(YCU),目前这个数是一个整数,默认是720,由参数yarn.nodemanager.resource.cpu-ycus-per- core设置,表示一个CPU core具备的计算能力(该feature在2.2.0版本中并不存在,可能增加到2.3.0版本中),这样,用户提交作业时,直接指定需要的YCU即 可,比如指定值为360,表示用1/2个CPU core,实际表现为,只使用一个CPU core的1/2计算时间。注意,在操作系统层,CPU资源是按照时间片分配的,你可以说,一个进程使用1/3的CPU时间片,或者1/5的时间片。
https://issues.apache.org/jira/browse/YARN-1089
https://issues.apache.org/jira/browse/YARN-1024
Hadoop 新特性、改进、优化和Bug分析系列5:YARN-3
【总结】
目前,YARN 内存资源调度借鉴了Hadoop 1.0中的方式,比较合理,但CPU资源的调度方式仍在不断改进中,目前只是一个初步的粗糙实现,相信在不久的将来,YARN 中CPU资源的调度将更加完善。

Spark与Hadoop计算模型的比较分析

唐半张 发表了文章 0 个评论 1855 次浏览 2015-10-08 10:37 来自相关话题

Spark与Hadoop计算模型的比较分析 Spark是一个通用的并行计算框架,由UCBerkeley的AMP实验室开发。 1.Spark的中间数据放到内存中,对于迭代运算效率比较高。 Spark更适合于迭代运算比较多的ML ...查看全部
Spark与Hadoop计算模型的比较分析
Spark是一个通用的并行计算框架,由UCBerkeley的AMP实验室开发。
1.Spark的中间数据放到内存中,对于迭代运算效率比较高。
Spark更适合于迭代运算比较多的ML和DM运算。因为在Spark里面,有RDD的概念。
2.Spark比Hadoop更通用。
      Spark提供的数据集操作类型有很多种,不像Hadoop只提供了Map和Reduce两种操作。比如map, filter, flatMap,sample, groupByKey, reduceByKey, union, join, cogroup, mapValues, sort,partionBy等多种操作类型,他们把这些操作称为Transformations。同时还提供Count, collect, reduce, looku p, save等多种actions。
     这些多种多样的数据集操作类型,给上层应用者提供了方便。各个处理节点之间的通信模型不再像Hadoop那样就是唯一的Data Shuffle一种模式。用户可以命名,物化,控制中间结果的分区等。可以说编程模型比Hadoop更灵活。
    不过论文中也提到,Spark不适用那种异步细粒度更新状态的应用,例如web服务的存储或者是增量的web爬虫和索引。就是对于那种增量修改的应用模型,当然不适合把大量数据拿到内存中了。增量改动完了,也就不用了,不需要迭代了。
3.容错性。
从Spark的论文《Resilient Distributed Datasets: AFault-Tolerant Abstraction for In-Memory Cluster Computing》中没看出容错性做的有多好。倒是提到了分布式数据集计算,做checkpoint的两种方式,一个是checkpoint data,一个是logging the updates。貌似Spark采用了后者。但是文中后来又提到,虽然后者看似节省存储空间。但是由于数据处理模型是类似DAG的操作过程,由于图中的某个节点出错,由于lineage chains的依赖复杂性,可能会引起全部计算节点的重新计算,这样成本也不低。他们后来说,是存数据,还是存更新日志,做checkpoint还是由用户说了算吧。相当于什么都没说,又把这个皮球踢给了用户。所以我看就是由用户根据业务类型,衡量是存储数据IO和磁盘空间的代价和重新计算的代价,选择代价较小的一种策略。
4.关于Spark和Hadoop的融合
不知道Apache基金会的人怎么想的,我看Spark还是应该融入到Hadoop生态系统中。从Hadoop 0.23把MapReduce做成了库,看出Hadoop的目标是要支持包括MapReduce在内的更多的并行计算模型,比如MPI,Spark等。毕竟现在Hadoop的单节点CPU利用率并不高,那么假如这种迭代密集型运算是和现有平台的互补。同时,这对资源调度系统就提出了更高的要求。有关资源调度方面,UC Berkeley貌似也在做一个Mesos的东西,还用了Linux container,统一调度Hadoop和其他应用模型。
 
 

Hadoop2.2.0中HDFS的高可用性实现原理

唐半张 发表了文章 0 个评论 1606 次浏览 2015-10-08 10:35 来自相关话题

在Hadoop2.0.0之前,NameNode(NN)在HDFS集群中存在单点故障(single point of failure),每一个集群中存在一个NameNode,如果NN所在的机器出现了故障,那么将导致整个集群无法利用,直到NN重启或者在另一台主机上 ...查看全部
Hadoop2.0.0之前,NameNode(NN)在HDFS集群中存在单点故障(single point of failure),每一个集群中存在一个NameNode,如果NN所在的机器出现了故障,那么将导致整个集群无法利用,直到NN重启或者在另一台主机上启动NN守护线程。
主要在两方面影响了HDFS的可用性:
(1)、在不可预测的情况下,如果NN所在的机器崩溃了,整个集群将无法利用,直到NN被重新启动;
(2)、在可预知的情况下,比如NN所在的机器硬件或者软件需要升级,将导致集群宕机。
HDFS的高可用性将通过在同一个集群中运行两个NN(active NN & standby NN)来解决上面两个问题,这种方案允许在机器破溃或者机器维护快速地启用一个新的NN来恢复故障。
在典型的HA集群中,通常有两台不同的机器充当NN。在任何时间,只有一台机器处于Active状态;另一台机器是处于Standby状态。Active NN负责集群中所有客户端的操作;而Standby NN主要用于备用,它主要维持足够的状态,如果必要,可以提供快速的故障恢复。
为了让Standby NN的状态和Active NN保持同步,即元数据保持一致,它们都将会和JournalNodes守护进程通信。当Active NN执行任何有关命名空间的修改,它需要持久化到一半以上的JournalNodes上(通过edits log持久化存储),而Standby NN负责观察edits log的变化,它能够读取从JNs中读取edits信息,并更新其内部的命名空间。一旦Active NN出现故障,Standby NN将会保证从JNs中读出了全部的Edits,然后切换成Active状态。Standby NN读取全部的edits可确保发生故障转移之前,是和Active NN拥有完全同步的命名空间状态。
为了提供快速的故障恢复,Standby NN也需要保存集群中各个文件块的存储位置。为了实现这个,集群中所有的Database将配置好Active NN和Standby NN的位置,并向它们发送块文件所在的位置及心跳,如下图所示:

Hadoop中HDFS高可用性实现
   在任何时候,集群中只有一个NN处于Active 状态是极其重要的。否则,在两个Active NN的状态下NameSpace状态将会出现分歧,这将会导致数据的丢失及其它不正确的结果。为了保证这种情况不会发生,在任何时间,JNs只允许一个NN充当writer。在故障恢复期间,将要变成Active 状态的NN将取得writer的角色,并阻止另外一个NN继续处于Active状态。
为了部署HA集群,你需要准备以下事项:
(1)、NameNode machines:运行Active NN和Standby NN的机器需要相同的硬件配置;
(2)、JournalNode machines:也就是运行JN的机器。JN守护进程相对来说比较轻量,所以这些守护进程可以可其他守护线程(比如NN,YARN ResourceManager)运行在同一台机器上。在一个集群中,最少要运行3个JN守护进程,这将使得系统有一定的容错能力。当然,你也可以运行3个以上的JN,但是为了增加系统的容错能力,你应该运行奇数个JN(3、5、7等),当运行N个JN,系统将最多容忍(N-1)/2个JN崩溃。
在HA集群中,Standby NN也执行namespace状态的checkpoints,所以不必要运行Secondary NN、CheckpointNode和BackupNode;事实上,运行这些守护进程是错误的。
 

Hadoop shell命令

唐半张 发表了文章 0 个评论 1809 次浏览 2015-10-08 10:23 来自相关话题

FS Shell 调用文件系统(FS)Shell命令应使用 bin/hadoop fs 的形式。 所有的的FS shell命令使用URI路径作为参数。URI格式是scheme://authority/path。对HDFS文件系统,scheme是hd ...查看全部
FS Shell 调用文件系统(FS)Shell命令应使用 bin/hadoop fs 的形式。 所有的的FS shell命令使用URI路径作为参数。URI格式是scheme://authority/path。对HDFS文件系统,scheme是hdfs,对本地文件系统,scheme是file。其中scheme和authority参数都是可选的,如果未加指定,就会使用配置中指定的默认scheme。一个HDFS文件或目录比如/parent/child可以表示成hdfs://namenode:namenodeport/parent/child,或者更简单的/parent/child(假设你配置文件中的默认值是namenode:namenodeport)。大多数FS Shell命令的行为和对应的Unix Shell命令类似,不同之处会在下面介绍各命令使用详情时指出。出错信息会输出到stderr,其他信息输出到stdout。 
cat 使用方法:hadoop fs -cat URI [URI …] 
将路径指定文件的内容输出到stdout
示例:
  • hadoop fs -cat hdfs://host1:port1/file1 hdfs://host2:port2/file2
  • hadoop fs -cat file:///file3 /user/hadoop/file4 
返回值:成功返回0,失败返回-1。 chgrp 使用方法:hadoop fs -chgrp [-R] GROUP URI [URI …] Change group association of files. With -R, make the change recursively through the directory structure. The user must be the owner of files, or else a super-user. Additional information is in the Permissions User Guide. --> 改变文件所属的组。使用-R将使改变在目录结构下递归进行。命令的使用者必须是文件的所有者或者超级用户。更多的信息请参见HDFS权限用户指南。 chmod 使用方法:hadoop fs -chmod [-R] URI [URI …] 改变文件的权限。使用-R将使改变在目录结构下递归进行。命令的使用者必须是文件的所有者或者超级用户。更多的信息请参见HDFS权限用户指南。 chown 使用方法:hadoop fs -chown [-R] [OWNER][:[GROUP]] URI [URI ] 改变文件的拥有者。使用-R将使改变在目录结构下递归进行。命令的使用者必须是超级用户。更多的信息请参见HDFS权限用户指南。 copyFromLocal使用方法:hadoop fs -copyFromLocal URI 除了限定源路径是一个本地文件外,和put命令相似。copyToLocal使用方法:hadoop fs -copyToLocal [-ignorecrc] [-crc] URI  除了限定目标路径是一个本地文件外,和get命令类似。cp 使用方法:hadoop fs -cp URI [URI …]  将文件从源路径复制到目标路径。这个命令允许有多个源路径,此时目标路径必须是一个目录。 示例:
  • hadoop fs -cp /user/hadoop/file1 /user/hadoop/file2
  • hadoop fs -cp /user/hadoop/file1 /user/hadoop/file2 /user/hadoop/dir 
返回值:成功返回0,失败返回-1。 du使用方法:hadoop fs -du URI [URI …] 显示目录中所有文件的大小,或者当只指定一个文件时,显示此文件的大小。示例:hadoop fs -du /user/hadoop/dir1 /user/hadoop/file1 hdfs://host:port/user/hadoop/dir1 返回值:成功返回0,失败返回-1。 dus 使用方法:hadoop fs -dus  显示文件的大小。 expunge 使用方法:hadoop fs -expunge 清空回收站。请参考HDFS设计文档以获取更多关于回收站特性的信息。 get 使用方法:hadoop fs -get [-ignorecrc] [-crc]  复制文件到本地文件系统。可用-ignorecrc选项复制CRC校验失败的文件。使用-crc选项复制文件以及CRC信息。 示例:
  • hadoop fs -get /user/hadoop/file localfile
  • hadoop fs -get hdfs://host:port/user/hadoop/file localfile
返回值:成功返回0,失败返回-1。 getmerge 使用方法:hadoop fs -getmerge [addnl] 接受一个源目录和一个目标文件作为输入,并且将源目录中所有的文件连接成本地目标文件。addnl是可选的,用于指定在每个文件结尾添加一个换行符。 ls 使用方法:hadoop fs -ls  如果是文件,则按照如下格式返回文件信息:文件名 <副本数> 文件大小 修改日期 修改时间 权限 用户ID 组ID 如果是目录,则返回它直接子文件的一个列表,就像在Unix中一样。目录返回列表的信息如下:目录名 修改日期 修改时间 权限 用户ID 组ID 示例:hadoop fs -ls /user/hadoop/file1 /user/hadoop/file2 hdfs://host:port/user/hadoop/dir1 /nonexistentfile 返回值:成功返回0,失败返回-1。 lsr使用方法:hadoop fs -lsr  ls命令的递归版本。类似于Unix中的ls -R。 mkdir 使用方法:hadoop fs -mkdir  接受路径指定的uri作为参数,创建这些目录。其行为类似于Unix的mkdir -p,它会创建路径中的各级父目录。示例:
  • hadoop fs -mkdir /user/hadoop/dir1 /user/hadoop/dir2
  • hadoop fs -mkdir hdfs://host1:port1/user/hadoop/dir hdfs://host2:port2/user/hadoop/dir 
返回值:成功返回0,失败返回-1。 movefromLocal 使用方法:dfs -moveFromLocal  输出一个”not implemented“信息。 mv 使用方法:hadoop fs -mv URI [URI …]  将文件从源路径移动到目标路径。这个命令允许有多个源路径,此时目标路径必须是一个目录。不允许在不同的文件系统间移动文件。 示例: 
  • hadoop fs -mv /user/hadoop/file1 /user/hadoop/file2
  • hadoop fs -mv hdfs://host:port/file1 hdfs://host:port/file2 hdfs://host:port/file3 hdfs://host:port/dir1
返回值:成功返回0,失败返回-1。 put 使用方法:hadoop fs -put ...  从本地文件系统中复制单个或多个源路径到目标文件系统。也支持从标准输入中读取输入写入目标文件系统。
  • hadoop fs -put localfile /user/hadoop/hadoopfile
  • hadoop fs -put localfile1 localfile2 /user/hadoop/hadoopdir
  • hadoop fs -put localfile hdfs://host:port/hadoop/hadoopfile
  • hadoop fs -put - hdfs://host:port/hadoop/hadoopfile 从标准输入中读取输入。
返回值:成功返回0,失败返回-1。 rm 使用方法:hadoop fs -rm URI [URI …] 删除指定的文件。只删除非空目录和文件。请参考rmr命令了解递归删除。示例: 
  • hadoop fs -rm hdfs://host:port/file /user/hadoop/emptydir 
返回值:成功返回0,失败返回-1。 rmr 使用方法:hadoop fs -rmr URI [URI …] delete的递归版本。示例: 
  • hadoop fs -rmr /user/hadoop/dir
  • hadoop fs -rmr hdfs://host:port/user/hadoop/dir 
返回值:成功返回0,失败返回-1。 setrep 使用方法:hadoop fs -setrep [-R]  改变一个文件的副本系数。-R选项用于递归改变目录下所有文件的副本系数。 示例:
  • hadoop fs -setrep -w 3 -R /user/hadoop/dir1 
返回值:成功返回0,失败返回-1。 stat 使用方法:hadoop fs -stat URI [URI …] 返回指定路径的统计信息。 示例:
  • hadoop fs -stat path 
返回值:成功返回0,失败返回-1。 tail 使用方法:hadoop fs -tail [-f] URI 将文件尾部1K字节的内容输出到stdout。支持-f选项,行为和Unix中一致。 示例:
  • hadoop fs -tail pathname 
返回值:成功返回0,失败返回-1。 test 使用方法:hadoop fs -test -[ezd] URI 选项:-e 检查文件是否存在。如果存在则返回0。-z 检查文件是否是0字节。如果是则返回0。 -d 如果路径是个目录,则返回1,否则返回0。示例:
  • hadoop fs -test -e filename 
text 使用方法:hadoop fs -text  将源文件输出为文本格式。允许的格式是zip和TextRecordInputStream。 touchz 使用方法:hadoop fs -touchz URI [URI …] 创建一个0字节的空文件。 示例:
  • hadoop -touchz pathname 

返回值:
成功返回0,失败返回-1。