Yarn

Yarn

yarn是怎么管理节点的?

回复

zxx99 发起了问题 1 人关注 0 个回复 1691 次浏览 2020-01-02 09:40 来自相关话题

3台机器,start-yarn.sh之后,各个服务都成功启动了,但是Master:8088里的active nodes的数量一直为1.

回复

fly321283 发起了问题 1 人关注 0 个回复 2274 次浏览 2019-11-01 16:47 来自相关话题

NodeManager无法连接到ResourceManager

fish 回复了问题 2 人关注 1 个回复 3124 次浏览 2017-06-09 11:13 来自相关话题

yarn和hdfs关系

fish 回复了问题 2 人关注 1 个回复 3124 次浏览 2017-05-02 18:17 来自相关话题

hadoop-2.7.3+zookeeper-3.4.8+hbase-1.2.2 HA 两个ResourceManager无法启动(急)

Dong 回复了问题 2 人关注 1 个回复 1820 次浏览 2017-04-26 18:54 来自相关话题

集群部署问题

fish 回复了问题 3 人关注 2 个回复 1250 次浏览 2017-04-07 10:26 来自相关话题

Hadoop yarn基于deadline的调度器如何获取用户输入的deadline

Dong 回复了问题 3 人关注 2 个回复 1573 次浏览 2017-03-21 07:54 来自相关话题

不用zookeeper 只使用HA+yarn主备resourceManager如何配置?

fish 回复了问题 3 人关注 2 个回复 3080 次浏览 2017-02-09 09:37 来自相关话题

yarn 基于标签调度的问题

wangxiaolei 回复了问题 2 人关注 3 个回复 2326 次浏览 2017-01-06 14:45 来自相关话题

Yarn容器调度器(Capacity Scheduler)

回复

亡命天涯 发起了问题 2 人关注 0 个回复 3434 次浏览 2016-12-13 14:13 来自相关话题

yarn中的ResourceManger Restart与NodeManager Restart

fish 回复了问题 2 人关注 1 个回复 2362 次浏览 2016-12-09 23:41 来自相关话题

Yarn Container 怎么划分CPU资源

fish 回复了问题 2 人关注 1 个回复 3955 次浏览 2016-10-14 23:50 来自相关话题

关于yarn模式的spark的安装

小谦 回复了问题 4 人关注 5 个回复 3178 次浏览 2016-09-19 15:51 来自相关话题

capacity-scheduler调度器的default队列,执行任务卡在ACCEPTED阶段

fish 回复了问题 3 人关注 11 个回复 3806 次浏览 2016-09-13 14:59 来自相关话题

如何动态增减yarn集群中的机器?

wangxiaolei 回复了问题 3 人关注 2 个回复 3118 次浏览 2016-07-26 18:51 来自相关话题

关于yarn中的applicationmaster疑问

wangxiaolei 回复了问题 2 人关注 1 个回复 2822 次浏览 2016-07-18 18:37 来自相关话题

Hadoop2.x中MRAppMaster中用启动用户任务的父进程实现

fish 回复了问题 3 人关注 2 个回复 2325 次浏览 2016-07-06 17:09 来自相关话题

zeppelin on yarn,在启动后,任务在什么时候会被杀死

fish 回复了问题 2 人关注 1 个回复 3270 次浏览 2016-05-31 22:05 来自相关话题

yarn是怎么管理节点的?

回复

zxx99 发起了问题 1 人关注 0 个回复 1691 次浏览 2020-01-02 09:40 来自相关话题

3台机器,start-yarn.sh之后,各个服务都成功启动了,但是Master:8088里的active nodes的数量一直为1.

回复

fly321283 发起了问题 1 人关注 0 个回复 2274 次浏览 2019-11-01 16:47 来自相关话题

NodeManager无法连接到ResourceManager

回复

fish 回复了问题 2 人关注 1 个回复 3124 次浏览 2017-06-09 11:13 来自相关话题

yarn和hdfs关系

回复

fish 回复了问题 2 人关注 1 个回复 3124 次浏览 2017-05-02 18:17 来自相关话题

hadoop-2.7.3+zookeeper-3.4.8+hbase-1.2.2 HA 两个ResourceManager无法启动(急)

回复

Dong 回复了问题 2 人关注 1 个回复 1820 次浏览 2017-04-26 18:54 来自相关话题

集群部署问题

回复

fish 回复了问题 3 人关注 2 个回复 1250 次浏览 2017-04-07 10:26 来自相关话题

Hadoop yarn基于deadline的调度器如何获取用户输入的deadline

回复

Dong 回复了问题 3 人关注 2 个回复 1573 次浏览 2017-03-21 07:54 来自相关话题

不用zookeeper 只使用HA+yarn主备resourceManager如何配置?

回复

fish 回复了问题 3 人关注 2 个回复 3080 次浏览 2017-02-09 09:37 来自相关话题

yarn 基于标签调度的问题

回复

wangxiaolei 回复了问题 2 人关注 3 个回复 2326 次浏览 2017-01-06 14:45 来自相关话题

Yarn容器调度器(Capacity Scheduler)

回复

亡命天涯 发起了问题 2 人关注 0 个回复 3434 次浏览 2016-12-13 14:13 来自相关话题

yarn中的ResourceManger Restart与NodeManager Restart

回复

fish 回复了问题 2 人关注 1 个回复 2362 次浏览 2016-12-09 23:41 来自相关话题

Yarn Container 怎么划分CPU资源

回复

fish 回复了问题 2 人关注 1 个回复 3955 次浏览 2016-10-14 23:50 来自相关话题

关于yarn模式的spark的安装

回复

小谦 回复了问题 4 人关注 5 个回复 3178 次浏览 2016-09-19 15:51 来自相关话题

capacity-scheduler调度器的default队列,执行任务卡在ACCEPTED阶段

回复

fish 回复了问题 3 人关注 11 个回复 3806 次浏览 2016-09-13 14:59 来自相关话题

如何动态增减yarn集群中的机器?

回复

wangxiaolei 回复了问题 3 人关注 2 个回复 3118 次浏览 2016-07-26 18:51 来自相关话题

关于yarn中的applicationmaster疑问

回复

wangxiaolei 回复了问题 2 人关注 1 个回复 2822 次浏览 2016-07-18 18:37 来自相关话题

Hadoop2.x中MRAppMaster中用启动用户任务的父进程实现

回复

fish 回复了问题 3 人关注 2 个回复 2325 次浏览 2016-07-06 17:09 来自相关话题

zeppelin on yarn,在启动后,任务在什么时候会被杀死

回复

fish 回复了问题 2 人关注 1 个回复 3270 次浏览 2016-05-31 22:05 来自相关话题

资源管理框架(mesos/YARN/coraca/Torca/Omega)选型分析

唐半张 发表了文章 0 个评论 2592 次浏览 2015-10-09 10:26 来自相关话题

1 资源调度的目标和价值 1.1 子系统高效调度 任务之间资源隔离,减少争抢。 任务分配调度时结合资源分配,各个任务分配合理的资源,充分利用系统资源,减少资源利用不充分的问题。 资源调度结合优先级,优先级高的分配更多的资源。 ...查看全部
1 资源调度的目标和价值
1.1 子系统高效调度
任务之间资源隔离,减少争抢。 任务分配调度时结合资源分配,各个任务分配合理的资源,充分利用系统资源,减少资源利用不充分的问题。 资源调度结合优先级,优先级高的分配更多的资源。
1.2 提高全系统的资源利用率
各个子系统,存在不同时期,对资源需求不一样的情况,平滑系统资源的利用。
1.3 支持动态调整切分资源,增强系统扩展性。
系统对资源的规划很难一次性准确,通过mesos支持虚拟主机的方式,动态扩展。
2 资源调度使用限制以及难点
2.1 资源调度使用限制
资源调度是为了提高资源利用率,分配本身是存在一定的开销的,对实时性要求非常高的应用不适合(毫秒,秒级别的应用)。
2.2 应用(框架)比较难规划资源
资源框架通过算法分配资源,但是每个细粒度的具体的任务对资源的需求非常难预估。规划如果偏差比较大,反而会降低系统本身的性能。
2.3 mem使用分配难题 JVM虚拟机存在内存回收的问题,这个不是程序本身是不能干涉的。内存很难分配准确,如果内存分配过少会导致任务失败。分配过多,造成资源浪费。
3 业界资源调度框架
3.1 Mesos
3.1.1 背景
Mesos诞生于UC Berkeley的一个研究项目,现已成为Apache Incubator中的项目,当前有一些公司使用Mesos管理集群资源,比如Twitter。
3.1.2 架构

总体上看,Mesos是一个master/slave结构,其中,master是非常轻量级的,仅保存了framework(各种计算框架称为framework)和mesos slave的一些状态,而这些状态很容易通过framework和slave重新注册而重构,因而很容易使用了zookeeper解决mesos master的单点故障问题。
Mesos master实际上是一个全局资源调度器,采用某种策略将某个slave上的空闲资源分配给某一个framework,各种framework通过自己的调度器向Mesos master注册,以接入到Mesos中;而Mesos slave主要功能是汇报任务的状态和启动各个framework的executor(比如Hadoop的excutor就是TaskTracker)。
整个mesos系统采用了双层调度框架:第一层,由mesos将资源分配给框架;第二层,框架自己的调度器将资源分配给自己内部的任务。
在Mesos中,各种计算框架是完全融入Mesos中的,也就是说,如果你想在Mesos中添加一个新的计算框架,首先需要在Mesos中部署一套该框架; Mesos采用linux container对内存和cpu进行隔离。
3.1.3 优点
可以同时支持短类型任务以及长类型服务,比如webservice以及SQL service。 资源分配粒度粗,比较适合我们产品多种计算框架并存的现状。
3.1.4 缺点
Mesos中的DRF调度算法过分的追求公平,没有考虑到实际的应用需求。在实际生产线上,往往需要类似于Hadoop中Capacity Scheduler的调度机制,将所有资源分成若干个queue,每个queue分配一定量的资源,每个user有一定的资源使用上限;更使用的调度策略是应该支持每个queue可单独定制自己的调度器策略,如:FIFO,Priority等。
由于Mesos采用了双层调度机制,在实际调度时,将面临设计决策问题:第一层和第二层调度器分别实现哪几个调度机制,即:将大部分调度机制放到第一层调度器,还是第一层调度器仅支持简单的资源分配(分配比例由管理员指定)?
Mesos采用了Resource Offer机制(不同于Hadoop中的基于slot的调度机制),这种调度机制面临着资源碎片问题,即:每个节点上的资源不可能全部被分配完,剩下的一点可能不足以让任何任务运行,这样,便产生了类似于操作系统中的内存碎片问题。
3.2 YARN(Coroca)
3.2.1 背景
从hadoop 1.0发展而来,解决了hadoop1.0的单管理节点两个主要问题:
1、 单管理节点性能瓶颈。一个管理节点能管理的服务器不能无上限。
2、 Hadoop 1.0按照slot来划分资源,map slot的资源不能共享给reduce slot。造成资源浪费 很多公司都切换到hadoop 2.0,如淘宝天梯已经淘汰1.0,上线2.0。
3.2.2 架构

MRv2最基本的设计思想是将JobTracker的两个主要功能,即资源管理和作业调度/监控分成两个独立的进程。在该解决方案中包含两个组件:全局的ResourceManager(RM)和与每个应用相关的ApplicationMaster(AM)。这里的“应用”指一个单独的MapReduce作业或者DAG作业。RM和与NodeManager(NM,每个节点一个)共同组成整个数据计算框架。RM是系统中将资源分配给各个应用的最终决策者。AM实际上是一个具体的框架库,它的任务是【与RM协商获取应用所需资源】和【与NM合作,以完成执行和监控task的任务】。
调度器根据容量,队列等限制条件(如每个队列分配一定的资源,最多执行一定数量的作业等),将系统中的资源分配给各个正在运行的应用。这里的调度器是一个“纯调度器”,因为它不再负责监控或者跟踪应用的执行状态等,此外,他也不负责重新启动因应用执行失败或者硬件故障而产生的失败任务。调度器仅根据各个应用的资源需求进行调度,这是通过抽象概念“资源容器”完成的,资源容器(Resource Container)将内存,CPU,磁盘,网络等资源封装在一起,从而限定每个任务使用的资源量。
调度器是可插拔的组件,主要负责将集群中得资源分配给多个队列和应用。YARN自带了多个资源调度器,如Capacity Scheduler和Fair Scheduler等。
ASM主要负责接收作业,协商获取第一个容器用于执行AM和提供重启失败AM container的服务。
NM是每个节点上的框架代理,主要负责启动应用所需的容器,监控资源(内存,CPU,磁盘,网络等)的使用情况并将之汇报给调度器。
AM主要负责同调度器协商以获取合适的容器,并跟踪这些容器的状态和监控其进度。
3.2.3 优点
YARN作为hadoop 2.0,hadoop各个组件都快速的接入YARN框架,未来发展很快,默认支持调度算法更丰富。
3.2.4 缺点
ResourceManager负责所有应用的任务调度,各个应用作为YARN的一个client library。传统数据库应用,接入之后效率不高,比较困难。
3.3 Torca
3.3.1 背景
Torca作为Typhoon云平台的关键系统也就应运而生,已经在网页搜索、广告等广泛应用。
3.3.2 架构

分central manager和execute server。central manager是集群的任务调度中心,包含以下模块
Master Daemon:负责启动/重启其他进程。
Scheduler:管理多个优先级的任务队列,根据任务描述文件生成任务ClassAd(分类广告),通过collector匹配任务与资源,下发任务至Execute Server。
Collector:集群的数据中心,负责从Execute Server收集机器及任务状态。机器的动态信息由Execute Server上报,一些静态信息及机器无法上报的信息如机位从CMDB拉取;同时,也是集群的匹配中心,对任务与资源的ClassAd进行匹配(MatchMaking)。

用户通过submitter和jdf提交job,如果作业依赖的文件在提交机本地,则submitter自动将其copy到xfs,并且用digest做标记,同时对job的各个属性进行解析,以及进行有效性检查,如果没有问题后,将生成最终的作业描述,发给CM上的scheduler。scheduler执行一定的调度策略,当决定这个job可以调度时,就将其发给collector,则collector会返回给scheduler满足条件的机器列表,scheduler就可以向这些机器发出启动task的流程了。Execute server根据job描述,准备相应的作业执行环境,分配资源,创建container等,就会启动task,并且在zk上记入该task相应的name信息。
3.3.3 优点
资源分配的有一个匹配的map的过程,而不是如mesos一样直接把所有资源先分给一个框架。分配算法更合理,可以参考。
3.3.4 缺点
模仿hadoop搞了一套私有的xfs,mapreduce,完全没有生态圈。
3.4 Omega
3.4.1 背景

Google的论文《Omega flexible, scalable schedulers for large compute clusters》中把调度分为3代,第一代是独立的集群,第二代是两层调度(mesos,YARN)第三带是共享状态调度。
3.4.2 架构
为了克服双层调度器的以上两个缺点,Google开发了下一代资源管理系统Omega,Omega是一种基于共享状态的调度器,该调度器将双层调度器中的集中式资源调度模块简化成了一些持久化的共享数据(状态)和针对这些数据的验证代码,而这里的“共享数据”实际上就是整个集群的实时资源使用信息。一旦引入共享数据后,共享数据的并发访问方式就成为该系统设计的核心,而Omega则采用了传统数据库中基于多版本的并发访问控制方式(也称为“乐观锁”, MVCC, Multi-Version Concurrency Control),这大大提升了Omega的并发性。 由于Omega不再有集中式的调度模块,因此,不能像Mesos或者YARN那样,在一个统一模块中完成以下功能:对整个集群中的所有资源分组,限制每类应用程序的资源使用量,限制每个用户的资源使用量等,这些全部由各个应用程序调度器自我管理和控制,根据论文所述,Omega只是将优先级这一限制放到了共享数据的验证代码中,即当同时由多个应用程序申请同一份资源时,优先级最高的那个应用程序将获得该资源,其他资源限制全部下放到各个子调度器。 引入多版本并发控制后,限制该机制性能的一个因素是资源访问冲突的次数,冲突次数越多,系统性能下降的越快,而google通过实际负载测试证明,这种方式的冲突次数是完全可以接受的。 Omega论文中谈到,Omega是从Google现有系统上演化而来的。既然这篇论文只介绍了Omega的调度器架构,我们可推测它的整体架构类似于Mesos,这样,如果你了解Mesos,那么可知道,我们可以通过仅修改Mesos的Master将之改造成一个Omega。
3.4.3 优点
共享资源状态,支持更大的集群和更高的并发。
3.4.4 缺点
只有论文,无具体实现,在小集群下,没有优势。
5 选型建议
1)如果整个系统是hadoop应用和传统数据库并存的系统,可以考虑选用mesos,mesos是二层资源管理框架,更多的资源分配的权利提供了framework。
2)如果整个系统是纯粹的hadoop应用,考虑到YARN框架的发展,以及对框架开发对各个hadoop应用的支持,建议考虑YARN。
不管mesos和YARN本身,框架设计都考虑了的扩展性,但是原生的框架可能并非适用完全适用实际场景的应用,所以基于原有框架扩展分配策略是非常重要的,大家可以一起探讨下框架本身的限制以及修改扩展思路?

yarn(hadoop2)框架的一些软件设计模式

唐半张 发表了文章 0 个评论 1613 次浏览 2015-10-09 09:30 来自相关话题

yarn(hadoop2)框架的一些软件设计模式 一、概述 我们都知道,yarn版本的hadoop无论是从架构上面还是软件设计的层面上面都比原始的hadoop版本有较大的改进。在架构方面,我们认为yarn模式是新一代的框架,这个在官方等 ...查看全部
yarn(hadoop2)框架的一些软件设计模式
一、概述
我们都知道,yarn版本的hadoop无论是从架构上面还是软件设计的层面上面都比原始的hadoop版本有较大的改进。在架构方面,我们认为yarn模式是新一代的框架,这个在官方等丛多的资料中说明得很详细了。在软件设计方面,我认为主要有以下的一些大的方面的改进:服务生命周期管理模式、事件驱动模式、状态驱动模式。这几个模式都写在hadoop-yarn-common中,接下来,我将详细说明这些模式。
二、服务生命周期管理模式一个对象肯定有生与死,那在我们设计中如何表示这一点呢?在业务系统中,我们一般是用spring,spring就负责管理对象的生命。在hadoop,我们没有必要引进spring这么厚重的容器。我们可以自行设计一套代码来管理我们服务的生命周期。那需要满足那些条件呢?
  • 一个服务的生命大概有4个状态:NOTINITED、INITED、STARTED、STOPPED。对应一些基本的操作,如:init start stop等。
  • 服务的状态变化会触发一些变化。可以用观察者模式。
  • 有组合服务的概念,因为我需要一个循环同时启动多个服。可以使用Composite模式。

那yarn的设计方面如下:

从中我们看出service这个设计正好满足我们的三个基本的要求。从图中,我看得很清楚,这个是一个典型的设计方案。一个接口,下面有一个抽象类,再有一个组合类。AbstractService其实实现了register()、unrgister()及状态变化后,调用Listener基本的功能。CompositeService实现了组合服务的需求,如:ResourceManager可以组合几个服务。在yarn中,Listener并没有实现异步。个人感觉主要有两个理由:第一,如图中,NodeManager既是Service又是Listener,如果异步有死锁的风险。第二,因为都是服务,其启动,停止调用次数都相对非常少,状态也不会经常发生变化,没有必要引入异步。
这一套机制其实在很多的框架中都有涉及,如jetty中的LifeCycle,其实和这个差不多的。
三、事件驱动模式
事件驱动模式最核心的部分就是一个异步dispatcher,以此来达到解耦的目的。我们看下yarn中怎么实现的,如下图:

 
这个也是一个典型的设计方案,我在以前的系统中经常这么设计事件的。其实这个也是监听者模式。在消息中间件中,我们往往引入中间的存储层——存储转发。其实这个在路由器中也是这样的。用到最后,其实都差不多,关键在于你能否看破。
需要注意的是,AsyncDispatcher也是一个service,这样ResourceManager等组合服务可以add AsyncDispatcher获得AsyncDispatcher事件转发的功能。
四、状态驱动模式
在设计模式中,有一个状态模式,其实我这里讲的理论就是有穷状态机。状态模式我们可以认为是摩尔型有限状态机 ,我们这里讲的主要是米利型有限状态机, yarn中实现的还是比较复杂的,可以看出他就是非确定型的自动机。在框架中还是比较少看见状态机的,这个可以仔细研究下,我们可以先看下RMNode状态机的状态图(这个图是根据RMNode状态机自动生成的)。

我们看到 任意两个状态之间的变化可以是任意的事件,并且可以是多个事件;同一个事件可以使一个状态迁移到多个不同的状态。我们可以认为这里的状态机是非确定性米利型有限状态机。这些知识在大学的编译原理上面讲过,我也是翻书的。我们看下yarn中的实现,如下图所示:

我认为其中最重要就是构建这个Map>> stateMachineTable对象,这里面存了状态机的元信息。后续调用完全是根据这个Map来运行的。重点讲下这个map的组成,从from到to端,第一个STATE是from端的状态。从一个状态转移可以有多个事件触发,其中的每一个事件可以有一个Transition,每个Transition就是有一个OPERAND操作。
一个Transition可以转移到多个to端的状态。可以从类图中看到Transition的两个实现类SingleInternalArc、MultipleInternalArc。MultipleInternalArc还带了一个默认的状态。
这个数据构建的时候用了builder模式,实现了FluentInterface: http://www.martinfowler.com/bliki/FluentInterface.html。客户端是直接使用StateMachine的接口调用的,当然这个StateMachine也是由StateMachineFactory构建的(make)。我们看下状态的执行流程:

 
在yarn中,我们看下事件驱动模式与状态驱动模式是怎么结合的,从中可以看出,状态机其实和事件是密不可分的。状态机的Transition也会产生一些Event再输出到AsynDispatcher中。

基于大数据分析系统Hadoop的13个开源工具

唐半张 发表了文章 0 个评论 2267 次浏览 2015-10-09 09:24 来自相关话题

Hadoop是由Apache基金会开发的一个大数据分布式系统基础架构,最早版本是2003年原Yahoo!DougCutting根据Google发布的学术论文研究而来。用户可以在不了解分布式底层细节的情况下,轻松地在Hadoop上开发和运行处理海量数据的应用程序 ...查看全部
Hadoop是由Apache基金会开发的一个大数据分布式系统基础架构,最早版本是2003年原Yahoo!DougCutting根据Google发布的学术论文研究而来。用户可以在不了解分布式底层细节的情况下,轻松地在Hadoop上开发和运行处理海量数据的应用程序。低成本、高可 靠、高扩展、高有效、高容错等特性让Hadoop成为最流行的大数据分析系统,然而其赖以生存的HDFS和MapReduce组件却让其一度陷入困境——批处理的工作方式让其只适用于离线数据处理,在要求实时性的场景下毫无用武之地。因此,各种基于Hadoop的工具应运而生,本次为大家分享Hadoop生态系统中最常用的13个开源工具,其中包括资源调度、流计算及各种业务针对应用场景。首先,我们看资源管理相关。
资源统一管理/调度系统

在公司和机构中,服务器往往会因为业务逻辑被拆分为多个集群,基于数据密集型的处理框架也是不断涌现,比如支持离线处理的MapReduce、支持在线处理的Storm及Impala、支持迭代计算的Spark及流处理框架S4,它们诞生于不同的实验室,并各有所长。为了减 少管理成本,提升资源的利用率,一个共同的想法产生——让这些框架运行在同一个集群上;因此,就有了当下众多的资源统一管理/调度系统,比如Google的Borg、Apache的YARN、Twitter的Mesos(已贡献给Apache基金会)、腾讯搜搜的Torca、FacebookCorona(开源),本次为大家重点介绍ApacheMesos及YARN:

1.ApacheMesos
    代码托管地址:ApacheSVN
    Mesos提供了高效、跨分布式应用程序和框架的资源隔离和共享,支持Hadoop、MPI、Hypertable、Spark等。
    Mesos是Apache孵化器中的一个开源项目,使用ZooKeeper实现容错复制,使用LinuxContainers来隔离任务, 支持多种资源计划分配(内存和CPU)。提供Java、Python和C++APIs来开发新的并行应用程序,提供基于Web的用户界面来提查看集群状 态。

2.HadoopYARN
    代码托管地址:ApacheSVN
    YARN又被称为MapReduce2.0,借鉴Mesos,YARN提出了资源隔离解决方案Container,但是目前尚未成熟,仅仅提供Java虚拟机内存的隔离。
    对比MapReduce1.x,YARN架构在客户端上并未做太大的改变,在调用API及接口上还保持大部分的兼容,然而在YARN中,开 发人员使用ResourceManager、ApplicationMaster与NodeManager代替了原框架中核心的JobTracker和TaskTracker。其中ResourceManager是一个中心的服务,负责调度、启动每一个Job所属的ApplicationMaster, 另外还监控ApplicationMaster的存在情况;NodeManager负责Container状态的维护,并向RM保持心跳。ApplicationMaster负责一个Job生命周期内的所有工作,类似老的框架中JobTracker。

Hadoop上的实时解决方案

前面我们有说过,在互联网公司中基于业务逻辑需求,企业往往会采用多种计算框架,比如从事搜索业务的公司:网页索引建立用MapReduce,自然语言处理用Spark等。本节为大家分享的则是Storm、Impala、Spark三个框架:

3.ClouderaImpala
    代码托管地址:GitHub
    Impala是由Cloudera开发,一个开源的MassivelyParallelProcessing(MPP)查询引擎。与Hive相同的元数据、SQL语法、ODBC驱动程序和用户接口(HueBeeswax),可以直接在HDFS或HBase上提供快速、交互式SQL查 询。Impala是在Dremel的启发下开发的,第一个版本发布于2012年末。
    Impala不再使用缓慢的Hive+MapReduce批处理,而是通过与商用并行关系数据库中类似的分布式查询引擎(由QueryPlanner、QueryCoordinator和QueryExecEngine三部分组成),可以直接从HDFS或者HBase中用SELECT、JOIN和统计函数查询数据,从而大大降低了延迟。

4.Spark
    代码托管地址:Apache
    Spark是个开源的数据分析集群计算框架,最初由加州大学伯克利分校AMPLab开发,建立于HDFS之上。Spark与Hadoop一样,用于构建大规模、低延时的数据分析应用。Spark采用Scala语言实现,使用Scala作为应用框架。
    Spark采用基于内存的分布式数据集,优化了迭代式的工作负载以及交互式查询。与Hadoop不同的是,Spark和Scala紧密集 成,Scala像管理本地collective对象那样管理分布式数据集。Spark支持分布式数据集上的迭代式任务,实际上可以在Hadoop文件系统 上与Hadoop一起运行(通过YARN、Mesos等实现)。

5.Storm
    代码托管地址:GitHub
    Storm是一个分布式的、容错的实时计算系统,由BackType开发,后被Twitter捕获。Storm属于流处理平台,多用于实时 计算并更新数据库。Storm也可被用于“连续计算”(continuouscomputation),对数据流做连续查询,在计算时就将结果以流的形式 输出给用户。它还可被用于“分布式RPC”,以并行的方式运行昂贵的运算。

Hadoop上的其它解决方案

就像前文说,基于业务对实时的需求,各个实验室发明了Storm、Impala、Spark、Samza等流实时处理工具。而本节我们将分 享的是实验室基于性能、兼容性、数据类型研究的开源解决方案,其中包括Shark、Phoenix、ApacheAccumulo、ApacheDrill、ApacheGiraph、ApacheHama、ApacheTez、ApacheAmbari。

6.Shark
    代码托管地址:GitHub
    Shark,代表了“HiveonSpark”,一个专为Spark打造的大规模数据仓库系统,兼容ApacheHive。无需修改现有的数据或者查询,就可以用100倍的速度执行HiveQL。
    Shark支持Hive查询语言、元存储、序列化格式及自定义函数,与现有Hive部署无缝集成,是一个更快、更强大的替代方案。

7.Phoenix
    代码托管地址:GitHub
    Phoenix是构建在ApacheHBase之上的一个SQL中间层,完全使用Java编写,提供了一个客户端可嵌入的JDBC驱动。Phoenix查询引擎会将SQL查询转换为一个或多个HBasescan,并编排执行以生成标准的JDBC结果集。直接使用HBaseAPI、协同处理 器与自定义过滤器,对于简单查询来说,其性能量级是毫秒,对于百万级别的行数来说,其性能量级是秒。Phoenix完全托管在GitHub之上。
    Phoenix值得关注的特性包括:1,嵌入式的JDBC驱动,实现了大部分的java.sql接口,包括元数据API;2,可以通过多个 行键或是键/值单元对列进行建模;3,DDL支持;4,版本化的模式仓库;5,DML支持;5,通过客户端的批处理实现的有限的事务支持;6,紧跟ANSISQL标准。

8.ApacheAccumulo
    代码托管地址:ApacheSVN
    ApacheAccumulo是一个可靠的、可伸缩的、高性能、排序分布式的键值存储解决方案,基于单元访问控制以及可定制的服务器端处 理。使用GoogleBigTable设计思路,基于ApacheHadoop、Zookeeper和Thrift构建。Accumulo最早由NSA开 发,后被捐献给了Apache基金会。
    对比GoogleBigTable,Accumulo主要提升在基于单元的访问及服务器端的编程机制,后一处修改让Accumulo可以在数据处理过程中任意点修改键值对。

9.ApacheDrill
    代码托管地址:GitHub
    本质上,ApacheDrill是GoogleDremel的开源实现,本质是一个分布式的mpp查询层,支持SQL及一些用于NoSQL和Hadoop数据存储系统上的语言,将有助于Hadoop用户实现更快查询海量数据集的目的。当下Drill还只能算上一个框架,只包含了Drill愿 景中的初始功能。
    Drill的目的在于支持更广泛的数据源、数据格式及查询语言,可以通过对PB字节数据的快速扫描(大约几秒内)完成相关分析,将是一个专为互动分析大型数据集的分布式系统。

10.ApacheGiraph
    代码托管地址:GitHub
    ApacheGiraph是一个可伸缩的分布式迭代图处理系统,灵感来自BSP(bulksynchronousparallel)和Google的Pregel,与它们区别于则是是开源、基于Hadoop的架构等。
    Giraph处理平台适用于运行大规模的逻辑计算,比如页面排行、共享链接、基于个性化排行等。Giraph专注于社交图计算,被Facebook作为其OpenGraph工具的核心,几分钟内处理数万亿次用户及其行为之间的连接。

11.ApacheHama
    代码托管地址:GitHub
    ApacheHama是一个建立在Hadoop上基于BSP(BulkSynchronousParallel)的计算框架,模仿了Google的Pregel。用来处理大规模的科学计算,特别是矩阵和图计算。集群环境中的系统架构由BSPMaster/GroomServer(ComputationEngine)、Zookeeper(DistributedLocking)、HDFS/HBase(StorageSystems)这3大块组成。

12.ApacheTez
    代码托管地址:GitHub
    ApacheTez是基于HadoopYarn之上的DAG(有向无环图,DirectedAcyclicGraph)计算框架。它把Map/Reduce过程拆分成若干个子过程,同时可以把多个Map/Reduce任务组合成一个较大的DAG任务,减少了Map/Reduce之间的文 件存储。同时合理组合其子过程,减少任务的运行时间。由Hortonworks开发并提供主要支持。

13.ApacheAmbari
    代码托管地址:ApacheSVN
    ApacheAmbari是一个供应、管理和监视ApacheHadoop集群的开源框架,它提供一个直观的操作工具和一个健壮的HadoopAPI,可以隐藏复杂的Hadoop操作,使集群操作大大简化,首个版本发布于2012年6月。
    ApacheAmbari现在是一个Apache的顶级项目,早在2011年8月,Hortonworks引进Ambari作为ApacheIncubator项目,制定了Hadoop集群极致简单管理的愿景。在两年多的开发社区显著成长,从一个小团队,成长为Hortonworks各种组织的贡献者。Ambari用户群一直在稳步增长,许多机构依靠Ambari在其大型数据中心大规模部署和管理Hadoop集 群。
    目前ApacheAmbari支持的Hadoop组件包括:HDFS、MapReduce、Hive、HCatalog、HBase、ZooKeeper、Oozie、Pig及Sqoop

yarn distributedshell client 代码分析

唐半张 发表了文章 0 个评论 1825 次浏览 2015-10-09 09:23 来自相关话题

在网上找了distributedshell client 代码分析,分享给大家 程序流程: 1. 通过ClientRMProtocol协议向ApplicationsManager(ASM)取得ApplicationId 2. ...查看全部
在网上找了distributedshell client 代码分析,分享给大家
程序流程:
1. 通过ClientRMProtocol协议向ApplicationsManager(ASM)取得ApplicationId
2. 初始化ApplicationSubmissionContext,包括application id,application name,application priority queue,另外还有一个ContainerLaunchContext,用于描述如何启动ApplicationMaster
3. ContainerLaunchContext包含了启动ApplicationMaster的所有资源,包括二进制文件(jar),配置文件,执行命令,环境变量等等;凡是文件类型的资源,都会先上传到hdfs,ContainerLaunchContext里面仅仅指定了这个文件资源的hdfs URL
4. 当job提交到ResourceManager后,Client通过ApplicationReport来向ResourceManager取得当前job的进展;

启动Client的命令:
./bin/hadoop jar share/hadoop/yarn/hadoop-yarn-applications-distributedshell-2.0.5-alpha.jar org.apache.hadoop.yarn.applications.distributedshell.Client -jar share/hadoop/yarn/hadoop-yarn-applications-distributedshell-2.0.5-alpha.jar -shell_command ls -shell_script ./ignore.sh -num_containers 10 -container_memory 350 -master_memory 350 -priority 10
其中-jar用来表明运行ApplicationMaster的程序
-shell_command,-shell_script用来在每一个Container上执行的真正程序,这里为ls ./ignore.sh
-num_container用来启动多少个container来执行shell命令
-container_memoery来表明每个container需要的内存资源
-master_memoery用来表明ApplicationMaster需要的内存资源
-priority用来表明优先级
需要理解的是,ApplicationMaster是由ResourceManager里面的ASM模块来调度到某一个Container并启动它的;即ApplicationMaster是Yarn框架负责调度的;但每一个Container的调度,则需要ApplicationMaster自己负责了;ApplicationMaster里面的具体细节,留到另一篇博客里面介绍

函数分析:
1. public boolean init(String[] args) throws ParseException
该函数的主要作用是分析命令行,根据分析的结果初始化Client类的成员变量;shell_args和shell_env的使用方法重点分析

2. run里面调用了start函数,该函数的主要功能是用来建立ClientRMProtocol的RPC实体rmClient;其中的rmAddress是在YarnConfiguration里面定好的

3. 获取Application Id,
GetNewApplicationRequest request = Records.newRecord(GetNewApplicationRequest.class);
rmClient.getNewApplication(request);
ApplicationId appId = newApp.getApplicationId();

4. 构建AppMaster的运行环境
ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);
appContext.setApplicationId(appId);
ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);

// 这里的FileSystem指hdfs,conf里面包含了hdfs信息    
FileSystem fs = FileSystem.get(conf);
// 本地的jar文件路径
Path src = new Path(appMasterJar);
// 本地jar文件上传到hdfs上的路径
String pathSuffix = appName + "/" + appId.getId() + "/AppMaster.jar";    
Path dst = new Path(fs.getHomeDirectory(), pathSuffix);
// 将本地jar文件上传到hdfs
fs.copyFromLocalFile(false, true, src, dst);
    
// 设置这个jar的resource内容
LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
// 资源类型
amJarRsrc.setType(LocalResourceType.FILE);
// 资源权限
amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);    
// 资源路径
amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(dst));
// 大小和时间戳用来验证资源的正确性    
amJarRsrc.setTimestamp(destStatus.getModificationTime());
amJarRsrc.setSize(destStatus.getLen());

Map localResources = new HashMap();
localResources.put("AppMaster.jar",  amJarRsrc);
// 这个资源作为master执行资源,主要是提交给AMS的;其它需要上传得file,应该会跟jar文件一起解在同一目录
amContainer.setLocalResources(localResources);

// 因为script脚本是master在执行的时候需要的资源,因此这些资源只需要上传到hdfs即可,而不用加入到localResources里面;localResouces里面只要干干净净提交给AMS的资源即可
// 脚本上传到hdfs里面后,采用环境变量来记录脚本的位置;AMS和Container联合在启动application master之前会先布置好环境变量再执行它
Map env = new HashMap();
env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION, hdfsShellScriptLocation);
env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP, Long.toString(hdfsShellScriptTimestamp));
env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN, Long.toString(hdfsShellScriptLen));

// 需要确定application master的jar包执行需要的classpath,并将其加入到env
env.put("CLASSPATH", classPathEnv.toString());

// 设置环境变量,到目前为止,启动application master的执行文件,环境变量都已经ready,剩下是确定执行命令
amContainer.setEnvironment(env);
Vector vargs = new Vector(30);
vargs.add("${JAVA_HOME}" + "/bin/java");
// Set Xmx based on am memory size
vargs.add("-Xmx" + amMemory + "m");
//设置需要执行的application master的class
vargs.add(appMasterMainClass);
// 程序执行的其它参数,这些参数都会作为application master的args
vargs.add("--container_memory " + String.valueOf(containerMemory));
vargs.add("--num_containers " + String.valueOf(numContainers));
vargs.add("--priority " + String.valueOf(shellCmdPriority));
vargs.add("--shell_command " + shellCommand + "");
vargs.add("--shell_args " + shellArgs + "");
for (Map.Entry entry : shellEnv.entrySet()) {
  vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue());
}
vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");

amContainer.setCommands(commands);
// 确定application master本身需要的执行资源
Resource capability = Records.newRecord(Resource.class);
capability.setMemory(amMemory);
amContainer.setResource(capability);
appContext.setAMContainerSpec(amContainer);
// 提交application master
super.submitApplication(appContext);

如何运行YARN中的DistributedShell程序

唐半张 发表了文章 0 个评论 2008 次浏览 2015-10-09 09:22 来自相关话题

本文介绍YARN自带的一个非常简单的应用程序实例—distributedshell的使用方法。它可以看做YARN编程中的“hello world”,主要功能是并行执行用户提供的shell命令或者shell脚本。 (1)运行参数介绍 ...查看全部
本文介绍YARN自带的一个非常简单的应用程序实例—distributedshell的使用方法。它可以看做YARN编程中的“hello world”,主要功能是并行执行用户提供的shell命令或者shell脚本。
(1)运行参数介绍

DistributedShell的基本运行参数如下:

No args specified for client to initialize
usage: Client
-appname              Application Name. Default value -
                             DistributedShell
-container_memory     Amount of memory in MB to be requested to run
                             the shell command
-debug                      Dump out debug information
-help                       Print usage
-jar                  Jar file containing the application master
-log_properties       log4j.properties file
-master_memory        Amount of memory in MB to be requested to run
                             the application master
-num_containers       No. of containers on which the shell command
                             needs to be executed
-priority             Application Priority. Default 0
-queue                RM Queue in which this application is to be
                             submitted
-shell_args           Command line args for the shell script
-shell_cmd_priority   Priority for the shell command containers
-shell_command        Shell command to be executed by the
                             Application Master
-shell_env            Environment for shell script. Specified as
                             env_key=env_val pairs
-shell_script         Location of the shell script to be executed
-timeout              Application timeout in milliseconds

(2)运行方法

DistributedShell的运行方法如下:

在YARN安装目录下,执行以下命令:

bin/hadoop jar\
share/hadoop/yarn/hadoop-yarn-applications-distributedshell-2.0.0-cdh4.1.1.jar\
org.apache.hadoop.yarn.applications.distributedshell.Client\
–jar share/hadoop/yarn/hadoop-yarn-applications-distributedshell-2.0.0-cdh4.1.1.jar\
–shell_command ls\
–shell_script ignore.sh\
–num_containers 10\
–container_memory 350\
–master_memory 350\
–priority 10

正确应为:

bin/hadoop jar\
share/hadoop/yarn/hadoop-yarn-applications-distributedshell-2.0.0-cdh4.1.1.jar\
–jar share/hadoop/yarn/hadoop-yarn-applications-distributedshell-2.0.0-cdh4.1.1.jar\
org.apache.hadoop.yarn.applications.distributedshell.Client\
–shell_command ls\
–shell_script ignore.sh\
–num_containers 10\
–container_memory 350\
–master_memory 350\
–priority 10

需要注意的是,在hadoop-2.0.3-alpha(不包括该版本)和CDH 4.1.2版本(包括该版本)之前,DistributedShell存在BUG,具体如下:

1)    必须使用–shell_command参数

2)    当只有shell_command参数而没有shell_script参数时,在分布式模式下(伪分布式下可以)不能执行成功,具体说明和修复方法见:https://issues.apache.org/jira/browse/YARN-253,一直临时的解决方案是同时设置shell_command和shell_script两个参数,像上面给出的这个实例一样。

在这个实例中,ignore.sh中的内容就是“ls”

3)    内存设置一定要正确,不然会出现以下提示的错误:

Container [pid=4424,containerID=container_1359629844156_0004_01_000001] is running beyond virtual memory limits. Current usage: 90.1mb of 128.0mb physical memory used; 593.0mb of 268.8mb virtual memory used. Killing container.
【附】DistributedShell运行日志:

13/02/01 13:43:11 INFO distributedshell.Client: Initializing Client
13/02/01 13:43:11 INFO distributedshell.Client: Starting Client
13/02/01 13:43:11 INFO distributedshell.Client: Connecting to ResourceManager at c2-23/10.1.1.98:8032
13/02/01 13:43:12 INFO distributedshell.Client: Got Cluster metric info from ASM, numNodeManagers=3
13/02/01 13:43:12 INFO distributedshell.Client: Got Cluster node info from ASM
13/02/01 13:43:12 INFO distributedshell.Client: Got node report from ASM for, nodeId=c2-23:36594, nodeAddressc2-23:8042, nodeRackName/default-rack, nodeNumContainers0, nodeHealthStatusis_node_healthy: true, health_report: “”, last_health_report_time: 1359697377337,
13/02/01 13:43:12 INFO distributedshell.Client: Got node report from ASM for, nodeId=c2-25:41070, nodeAddressc2-25:8042, nodeRackName/default-rack, nodeNumContainers0, nodeHealthStatusis_node_healthy: true, health_report: “”, last_health_report_time: 1359697367180,
13/02/01 13:43:12 INFO distributedshell.Client: Got node report from ASM for, nodeId=c2-24:48383, nodeAddressc2-24:8042, nodeRackName/default-rack, nodeNumContainers0, nodeHealthStatusis_node_healthy: true, health_report: “”, last_health_report_time: 1359699033102,
13/02/01 13:43:12 INFO distributedshell.Client: Queue info, queueName=default, queueCurrentCapacity=0.0, queueMaxCapacity=1.0, queueApplicationCount=0, queueChildQueueCount=0
13/02/01 13:43:12 INFO distributedshell.Client: User ACL Info for Queue, queueName=default, userAcl=SUBMIT_APPLICATIONS
13/02/01 13:43:12 INFO distributedshell.Client: User ACL Info for Queue, queueName=default, userAcl=ADMINISTER_QUEUE
13/02/01 13:43:12 INFO distributedshell.Client: Got new application id=application_1359695803957_0003
13/02/01 13:43:12 INFO distributedshell.Client: Min mem capabililty of resources in this cluster 128
13/02/01 13:43:12 INFO distributedshell.Client: Max mem capabililty of resources in this cluster 10240
13/02/01 13:43:12 INFO distributedshell.Client: Setting up application submission context for ASM
13/02/01 13:43:12 INFO distributedshell.Client: Copy App Master jar from local filesystem and add to local environment
13/02/01 13:43:13 INFO distributedshell.Client: Set the environment for the application master
13/02/01 13:43:13 INFO distributedshell.Client: Trying to generate classpath for app master from current thread’s classpath
13/02/01 13:43:13 INFO distributedshell.Client: Readable bytes from stream=9006
13/02/01 13:43:13 INFO distributedshell.Client: Setting up app master command
13/02/01 13:43:13 INFO distributedshell.Client: Completed setting up app master command ${JAVA_HOME}/bin/java -Xmx350m org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster –container_memory 350 –num_containers 10 –priority 0 –shell_command ls 1>/AppMaster.stdout 2>/AppMaster.stderr
13/02/01 13:43:13 INFO distributedshell.Client: Submitting application to ASM
13/02/01 13:43:14 INFO distributedshell.Client: Got application report from ASM for, appId=3, clientToken=null, appDiagnostics=, appMasterHost=N/A, appQueue=default, appMasterRpcPort=0, appStartTime=1359697393467, yarnAppState=ACCEPTED, distributedFinalState=UNDEFINED, appTrackingUrl=c2-23:8088/proxy/application_1359695803957_0003/, appUser=rmss
13/02/01 13:43:15 INFO distributedshell.Client: Got application report from ASM for, appId=3, clientToken=null, appDiagnostics=, appMasterHost=, appQueue=default, appMasterRpcPort=0, appStartTime=1359697393467, yarnAppState=RUNNING, distributedFinalState=UNDEFINED, appTrackingUrl=, appUser=rmss
13/02/01 13:43:16 INFO distributedshell.Client: Got application report from ASM for, appId=3, clientToken=null, appDiagnostics=, appMasterHost=, appQueue=default, appMasterRpcPort=0, appStartTime=1359697393467, yarnAppState=RUNNING, distributedFinalState=UNDEFINED, appTrackingUrl=, appUser=rmss
13/02/01 13:43:17 INFO distributedshell.Client: Got application report from ASM for, appId=3, clientToken=null, appDiagnostics=, appMasterHost=, appQueue=default, appMasterRpcPort=0, appStartTime=1359697393467, yarnAppState=RUNNING, distributedFinalState=UNDEFINED, appTrackingUrl=, appUser=rmss
13/02/01 13:43:18 INFO distributedshell.Client: Got application report from ASM for, appId=3, clientToken=null, appDiagnostics=, appMasterHost=, appQueue=default, appMasterRpcPort=0, appStartTime=1359697393467, yarnAppState=RUNNING, distributedFinalState=UNDEFINED, appTrackingUrl=, appUser=rmss
13/02/01 13:43:19 INFO distributedshell.Client: Got application report from ASM for, appId=3, clientToken=null, appDiagnostics=, appMasterHost=, appQueue=default, appMasterRpcPort=0, appStartTime=1359697393467, yarnAppState=FINISHED, distributedFinalState=SUCCEEDED, appTrackingUrl=, appUser=rmss
13/02/01 13:43:19 INFO distributedshell.Client: Application has completed successfully. Breaking monitoring loop
13/02/01 13:43:19 INFO distributedshell.Client: Application completed successfully

解析Hadoop新一代MapReduce框架Yarn

唐半张 发表了文章 0 个评论 1930 次浏览 2015-10-08 10:43 来自相关话题

背景 Yarn是一个分布式的资源管理系统,用以提高分布式的集群环境下的资源利用率,这些资源包括内存、IO、网络、磁盘等。其产生的原因是为了解决原MapReduce框架的不足。最初MapReduce的committer们还可以周期性的在已有的代码上进 ...查看全部
背景
Yarn是一个分布式的资源管理系统,用以提高分布式的集群环境下的资源利用率,这些资源包括内存、IO、网络、磁盘等。其产生的原因是为了解决原MapReduce框架的不足。最初MapReduce的committer们还可以周期性的在已有的代码上进行修改,可是随着代码的增加以及原MapReduce框架设计的不足,在原MapReduce框架上进行修改变得越来越困难,所以MapReduce的committer们决定从架构上重新设计MapReduce,使下一代的MapReduce(MRv2/Yarn)框架具有更好的扩展性、可用性、可靠性、向后兼容性和更高的资源利用率以及能支持除了MapReduce计算框架外的更多的计算框架。
原MapReduce框架的不足

JobTracker是集群事务的集中处理点,存在单点故障JobTracker需要完成的任务太多,既要维护job的状态又要维护job的task的状态,造成过多的资源消耗 在taskTracker端,用map/reduce task作为资源的表示过于简单,没有考虑到CPU、内存等资源情况,当把两个需要消耗大内存的task调度到一起,很容易出现OOM 把资源强制划分为map/reduce slot,当只有map task时,reduce slot不能用;当只有reduce task时,map slot不能用,容易造成资源利用不足。

Yarn架构
Yarn/MRv2最基本的想法是将原JobTracker主要的资源管理和job调度/监视功能分开作为两个单独的守护进程。有一个全局的ResourceManager(RM)和每个Application有一个ApplicationMaster(AM),Application相当于map-reduce job或者DAG jobs。ResourceManager和NodeManager(NM)组成了基本的数据计算框架。ResourceManager协调集群的资源利用,任何client或者运行着的applicatitonMaster想要运行job或者task都得向RM申请一定的资源。ApplicatonMaster是一个框架特殊的库,对于MapReduce框架而言有它自己的AM实现,用户也可以实现自己的AM,在运行的时候,AM会与NM一起来启动和监视tasks。 
ResourceManager
ResourceManager作为资源的协调者有两个主要的组件:Scheduler和ApplicationsManager(AsM)。
Scheduler负责分配最少但满足application运行所需的资源量给Application。Scheduler只是基于资源的使用情况进行调度,并不负责监视/跟踪application的状态,当然也不会处理失败的task。RM使用resource container概念来管理集群的资源,resource container是资源的抽象,每个container包括一定的内存、IO、网络等资源,不过目前的实现只包括内存一种资源。
ApplicationsManager负责处理client提交的job以及协商第一个container以供applicationMaster运行,并且在applicationMaster失败的时候会重新启动applicationMaster。下面阐述RM具体完成的一些功能。
 
  • 资源调度:Scheduler从所有运行着的application收到资源请求后构建一个全局的资源分配计划,然后根据application特殊的限制以及全局的一些限制条件分配资源。
  • 资源监视:Scheduler会周期性的接收来自NM的资源使用率的监控信息,另外applicationMaster可以从Scheduler得到属于它的已完成的container的状态信息。
  • Application提交:
    • client向AsM获得一个applicationIDclient将application定义以及需要的jar包
    • client将application定义以及需要的jar包文件等上传到hdfs的指定目录,由yarn-site.xml的yarn.app.mapreduce.am.staging-dir指定
    • client构造资源请求的对象以及application的提交上下文发送给AsM
    • AsM接收application的提交上下文
    • AsM根据application的信息向Scheduler协商一个Container供applicationMaster运行,然后启动applicationMaster
    • 向该container所属的NM发送launchContainer信息启动该container,也即启动applicationMaster、AsM向client提供运行着的AM的状态信息。
  • AM的生命周期:AsM负责系统中所有AM的生命周期的管理。AsM负责AM的启动,当AM启动后,AM会周期性的向AsM发送heartbeat,默认是1s,AsM据此了解AM的存活情况,并且在AM失败时负责重启AM,若是一定时间过后(默认10分钟)没有收到AM的heartbeat,AsM就认为该AM失败了。
 关于ResourceManager的可用性目前还没有很好的实现,不过Cloudera公司的CDH4.4以后的版本实现了一个简单的高可用性,使用了Hadoop-common项目中HA部分的代码,采用了类似hdfs namenode高可用性的设计,给RM引入了active和standby状态,不过没有与journalnode相对应的角色,只是由zookeeper来负责维护RM的状态,这样的设计只是一个最简单的方案,避免了手动重启RM,离真正的生产可用还有一段距离。NodeManagerNM主要负责启动RM分配给AM的container以及代表AM的container,并且会监视container的运行情况。在启动container的时候,NM会设置一些必要的环境变量以及将container运行所需的jar包、文件等从hdfs下载到本地,也就是所谓的资源本地化;当所有准备工作做好后,才会启动代表该container的脚本将程序启动起来。启动起来后,NM会周期性的监视该container运行占用的资源情况,若是超过了该container所声明的资源量,则会kill掉该container所代表的进程。另外,NM还提供了一个简单的服务以管理它所在机器的本地目录。Applications可以继续访问本地目录即使那台机器上已经没有了属于它的container在运行。例如,Map-Reduce应用程序使用这个服务存储map output并且shuffle它们给相应的reduce task。在NM上还可以扩展自己的服务,yarn提供了一个yarn.nodemanager.aux-services的配置项,通过该配置,用户可以自定义一些服务,例如Map-Reduce的shuffle功能就是采用这种方式实现的。NM在本地为每个运行着的application生成如下的目录结构:Container目录下的目录结构如下: 在启动一个container的时候,NM就执行该container的default_container_executor.sh,该脚本内部会执行launch_container.sh。launch_container.sh会先设置一些环境变量,最后启动执行程序的命令。对于MapReduce而言,启动AM就执行org.apache.hadoop.mapreduce.v2.app.MRAppMaster;启动map/reduce task就执行org.apache.hadoop.mapred.YarnChild。 ApplicationMasterApplicationMaster是一个框架特殊的库,对于Map-Reduce计算模型而言有它自己的ApplicationMaster实现,对于其他的想要运行在yarn上的计算模型而言,必须得实现针对该计算模型的ApplicationMaster用以向RM申请资源运行task,比如运行在yarn上的spark框架也有对应的ApplicationMaster实现,归根结底,yarn是一个资源管理的框架,并不是一个计算框架,要想在yarn上运行应用程序,还得有特定的计算框架的实现。由于yarn是伴随着MRv2一起出现的,所以下面简要概述MRv2在yarn上的运行流程。MRv2运行流程: 
  • MR JobClient向resourceManager(AsM)提交一个job
  • AsM向Scheduler请求一个供MR AM运行的container,然后启动它
  • MR AM启动起来后向AsM注册
  • MR JobClient向AsM获取到MR AM相关的信息,然后直接与MR AM进行通信
  • MR AM计算splits并为所有的map构造资源请求
  • MR AM做一些必要的MR OutputCommitter的准备工作
  • MR AM向RM(Scheduler)发起资源请求,得到一组供map/reduce task运行的container,然后与NM一起对每一个container执行一些必要的任务,包括资源本地化等
  • MR AM 监视运行着的task 直到完成,当task失败时,申请新的container运行失败的task
  • 当每个map/reduce task完成后,MR AM运行MR OutputCommitter的cleanup 代码,也就是进行一些收尾工作
  • 当所有的map/reduce完成后,MR AM运行OutputCommitter的必要的job commit或者abort APIs
  • MR AM退出。


在Yarn上写应用程序
在yarn上写应用程序并不同于我们熟知的MapReduce应用程序,必须牢记yarn只是一个资源管理的框架,并不是一个计算框架,计算框架可以运行在yarn上。我们所能做的就是向RM申请container,然后配合NM一起来启动container。就像MRv2一样,jobclient请求用于MR AM运行的container,设置环境变量和启动命令,然后交由NM去启动MR AM,随后map/reduce task就由MR AM全权负责,当然task的启动也是由MR AM向RM申请container,然后配合NM一起来启动的。所以要想在yarn上运行非特定计算框架的程序,我们就得实现自己的client和applicationMaster。另外我们自定义的AM需要放在各个NM的classpath下,因为AM可能运行在任何NM所在的机器上。

YARN中运行应用程序的基本流程

唐半张 发表了文章 0 个评论 2404 次浏览 2015-10-08 10:41 来自相关话题

当用户向YARN中提交一个应用程序后,YARN将分两个阶段运行该应用程序:第一个阶段是启动ApplicationMaster;第二个阶段是由ApplicationMaster创建应用程序,为它申请资源,并监控它的整个运行过程,直到运行完成。如图2-11所示,Y ...查看全部
当用户向YARN中提交一个应用程序后,YARN将分两个阶段运行该应用程序:第一个阶段是启动ApplicationMaster;第二个阶段是由ApplicationMaster创建应用程序,为它申请资源,并监控它的整个运行过程,直到运行完成。如图2-11所示,YARN的工作流程分为以下几个步骤:
 
步骤1 用户向YARN中提交应用程序,其中包括ApplicationMaster程序、启动ApplicationMaster的命令、用户程序等。

步骤2 ResourceManager为该应用程序分配第一个Container,并与对应的Node-Manager通信,要求它在这个Container中启动应用程序的ApplicationMaster。

步骤3 ApplicationMaster首先向ResourceManager注册,这样用户可以直接通过ResourceManage查看应用程序的运行状态,然后它将为各个任务申请资源,并监控它的运行状态,直到运行结束,即重复步骤4~7。

步骤4 ApplicationMaster采用轮询的方式通过RPC协议向ResourceManager申请和领取资源。

步骤5 一旦ApplicationMaster申请到资源后,便与对应的NodeManager通信,要求它启动任务Task。

步骤6 NodeManager为任务Task设置好运行环境(包括环境变量、JAR包、二进制程序等)后,将任务启动命令写到一个脚本中,并通过运行该脚本启动任务Task。

步骤7 各个任务Task通过某个RPC协议向ApplicationMaster汇报自己的状态和进度,以让ApplicationMaster随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务。

在应用程序运行过程中,用户可随时通过RPC向ApplicationMaster查询应用程序的当前运行状态。

步骤8 应用程序运行完成后,ApplicationMaster向ResourceManager注销并关闭自己。

Yarn(MR2)上的应用汇总

唐半张 发表了文章 0 个评论 1667 次浏览 2015-10-08 10:40 来自相关话题

Yarn做为hadoop下一代集群资源管理和调度平台, 其上能支持多种计算框架, 本文就简要介绍一下这些计算框架. 1.       MapReduce 首先是大家熟悉的mapreduce, 在MR2之前, hadoop包 ...查看全部
Yarn做为hadoop下一代集群资源管理和调度平台, 其上能支持多种计算框架, 本文就简要介绍一下这些计算框架.
1.       MapReduce

首先是大家熟悉的mapreduce, 在MR2之前, hadoop包括HDFS和mapreduce, 做为hadoop上唯一的分布式计算框架, 其优点是用户可以很方便的编写分布式计算程序, 并支持许多的应用, 如hive, mahout, pig等. 但是其缺点是无法充分利用集群资源, 不支持DAG, 迭代式计算等. 为了解决这些问题, yahoo提出了Yarn (next generation mapreduce), 一个分布式集群集群资源管理和调度平台. 这样除了mapreduce外, 还可以支持各种计算框架.

2.       Spark

Spark是一种与mapreduce相似的开源计算框架, 不同之处在于Spark在某些工作负载方面表现更优, 因为它使用了内存分布式数据集, 另外除了提供交互式查询外, 它还可以优化迭代工作负载.

3.       Apache HAMA

Apache Hama 是一个运行在HDFS上的BSP(Bulk Synchronous Parallel大容量同步并行) 计算框架, 主要针对大规模科学计算,如矩阵, 图像, 网络算法等.当前它有一下功能:

作业提交和管理接口
单节点上运行多个任务
输入/输出格式化
备份恢复
支持通过Apache Whirr运行在云端
支持与Yarn一起运行
4.       Apache Giraph

图像处理平台上运行这大型算法(如page rank, shared connections, personalization-based popularity 等)已经很流行, Giraph采用BSP模型(bulk-synchronous parallel model),可用于等迭代类算法。

5.       Open MPI

这是一个高性能计算函数库,通常在HPC(High Performance Computing)中采用,与MapReduce相比,其性能更高,用户可控性更强,但编程复杂,容错性差,可以说,各有所长,在实际应用中,针对不同 该应用会采用MPI或者MapReduce。

6.       Apache HBase

HBase是一个hadoop数据库, 其特点是分布式,可扩展的,存储大数据。当有需要随机,实时读写的大数据时, 使用HBase很适合.

YARN应用场景、原理与基本架构

唐半张 发表了文章 0 个评论 2250 次浏览 2015-10-08 10:39 来自相关话题

YARN应用场景、原理与基本架构 (1) YARN概念解释 ResourceManager: 整个集群的资源管理器,负责集群资源的统一管理与调度,包括处理客户端请求,启动和监控ApplicationMaster,监控NodeManage ...查看全部
YARN应用场景、原理与基本架构
(1) YARN概念解释
ResourceManager: 整个集群的资源管理器,负责集群资源的统一管理与调度,包括处理客户端请求,启动和监控ApplicationMaster,监控NodeManager,进行资源的统一调度与分配等。
NodeManager: 为集群中节点所拥有的资源管理器,负责所在节点的资源管理与使用,包括负责所在节点上的资源管理和任务调度,处理来自ResourceManager的命令,处理来自ApplicationMaster的命令等。
资源调度器: 对多种类型资源进行调度的工具。目前支持CPU和内存两种关键资源。可使用FIFO、Fair Scheduler、Capacity Scheduler等资源调度算法。
Container: 对任务运行环境的抽象,描述了一系列信息,包括任务运行资源(节点、内存、CPU)、任务启动命令、任务运行环境等。

(2) YARN带来的好处:
1) 解决Hadoop 1.0中应用JobTracker扩展性受限的问题。
2) 解决Hadoop 1.0中JobTracker单点故障问题。
3) 使Hadoop支持MapReduce之外的框架,充分利用和共享集群的软硬件资源。

(3) MapReduce、Tez、Storm、Spark四个框架的异同:
虽然都运行在Hadoop上面(确切地说,是YARN上面),四个框架分别专注于离线计算、DAG计算、流式计算、内存计算的问题,相应的应用领域有所不同;从原理上分析,它们在YARN之上的实现机制不同。

(4) 技术选型
1) 视频流实时分析:Storm——流式计算框架的实时流处理特性正好适合视频流实时分析。
2) 搜索引擎日志分析(TB级):MapReduce——专长于海量日志数据的离线处理,为此而生。
3) 迭代式的数据挖掘算法,比如K-Means:Tez——DAG计算框架,适合于多个作业之间存在数据依赖关系、形成类似有向无环图结构的计算,比如K-means算法的计算。
4) 使用类SQL语言分析数据:HBase——HBase拥有类似SQL的数据查询语言HQL。

Hadoop YARN中内存和CPU两种资源的调度和隔离

唐半张 发表了文章 0 个评论 1726 次浏览 2015-10-08 10:39 来自相关话题

Hadoop YARN中内存和CPU两种资源的调度和隔离 Hadoop  YARN同时支持内存和CPU两种资源的调度(默认只支持内存,如果想进一步调度CPU,需要自己进行一些配置),本文将介绍YARN是如何对这些资源进行调度和隔离的。 ...查看全部
Hadoop YARN中内存和CPU两种资源的调度和隔离
Hadoop  YARN同时支持内存和CPU两种资源的调度(默认只支持内存,如果想进一步调度CPU,需要自己进行一些配置),本文将介绍YARN是如何对这些资源进行调度和隔离的。
在YARN中,资源管理由ResourceManager和NodeManager共同完成,其中,ResourceManager中的调度器负责 资源的分配,而NodeManager则负责资源的供给和隔离。ResourceManager将某个NodeManager上资源分配给任务(这就是所 谓的“资源调度”)后,NodeManager需按照要求为任务提供相应的资源,甚至保证这些资源应具有独占性,为任务运行提供基础的保证,这就是所谓的 资源隔离。
关于Hadoop YARN资源调度器的详细介绍,可参考我的这篇文章:YARN/MRv2 Resource Manager深入剖析—资源调度器
在正式介绍具体的资源调度和隔离之前,先品味一下内存和CPU这两种资源的特点,这是两种性质不同的资源。内存资源的多少会会决定任务的生死,如果内存不够,任务可能会运行失败;相比之下,CPU资源则不同,它只会决定任务运行的快慢,不会对生死产生影响。
【YARN中内存资源的调度和隔离】
基于以上考虑,YARN允许用户配置每个节点上可用的物理内存资源,注意,这里是“可用的”,因为一个节点上的内存会被若干个服务共享,比如一部分给YARN,一部分给HDFS,一部分给HBase等,YARN配置的只是自己可以使用的,配置参数如下:
(1)yarn.nodemanager.resource.memory-mb
表示该节点上YARN可使用的物理内存总量,默认是8192(MB),注意,如果你的节点内存资源不够8GB,则需要调减小这个值,而YARN不会智能的探测节点的物理内存总量。
(2)yarn.nodemanager.vmem-pmem-ratio
任务每使用1MB物理内存,最多可使用虚拟内存量,默认是2.1。
(3) yarn.nodemanager.pmem-check-enabled
是否启动一个线程检查每个任务正使用的物理内存量,如果任务超出分配值,则直接将其杀掉,默认是true。
(4) yarn.nodemanager.vmem-check-enabled
是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是true。
(5)yarn.scheduler.minimum-allocation-mb
单个任务可申请的最少物理内存量,默认是1024(MB),如果一个任务申请的物理内存量少于该值,则该对应的值改为这个数。
(6)yarn.scheduler.maximum-allocation-mb
单个任务可申请的最多物理内存量,默认是8192(MB)。
默认情况下,YARN采用了线程监控的方法判断任务是否超量使用内存,一旦发现超量,则直接将其杀死。由于Cgroups对内存的控制缺乏灵活性 (即任务任何时刻不能超过内存上限,如果超过,则直接将其杀死或者报OOM),而Java进程在创建瞬间内存将翻倍,之后骤降到正常值,这种情况下,采用 线程监控的方式更加灵活(当发现进程树内存瞬间翻倍超过设定值时,可认为是正常现象,不会将任务杀死),因此YARN未提供Cgroups内存隔离机制。
【YARN中CPU资源的调度和隔离】
在YARN中,CPU资源的组织方式仍在探索中,目前(2.2.0版本)只是一个初步的,非常粗粒度的实现方式,更细粒度的CPU划分方式已经提出来了,正在完善和实现中。
目前的CPU被划分成虚拟CPU(CPU virtual Core),这里的虚拟CPU是YARN自己引入的概念,初衷是,考虑到不同节点的CPU性能可能不同,每个CPU具有的计算能力也是不一样的,比如某个 物理CPU的计算能力可能是另外一个物理CPU的2倍,这时候,你可以通过为第一个物理CPU多配置几个虚拟CPU弥补这种差异。用户提交作业时,可以指 定每个任务需要的虚拟CPU个数。在YARN中,CPU相关配置参数如下:
(1)yarn.nodemanager.resource.cpu-vcores
表示该节点上YARN可使用的虚拟CPU个数,默认是8,注意,目前推荐将该值设值为与物理CPU核数数目相同。如果你的节点CPU核数不够8个,则需要调减小这个值,而YARN不会智能的探测节点的物理CPU总数。
(2) yarn.scheduler.minimum-allocation-vcores
单个任务可申请的最小虚拟CPU个数,默认是1,如果一个任务申请的CPU个数少于该数,则该对应的值改为这个数。
(3)yarn.scheduler.maximum-allocation-vcores
单个任务可申请的最多虚拟CPU个数,默认是32。
默认情况下,YARN是不会对CPU资源进行调度的,你需要配置相应的资源调度器让你支持,具体可参考我的这两篇文章:
(1)Hadoop YARN配置参数剖析(4)—Fair Scheduler相关参数
(2)Hadoop YARN配置参数剖析(5)—Capacity Scheduler相关参数
默认情况下,NodeManager不会对CPU资源进行任何隔离,你可以通过启用Cgroups让你支持CPU隔离。
由于CPU资源的独特性,目前这种CPU分配方式仍然是粗粒度的。举个例子,很多任务可能是IO密集型的,消耗的CPU资源非常少,如果此时你为它 分配一个CPU,则是一种严重浪费,你完全可以让他与其他几个任务公用一个CPU,也就是说,我们需要支持更粒度的CPU表达方式。
借鉴亚马逊EC2中CPU资源的划分方式,即提出了CPU最小单位为EC2 Compute Unit(ECU),一个ECU代表相当于1.0-1.2 GHz 2007 Opteron or 2007 Xeon处理器的处理能力。YARN提出了CPU最小单位YARN Compute Unit(YCU),目前这个数是一个整数,默认是720,由参数yarn.nodemanager.resource.cpu-ycus-per- core设置,表示一个CPU core具备的计算能力(该feature在2.2.0版本中并不存在,可能增加到2.3.0版本中),这样,用户提交作业时,直接指定需要的YCU即 可,比如指定值为360,表示用1/2个CPU core,实际表现为,只使用一个CPU core的1/2计算时间。注意,在操作系统层,CPU资源是按照时间片分配的,你可以说,一个进程使用1/3的CPU时间片,或者1/5的时间片。
https://issues.apache.org/jira/browse/YARN-1089
https://issues.apache.org/jira/browse/YARN-1024
Hadoop 新特性、改进、优化和Bug分析系列5:YARN-3
【总结】
目前,YARN 内存资源调度借鉴了Hadoop 1.0中的方式,比较合理,但CPU资源的调度方式仍在不断改进中,目前只是一个初步的粗糙实现,相信在不久的将来,YARN 中CPU资源的调度将更加完善。

YARN ResourceManager调度器的分析

唐半张 发表了文章 0 个评论 1512 次浏览 2015-10-08 10:22 来自相关话题

YARN是Hadoop新版中的资源控制框架。本文旨在深入剖析ResourceManager的调度器,探讨三种调度器的设计侧重,最后给出一些配置建议和参数解释。 本文分析基于CDH4.2.1。调度器这个部分目前还在快速变化之中。例如,CPU资源分配等 ...查看全部
YARN是Hadoop新版中的资源控制框架。本文旨在深入剖析ResourceManager的调度器,探讨三种调度器的设计侧重,最后给出一些配置建议和参数解释。
本文分析基于CDH4.2.1。调度器这个部分目前还在快速变化之中。例如,CPU资源分配等特性在不就的将来就会加入。
为了方便查阅源代码,原代码位置使用[类名:行号]方式表示。
名词解释:
ResourceManager:以下简称RM。YARN的中控模块,负责统一规划资源的使用。
NodeManager:以下简称NM。YARN的资源结点模块,负责启动管理container。
ApplicationMaster:以下简称AM。YARN中每个应用都会启动一个AM,负责向RM申请资源,请求NM启动container,并告诉container做什么事情。
Container:资源容器。YARN中所有的应用都是在container之上运行的。AM也是在container上运行的,不过AM的container是RM申请的。
1.RM中的调度器
ResourceManager是YARN资源控制框架的中心模块,负责集群中所有的资源的统一管理和分配。它接收来自NM的汇报,建立AM,并将资源派送给AM。整体的RM的框架可以参考:RM总体架构
 


最初的hadoop的版本只有FifoScheduler(先进先出调度器)。当Hadoop集群在大规模使用的时候,如何整合资源和分配资源就是一个迫切的需求。对此,Yahoo!和facebook先后开发了CapacityScheduler(容量调度器)和FairScheduler(公平调度器)。在新版本中,这两个调度器在保持核心算法的基础上,也被重新开发了一次。(实际上所有整个YARN的代码都是重写的…)
2.调度器的接口
首先来了解一下调度器的工作方式和对外暴露的接口:[ResourceScheduler:36]
一个完整的调度器在内存中都会维护一个队列,应用,NM,Container的关系。同时,一个调度器也是一个事件处理器,通过RM的异步的事件调用机制知晓外部的发生的事情。要跟外部交互也要发送相应的事件。调度器一共要处理6个调度事件。

yarn(hadoop2)框架的一些软件设计模式

唐半张 发表了文章 0 个评论 1471 次浏览 2015-10-08 10:00 来自相关话题

一、概述 我们都知道,yarn版本的hadoop无论是从架构上面还是软件设计的层面上面都比原始的hadoop版本有较大的改进。在架构方面,我们认为yarn模式是新一代的框架,这个在官方等丛多的资料中说明得很详细了。在软件设计方面,我认为主要有以下的 ...查看全部
一、概述
我们都知道,yarn版本的hadoop无论是从架构上面还是软件设计的层面上面都比原始的hadoop版本有较大的改进。在架构方面,我们认为yarn模式是新一代的框架,这个在官方等丛多的资料中说明得很详细了。在软件设计方面,我认为主要有以下的一些大的方面的改进:服务生命周期管理模式、事件驱动模式、状态驱动模式。这几个模式都写在hadoop-yarn-common中,接下来,我将详细说明这些模式。
二、服务生命周期管理模式一个对象肯定有生与死,那在我们设计中如何表示这一点呢?在业务系统中,我们一般是用spring,spring就负责管理对象的生命。在hadoop,我们没有必要引进spring这么厚重的容器。我们可以自行设计一套代码来管理我们服务的生命周期。那需要满足那些条件呢?
  • 一个服务的生命大概有4个状态:NOTINITED、INITED、STARTED、STOPPED。对应一些基本的操作,如:init start stop等。
  • 服务的状态变化会触发一些变化。可以用观察者模式。
  • 有组合服务的概念,因为我需要一个循环同时启动多个服。可以使用Composite模式。

那yarn的设计方面如下:

从中我们看出service这个设计正好满足我们的三个基本的要求。从图中,我看得很清楚,这个是一个典型的设计方案。一个接口,下面有一个抽象类,再有一个组合类。AbstractService其实实现了register()、unrgister()及状态变化后,调用Listener基本的功能。CompositeService实现了组合服务的需求,如:ResourceManager可以组合几个服务。在yarn中,Listener并没有实现异步。个人感觉主要有两个理由:第一,如图中,NodeManager既是Service又是Listener,如果异步有死锁的风险。第二,因为都是服务,其启动,停止调用次数都相对非常少,状态也不会经常发生变化,没有必要引入异步。
这一套机制其实在很多的框架中都有涉及,如jetty中的LifeCycle,其实和这个差不多的。
三、事件驱动模式
事件驱动模式最核心的部分就是一个异步dispatcher,以此来达到解耦的目的。我们看下yarn中怎么实现的,如下图:

这个也是一个典型的设计方案,我在以前的系统中经常这么设计事件的。其实这个也是监听者模式。在消息中间件中,我们往往引入中间的存储层——存储转发。其实这个在路由器中也是这样的。用到最后,其实都差不多,关键在于你能否看破。
需要注意的是,AsyncDispatcher也是一个service,这样ResourceManager等组合服务可以add AsyncDispatcher获得AsyncDispatcher事件转发的功能。
四、状态驱动模式
在设计模式中,有一个状态模式,其实我这里讲的理论就是有穷状态机。状态模式我们可以认为是摩尔型有限状态机 ,我们这里讲的主要是米利型有限状态机, yarn中实现的还是比较复杂的,可以看出他就是非确定型的自动机。在框架中还是比较少看见状态机的,这个可以仔细研究下,我们可以先看下RMNode状态机的状态图(这个图是根据RMNode状态机自动生成的)。
我们看到 任意两个状态之间的变化可以是任意的事件,并且可以是多个事件;同一个事件可以使一个状态迁移到多个不同的状态。我们可以认为这里的状态机是非确定性米利型有限状态机。这些知识在大学的编译原理上面讲过,我也是翻书的。我们看下yarn中的实现,如下图所示:

我认为其中最重要就是构建这个Map>> stateMachineTable对象,这里面存了状态机的元信息。后续调用完全是根据这个Map来运行的。重点讲下这个map的组成,从from到to端,第一个STATE是from端的状态。从一个状态转移可以有多个事件触发,其中的每一个事件可以有一个Transition,每个Transition就是有一个OPERAND操作。
一个Transition可以转移到多个to端的状态。可以从类图中看到Transition的两个实现类SingleInternalArc、MultipleInternalArc。MultipleInternalArc还带了一个默认的状态。
这个数据构建的时候用了builder模式,实现了FluentInterface: http://www.martinfowler.com/bliki/FluentInterface.html。客户端是直接使用StateMachine的接口调用的,当然这个StateMachine也是由StateMachineFactory构建的(make)。我们看下状态的执行流程:

在yarn中,我们看下事件驱动模式与状态驱动模式是怎么结合的,从中可以看出,状态机其实和事件是密不可分的。状态机的Transition也会产生一些Event再输出到AsynDispatcher中。

四、总结
在yarn中的应用了很多新的设计思想,以上3个只是在整个框架中比较突出的几个。我们在阅读框架时,要时刻牢记,设计软件的第一原则是软件设计的理论及架构模式。文章可能会重新编辑,如果想浏览最新内容请访问原创博客。由于作者个人知识面有限,如果描述有错误或者遗留之处敬请谅解,再欢迎指出,我们共同进步。

yarn fairscheduler原理与配置

唐半张 发表了文章 0 个评论 1372 次浏览 2015-10-08 09:58 来自相关话题

MAPREDUCE-3451, 把fairScheduler引入到2.0.2-alpha, 本文介绍一下hadoop 2.0.2-alpha的fairscheduler. 包括调度算法和配置方法.代码 在org.apache.hadoop.yarn ...查看全部
MAPREDUCE-3451, 把fairScheduler引入到2.0.2-alpha, 本文介绍一下hadoop 2.0.2-alpha的fairscheduler. 包括调度算法和配置方法.代码
在org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair包下, 主要包括如下的类:
各个类作用的简要描述:
1. AllocationConfigurationException, 如果配置文件出错会抛出此异常.
2. AppSchedulable 一个可以提交task的实体, 继承于Schedulable,
3. FairScheduler 调度器的主体部分
4. FairSchedulerConfiguration的配置常量和默认值
5. FairSchedulerEventLog 封装了LOG, 用于把调度事件写到指定的log文件中
6. FifoAppComparator 继承于Comparator, 对比两个AppSchedulable的大小, 首先是Priority, 然后是startTime, 最后对比ApplicationId.
7. FSQueue, fairscheduler中的组信息 类
8. FSQueueSchedulable继承于Schedulable, 一个可以提交task的实体
9. FSSchedulerApp继承于SchedulerApp, 从调度器的角度来看, 每个在RM中正在运行的应用程序都是此类的实例.
10. NewJobWeightBooster, 实现了WeightAdjuster接口, 用于更新AppSchedulable的权重, Weight.
11. QueueManager, 维护一个组队列, 并提供更新方法, fairscheduler调用.
12. Schedulable一个可以提交task的实体
13. SchedulingAlgorithms, 工具类, 包括fair scheduler使用到的调度算法.
14. SchedulingMode, enum类型, 包括FAIR, FIFO. 每个组内的调度模式.
15. 接口WeightAdjuster, 权重修改接口

Fairscheduler的原理
当 NM (NodeManager的简称)向RM (ResourceManager的简称)发送心跳后, 会调用调度器的nodeUpdate()方法,流程如下:
1. Processing the newly launched containers
2. Process completed containers
3. Assign new containers
a) Check for reserved applications
Reserved, 预留的意思, If we have have an application that has reserved a resource on this node already, we try to complete the reservation.
b) Schedule if there are no reservations. schedule at queue which is furthest below fair share.
    i. 这里首先获取所有组(getQueueSchedulables), 然后对他们排序, 使用SchedulingAlgorithms. FairShareComparator类排序.
    ii. 然后从第一个组开始, 把资源分配给它, 并开始组内分资源,
    iii. 如果未分配给第一组, 则分给下一组, 如果分给了第一组, 则继续到第一步. 若未分配给第一个组, 或重复分配给某一组, 或大于maxAssign, 则退出循环.

SchedulingAlgorithms.FairShareComparator排序算法
两个组, 排序的规则是:
1. 一个需要资源, 另外一个不需要资源, 则需要资源的排前面
2. 若都需要资源的话, 对比 使用的内存占minShare的比例, 比例小的排前面, (即尽量保证达到minShare)
3. 若比例相同的话, 计算出使用量与权重的比例, 小的排前面, 即权重大的优先, 使用量小的优先.
4. 若还是相同, 提交时间早的优先, app id小的排前面.

配置方法
在RM的配置目录下的yarn-site.xml文件中增加配置项
   yarn.resourcemanager.scheduler.class   org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler 
在RM的配置目录下新建fair-scheduler.xml文件, 增加如下内容:
         1000     9000     50     2.0     fair      sample_queue,yuling.sh      sample_queue,yuling.sh           1000     9000     50     2.0     fair      yuling.sh      a     5 
注意, 在yarn中, 提交作业的组验证已经放到了调度器中实现.

Hadoop Yarn 框架原理及运作机制

唐半张 发表了文章 0 个评论 1481 次浏览 2015-10-08 09:28 来自相关话题

                        从业界使用分布式系统的变化趋势和 hadoop 框架的长远发展来看,MapReduce 的 JobTracker/TaskTracker 机制需要大规模的调整来修复它在可扩展性,内存消耗,线程模型,可靠性和性能上 ...查看全部
                        从业界使用分布式系统的变化趋势和 hadoop 框架的长远发展来看,MapReduce 的 JobTracker/TaskTracker 机制需要大规模的调整来修复它在可扩展性,内存消耗,线程模型,可靠性和性能上的缺陷。在过去的几年中,hadoop 开发团队做了一些 bug 的修复,但是最近这些修复的成本越来越高,这表明对原框架做出改变的难度越来越大。
                        为从根本上解决旧 MapReduce 框架的性能瓶颈,促进 Hadoop 框架的更长远发展,从 0.23.0 版本开始,Hadoop 的 MapReduce 框架完全重构,发生了根本的变化。新的 Hadoop MapReduce 框架命名为 MapReduceV2 或者叫 Yarn,其架构图如下图所示:


                         重构根本的思想是将 JobTracker 两个主要的功能分离成单独的组件,这两个功能是资源管理和任务调度 / 监控。新的资源管理器全局管理所有应用程序计算资源的分配,每一个应用的 ApplicationMaster 负责相应的调度和协调。一个应用程序无非是一个单独的传统的 MapReduce 任务或者是一个 DAG( 有向无环图 ) 任务。ResourceManager 和每一台机器的节点管理服务器能够管理用户在那台机器上的进程并能对计算进行组织。
                        事实上,每一个应用的 ApplicationMaster 是一个详细的框架库,它结合从 ResourceManager 获得的资源和 NodeManager 协同工作来运行和监控任务。
                        上图中 ResourceManager 支持分层级的应用队列,这些队列享有集群一定比例的资源。从某种意义上讲它就是一个纯粹的调度器,它在执行过程中不对应用进行监控和状态跟踪。同样,它也不能重启因应用失败或者硬件错误而运行失败的任务。
                        ResourceManager 是基于应用程序对资源的需求进行调度的 ; 每一个应用程序需要不同类型的资源因此就需要不同的容器。资源包括:内存,CPU,磁盘,网络等等。可以看出,这同现 Mapreduce 固定类型的资源使用模型有显著区别,它给集群的使用带来负面的影响。资源管理器提供一个调度策略的插件,它负责将集群资源分配给多个队列和应用程序。调度插件可以基于现有的能力调度和公平调度模型。
                        上图中 NodeManager 是每一台机器框架的代理,是执行应用程序的容器,监控应用程序的资源使用情况 (CPU,内存,硬盘,网络 ) 并且向调度器汇报。
                        每一个应用的 ApplicationMaster 的职责有:向调度器索要适当的资源容器,运行任务,跟踪应用程序的状态和监控它们的进程,处理任务的失败原因。

YARN ResourceManager调度器分析

唐半张 发表了文章 0 个评论 1970 次浏览 2015-10-07 10:36 来自相关话题

YARN是Hadoop新版中的资源控制框架。本文旨在深入剖析ResourceManager的调度器,探讨三种调度器的设计侧重,最后给出一些配置建议和参数解释。 本文分析基于CDH4.2.1。调度器这个部分目前还在快速变化之中。例如,CPU资源分配等 ...查看全部
YARN是Hadoop新版中的资源控制框架。本文旨在深入剖析ResourceManager的调度器,探讨三种调度器的设计侧重,最后给出一些配置建议和参数解释。
本文分析基于CDH4.2.1。调度器这个部分目前还在快速变化之中。例如,CPU资源分配等特性在不就的将来就会加入。
为了方便查阅源代码,原代码位置使用[类名:行号]方式表示。
名词解释:
ResourceManager:以下简称RM。YARN的中控模块,负责统一规划资源的使用。
NodeManager:以下简称NM。YARN的资源结点模块,负责启动管理container。
ApplicationMaster:以下简称AM。YARN中每个应用都会启动一个AM,负责向RM申请资源,请求NM启动container,并告诉container做什么事情。
Container:资源容器。YARN中所有的应用都是在container之上运行的。AM也是在container上运行的,不过AM的container是RM申请的。
1.RM中的调度器
ResourceManager是YARN资源控制框架的中心模块,负责集群中所有的资源的统一管理和分配。它接收来自NM的汇报,建立AM,并将资源派送给AM。整体的RM的框架可以参考:RM总体架构

最初的hadoop的版本只有FifoScheduler(先进先出调度器)。当Hadoop集群在大规模使用的时候,如何整合资源和分配资源就是一个迫切的需求。对此,Yahoo!和facebook先后开发了CapacityScheduler(容量调度器)和FairScheduler(公平调度器)。在新版本中,这两个调度器在保持核心算法的基础上,也被重新开发了一次。(实际上所有整个YARN的代码都是重写的…)
2.调度器的接口
首先来了解一下调度器的工作方式和对外暴露的接口:[ResourceScheduler:36]
一个完整的调度器在内存中都会维护一个队列,应用,NM,Container的关系。同时,一个调度器也是一个事件处理器,通过RM的异步的事件调用机制知晓外部的发生的事情。要跟外部交互也要发送相应的事件。调度器一共要处理6个调度事件。Node_ADDED/NODE_REMOVED/NODE_UPDATE/APP_ADDED/APP_REMOVED/CONTAINER_EXPIRED
AM会告诉调度器一些资源申请的请求和已经使用完的container的列表,然后获取到已经在NODE_UPDATE分配给这个应用的container的分配。
可以看到调度器接受资源的申请和分配资源这个动作是异步的。
3.资源分配模型
无论FifoScheduler,CapacityScheduler和FairScheduler的核心资源分配模型都是一样的。
调度器维护一群队列的信息。用户可以向一个或者多个队列提交应用。每次NM心跳的时候,调度器,根据一定的规则选择一个队列,再在队列上选择一个应用,尝试在这个应用上分配资源。不过,因为一些参数限制了分配失败,就会继续选择下一个应用。在选择了一个应用之后,这个应用对应也会有很多的资源申请的请求。调度器会优先匹配本地资源是申请请求,其次是同机架的,最后的任意机器的。
 
总的来说,3种调度器就是在回答如何选择一个队列,在一个队列上如何选择一个应用的问题。
当然,实际上,比起简单的FifoScheduler。CapacityScheduler和FairScheduler还有更多新奇好玩的特性。
4.调度器比较
我们先比较下3种调度器。
调度器FifoSchedulerCapacitySchedulerFairScheduler
设计目的最简单的调度器,易于理解和上手
多用户的情况下,最大化集群的吞吐和利用率
多用户的情况下,强调用户公平地贡献资源
队列组织方式单队列树状组织队列。无论父队列还是子队列都会有资源参数限制,子队列的资源限制计算是基于父队列的。应用提交到叶子队列。树状组织队列。但是父队列和子队列没有参数继承关系。父队列的资源限制对子队列没有影响。应用提交到叶子队列。
资源限制无父子队列之间有容量关系。每个队列限制了资源使用量,全局最大资源使用量,最大活跃应用数量等。每个叶子队列有最小共享量,最大资源量和最大活跃应用数量。用户有最大活跃应用数量的全局配置。
队列ACL限制可以限制应用提交权限可以限制应用提交权限和队列开关权限,父子队列间的ACL会继承。可以限制应用提交权限,父子队列间的ACL会继承。但是由于支持客户端动态创建队列,需要限制默认队列的应用数量。目前,还看不到关闭动态创建队列的选项。
队列排序算法无按照队列的资源使用量最小的优先根据公平排序算法排序
应用选择算法先进先出先进先出先进先出或者公平排序算法
本地优先分配支持支持支持
延迟调度不支持不支持支持
资源抢占不支持不支持支持,看到代码中也有实现。但是,由于本特性还在开发阶段,本文没有真实试验。
简单总结下:
FifoScheduler:最简单的调度器,按照先进先出的方式处理应用。只有一个队列可提交应用,所有用户提交到这个队列。可以针对这个队列设置ACL。没有应用优先级可以配置。
CapacityScheduler:可以看作是FifoScheduler的多队列版本。每个队列可以限制资源使用量。但是,队列间的资源分配以使用量作排列依据,使得容量小的队列有竞争优势。集群整体吞吐较大。延迟调度机制使得应用可以放弃,夸机器或者夸机架的调度机会,争取本地调度。
FairScheduler:多队列,多用户共享资源。特有的客户端创建队列的特性,使得权限控制不太完美。根据队列设定的最小共享量或者权重等参数,按比例共享资源。延迟调度机制跟CapacityScheduler的目的类似,但是实现方式稍有不同。资源抢占特性,是指调度器能够依据公平资源共享算法,计算每个队列应得的资源,将超额资源的队列的部分容器释放掉的特性。
5.本地优化与延迟调度
我们解释一下什么是本地优化和延迟调度。
Hadoop是构建在以hdfs为基础的文件系统之上的。YARN所运行的应用的绝大部分输入都是hdfs上的文件。而hdfs上的文件的是分块多副本存储的。假设文件系统的备份因子是3。则每一个文件块都会在3个机器上有副本。在YARN运行应用的时候,AM会将输入文件进行切割,然后,AM向RM申请的container来运行task来处理这些被切割的文件段。
假如输入文件在ABC三个机器上有备份,那如果AM申请到的container在这3个机器上的其中一个,那这个task就无须从其它机器上传输要处理的文件段,节省网络传输。这就是Hadoop的本地优化。所以Hadoop的文件备份数量除了和数据安全有关,还对应用运行效率有莫大关系。
YARN的实现本地优化的方式是AM给RM提交的资源申请的时候,会同时发送本地申请,机架申请和任意申请。然后,RM的匹配这些资源申请的时候,会先匹配本地申请,再匹配机架申请,最后才匹配任意申请。关于AM资源申请机制可以参考:ContainerAlloctor分析
而延迟调度机制,就是调度器在匹配本地申请失败的时候,匹配机架申请或者任意申请成功的时候,允许略过这次的资源分配,直到达到延迟调度次数上限。CapacityScheduler和FairScheduler在延迟调度上的实现稍有不同,前者的调度次数是根据规则计算的,后者的调度次数通过配置指定的,但实际的含义是一样的。
6.参数配置
要彻底理解各个参数配置中关系,就不能忽略掉调度器的调度算法和调度细节。
[b]6.1.调度器的集群配置[/b]
以下是和调度器密切相关的集群配置
配置文件
配置项

含义 
mapred-site.xml
yarn.app.mapreduce.am.resource.mb
1024
ApplicationMaster的container占用的内存大小 
mapreduce.map.memory.mb
mapreduce.reduce.memory.mb
512/512
map/reduce阶段申请的container的内存的大小 
mapreduce.map.java.opts
mapreduce.reduce.java.opts 
用户设定的map/reduce阶段申请的container的JVM参数。最大堆设定要比申请的内存少一些,用于JVM的非堆部分使用。 
mapreduce.admin.map.child.java.opts
mapreduce.admin.reduce.child.java.opts 
-Xmx500m
管理员设定的map/reduce阶段申请的container的默认JVM启动参数。启动container的命令行会先连接管理员设定参数,然后再连接用户设定参数。 
yarn-site.xml
yarn.nodemanager.vmem-pmem-ratio
4.0
container可使用的虚拟映射地址是物理内存的多少倍。 
yarn.scheduler.minimum-allocation-mb
512
container最小可申请的内存。在调度器中,很多资源计算部分会转化为这个最小值的N倍进行计算。所以,设定可分配内存等资源的时候,最好是刚好为这个最小值的倍数。 
yarn.scheduler.maximum-allocation-mb
2048
container最多可申请的内存数量 
yarn.resourcemanager.scheduler.class
FairScheduler
/CapacityScheduler
ResourceManager加载的调度器类实例 
yarn-site.private.xml
yarn.nodemanager.resource.memory-mb
4096
每个nodemanager可分配的内存总量

[b]6.2. 容量调度器[/b]
容量调度器(CapacityScheduler),由Yahoo!最初开发,被设计出来使得hadoop应用能够被多用户使用,且最大化整个集群资源的吞吐量。[b]6.2.1. 容量调度器的配置[/b]
在hadoop集群配置中启动容量调度器之后,调度器会从classpath中加载capacity-scheduler.xml文件,完成容量调度器的初始化。总结起来有如下特性:
1) 动态更新配置:容量调度器的配置文件在运行时可以随时重新加载来调整分配参数。除非重启ResourceManager,否则队列只能添加不能删除,但是允许关闭。修改配置文件后,使用以下命令可以刷新配置。
yarn rmadmin -refreshQueues
2) 树形组织队列:容量调度器的队列是按照树形结构组织的。根队列只有一个root队列。子队列分享父队列的资源。每个队列设定一个容量值代表可使用父队列的容量值,容量值会影响队列的排序。父队列的所有子队列的容量相加一定是100,否则加载失败。还有一个最大容量值表示子队列绝对不会超过的使用上限。
3) 队列应用限制:队列可以设定最大提交的应用数量和AM占用资源的百分比。AM占用资源的百分比这个配置是用来计算队列的最大活跃应用数量。这里有个小问题。调度器中最大活跃应用数量=AM占用资源的百分比*队列最大可使用资源量/最小的container分配额度。但是我们在mapred-site.xml中会配置AM的内存额度会比最小container分配额度大,造成最大活跃应用数量虚高(可以理解,如果YARN加入不同的计算框架,AM的分配会不一致,所以这里使用最小container分配额度来计算。但是,如果是这样的话,应该直接计算AM的内存使用量来控制)。
4) 用户参数限制:用户可以提交应用到多个队列。不同队列间用户的应用运行情况,不相互影响。用户在队列中最大可占用资源受两个参数控制,一个是单用户占据队列的百分比上限,一个是单用户内存使用上限。具体参看下面的参数表。
5)资源分配选择:不同队列之间,按照队列的资源使用比排序。同一队列中的应用按照应用id排序,也就是先进先出。
6)延迟调度:当调度次数小于本地延迟调度次数的时候不接受机架调度。本地延迟调度次数,由yarn.scheduler.capacity.node-locality-delay配置,默认是-1,不开启延迟调度。官方文档中没有提及这个参数。而任意调度的延迟调度上限是应用申请的机器的数量,不能配置。
 

Hadoop 新 MapReduce 框架 Yarn 详解

唐半张 发表了文章 0 个评论 1535 次浏览 2015-10-07 10:28 来自相关话题

对于业界的大数据存储及分布式处理系统来说,Hadoop 是耳熟能详的卓越开源分布式文件存储及处理框架,对于 Hadoop 框架的介绍在此不再累述,读者可参考 Hadoop 官方简介。使用和学习过老 Hadoop 框架(0.20.0 及之前版本)的同仁应该很熟悉 ...查看全部
对于业界的大数据存储及分布式处理系统来说,Hadoop 是耳熟能详的卓越开源分布式文件存储及处理框架,对于 Hadoop 框架的介绍在此不再累述,读者可参考 Hadoop 官方简介。使用和学习过老 Hadoop 框架(0.20.0 及之前版本)的同仁应该很熟悉如下的原 MapReduce 框架图:
图 1.Hadoop 原 MapReduce 架构

从上图中可以清楚的看出原 MapReduce 程序的流程及设计思路:
  • 首先用户程序 (JobClient) 提交了一个 job,job 的信息会发送到 Job Tracker 中,Job Tracker 是 Map-reduce 框架的中心,他需要与集群中的机器定时通信 (heartbeat), 需要管理哪些程序应该跑在哪些机器上,需要管理所有 job 失败、重启等操作。
  • TaskTracker 是 Map-reduce 集群中每台机器都有的一个部分,他做的事情主要是监视自己所在机器的资源情况。
  • TaskTracker 同时监视当前机器的 tasks 运行状况。TaskTracker 需要把这些信息通过 heartbeat 发送给 JobTracker,JobTracker 会搜集这些信息以给新提交的 job 分配运行在哪些机器上。上图虚线箭头就是表示消息的发送 - 接收的过程。
[size=0.76em]可以看得出原来的 map-reduce 架构是简单明了的,在最初推出的几年,也得到了众多的成功案例,获得业界广泛的支持和肯定,但随着分布式系统集群的规模和其工作负荷的增长,原框架的问题逐渐浮出水面,主要的问题集中如下:
  • JobTracker 是 Map-reduce 的集中处理点,存在单点故障。
  • JobTracker 完成了太多的任务,造成了过多的资源消耗,当 map-reduce job 非常多的时候,会造成很大的内存开销,潜在来说,也增加了 JobTracker fail 的风险,这也是业界普遍总结出老 Hadoop 的 Map-Reduce 只能支持 4000 节点主机的上限。
  • 在 TaskTracker 端,以 map/reduce task 的数目作为资源的表示过于简单,没有考虑到 cpu/ 内存的占用情况,如果两个大内存消耗的 task 被调度到了一块,很容易出现 OOM。
  • 在 TaskTracker 端,把资源强制划分为 map task slot 和 reduce task slot, 如果当系统中只有 map task 或者只有 reduce task 的时候,会造成资源的浪费,也就是前面提过的集群资源利用的问题。
  • 源代码层面分析的时候,会发现代码非常的难读,常常因为一个 class 做了太多的事情,代码量达 3000 多行,,造成 class 的任务不清晰,增加 bug 修复和版本维护的难度。
  • 从操作的角度来看,现在的 Hadoop MapReduce 框架在有任何重要的或者不重要的变化 ( 例如 bug 修复,性能提升和特性化 ) 时,都会强制进行系统级别的升级更新。更糟的是,它不管用户的喜好,强制让分布式集群系统的每一个用户端同时更新。这些更新会让用户为了验证他们之前的应用程序是不是适用新的 Hadoop 版本而浪费大量时间。

[size=0.76em]新 Hadoop Yarn 框架原理及运作机制
[size=0.76em]从业界使用分布式系统的变化趋势和 hadoop 框架的长远发展来看,MapReduce 的 JobTracker/TaskTracker 机制需要大规模的调整来修复它在可扩展性,内存消耗,线程模型,可靠性和性能上的缺陷。在过去的几年中,hadoop 开发团队做了一些 bug 的修复,但是最近这些修复的成本越来越高,这表明对原框架做出改变的难度越来越大。
[size=0.76em]为从根本上解决旧 MapReduce 框架的性能瓶颈,促进 Hadoop 框架的更长远发展,从 0.23.0 版本开始,Hadoop 的 MapReduce 框架完全重构,发生了根本的变化。新的 Hadoop MapReduce 框架命名为 MapReduceV2 或者叫 Yarn,其架构图如下图所示:

图 2. 新的 Hadoop MapReduce 框架(Yarn)架构

[size=0.76em]重构根本的思想是将 JobTracker 两个主要的功能分离成单独的组件,这两个功能是资源管理和任务调度 / 监控。新的资源管理器全局管理所有应用程序计算资源的分配,每一个应用的 ApplicationMaster 负责相应的调度和协调。一个应用程序无非是一个单独的传统的 MapReduce 任务或者是一个 DAG( 有向无环图 ) 任务。ResourceManager 和每一台机器的节点管理服务器能够管理用户在那台机器上的进程并能对计算进行组织。
[size=0.76em]事实上,每一个应用的 ApplicationMaster 是一个详细的框架库,它结合从 ResourceManager 获得的资源和 NodeManager 协同工作来运行和监控任务。
[size=0.76em]上图中 ResourceManager 支持分层级的应用队列,这些队列享有集群一定比例的资源。从某种意义上讲它就是一个纯粹的调度器,它在执行过程中不对应用进行监控和状态跟踪。同样,它也不能重启因应用失败或者硬件错误而运行失败的任务。
[size=0.76em]ResourceManager 是基于应用程序对资源的需求进行调度的 ; 每一个应用程序需要不同类型的资源因此就需要不同的容器。资源包括:内存,CPU,磁盘,网络等等。可以看出,这同现 Mapreduce 固定类型的资源使用模型有显著区别,它给集群的使用带来负面的影响。资源管理器提供一个调度策略的插件,它负责将集群资源分配给多个队列和应用程序。调度插件可以基于现有的能力调度和公平调度模型。
[size=0.76em]上图中 NodeManager 是每一台机器框架的代理,是执行应用程序的容器,监控应用程序的资源使用情况 (CPU,内存,硬盘,网络 ) 并且向调度器汇报。
[size=0.76em]每一个应用的 ApplicationMaster 的职责有:向调度器索要适当的资源容器,运行任务,跟踪应用程序的状态和监控它们的进程,处理任务的失败原因。

hadoop 2.0 yarn应用程序的执行流程和开发

唐半张 发表了文章 0 个评论 2320 次浏览 2015-10-07 09:46 来自相关话题

在yarn的官方文档上有一篇很经典的Hadoop MapReduce Next Generation – Writing YARN Applications,讲述了如果编写基于hadoop 2.0 yarn的应用程序(中文翻译版)。本文主要讲述yarn程序的执 ...查看全部
yarn的官方文档上有一篇很经典的Hadoop MapReduce Next Generation – Writing YARN Applications,讲述了如果编写基于hadoop 2.0 yarn的应用程序(中文翻译版)。本文主要讲述yarn程序的执行流程和如何进行开发的一点想法。
 
YARN程序的执行流程
Yarn是一个资源管理系统,负责整个集群资源的管理和分配。如果需要在一个yarn集群上运行程序:
  • 首先得有个客户端client来提交job到ResourceManager(RM)申请资源。Client通过RMProtocol协议来与RM通信,将应用程序运行所需的一些信息,比如local file/jars,执行的命令,参数,环境变量等提供给RM来运行应用的第一个container也就是ApplicationMaster(AppMaster)。
  • 如果申请到了资源,RM就在第一个container上启动AppMaster。AppMaster然后通过AMRMProtocol协议与ResourceManager通讯,注册自身,然后继续申请资源。
  • 如果获得了containers,AppMaster会通过ContainerManager类与NodeManager通信,为任务启动container。AppMaster同时也会提供启动container需要的一些信息,比如命令行,环境变量等。任务完成后,AppMaster会通过AMRMProtocol::finishApplicationMaster来通知RM任务完成。同时,client可以通过查询RM来获取job的状态信息,或者如果AppMaster支持也可以直接从AppMaster查询信息。如果需要,client可以通过ClientRMProtocol::forceKillApplication来kill掉application。
整个执行流程可以参考下图(来源网络):三个角色
  • client 即客户端,负责将应用程序提交到RM。
  • AppMaster 即整个应用程序的核心,负责与RM通信,申请资源,启动containers。并监控containers的执行情况,在container执行失败后做failover的处理。
  • container 就是具体干活的,和具体业务相关的一些处理逻辑。
三个RPC协议
  • ClientRMProtocol(Client<–>ResourceManager):客户端与RM通信的协议,可以启动AppMater,查询或者kill AppMaster。
  • AMRMProtocol(ApplicationMaster<–>ResourceManager):AppMaster与RM通信,AppMaster可以向RM注册和注销自己,也可以向RM申请资源以启动container。
  • ContainerManager(ApplicationMaster<–> NodeManager):AppMaster与NM通信,可以启动或者停止一个container,也可以获取container的执行状态。

Distributed shell
编写yarn应用程序的详细步骤可以直接参考源码自带的distributed shell的例子。distributed shell是在每个节点上执行一条shell命令或者一个脚本,对于理解基本的概念还是很有帮助的。
YARN编程框架的开发
可以看到,一个YARN应用程序的编写,大量的工作是在编写客户端和AppMaster。而AppMaster要处理资源申请,启动和监控container,尤其是container的fail over,这才是真正值得关注的地方。对于大量的应用程序来说,AppMaster的工作机制可能相同,这就可以抽象出一个通用的AppMaster框架。框架的使用者只需要关心自己的具体应用逻辑即container就可以了,可以大大减少开发成本。
其实Yarn已经提供了一个可以直接使用的客户端-MRClienService和AppMaster-MRAppMater。MapReduce也只是Yarn上的一种通用的框架,所以完全可以参考MRAppMaster来实现自己的框架。如类似storm的流式计算框架,或者调度RPC Service的框架,或者支持MPI的框架。目前github上已经有类似的项目出现了,相信不久就会出现一些通用的框架。

MapReduce2.0(Yarn)

唐半张 发表了文章 0 个评论 1304 次浏览 2015-10-06 11:26 来自相关话题

MapReduce2.0是在Hadoop0.23开始采用的,叫做MapReduce2.0或者MRv2或者Yarn。 MRv2的主要思想是把jobtracker的任务分为两个基本的功能,一个是资源管理,一个是任务监控,这两个任务分别用不 ...查看全部
MapReduce2.0是在Hadoop0.23开始采用的,叫做MapReduce2.0或者MRv2或者Yarn。

MRv2的主要思想是把jobtracker的任务分为两个基本的功能,一个是资源管理,一个是任务监控,这两个任务分别用不同的进程来运行。这个想法使拥有一个全局的资源管理器(ResourceManager)和每个应用程序的应用程序管理器(ApplicationMaster)。一个应用程序要么使用传统的MapReduce任务来运行,要么以DAG形式的任务来运行。
ResourceManager和每个节点(NodeManager)组成了处理数据的框架,ResourceManager是整个系统资源的最终决策者。实际上,每个应用程序的ApplicationMaster是框架具体的Lib,它的任务是从ResourceManager出获得资源,并在NodeManager上执行和监控任务。



ResourceManager有两个主要的组件:调度器(Schedule)和应用程序管理器(ApplicationManager)。

调度器(Schedule)负责分配资源到各种各样正在运行的应用程序中。调度器不执行监控和跟踪应用程序的状态,在这个意义上说,它是纯粹的调度器。此外,它也不保证重启失败的任务。调度器是基于资源的请求来执行它的调度功能的,它是基于资源容器的抽象概念的,这种资源容器包括内存、cpu、磁盘、网络等。在第一个版本中只支持内存。调度器支持可插入的策略,

ApplicationManager负责接送提交的作业,协商第一个执行该任务的容器,并提供失败作业的重启。

NodeManager是每个节点的框架代理。它负责监控资源的使用情况。并报告给ResourceManager.

每个应用的ApplicationMaster 负责与调度器谈判资源占用的containers数量,追踪状态和监控进程。
 
过程是:

客户端提交一个Application到Yarn Resource Manager,客户端通过ClientRMProtocol和ResourceManager通讯,首先通过getNewApplication请求,获得一个ApplicationId,之后便可以通过submitApplication提交Application。在调用submitApplication时,客户端需要向ResourceManager提供充足的信息,这些信息用于加载第一次运行该程序的container,the ApplicationMaster。程序需要提供的信息包括本地文件、jars包、执行时需要的命令,及Unix环境设计等。之后Yarn ResourceManager在已经分配的container中加载ApplicationMaster。之后ApplicationMaster通过AMRMProtocol和ResourceManager通信。首先,ApplicationMaster需要注册到ResourceManager中,为了完成分配给它的任务,ApplicationMaster之后便通过AMRMProtocol请求和接收containers,一旦一个container被分配给它,ApplicationMaster便和NodeManager通信,调用ContainerManager.startContainer去加载container。在加载container时,ApplicationMaster需要指定ContainerLaunchContext,ContainerLaunchContext和ApplicationSubmissionContext比较相似,它含有加载信息。当任务完成时,ApplicationMaster通过AMRMProtocol.finishApplicationMaster通知ResourceManager。

同时,客户端ResourceManager来监控Application的状态,或者直接通过ApplicationMaster来了解程序的状态。客户端也可以通过ClientRMProtocol.forceKillApplication来终止程序运行。

yarn(hadoop2)框架的一些软件设计模式

唐半张 发表了文章 0 个评论 1955 次浏览 2015-10-06 11:10 来自相关话题

一、概述 我们都知道,yarn版本的hadoop无论是从架构上面还是软件设计的层面上面都比原始的hadoop版本有较大的改进。在架构方面,我们认为yarn模式是新一代的框架,这个在官方等丛多的资料中说明得很详细了。在软件设计方面,我认为主要有以下的 ...查看全部
一、概述
我们都知道,yarn版本的hadoop无论是从架构上面还是软件设计的层面上面都比原始的hadoop版本有较大的改进。在架构方面,我们认为yarn模式是新一代的框架,这个在官方等丛多的资料中说明得很详细了。在软件设计方面,我认为主要有以下的一些大的方面的改进:服务生命周期管理模式、事件驱动模式、状态驱动模式。这几个模式都写在hadoop-yarn-common中,接下来,我将详细说明这些模式。
  • 二、服务生命周期管理模式一个对象肯定有生与死,那在我们设计中如何表示这一点呢?在业务系统中,我们一般是用spring,spring就负责管理对象的生命。在hadoop,我们没有必要引进spring这么厚重的容器。我们可以自行设计一套代码来管理我们服务的生命周期。那需要满足那些条件呢?
    • 一个服务的生命大概有4个状态:NOTINITED、INITED、STARTED、STOPPED。对应一些基本的操作,如:init start stop等。
    • 服务的状态变化会触发一些变化。可以用观察者模式。
    • 有组合服务的概念,因为我需要一个循环同时启动多个服。可以使用Composite模式。
    那yarn的设计方面如下: 从中我们看出service这个设计正好满足我们的三个基本的要求。从图中,我看得很清楚,这个是一个典型的设计方案。一个接口,下面有一个抽象类,再有一个组合类。AbstractService其实实现了register()、unrgister()及状态变化后,调用Listener基本的功能。CompositeService实现了组合服务的需求,如:ResourceManager可以组合几个服务。在yarn中,Listener并没有实现异步。个人感觉主要有两个理由:第一,如图中,NodeManager既是Service又是Listener,如果异步有死锁的风险。第二,因为都是服务,其启动,停止调用次数都相对非常少,状态也不会经常发生变化,没有必要引入异步。这一套机制其实在很多的框架中都有涉及,如jetty中的LifeCycle,其实和这个差不多的。三、事件驱动模式事件驱动模式最核心的部分就是一个异步dispatcher,以此来达到解耦的目的。我们看下yarn中怎么实现的,如下图:这个也是一个典型的设计方案,我在以前的系统中经常这么设计事件的。其实这个也是监听者模式。在消息中间件中,我们往往引入中间的存储层——存储转发。其实这个在路由器中也是这样的。用到最后,其实都差不多,关键在于你能否看破。需要注意的是,AsyncDispatcher也是一个service,这样ResourceManager等组合服务可以add AsyncDispatcher获得AsyncDispatcher事件转发的功能。四、状态驱动模式在设计模式中,有一个状态模式,其实我这里讲的理论就是有穷状态机。状态模式我们可以认为是摩尔型有限状态机 ,我们这里讲的主要是米利型有限状态机, yarn中实现的还是比较复杂的,可以看出他就是非确定型的自动机。在框架中还是比较少看见状态机的,这个可以仔细研究下,我们可以先看下RMNode状态机的状态图(这个图是根据RMNode状态机自动生成的)。
  • 我们看到 任意两个状态之间的变化可以是任意的事件,并且可以是多个事件;同一个事件可以使一个状态迁移到多个不同的状态。我们可以认为这里的状态机是非确定性米利型有限状态机。这些知识在大学的编译原理上面讲过,我也是翻书的。我们看下yarn中的实现,如下图所示: 我认为其中最重要就是构建这个Map>> stateMachineTable对象,这里面存了状态机的元信息。后续调用完全是根据这个Map来运行的。重点讲下这个map的组成,从from到to端,第一个STATE是from端的状态。从一个状态转移可以有多个事件触发,其中的每一个事件可以有一个Transition,每个Transition就是有一个OPERAND操作。一个Transition可以转移到多个to端的状态。可以从类图中看到Transition的两个实现类SingleInternalArc、MultipleInternalArc。MultipleInternalArc还带了一个默认的状态。这个数据构建的时候用了builder模式,实现了FluentInterface: http://www.martinfowler.com/bliki/FluentInterface.html。客户端是直接使用StateMachine的接口调用的,当然这个StateMachine也是由StateMachineFactory构建的(make)。我们看下状态的执行流程:在yarn中,我们看下事件驱动模式与状态驱动模式是怎么结合的,从中可以看出,状态机其实和事件是密不可分的。状态机的Transition也会产生一些Event再输出到AsynDispatcher中。四、总结在yarn中的应用了很多新的设计思想,以上3个只是在整个框架中比较突出的几个。我们在阅读框架时,要时刻牢记,设计软件的第一原则是软件设计的理论及架构模式。

Hadoop 新 MapReduce 框架 Yarn 详解

唐半张 发表了文章 0 个评论 1711 次浏览 2015-09-30 10:27 来自相关话题

Hadoop MapReduceV2(Yarn) 框架简介 原 Hadoop MapReduce 框架的问题 对于业界的大数据存储及分布式处理系统来说,Hadoop 是耳熟能详的卓越开源分布式文件存储及处理框架,对于 Hadoop 框架 ...查看全部
Hadoop MapReduceV2(Yarn) 框架简介
原 Hadoop MapReduce 框架的问题
对于业界的大数据存储及分布式处理系统来说,Hadoop 是耳熟能详的卓越开源分布式文件存储及处理框架,对于 Hadoop 框架的介绍在此不再累述,读者可参考 Hadoop 官方简介。使用和学习过老 Hadoop 框架(0.20.0 及之前版本)的同仁应该很熟悉如下的原 MapReduce 框架图:
                                    图 1.Hadoop 原 MapReduce 架构
 
从上图中可以清楚的看出原 MapReduce 程序的流程及设计思路:
  • 首先用户程序 (JobClient) 提交了一个 job,job 的信息会发送到 Job Tracker 中,Job Tracker 是 Map-reduce 框架的中心,他需要与集群中的机器定时通信 (heartbeat), 需要管理哪些程序应该跑在哪些机器上,需要管理所有 job 失败、重启等操作。
  • TaskTracker 是 Map-reduce 集群中每台机器都有的一个部分,他做的事情主要是监视自己所在机器的资源情况。
  • TaskTracker 同时监视当前机器的 tasks 运行状况。TaskTracker 需要把这些信息通过 heartbeat 发送给 JobTracker,JobTracker 会搜集这些信息以给新提交的 job 分配运行在哪些机器上。上图虚线箭头就是表示消息的发送 - 接收的过程。
可以看得出原来的 map-reduce 架构是简单明了的,在最初推出的几年,也得到了众多的成功案例,获得业界广泛的支持和肯定,但随着分布式系统集群的规模和其工作负荷的增长,原框架的问题逐渐浮出水面,主要的问题集中如下:
  • JobTracker 是 Map-reduce 的集中处理点,存在单点故障。
  • JobTracker 完成了太多的任务,造成了过多的资源消耗,当 map-reduce job 非常多的时候,会造成很大的内存开销,潜在来说,也增加了 JobTracker fail 的风险,这也是业界普遍总结出老 Hadoop 的 Map-Reduce 只能支持 4000 节点主机的上限。
  • 在 TaskTracker 端,以 map/reduce task 的数目作为资源的表示过于简单,没有考虑到 cpu/ 内存的占用情况,如果两个大内存消耗的 task 被调度到了一块,很容易出现 OOM。
  • 在 TaskTracker 端,把资源强制划分为 map task slot 和 reduce task slot, 如果当系统中只有 map task 或者只有 reduce task 的时候,会造成资源的浪费,也就是前面提过的集群资源利用的问题。
  • 源代码层面分析的时候,会发现代码非常的难读,常常因为一个 class 做了太多的事情,代码量达 3000 多行,,造成 class 的任务不清晰,增加 bug 修复和版本维护的难度。
  • 从操作的角度来看,现在的 Hadoop MapReduce 框架在有任何重要的或者不重要的变化 ( 例如 bug 修复,性能提升和特性化 ) 时,都会强制进行系统级别的升级更新。更糟的是,它不管用户的喜好,强制让分布式集群系统的每一个用户端同时更新。这些更新会让用户为了验证他们之前的应用程序是不是适用新的 Hadoop 版本而浪费大量时间。

新 Hadoop Yarn 框架原理及运作机制从业界使用分布式系统的变化趋势和 hadoop 框架的长远发展来看,MapReduce 的 JobTracker/TaskTracker 机制需要大规模的调整来修复它在可扩展性,内存消耗,线程模型,可靠性和性能上的缺陷。在过去的几年中,hadoop 开发团队做了一些 bug 的修复,但是最近这些修复的成本越来越高,这表明对原框架做出改变的难度越来越大。
为从根本上解决旧 MapReduce 框架的性能瓶颈,促进 Hadoop 框架的更长远发展,从 0.23.0 版本开始,Hadoop 的 MapReduce 框架完全重构,发生了根本的变化。新的 Hadoop MapReduce 框架命名为 MapReduceV2 或者叫 Yarn,其架构图如下图所示:

图 2. 新的 Hadoop MapReduce 框架(Yarn)架构

重构根本的思想是将 JobTracker 两个主要的功能分离成单独的组件,这两个功能是资源管理和任务调度 / 监控。新的资源管理器全局管理所有应用程序计算资源的分配,每一个应用的 ApplicationMaster 负责相应的调度和协调。一个应用程序无非是一个单独的传统的 MapReduce 任务或者是一个 DAG( 有向无环图 ) 任务。ResourceManager 和每一台机器的节点管理服务器能够管理用户在那台机器上的进程并能对计算进行组织。
事实上,每一个应用的 ApplicationMaster 是一个详细的框架库,它结合从 ResourceManager 获得的资源和 NodeManager 协同工作来运行和监控任务。
上图中 ResourceManager 支持分层级的应用队列,这些队列享有集群一定比例的资源。从某种意义上讲它就是一个纯粹的调度器,它在执行过程中不对应用进行监控和状态跟踪。同样,它也不能重启因应用失败或者硬件错误而运行失败的任务。
ResourceManager 是基于应用程序对资源的需求进行调度的 ; 每一个应用程序需要不同类型的资源因此就需要不同的容器。资源包括:内存,CPU,磁盘,网络等等。可以看出,这同现 Mapreduce 固定类型的资源使用模型有显著区别,它给集群的使用带来负面的影响。资源管理器提供一个调度策略的插件,它负责将集群资源分配给多个队列和应用程序。调度插件可以基于现有的能力调度和公平调度模型。
上图中 NodeManager 是每一台机器框架的代理,是执行应用程序的容器,监控应用程序的资源使用情况 (CPU,内存,硬盘,网络 ) 并且向调度器汇报。
每一个应用的 ApplicationMaster 的职责有:向调度器索要适当的资源容器,运行任务,跟踪应用程序的状态和监控它们的进程,处理任务的失败原因。