Spark

Spark

beeline 连接spark-thriftServer,insert语句报错

回复

啊啊啊吧 发起了问题 1 人关注 0 个回复 1148 次浏览 2020-07-20 15:10 来自相关话题

几千万个文本(1TB左右)需要根据规则计算做结构化,用什么大数据架构比较合适?

天明ss7 回复了问题 2 人关注 1 个回复 1312 次浏览 2019-09-06 20:23 来自相关话题

spark yarn-cluster集群模式提交任务,计算结果保存到哪了?

Patrick_SZ 回复了问题 5 人关注 5 个回复 9278 次浏览 2019-01-30 13:35 来自相关话题

spark streaming 每个task耗时差距较大的问题

回复

daofeng 发起了问题 1 人关注 0 个回复 1533 次浏览 2019-01-11 18:21 来自相关话题

spark-submit 提交失败,Stack trace: ExitCodeException exitCode=10:

gccyd 回复了问题 2 人关注 2 个回复 6111 次浏览 2018-12-23 15:57 来自相关话题

spark 报错:Protocol message end-group tag did not match expected tag

回复

baicha 回复了问题 1 人关注 1 个回复 4194 次浏览 2018-12-03 15:48 来自相关话题

优雅的停止SparkStreaming 作业

西西里ecf 回复了问题 3 人关注 2 个回复 1498 次浏览 2018-10-25 16:36 来自相关话题

请问SparkRDD里如何实现在上下两行或多行间比较的功能?

回复

ArgonPrime 发起了问题 1 人关注 0 个回复 2348 次浏览 2018-10-24 15:28 来自相关话题

请教一个参数 serialization.format

回复

那小子真帅 发起了问题 1 人关注 0 个回复 5220 次浏览 2018-09-11 15:59 来自相关话题

用hive 随机抽取n行记录 怎么做?

qiweicei 回复了问题 2 人关注 1 个回复 3225 次浏览 2018-08-20 11:24 来自相关话题

Spark导入Hbase包不识别

fish 回复了问题 2 人关注 1 个回复 1170 次浏览 2018-06-27 16:35 来自相关话题

编译spark报错

fish 回复了问题 2 人关注 1 个回复 1321 次浏览 2018-06-19 13:45 来自相关话题

Spark写parquet文件时,经过shuffle和不shuffle数据量 不同,shuffle后parquet文件压缩比降低

回复

shining0123 发起了问题 1 人关注 0 个回复 2526 次浏览 2018-05-30 09:59 来自相关话题

spark多个程序依赖同一个第三方jar包 这个在生产环境怎么管理比较好

fish 回复了问题 2 人关注 1 个回复 1589 次浏览 2018-04-09 15:44 来自相关话题

Spark读取CSV文件无法读取文件内容

fish 回复了问题 2 人关注 1 个回复 2072 次浏览 2018-04-02 13:56 来自相关话题

flume自带的avro,里面有个压缩的配置, 是对单条数据的压缩还是对一个beachSize的压缩呢?

回复

史晓江 发起了问题 1 人关注 0 个回复 1796 次浏览 2018-03-20 12:17 来自相关话题

将python写的job提交到spark(yarn cluster model),总是ACCEPT状态,请问是不是yarn配置有问题造成的?

回复

史晓江 发起了问题 1 人关注 0 个回复 2069 次浏览 2018-03-20 12:16 来自相关话题

dateframe.saveAsParquetFile("") 报错

回复

史晓江 发起了问题 1 人关注 0 个回复 1482 次浏览 2018-03-20 12:16 来自相关话题

条新动态, 点击查看
spark 1.3.0 1.4.1    上面练习的代码是使用 程超老师录播视频中的示例,并没有按照最新直播的示例。spark 最近的版本此api发生了变化。 [b]不是registerAsTable,而是registerTempTable[/b]   正确代... 显示全部 »
spark 1.3.0 1.4.1    上面练习的代码是使用 程超老师录播视频中的示例,并没有按照最新直播的示例。spark 最近的版本此api发生了变化。 [b]不是registerAsTable,而是registerTempTable[/b]   正确代码如下:   val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext.implicits._ case class Person(name: String, age: Int) val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF() people.registerTempTable("people") sqlContext.sql("select * from  people").show      
tsingfu

tsingfu 回答了问题 • 2016-10-11 16:11 • 54 个回复 不感兴趣

《第一课Spark概述》提问帖

学习使用开源框架,阅读源码是重要的学习方法,如何有效的阅读源码,可以分享一下心得不?
学习使用开源框架,阅读源码是重要的学习方法,如何有效的阅读源码,可以分享一下心得不?
@CrazyChao

@CrazyChao 回答了问题 • 2016-10-14 15:39 • 35 个回复 不感兴趣

第二课Spark程序设计与实战

之前董老师讲课的时候讲到过中搜的日志统计与分析系统,它的可视化是怎么实现的呢? 现在做可视化用什么工具比较好?
之前董老师讲课的时候讲到过中搜的日志统计与分析系统,它的可视化是怎么实现的呢? 现在做可视化用什么工具比较好?
[code=Java]16/10/18 04:30:59 ERROR SparkContext: Error initializing SparkContext. org.apache.spark.SparkException: A master URL mu... 显示全部 »
[code=Java]16/10/18 04:30:59 ERROR SparkContext: Error initializing SparkContext. org.apache.spark.SparkException: A master URL must be set in your configuration at org.apache.spark.SparkContext.(SparkContext.scala:401) at cn.chinahadoop.SparkPi$.main(SparkPi.scala:58) at cn.chinahadoop.SparkPi.main(SparkPi.scala)[/code]这是IDEA运行一个Spark自带的例子SparkPi,已经在Program arguments中配置了“local”,但是出现如上报错,在代码中设置 setMaster("spark://192.168.141.128:7077") 出现报错:[code=Java] WARN AppClient$ClientEndpoint: Failed to connect to master 192.168.141.128:7077 java.io.IOException: Failed to connect to /192.168.141.128:7077[/code]这些是什么原因导致的呢?另外我已经在spark-defaults.conf配置了spark.master!  
老师,能不能录个小视频讲解一下maven配置,能让例子能mvn package运行起来?
老师,能不能录个小视频讲解一下maven配置,能让例子能mvn package运行起来?
run_psw

run_psw 回答了问题 • 2017-04-06 16:21 • 45 个回复 不感兴趣

第一课_Spark概述

董老师,您好!我想问2个问题   1、如何控制spark代码运行时的权限。例如,可以读写那些hive或hbase的表。或者那些hdfs目录 2、目前我能想到的访问和操作hadoop eco cluster 的方式有三种1、rest, 2、CLI,3、代码。请有... 显示全部 »
董老师,您好!我想问2个问题   1、如何控制spark代码运行时的权限。例如,可以读写那些hive或hbase的表。或者那些hdfs目录 2、目前我能想到的访问和操作hadoop eco cluster 的方式有三种1、rest, 2、CLI,3、代码。请有其他方式吗?   我们目前采用的是HDP2.5,通过ambari2.4部署的。希望控制住三条路线上的权限,分别是:restful、CLI、代码 的读写hdfs、Hive、Hbase等几个组件的权限。 目前看到的一套安全解决方案是通过knox + ranger + kerberos来实现的。整套框架如图所示。我的需求是构建一个安全的大数据据平台。希望做到安全的数据管理,其实就是做到各种方式的数据操作都受控就行了。   附件里面的那套方案能否实现我们的目标,无论通过那种方式访问集群的任何服务,必须要有认证授权才可以。否则连hdfs 50070 和 yarn 8088这样的界面都打不开  

Spark summit SAN FRANCISCO 文档下载

相约地平线 回复了问题 3 人关注 1 个回复 2673 次浏览 2016-07-29 18:02 来自相关话题

梁堰波《Spark MLlib在金融行业的应用》演讲PPT

DataScientist 发表了文章 7 个评论 6270 次浏览 2015-10-28 14:50 来自相关话题

题目:Spark MLlib在金融行业的应用 内容简介:Spark MLlib最新的一些进展,包括一些新的算法(神经网络,生存分析,WLS优化算法等)、使用ML构建机器学习pipeline以及如何调优等。最后会分享一些使用Spark ...查看全部
题目:Spark MLlib在金融行业的应用

内容简介:Spark MLlib最新的一些进展,包括一些新的算法(神经网络,生存分析,WLS优化算法等)、使用ML构建机器学习pipeline以及如何调优等。最后会分享一些使用Spark MLlib进行机器学习的应用案例。

梁堰波.JPG

 
演讲人:梁堰波,明略数据
演讲人介绍:现就职于明略数据,开源爱好者,Apache Hadoop & Spark contributor。北京航空航天大学计算机硕士,曾就职于Yahoo!、美团网、法国电信,具备丰富的大数据、数据挖掘和机器学习领域的项目经验。
 
现场没有来得及回答的问题,我们将邀请主讲人继续在这里互动回答。

演讲PPT在此:
 

Hadoop与Spark计算模型的比较分析

回复

zp0824 发起了问题 1 人关注 0 个回复 3364 次浏览 2015-09-20 10:37 来自相关话题

Spark1.0新特性-->Spark SQL

cenyuhai 发表了文章 1 个评论 2736 次浏览 2015-09-11 15:17 来自相关话题

Spark1.0出来了,变化还是挺大的,文档比以前齐全了,RDD支持的操作比以前多了一些,Spark on yarn功能我居然跑通了。但是最最重要的就是多了一个Spark SQL的功能,它能对RDD进行Sql操作,目前它只是一个alpha版本,喜欢尝鲜的同志们 ...查看全部
Spark1.0出来了,变化还是挺大的,文档比以前齐全了,RDD支持的操作比以前多了一些,Spark on yarn功能我居然跑通了。但是最最重要的就是多了一个Spark SQL的功能,它能对RDD进行Sql操作,目前它只是一个alpha版本,喜欢尝鲜的同志们进来看看吧,下面是它的官网的翻译。
 
Spark SQL是支持在Spark中使用Sql、HiveSql、Scaca中的关系型查询表达式。它的核心组件是一个新增的RDD类型SchemaRDD,它把行对象用一个Schema来描述行里面的所有列的数据类型,它就像是关系型数据库里面的一张表。它可以从原有的RDD创建,也可以是Parquet文件,最重要的是它可以支持用HiveQL从hive里面读取数据。
下面是一些案例,可以在Spark shell当中运行。
首先我们要创建一个熟悉的Context,熟悉spark的人都知道吧,有了Context我们才可以进行各种操作。
val sc: SparkContext // 已经存在的SparkContext
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

import sqlContext._
Running SQL on RDDs
Spark SQL支持的一种表的类型是Scala的case class,case class定义了表的类型,下面是例子:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._

// case class在Scala 2.10里面最多支持22个列,,为了突破这个现实,最好是定义一个类实现Product接口
case class Person(name: String, age: Int)

// 为Person的对象创建一个RDD,然后注册成一张表
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
people.registerAsTable("people")

// 直接写sql吧,这个方法是sqlContext提供的
val teenagers = sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// teenagers是SchemaRDDs类型,它支持所有普通的RDD操作
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)


从上面这个方法来看,不是很好用,一个表好几十个字段,我就得一个一个的去赋值,它现在支持的操作都是很简单的操作,想要实现复杂的操作可以具体去看HiveContext提供的HiveQL。Using Parquet
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._

val people: RDD[Person] = ... // 同上面的例子.

// 这个RDD已经隐式转换成一个SchemaRDD, 允许它存储成Parquet格式.
people.saveAsParquetFile("people.parquet")

// 从上面创建的文件里面读取,加载一个Parquet文件的结果也是一种JavaSchemaRDD.
val parquetFile = sqlContext.parquetFile("people.parquet")

//注册成表,然后使用
parquetFile.registerAsTable("parquetFile")
val teenagers = sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.collect().foreach(println)


 Writing Language-Integrated Relational Queries
目前这个功能只是在Scala里面支持,挺鸡肋的一个功能
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._
val people: RDD[Person] = ... // 同前面的例子.

// 和后面这个语句是一样的 'SELECT name FROM people WHERE age >= 10 AND age <= 19'
val teenagers = people.where('age >= 10).where('age <= 19).select('name)
 Hive Support
这下面的才是高潮,它可以从hive里面取数据。但是hive的依赖太多了,默认Spark assembly是没带这些依赖的,需要我们运行SPARK_HIVE=true sbt/sbt assembly/assembly重新编译,或者用maven的时候添加-Phive参数,它会重新编译出来一个hive assembly的jar包,然后需要把这个jar包放到所有的节点上。另外还需要把
hive-site.xml放到conf目录下。没进行hive部署的话,下面的例子也可以用LocalHiveContext来代替HiveContext。

val sc: SparkContext // 已经存在的SparkContext
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

// 引入这个Context,然后就会给所有的sql语句进行隐式转换
import hiveContext._

hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// 使用HiveQL查询
hql("FROM src SELECT key, value").collect().foreach(println)


这个功能看起来还挺像样,前面两个看起来就像渣一样,没劲儿,不知道为什么不自带那些依赖,还要我们再编译一下,但是我下的那个版本运行的时候提示我已经编译包括了hive的。尼玛,真恶心。
 

Spark部署

cenyuhai 发表了文章 0 个评论 2524 次浏览 2015-09-11 14:47 来自相关话题

Spark的部署让人有点儿困惑,有些需要注意的事项,本来我已经装成功了YARN模式的,但是发现了一些问题,出现错误看日志信息,完全看不懂那个错误信息,所以才打算翻译Standalone的部署的文章。第一部分,我先说一下YARN模式的部署方法。第二部分才是Sta ...查看全部
Spark的部署让人有点儿困惑,有些需要注意的事项,本来我已经装成功了YARN模式的,但是发现了一些问题,出现错误看日志信息,完全看不懂那个错误信息,所以才打算翻译Standalone的部署的文章。第一部分,我先说一下YARN模式的部署方法。第二部分才是Standalone的方式。
  我们首先看一下Spark的结构图,和hadoop的差不多。
  
 1、YARN模式
  采用yarn模式的话,其实就是把spark作为一个客户端提交作业给YARN,实际运行程序的是YARN,就不需要部署多个节点,部署一个节点就可以了。
  把从官网下载的压缩包在linux下解压之后,进入它的根目录,没有安装git的,先执行yum install git安装git
  1)运行这个命令:
SPARK_HADOOP_VERSION=2.2.0 SPARK_YARN=true ./sbt/sbt assembly

  就等着吧,它会下载很多jar包啥的,这个过程可能会卡死,卡死的就退出之后,重新执行上面的命令。
  2)编辑conf目录下的spark-env.sh(原来的是.template结尾的,删掉.template),添加上HADOOP_CONF_DIR参数
   HADOOP_CONF_DIR=/etc/hadoop/conf

  3)运行一下demo看看,能出结果
Pi is roughly 3.13794

   SPARK_JAR=./assembly/target/scala-2.9.3/spark-assembly_2.9.3-0.8.1-incubating-hadoop2.2.0.jar \
./spark-class org.apache.spark.deploy.yarn.Client \
--jar examples/target/scala-2.9.3/spark-examples-assembly-0.8.1-incubating.jar \
--class org.apache.spark.examples.SparkPi \
--args yarn-standalone \
--num-workers 3 \
--master-memory 1g \
--worker-memory 1g \
--worker-cores 1

2、Standalone模式
  下面我们就讲一下怎么部署Standalone,参考页面是http://spark.incubator.apache.org/docs/latest/spark-standalone.html。
  这里我们要一个干净的环境,刚解压出来的,运行之前的命令的时候不能再用了,会报错的。
  1)打开make-distribution.sh,修改SPARK_HADOOP_VERSION=2.2.0,然后执行./make-distribution.sh, 然后会生成一个dist目录,这个目录就是我们要部署的内容。官方推荐是先把master跑起来,再部署别的节点,大家看看bin目录下面的脚本,和hadoop的差不多的,按照官方文档的推荐的安装方式有点儿麻烦。下面我们先说简单的方法,再说官方的方式。
  我们打开dist目录下conf目录的,如果没有slaves文件,添加一个,按照hadoop的那种配置方式,把slave的主机名写进去,然后把dist目录部署到各台机器上,回到master上面,进入第三题、目录的sbin目录下,有个start-all.sh,执行它就可以了。
  下面是官方文档推荐的方式,先启动master,执行。
./bin/start-master.sh

  2)部署dist的目录到各个节点,然后通过这个命令来连接master节点
./spark-class org.apache.spark.deploy.worker.Worker spark://IP:PORT

  3)然后在主节点查看一下http://localhost:8080 ,查看一下子节点是否在这里,如果在,就说明连接成功了。
  4) 部署成功之后,想要在上面部署程序的话,在执行./spark-shell的时候,要加上MASTER这个参数。
MASTER=spark://IP:PORT ./spark-shell
3、High Availability
  Spark采用Standalone模式的话,Spark本身是一个master/slaves的模式,这样就会存在单点问题,Spark采用的是zookeeper作为它的active-standby切换的工具,设置也很简单。一个完整的切换需要1-2分钟的时间,这个时候新提交的作业会受到影响,之前提交到作业不会受到影响。
  在spark-env.sh添加以下设置:
//设置下面三项JVM参数,具体的设置方式在下面
//spark.deploy.recoveryMode=ZOOKEEPER
//spark.deploy.zookeeper.url=192.168.1.100:2181,192.168.1.101:2181
// /spark是默认的,可以不写
//spark.deploy.zookeeper.dir=/spark

export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=hadoop.Master:2181,hadoop.SlaveT1:2181,hadoop.SlaveT2:2181"


  这里就有一个问题了,集群里面有多个master,我们连接的时候,连接谁?用过hbase的都知道是先连接的zookeeper,但是Spark采用的是另外的一种方式,如果我们有多个master的话,实例化SparkContext的话,使用spark://host1:port1,host2:port2这样的地址,这样它会同时注册两个,一个失效了,还有另外一个。
  如果不愿意配置高可用的话,只是想失败的时候,再恢复一下,重新启动的话,那就使用FILESYSTEM的使用,指定一个目录,把当前的各个节点的状态写入到文件系统。
spark.deploy.recoveryMode=FILESYSTEM
spark.deploy.recoveryDirectory=/usr/lib/spark/dataDir

  当 stop-master.sh来杀掉master之后,状态没有及时更新,再次启动的时候,会增加一分钟的启动时间来等待原来的连接超时。
  recoveryDirectory最好是能够使用一个nfs,这样一个master失败之后,就可以启动另外一个master了。
  
 

beeline 连接spark-thriftServer,insert语句报错

回复

啊啊啊吧 发起了问题 1 人关注 0 个回复 1148 次浏览 2020-07-20 15:10 来自相关话题

几千万个文本(1TB左右)需要根据规则计算做结构化,用什么大数据架构比较合适?

回复

天明ss7 回复了问题 2 人关注 1 个回复 1312 次浏览 2019-09-06 20:23 来自相关话题

spark yarn-cluster集群模式提交任务,计算结果保存到哪了?

回复

Patrick_SZ 回复了问题 5 人关注 5 个回复 9278 次浏览 2019-01-30 13:35 来自相关话题

spark streaming 每个task耗时差距较大的问题

回复

daofeng 发起了问题 1 人关注 0 个回复 1533 次浏览 2019-01-11 18:21 来自相关话题

spark-submit 提交失败,Stack trace: ExitCodeException exitCode=10:

回复

gccyd 回复了问题 2 人关注 2 个回复 6111 次浏览 2018-12-23 15:57 来自相关话题

spark 报错:Protocol message end-group tag did not match expected tag

回复

baicha 回复了问题 1 人关注 1 个回复 4194 次浏览 2018-12-03 15:48 来自相关话题

优雅的停止SparkStreaming 作业

回复

西西里ecf 回复了问题 3 人关注 2 个回复 1498 次浏览 2018-10-25 16:36 来自相关话题

请问SparkRDD里如何实现在上下两行或多行间比较的功能?

回复

ArgonPrime 发起了问题 1 人关注 0 个回复 2348 次浏览 2018-10-24 15:28 来自相关话题

请教一个参数 serialization.format

回复

那小子真帅 发起了问题 1 人关注 0 个回复 5220 次浏览 2018-09-11 15:59 来自相关话题

用hive 随机抽取n行记录 怎么做?

回复

qiweicei 回复了问题 2 人关注 1 个回复 3225 次浏览 2018-08-20 11:24 来自相关话题

Spark导入Hbase包不识别

回复

fish 回复了问题 2 人关注 1 个回复 1170 次浏览 2018-06-27 16:35 来自相关话题

编译spark报错

回复

fish 回复了问题 2 人关注 1 个回复 1321 次浏览 2018-06-19 13:45 来自相关话题

Spark写parquet文件时,经过shuffle和不shuffle数据量 不同,shuffle后parquet文件压缩比降低

回复

shining0123 发起了问题 1 人关注 0 个回复 2526 次浏览 2018-05-30 09:59 来自相关话题

spark多个程序依赖同一个第三方jar包 这个在生产环境怎么管理比较好

回复

fish 回复了问题 2 人关注 1 个回复 1589 次浏览 2018-04-09 15:44 来自相关话题

Spark读取CSV文件无法读取文件内容

回复

fish 回复了问题 2 人关注 1 个回复 2072 次浏览 2018-04-02 13:56 来自相关话题

flume自带的avro,里面有个压缩的配置, 是对单条数据的压缩还是对一个beachSize的压缩呢?

回复

史晓江 发起了问题 1 人关注 0 个回复 1796 次浏览 2018-03-20 12:17 来自相关话题

将python写的job提交到spark(yarn cluster model),总是ACCEPT状态,请问是不是yarn配置有问题造成的?

回复

史晓江 发起了问题 1 人关注 0 个回复 2069 次浏览 2018-03-20 12:16 来自相关话题

dateframe.saveAsParquetFile("") 报错

回复

史晓江 发起了问题 1 人关注 0 个回复 1482 次浏览 2018-03-20 12:16 来自相关话题

梁堰波《Spark MLlib在金融行业的应用》演讲PPT

DataScientist 发表了文章 7 个评论 6270 次浏览 2015-10-28 14:50 来自相关话题

题目:Spark MLlib在金融行业的应用 内容简介:Spark MLlib最新的一些进展,包括一些新的算法(神经网络,生存分析,WLS优化算法等)、使用ML构建机器学习pipeline以及如何调优等。最后会分享一些使用Spark ...查看全部
题目:Spark MLlib在金融行业的应用

内容简介:Spark MLlib最新的一些进展,包括一些新的算法(神经网络,生存分析,WLS优化算法等)、使用ML构建机器学习pipeline以及如何调优等。最后会分享一些使用Spark MLlib进行机器学习的应用案例。

梁堰波.JPG

 
演讲人:梁堰波,明略数据
演讲人介绍:现就职于明略数据,开源爱好者,Apache Hadoop & Spark contributor。北京航空航天大学计算机硕士,曾就职于Yahoo!、美团网、法国电信,具备丰富的大数据、数据挖掘和机器学习领域的项目经验。
 
现场没有来得及回答的问题,我们将邀请主讲人继续在这里互动回答。

演讲PPT在此:
 

Spark与Hadoop计算模型的比较分析

唐半张 发表了文章 0 个评论 1782 次浏览 2015-10-10 09:36 来自相关话题

Spark与Hadoop计算模型的比较分析 Spark是一个通用的并行计算框架,由UCBerkeley的AMP实验室开发。那么Spark和Hadoop有什么不同呢?1.Spark的中间数据放到内存中,对于迭代运算效率比较高。 Spark ...查看全部
Spark与Hadoop计算模型的比较分析
Spark是一个通用的并行计算框架,由UCBerkeley的AMP实验室开发。那么Spark和Hadoop有什么不同呢?1.Spark的中间数据放到内存中,对于迭代运算效率比较高。
Spark aims to extend MapReduce for iterative algorithms, and interactive low latency data mining. One major difference between MapReduce and Sparkis that MapReduce is acyclic. That is, data flows in from a stable source, isprocessed, and flows out to a stable filesystem. Spark allows iterative computation on the same data, which would form a cycle if jobs were visualized.   (旨在延长MapReduce的迭代算法,和互动低延迟数据挖掘的。 MapReduce和Sparkis的一个主要区别,MapReduce是非周期性。也就是说,数据流从一个稳定的来源,加工,流出到一个稳定的文件系统。“Spark允许相同的数据,这将形成一个周期,如果工作是可视化的迭代计算。)
Spark更适合于迭代运算比较多的ML和DM运算。因为在Spark里面,有RDD的概念。
Resilient Distributed Dataset (RDD) serves as an abstraction to rawdata, and some data is kept in memory and cached for later use. This last pointis very important; Spark allows data to be committed in RAM for an approximate20x speedup over MapReduce based on disks. RDDs are immutable and created through parallel transformations such as map, filter, groupBy and reduce.   (弹性分布式数据集(RDD)作为原始数据的抽象,和一些数据保存在内存中缓存供以后使用。最后这点很重要;星火允许在RAM致力于为近似20X基于加速了MapReduce的磁盘上的数据。RDDs是不可改变的,并通过并行转换,如地图,过滤器,GroupBy和减少创建的。)
RDD可以cache到内存中,那么每次对RDD数据集的操作之后的结果,都可以存放到内存中,下一个操作可以直接从内存中输入,省去了MapReduce大量的磁盘IO操作。这对于迭代运算比较常见的机器学习算法来说,效率提升比较大。但是由于Spark目前只是在UC Berkeley的一个研究项目,目前看到的最大规模也就200台机器,没有像Hadoop那样的部署规模,所以,在大规模使用的时候还是要慎重考虑的。
2.Spark比Hadoop更通用。
Spark提供的数据集操作类型有很多种,不像Hadoop只提供了Map和Reduce两种操作。比如map, filter, flatMap,sample, groupByKey, reduceByKey, union, join, cogroup, mapValues, sort,partionBy等多种操作类型,他们把这些操作称为Transformations。同时还提供Count, collect, reduce, lookup, save等多种actions。
这些多种多样的数据集操作类型,给上层应用者提供了方便。各个处理节点之间的通信模型不再像Hadoop那样就是唯一的Data Shuffle一种模式。用户可以命名,物化,控制中间结果的分区等。可以说编程模型比Hadoop更灵活。
不过论文中也提到,Spark不适用那种异步细粒度更新状态的应用,例如web服务的存储或者是增量的web爬虫和索引。就是对于那种增量修改的应用模型,当然不适合把大量数据拿到内存中了。增量改动完了,也就不用了,不需要迭代了。
3.容错性。
从Spark的论文《Resilient Distributed Datasets: AFault-Tolerant Abstraction for In-Memory Cluster Computing》中没看出容错性做的有多好。倒是提到了分布式数据集计算,做checkpoint的两种方式,一个是checkpoint data,一个是logging the updates。貌似Spark采用了后者。但是文中后来又提到,虽然后者看似节省存储空间。但是由于数据处理模型是类似DAG的操作过程,由于图中的某个节点出错,由于lineage chains的依赖复杂性,可能会引起全部计算节点的重新计算,这样成本也不低。他们后来说,是存数据,还是存更新日志,做checkpoint还是由用户说了算吧。相当于什么都没说,又把这个皮球踢给了用户。所以我看就是由用户根据业务类型,衡量是存储数据IO和磁盘空间的代价和重新计算的代价,选择代价较小的一种策略。
4.关于Spark和Hadoop的融合
不知道Apache基金会的人怎么想的,我看Spark还是应该融入到Hadoop生态系统中。从Hadoop 0.23把MapReduce做成了库,看出Hadoop的目标是要支持包括MapReduce在内的更多的并行计算模型,比如MPI,Spark等。毕竟现在Hadoop的单节点CPU利用率并不高,那么假如这种迭代密集型运算是和现有平台的互补。同时,这对资源调度系统就提出了更高的要求。有关资源调度方面,UC Berkeley貌似也在做一个Mesos的东西,还用了Linux container,统一调度Hadoop和其他应用模型。

基于大数据分析系统Hadoop的13个开源工具

唐半张 发表了文章 0 个评论 2267 次浏览 2015-10-09 09:24 来自相关话题

Hadoop是由Apache基金会开发的一个大数据分布式系统基础架构,最早版本是2003年原Yahoo!DougCutting根据Google发布的学术论文研究而来。用户可以在不了解分布式底层细节的情况下,轻松地在Hadoop上开发和运行处理海量数据的应用程序 ...查看全部
Hadoop是由Apache基金会开发的一个大数据分布式系统基础架构,最早版本是2003年原Yahoo!DougCutting根据Google发布的学术论文研究而来。用户可以在不了解分布式底层细节的情况下,轻松地在Hadoop上开发和运行处理海量数据的应用程序。低成本、高可 靠、高扩展、高有效、高容错等特性让Hadoop成为最流行的大数据分析系统,然而其赖以生存的HDFS和MapReduce组件却让其一度陷入困境——批处理的工作方式让其只适用于离线数据处理,在要求实时性的场景下毫无用武之地。因此,各种基于Hadoop的工具应运而生,本次为大家分享Hadoop生态系统中最常用的13个开源工具,其中包括资源调度、流计算及各种业务针对应用场景。首先,我们看资源管理相关。
资源统一管理/调度系统

在公司和机构中,服务器往往会因为业务逻辑被拆分为多个集群,基于数据密集型的处理框架也是不断涌现,比如支持离线处理的MapReduce、支持在线处理的Storm及Impala、支持迭代计算的Spark及流处理框架S4,它们诞生于不同的实验室,并各有所长。为了减 少管理成本,提升资源的利用率,一个共同的想法产生——让这些框架运行在同一个集群上;因此,就有了当下众多的资源统一管理/调度系统,比如Google的Borg、Apache的YARN、Twitter的Mesos(已贡献给Apache基金会)、腾讯搜搜的Torca、FacebookCorona(开源),本次为大家重点介绍ApacheMesos及YARN:

1.ApacheMesos
    代码托管地址:ApacheSVN
    Mesos提供了高效、跨分布式应用程序和框架的资源隔离和共享,支持Hadoop、MPI、Hypertable、Spark等。
    Mesos是Apache孵化器中的一个开源项目,使用ZooKeeper实现容错复制,使用LinuxContainers来隔离任务, 支持多种资源计划分配(内存和CPU)。提供Java、Python和C++APIs来开发新的并行应用程序,提供基于Web的用户界面来提查看集群状 态。

2.HadoopYARN
    代码托管地址:ApacheSVN
    YARN又被称为MapReduce2.0,借鉴Mesos,YARN提出了资源隔离解决方案Container,但是目前尚未成熟,仅仅提供Java虚拟机内存的隔离。
    对比MapReduce1.x,YARN架构在客户端上并未做太大的改变,在调用API及接口上还保持大部分的兼容,然而在YARN中,开 发人员使用ResourceManager、ApplicationMaster与NodeManager代替了原框架中核心的JobTracker和TaskTracker。其中ResourceManager是一个中心的服务,负责调度、启动每一个Job所属的ApplicationMaster, 另外还监控ApplicationMaster的存在情况;NodeManager负责Container状态的维护,并向RM保持心跳。ApplicationMaster负责一个Job生命周期内的所有工作,类似老的框架中JobTracker。

Hadoop上的实时解决方案

前面我们有说过,在互联网公司中基于业务逻辑需求,企业往往会采用多种计算框架,比如从事搜索业务的公司:网页索引建立用MapReduce,自然语言处理用Spark等。本节为大家分享的则是Storm、Impala、Spark三个框架:

3.ClouderaImpala
    代码托管地址:GitHub
    Impala是由Cloudera开发,一个开源的MassivelyParallelProcessing(MPP)查询引擎。与Hive相同的元数据、SQL语法、ODBC驱动程序和用户接口(HueBeeswax),可以直接在HDFS或HBase上提供快速、交互式SQL查 询。Impala是在Dremel的启发下开发的,第一个版本发布于2012年末。
    Impala不再使用缓慢的Hive+MapReduce批处理,而是通过与商用并行关系数据库中类似的分布式查询引擎(由QueryPlanner、QueryCoordinator和QueryExecEngine三部分组成),可以直接从HDFS或者HBase中用SELECT、JOIN和统计函数查询数据,从而大大降低了延迟。

4.Spark
    代码托管地址:Apache
    Spark是个开源的数据分析集群计算框架,最初由加州大学伯克利分校AMPLab开发,建立于HDFS之上。Spark与Hadoop一样,用于构建大规模、低延时的数据分析应用。Spark采用Scala语言实现,使用Scala作为应用框架。
    Spark采用基于内存的分布式数据集,优化了迭代式的工作负载以及交互式查询。与Hadoop不同的是,Spark和Scala紧密集 成,Scala像管理本地collective对象那样管理分布式数据集。Spark支持分布式数据集上的迭代式任务,实际上可以在Hadoop文件系统 上与Hadoop一起运行(通过YARN、Mesos等实现)。

5.Storm
    代码托管地址:GitHub
    Storm是一个分布式的、容错的实时计算系统,由BackType开发,后被Twitter捕获。Storm属于流处理平台,多用于实时 计算并更新数据库。Storm也可被用于“连续计算”(continuouscomputation),对数据流做连续查询,在计算时就将结果以流的形式 输出给用户。它还可被用于“分布式RPC”,以并行的方式运行昂贵的运算。

Hadoop上的其它解决方案

就像前文说,基于业务对实时的需求,各个实验室发明了Storm、Impala、Spark、Samza等流实时处理工具。而本节我们将分 享的是实验室基于性能、兼容性、数据类型研究的开源解决方案,其中包括Shark、Phoenix、ApacheAccumulo、ApacheDrill、ApacheGiraph、ApacheHama、ApacheTez、ApacheAmbari。

6.Shark
    代码托管地址:GitHub
    Shark,代表了“HiveonSpark”,一个专为Spark打造的大规模数据仓库系统,兼容ApacheHive。无需修改现有的数据或者查询,就可以用100倍的速度执行HiveQL。
    Shark支持Hive查询语言、元存储、序列化格式及自定义函数,与现有Hive部署无缝集成,是一个更快、更强大的替代方案。

7.Phoenix
    代码托管地址:GitHub
    Phoenix是构建在ApacheHBase之上的一个SQL中间层,完全使用Java编写,提供了一个客户端可嵌入的JDBC驱动。Phoenix查询引擎会将SQL查询转换为一个或多个HBasescan,并编排执行以生成标准的JDBC结果集。直接使用HBaseAPI、协同处理 器与自定义过滤器,对于简单查询来说,其性能量级是毫秒,对于百万级别的行数来说,其性能量级是秒。Phoenix完全托管在GitHub之上。
    Phoenix值得关注的特性包括:1,嵌入式的JDBC驱动,实现了大部分的java.sql接口,包括元数据API;2,可以通过多个 行键或是键/值单元对列进行建模;3,DDL支持;4,版本化的模式仓库;5,DML支持;5,通过客户端的批处理实现的有限的事务支持;6,紧跟ANSISQL标准。

8.ApacheAccumulo
    代码托管地址:ApacheSVN
    ApacheAccumulo是一个可靠的、可伸缩的、高性能、排序分布式的键值存储解决方案,基于单元访问控制以及可定制的服务器端处 理。使用GoogleBigTable设计思路,基于ApacheHadoop、Zookeeper和Thrift构建。Accumulo最早由NSA开 发,后被捐献给了Apache基金会。
    对比GoogleBigTable,Accumulo主要提升在基于单元的访问及服务器端的编程机制,后一处修改让Accumulo可以在数据处理过程中任意点修改键值对。

9.ApacheDrill
    代码托管地址:GitHub
    本质上,ApacheDrill是GoogleDremel的开源实现,本质是一个分布式的mpp查询层,支持SQL及一些用于NoSQL和Hadoop数据存储系统上的语言,将有助于Hadoop用户实现更快查询海量数据集的目的。当下Drill还只能算上一个框架,只包含了Drill愿 景中的初始功能。
    Drill的目的在于支持更广泛的数据源、数据格式及查询语言,可以通过对PB字节数据的快速扫描(大约几秒内)完成相关分析,将是一个专为互动分析大型数据集的分布式系统。

10.ApacheGiraph
    代码托管地址:GitHub
    ApacheGiraph是一个可伸缩的分布式迭代图处理系统,灵感来自BSP(bulksynchronousparallel)和Google的Pregel,与它们区别于则是是开源、基于Hadoop的架构等。
    Giraph处理平台适用于运行大规模的逻辑计算,比如页面排行、共享链接、基于个性化排行等。Giraph专注于社交图计算,被Facebook作为其OpenGraph工具的核心,几分钟内处理数万亿次用户及其行为之间的连接。

11.ApacheHama
    代码托管地址:GitHub
    ApacheHama是一个建立在Hadoop上基于BSP(BulkSynchronousParallel)的计算框架,模仿了Google的Pregel。用来处理大规模的科学计算,特别是矩阵和图计算。集群环境中的系统架构由BSPMaster/GroomServer(ComputationEngine)、Zookeeper(DistributedLocking)、HDFS/HBase(StorageSystems)这3大块组成。

12.ApacheTez
    代码托管地址:GitHub
    ApacheTez是基于HadoopYarn之上的DAG(有向无环图,DirectedAcyclicGraph)计算框架。它把Map/Reduce过程拆分成若干个子过程,同时可以把多个Map/Reduce任务组合成一个较大的DAG任务,减少了Map/Reduce之间的文 件存储。同时合理组合其子过程,减少任务的运行时间。由Hortonworks开发并提供主要支持。

13.ApacheAmbari
    代码托管地址:ApacheSVN
    ApacheAmbari是一个供应、管理和监视ApacheHadoop集群的开源框架,它提供一个直观的操作工具和一个健壮的HadoopAPI,可以隐藏复杂的Hadoop操作,使集群操作大大简化,首个版本发布于2012年6月。
    ApacheAmbari现在是一个Apache的顶级项目,早在2011年8月,Hortonworks引进Ambari作为ApacheIncubator项目,制定了Hadoop集群极致简单管理的愿景。在两年多的开发社区显著成长,从一个小团队,成长为Hortonworks各种组织的贡献者。Ambari用户群一直在稳步增长,许多机构依靠Ambari在其大型数据中心大规模部署和管理Hadoop集 群。
    目前ApacheAmbari支持的Hadoop组件包括:HDFS、MapReduce、Hive、HCatalog、HBase、ZooKeeper、Oozie、Pig及Sqoop

Yarn(MR2)上的应用汇总

唐半张 发表了文章 0 个评论 1667 次浏览 2015-10-08 10:40 来自相关话题

Yarn做为hadoop下一代集群资源管理和调度平台, 其上能支持多种计算框架, 本文就简要介绍一下这些计算框架. 1.       MapReduce 首先是大家熟悉的mapreduce, 在MR2之前, hadoop包 ...查看全部
Yarn做为hadoop下一代集群资源管理和调度平台, 其上能支持多种计算框架, 本文就简要介绍一下这些计算框架.
1.       MapReduce

首先是大家熟悉的mapreduce, 在MR2之前, hadoop包括HDFS和mapreduce, 做为hadoop上唯一的分布式计算框架, 其优点是用户可以很方便的编写分布式计算程序, 并支持许多的应用, 如hive, mahout, pig等. 但是其缺点是无法充分利用集群资源, 不支持DAG, 迭代式计算等. 为了解决这些问题, yahoo提出了Yarn (next generation mapreduce), 一个分布式集群集群资源管理和调度平台. 这样除了mapreduce外, 还可以支持各种计算框架.

2.       Spark

Spark是一种与mapreduce相似的开源计算框架, 不同之处在于Spark在某些工作负载方面表现更优, 因为它使用了内存分布式数据集, 另外除了提供交互式查询外, 它还可以优化迭代工作负载.

3.       Apache HAMA

Apache Hama 是一个运行在HDFS上的BSP(Bulk Synchronous Parallel大容量同步并行) 计算框架, 主要针对大规模科学计算,如矩阵, 图像, 网络算法等.当前它有一下功能:

作业提交和管理接口
单节点上运行多个任务
输入/输出格式化
备份恢复
支持通过Apache Whirr运行在云端
支持与Yarn一起运行
4.       Apache Giraph

图像处理平台上运行这大型算法(如page rank, shared connections, personalization-based popularity 等)已经很流行, Giraph采用BSP模型(bulk-synchronous parallel model),可用于等迭代类算法。

5.       Open MPI

这是一个高性能计算函数库,通常在HPC(High Performance Computing)中采用,与MapReduce相比,其性能更高,用户可控性更强,但编程复杂,容错性差,可以说,各有所长,在实际应用中,针对不同 该应用会采用MPI或者MapReduce。

6.       Apache HBase

HBase是一个hadoop数据库, 其特点是分布式,可扩展的,存储大数据。当有需要随机,实时读写的大数据时, 使用HBase很适合.

MapReduce\Tez\Storm\Spark四个框架的异同

唐半张 发表了文章 0 个评论 1847 次浏览 2015-10-08 10:38 来自相关话题

1) MapReduce:是一种离线计算框架,将一个算法抽象成Map和Reduce两个阶段进行 处理,非常适合数据密集型计算。 2) Spark:MapReduce计算框架不适合迭代计算和交互式计算,MapR ...查看全部
1) MapReduce:是一种离线计算框架,将一个算法抽象成Map和Reduce两个阶段进行

处理,非常适合数据密集型计算。

2) Spark:MapReduce计算框架不适合迭代计算和交互式计算,MapReduce是一种磁盘

计算框架,而Spark则是一种内存计算框架,它将数据尽可能放到内存中以提高迭代

应用和交互式应用的计算效率。

3) Storm:MapReduce也不适合进行流式计算、实时分析,比如广告点击计算等,而

Storm则更擅长这种计算、它在实时性要远远好于MapReduce计算框架。

4)Tez: 运行在YARN之上支持DAG作业的计算框架,对MapReduce数据处理的归纳。它

把Map/Reduce过程拆分成若干个子过程,同时可以把多个Map/Reduce任务组合成一个

较大的DAG任务,减少了Map/Reduce之间的文件存储。同时合理组合其子过程,也可

以减少任务的运行时间。

Spark与Hadoop计算模型的比较分析

唐半张 发表了文章 0 个评论 1855 次浏览 2015-10-08 10:37 来自相关话题

Spark与Hadoop计算模型的比较分析 Spark是一个通用的并行计算框架,由UCBerkeley的AMP实验室开发。 1.Spark的中间数据放到内存中,对于迭代运算效率比较高。 Spark更适合于迭代运算比较多的ML ...查看全部
Spark与Hadoop计算模型的比较分析
Spark是一个通用的并行计算框架,由UCBerkeley的AMP实验室开发。
1.Spark的中间数据放到内存中,对于迭代运算效率比较高。
Spark更适合于迭代运算比较多的ML和DM运算。因为在Spark里面,有RDD的概念。
2.Spark比Hadoop更通用。
      Spark提供的数据集操作类型有很多种,不像Hadoop只提供了Map和Reduce两种操作。比如map, filter, flatMap,sample, groupByKey, reduceByKey, union, join, cogroup, mapValues, sort,partionBy等多种操作类型,他们把这些操作称为Transformations。同时还提供Count, collect, reduce, looku p, save等多种actions。
     这些多种多样的数据集操作类型,给上层应用者提供了方便。各个处理节点之间的通信模型不再像Hadoop那样就是唯一的Data Shuffle一种模式。用户可以命名,物化,控制中间结果的分区等。可以说编程模型比Hadoop更灵活。
    不过论文中也提到,Spark不适用那种异步细粒度更新状态的应用,例如web服务的存储或者是增量的web爬虫和索引。就是对于那种增量修改的应用模型,当然不适合把大量数据拿到内存中了。增量改动完了,也就不用了,不需要迭代了。
3.容错性。
从Spark的论文《Resilient Distributed Datasets: AFault-Tolerant Abstraction for In-Memory Cluster Computing》中没看出容错性做的有多好。倒是提到了分布式数据集计算,做checkpoint的两种方式,一个是checkpoint data,一个是logging the updates。貌似Spark采用了后者。但是文中后来又提到,虽然后者看似节省存储空间。但是由于数据处理模型是类似DAG的操作过程,由于图中的某个节点出错,由于lineage chains的依赖复杂性,可能会引起全部计算节点的重新计算,这样成本也不低。他们后来说,是存数据,还是存更新日志,做checkpoint还是由用户说了算吧。相当于什么都没说,又把这个皮球踢给了用户。所以我看就是由用户根据业务类型,衡量是存储数据IO和磁盘空间的代价和重新计算的代价,选择代价较小的一种策略。
4.关于Spark和Hadoop的融合
不知道Apache基金会的人怎么想的,我看Spark还是应该融入到Hadoop生态系统中。从Hadoop 0.23把MapReduce做成了库,看出Hadoop的目标是要支持包括MapReduce在内的更多的并行计算模型,比如MPI,Spark等。毕竟现在Hadoop的单节点CPU利用率并不高,那么假如这种迭代密集型运算是和现有平台的互补。同时,这对资源调度系统就提出了更高的要求。有关资源调度方面,UC Berkeley貌似也在做一个Mesos的东西,还用了Linux container,统一调度Hadoop和其他应用模型。
 
 

Spark源码系列(九)Spark SQL初体验之解析过程详解

cenyuhai 发表了文章 0 个评论 2224 次浏览 2015-09-11 15:23 来自相关话题

好久没更新博客了,之前学了一些R语言和机器学习的内容,做了一些笔记,之后也会放到博客上面来给大家共享。一个月前就打算更新Spark Sql的内容了,因为一些别的事情耽误了,今天就简单写点,Spark1.2马上就要出来了,不知道变动会不会很大,据说添加了很多的新 ...查看全部
好久没更新博客了,之前学了一些R语言和机器学习的内容,做了一些笔记,之后也会放到博客上面来给大家共享。一个月前就打算更新Spark Sql的内容了,因为一些别的事情耽误了,今天就简单写点,Spark1.2马上就要出来了,不知道变动会不会很大,据说添加了很多的新功能呢,期待中...
首先声明一下这个版本的代码是1.1的,之前讲的都是1.0的。
Spark支持两种模式,一种是在spark里面直接写sql,可以通过sql来查询对象,类似.net的LINQ一样,另外一种支持hive的HQL。不管是哪种方式,下面提到的步骤都会有,不同的是具体的执行过程。下面就说一下这个过程。
Sql解析成LogicPlan
使用Idea的快捷键Ctrl + Shift + N打开SQLQuerySuite文件,进行调试吧。
  def sql(sqlText: String): SchemaRDD = {
if (dialect == "sql") {
new SchemaRDD(this, parseSql(sqlText))
} else {
sys.error(s"Unsupported SQL dialect: $dialect")
}
}


从这里可以看出来,第一步是解析sql,最后把它转换成一个SchemaRDD。点击进入parseSql函数,发现解析Sql的过程在SqlParser这个类里面。
在SqlParser的apply方法里面,我们可以看到else语句里面的这段代码。
      //对input进行解析,符合query的模式的就返回Success
phrase(query)(new lexical.Scanner(input)) match {
case Success(r, x) => r
case x => sys.error(x.toString)
}

这里我们主要关注query就可以。
  protected lazy val query: Parser[LogicalPlan] = (
select * (
UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } |
INTERSECT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Intersect(q1, q2) } |
EXCEPT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Except(q1, q2)} |
UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) }
)
| insert | cache
)


这里面有很多看不懂的操作符,请到下面这个网址里面去学习。这里可以看出来它目前支持的sql语句只是select和insert。
http://www.scala-lang.org/api/2.10.4/index.html#scala.util.parsing.combinator.Parsers$Parser
我们继续查看select。

  // ~>只保留右边的模式 opt可选的 ~按顺序合成 <~只保留左边的
protected lazy val select: Parser[LogicalPlan] =
SELECT ~> opt(DISTINCT) ~ projections ~
opt(from) ~ opt(filter) ~
opt(grouping) ~
opt(having) ~
opt(orderBy) ~
opt(limit) <~ opt(";") ^^ {
case d ~ p ~ r ~ f ~ g ~ h ~ o ~ l =>
val base = r.getOrElse(NoRelation)
val withFilter = f.map(f => Filter(f, base)).getOrElse(base)
val withProjection =
g.map {g =>
Aggregate(assignAliases(g), assignAliases(p), withFilter)
}.getOrElse(Project(assignAliases(p), withFilter))
val withDistinct = d.map(_ => Distinct(withProjection)).getOrElse(withProjection)
val withHaving = h.map(h => Filter(h, withDistinct)).getOrElse(withDistinct)
val withOrder = o.map(o => Sort(o, withHaving)).getOrElse(withHaving)
val withLimit = l.map { l => Limit(l, withOrder) }.getOrElse(withOrder)
withLimit
}
View Code
可以看得出来它对sql的解析是和我们常用的sql写法是一致的,这里面再深入下去还有递归,并不是看起来那么好理解。这里就不继续讲下去了,在解析hive的时候我会重点讲一下,我认为目前大家使用得更多是仍然是来源于hive的数据集,毕竟hive那么稳定。
到这里我们可以知道第一步是通过Parser把sql解析成一个LogicPlan。
LogicPlan到RDD的转换过程
好,下面我们回到刚才的代码,接着我们应该看SchemaRDD。
  override def compute(split: Partition, context: TaskContext): Iterator[Row] =
firstParent[Row].compute(split, context).map(_.copy())

override def getPartitions: Array[Partition] = firstParent[Row].partitions

override protected def getDependencies: Seq[Dependency[_]] =
List(new OneToOneDependency(queryExecution.toRdd))


SchemaRDD是一个RDD的话,那么它最重要的3个属性:compute函数,分区,依赖全在这里面,其它的函数我们就不看了。
挺奇怪的是,我们new出来的RDD,怎么会有依赖呢,这个queryExecution是啥,点击进去看看吧,代码跳转到SchemaRDD继承的SchemaRDDLike里面。
lazy val queryExecution = sqlContext.executePlan(baseLogicalPlan)

protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
new this.QueryExecution { val logical = plan }

把这两段很短的代码都放一起了,executePlan方法就是new了一个QueryExecution出来,那我们继续看看QueryExecution这个类吧。
    lazy val analyzed = ExtractPythonUdfs(analyzer(logical))
lazy val optimizedPlan = optimizer(analyzed)
lazy val sparkPlan = {
SparkPlan.currentContext.set(self)
planner(optimizedPlan).next()
}
// 在需要的时候加入Shuffle操作
lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
lazy val toRdd: RDD[Row] = executedPlan.execute()


从这里可以看出来LogicPlan是经过了5个步骤的转换,要被analyzer和optimizer的处理,然后转换成SparkPlan,在执行之前还要被prepareForExecution处理一下,最后调用execute方法转成RDD.
下面我们分步讲这些个东东到底是干啥了。
首先我们看看Anayzer,它是继承自RuleExecutor的,这里插句题外话,Spark sql的作者Michael Armbrust在2013年的Spark Submit上介绍Catalyst的时候,就说到要从整体地去优化一个sql的执行是很困难的,所有设计成这种基于一个一个小规则的这种优化方式,既简单又方便维护。
好,我们接下来看看RuleExecutor的apply方法。

  def apply(plan: TreeType): TreeType = {
var curPlan = plan
//规则还分批次的,分批对plan进行处理
batches.foreach { batch =>
val batchStartPlan = curPlan
var iteration = 1
var lastPlan = curPlan
var continue = true

// Run until fix point (or the max number of iterations as specified in the strategy.
while (continue) {
//用batch种的小规则从左到右挨个对plan进行处理
curPlan = batch.rules.foldLeft(curPlan) {
case (plan, rule) =>
val result = rule(plan)
result
}
iteration += 1
//超过了规定的迭代次数就要退出的
if (iteration > batch.strategy.maxIterations) {
continue = false
}
//经过处理成功的plan是会发生改变的,如果和上一次处理接触的plan一样,这说明已经没有优化空间了,可以结束,这个就是前面提到的Fixed point
if (curPlan.fastEquals(lastPlan)) {
continue = false
}
lastPlan = curPlan
}
}

curPlan
}
View Code
看完了RuleExecutor,我们继续看Analyzer,下面我只贴出来batches这块的代码,剩下的要自己去看了哦。

  val batches: Seq[Batch] = Seq(
//碰到继承自MultiInstanceRelations接口的LogicPlan时,发现id以后重复的,就强制要求它们生成一个新的全局唯一的id
//涉及到InMemoryRelation、LogicRegion、ParquetRelation、SparkLogicPlan
Batch("MultiInstanceRelations", Once,
NewRelationInstances),
//如果大小写不敏感就把属性都变成小写
Batch("CaseInsensitiveAttributeReferences", Once,
(if (caseSensitive) Nil else LowercaseAttributeReferences :: Nil) : _*),
//这个牛逼啊,居然想迭代100次的。
Batch("Resolution", fixedPoint,
//解析从子节点的操作生成的属性,一般是别名引起的,比如a.id
ResolveReferences ::
//通过catalog解析表名
ResolveRelations ::
//在select语言里,order by的属性往往在前面没写,查询的时候也需要把这些字段查出来,排序完毕之后再删除
ResolveSortReferences ::
//前面讲过了
NewRelationInstances ::
//清除被误认为别名的属性,比如sum(score) as a,其实它应该是sum(score)才对
//它被解析的时候解析成Project(Seq(Alias(g: Generator, _)),直接返回Generator就可以了
ImplicitGenerate ::
//处理语句中的*,比如select *, count(*)
StarExpansion ::
//解析函数
ResolveFunctions ::
//解析全局的聚合函数,比如select sum(score) from table
GlobalAggregates ::
//解析having子句后面的聚合过滤条件,比如having sum(score) > 400
UnresolvedHavingClauseAttributes ::
//typeCoercionRules是hive的类型转换规则
typeCoercionRules :_*),
//检查所有节点的属性是否都已经处理完毕了,如果还有没解析出来的属性,这里就会报错!
Batch("Check Analysis", Once,
CheckResolution),
//清除多余的操作符,现在是Subquery和LowerCaseSchema,
//第一个是子查询,第二个HiveContext查询树里面把子节点全部转换成小写
Batch("AnalysisOperators", fixedPoint,
EliminateAnalysisOperators)
)
View Code
可以看得出来Analyzer是把Unresolved的LogicPlan解析成resolved的,解析里面的表名、字段、函数、别名什么的。
我们接着看Optimizer, 从单词上看它是用来做优化的,但是从代码上来看它更多的是为了过滤我们写的一些垃圾语句,并没有做什么实际的优化。

object Optimizer extends RuleExecutor[LogicalPlan] {
val batches =
//递归合并相邻的两个limit
Batch("Combine Limits", FixedPoint(100),
CombineLimits) ::
Batch("ConstantFolding", FixedPoint(100),
//替换null值
NullPropagation,
//替换一些简单的常量表达式,比如 1 in (1,2) 直接返回一个true就可以了
ConstantFolding,
//简化like语句,避免全表扫描,目前支持'%demo%', '%demo','demo*','demo'
LikeSimplification,
//简化过滤条件,比如true and score > 0 直接替换成score > 0
BooleanSimplification,
//简化filter,比如where 1=1 或者where 1=2,前者直接去掉这个过滤,后者这个查询就没必要做了
SimplifyFilters,
//简化转换,比如两个比较字段的数据类型是一样的,就不需要转换了
SimplifyCasts,
//简化大小写转换,比如Upper(Upper('a'))转为认为是Upper('a')
SimplifyCaseConversionExpressions) ::
Batch("Filter Pushdown", FixedPoint(100),
//递归合并相邻的两个过滤条件
CombineFilters,
//把从表达式里面的过滤替换成,先做过滤再取表达式,并且掉过滤里面的别名属性
//典型的例子 select * from (select a,b from table) where a=1
//替换成select * from (select a,b from table where a=1)
PushPredicateThroughProject,
//把join的on条件中可以在原表当中做过滤的先做过滤
//比如select a,b from x join y on x.id = y.id and x.a >0 and y.b >0
//这个语句可以改写为 select a,b from x where x.a > 0 join (select * from y where y.b >0) on x.id = y.id
PushPredicateThroughJoin,
//去掉一些用不上的列
ColumnPruning) :: Nil
}
View Code
真是用心良苦啊,看来我们写sql的时候还是要注意一点的,你看人家花多大的功夫来优化我们的烂sql。。。要是我肯定不优化。。。写得烂就慢去吧!
接下来,就改看这一句了planner(optimizedPlan).next() 我们先看看SparkPlanner吧。

  protected[sql] class SparkPlanner extends SparkStrategies {
val sparkContext: SparkContext = self.sparkContext

val sqlContext: SQLContext = self

def codegenEnabled = self.codegenEnabled

def numPartitions = self.numShufflePartitions
//把LogicPlan转换成实际的操作,具体操作类在org.apache.spark.sql.execution包下面
val strategies: Seq[Strategy] =
//把cache、set、expain命令转化为实际的Command
CommandStrategy(self) ::
//把limit转换成TakeOrdered操作
TakeOrdered ::
//名字有点蛊惑人,就是转换聚合操作
HashAggregation ::
//left semi join只显示连接条件成立的时候连接左边的表的信息
//比如select * from table1 left semi join table2 on(table1.student_no=table2.student_no);
//它只显示table1中student_no在表二当中的信息,它可以用来替换exist语句
LeftSemiJoin ::
//等值连接操作,有些优化的内容,如果表的大小小于spark.sql.autoBroadcastJoinThreshold设置的字节
//就自动转换为BroadcastHashJoin,即把表缓存,类似hive的map join(顺序是先判断右表再判断右表)。
//这个参数的默认值是10000
//另外做内连接的时候还会判断左表右表的大小,shuffle取数据大表不动,从小表拉取数据过来计算
HashJoin ::
//在内存里面执行select语句进行过滤,会做缓存
InMemoryScans ::
//和parquet相关的操作
ParquetOperations ::
//基本的操作
BasicOperators ::
//没有条件的连接或者内连接做笛卡尔积
CartesianProduct ::
//把NestedLoop连接进行广播连接
BroadcastNestedLoopJoin :: Nil
......
}
View Code
这一步是把逻辑计划转换成物理计划,或者说是执行计划了,里面有很多概念是我以前没听过的,网上查了一下才知道,原来数据库的执行计划还有那么多的说法,这一块需要是专门研究数据库的人比较了解了。剩下的两步就是prepareForExecution和execute操作。
prepareForExecution操作是检查物理计划当中的Distribution是否满足Partitioning的要求,如果不满足的话,需要重新弄做分区,添加shuffle操作,这块暂时没咋看懂,以后还需要仔细研究。最后调用SparkPlan的execute方法,这里面稍微讲讲这块的树型结构。

sql解析出来就是一个二叉树的结构,不管是逻辑计划还是物理计划,都是这种结构,所以在代码里面可以看到LogicPlan和SparkPlan的具体实现类都是有继承上面图中的三种类型的节点的。
非LeafNode的SparkPlan的execute方法都会有这么一句child.execute(),因为它需要先执行子节点的execute来返回数据,执行的过程是一个先序遍历。
最后把这个过程也用一个图来表示吧,方便记忆。

(1)通过一个Parser来把sql语句转换成Unresolved LogicPlan,目前有两种Parser,SqlParser和HiveQl。
(2)通过Analyzer把LogicPlan当中的Unresolved的内容给解析成resolved的,这里面包括表名、函数、字段、别名等。
(3)通过Optimizer过滤掉一些垃圾的sql语句。
(4)通过Strategies把逻辑计划转换成可以具体执行的物理计划,具体的类有SparkStrategies和HiveStrategies。
(5)在执行前用prepareForExecution方法先检查一下。
(6)先序遍历,调用执行计划树的execute方法。
 
岑玉海
转载请注明出处,谢谢!
 

Spark源码系列(八)Spark Streaming实例分析

cenyuhai 发表了文章 0 个评论 2134 次浏览 2015-09-11 15:23 来自相关话题

这一章要讲Spark Streaming,讲之前首先回顾下它的用法,具体用法请参照《Spark Streaming编程指南》。Example代码分析 val ssc = new StreamingContext(sparkConf, Seconds ...查看全部
这一章要讲Spark Streaming,讲之前首先回顾下它的用法,具体用法请参照《Spark Streaming编程指南》。Example代码分析
val ssc = new StreamingContext(sparkConf, Seconds(1));
// 获得一个DStream负责连接 监听端口:地址
val lines = ssc.socketTextStream(serverIP, serverPort);
// 对每一行数据执行Split操作
val words = lines.flatMap(_.split(" "));
// 统计word的数量
val pairs = words.map(word => (word, 1));
val wordCounts = pairs.reduceByKey(_ + _);
// 输出结果
wordCounts.print();
ssc.start(); // 开始
ssc.awaitTermination(); // 计算完毕退出


1、首先实例化一个StreamingContext
2、调用StreamingContext的socketTextStream
3、对获得的DStream进行处理
4、调用StreamingContext是start方法,然后等待
我们看StreamingContext的socketTextStream方法吧。
  def socketTextStream(
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String] = {
socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}


1、StoageLevel是StorageLevel.MEMORY_AND_DISK_SER_2
2、使用SocketReceiver的bytesToLines把输入流转换成可遍历的数据
继续看socketStream方法,它直接new了一个
new SocketInputDStream[T](this, hostname, port, converter, storageLevel)

继续深入挖掘SocketInputDStream,追述一下它的继承关系,SocketInputDStream>>ReceiverInputDStream>>InputDStream>>DStream。
具体实现ReceiverInputDStream的类有好几个,基本上都是从网络端来数据的。
它实现了ReceiverInputDStream的getReceiver方法,实例化了一个SocketReceiver来接收数据。
SocketReceiver的onStart方法里面调用了receive方法,处理代码如下:
      socket = new Socket(host, port)
val iterator = bytesToObjects(socket.getInputStream())
while(!isStopped && iterator.hasNext) {
store(iterator.next)
}

1、new了一个Socket来接收数据,用bytesToLines方法把InputStream转换成一行一行的字符串。
2、把每一行数据用store方法保存起来,store方法是从SocketReceiver的父类Receiver继承而来,内部实现是:
  def store(dataItem: T) {
executor.pushSingle(dataItem)
}

executor是ReceiverSupervisor类型,Receiver的操作都是由它来处理。这里先不深纠,后面我们再说这个pushSingle的实现。
到这里我们知道lines的类型是SocketInputDStream,然后对它是一顿的转换,flatMap、map、reduceByKey、print,这些方法都不是RDD的那种方法,而是DStream独有的。
讲到上面这几个方法,我们开始转入DStream了,flatMap、map、reduceByKey、print方法都涉及到DStream的转换,这和RDD的转换是类似的。我们讲一下reduceByKey和print。
reduceByKey方法和RDD一样,调用的combineByKey方法实现的,不一样的是它直接new了一个ShuffledDStream了,我们接着看一下它的实现吧。
override def compute(validTime: Time): Option[RDD[(K,C)]] = {
parent.getOrCompute(validTime) match {
case Some(rdd) => Some(rdd.combineByKey[C](createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine))
case None => None
}
}

在compute阶段,对通过Time获得的rdd进行reduceByKey操作。接下来的print方法也是一个转换:
new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()

打印前十个,超过10个打印"..."。需要注意register方法。
ssc.graph.addOutputStream(this)

它会把代码插入到当前的DStream添加到outputStreams里面,后面输出的时候如果没有outputStream就不会有输出,这个需要记住哦!启动过程分析
前戏结束之后,ssc.start() 高潮开始了。 start方法很小,最核心的一句是JobScheduler的start方法。我们得转到JobScheduler方法上面去。
下面是start方法的代码:
  def start(): Unit = synchronized {
  // 接受到JobSchedulerEvent就处理事件
eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
def receive = {
case event: JobSchedulerEvent => processEvent(event)
}
}), "JobScheduler")

listenerBus.start()
receiverTracker = new ReceiverTracker(ssc)
receiverTracker.start()
jobGenerator.start()
}


1、启动了一个Actor来处理JobScheduler的JobStarted、JobCompleted、ErrorReported事件。
2、启动StreamingListenerBus作为监听器。
3、启动ReceiverTracker。
4、启动JobGenerator。
我们接下来看看ReceiverTracker的start方法。
  def start() = synchronized {if (!receiverInputStreams.isEmpty) {
actor = ssc.env.actorSystem.actorOf(Props(new ReceiverTrackerActor), "ReceiverTracker")
receiverExecutor.start()
}
}

1、首先判断了一下receiverInputStreams不能为空,那receiverInputStreams是怎么时候写入值的呢?答案在SocketInputDStream的父类InputDStream当中,当实例化InputDStream的时候会在DStreamGraph里面添加InputStream。
abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext) extends DStream[T](ssc_) {
ssc.graph.addInputStream(this)
//....
}

2、实例化ReceiverTrackerActor,它负责RegisterReceiver(注册Receiver)、AddBlock、ReportError(报告错误)、DeregisterReceiver(注销Receiver)等事件的处理。
3、启动receiverExecutor(实际类是ReceiverLauncher,这名字起得。。),它主要负责启动Receiver,start方法里面调用了startReceivers方法吧。

    private def startReceivers() {
   // 对应着上面的那个例子,getReceiver方法获得是SocketReceiver
val receivers = receiverInputStreams.map(nis => {
val rcvr = nis.getReceiver()
rcvr.setReceiverId(nis.id)
rcvr
})

// 查看是否所有的receivers都有优先选择机器,这个需要重写Receiver的preferredLocation方法,目前只有FlumeReceiver重写了
val hasLocationPreferences = receivers.map(_.preferredLocation.isDefined).reduce(_ && _)

// 创建一个并行receiver集合的RDD, 把它们分散到各个worker节点上
val tempRDD =
if (hasLocationPreferences) {
val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get)))
ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)
} else {
ssc.sc.makeRDD(receivers, receivers.size)
}

// 在worker节点上启动Receiver的方法,遍历所有Receiver,然后启动
val startReceiver = (iterator: Iterator[Receiver[_]]) => {
if (!iterator.hasNext) {
throw new SparkException("Could not start receiver as object not found.")
}
val receiver = iterator.next()
val executor = new ReceiverSupervisorImpl(receiver, SparkEnv.get)
executor.start()
executor.awaitTermination()
}
// 运行这个重复的作业来确保所有的slave都已经注册了,避免所有的receivers都到一个节点上
if (!ssc.sparkContext.isLocal) {
ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
}

// 把receivers分发出去,启动
ssc.sparkContext.runJob(tempRDD, startReceiver)
}
View Code
1、遍历receiverInputStreams获取所有的Receiver。
2、查看这些Receiver是否全都有优先选择机器。
3、把SparkContext的makeRDD方法把所有Receiver包装到ParallelCollectionRDD里面,并行度是Receiver的数量。
4、发个小任务给确保所有的slave节点都已经注册了(这个小任务有点儿莫名其妙,感觉怪怪的)。
5、提交作业,启动所有Receiver。
Spark写得实在是太巧妙了,居然可以把Receiver包装在RDD里面,当做是数据来处理!
启动Receiver的时候,new了一个ReceiverSupervisorImpl,然后调的start方法,主要干了这么三件事情,代码就不贴了。
1、启动BlockGenerator。
2、调用Receiver的OnStart方法,开始接受数据,并把数据写入到ReceiverSupervisor。
3、调用onReceiverStart方法,发送RegisterReceiver消息给driver报告自己启动了。保存接收到的数据
ok,到了这里,重点落到了BlockGenerator。前面说到SocketReceiver把接受到的数据调用ReceiverSupervisor的pushSingle方法保存。
  // 这是ReceiverSupervisorImpl的方法
def pushSingle(data: Any) {
blockGenerator += (data)
}
// 这是BlockGenerator的方法
def += (data: Any): Unit = synchronized {
currentBuffer += data
}


我们看一下它的start方法吧。

  def start() {
blockIntervalTimer.start()
blockPushingThread.start()
}
View Code
它启动了一个定时器RecurringTimer和一个线程执行keepPushingBlocks方法。
先看RecurringTimer的实现:
      while (!stopped) {
clock.waitTillTime(nextTime)
callback(nextTime)
prevTime = nextTime
nextTime += period
}

每隔一段时间就执行callback函数,callback函数是new的时候传进来的,是BlockGenerator的updateCurrentBuffer方法。
  private def updateCurrentBuffer(time: Long): Unit = synchronized {
try {
val newBlockBuffer = currentBuffer
currentBuffer = new ArrayBuffer[Any]
if (newBlockBuffer.size > 0) {
val blockId = StreamBlockId(receiverId, time - blockInterval)
val newBlock = new Block(blockId, newBlockBuffer)
blocksForPushing.put(newBlock)
}
} catch {case t: Throwable =>
reportError("Error in block updating thread", t)
}
}


它new了一个Block出来,然后添加到blocksForPushing这个ArrayBlockingQueue队列当中。
提到这里,有两个参数需要大家注意的:
spark.streaming.blockInterval   默认值是200
spark.streaming.blockQueueSize 默认值是10

这是前面提到的间隔时间和队列的长度,间隔时间默认是200毫秒,队列是最多能容纳10个Block,多了就要阻塞了。
我们接下来看一下BlockGenerator另外启动的那个线程执行的keepPushingBlocks方法到底在干什么?
  private def keepPushingBlocks() {
    while(!stopped) {
Option(blocksForPushing.poll(100, TimeUnit.MILLISECONDS)) match {
case Some(block) => pushBlock(block)
case None =>
}
}
   // ...退出之前把剩下的也输出去了
}


它在把blocksForPushing中的block不停的拿出来,调用pushBlock方法,这个方法属于在实例化BlockGenerator的时候,从ReceiverSupervisorImpl传进来的BlockGeneratorListener的。
  private val blockGenerator = new BlockGenerator(new BlockGeneratorListener {
def onError(message: String, throwable: Throwable) {
reportError(message, throwable)
}

def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
pushArrayBuffer(arrayBuffer, None, Some(blockId))
}
}, streamId, env.conf)


1、reportError,通过actor向driver发送错误报告消息ReportError。
2、调用pushArrayBuffer保存数据。
下面是pushArrayBuffer方法:
  def pushArrayBuffer(arrayBuffer: ArrayBuffer[_], optionalMetadata: Option[Any], optionalBlockId: Option[StreamBlockId]
) {
val blockId = optionalBlockId.getOrElse(nextBlockId)
val time = System.currentTimeMillis
blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]], storageLevel, tellMaster = true)
reportPushedBlock(blockId, arrayBuffer.size, optionalMetadata)
}


1、把Block保存到BlockManager当中,序列化方式为之前提到的StorageLevel.MEMORY_AND_DISK_SER_2(内存不够就写入到硬盘,并且在2个节点上保存的方式)。
2、调用reportPushedBlock给driver发送AddBlock消息,报告新添加的Block,ReceiverTracker收到消息之后更新内部的receivedBlockInfo映射关系。处理接收到的数据
前面只讲了数据的接收和保存,那数据是怎么处理的呢?
之前一直讲ReceiverTracker,而忽略了之前的JobScheduler的start方法里面最后启动的JobGenerator。
  def start(): Unit = synchronized {
eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
def receive = {
case event: JobGeneratorEvent => processEvent(event)
}
}), "JobGenerator")
if (ssc.isCheckpointPresent) {
restart()
} else {
startFirstTime()
}
}


1、启动一个actor处理JobGeneratorEvent事件。
2、如果是已经有CheckPoint了,就接着上次的记录进行处理,否则就是第一次启动。
我们先看startFirstTime吧,CheckPoint以后再说吧,有点儿小复杂。
  private def startFirstTime() {
val startTime = new Time(timer.getStartTime())
graph.start(startTime - graph.batchDuration)
timer.start(startTime.milliseconds)
}

1、timer.getStartTime计算出来下一个周期的到期时间,计算公式:(math.floor(clock.currentTime.toDouble / period) + 1).toLong * period,以当前的时间/除以间隔时间,再用math.floor求出它的上一个整数(即上一个周期的到期时间点),加上1,再乘以周期就等于下一个周期的到期时间。
2、启动DStreamGraph,启动时间=startTime - graph.batchDuration。
3、启动Timer,我们看看它的定义:
  private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
longTime => eventActor ! GenerateJobs(new Time(longTime)), "JobGenerator")

到这里就清楚了,DStreamGraph的间隔时间就是timer的间隔时间,启动时间要设置成比Timer早一个时间间隔,原因再慢慢探究。
可以看出来每隔一段时间,Timer给eventActor发送GenerateJobs消息,我们直接去看它的处理方法generateJobs吧,中间忽略了一步,大家自己看。

  private def processEvent(event: JobGeneratorEvent) {
event match {
case GenerateJobs(time) => generateJobs(time)
case ClearMetadata(time) => clearMetadata(time)
case DoCheckpoint(time) => doCheckpoint(time)
case ClearCheckpointData(time) => clearCheckpointData(time)
}
}
View Code
下面是generateJobs方法。
  private def generateJobs(time: Time) {
SparkEnv.set(ssc.env)
Try(graph.generateJobs(time)) match {
case Success(jobs) =>
val receivedBlockInfo = graph.getReceiverInputStreams.map { stream =>
val streamId = stream.id
val receivedBlockInfo = stream.getReceivedBlockInfo(time)
(streamId, receivedBlockInfo)
}.toMap
jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfo))
case Failure(e) =>
jobScheduler.reportError("Error generating jobs for time " + time, e)
}
eventActor ! DoCheckpoint(time)
}


1、DStreamGraph生成jobs。
2、从stream那里获取接收到的Block信息。
3、调用submitJobSet方法提交作业。
4、提交完作业之后,做一个CheckPoint。
先看DStreamGraph是怎么生成的jobs。
  def generateJobs(time: Time): Seq[Job] = {
val jobs = this.synchronized {
outputStreams.flatMap(outputStream => outputStream.generateJob(time))
}
jobs
}

outputStreams在这个例子里面是print这个方法里面添加的,这个在前面说了,我们继续看DStream的generateJob。

  private[streaming] def generateJob(time: Time): Option[Job] = {
getOrCompute(time) match {
case Some(rdd) => {
val jobFunc = () => {
val emptyFunc = { (iterator: Iterator[T]) => {} }
context.sparkContext.runJob(rdd, emptyFunc)
}
Some(new Job(time, jobFunc))
}
case None => None
}
}
View Code
1、调用getOrCompute方法获得RDD
2、new了一个方法去提交这个作业,缺什么都不做
为什么呢?这是直接跳转的错误,呵呵,因为这个outputStream是print方法返回的,它应该是ForEachDStream,所以我们应该看的是它里面的generateJob方法。

  override def generateJob(time: Time): Option[Job] = {
parent.getOrCompute(time) match {
case Some(rdd) =>
val jobFunc = () => {
foreachFunc(rdd, time)
}

Some(new Job(time, jobFunc))
case None => None
}
}
View Code
这里请大家千万要注意,不要在这块被卡住了。
我们看看它这个RDD是怎么出来的吧。

  private[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {
// If this DStream was not initialized (i.e., zeroTime not set), then do it
// If RDD was already generated, then retrieve it from HashMap
generatedRDDs.get(time) match {

// 这个RDD已经被生成过了,直接用就是了
case Some(oldRDD) => Some(oldRDD)

// 还没生成过,就调用compte函数生成一个
case None => {
if (isTimeValid(time)) {
compute(time) match {
case Some(newRDD) =>
         // 设置保存的级别
if (storageLevel != StorageLevel.NONE) {
newRDD.persist(storageLevel)
}
         // 如果现在需要,就做CheckPoint
if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
newRDD.checkpoint()
}
         // 添加到generatedRDDs里面去,可以再次利用
generatedRDDs.put(time, newRDD)
Some(newRDD)
case None =>
None
}
} else {
None
}
}
}
}
View Code
从上面的方法可以看出来它是通过每个DStream自己实现的compute函数得出来的RDD。我们找到SocketInputDStream,没有compute函数,在父类ReceiverInputDStream里面找到了。
  override def compute(validTime: Time): Option[RDD[T]] = {
// 如果出现了时间比startTime早的话,就返回一个空的RDD,因为这个很可能是master挂了之后的错误恢复
if (validTime >= graph.startTime) {
val blockInfo = ssc.scheduler.receiverTracker.getReceivedBlockInfo(id)
receivedBlockInfo(validTime) = blockInfo
val blockIds = blockInfo.map(_.blockId.asInstanceOf[BlockId])
Some(new BlockRDD[T](ssc.sc, blockIds))
} else {
Some(new BlockRDD[T](ssc.sc, Array[BlockId]()))
}
}


通过DStream的id把receiverTracker当中把接收到的block信息全部拿出来,记录到ReceiverInputDStream自身的receivedBlockInfo这个HashMap里面,就把RDD返回了,RDD里面实际包含的是Block的id的集合。
现在我们就可以回到之前JobGenerator的generateJobs方法,我们就清楚它这句是提交的什么了。
jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfo))

JobSet是记录Job的完成情况的,直接看submitJobSet方法吧。

  def submitJobSet(jobSet: JobSet) {
if (jobSet.jobs.isEmpty) {
} else {
jobSets.put(jobSet.time, jobSet)
jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
}
}
View Code
遍历jobSet里面的所有jobs,通过jobExecutor这个线程池提交。我们看一下JobHandler就知道了。
  private class JobHandler(job: Job) extends Runnable {
def run() {
eventActor ! JobStarted(job)
job.run()
eventActor ! JobCompleted(job)
}
}


1、通知eventActor处理JobStarted事件。
2、运行job。
3、通知eventActor处理JobCompleted事件。
这里的重点是job.run,事件处理只是更新相关的job信息。
  def run() {
result = Try(func())
}

在遍历BlockRDD的时候,在compute函数获取该Block(详细请看BlockRDD),然后对这个RDD的结果进行打印。
 
到这里就算结束了,最后来个总结吧,图例在下一章补上,这一章只是过程分析:
1、可以有多个输入,我们可以通过StreamingContext定义多个输入,比如我们监听多个(host,ip),可以给它们定义各自的处理逻辑和输出,输出方式不仅限于print方法,还可以有别的方法,saveAsTextFiles和saveAsObjectFiles。这块的设计是支持共享StreamingContext的。
2、StreamingContext启动了JobScheduler,JobScheduler启动ReceiverTracker和JobGenerator。
3、ReceiverTracker是通过把Receiver包装成RDD的方式,发送到Executor端运行起来的,Receiver起来之后向ReceiverTracker发送RegisterReceiver消息。
3、Receiver把接收到的数据,通过ReceiverSupervisor保存。
4、ReceiverSupervisorImpl把数据写入到BlockGenerator的一个ArrayBuffer当中。
5、BlockGenerator内部每个一段时间(默认是200毫秒)就把这个ArrayBuffer构造成Block添加到blocksForPushing当中。
6、BlockGenerator的另外一条线程则不断的把加入到blocksForPushing当中的Block写入到BlockManager当中,并向ReceiverTracker发送AddBlock消息。
7、JobGenerator内部有个定时器,定期生成Job,通过DStream的id,把ReceiverTracker接收到的Block信息从BlockManager上抓取下来进行处理,这个间隔时间是我们在实例化StreamingContext的时候传进去的那个时间,在这个例子里面是Seconds(1)。
 
 
岑玉海
转载请注明出处,谢谢!
 
 

Spark源码系列(七)Spark on yarn具体实现

cenyuhai 发表了文章 0 个评论 1966 次浏览 2015-09-11 15:22 来自相关话题

本来不打算写的了,但是真的是闲来无事,整天看美剧也没啥意思。这一章打算讲一下Spark on yarn的实现,1.0.0里面已经是一个stable的版本了,可是1.0.1也出来了,离1.0.0发布才一个月的时间,更新太快了,节奏跟不上啊,这里仍旧是讲1.0.0 ...查看全部
本来不打算写的了,但是真的是闲来无事,整天看美剧也没啥意思。这一章打算讲一下Spark on yarn的实现,1.0.0里面已经是一个stable的版本了,可是1.0.1也出来了,离1.0.0发布才一个月的时间,更新太快了,节奏跟不上啊,这里仍旧是讲1.0.0的代码,所以各位朋友也不要再问我讲的是哪个版本,目前为止发布的文章都是基于1.0.0的代码。
在第一章《spark-submit提交作业过程》的时候,我们讲过Spark on yarn的在cluster模式下它的main class是org.apache.spark.deploy.yarn.Client。okay,这个就是我们的头号目标。提交作业
找到main函数,里面调用了run方法,我们直接看run方法。
    val appId = runApp()
monitorApplication(appId)
System.exit(0)

运行App,跟踪App,最后退出。我们先看runApp吧。

  def runApp(): ApplicationId = {
// 校验参数,内存不能小于384Mb,Executor的数量不能少于1个。
validateArgs()
// 这两个是父类的方法,初始化并且启动Client
init(yarnConf)
start()

// 记录集群的信息(e.g, NodeManagers的数量,队列的信息).
logClusterResourceDetails()

// 准备提交请求到ResourcManager (specifically its ApplicationsManager (ASM)// Get a new client application.
val newApp = super.createApplication()
val newAppResponse = newApp.getNewApplicationResponse()
val appId = newAppResponse.getApplicationId()
// 检查集群的内存是否满足当前的作业需求
verifyClusterResources(newAppResponse)

// 准备资源和环境变量.
//1.获得工作目录的具体地址: /.sparkStaging/appId/
val appStagingDir = getAppStagingDir(appId)
  //2.创建工作目录,设置工作目录权限,上传运行时所需要的jar包
val localResources = prepareLocalResources(appStagingDir)
//3.设置运行时需要的环境变量
val launchEnv = setupLaunchEnv(localResources, appStagingDir)
  //4.设置运行时JVM参数,设置SPARK_USE_CONC_INCR_GC为true的话,就使用CMS的垃圾回收机制
val amContainer = createContainerLaunchContext(newAppResponse, localResources, launchEnv)

// 设置application submission context.
val appContext = newApp.getApplicationSubmissionContext()
appContext.setApplicationName(args.appName)
appContext.setQueue(args.amQueue)
appContext.setAMContainerSpec(amContainer)
appContext.setApplicationType("SPARK")

// 设置ApplicationMaster的内存,Resource是表示资源的类,目前有CPU和内存两种.
val memoryResource = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
memoryResource.setMemory(args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
appContext.setResource(memoryResource)

// 提交Application.
submitApp(appContext)
appId
}
View Code
monitorApplication就不说了,不停的调用getApplicationReport方法获得最新的Report,然后调用getYarnApplicationState获取当前状态,如果状态为FINISHED、FAILED、KILLED就退出。
说到这里,顺便把跟yarn相关的参数也贴出来一下,大家一看就清楚了。

    while (!args.isEmpty) {
args match {
case ("--jar") :: value :: tail =>
userJar = value
args = tail

case ("--class") :: value :: tail =>
userClass = value
args = tail

case ("--args" | "--arg") :: value :: tail =>
if (args(0) == "--args") {
println("--args is deprecated. Use --arg instead.")
}
userArgsBuffer += value
args = tail

case ("--master-class" | "--am-class") :: value :: tail =>
if (args(0) == "--master-class") {
println("--master-class is deprecated. Use --am-class instead.")
}
amClass = value
args = tail

case ("--master-memory" | "--driver-memory") :: MemoryParam(value) :: tail =>
if (args(0) == "--master-memory") {
println("--master-memory is deprecated. Use --driver-memory instead.")
}
amMemory = value
args = tail

case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail =>
if (args(0) == "--num-workers") {
println("--num-workers is deprecated. Use --num-executors instead.")
}
numExecutors = value
args = tail

case ("--worker-memory" | "--executor-memory") :: MemoryParam(value) :: tail =>
if (args(0) == "--worker-memory") {
println("--worker-memory is deprecated. Use --executor-memory instead.")
}
executorMemory = value
args = tail

case ("--worker-cores" | "--executor-cores") :: IntParam(value) :: tail =>
if (args(0) == "--worker-cores") {
println("--worker-cores is deprecated. Use --executor-cores instead.")
}
executorCores = value
args = tail

case ("--queue") :: value :: tail =>
amQueue = value
args = tail

case ("--name") :: value :: tail =>
appName = value
args = tail

case ("--addJars") :: value :: tail =>
addJars = value
args = tail

case ("--files") :: value :: tail =>
files = value
args = tail

case ("--archives") :: value :: tail =>
archives = value
args = tail

case Nil =>
if (userClass == null) {
printUsageAndExit(1)
}

case _ =>
printUsageAndExit(1, args)
}
}
View CodeApplicationMaster
直接看run方法就可以了,main函数就干了那么一件事...

  def run() {
// 设置本地目录,默认是先使用yarn的YARN_LOCAL_DIRS目录,再到LOCAL_DIRS
System.setProperty("spark.local.dir", getLocalDirs())

// set the web ui port to be ephemeral for yarn so we don't conflict with
// other spark processes running on the same box
System.setProperty("spark.ui.port", "0")

// when running the AM, the Spark master is always "yarn-cluster"
System.setProperty("spark.master", "yarn-cluster")

  // 设置优先级为30,和mapreduce的优先级一样。它比HDFS的优先级高,因为它的操作是清理该作业在hdfs上面的Staging目录
ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30)

appAttemptId = getApplicationAttemptId()
  // 通过yarn.resourcemanager.am.max-attempts来设置,默认是2
  // 目前发现它只在清理Staging目录的时候用
isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts
amClient = AMRMClient.createAMRMClient()
amClient.init(yarnConf)
amClient.start()

// setup AmIpFilter for the SparkUI - do this before we start the UI
  // 方法的介绍说是yarn用来保护ui界面的,我感觉是设置ip代理的
addAmIpFilter()
  // 注册ApplicationMaster到内部的列表里
ApplicationMaster.register(this)

// 安全认证相关的东西,默认是不开启的,省得给自己找事
val securityMgr = new SecurityManager(sparkConf)

// 启动driver程序
userThread = startUserClass()

// 等待SparkContext被实例化,主要是等待spark.driver.port property被使用
  // 等待结束之后,实例化一个YarnAllocationHandler
waitForSparkContextInitialized()

// Do this after Spark master is up and SparkContext is created so that we can register UI Url.
  // 向yarn注册当前的ApplicationMaster, 这个时候isFinished不能为true,是true就说明程序失败了
synchronized {
if (!isFinished) {
registerApplicationMaster()
registered = true
}
}

// 申请Container来启动Executor
allocateExecutors()

// 等待程序运行结束
userThread.join()

System.exit(0)
}
View Code
run方法里面主要干了5项工作:
1、初始化工作
2、启动driver程序
3、注册ApplicationMaster
4、分配Executors
5、等待程序运行结束
我们重点看分配Executor方法。

  private def allocateExecutors() {
try {
logInfo("Allocating " + args.numExecutors + " executors.")
// 分host、rack、任意机器三种类型向ResourceManager提交ContainerRequest
    // 请求的Container数量可能大于需要的数量
yarnAllocator.addResourceRequests(args.numExecutors)
// Exits the loop if the user thread exits.
while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) {
if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
finishApplicationMaster(FinalApplicationStatus.FAILED, "max number of executor failures reached")
}
     // 把请求回来的资源进行分配,并释放掉多余的资源
yarnAllocator.allocateResources()
ApplicationMaster.incrementAllocatorLoop(1)
Thread.sleep(100)
}
} finally {
// In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT,
// so that the loop in ApplicationMaster#sparkContextInitialized() breaks.
ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
}
logInfo("All executors have launched.")

// 启动一个线程来状态报告
if (userThread.isAlive) {
// Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)

// we want to be reasonably responsive without causing too many requests to RM.
val schedulerInterval = sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000)

// must be <= timeoutInterval / 2.
val interval = math.min(timeoutInterval / 2, schedulerInterval)

launchReporterThread(interval)
}
}
View Code
这里面我们只需要看addResourceRequests和allocateResources方法即可。
先说addResourceRequests方法,代码就不贴了。
Client向ResourceManager提交Container的请求,分三种类型:优先选择机器、同一个rack的机器、任意机器。
优先选择机器是在RDD里面的getPreferredLocations获得的机器位置,如果没有优先选择机器,也就没有同一个rack之说了,可以是任意机器。
下面我们接着看allocateResources方法。

  def allocateResources() {
// We have already set the container request. Poll the ResourceManager for a response.
// This doubles as a heartbeat if there are no pending container requests.
  // 之前已经提交过Container请求了,现在只需要获取response即可
val progressIndicator = 0.1f
val allocateResponse = amClient.allocate(progressIndicator)

val allocatedContainers = allocateResponse.getAllocatedContainers()
if (allocatedContainers.size > 0) {
var numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * allocatedContainers.size)

if (numPendingAllocateNow < 0) {
numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * numPendingAllocateNow)
}

val hostToContainers = new HashMap[String, ArrayBuffer[Container]]()

for (container <- allocatedContainers) {
     // 内存 > Executor所需内存 + 384
if (isResourceConstraintSatisfied(container)) {
// 把container收入名册当中,等待发落
val host = container.getNodeId.getHost
val containersForHost = hostToContainers.getOrElseUpdate(host, new ArrayBuffer[Container]())
containersForHost += container
} else {
// 内存不够,释放掉它
releaseContainer(container)
}
}

// 找到合适的container来使用.
val dataLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
val rackLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
val offRackContainers = new HashMap[String, ArrayBuffer[Container]]()
    // 遍历所有的host
for (candidateHost <- hostToContainers.keySet) {
val maxExpectedHostCount = preferredHostToCount.getOrElse(candidateHost, 0)
val requiredHostCount = maxExpectedHostCount - allocatedContainersOnHost(candidateHost)

val remainingContainersOpt = hostToContainers.get(candidateHost)
var remainingContainers = remainingContainersOpt.get
      
if (requiredHostCount >= remainingContainers.size) {
// 需要的比现有的多,把符合数据本地性的添加到dataLocalContainers映射关系里
dataLocalContainers.put(candidateHost, remainingContainers)
// 没有containner剩下的.
remainingContainers = null
} else if (requiredHostCount > 0) {
// 获得的container比所需要的多,把多余的释放掉
val (dataLocal, remaining) = remainingContainers.splitAt(remainingContainers.size - requiredHostCount)
dataLocalContainers.put(candidateHost, dataLocal)

for (container <- remaining) releaseContainer(container)
remainingContainers = null
}

// 数据所在机器已经分配满任务了,只能在同一个rack里面挑选了
if (remainingContainers != null) {
val rack = YarnAllocationHandler.lookupRack(conf, candidateHost)
if (rack != null) {
val maxExpectedRackCount = preferredRackToCount.getOrElse(rack, 0)
val requiredRackCount = maxExpectedRackCount - allocatedContainersOnRack(rack) -
rackLocalContainers.getOrElse(rack, List()).size

if (requiredRackCount >= remainingContainers.size) {
// Add all remaining containers to to `dataLocalContainers`.
dataLocalContainers.put(rack, remainingContainers)
remainingContainers = null
} else if (requiredRackCount > 0) {
// Container list has more containers that we need for data locality.
val (rackLocal, remaining) = remainingContainers.splitAt(remainingContainers.size - requiredRackCount)
val existingRackLocal = rackLocalContainers.getOrElseUpdate(rack, new ArrayBuffer[Container]())

existingRackLocal ++= rackLocal
remainingContainers = remaining
}
}
}

if (remainingContainers != null) {
// 还是不够,只能放到别的rack的机器上运行了
offRackContainers.put(candidateHost, remainingContainers)
}
}

// 按照数据所在机器、同一个rack、任意机器来排序
val allocatedContainersToProcess = new ArrayBuffer[Container](allocatedContainers.size)
allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(dataLocalContainers)
allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(rackLocalContainers)
allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(offRackContainers)

// 遍历选择了的Container,为每个Container启动一个ExecutorRunnable线程专门负责给它发送命令
for (container <- allocatedContainersToProcess) {
val numExecutorsRunningNow = numExecutorsRunning.incrementAndGet()
val executorHostname = container.getNodeId.getHost
val containerId = container.getId
     // 内存需要大于Executor的内存 + 384
val executorMemoryOverhead = (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)

if (numExecutorsRunningNow > maxExecutors) {
// 正在运行的比需要的多了,释放掉多余的Container
releaseContainer(container)
numExecutorsRunning.decrementAndGet()
} else {
val executorId = executorIdCounter.incrementAndGet().toString
val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
sparkConf.get("spark.driver.host"),
sparkConf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)


// To be safe, remove the container from `pendingReleaseContainers`.
pendingReleaseContainers.remove(containerId)
// 把container记录到已分配的rack的映射关系当中
val rack = YarnAllocationHandler.lookupRack(conf, executorHostname)
allocatedHostToContainersMap.synchronized {
val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
new HashSet[ContainerId]())

containerSet += containerId
allocatedContainerToHostMap.put(containerId, executorHostname)

if (rack != null) {
allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1)
}
}
      // 启动一个线程给它进行跟踪服务,给它发送运行Executor的命令
val executorRunnable = new ExecutorRunnable(
container,
conf,
sparkConf,
driverUrl,
executorId,
executorHostname,
executorMemory,
executorCores)
new Thread(executorRunnable).start()
}
}

}
View Code
1、把从ResourceManager中获得的Container进行选择,选择顺序是按照前面的介绍的三种类别依次进行,优先选择机器 > 同一个rack的机器 > 任意机器。
2、选择了Container之后,给每一个Container都启动一个ExecutorRunner一对一贴身服务,给它发送运行CoarseGrainedExecutorBackend的命令。
3、ExecutorRunner通过NMClient来向NodeManager发送请求。
 
总结:
把作业发布到yarn上面去执行这块涉及到的类不多,主要是涉及到Client、ApplicationMaster、YarnAllocationHandler、ExecutorRunner这四个类。
1、Client作为Yarn的客户端,负责向Yarn发送启动ApplicationMaster的命令。
2、ApplicationMaster就像项目经理一样负责整个项目所需要的工作,包括请求资源,分配资源,启动Driver和Executor,Executor启动失败的错误处理。
3、ApplicationMaster的请求、分配资源是通过YarnAllocationHandler来进行的。
4、Container选择的顺序是:优先选择机器 > 同一个rack的机器 > 任意机器。
5、ExecutorRunner只负责向Container发送启动CoarseGrainedExecutorBackend的命令。
6、Executor的错误处理是在ApplicationMaster的launchReporterThread方法里面,它启动的线程除了报告运行状态,还会监控Executor的运行,一旦发现有丢失的Executor就重新请求。
7、在yarn目录下看到的名称里面带有YarnClient的是属于yarn-client模式的类,实现和前面的也差不多。
其它的内容更多是Yarn的客户端api使用,我也不太会,只是看到了能懂个意思,哈哈。
 
 
岑玉海
转载请注明出处,谢谢!
 

Spark源码系列(六)Shuffle的过程解析

cenyuhai 发表了文章 0 个评论 2337 次浏览 2015-09-11 15:21 来自相关话题

Spark大会上,所有的演讲嘉宾都认为shuffle是最影响性能的地方,但是又无可奈何。之前去百度面试hadoop的时候,也被问到了这个问题,直接回答了不知道。 这篇文章主要是沿着下面几个问题来开展: 1、shuffle过程的划分? ...查看全部
Spark大会上,所有的演讲嘉宾都认为shuffle是最影响性能的地方,但是又无可奈何。之前去百度面试hadoop的时候,也被问到了这个问题,直接回答了不知道。
这篇文章主要是沿着下面几个问题来开展:
1、shuffle过程的划分?
2、shuffle的中间结果如何存储?
3、shuffle的数据如何拉取过来?Shuffle过程的划分
Spark的操作模型是基于RDD的,当调用RDD的reduceByKey、groupByKey等类似的操作的时候,就需要有shuffle了。再拿出reduceByKey这个来讲。
  def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = {
reduceByKey(new HashPartitioner(numPartitions), func)
}

reduceByKey的时候,我们可以手动设定reduce的个数,如果不指定的话,就可能不受控制了。

  def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
for (r <- bySize if r.partitioner.isDefined) {
return r.partitioner.get
}
if (rdd.context.conf.contains("spark.default.parallelism")) {
new HashPartitioner(rdd.context.defaultParallelism)
} else {
new HashPartitioner(bySize.head.partitions.size)
}
}
View Code
如果不指定reduce个数的话,就按默认的走:
1、如果自定义了分区函数partitioner的话,就按你的分区函数来走。
2、如果没有定义,那么如果设置了spark.default.parallelism,就使用哈希的分区方式,reduce个数就是设置的这个值。
3、如果这个也没设置,那就按照输入数据的分片的数量来设定。如果是hadoop的输入数据的话,这个就多了。。。大家可要小心啊。
设定完之后,它会做三件事情,也就是之前讲的3次RDD转换。

//map端先按照key合并一次
val combined = self.mapPartitionsWithContext((context, iter) => {
aggregator.combineValuesByKey(iter, context)
}, preservesPartitioning = true)
//reduce抓取数据
val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner).setSerializer(serializer)
//合并数据,执行reduce计算

partitioned.mapPartitionsWithContext((context, iter) => {
new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter, context))
}, preservesPartitioning = true)
View Code

1、在第一个MapPartitionsRDD这里先做一次map端的聚合操作。
2、ShuffledRDD主要是做从这个抓取数据的工作。
3、第二个MapPartitionsRDD把抓取过来的数据再次进行聚合操作。
4、步骤1和步骤3都会涉及到spill的过程。
怎么做的聚合操作,回去看RDD那章。Shuffle的中间结果如何存储
作业提交的时候,DAGScheduler会把Shuffle的过程切分成map和reduce两个Stage(之前一直被我叫做shuffle前和shuffle后),具体的切分的位置在上图的虚线处。
map端的任务会作为一个ShuffleMapTask提交,最后在TaskRunner里面调用了它的runTask方法。

  override def runTask(context: TaskContext): MapStatus = {
val numOutputSplits = dep.partitioner.numPartitions
metrics = Some(context.taskMetrics)

val blockManager = SparkEnv.get.blockManager
val shuffleBlockManager = blockManager.shuffleBlockManager
var shuffle: ShuffleWriterGroup = null
var success = false

try {
// serializer为空的情况调用默认的JavaSerializer,也可以通过spark.serializer来设置成别的
val ser = Serializer.getSerializer(dep.serializer)
// 实例化Writer,Writer的数量=numOutputSplits=前面我们说的那个reduce的数量
shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, partitionId, numOutputSplits, ser)

// 遍历rdd的元素,按照key计算出来它所在的bucketId,然后通过bucketId找到相应的Writer写入
for (elem <- rdd.iterator(split, context)) {
val pair = elem.asInstanceOf[Product2[Any, Any]]
val bucketId = dep.partitioner.getPartition(pair._1)
shuffle.writers(bucketId).write(pair)
}

// 提交写入操作. 计算每个bucket block的大小
var totalBytes = 0L
var totalTime = 0L
val compressedSizes: Array[Byte] = shuffle.writers.map { writer: BlockObjectWriter =>
writer.commit()
writer.close()
val size = writer.fileSegment().length
totalBytes += size
totalTime += writer.timeWriting()
MapOutputTracker.compressSize(size)
}

// 更新 shuffle 监控参数.
val shuffleMetrics = new ShuffleWriteMetrics
shuffleMetrics.shuffleBytesWritten = totalBytes
shuffleMetrics.shuffleWriteTime = totalTime
metrics.get.shuffleWriteMetrics = Some(shuffleMetrics)

success = true
new MapStatus(blockManager.blockManagerId, compressedSizes)
} catch { case e: Exception =>
// 出错了,取消之前的操作,关闭writer
if (shuffle != null && shuffle.writers != null) {
for (writer <- shuffle.writers) {
writer.revertPartialWrites()
writer.close()
}
}
throw e
} finally {
// 关闭writer
if (shuffle != null && shuffle.writers != null) {
try {
shuffle.releaseWriters(success)
} catch {
case e: Exception => logError("Failed to release shuffle writers", e)
}
}
// 执行注册的回调函数,一般是做清理工作
context.executeOnCompleteCallbacks()
}
}
View Code
遍历每一个记录,通过它的key来确定它的bucketId,再通过这个bucket的writer写入数据。
下面我们看看ShuffleBlockManager的forMapTask方法吧。

def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer) = {
new ShuffleWriterGroup {
shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numBuckets))
private val shuffleState = shuffleStates(shuffleId)
private var fileGroup: ShuffleFileGroup = null

val writers: Array[BlockObjectWriter] = if (consolidateShuffleFiles) {
fileGroup = getUnusedFileGroup()
Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
      // 从已有的文件组里选文件,一个bucket一个文件,即要发送到同一个reduce的数据写入到同一个文件
blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializer, bufferSize)
}
} else {
Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
// 按照blockId来生成文件,文件数为map数*reduce数
val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
val blockFile = blockManager.diskBlockManager.getFile(blockId)
if (blockFile.exists) {
if (blockFile.delete()) {
logInfo(s"Removed existing shuffle file $blockFile")
} else {
logWarning(s"Failed to remove existing shuffle file $blockFile")
}
}
blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize)
}
}
View Code
1、map的中间结果是写入到本地硬盘的,而不是内存。
2、默认是一个Executor的中间结果文件是M*R(M=map数量,R=reduce的数量),设置了spark.shuffle.consolidateFiles为true之后是R个文件,根据bucketId把要分到同一个reduce的结果写入到一个文件中。
3、consolidateFiles采用的是一个reduce一个文件,它还记录了每个map的写入起始位置,所以查找的时候先通过reduceId查找到哪个文件,再通过mapId查找索引当中的起始位置offset,长度length=(mapId + 1).offset -(mapId).offset,这样就可以确定一个FileSegment(file, offset, length)。
4、Finally,存储结束之后, 返回了一个new MapStatus(blockManager.blockManagerId, compressedSizes),把blockManagerId和block的大小都一起返回。
个人想法,shuffle这块和hadoop的机制差别不大,tez这样的引擎会赶上spark的速度呢?还是让我们拭目以待吧!Shuffle的数据如何拉取过来
ShuffleMapTask结束之后,最后走到DAGScheduler的handleTaskCompletion方法当中(关于中间的过程,请看《图解作业生命周期》)。

case smt: ShuffleMapTask =>
val status = event.result.asInstanceOf[MapStatus]
val execId = status.location.executorId
if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
logInfo("Ignoring possibly bogus ShuffleMapTask completion from " + execId)
} else {
stage.addOutputLoc(smt.partitionId, status)
}
if (runningStages.contains(stage) && pendingTasks(stage).isEmpty) {
markStageAsFinished(stage)
if (stage.shuffleDep.isDefined) {
// 真的map过程才会有这个依赖,reduce过程None
mapOutputTracker.registerMapOutputs(
  stage.shuffleDep.get.shuffleId,
stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray,
changeEpoch = true)
}
clearCacheLocs()
if (stage.outputLocs.exists(_ == Nil)) {
// 一些任务失败了,需要重新提交stage
submitStage(stage)
} else {
// 提交下一批任务
   }
}
View Code
1、把结果添加到Stage的outputLocs数组里,它是按照数据的分区Id来存储映射关系的partitionId->MapStaus。
2、stage结束之后,通过mapOutputTracker的registerMapOutputs方法,把此次shuffle的结果outputLocs记录到mapOutputTracker里面。
这个stage结束之后,就到ShuffleRDD运行了,我们看一下它的compute函数。
SparkEnv.get.shuffleFetcher.fetch[P](shuffledId, split.index, context, ser)

它是通过ShuffleFetch的fetch方法来抓取的,具体实现在BlockStoreShuffleFetcher里面。

  override def fetch[T](
shuffleId: Int,
reduceId: Int,
context: TaskContext,
serializer: Serializer)
: Iterator[T] =
{
val blockManager = SparkEnv.get.blockManager
val startTime = System.currentTimeMillis
   // mapOutputTracker也分Master和Worker,Worker向Master请求获取reduce相关的MapStatus,主要是(BlockManagerId和size)
val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId)
// 一个BlockManagerId对应多个文件的大小
val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[(Int, Long)]]
for (((address, size), index) <- statuses.zipWithIndex) {
splitsByAddress.getOrElseUpdate(address, ArrayBuffer()) += ((index, size))
}
// 构造BlockManagerId 和 BlockId的映射关系,想不到ShffleBlockId的mapId,居然是1,2,3,4的序列...
val blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])] = splitsByAddress.toSeq.map {
case (address, splits) =>
(address, splits.map(s => (ShuffleBlockId(shuffleId, s._1, reduceId), s._2)))
}
// 名为updateBlock,实际是检验函数,每个Block都对应着一个Iterator接口,如果该接口为空,则应该报错
def unpackBlock(blockPair: (BlockId, Option[Iterator[Any]])) : Iterator[T] = {
val blockId = blockPair._1
val blockOption = blockPair._2
blockOption match {
case Some(block) => {
block.asInstanceOf[Iterator[T]]
}
case None => {
blockId match {
case ShuffleBlockId(shufId, mapId, _) =>
val address = statuses(mapId.toInt)._1
throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId, null)
case _ =>
throw new SparkException("Failed to get block " + blockId + ", which is not a shuffle block")
}
}
}
}
// 从blockManager获取reduce所需要的全部block,并添加校验函数
val blockFetcherItr = blockManager.getMultiple(blocksByAddress, serializer)
val itr = blockFetcherItr.flatMap(unpackBlock)

  val completionIter = CompletionIterator[T, Iterator[T]](itr, {
// CompelteIterator迭代结束之后,会执行以下这部分代码,提交它记录的各种参数
val shuffleMetrics = new ShuffleReadMetrics
shuffleMetrics.shuffleFinishTime = System.currentTimeMillis
shuffleMetrics.fetchWaitTime = blockFetcherItr.fetchWaitTime
shuffleMetrics.remoteBytesRead = blockFetcherItr.remoteBytesRead
shuffleMetrics.totalBlocksFetched = blockFetcherItr.totalBlocks
shuffleMetrics.localBlocksFetched = blockFetcherItr.numLocalBlocks
shuffleMetrics.remoteBlocksFetched = blockFetcherItr.numRemoteBlocks
context.taskMetrics.shuffleReadMetrics = Some(shuffleMetrics)
})

new InterruptibleIterator[T](context, completionIter)
}
}
View Code
1、MapOutputTrackerWorker向MapOutputTrackerMaster获取shuffle相关的map结果信息。
2、把map结果信息构造成BlockManagerId --> Array(BlockId, size)的映射关系。
3、通过BlockManager的getMultiple批量拉取block。
4、返回一个可遍历的Iterator接口,并更新相关的监控参数。
我们继续看getMultiple方法。

  def getMultiple(
blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])],
serializer: Serializer): BlockFetcherIterator = {
val iter =
if (conf.getBoolean("spark.shuffle.use.netty", false)) {
new BlockFetcherIterator.NettyBlockFetcherIterator(this, blocksByAddress, serializer)
} else {
new BlockFetcherIterator.BasicBlockFetcherIterator(this, blocksByAddress, serializer)
}

iter.initialize()
iter
}
View Code
分两种情况处理,分别是netty的和Basic的,Basic的就不讲了,就是通过ConnectionManager去指定的BlockManager那里获取数据,上一章刚好说了。
我们讲一下Netty的吧,这个是需要设置的才能启用的,不知道性能会不会好一些呢?
看NettyBlockFetcherIterator的initialize方法,再看BasicBlockFetcherIterator的initialize方法,发现Basic的不能同时抓取超过48Mb的数据。

    override def initialize() {
// 分开本地请求和远程请求,返回远程的FetchRequest
val remoteRequests = splitLocalRemoteBlocks()
// 抓取顺序随机
for (request <- Utils.randomize(remoteRequests)) {
fetchRequestsSync.put(request)
}
// 默认是开6个线程去进行抓取
copiers = startCopiers(conf.getInt("spark.shuffle.copier.threads", 6))// 读取本地的block
getLocalBlocks()
}
View Code
在NettyBlockFetcherIterator的sendRequest方法里面,发现它是通过ShuffleCopier来试下的。
  val cpier = new ShuffleCopier(blockManager.conf)
cpier.getBlocks(cmId, req.blocks, putResult)

这块接下来就是netty的客户端调用的方法了,我对这个不了解。在服务端的处理是在DiskBlockManager内部启动了一个ShuffleSender的服务,最终的业务处理逻辑是在FileServerHandler。
它是通过getBlockLocation返回一个FileSegment,下面这段代码是ShuffleBlockManager的getBlockLocation方法。
  def getBlockLocation(id: ShuffleBlockId): FileSegment = {
// Search all file groups associated with this shuffle.
val shuffleState = shuffleStates(id.shuffleId)
for (fileGroup <- shuffleState.allFileGroups) {
val segment = fileGroup.getFileSegmentFor(id.mapId, id.reduceId)
if (segment.isDefined) { return segment.get }
}
throw new IllegalStateException("Failed to find shuffle block: " + id)
}


先通过shuffleId找到ShuffleState,再通过reduceId找到文件,最后通过mapId确定它的文件分片的位置。但是这里有个疑问了,如果启用了consolidateFiles,一个reduce的所需数据都在一个文件里,是不是就可以把整个文件一起返回呢,而不是通过N个map来多次读取?还是害怕一次发送一个大文件容易失败?这就不得而知了。
到这里整个过程就讲完了。可以看得出来Shuffle这块还是做了一些优化的,但是这些参数并没有启用,有需要的朋友可以自己启用一下试试效果。
 
 
岑玉海
转载请注明出处,谢谢!
 

Spark源码系列(五)分布式缓存

cenyuhai 发表了文章 0 个评论 2070 次浏览 2015-09-11 15:21 来自相关话题

这一章想讲一下Spark的缓存是如何实现的。这个persist方法是在RDD里面的,所以我们直接打开RDD这个类。 def persist(newLevel: StorageLevel): this.type = { // ...查看全部
这一章想讲一下Spark的缓存是如何实现的。这个persist方法是在RDD里面的,所以我们直接打开RDD这个类。
  def persist(newLevel: StorageLevel): this.type = {
// StorageLevel不能随意更改
if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) {
throw new UnsupportedOperationException("Cannot change storage level of an RDD after it was already assigned a level")
}
sc.persistRDD(this)
// Register the RDD with the ContextCleaner for automatic GC-based cleanup
// 注册清理方法
sc.cleaner.foreach(_.registerRDDForCleanup(this))
storageLevel = newLevel
this
}


它调用SparkContext去缓存这个RDD,追杀下去。
  private[spark] def persistRDD(rdd: RDD[_]) {
persistentRdds(rdd.id) = rdd
}

它居然是用一个HashMap来存的,具体看这个map的类型是TimeStampedWeakValueHashMap[Int, RDD[_]]类型。把存进去的值都隐式转换成WeakReference,然后加到一个内部的一个ConcurrentHashMap里面。这里貌似也没干啥,这是有个鸟蛋用。。大神莫喷,知道干啥用的人希望告诉我一下。CacheManager
现在并没有保存,等到真正运行Task运行的时候才会去缓存起来。入口在Task的runTask方法里面,具体的我们可以看ResultTask,它调用了RDD的iterator方法。
  final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
if (storageLevel != StorageLevel.NONE) {
SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
} else {
computeOrReadCheckpoint(split, context)
}
}


一旦设置了StorageLevel,就要从SparkEnv的cacheManager取数据。

  def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel): Iterator[T] = {
val key = RDDBlockId(rdd.id, split.index)
blockManager.get(key) match {
case Some(values) =>
// 已经有了,直接返回就可以了
new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])

case None =>
// loading包含这个key表示已经有人在加载了,等到loading被释放了,就可以去blockManager里面取到了
loading.synchronized {
if (loading.contains(key)) {
while (loading.contains(key)) {
try {
loading.wait()
} catch {
case e: Exception =>
logWarning(s"Got an exception while waiting for another thread to load $key", e)
}
}
// 别人成功拿到了,我们直接取结果就是了,如果别人取失败了,我们再来取一次
blockManager.get(key) match {
case Some(values) =>
return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
case None =>
loading.add(key)
}
} else {
loading.add(key)
}
}
try {
// 通过rdd自身的compute方法去计算得到结果,回去看看RDD那文章,自己看看源码就清楚了
val computedValues = rdd.computeOrReadCheckpoint(split, context)

// 如果是本地运行的,就没必要缓存了,直接返回即可
if (context.runningLocally) {
return computedValues
}

// 跟踪blocks的更新状态
var updatedBlocks = Seq[(BlockId, BlockStatus)]()
val returnValue: Iterator[T] = {
if (storageLevel.useDisk && !storageLevel.useMemory) {
/* 这是RDD采用DISK_ONLY的情况,直接扔给blockManager
* 然后把结果直接返回,它不需要把结果一下子全部加载进内存
* 这同样适用于MEMORY_ONLY_SER,但是我们需要在启用它之前确认blocks没被block store给丢弃 */
updatedBlocks = blockManager.put(key, computedValues, storageLevel, tellMaster = true)
blockManager.get(key) match {
case Some(values) =>
values.asInstanceOf[Iterator[T]]
case None =>
throw new Exception("Block manager failed to return persisted valued")
}
} else {
// 先存到一个ArrayBuffer,然后一次返回,在blockManager里也存一份
val elements = new ArrayBuffer[Any]
elements ++= computedValues
updatedBlocks = blockManager.put(key, elements, storageLevel, tellMaster = true)
elements.iterator.asInstanceOf[Iterator[T]]
}
}

// 更新task的监控参数
val metrics = context.taskMetrics
metrics.updatedBlocks = Some(updatedBlocks)

new InterruptibleIterator(context, returnValue)

} finally {
// 改完了,释放锁
loading.synchronized {
loading.remove(key)
loading.notifyAll()
}
}
}
}
View Code
1、如果blockManager当中有,直接从blockManager当中取。
2、如果blockManager没有,就先用RDD的compute函数得到出来一个Iterable接口。
3、如果StorageLevel是只保存在硬盘的话,就把值存在blockManager当中,然后从blockManager当中取出一个Iterable接口,这样的好处是不会一次把数据全部加载进内存。
4、如果StorageLevel是需要使用内存的情况,就把结果添加到一个ArrayBuffer当中一次返回,另外在blockManager存上一份,下次直接从blockManager取。
对StorageLevel说明一下吧,贴一下它的源码。

class StorageLevel private(
private var useDisk_ : Boolean,
private var useMemory_ : Boolean,
private var useOffHeap_ : Boolean,
private var deserialized_ : Boolean,
private var replication_ : Int = 1)

val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(false, false, true, false)
View Code
大家注意看它那几个参数,useDisk_、useMemory_、useOffHeap_、deserialized_、replication_ 在具体的类型的时候是传的什么值。
下面我们的目标要放到blockManager。BlockManager
BlockManager这个类比较大,我们从两方面开始看吧,putBytes和get方法。先从putBytes说起,之前说过Task运行结束之后,结果超过10M的话,会用BlockManager缓存起来。
env.blockManager.putBytes(blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)

putBytes内部又掉了另外一个方法doPut,方法很大呀,先折叠起来。

  private def doPut(
blockId: BlockId,
data: Values,
level: StorageLevel,
tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {// Return value
val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]

// 记录它的StorageLevel,以便我们可以在它加载进内存之后,可以按需写入硬盘。
  // 此外,在我们把调用BlockInfo的markReay方法之前,都没法通过get方法获得该部分内容
val putBlockInfo = {
val tinfo = new BlockInfo(level, tellMaster)
// 如果不存在,就添加到blockInfo里面
val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
if (oldBlockOpt.isDefined) {
// 如果已经存在了,就不需要重复添加了
if (oldBlockOpt.get.waitForReady()) {return updatedBlocks
}
// 存在于blockInfo当中->但是上一次保存失败了,拿出旧的信息,再试一遍
oldBlockOpt.get
} else {
tinfo
}
}

val startTimeMs = System.currentTimeMillis
// 当我们需要存储数据,并且要复制数据到别的机器,我们需要访问它的值,但是因为我们的put操作会读取整个iterator,
// 这就不会有任何的值留下。在我们保存序列化的数据的场景,我们可以记住这些bytes,但在其他场景,比如反序列化存储的
// 时候,我们就必须依赖返回一个Iterator
var valuesAfterPut: Iterator[Any] = null
// Ditto for the bytes after the put
var bytesAfterPut: ByteBuffer = null
// Size of the block in bytes
var size = 0L

// 在保存数据之前,我们要实例化,在数据已经序列化并且准备好发送的情况下,这个过程是很快的
val replicationFuture = if (data.isInstanceOf[ByteBufferValues] && level.replication > 1) {
// duplicate并不是复制这些数据,只是做了一个包装
val bufferView = data.asInstanceOf[ByteBufferValues].buffer.duplicate()
Future {
// 把block复制到别的机器上去
replicate(blockId, bufferView, level)
}
} else {
null
}

putBlockInfo.synchronized {

var marked = false
try {
if (level.useMemory) {
// 首先是保存到内存里面,尽管它也使用硬盘,等内存不够的时候,才会写入硬盘
// 下面分了三种情况,但是Task的结果是ByteBufferValues这种情况,具体看putBytes方法
val res = data match {
case IteratorValues(iterator) =>
memoryStore.putValues(blockId, iterator, level, true)
case ArrayBufferValues(array) =>
memoryStore.putValues(blockId, array, level, true)
case ByteBufferValues(bytes) =>
bytes.rewind()
memoryStore.putBytes(blockId, bytes, level)
}
size = res.size
// 这里写得那么恶心,是跟data的类型有关系的,data: Either[Iterator[_], ByteBuffer],Left是Iterator,Right是ByteBuffer
res.data match {
case Right(newBytes) => bytesAfterPut = newBytes
case Left(newIterator) => valuesAfterPut = newIterator
}
// 把被置换到硬盘的blocks记录到updatedBlocks上
res.droppedBlocks.foreach { block => updatedBlocks += block }
} else if (level.useOffHeap) {
// 保存到Tachyon上.
val res = data match {
case IteratorValues(iterator) =>
tachyonStore.putValues(blockId, iterator, level, false)
case ArrayBufferValues(array) =>
tachyonStore.putValues(blockId, array, level, false)
case ByteBufferValues(bytes) =>
bytes.rewind()
tachyonStore.putBytes(blockId, bytes, level)
}
size = res.size
res.data match {
case Right(newBytes) => bytesAfterPut = newBytes
case _ =>
}
} else {
// 直接保存到硬盘,不要复制到其它节点的就别返回数据了.
val askForBytes = level.replication > 1
val res = data match {
case IteratorValues(iterator) =>
diskStore.putValues(blockId, iterator, level, askForBytes)
case ArrayBufferValues(array) =>
diskStore.putValues(blockId, array, level, askForBytes)
case ByteBufferValues(bytes) =>
bytes.rewind()
diskStore.putBytes(blockId, bytes, level)
}
size = res.size
res.data match {
case Right(newBytes) => bytesAfterPut = newBytes
case _ =>
}
}
     // 通过blockId获得当前的block状态
val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
if (putBlockStatus.storageLevel != StorageLevel.NONE) {
// 成功了,把该block标记为ready,通知BlockManagerMaster
marked = true
putBlockInfo.markReady(size)
if (tellMaster) {
reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
}
updatedBlocks += ((blockId, putBlockStatus))
}
} finally {
// 如果没有标记成功,就把该block信息清除
if (!marked) {
blockInfo.remove(blockId)
putBlockInfo.markFailure()
}
}
}

// 把数据发送到别的节点做备份
if (level.replication > 1) {
data match {
case ByteBufferValues(bytes) => Await.ready(replicationFuture, Duration.Inf)
case _ => {
val remoteStartTime = System.currentTimeMillis
// 把Iterator里面的数据序列化之后,发送到别的节点
if (bytesAfterPut == null) {
if (valuesAfterPut == null) {
throw new SparkException("Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
}
bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
}
replicate(blockId, bytesAfterPut, level)
}
}
}
// 销毁bytesAfterPut
BlockManager.dispose(bytesAfterPut)
updatedBlocks
}
View Code
从上面的的来看:
1、存储的时候按照不同的存储级别分了3种情况来处理:存在内存当中(包括MEMORY字样的),存在tachyon上(OFF_HEAP),只存在硬盘上(DISK_ONLY)。
2、存储完成之后会根据存储级别决定是否发送到别的节点,在名字上最后带2字的都是这种,2表示一个block会在两个节点上保存。
3、存储完毕之后,会向BlockManagerMaster汇报block的情况。
4、这里面的序列化其实是先压缩后序列化,默认使用的是LZF压缩,可以通过spark.io.compression.codec设定为snappy或者lzo,序列化方式通过spark.serializer设置,默认是JavaSerializer。

接下来我们再看get的情况。
    val local = getLocal(blockId)
if (local.isDefined) return local
val remote = getRemote(blockId)
if (remote.isDefined) return remote
None

先从本地取,本地没有再去别的节点取,都没有,返回None。从本地取就不说了,怎么进怎么出。讲一下怎么从别的节点去,它们是一个什么样子的关系?
我们先看getRemote方法

  private def doGetRemote(blockId: BlockId, asValues: Boolean): Option[Any] = {
val locations = Random.shuffle(master.getLocations(blockId))
for (loc <- locations) {
val data = BlockManagerWorker.syncGetBlock(GetBlock(blockId), ConnectionManagerId(loc.host, loc.port))
if (data != null) {
if (asValues) {
return Some(dataDeserialize(blockId, data))
} else {
return Some(data)
}
}
}
None
}
View Code
这个方法包括两个步骤:
1、用blockId通过master的getLocations方法找到它的位置。
2、通过BlockManagerWorker.syncGetBlock到指定的节点获取数据。
ok,下面就重点讲BlockManager和BlockManagerMaster之间的关系,以及BlockManager之间是如何相互传输数据。BlockManager与BlockManagerMaster的关系
BlockManager我们使用的时候是从SparkEnv.get获得的,我们观察了一下SparkEnv,发现它包含了我们运行时候常用的那些东东。那它创建是怎么创建的呢,我们找到SparkEnv里面的create方法,右键FindUsages,就会找到两个地方调用了,一个是SparkContext,另一个是Executor。在SparkEnv的create方法里面会实例化一个BlockManager和BlockManagerMaster。这里我们需要注意看BlockManagerMaster的实例化方法,里面调用了registerOrLookup方法。
    def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
if (isDriver) {
actorSystem.actorOf(Props(newActor), name = name)
} else {
val driverHost: String = conf.get("spark.driver.host", "localhost")
val driverPort: Int = conf.getInt("spark.driver.port", 7077)
Utils.checkHost(driverHost, "Expected hostname")
val url = s"akka.tcp://spark@$driverHost:$driverPort/user/$name"
val timeout = AkkaUtils.lookupTimeout(conf)
Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
}
}


所以从这里可以看出来,除了Driver之后的actor都是,都是持有的Driver的引用ActorRef。梳理一下,我们可以得出以下结论:
1、SparkContext持有一个BlockManager和BlockManagerMaster。
2、每一个Executor都持有一个BlockManager和BlockManagerMaster。
3、Executor和SparkContext的BlockManagerMaster通过BlockManagerMasterActor来通信。
接下来,我们看看BlockManagerMasterActor里的三组映射关系。
  // 1、BlockManagerId和BlockManagerInfo的映射关系
private val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]
// 2、Executor ID 和 Block manager ID的映射关系
private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]
// 3、BlockId和保存它的BlockManagerId的映射关系
private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]

看到这三组关系,前面的getLocations方法不用看它的实现,我们都应该知道是怎么找了。BlockManager相互传输数据
BlockManager之间发送数据和接受数据是通过BlockManagerWorker的syncPutBlock和syncGetBlock方法来实现。看BlockManagerWorker的注释,说是BlockManager的网络接口,采用的是事件驱动模型。
再仔细看这两个方法,它传输的数据包装成BlockMessage之后,通过ConnectionManager的sendMessageReliablySync方法来传输。
接下来的故事就是nio之间的发送和接收了,就简单说几点吧:
1、ConnectionManager内部实例化一个selectorThread线程来接收消息,具体请看run方法。
2、Connection发送数据的时候,是一次把消息队列的message全部发送,不是一个一个message发送,具体看SendConnection的write方法,与之对应的接收看ReceivingConnection的read方法。
3、read完了之后,调用回调函数ConnectionManager的receiveMessage方法,它又调用了handleMessage方法,handleMessage又调用了BlockManagerWorker的onBlockMessageReceive方法。传说中的事件驱动又出现了。
  def processBlockMessage(blockMessage: BlockMessage): Option[BlockMessage] = {
blockMessage.getType match {
case BlockMessage.TYPE_PUT_BLOCK => {
val pB = PutBlock(blockMessage.getId, blockMessage.getData, blockMessage.getLevel)
putBlock(pB.id, pB.data, pB.level)
None
}
case BlockMessage.TYPE_GET_BLOCK => {
val gB = new GetBlock(blockMessage.getId)
val buffer = getBlock(gB.id)
Some(BlockMessage.fromGotBlock(GotBlock(gB.id, buffer)))
}
case _ => None
}
}


根据BlockMessage的类型进行处理,put类型就保存数据,get类型就从本地把block读出来返回给它。
 

注:BlockManagerMasterActor是存在于BlockManagerMaster内部,画在外面只是因为它在通信的时候起了关键的作用的,Executor上持有的BlockManagerMasterActor均是Driver节点的Actor的引用。广播变量
先回顾一下怎么使用广播变量:
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: spark.Broadcast[Array[Int]] = spark.Broadcast(b5c40191-a864-4c7d-b9bf-d87e1a4e787c)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

看了一下实现调用的是broadcastFactory的newBroadcast方法。
  def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean) = {
broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())
}

默认的broadcastFactory是HttpBroadcastFactory,内部还有另外一个实现TorrentBroadcastFactory,先说HttpBroadcastFactory的newBroadcast方法。
它直接new了一个HttpBroadcast。
  HttpBroadcast.synchronized {
SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
}

if (!isLocal) {
HttpBroadcast.write(id, value_)
}


它的内部做了两个操作,把数据保存到driver端的BlockManager并且写入到硬盘。
TorrentBroadcast和HttpBroadcast都把数据存进了BlockManager做备份,但是TorrentBroadcast接着并没有把数据写入文件,而是采用了下面这种方式:

  def sendBroadcast() {
// 把数据给切分了,每4M一个分片
val tInfo = TorrentBroadcast.blockifyObject(value_)
totalBlocks = tInfo.totalBlocks
totalBytes = tInfo.totalBytes
hasBlocks = tInfo.totalBlocks

// 把分片的信息存到BlockManager,并通知Master
val metaId = BroadcastBlockId(id, "meta")
val metaInfo = TorrentInfo(null, totalBlocks, totalBytes)
TorrentBroadcast.synchronized {
SparkEnv.get.blockManager.putSingle(
metaId, metaInfo, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
}

// 遍历所有分片,存到BlockManager上面,并通知Master
for (i <- 0 until totalBlocks) {
val pieceId = BroadcastBlockId(id, "piece" + i)
TorrentBroadcast.synchronized {
SparkEnv.get.blockManager.putSingle(
pieceId, tInfo.arrayOfBlocks(i), StorageLevel.MEMORY_AND_DISK, tellMaster = true)
}
}
}
View Code
1、把数据序列化之后,每4M切分一下。
2、切分完了之后,把所有分片写入BlockManager。
但是找不到它们是怎么传播的??只是写入到BlockManager,但是tellMaster为false的话,就相当于存在本地了,别的BlockManager是没法获取到的。
这时候我注意到它内部有两个方法,readObject和writeObject,会不会和这两个方法有关呢?它们做的操作就是给value赋值。
为了检验这个想法,我亲自调试了一下,在反序列化任务的时候,readObject这个方法是被ObjectInputStream调用了。这块的知识大家可以百度下ObjectInputStream和ObjectOutputStream。
具体操作如下:
1、打开BroadcastSuite这个类,找到下面这段代码,图中的地方原来是512, 被我改成256了,之前一直运行不起来。
  test("Accessing TorrentBroadcast variables in a local cluster") {
val numSlaves = 4
sc = new SparkContext("local-cluster[%d, 1, 256]".format(numSlaves), "test", torrentConf)
val list = List[Int](1, 2, 3, 4)
val broadcast = sc.broadcast(list)
val results = sc.parallelize(1 to numSlaves).map(x => (x, broadcast.value.sum))
assert(results.collect().toSet === (1 to numSlaves).map(x => (x, 10)).toSet)
}


2、找到TorrentBroadcast,在readObject方法上打上断点。
3、开始调试吧。
之前讲过,Task是被序列化之后包装在消息里面发送给Worker去运行的,所以在运行之前必须把Task进行反序列化,具体在TaskRunner的run方法里面:
task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)

Ok,告诉大家入口了,剩下的大家去尝试吧。前面介绍了怎么切分的,到TorrentBroadcast的readObject里面就很容易理解了。
1、先通过MetaId从BlockManager里面取出来Meta信息。
2、通过Meta信息,构造分片id,去BlockManager里面取。
3、获得分片之后,把分片写入到本地的BlockManager当中。
4、全部取完之后,通过下面的方法反向赋值。
if (receiveBroadcast()) {
value_ = TorrentBroadcast.unBlockifyObject[T](arrayOfBlocks, totalBytes, totalBlocks)
   SparkEnv.get.blockManager.putSingle(broadcastId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
}

5、把value_又顺手写入到BlockManager当中。(这里相当于写了两份进去,大家要注意了哈,内存消耗还是大大地。幸好是MEMORY_AND_DISK的)
这么做是有好处的,这是一种类似BT的做法,把数据切分成一小块一小块,容易传播,从不同的机器上获取一小块一小块的数据,最后组装成完整的。
把完整的value写入BlockManager是为了使用的时候方便,不需要再次组装。相关参数
// BlockManager的最大内存
spark.storage.memoryFraction 默认值0.6
// 文件保存的位置
spark.local.dir 默认是系统变量java.io.tmpdir的值
// tachyon保存的地址
spark.tachyonStore.url 默认值tachyon://localhost:19998
// 默认不启用netty来传输shuffle的数据
spark.shuffle.use.netty 默认值是false
spark.shuffle.sender.port 默认值是0
// 一个reduce抓取map中间结果的最大的同时抓取数量大小(to avoid over-allocating memory for receiving shuffle outputs)
spark.reducer.maxMbInFlight 默认值是48*1024*1024
// TorrentBroadcast切分数据块的分片大小
spark.broadcast.blockSize 默认是4096
// 广播变量的工厂类
spark.broadcast.factory 默认是org.apache.spark.broadcast.HttpBroadcastFactory,也可以设置为org.apache.spark.broadcast.TorrentBroadcastFactory
// 压缩格式
spark.io.compression.codec 默认是LZF,可以设置成Snappy或者Lzo


 
岑玉海
转载请注明出处,谢谢!

Spark源码系列(四)图解作业生命周期

cenyuhai 发表了文章 0 个评论 2313 次浏览 2015-09-11 15:20 来自相关话题

这一章我们探索了Spark作业的运行过程,但是没把整个过程描绘出来,好,跟着我走吧,let you know! 我们先回顾一下这个图,Driver Program是我们写的那个程序,它的核心是SparkContext,回想一下,从ap ...查看全部
这一章我们探索了Spark作业的运行过程,但是没把整个过程描绘出来,好,跟着我走吧,let you know!

我们先回顾一下这个图,Driver Program是我们写的那个程序,它的核心是SparkContext,回想一下,从api的使用角度,RDD都必须通过它来获得。
下面讲一讲它所不为认知的一面,它和其它组件是如何交互的。Driver向Master注册Application过程
SparkContext实例化之后,在内部实例化两个很重要的类,DAGScheduler和TaskScheduler。
在standalone的模式下,TaskScheduler的实现类是TaskSchedulerImpl,在初始化它的时候SparkContext会传入一个SparkDeploySchedulerBackend。
在SparkDeploySchedulerBackend的start方法里面启动了一个AppClient。
    val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs, 
            classPathEntries, libraryPathEntries, extraJavaOpts)
val sparkHome = sc.getSparkHome()
val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
           sparkHome, sc.ui.appUIAddress, sc.eventLogger.map(_.logDir))
client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
client.start()


maxCores是由参数spark.cores.max来指定的,executorMemoy是由spark.executor.memory指定的。
AppClient启动之后就会去向Master注册Applicatoin了,后面的过程我用图来表达。

上面的图中涉及到了三方通信,具体的过程如下:
1、Driver通过AppClient向Master发送了RegisterApplication消息来注册Application,Master收到消息之后会发送RegisteredApplication通知Driver注册成功,Driver的接收类还是AppClient。
2、Master接受到RegisterApplication之后会触发调度过程,在资源足够的情况下会向Woker和Driver分别发送LaunchExecutor、ExecutorAdded消息。
3、Worker接收到LaunchExecutor消息之后,会执行消息中携带的命令,执行CoarseGrainedExecutorBackend类(图中仅以它继承的接口ExecutorBackend代替),执行完毕之后会发送ExecutorStateChanged消息给Master。
4、Master接收ExecutorStateChanged之后,立即发送ExecutorUpdated消息通知Driver。
5、Driver中的AppClient接收到Master发过来的ExecutorAdded和ExecutorUpdated后进行相应的处理。
6、启动之后的CoarseGrainedExecutorBackend会向Driver发送RegisterExecutor消息。
7、Driver中的SparkDeploySchedulerBackend(具体代码在CoarseGrainedSchedulerBackend里面)接收到RegisterExecutor消息,回复注册成功的消息RegisteredExecutor给ExecutorBackend,并且立马准备给它发送任务。
8、CoarseGrainedExecutorBackend接收到RegisteredExecutor消息之后,实例化一个Executor等待任务的到来。资源的调度
好,在我们讲完了整个注册Application的通信过程之后,其中一个比较重要的地方是它的调度这块,它是怎么调度的?这也是我在前面为什么那么强调maxCores和executorMemoy的原因。
细心的读者如果看了第一章《spark-submit提交作业过程》的就知道,其实我已经讲过调度了,因为当时不知道这个app是啥。但是现在我们知道app是啥了。代码我不就贴了,总结一下吧。
1、先调度Driver,再调度Application。
2、它的调度Application的方式是先进先出,所以就不要奇怪为什么你的App总得不到调度了,就像去北京的医院看病,去晚了号就没了,是一个道理。
3、Executor的分配方式有两种,一种是倾向于把任务分散在多个节点上,一种是在尽量少的节点上运行,由参数spark.deploy.spreadOut参数来决定的,默认是true,把任务分散到多个节点上。
遍历所有等待的Application,给它分配Executor运行的Worker,默认分配方式如下:
1、先从workers当中选出内存大于executorMemoy的worker,并且按照空闲cpu数从大到小的顺序来排序。
2、遍历worker,从可用的worker分配需要的cpu数,每个worker提供一个cpu核心,直到cpu数不足或者maxCores分配完毕。
3、给选出来的worker发送任务,让它们启动Executor,每个Executor占用的内存是我们设定的executorMemoy。
资源调度的过程大体是这样的,说到这里有些童鞋在有点儿疑惑了,那我们任务调度里面FIFO/FAIR调度是在哪里用的?任务调度器调度的不是Application,而是你的代码里面被解析出来的所有Task,这在上一章当中有提到。
基于这个原因,在共用SparkContext的情况下,比如Shark、JobServer什么的,任务调度器的作用才会明显。Driver向Executor发布Task过程
下面我们讲一讲Driver向Executor发布Task过程,这在上一章是讲过的,现在把图给大家放出来了。

1、Driver程序的代码运行到action操作,触发了SparkContext的runJob方法。
2、SparkContext比较懒,转手就交给DAGScheduler。
3、DAGScheduler把Job划分stage,然后把stage转化为相应的Tasks,把Tasks交给TaskScheduler。
4、通过TaskScheduler把Tasks添加到任务队列当中,转手就交给SchedulerBackend了。
5、调度器给Task分配执行Executor,ExecutorBackend负责执行Task了。
补充:ExecutorBackend执行Task,是通过它内部的Executor来执行的,Executor内部有个线程池,new了一个TaskRunner交给线程池了。Task状态更新
Task执行是通过TaskRunner来运行,它需要通过ExecutorBackend和Driver通信,通信消息是StatusUpdate:
1、Task运行之前,告诉Driver当前Task的状态为TaskState.RUNNING。
2、Task运行之后,告诉Driver当前Task的状态为TaskState.FINISHED,并返回计算结果。
3、如果Task运行过程中发生错误,告诉Driver当前Task的状态为TaskState.FAILED,并返回错误原因。
4、如果Task在中途被Kill掉了,告诉Driver当前Task的状态为TaskState.FAILED。
下面讲的是运行成功的状态,具体过程以文字为主。

1、Task运行结束之后,调用ExecutorBackend的statusUpdate方法,把结果返回。结果超过10M,就把结果保存在blockManager处,返回blockId,需要的时候通过blockId到blockManager认领。
2、ExecutorBackend直接向Driver发送StatusUpdate返回Task的信息。
3、Driver(这里具体指的是SchedulerBackend)接收到StatusUpdate消息之后,调用TaskScheduler的statusUpdate方法,然后准备给ExecutorBackend发送下一批Task。
4、TaskScheduler通过TaskId找到管理这个Task的TaskSetManager(负责管理一批Task的类),从TaskSetManager里面删掉这个Task,并把Task插入到TaskResultGetter(负责获取Task结果的类)的成功队列里。
5、TaskResultGetter获取到结果之后,调用TaskScheduler的handleSuccessfulTask方法把结果返回。
6、TaskScheduler调用TaskSetManager的handleSuccessfulTask方法,处理成功的Task。
7、TaskSetManager调用DAGScheduler的taskEnded方法,告诉DAGScheduler这个Task运行结束了,如果这个时候Task全部成功了,就会结束TaskSetManager。
8、DAGScheduler在taskEnded方法里触发CompletionEvent事件,CompletionEvent分ResultTask和ShuffleMapTask来处理。
  1)ResultTask:job的numFinished加1,如果numFinished等于它的分片数,则表示任务该Stage结束,标记这个Stage为结束,最后调用JobListener(具体实现在JobWaiter)的taskSucceeded方法,把结果交给resultHandler(经过包装的自己写的那个匿名函数)处理,如果完成的Task数量等于总任务数,任务退出。
  2)ShuffleMapTask:
   (1)调用Stage的addOutputLoc方法,把结果添加到Stage的outputLocs列表里。
   (2)如果该Stage没有等待的Task了,就标记该Stage为结束。
   (3)把Stage的outputLocs注册到MapOutputTracker里面,留个下一个Stage用。
   (4)如果Stage的outputLocs为空,表示它的计算失败,重新提交Stage。
   (5)找出下一个在等待并且没有父亲的Stage提交。
 
岑玉海
转载请注明出处,谢谢!
 

Spark源码系列(三)作业运行过程

cenyuhai 发表了文章 0 个评论 2036 次浏览 2015-09-11 15:19 来自相关话题

作业执行 上一章讲了RDD的转换,但是没讲作业的运行,它和Driver Program的关系是啥,和RDD的关系是啥? 官方给的例子里面,一执行collect方法就能出结果,那我们就从collect开始看吧,进入RDD,找到collec ...查看全部
作业执行
上一章讲了RDD的转换,但是没讲作业的运行,它和Driver Program的关系是啥,和RDD的关系是啥?
官方给的例子里面,一执行collect方法就能出结果,那我们就从collect开始看吧,进入RDD,找到collect方法。
  def collect(): Array[T] = {
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}

它进行了两个操作:
1、调用SparkContext的runJob方法,把自身的引用传入去,再传了一个匿名函数(把Iterator转换成Array数组)
2、把result结果合并成一个Array,注意results是一个Array[Array[T]]类型,所以第二句的那个写法才会那么奇怪。这个操作是很重的一个操作,如果结果很大的话,这个操作是会报OOM的,因为它是把结果保存在Driver程序的内存当中的result数组里面。
我们点进去runJob这个方法吧。
    val callSite = getCallSite
val cleanedFunc = clean(func)
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal, resultHandler, localProperties.get)
rdd.doCheckpoint()

追踪下去,我们会发现经过多个不同的runJob同名函数调用之后,执行job作业靠的是dagScheduler,最后把结果通过resultHandler保存返回。DAGScheduler如何划分作业
好的,我们继续看DAGScheduler的runJob方法,提交作业,然后等待结果,成功什么都不做,失败抛出错误,我们接着看submitJob方法。
    val jobId = nextJobId.getAndIncrement()
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
// 记录作业成功与失败的数据结构,一个作业的Task数量是和分片的数量一致的,Task成功之后调用resultHandler保存结果。
val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
eventProcessActor ! JobSubmitted(jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties)

走到这里,感觉有点儿绕了,为什么到了这里,还不直接运行呢,还要给eventProcessActor发送一个JobSubmitted请求呢,new一个线程和这个区别有多大?
不管了,搜索一下eventProcessActor吧,结果发现它是一个DAGSchedulerEventProcessActor,它的定义也在DAGScheduler这个类里面。它的receive方法里面定义了12种事件的处理方法,这里我们只需要看
JobSubmitted的就行,它也是调用了自身的handleJobSubmitted方法。但是这里很奇怪,没办法打断点调试,但是它的结果倒是能返回的,因此我们得用另外一种方式,打开test工程,找到scheduler目录下的DAGSchedulerSuite这个类,我们自己写一个test方法,首先我们要在import那里加上import org.apache.spark.SparkContext._ ,然后加上这一段测试代码。

  test("run shuffle") {
val rdd1 = sc.parallelize(1 to 100, 4)
val rdd2 = rdd1.filter(_ % 2 == 0).map(_ + 1)
val rdd3 = rdd2.map(_ - 1).filter(_ < 50).map(i => (i, i))
val rdd4 = rdd3.reduceByKey(_ + _)
submit(rdd4, Array(0,1,2,3))
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", 1)),
(Success, makeMapStatus("hostB", 1))))
complete(taskSets(1), Seq((Success, 42)))
complete(taskSets(2), Seq(
(Success, makeMapStatus("hostA", 2)),
(Success, makeMapStatus("hostB", 2))))
complete(taskSets(3), Seq((Success, 68)))
}
View Code
这个例子的重点还是shuffle那块,另外也包括了map的多个转换,大家可以按照这个例子去测试下。
我们接着看handleJobSubmitted吧。

    var finalStage: Stage = null
try {
finalStage = newStage(finalRDD, partitions.size, None, jobId, Some(callSite))
} catch {
// 错误处理,告诉监听器作业失败,返回....
}
if (finalStage != null) {
val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
clearCacheLocs()
if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) {
// 很短、没有父stage的本地操作,比如 first() or take() 的操作本地执行.
listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), properties))
runLocally(job)
} else {
// collect等操作走的是这个过程,更新相关的关系映射,用监听器监听,然后提交作业
jobIdToActiveJob(jobId) = job
activeJobs += job
resultStageToJob(finalStage) = job
listenerBus.post(SparkListenerJobStart(job.jobId, jobIdToStageIds(jobId).toArray, properties))
// 提交stage
submitStage(finalStage)
}
}
// 提交stage
submitWaitingStages()
View Code
从上面这个方法来看,我们应该重点关注newStage方法、submitStage方法和submitWaitingStages方法。
我们先看newStage,它得到的结果叫做finalStage,挺奇怪的哈,为啥?先看吧
    val id = nextStageId.getAndIncrement()
val stage = new Stage(id, rdd, numTasks, shuffleDep, getParentStages(rdd, jobId), jobId, callSite)
stageIdToStage(id) = stage
updateJobIdStageIdMaps(jobId, stage)
stageToInfos(stage) = StageInfo.fromStage(stage)
stage

可以看出来Stage也没有太多的东西可言,它就是把rdd给传了进去,tasks的数量,shuffleDep是空,parentStage。
那它的parentStage是啥呢?

  private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = {
val parents = new HashSet[Stage]
val visited = new HashSet[RDD[_]]
def visit(r: RDD[_]) {
if (!visited(r)) {
visited += r
// 在visit函数里面,只有存在ShuffleDependency的,parent才通过getShuffleMapStage计算出来
for (dep <- r.dependencies) {
dep match {
case shufDep: ShuffleDependency[_,_] =>
parents += getShuffleMapStage(shufDep, jobId)
case _ =>
visit(dep.rdd)
}
}
}
}
visit(rdd)
parents.toList
}
View Code
它是通过不停的遍历它之前的rdd,如果碰到有依赖是ShuffleDependency类型的,就通过getShuffleMapStage方法计算出来它的Stage来。
那我们就开始看submitStage方法吧。

  private def submitStage(stage: Stage) {
//...
val missing = getMissingParentStages(stage).sortBy(_.id)
logDebug("missing: " + missing)
if (missing == Nil) {
// 没有父stage,执行这stage的tasks
submitMissingTasks(stage, jobId.get)
runningStages += stage
} else {
   // 提交父stage的task,这里是个递归,真正的提交在上面的注释的地方
for (parent <- missing) {
submitStage(parent)
}
// 暂时不能提交的stage,先添加到等待队列
waitingStages += stage
}
}
}
View Code
这个提交stage的过程是一个递归的过程,它是先要把父stage先提交,然后把自己添加到等待队列中,直到没有父stage之后,就提交该stage中的任务。等待队列在最后的submitWaitingStages方法中提交。
这里我引用一下上一章当中我所画的那个图来表示这个过程哈。

从getParentStages方法可以看出来,RDD当中存在ShuffleDependency的Stage才会有父Stage, 也就是图中的虚线的位置!
所以我们只需要记住凡是涉及到shuffle的作业都会至少有两个Stage,即shuffle前和shuffle后。TaskScheduler提交Task
那我们接着看submitMissingTasks方法,下面是主体代码。

  private def submitMissingTasks(stage: Stage, jobId: Int) {
val myPending = pendingTasks.getOrElseUpdate(stage, new HashSet)
myPending.clear()
var tasks = ArrayBuffer[Task[_]]()
if (stage.isShuffleMap) {
// 这是shuffle stage的情况
for (p <- 0 until stage.numPartitions if stage.outputLocs(p) == Nil) {
val locs = getPreferredLocs(stage.rdd, p)
tasks += new ShuffleMapTask(stage.id, stage.rdd, stage.shuffleDep.get, p, locs)
}
} else {
// 这是final stage的情况
val job = resultStageToJob(stage)
for (id <- 0 until job.numPartitions if !job.finished(id)) {
val partition = job.partitions(id)
val locs = getPreferredLocs(stage.rdd, partition)
tasks += new ResultTask(stage.id, stage.rdd, job.func, partition, locs, id)
}
}
if (tasks.size > 0) {
myPending ++= tasks
taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
stageToInfos(stage).submissionTime = Some(System.currentTimeMillis())
} else {
runningStages -= stage
}
}
View Code
Task也是有两类的,一种是ShuffleMapTask,一种是ResultTask,我们需要注意这两种Task的runTask方法。最后Task是通过taskScheduler.submitTasks来提交的。
我们找到TaskSchedulerImpl里面看这个方法。
  override def submitTasks(taskSet: TaskSet) {
val tasks = taskSet.tasksthis.synchronized {
val manager = new TaskSetManager(this, taskSet, maxTaskFailures)
activeTaskSets(taskSet.id) = manager
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
hasReceivedTask = true
}
backend.reviveOffers()
}


调度器有两种模式,FIFO和FAIR,默认是FIFO, 可以通过spark.scheduler.mode来设置,schedulableBuilder也有相应的两种FIFOSchedulableBuilder和FairSchedulableBuilder。
那backend是啥?据说是为了给TaskSchedulerImpl提供插件式的调度服务的。
它是怎么实例化出来的,这里我们需要追溯回到SparkContext的createTaskScheduler方法,下面我直接把常用的3中类型的TaskScheduler给列出来了。
mode Scheduler Backend
cluster TaskSchedulerImpl SparkDeploySchedulerBackend
yarn-cluster YarnClusterScheduler CoarseGrainedSchedulerBackend
yarn-client YarnClientClusterScheduler YarnClientSchedulerBackend
好,我们回到之前的代码上,schedulableBuilder.addTaskSetManager比较简单,把作业集添加到调度器的队列当中。
我们接着看backend的reviveOffers,里面只有一句话driverActor ! ReviveOffers。真是头晕,搞那么多Actor,只是为了接收消息。。。
照旧吧,找到它的receive方法,找到ReviveOffers这个case,发现它调用了makeOffers方法,我们继续追杀!
def makeOffers() {
launchTasks(scheduler.resourceOffers(executorHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))}))
}

从executorHost中随机抽出一些来给调度器,然后调度器返回TaskDescription,executorHost怎么来的,待会儿再说,我们接着看resourceOffers方法。

def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
SparkEnv.set(sc.env)

// 遍历worker提供的资源,更新executor相关的映射
for (o <- offers) {
executorIdToHost(o.executorId) = o.host
if (!executorsByHost.contains(o.host)) {
executorsByHost(o.host) = new HashSet[String]()
executorAdded(o.executorId, o.host)
}
}

// 从worker当中随机选出一些来,防止任务都堆在一个机器上
val shuffledOffers = Random.shuffle(offers)
// worker的task列表
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
val availableCpus = shuffledOffers.map(o => o.cores).toArray
val sortedTaskSets = rootPool.getSortedTaskSetQueue

// 随机遍历抽出来的worker,通过TaskSetManager的resourceOffer,把本地性最高的Task分给Worker
var launchedTask = false
for (taskSet <- sortedTaskSets; maxLocality <- TaskLocality.values) {
do {
launchedTask = false
for (i <- 0 until shuffledOffers.size) {
val execId = shuffledOffers(i).executorId
val host = shuffledOffers(i).host
if (availableCpus(i) >= CPUS_PER_TASK) {
// 把本地性最高的Task分给Worker
for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
tasks(i) += task
val tid = task.taskId
taskIdToTaskSetId(tid) = taskSet.taskSet.id
taskIdToExecutorId(tid) = execId
activeExecutorIds += execId
executorsByHost(host) += execId
availableCpus(i) -= CPUS_PER_TASK
assert (availableCpus(i) >= 0)
launchedTask = true
}
}
}
} while (launchedTask)
}

if (tasks.size > 0) {
hasLaunchedTask = true
}
return tasks
}
View Code
resourceOffers主要做了3件事:
1、从Workers里面随机抽出一些来执行任务。
2、通过TaskSetManager找出和Worker在一起的Task,最后编译打包成TaskDescription返回。
3、将Worker-->Array[TaskDescription]的映射关系返回。
我们继续看TaskSetManager的resourceOffer,看看它是怎么找到和host再起的Task,并且包装成TaskDescription。
通过查看代码,我发现之前我解释的和它具体实现的差别比较大,它所谓的本地性是根据当前的等待时间来确定的任务本地性的级别。
它的本地性主要是包括四类:PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY。

  private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
while (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex) &&
currentLocalityIndex < myLocalityLevels.length - 1)
{
// 成立条件是当前时间-上次发布任务的时间 > 当前本地性级别的,条件成立就跳到下一个级别
lastLaunchTime += localityWaits(currentLocalityIndex)
currentLocalityIndex += 1
}
myLocalityLevels(currentLocalityIndex)
}
View Code
等待时间是可以通过参数去设置的,具体的自己查下面的代码。

  private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
val defaultWait = conf.get("spark.locality.wait", "3000")
level match {
case TaskLocality.PROCESS_LOCAL =>
conf.get("spark.locality.wait.process", defaultWait).toLong
case TaskLocality.NODE_LOCAL =>
conf.get("spark.locality.wait.node", defaultWait).toLong
case TaskLocality.RACK_LOCAL =>
conf.get("spark.locality.wait.rack", defaultWait).toLong
case TaskLocality.ANY =>
0L
}
}
View Code
下面继续看TaskSetManager的resourceOffer的方法,通过findTask来从Task集合里面找到相应的Task。
      findTask(execId, host, allowedLocality) match {
case Some((index, taskLocality)) => {
val task = tasks(index)
val serializedTask = Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)
  val timeTaken = clock.getTime() - startTime
  addRunningTask(taskId)
  val taskName = "task %s:%d".format(taskSet.id, index)
  sched.dagScheduler.taskStarted(task, info)
  return Some(new TaskDescription(taskId, execId, taskName, index, serializedTask))
}


它的findTask方法如下:

  private def findTask(execId: String, host: String, locality: TaskLocality.Value)
: Option[(Int, TaskLocality.Value)] =
{
   // 同一个Executor,通过execId来查找相应的等待的task
for (index <- findTaskFromList(execId, getPendingTasksForExecutor(execId))) {
return Some((index, TaskLocality.PROCESS_LOCAL))
}
   // 通过主机名找到相应的Task,不过比之前的多了一步判断
if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
for (index <- findTaskFromList(execId, getPendingTasksForHost(host))) {
return Some((index, TaskLocality.NODE_LOCAL))
}
}
  // 通过Rack的名称查找Task
if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
for {
rack <- sched.getRackForHost(host)
index <- findTaskFromList(execId, getPendingTasksForRack(rack))
} {
return Some((index, TaskLocality.RACK_LOCAL))
}
}
   // 查找那些preferredLocations为空的,不指定在哪里执行的Task来执行
for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) {
return Some((index, TaskLocality.PROCESS_LOCAL))
}
  // 查找那些preferredLocations为空的,不指定在哪里执行的Task来执行
if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
for (index <- findTaskFromList(execId, allPendingTasks)) {
return Some((index, TaskLocality.ANY))
}
}
// 最后没办法了,拖的时间太长了,只能启动推测执行了
findSpeculativeTask(execId, host, locality)
}
View Code
从这个方面可以看得出来,Spark对运行时间还是很注重的,等待的时间越长,它就可能越饥不择食,从PROCESS_LOCAL一直让步到ANY,最后的最后,推测执行都用到了。
找到任务之后,它就调用dagScheduler.taskStarted方法,通知dagScheduler任务开始了,taskStarted方法就不详细讲了,它触发dagScheduler的BeginEvent事件,里面只做了2件事:
1、检查Task序列化的大小,超过100K就警告。
2、提交等待的Stage。
好,我们继续回到发布Task上面来,中间过程讲完了,我们应该是要回到CoarseGrainedSchedulerBackend的launchTasks方法了。
def makeOffers() {
launchTasks(scheduler.resourceOffers(executorHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))}))
}

它的方法体是:
    def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
for (task <- tasks.flatten) {
freeCores(task.executorId) -= scheduler.CPUS_PER_TASK
executorActor(task.executorId) ! LaunchTask(task)
}
}

通过executorId找到相应的executorActor,然后发送LaunchTask过去,一个Task占用一个Cpu。注册Application
那这个executorActor是怎么来的呢?找呗,最后发现它是在receive方法里面接受到RegisterExecutor消息的时候注册的。通过搜索,我们找到CoarseGrainedExecutorBackend这个类,在它的preStart方法里面赫然找到了driver ! RegisterExecutor(executorId, hostPort, cores) 带的这三个参数都是在初始化的时候传入的,那是谁实例化的它呢,再逆向搜索找到SparkDeploySchedulerBackend!之前的backend一直都是它,我们看reviveOffers是在它的父类CoarseGrainedSchedulerBackend里面。
关系清楚了,在这个backend的start方法里面启动了一个AppClient,AppClient的其中一个参数ApplicationDescription就是封装的运行CoarseGrainedExecutorBackend的命令。AppClient内部启动了一个ClientActor,这个ClientActor启动之后,会尝试向Master发送一个指令actor ! RegisterApplication(appDescription) 注册一个Application。
别废话了,Ctrl +Shift + N吧,定位到Master吧。
    case RegisterApplication(description) => {
val app = createApplication(description, sender)
registerApplication(app)
persistenceEngine.addApplication(app)
sender ! RegisteredApplication(app.id, masterUrl)
schedule()
}


它做了5件事:
1、createApplication为这个app构建一个描述App数据结构的ApplicationInfo。
2、注册该Application,更新相应的映射关系,添加到等待队列里面。
3、用persistenceEngine持久化Application信息,默认是不保存的,另外还有两种方式,保存在文件或者Zookeeper当中。
4、通过发送方注册成功。
5、开始作业调度。
关于调度的问题,在第一章《spark-submit提交作业过程》已经介绍过了,建议回去再看看,搞清楚Application和Executor之间的关系。
Application一旦获得资源,Master会发送launchExecutor指令给Worker去启动Executor。
进到Worker里面搜索LaunchExecutor。
  val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_, self, workerId, host,
appDesc.sparkHome.map(userSparkHome => new File(userSparkHome)).getOrElse(sparkHome), workDir, akkaUrl, ExecutorState.RUNNING)
 executors(appId + "/" + execId) = manager
  manager.start()
coresUsed += cores_
memoryUsed += memory_
masterLock.synchronized {
master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
}


原来ExecutorRunner还不是传说中的Executor,它内部是执行了appDesc内部的那个命令,启动了CoarseGrainedExecutorBackend,它才是我们的真命天子Executor。
启动之后ExecutorRunner报告ExecutorStateChanged事件给Master。
Master干了两件事:
1、转发给Driver,这个Driver是之前注册Application的那个AppClient
2、如果是Executor运行结束,从相应的映射关系里面删除发布Task
上面又花了那么多时间讲Task的运行环境ExecutorRunner是怎么注册,那我们还是回到我们的主题,Task的发布。
发布任务是发送LaunchTask指令给CoarseGrainedExecutorBackend,接受到指令之后,让它内部的executor来发布这个任务。
这里我们看一下Executor的launchTask。
  def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
val tr = new TaskRunner(context, taskId, serializedTask)
runningTasks.put(taskId, tr)
threadPool.execute(tr)
}

TaskRunner是这里的重头戏啊!看它的run方法吧。

    override def run() {
// 准备工作若干...那天我们放学回家经过一片玉米地,以上省略一百字

try {
// 反序列化Task
SparkEnv.set(env)
Accumulators.clear()
val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
updateDependencies(taskFiles, taskJars)
task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)

   // 命令为尝试运行,和hadoop的mapreduce作业是一致的
attemptedTask = Some(task)
logDebug("Task " + taskId + "'s epoch is " + task.epoch)
env.mapOutputTracker.updateEpoch(task.epoch)

// 运行Task, 具体可以去看之前让大家关注的ResultTask和ShuffleMapTask
taskStart = System.currentTimeMillis()
val value = task.run(taskId.toInt)
val taskFinish = System.currentTimeMillis()

     // 对结果进行序列化
val resultSer = SparkEnv.get.serializer.newInstance()
val beforeSerialization = System.currentTimeMillis()
val valueBytes = resultSer.serialize(value)
val afterSerialization = System.currentTimeMillis()
     // 更新任务的相关监控信息,会反映到监控页面上的
for (m <- task.metrics) {
m.hostname = Utils.localHostName()
m.executorDeserializeTime = taskStart - startTime
m.executorRunTime = taskFinish - taskStart
m.jvmGCTime = gcTime - startGCTime
m.resultSerializationTime = afterSerialization - beforeSerialization
}

val accumUpdates = Accumulators.values
     // 对结果进行再包装,包装完再进行序列化
val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.getOrElse(null))
val serializedDirectResult = ser.serialize(directResult)
// 如果中间结果的大小超过了spark.akka.frameSize(默认是10M)的大小,就要提升序列化级别了,超过内存的部分要保存到硬盘的
val serializedResult = {
if (serializedDirectResult.limit >= akkaFrameSize - 1024) {
val blockId = TaskResultBlockId(taskId)
env.blockManager.putBytes(blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
ser.serialize(new IndirectTaskResult[Any](blockId))
} else {
serializedDirectResult
}
}
     // 返回结果
execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
} catch {
// 这部分是错误处理,被我省略掉了,主要内容是通关相关负责人处理后事
} finally {
// 清理为ResultTask注册的shuffle内存,最后把task从正在运行的列表当中删除
val shuffleMemoryMap = env.shuffleMemoryMap
shuffleMemoryMap.synchronized {
shuffleMemoryMap.remove(Thread.currentThread().getId)
}
runningTasks.remove(taskId)
}
}
}
View Code
以上代码被我这些了,但是建议大家看看注释吧。
最后结果是通过statusUpdate返回的。
  override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
driver ! StatusUpdate(executorId, taskId, state, data)
}

这回这个Driver又不是刚才那个AppClient,而是它的家长SparkDeploySchedulerBackend,是在SparkDeploySchedulerBackend的父类CoarseGrainedSchedulerBackend接受了这个StatusUpdate消息。
这关系真他娘够乱的。。
继续,Task里面走的是TaskSchedulerImpl这个方法。
scheduler.statusUpdate(taskId, state, data.value)

到这里,一个Task就运行结束了,后面就不再扩展了,作业运行这块是Spark的核心,再扩展基本就能写出来一本书了,限于文章篇幅,这里就不再深究了。
以上的过程应该是和下面的图一致的。

 
看完这篇文章,估计大家会云里雾里的,在下一章《作业生命周期》会把刚才描述的整个过程重新梳理出来,便于大家记忆,敬请期待!
 
岑玉海
转载请注明出处,谢谢!

Spark源码系列(二)RDD详解

cenyuhai 发表了文章 0 个评论 2051 次浏览 2015-09-11 15:18 来自相关话题

1、什么是RDD? 上一章讲了Spark提交作业的过程,这一章我们要讲RDD。简单的讲,RDD就是Spark的input,知道input是啥吧,就是输入的数据。 RDD的全名是Resilient Distributed Dataset, ...查看全部
1、什么是RDD?
上一章讲了Spark提交作业的过程,这一章我们要讲RDD。简单的讲,RDD就是Spark的input,知道input是啥吧,就是输入的数据。
RDD的全名是Resilient Distributed Dataset,意思是容错的分布式数据集,每一个RDD都会有5个特征:
1、有一个分片列表。就是能被切分,和hadoop一样的,能够切分的数据才能并行计算。
2、有一个函数计算每一个分片,这里指的是下面会提到的compute函数。
3、对其他的RDD的依赖列表,依赖还具体分为宽依赖和窄依赖,但并不是所有的RDD都有依赖。
4、可选:key-value型的RDD是根据哈希来分区的,类似于mapreduce当中的Paritioner接口,控制key分到哪个reduce。
5、可选:每一个分片的优先计算位置(preferred locations),比如HDFS的block的所在位置应该是优先计算的位置。
对应着上面这几点,我们在RDD里面能找到这4个方法和1个属性,别着急,下面我们会慢慢展开说这5个东东。
  //只计算一次  
protected def getPartitions: Array[Partition]
//对一个分片进行计算,得出一个可遍历的结果
def compute(split: Partition, context: TaskContext): Iterator[T]
//只计算一次,计算RDD对父RDD的依赖
protected def getDependencies: Seq[Dependency[_]] = deps
//可选的,分区的方法,针对第4点,类似于mapreduce当中的Paritioner接口,控制key分到哪个reduce
@transient val partitioner: Option[Partitioner] = None
//可选的,指定优先位置,输入参数是split分片,输出结果是一组优先的节点位置
protected def getPreferredLocations(split: Partition): Seq[String] = Nil

2、多种RDD之间的转换
下面用一个实例讲解一下吧,就拿我们常用的一段代码来讲吧,然后会把我们常用的RDD都会讲到。
    val hdfsFile = sc.textFile(args(1))
val flatMapRdd = hdfsFile.flatMap(s => s.split(" "))
val filterRdd = flatMapRdd.filter(_.length == 2)
val mapRdd = filterRdd.map(word => (word, 1))
val reduce = mapRdd.reduceByKey(_ + _)

这里涉及到很多个RDD,textFile是一个HadoopRDD经过map后的MappredRDD,经过flatMap是一个FlatMappedRDD,经过filter方法之后生成了一个FilteredRDD,经过map函数之后,变成一个MappedRDD,通过隐式转换成 PairRDD,最后经过reduceByKey。
我们首先看textFile的这个方法,进入SparkContext这个方法,找到它。
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minPartitions).map(pair => pair._2.toString)
}

看它的输入参数,path,TextInputFormat,LongWritable,Text,同志们联想到什么?写过mapreduce的童鞋都应该知道哈。
1、hdfs的地址
2、InputFormat的类型
3、Mapper的第一个类型
4、Mapper的第二类型
这就不难理解为什么立马就对hadoopFile后面加了一个map方法,取pair的第二个参数了,最后在shell里面我们看到它是一个MappredRDD了。
那么现在如果大家要用的不是textFile,而是一个别的hadoop文件类型,大家会不会使用hadoopFile来得到自己要得到的类型呢,不要告诉我不会哈,不会的赶紧回去复习mapreduce。
言归正传,默认的defaultMinPartitions的2太小了,我们用的时候还是设置大一点吧。2.1 HadoopRDD
我们继续追杀下去,看看hadoopFile方法,里面我们看到它做了3个操作。
1、把hadoop的配置文件保存到广播变量里。
2、设置路径的方法
3、new了一个HadoopRDD返回
好,我们接下去看看HadoopRDD这个类吧,我们重点看看它的getPartitions、compute、getPreferredLocations。
先看getPartitions,它的核心代码如下:
    val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
val array = new Array[Partition](inputSplits.size)
for (i <- 0 until inputSplits.size) {
array(i) = new HadoopPartition(id, i, inputSplits(i))
}

它调用的是inputFormat自带的getSplits方法来计算分片,然后把分片HadoopPartition包装到到array里面返回。
这里顺便顺带提一下,因为1.0又出来一个NewHadoopRDD,它使用的是mapreduce新api的inputformat,getSplits就不要有minPartitions了,别的逻辑都是一样的,只是使用的类有点区别。
我们接下来看compute方法,它的输入值是一个Partition,返回是一个Iterator[(K, V)]类型的数据,这里面我们只需要关注2点即可。
1、把Partition转成HadoopPartition,然后通过InputSplit创建一个RecordReader
2、重写Iterator的getNext方法,通过创建的reader调用next方法读取下一个值。

      // 转换成HadoopPartition
val split = theSplit.asInstanceOf[HadoopPartition]
logInfo("Input split: " + split.inputSplit)
var reader: RecordReader[K, V] = null
val jobConf = getJobConf()
val inputFormat = getInputFormat(jobConf)
context.stageId, theSplit.index, context.attemptId.toInt, jobConf)
// 通过Inputform的getRecordReader来创建这个InputSpit的Reader
reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)

// 调用Reader的next方法
val key: K = reader.createKey()
val value: V = reader.createValue()
override def getNext() = {
try {
finished = !reader.next(key, value)
} catch {
case eof: EOFException =>
finished = true
}
(key, value)
}
View Code
从这里我们可以看得出来compute方法是通过分片来获得Iterator接口,以遍历分片的数据。
getPreferredLocations方法就更简单了,直接调用InputSplit的getLocations方法获得所在的位置。2.2 依赖
下面我们看RDD里面的map方法
def map[U: ClassTag](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))

直接new了一个MappedRDD,还把匿名函数f处理了再传进去,我们继续追杀到MappedRDD。
private[spark]
class MappedRDD[U: ClassTag, T: ClassTag](prev: RDD[T], f: T => U)
extends RDD[U](prev) {
override def getPartitions: Array[Partition] = firstParent[T].partitions
override def compute(split: Partition, context: TaskContext) =
firstParent[T].iterator(split, context).map(f)
}


MappedRDD把getPartitions和compute给重写了,而且都用到了firstParent[T],这个firstParent是何须人也?我们可以先点击进入RDD[U](prev)这个构造函数里面去。
def this(@transient oneParent: RDD[_]) = this(oneParent.context , List(new OneToOneDependency(oneParent)))

就这样你会发现它把RDD复制给了deps,HadoopRDD成了MappedRDD的父依赖了,这个OneToOneDependency是一个窄依赖,子RDD直接依赖于父RDD,继续看firstParent。
protected[spark] def firstParent[U: ClassTag] = {
dependencies.head.rdd.asInstanceOf[RDD[U]]
}

由此我们可以得出两个结论:
1、getPartitions直接沿用了父RDD的分片信息
2、compute函数是在父RDD遍历每一行数据时套一个匿名函数f进行处理
好吧,现在我们可以理解compute函数真正是在干嘛的了
它的两个显著作用:
1、在没有依赖的条件下,根据分片的信息生成遍历数据的Iterable接口
2、在有前置依赖的条件下,在父RDD的Iterable接口上给遍历每个元素的时候再套上一个方法
我们看看点击进入map(f)的方法进去看一下
  def map[B](f: A => B): Iterator[B] = new AbstractIterator[B] {
def hasNext = self.hasNext
def next() = f(self.next())
}

看黄色的位置,看它的next函数,不得不说,写得真的很妙!
我们接着看RDD的flatMap方法,你会发现它和map函数几乎没什么区别,只是RDD变成了FlatMappedRDD,但是flatMap和map的效果还是差别挺大的。
比如((1,2),(3,4)), 如果是调用了flatMap函数,我们访问到的就是(1,2,3,4)4个元素;如果是map的话,我们访问到的就是(1,2),(3,4)两个元素。
有兴趣的可以去看看FlatMappedRDD和FilteredRDD这里就不讲了,和MappedRDD类似。2.3 reduceByKey
前面的RDD转换都简单,可是到了reduceByKey可就不简单了哦,因为这里有一个同相同key的内容聚合的一个过程,所以它是最复杂的那一类。
那reduceByKey这个方法在哪里呢,它在PairRDDFunctions里面,这是个隐式转换,所以比较隐蔽哦,你在RDD里面是找不到的。
  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
combineByKey[V]((v: V) => v, func, func, partitioner)
}

它调用的是combineByKey方法,过程过程蛮复杂的,折叠起来,喜欢看的人看看吧。

def combineByKey[C](createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null): RDD[(K, C)] = {

val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
if (self.partitioner == Some(partitioner)) {
// 一般的RDD的partitioner是None,这个条件不成立,即使成立只需要对这个数据做一次按key合并value的操作即可
self.mapPartitionsWithContext((context, iter) => {
new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
}, preservesPartitioning = true)
} else if (mapSideCombine) {
// 默认是走的这个方法,需要map端的combinber.
val combined = self.mapPartitionsWithContext((context, iter) => {
aggregator.combineValuesByKey(iter, context)
}, preservesPartitioning = true)
val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)
.setSerializer(serializer)
partitioned.mapPartitionsWithContext((context, iter) => {
new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter, context))
}, preservesPartitioning = true)
} else {
// 不需要map端的combine,直接就来shuffle
val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializer)
values.mapPartitionsWithContext((context, iter) => {
new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
}, preservesPartitioning = true)
}
}
View Code
按照一个比较标准的流程来看的话,应该是走的中间的这条路径,它干了三件事:
1、给每个分片的数据在外面套一个combineValuesByKey方法的MapPartitionsRDD。
2、用MapPartitionsRDD来new了一个ShuffledRDD出来。
3、对ShuffledRDD做一次combineCombinersByKey。
下面我们先看MapPartitionsRDD,我把和别的RDD有别的两行给拿出来了,很明显的区别,f方法是套在iterator的外边,这样才能对iterator的所有数据做一个合并。
  override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
override def compute(split: Partition, context: TaskContext) =
f(context, split.index, firstParent[T].iterator(split, context))
}

接下来我们看Aggregator的combineValuesByKey的方法吧。

def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]],
context: TaskContext): Iterator[(K, C)] = {
// 是否使用外部排序,是由参数spark.shuffle.spill,默认是true
if (!externalSorting) {
val combiners = new AppendOnlyMap[K,C]
var kv: Product2[K, V] = null
val update = (hadValue: Boolean, oldValue: C) => {
if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
}
// 用map来去重,用update方法来更新值,如果没值的时候,返回值,如果有值的时候,通过mergeValue方法来合并
// mergeValue方法就是我们在reduceByKey里面写的那个匿名函数,在这里就是(_ + _)
while (iter.hasNext) {
kv = iter.next()
combiners.changeValue(kv._1, update)
}
combiners.iterator
} else {
// 用了一个外部排序的map来去重,就不停的往里面插入值即可,基本原理和上面的差不多,区别在于需要外部排序
val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
while (iter.hasNext) {
val (k, v) = iter.next()
combiners.insert(k, v)
}
combiners.iterator
}
View Code
这个就是一个很典型的按照key来做合并的方法了,我们继续看ShuffledRDD吧。
ShuffledRDD和之前的RDD很明显的特征是
1、它的依赖传了一个Nil(空列表)进去,表示它没有依赖。
2、它的compute计算方式比较特别,这个在之后的文章说,过程比较复杂。
3、它的分片默认是采用HashPartitioner,数量和前面的RDD的分片数量一样,也可以不一样,我们可以在reduceByKey的时候多传一个分片数量即可。
在new完ShuffledRDD之后又来了一遍mapPartitionsWithContext,不过调用的匿名函数变成了combineCombinersByKey。
combineCombinersByKey和combineValuesByKey的逻辑基本相同,只是输入输出的类型有区别。combineCombinersByKey只是做单纯的合并,不会对输入输出的类型进行改变,combineValuesByKey会把iter[K, V]的V值变成iter[K, C]。
case class Aggregator[K, V, C] (
  createCombiner: V => C,
  mergeValue: (C, V) => C,
  mergeCombiners: (C, C) => C)
  ......
}

这个方法会根据我们传进去的匿名方法的参数的类型做一个自动转换。
到这里,作业都没有真正执行,只是将RDD各种嵌套,我们通过RDD的id和类型的变化观测到这一点,RDD[1]->RDD[2]->RDD[3]......
3其它RDD
平常我们除了从hdfs上面取数据之后,我们还可能从数据库里面取数据,那怎么办呢?没关系,有个JdbcRDD!
    val rdd = new JdbcRDD(
sc,
() => { DriverManager.getConnection("jdbc:derby:target/JdbcRDDSuiteDb") },
"SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?",
1, 100, 3,
(r: ResultSet) => { r.getInt(1) }
).cache()


前几个参数大家都懂,我们重点说一下后面1, 100, 3是咋回事?
在这个JdbcRDD里面它默认我们是会按照一个long类型的字段对数据进行切分,(1,100)分别是最小值和最大值,3是分片的数量。
比如我们要一次查ID为1-1000,000的的用户,分成10个分片,我们就填(1, 1000,000, 10)即可,在sql语句里面还必须有"? <= ID AND ID <= ?"的句式,别尝试着自己造句哦!
最后是怎么处理ResultSet的方法,自己爱怎么处理怎么处理去吧。不过确实觉着用得不方便的可以自己重写一个RDD。
 
小结:
这一章重点介绍了各种RDD那5个特征,以及RDD之间的转换,希望大家可以对RDD有更深入的了解,下一章我们将要讲作业的运行过程,敬请关注!
 
岑玉海
转载请注明出处,谢谢!
 

Spark源码系列(一)spark-submit提交作业过程

cenyuhai 发表了文章 0 个评论 2244 次浏览 2015-09-11 15:18 来自相关话题

前言 折腾了很久,终于开始学习Spark的源码了,第一篇我打算讲一下Spark作业的提交过程。   这个是Spark的App运行图,它通过一个Driver来和集群通信,集群负责作业的分配。今天我要讲的是如何创建 ...查看全部
前言
折腾了很久,终于开始学习Spark的源码了,第一篇我打算讲一下Spark作业的提交过程。
 

这个是Spark的App运行图,它通过一个Driver来和集群通信,集群负责作业的分配。今天我要讲的是如何创建这个Driver Program的过程。作业提交方法以及参数
我们先看一下用Spark Submit提交的方法吧,下面是从官方上面摘抄的内容。
# Run on a Spark standalone cluster
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://207.184.161.138:7077 \
--executor-memory 20G \
--total-executor-cores 100 \
/path/to/examples.jar \
1000


这个是提交到standalone集群的方式,打开spark-submit这文件,我们会发现它最后是调用了org.apache.spark.deploy.SparkSubmit这个类。
我们直接进去看就行了,main函数就几行代码,太节省了。
def main(args: Array[String]) {
val appArgs = new SparkSubmitArguments(args)
val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
launch(childArgs, classpath, sysProps, mainClass, appArgs.verbose)
}

我们主要看看createLaunchEnv方法就可以了,launch是反射调用mainClass,精华全在createLaunchEnv里面了。
在里面我发现一些有用的信息,可能在官方文档上面都没有的,发出来大家瞅瞅。前面不带--的可以在spark-defaults.conf里面设置,带--的直接在提交的时候指定,具体含义大家一看就懂。

val options = List[OptionAssigner](
OptionAssigner(args.master, ALL_CLUSTER_MGRS, false, sysProp = "spark.master"),
OptionAssigner(args.name, ALL_CLUSTER_MGRS, false, sysProp = "spark.app.name"),
OptionAssigner(args.name, YARN, true, clOption = "--name", sysProp = "spark.app.name"),
OptionAssigner(args.driverExtraClassPath, STANDALONE | YARN, true,
sysProp = "spark.driver.extraClassPath"),
OptionAssigner(args.driverExtraJavaOptions, STANDALONE | YARN, true,
sysProp = "spark.driver.extraJavaOptions"),
OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, true,
sysProp = "spark.driver.extraLibraryPath"),
OptionAssigner(args.driverMemory, YARN, true, clOption = "--driver-memory"),
OptionAssigner(args.driverMemory, STANDALONE, true, clOption = "--memory"),
OptionAssigner(args.driverCores, STANDALONE, true, clOption = "--cores"),
OptionAssigner(args.queue, YARN, true, clOption = "--queue"),
OptionAssigner(args.queue, YARN, false, sysProp = "spark.yarn.queue"),
OptionAssigner(args.numExecutors, YARN, true, clOption = "--num-executors"),
OptionAssigner(args.numExecutors, YARN, false, sysProp = "spark.executor.instances"),
OptionAssigner(args.executorMemory, YARN, true, clOption = "--executor-memory"),
OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, false,
sysProp = "spark.executor.memory"),
OptionAssigner(args.executorCores, YARN, true, clOption = "--executor-cores"),
OptionAssigner(args.executorCores, YARN, false, sysProp = "spark.executor.cores"),
OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, false,
sysProp = "spark.cores.max"),
OptionAssigner(args.files, YARN, false, sysProp = "spark.yarn.dist.files"),
OptionAssigner(args.files, YARN, true, clOption = "--files"),
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.files"),
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, true, sysProp = "spark.files"),
OptionAssigner(args.archives, YARN, false, sysProp = "spark.yarn.dist.archives"),
OptionAssigner(args.archives, YARN, true, clOption = "--archives"),
OptionAssigner(args.jars, YARN, true, clOption = "--addJars"),
OptionAssigner(args.jars, ALL_CLUSTER_MGRS, false, sysProp = "spark.jars")
)
View Code
Driver程序的部署模式有两种,client和cluster,默认是client。client的话默认就是直接在本地运行了Driver程序了,cluster模式还会兜一圈把作业发到集群上面去运行。
指定部署模式需要用参数--deploy-mode来指定,或者在环境变量当中添加DEPLOY_MODE变量来指定。
下面讲的是cluster的部署方式,兜一圈的这种情况。
yarn模式的话mainClass是org.apache.spark.deploy.yarn.Client,standalone的mainClass是org.apache.spark.deploy.Client。
这次我们讲org.apache.spark.deploy.Client,yarn的话单独找一章出来单独讲,目前超哥还是推荐使用standalone的方式部署spark,具体原因不详,据说是因为资源调度方面的问题。
说个快捷键吧,Ctrl+Shift+N,然后输入Client就能找到这个类,这是IDEA的快捷键,相当好使。
我们直接找到它的main函数,发现了它居然使用了Akka框架,我百度了一下,被它震惊了。Akka
在main函数里面,主要代码就这么三行。
//创建一个ActorSystem
val (actorSystem, _) = AkkaUtils.createActorSystem("driverClient",Utils.localHostName(),0,
  conf, new SecurityManager(conf))
//执行ClientActor的preStart方法和receive方法
actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf))
//等待运行结束
actorSystem.awaitTermination()


看了这里真的有点儿懵啊,这是啥玩意儿,不懂的朋友们,请点击这里Akka。下面是它官方放出来的例子:
//定义一个case class用来传递参数
case class Greeting(who: String)
//定义Actor,比较重要的一个方法是receive方法,用来接收信息的
class GreetingActor extends Actor with ActorLogging {
def receive = {
case Greeting(who) ⇒ log.info("Hello " + who)
}
}
//创建一个ActorSystem
val system = ActorSystem("MySystem")
//给ActorSystem设置Actor
val greeter = system.actorOf(Props[GreetingActor], name = "greeter")
//向greeter发送信息,用Greeting来传递
greeter ! Greeting("Charlie Parker")


简直是无比强大啊,就这么几行代码就搞定了,接下来看你会更加震惊的。
我们回到Client类当中,找到ClientActor,它有两个方法,是之前说的preStart和receive方法,preStart方法用于连接master提交作业请求,receive方法用于接收从master返回的反馈信息。
我们先看preStart方法吧。
override def preStart() = {
// 这里需要把master的地址转换成akka的地址,然后通过这个akka地址获得指定的actor
// 它的格式是"akka.tcp://%s@%s:%s/user/%s".format(systemName, host, port, actorName)
masterActor = context.actorSelection(Master.toAkkaUrl(driverArgs.master))
// 把自身设置成远程生命周期的事件
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])

driverArgs.cmd match {
case "launch" =>
// 此处省略100个字
val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
// 此处省略100个字
// 向master发送提交Driver的请求,把driverDescription传过去,RequestSubmitDriver前面说过了,是个case class
masterActor ! RequestSubmitDriver(driverDescription)

case "kill" =>
val driverId = driverArgs.driverId
val killFuture = masterActor ! RequestKillDriver(driverId)
}
}


从上面的代码看得出来,它需要设置master的连接地址,最后提交了一个RequestSubmitDriver的信息。在receive方法里面,就是等待接受回应了,有两个Response分别对应着这里的launch和kill。
线索貌似到这里就断了,那下一步在哪里了呢?当然是在Master里面啦,怎么知道的,猜的,哈哈。
Master也是继承了Actor,在它的main函数里面找到了以下代码:
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf, 
  securityManager = securityMgr)
val actor = actorSystem.actorOf(Props(classOf[Master], host, boundPort, webUiPort, securityMgr), actorName)
val timeout = AkkaUtils.askTimeout(conf)
val respFuture = actor.ask(RequestWebUIPort)(timeout)
val resp = Await.result(respFuture, timeout).asInstanceOf[WebUIPortResponse]

和前面的actor基本一致,多了actor.ask这句话,查了一下官网的文档,这句话的意思的发送消息,并且接受一个Future作为response,和前面的actor ! message的区别就是它还接受返回值。
具体的Akka的用法,大家还是参照官网吧,Akka确实如它官网所言的那样子,是一个简单、强大、并行的分布式框架。
小结:
Akka的使用确实简单,短短的几行代码即刻完成一个通信功能,比Socket简单很多。但是它也逃不脱我们常说的那些东西,请求、接收请求、传递的消息、注册的地址和端口这些概念。调度schedule
我们接下来查找Master的receive方法吧,Master是作为接收方的,而不是主动请求,这点和hadoop是一致的。
    case RequestSubmitDriver(description) => {
val driver = createDriver(description)
persistenceEngine.addDriver(driver)
waitingDrivers += driver
drivers.add(driver)
// 调度
schedule()
// 告诉client,提交成功了,把driver.id告诉它
sender ! SubmitDriverResponse(true, Some(driver.id), s"Driver successfully submitted as ${driver.id}")
}


这里我们主要看schedule方法就可以了,它是执行调度的方法。
private def schedule() {
if (state != RecoveryState.ALIVE) { return }

// 首先调度Driver程序,从workers里面随机抽一些出来
val shuffledWorkers = Random.shuffle(workers)
for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
for (driver <- waitingDrivers) {
// 判断内存和cpu够不够,够的就执行了哈
if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
launchDriver(worker, driver)
waitingDrivers -= driver
}
}
}

// 这里是按照先进先出的,spreadOutApps是由spark.deploy.spreadOut参数来决定的,默认是true
if (spreadOutApps) {
// 遍历一下app
for (app <- waitingApps if app.coresLeft > 0) {
// canUse里面判断了worker的内存是否够用,并且该worker是否已经包含了该app的Executor
val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
.filter(canUse(app, _)).sortBy(_.coresFree).reverse
val numUsable = usableWorkers.length
val assigned = new Array[Int](numUsable)
// 记录每个节点的核心数
var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
var pos = 0
// 遍历直到分配结束
while (toAssign > 0) {
// 从0开始遍历可用的work,如果可用的cpu减去已经分配的>0,就可以分配给它
if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
toAssign -= 1
// 这个位置的work的可分配的cpu数+1
assigned(pos) += 1
}
pos = (pos + 1) % numUsable
}
// 给刚才标记的worker分配任务
for (pos <- 0 until numUsable) {
if (assigned(pos) > 0) {
val exec = app.addExecutor(usableWorkers(pos), assigned(pos))
launchExecutor(usableWorkers(pos), exec)
app.state = ApplicationState.RUNNING
}
}
}
} else {
// 这种方式和上面的方式的区别是,这种方式尽可能用少量的节点来完成这个任务
for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
for (app <- waitingApps if app.coresLeft > 0) {
// 判断条件是worker的内存比app需要的内存多
if (canUse(app, worker)) {
val coresToUse = math.min(worker.coresFree, app.coresLeft)
if (coresToUse > 0) {
val exec = app.addExecutor(worker, coresToUse)
launchExecutor(worker, exec)
app.state = ApplicationState.RUNNING
}
}
}
}
}
}


它的调度器是这样的,先调度Driver程序,然后再调度App,调度App的方式是从各个worker的里面和App进行匹配,看需要分配多少个cpu。
那我们接下来看两个方法launchDriver和launchExecutor即可。
  def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
logInfo("Launching driver " + driver.id + " on worker " + worker.id)
worker.addDriver(driver)
driver.worker = Some(worker)
worker.actor ! LaunchDriver(driver.id, driver.desc)
driver.state = DriverState.RUNNING
}


给worker发送了一个LaunchDriver的消息,下面在看launchExecutor的方法。
  def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo) {
logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
worker.addExecutor(exec)
worker.actor ! LaunchExecutor(masterUrl,
exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
exec.application.driver ! ExecutorAdded(
exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
}


它要做的事情多一点,除了给worker发送LaunchExecutor指令外,还需要给driver发送ExecutorAdded的消息,说你的任务已经有人干了。
在继续Worker讲之前,我们先看看它是怎么注册进来的,每个Worker启动之后,会自动去请求Master去注册自己,具体我们可以看receive的方法里面的RegisterWorker这一段,它需要上报自己的内存、Cpu、地址、端口等信息,注册成功之后返回RegisteredWorker信息给它,说已经注册成功了。Worker执行
同样的,我们到Worker里面在receive方法找LaunchDriver和LaunchExecutor就可以找到我们要的东西。
case LaunchDriver(driverId, driverDesc) => {
logInfo(s"Asked to launch driver $driverId")
val driver = new DriverRunner(driverId, workDir, sparkHome, driverDesc, self, akkaUrl)
drivers(driverId) = driver
driver.start()

coresUsed += driverDesc.cores
memoryUsed += driverDesc.mem
}


看一下start方法吧,start方法里面,其实是new Thread().start(),run方法里面是通过传过来的DriverDescription构造的一个命令,丢给ProcessBuilder去执行命令,结束之后调用。
worker !DriverStateChanged通知worker,worker再通过master ! DriverStateChanged通知master,释放掉worker的cpu和内存。
同理,LaunchExecutor执行完毕了,通过worker ! ExecutorStateChanged通知worker,然后worker通过master ! ExecutorStateChanged通知master,释放掉worker的cpu和内存。
下面我们再梳理一下这个过程,只包括Driver注册,Driver运行之后的过程在之后的文章再说,比较复杂。
1、Client通过获得Url地址获得ActorSelection(master的actor引用),然后通过ActorSelection给Master发送注册Driver请求(RequestSubmitDriver)
2、Master接收到请求之后就开始调度了,从workers列表里面找出可以用的Worker
3、通过Worker的actor引用ActorRef给可用的Worker发送启动Driver请求(LaunchDriver)
4、调度完毕之后,给Client回复注册成功消息(SubmitDriverResponse)
5、Worker接收到LaunchDriver请求之后,通过传过来的DriverDescription的信息构造出命令来,通过ProcessBuilder执行
6、ProcessBuilder执行完命令之后,通过DriverStateChanged通过Worker
7、Worker最后把DriverStateChanged汇报给Master
 
后记:听超哥说,org.apache.spark.deploy.Client这个类快要被删除了,不知道cluster的这种模式是不是也被放弃了,官方给出来的例子推荐的是client模式->直接运行程序。难怪在作业调度的时候,看到别的actor叫driverActor。
不过这篇文章还有存在的意义, Akka和调度这块,和我现在正在写的第三篇以及第四篇关系很密切。
 
岑玉海
转载请注明出处,谢谢!
 

Spark1.0新特性-->Spark SQL

cenyuhai 发表了文章 1 个评论 2736 次浏览 2015-09-11 15:17 来自相关话题

Spark1.0出来了,变化还是挺大的,文档比以前齐全了,RDD支持的操作比以前多了一些,Spark on yarn功能我居然跑通了。但是最最重要的就是多了一个Spark SQL的功能,它能对RDD进行Sql操作,目前它只是一个alpha版本,喜欢尝鲜的同志们 ...查看全部
Spark1.0出来了,变化还是挺大的,文档比以前齐全了,RDD支持的操作比以前多了一些,Spark on yarn功能我居然跑通了。但是最最重要的就是多了一个Spark SQL的功能,它能对RDD进行Sql操作,目前它只是一个alpha版本,喜欢尝鲜的同志们进来看看吧,下面是它的官网的翻译。
 
Spark SQL是支持在Spark中使用Sql、HiveSql、Scaca中的关系型查询表达式。它的核心组件是一个新增的RDD类型SchemaRDD,它把行对象用一个Schema来描述行里面的所有列的数据类型,它就像是关系型数据库里面的一张表。它可以从原有的RDD创建,也可以是Parquet文件,最重要的是它可以支持用HiveQL从hive里面读取数据。
下面是一些案例,可以在Spark shell当中运行。
首先我们要创建一个熟悉的Context,熟悉spark的人都知道吧,有了Context我们才可以进行各种操作。
val sc: SparkContext // 已经存在的SparkContext
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

import sqlContext._
Running SQL on RDDs
Spark SQL支持的一种表的类型是Scala的case class,case class定义了表的类型,下面是例子:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._

// case class在Scala 2.10里面最多支持22个列,,为了突破这个现实,最好是定义一个类实现Product接口
case class Person(name: String, age: Int)

// 为Person的对象创建一个RDD,然后注册成一张表
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
people.registerAsTable("people")

// 直接写sql吧,这个方法是sqlContext提供的
val teenagers = sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// teenagers是SchemaRDDs类型,它支持所有普通的RDD操作
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)


从上面这个方法来看,不是很好用,一个表好几十个字段,我就得一个一个的去赋值,它现在支持的操作都是很简单的操作,想要实现复杂的操作可以具体去看HiveContext提供的HiveQL。Using Parquet
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._

val people: RDD[Person] = ... // 同上面的例子.

// 这个RDD已经隐式转换成一个SchemaRDD, 允许它存储成Parquet格式.
people.saveAsParquetFile("people.parquet")

// 从上面创建的文件里面读取,加载一个Parquet文件的结果也是一种JavaSchemaRDD.
val parquetFile = sqlContext.parquetFile("people.parquet")

//注册成表,然后使用
parquetFile.registerAsTable("parquetFile")
val teenagers = sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.collect().foreach(println)


 Writing Language-Integrated Relational Queries
目前这个功能只是在Scala里面支持,挺鸡肋的一个功能
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._
val people: RDD[Person] = ... // 同前面的例子.

// 和后面这个语句是一样的 'SELECT name FROM people WHERE age >= 10 AND age <= 19'
val teenagers = people.where('age >= 10).where('age <= 19).select('name)
 Hive Support
这下面的才是高潮,它可以从hive里面取数据。但是hive的依赖太多了,默认Spark assembly是没带这些依赖的,需要我们运行SPARK_HIVE=true sbt/sbt assembly/assembly重新编译,或者用maven的时候添加-Phive参数,它会重新编译出来一个hive assembly的jar包,然后需要把这个jar包放到所有的节点上。另外还需要把
hive-site.xml放到conf目录下。没进行hive部署的话,下面的例子也可以用LocalHiveContext来代替HiveContext。

val sc: SparkContext // 已经存在的SparkContext
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

// 引入这个Context,然后就会给所有的sql语句进行隐式转换
import hiveContext._

hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// 使用HiveQL查询
hql("FROM src SELECT key, value").collect().foreach(println)


这个功能看起来还挺像样,前面两个看起来就像渣一样,没劲儿,不知道为什么不自带那些依赖,还要我们再编译一下,但是我下的那个版本运行的时候提示我已经编译包括了hive的。尼玛,真恶心。
 

Spark Streaming自定义Receivers

cenyuhai 发表了文章 0 个评论 1686 次浏览 2015-09-11 15:01 来自相关话题

自定义一个Receiver class SocketTextStreamReceiver(host: String, port: Int( extends NetworkReceiver[String] ...查看全部
自定义一个Receiver
 class SocketTextStreamReceiver(host: String, port: Int(
extends NetworkReceiver[String]
{
protected lazy val blocksGenerator: BlockGenerator =
new BlockGenerator(StorageLevel.MEMORY_ONLY_SER_2)

protected def onStart() = {
blocksGenerator.start()
val socket = new Socket(host, port)
val dataInputStream = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"))
var data: String = dataInputStream.readLine()
while (data != null) {
blocksGenerator += data
data = dataInputStream.readLine()
}
}

protected def onStop() {
blocksGenerator.stop()
}
}

An Actor as Receiver
 class SocketTextStreamReceiver (host:String,
port:Int,
bytesToString: ByteString => String) extends Actor with Receiver {

override def preStart = IOManager(context.system).connect(host, port)

def receive = {
case IO.Read(socket, bytes) => pushBlock(bytesToString(bytes))
}

}

A Sample Spark Application
  val ssc = new StreamingContext(master, "WordCountCustomStreamSource",
Seconds(batchDuration))
//使用自定义的receiver
val lines = ssc.networkStream[String](new SocketTextStreamReceiver(
"localhost", 8445))

//或者使用这个自定义的actor Receiver
val lines2 = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
"localhost",8445, z => z.utf8String)),"SocketReceiver") */
[code]    val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
[/code]

提交成功之后,启动Netcat测试一下
$ nc -l localhost 8445 hello world hello hello

下面是合并多个输入流的方法:
  val lines = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
"localhost",8445, z => z.utf8String)),"SocketReceiver")

// Another socket stream receiver
val lines2 = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
"localhost",8446, z => z.utf8String)),"SocketReceiver")

val union = lines.union(lines2)


 
 

Spark Streaming编程指南

cenyuhai 发表了文章 0 个评论 1848 次浏览 2015-09-11 14:49 来自相关话题

Overview Spark Streaming属于Spark的核心api,它支持高吞吐量、支持容错的实时流数据处理。 它可以接受来自Kafka, Flume, Twitter, ZeroMQ和TCP Socket的数据源,使用简单的a ...查看全部
Overview
Spark Streaming属于Spark的核心api,它支持高吞吐量、支持容错的实时流数据处理。
它可以接受来自Kafka, Flume, Twitter, ZeroMQ和TCP Socket的数据源,使用简单的api函数比如
map
,
reduce
,
join
,
window等操作,还可以直接使用内置的机器学习算法、图算法包来处理数据。


 
它的工作流程像下面的图所示一样,接受到实时数据后,给数据分批次,然后传给Spark Engine处理最后生成该批次的结果。

它支持的数据流叫Dstream,直接支持Kafka、Flume的数据源。Dstream是一种连续的RDDs,下面是一个例子帮助大家理解Dstream。A Quick Example
 
// 创建StreamingContext,1秒一个批次
val ssc = new StreamingContext(sparkConf, Seconds(1));

// 获得一个DStream负责连接 监听端口:地址
val lines = ssc.socketTextStream(serverIP, serverPort);

// 对每一行数据执行Split操作
val words = lines.flatMap(_.split(" "));
// 统计word的数量
val pairs = words.map(word => (word, 1));
val wordCounts = pairs.reduceByKey(_ + _);

// 输出结果
wordCounts.print();

ssc.start(); // 开始
ssc.awaitTermination(); // 计算完毕退出


具体的代码可以访问这个页面:
https://github.com/apache/incubator-spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
如果已经装好Spark的朋友,我们可以通过下面的例子试试。
首先,启动Netcat,这个工具在Unix-like的系统都存在,是个简易的数据服务器。
使用下面这句命令来启动Netcat:
$ nc -lk 9999

接着启动example
$ ./bin/run-example org.apache.spark.streaming.examples.NetworkWordCount local[2] localhost 9999

在Netcat这端输入hello world,看Spark这边的
# TERMINAL 1:
# Running Netcat

$ nc -lk 9999

hello world

...
# TERMINAL 2: RUNNING NetworkWordCount or JavaNetworkWordCount

$ ./bin/run-example org.apache.spark.streaming.examples.NetworkWordCount local[2] localhost 9999
...
-------------------------------------------
Time: 1357008430000 ms
-------------------------------------------
(hello,1)
(world,1)
...


 Basics
下面这块是如何编写代码的啦,哇咔咔!
首先我们要在SBT或者Maven工程添加以下信息:
groupId = org.apache.spark
artifactId = spark-streaming_2.10
version = 0.9.0-incubating

//需要使用一下数据源的,还要添加相应的依赖
Source Artifact
Kafka spark-streaming-kafka_2.10
Flume spark-streaming-flume_2.10
Twitter spark-streaming-twitter_2.10
ZeroMQ spark-streaming-zeromq_2.10
MQTT spark-streaming-mqtt_2.10


 
接着就是实例化
new StreamingContext(master, appName, batchDuration, [sparkHome], [jars])

这是之前的例子对DStream的操作。
Input Sources
除了sockets之外,我们还可以这样创建Dstream
streamingContext.fileStream(dataDirectory)

 
这里有3个要点:
(1)dataDirectory下的文件格式都是一样
(2)在这个目录下创建文件都是通过移动或者重命名的方式创建的
(3)一旦文件进去之后就不能再改变
假设我们要创建一个Kafka的Dstream。
import org.apache.spark.streaming.kafka._
KafkaUtils.createStream(streamingContext, kafkaParams, ...)

 
如果我们需要自定义流的receiver,可以查看https://spark.incubator.apache.org/docs/latest/streaming-custom-receivers.htmlOperations
对于Dstream,我们可以进行两种操作,transformationsoutput Transformations
Transformation                          Meaning
map(func) 对每一个元素执行func方法
flatMap(func) 类似map函数,但是可以map到0+个输出
filter(func) 过滤
repartition(numPartitions) 增加分区,提高并行度
union(otherStream) 合并两个流
count()          统计元素的个数
reduce(func) 对RDDs里面的元素进行聚合操作,2个输入参数,1个输出参数
countByValue() 针对类型统计,当一个Dstream的元素的类型是K的时候,调用它会返回一个新的Dstream,包含键值对,Long是每个K出现的频率。
reduceByKey(func, [numTasks]) 对于一个(K, V)类型的Dstream,为每个key,执行func函数,默认是local是2个线程,cluster是8个线程,也可以指定numTasks
join(otherStream, [numTasks]) 把(K, V)和(K, W)的Dstream连接成一个(K, (V, W))的新Dstream
cogroup(otherStream, [numTasks]) 把(K, V)和(K, W)的Dstream连接成一个(K, Seq[V], Seq[W])的新Dstream
transform(func) 转换操作,把原来的RDD通过func转换成一个新的RDD
updateStateByKey(func) 针对key使用func来更新状态和值,可以将state该为任何值

UpdateStateByKey Operation
使用这个操作,我们是希望保存它状态的信息,然后持续的更新它,使用它有两个步骤:
(1)定义状态,这个状态可以是任意的数据类型
(2)定义状态更新函数,从前一个状态更改新的状态
下面展示一个例子:
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
val newCount = ... // add the new values with the previous running count to get the new count
Some(newCount)
}

 
它可以用在包含(word, 1) 的Dstream当中,比如前面展示的example
val runningCounts = pairs.updateStateByKey[Int](updateFunction _)

 
它会针对里面的每个word调用一下更新函数,newValues是最新的值,runningCount是之前的值。Transform Operation
和transformWith一样,可以对一个Dstream进行RDD->RDD操作,比如我们要对Dstream流里的RDD和另外一个数据集进行join操作,但是Dstream的API没有直接暴露出来,我们就可以使用
transform方法来进行这个操作,下面是例子:

val spamInfoRDD = sparkContext.hadoopFile(...) // RDD containing spam information

val cleanedDStream = inputDStream.transform(rdd => {
rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
...
})

 
另外,我们也可以在里面使用机器学习算法和图算法。Window Operations

先举个例子吧,比如前面的word count的例子,我们想要每隔10秒计算一下最近30秒的单词总数。
我们可以使用以下语句:
// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))

 
这里面提到了windows的两个参数:
(1)window length:window的长度是30秒,最近30秒的数据
(2)slice interval:计算的时间间隔
通过这个例子,我们大概能够窗口的意思了,定期计算滑动的数据。
下面是window的一些操作函数,还是有点儿理解不了window的概念,Meaning就不翻译了,直接删掉
Transformation                                                                              Meaning
window(windowLength, slideInterval)
countByWindow(windowLength, slideInterval)
reduceByWindow(func, windowLength, slideInterval)
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])
countByValueAndWindow(windowLength, slideInterval, [numTasks])


 Output Operations
Output Operation                                      Meaning
print() 打印到控制台
foreachRDD(func) 对Dstream里面的每个RDD执行func,保存到外部系统
saveAsObjectFiles(prefix, [suffix]) 保存流的内容为SequenceFile, 文件名 : "prefix-TIME_IN_MS[.suffix]".
saveAsTextFiles(prefix, [suffix]) 保存流的内容为文本文件, 文件名 : "prefix-TIME_IN_MS[.suffix]".
saveAsHadoopFiles(prefix, [suffix]) 保存流的内容为hadoop文件, 文件名 : "prefix-TIME_IN_MS[.suffix]".

 Persistence
Dstream中的RDD也可以调用persist()方法保存在内存当中,但是基于window和state的操作,
reduceByWindow,
reduceByKeyAndWindow,
updateStateByKey它们就是隐式的保存了,系统已经帮它自动保存了。
从网络接收的数据(such as, Kafka, Flume, sockets, etc.),默认是保存在两个节点来实现容错性,以序列化的方式保存在内存当中。RDD Checkpointing
状态的操作是基于多个批次的数据的。它包括基于window的操作和updateStateByKey。因为状态的操作要依赖于上一个批次的数据,所以它要根据时间,不断累积元数据。为了清空数据,它支持周期性的检查点,通过把中间结果保存到hdfs上。因为检查操作会导致保存到hdfs上的开销,所以设置这个时间间隔,要很慎重。对于小批次的数据,比如一秒的,检查操作会大大降低吞吐量。但是检查的间隔太长,会导致任务变大。通常来说,5-10秒的检查间隔时间是比较合适的。
ssc.checkpoint(hdfsPath)  //设置检查点的保存位置
dstream.checkpoint(checkpointInterval) //设置检查点间隔

 
对于必须设置检查点的Dstream,比如通过
updateStateByKey
reduceByKeyAndWindow创建的Dstream,默认设置是至少10秒。
Performance Tuning
对于调优,可以从两个方面考虑:
(1)利用集群资源,减少处理每个批次的数据的时间
(2)给每个批次的数据量的设定一个合适的大小Level of Parallelism
像一些分布式的操作,比如
reduceByKey和
reduceByKeyAndWindow,默认的8个并发线程,可以通过对应的函数提高它的值,或者通过修改参数spark.default.parallelism来提高这个默认值。
Task Launching Overheads
通过进行的任务太多也不好,比如每秒50个,发送任务的负载就会变得很重要,很难实现压秒级的时延了,当然可以通过压缩来降低批次的大小。Setting the Right Batch Size
要使流程序能在集群上稳定的运行,要使处理数据的速度跟上数据流入的速度。最好的方式计算这个批量的大小,我们首先设置batch size为5-10秒和一个很低的数据输入速度。确实系统能跟上数据的速度的时候,我们可以根据经验设置它的大小,通过查看日志看看Total delay的多长时间。如果delay的小于batch的,那么系统可以稳定,如果delay一直增加,说明系统的处理速度跟不上数据的输入速度。24/7 Operation
Spark默认不会忘记元数据,比如生成的RDD,处理的stages,但是Spark Streaming是一个24/7的程序,它需要周期性的清理元数据,通过spark.cleaner.ttl来设置。比如我设置它为600,当超过10分钟的时候,Spark就会清楚所有元数据,然后持久化RDDs。但是这个属性要在SparkContext 创建之前设置。
但是这个值是和任何的window操作绑定。Spark会要求输入数据在过期之后必须持久化到内存当中,所以必须设置delay的值至少和最大的window操作一致,如果设置小了,就会报错。Monitoring
除了Spark内置的监控能力,还可以StreamingListener这个接口来获取批处理的时间, 查询时延, 全部的端到端的试验。Memory Tuning
Spark Stream默认的序列化方式是StorageLevel.MEMORY_ONLY_SER,而不是RDD的StorageLevel.MEMORY_ONLY
默认的,所有持久化的RDD都会通过被Spark的LRU算法剔除出内存,如果设置了spark.cleaner.ttl,就会周期性的清理,但是这个参数设置要很谨慎。一个更好的方法是设置spark.streaming.unpersist为true,这就让Spark来计算哪些RDD需要持久化,这样有利于提高GC的表现。
推荐使用concurrent mark-and-sweep GC,虽然这样会降低系统的吞吐量,但是这样有助于更稳定的进行批处理。Fault-tolerance PropertiesFailure of a Worker Node
下面有两种失效的方式:
1.使用hdfs上的文件,因为hdfs是可靠的文件系统,所以不会有任何的数据失效。
2.如果数据来源是网络,比如Kafka和Flume,为了防止失效,默认是数据会保存到2个节点上,但是有一种可能性是接受数据的节点挂了,那么数据可能会丢失,因为它还没来得及把数据复制到另外一个节点。Failure of the Driver Node
为了支持24/7不间断的处理,Spark支持驱动节点失效后,重新恢复计算。Spark Streaming会周期性的写数据到hdfs系统,就是前面的检查点的那个目录。驱动节点失效之后,
StreamingContext可以被恢复的。

为了让一个Spark Streaming程序能够被回复,它需要做以下操作:
(1)第一次启动的时候,创建 StreamingContext,创建所有的streams,然后调用start()方法。
(2)恢复后重启的,必须通过检查点的数据重新创建StreamingContext。
下面是一个实际的例子:
通过StreamingContext.getOrCreate来构造StreamingContext,可以实现上面所说的。
// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
val ssc = new StreamingContext(...) // new context
val lines = ssc.socketTextStream(...) // create DStreams
...
ssc.checkpoint(checkpointDirectory) // set checkpoint directory
ssc
}

// Get StreaminContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start()
context.awaitTermination()


 
在stand-alone的部署模式下面,驱动节点失效了,也可以自动恢复,让别的驱动节点替代它。这个可以在本地进行测试,在提交的时候采用supervise模式,当提交了程序之后,使用jps查看进程,看到类似DriverWrapper就杀死它,如果是使用YARN模式的话就得使用其它方式来重新启动了。
这里顺便提一下向客户端提交程序吧,之前总结的时候把这块给落下了。
./bin/spark-class org.apache.spark.deploy.Client launch
[client-options] \
\
[application-options]

cluster-url: master的地址.
application-jar-url: jar包的地址,最好是hdfs上的,带上hdfs://...否则要所有的节点的目录下都有这个jar的
main-class: 要发布的程序的main函数所在类.
Client Options:
--memory (驱动程序的内存,单位是MB)
--cores (为你的驱动程序分配多少个核心)
--supervise (节点失效的时候,是否重新启动应用)
--verbose (打印增量的日志输出)


 
在未来的版本,会支持所有的数据源的可恢复性。
为了更好的理解基于HDFS的驱动节点失效恢复,下面用一个简单的例子来说明:
Time     Number of lines in input file     Output without driver failure     Output with driver failure
1   10                  10                 10
2   20                  20                 20
3   30                  30                 30
4   40                  40                 [DRIVER FAILS] no output
5   50                  50                 no output
6   60                  60                 no output
7   70                  70                 [DRIVER RECOVERS] 40, 50, 60, 70
8   80                  80                 80
9   90                  90                 90
10  100                  100                100


 
 
在4的时候出现了错误,40,50,60都没有输出,到70的时候恢复了,恢复之后把之前没输出的一下子全部输出。
 
 
 
 

Spark的机器学习算法mlib的例子运行

cenyuhai 发表了文章 0 个评论 1750 次浏览 2015-09-11 14:49 来自相关话题

 Spark自带了机器学习的算法mlib,页面网址 http://spark.incubator.apache.org/docs/latest/mllib-guide.html   但是运行的时候,遇到了很多问题,着实让我头疼了很久,不过最后还是解 ...查看全部
 Spark自带了机器学习的算法mlib,页面网址 http://spark.incubator.apache.org/docs/latest/mllib-guide.html
  但是运行的时候,遇到了很多问题,着实让我头疼了很久,不过最后还是解决了,下面说一下这两个问题吧。
  第一个demo运行到val model = SVMWithSGD.train(parsedData, numIterations)这一句的时候遇到了lzo的jar包。
  我是这么解决的,方法不是很好,我修改了spark-env.sh这个文件,添加了SPARK_CLASSPATH=/usr/lib/spark/ext/hadoop-lzo-0.5.0.jar这句话就ok了
  这种方式不是很好,比如我指定某个目录的话,它是不认的,只能一个jar包一个jar包的指定,也可以学习下面的方法。
  这次是遇到了jar包的问题,Spark搭配的是hadoop1.0.4,搭配hadoop2.2.0的时候就可能会出现这个问题,先放一下错误信息,方便大家搜索。
java.lang.NoSuchMethodError: org.apache.commons.io.IOUtils.closeQuietly

  最后被我查出来是commons.io这个jar包引起的,但是Spark自己下的话,它也下了commons-io-2.1.jar这个jar包,但是它并没有使用这个jar包,编译过的Spark会把所需的jar包全部合并到一起,最后打成一个类似spark-assembly_2.9.3-0.8.1-incubating-hadoop2.2.0.jar的jar包。
  这里面就涉及到怎么合并两个jar包的问题了,我是这么处理的,分别解压两个jar包,用commons-io-2.1.jar的解压出来的目录覆盖spark-assembly_2.9.3-0.8.1-incubating-hadoop2.2.0.jar解压出来的相应的目录,然后在加压出来的根目录下使用下面的命令,重新打包。
jar -cvf spark-assembly_2.9.3-0.8.1-incubating-hadoop2.2.0.jar *;

  替换掉原来的jar包就运行就正常了。

Spark部署

cenyuhai 发表了文章 0 个评论 2524 次浏览 2015-09-11 14:47 来自相关话题

Spark的部署让人有点儿困惑,有些需要注意的事项,本来我已经装成功了YARN模式的,但是发现了一些问题,出现错误看日志信息,完全看不懂那个错误信息,所以才打算翻译Standalone的部署的文章。第一部分,我先说一下YARN模式的部署方法。第二部分才是Sta ...查看全部
Spark的部署让人有点儿困惑,有些需要注意的事项,本来我已经装成功了YARN模式的,但是发现了一些问题,出现错误看日志信息,完全看不懂那个错误信息,所以才打算翻译Standalone的部署的文章。第一部分,我先说一下YARN模式的部署方法。第二部分才是Standalone的方式。
  我们首先看一下Spark的结构图,和hadoop的差不多。
  
 1、YARN模式
  采用yarn模式的话,其实就是把spark作为一个客户端提交作业给YARN,实际运行程序的是YARN,就不需要部署多个节点,部署一个节点就可以了。
  把从官网下载的压缩包在linux下解压之后,进入它的根目录,没有安装git的,先执行yum install git安装git
  1)运行这个命令:
SPARK_HADOOP_VERSION=2.2.0 SPARK_YARN=true ./sbt/sbt assembly

  就等着吧,它会下载很多jar包啥的,这个过程可能会卡死,卡死的就退出之后,重新执行上面的命令。
  2)编辑conf目录下的spark-env.sh(原来的是.template结尾的,删掉.template),添加上HADOOP_CONF_DIR参数
   HADOOP_CONF_DIR=/etc/hadoop/conf

  3)运行一下demo看看,能出结果
Pi is roughly 3.13794

   SPARK_JAR=./assembly/target/scala-2.9.3/spark-assembly_2.9.3-0.8.1-incubating-hadoop2.2.0.jar \
./spark-class org.apache.spark.deploy.yarn.Client \
--jar examples/target/scala-2.9.3/spark-examples-assembly-0.8.1-incubating.jar \
--class org.apache.spark.examples.SparkPi \
--args yarn-standalone \
--num-workers 3 \
--master-memory 1g \
--worker-memory 1g \
--worker-cores 1

2、Standalone模式
  下面我们就讲一下怎么部署Standalone,参考页面是http://spark.incubator.apache.org/docs/latest/spark-standalone.html。
  这里我们要一个干净的环境,刚解压出来的,运行之前的命令的时候不能再用了,会报错的。
  1)打开make-distribution.sh,修改SPARK_HADOOP_VERSION=2.2.0,然后执行./make-distribution.sh, 然后会生成一个dist目录,这个目录就是我们要部署的内容。官方推荐是先把master跑起来,再部署别的节点,大家看看bin目录下面的脚本,和hadoop的差不多的,按照官方文档的推荐的安装方式有点儿麻烦。下面我们先说简单的方法,再说官方的方式。
  我们打开dist目录下conf目录的,如果没有slaves文件,添加一个,按照hadoop的那种配置方式,把slave的主机名写进去,然后把dist目录部署到各台机器上,回到master上面,进入第三题、目录的sbin目录下,有个start-all.sh,执行它就可以了。
  下面是官方文档推荐的方式,先启动master,执行。
./bin/start-master.sh

  2)部署dist的目录到各个节点,然后通过这个命令来连接master节点
./spark-class org.apache.spark.deploy.worker.Worker spark://IP:PORT

  3)然后在主节点查看一下http://localhost:8080 ,查看一下子节点是否在这里,如果在,就说明连接成功了。
  4) 部署成功之后,想要在上面部署程序的话,在执行./spark-shell的时候,要加上MASTER这个参数。
MASTER=spark://IP:PORT ./spark-shell
3、High Availability
  Spark采用Standalone模式的话,Spark本身是一个master/slaves的模式,这样就会存在单点问题,Spark采用的是zookeeper作为它的active-standby切换的工具,设置也很简单。一个完整的切换需要1-2分钟的时间,这个时候新提交的作业会受到影响,之前提交到作业不会受到影响。
  在spark-env.sh添加以下设置:
//设置下面三项JVM参数,具体的设置方式在下面
//spark.deploy.recoveryMode=ZOOKEEPER
//spark.deploy.zookeeper.url=192.168.1.100:2181,192.168.1.101:2181
// /spark是默认的,可以不写
//spark.deploy.zookeeper.dir=/spark

export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=hadoop.Master:2181,hadoop.SlaveT1:2181,hadoop.SlaveT2:2181"


  这里就有一个问题了,集群里面有多个master,我们连接的时候,连接谁?用过hbase的都知道是先连接的zookeeper,但是Spark采用的是另外的一种方式,如果我们有多个master的话,实例化SparkContext的话,使用spark://host1:port1,host2:port2这样的地址,这样它会同时注册两个,一个失效了,还有另外一个。
  如果不愿意配置高可用的话,只是想失败的时候,再恢复一下,重新启动的话,那就使用FILESYSTEM的使用,指定一个目录,把当前的各个节点的状态写入到文件系统。
spark.deploy.recoveryMode=FILESYSTEM
spark.deploy.recoveryDirectory=/usr/lib/spark/dataDir

  当 stop-master.sh来杀掉master之后,状态没有及时更新,再次启动的时候,会增加一分钟的启动时间来等待原来的连接超时。
  recoveryDirectory最好是能够使用一个nfs,这样一个master失败之后,就可以启动另外一个master了。