第二课Spark程序设计与实战

第二课Spark程序设计与实战的相关问题都在下面进行提问回帖

1、大家在这个帖子上回复自己想要提的问题。(相同的问题,请点赞表示自己关注这个问题,不用重复提问)

2、提出的问题,老师在直播课的最后30分钟统一回答。

3、课后会整理出参考答案,给每个问题回帖。

第二节课spark程序设计和实战 满意度调查问卷,
http://wj.qq.com/s/834859/2cab 或扫描二维码

spark调查问卷2.jpg

 

@CrazyChao - 人生不止眼前的苟且,还有诗和远方的田野!^.^

赞同来自: lcg_1023 alexanda2000 wwz573398723 kgars DreamゞRick

之前董老师讲课的时候讲到过中搜的日志统计与分析系统,它的可视化是怎么实现的呢? 现在做可视化用什么工具比较好?

对方正在输入中 - 80后IT男、运维

赞同来自: tabtou fanniao 先低头后抬头

spark-shell在namenode的stand-by节点执行会报sparkContext出错,然后到active的namenode的机器起spark-shell就可以了。为什么这样

Oner_wv

赞同来自: yesuio

Spark 从HDFS中创建RDD时,并行度(Partitions)是如何定的?

Oner_wv

赞同来自: yesuio

分布式估算Pi时,val x = random * 2 - 1,random具体指的是什么?

Oner_wv

赞同来自: yjyzsl

在老师的PopularMovieAnalyzer.scala代码中,在对userlist进行了广播 //broadcast val userSet = HashSet() ++ userlist val broadcastUserSet = sc.broadcast(userSet)   之后在讲movie_id转为movie_title时,没有对movieID2Name进行广播 val movieID2Name = moviesRdd.map(_.split("::")).map { x =>       (x(0), x(1))     }.collect().toMap topKmovies.map(x => (movieID2Name.getOrElse(x._1, null), x._2)).foreach(println)   这是因为movieID2Name数据比较小的原因么?如果真的是因为数据量比较小的原因,那一般多大的数据量可以考虑进行广播呢?

Benjamin

赞同来自:

hadoop集群中有5个节点,其中1个主节点,1个备用节点,3个数据节点(数据节点上有日志节点),分配内存的时候,主备节点各分配了64G,数据节点各分配了12G,请问董老师,这样分配合理么?是不是随着数据的不断增多,主备节点的内存消耗会成线性增长? ps:平时datanode节点内存消耗平均保持在8G左右,主节点目前已经占用了8G多。

LinJK

赞同来自:

环境是hdp2.4.2.0-258,spark1.6.1和hadoop2.7.1的结合版本。   我在本地IDEA上提交spark作业,模式是yarn-client。部分log: 16/10/13 18:21:36 WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because UNIX Domain sockets are not available on Windows. 16/10/13 18:21:36 INFO Client: Uploading resource file:/C:/Users/Administrator/Desktop/spark-assembly-1.6.1.2.4.2.0-258-hadoop2.7.1.2.4.2.0-258.jar -> hdfs://mycluster/user/Áֻý/.sparkStaging/application_1475981805871_0020/spark-assembly-1.6.1.2.4.2.0-258-hadoop2.7.1.2.4.2.0-258.jar 16/10/13 18:22:09 INFO Client: Uploading resource file:/C:/Users/Administrator/AppData/Local/Temp/spark-763eced0-1f04-40ed-b458-43467711c481/__spark_conf__2250551793309207354.zip -> hdfs://mycluster/user/Áֻý/.sparkStaging/application_1475981805871_0020/__spark_conf__2250551793309207354.zip 16/10/13 18:22:10 WARN Client:  hdp.version is not found, Please set HDP_VERSION=xxx in spark-env.sh, or set -Dhdp.version=xxx in spark.{driver|yarn.am}.extraJavaOptions or set SPARK_JAVA_OPTS="-Dhdp.verion=xxx" in spark-env.sh If you're running Spark under HDP. ... 16/10/13 18:22:10 INFO SchedulerExtensionServices: Starting Yarn extension services with app application_1475981805871_0020 and attemptId None 16/10/13 18:22:11 INFO Client: Application report for application_1475981805871_0020 (state: ACCEPTED) 16/10/13 18:22:11 INFO Client:       client token: N/A      diagnostics: N/A      ApplicationMaster host: N/A      ApplicationMaster RPC port: -1      queue: default      start time: 1476354134999      final status: UNDEFINED      tracking URL: http://rm.hadoop:8088/proxy/ap ... 0020/      user: Áֻý 16/10/13 18:22:12 INFO Client: Application report for application_1475981805871_0020 (state: ACCEPTED) 16/10/13 18:22:13 INFO Client: Application report for application_1475981805871_0020 (state: ACCEPTED) ... 16/10/13 18:46:43 INFO Client:       client token: N/A      diagnostics: Application application_1475981805871_0020 failed 2 times due to ApplicationMaster for attempt appattempt_1475981805871_0020_000002 timed out. Failing the application.      ApplicationMaster host: N/A      ApplicationMaster RPC port: -1      queue: default      start time: 1476354134999      final status: FAILED      tracking URL: http://rm.hadoop:8088/cluster/ ... _0020      user: Áֻý 16/10/13 18:46:43 INFO Client: Deleting staging directory .sparkStaging/application_1475981805871_0020 16/10/13 18:46:43 ERROR SparkContext: Error initializing SparkContext. org.apache.spark.SparkException: Yarn application has already ended! It might have been killed or unable to launch application master. 是因为什么问题导致的?local是可以的。standalone模式只要指定自己程序打包好的jar包,也可以,就是这个模式不行 贴上测试代码:
object SparkTest {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("spark wordcount").setMaster("yarn-client")
//val conf = new SparkConf().setAppName("spark wordcount").setMaster("spark://dn1:7077")
    val sc = new SparkContext(conf)
//    sc.addJar("hdfs://mycluster/user/wsh/spark-1.0.0.0.jar")
    val textFile = sc.textFile("hdfs://mycluster/user/wsh/input/ljktest.txt")
    val counts = textFile.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_ + _)
    counts.saveAsTextFile("hdfs://mycluster/user/wsh/sparkOut/" + new Date().getTime)
  }
}

Oner_wv

赞同来自:

Scala中方法调用时,什么时候可以省略掉“.”?

Oner_wv

赞同来自:

key/value RDD有一个参数可以设置reduce任务的并行度,spark.default.parallelism也可以设置并行度,那么这两种设置方法的区别在哪呢?或者说是影响范围一样么?   此外,key/value RDD设置了reduce任务并行度之后,是不是最后还需要有一个reduce来进行最终结果的汇总呢? 如果没有的话,感觉计算结果会不正确啊,

Oner_wv

赞同来自:

广播变量示例中,parallelize参数中数据集数量和并行度都不一样 val rdd = sc.parallelize(1to 6, 2) val rdd = sc.parallelize(1to =14pt1=14pt000000, =14pt1=14pt00) 怎么比较效率呢?

shining0123

赞同来自:

spark是如何实现数据共享的,比如RDD1读取输入文件后,经过两个转换操作生成RDD2和RDD3,RDD2和RDD3分别由不同的操作,最终经过action操作提交作业。spark内部是如何划分stage呢,从RDD1分别到RDD2和RDD3是如何实现数据共享的?  请教董老师。

阳光少年安度因

赞同来自:

我在整合hive和spark之后,使用yarn-client启动spark-sql出现了这个问题: 16/10/14 17:11:49 ERROR Datastore: An exception was thrown while adding/validating class(es) : Specified key was too long; max key length is 767 bytes com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Specified key was too long; max key length is 767 bytes 这是我的数据库的字符集设置:  show variables like '%character%'; +--------------------------+----------------------------+ | Variable_name            | Value                      | +--------------------------+----------------------------+ | character_set_client     | utf8                       | | character_set_connection | utf8                       | | character_set_database   | utf8                       | | character_set_filesystem | binary                     | | character_set_results    | utf8                       | | character_set_server     | utf8                       | | character_set_system     | utf8                       | | character_sets_dir       | /usr/share/mysql/charsets/ | +--------------------------+----------------------------+ 请问是什么原因导致的呢?ps:虽然报ERROR但是spark-sql依然可以使用。    

Benjamin

赞同来自:

@Dong:董老师,我接着问您一个问题,我的hadoop进群中HADOOP_HEAPSIZE需要做调整么?每个节点的jvm内存是如何分配的?put的并发和持续性还是比较频繁的,因为hdfs当做了文件服务器,put完后又要read出来,最近我的datanode节点会经常死掉,datanode线程还在,但是数据节点状态就是dead,分析了下日志,大概报下面几个错误: java.io.IOException: Connection reset by peer         at sun.nio.ch.FileDispatcher.write0(Native Method)         at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:29)         at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:69)         at sun.nio.ch.IOUtil.write(IOUtil.java:40)         at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:336)         at org.apache.hadoop.net.SocketOutputStream$Writer.performIO(SocketOutputStream.java:63)         at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:142)         at org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:159)         at org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:117)         at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65)         at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:123)         at java.io.DataOutputStream.flush(DataOutputStream.java:106)         at org.apache.hadoop.hdfs.server.datanode.BlockReceiver$PacketResponder.run(BlockReceiver.java:1079)         at java.lang.Thread.run(Thread.java:662) 2016-10-12 12:06:50,189 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: Not checking disk as checkDiskError was called on a network related exception java.io.IOException: Premature EOF from inputStream         at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:194)         at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:213)         at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:134)         at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:109)         at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receivePacket(BlockReceiver.java:435)         at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receiveBlock(BlockReceiver.java:693)         at org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:569)         at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:115)         at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:68)         at org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:221)         at java.lang.Thread.run(Thread.java:662) java.io.EOFException: Premature EOF: no length prefix available         at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:1492)         at org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:116)         at org.apache.hadoop.hdfs.server.datanode.BlockReceiver$PacketResponder.run(BlockReceiver.java:950)         at java.lang.Thread.run(Thread.java:662) java.net.SocketTimeoutException: 70000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/10.64.39.119:54404 remote=/10.64.39.118:50010]         at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)         at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)         at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)         at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)         at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:118)         at java.io.FilterInputStream.read(FilterInputStream.java:66)         at java.io.FilterInputStream.read(FilterInputStream.java:66)         at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:1490)         at org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:511)         at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:115)         at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:68)         at org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:221)         at java.lang.Thread.run(Thread.java:662) java.nio.channels.ClosedByInterruptException         at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)         at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:343)         at org.apache.hadoop.net.SocketOutputStream$Writer.performIO(SocketOutputStream.java:63)         at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:142)         at org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:159)         at org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:117)         at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65)         at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:123)         at java.io.DataOutputStream.flush(DataOutputStream.java:106)         at org.apache.hadoop.hdfs.server.datanode.BlockReceiver$PacketResponder.run(BlockReceiver.java:1079)         at java.lang.Thread.run(Thread.java:662) 2016-10-12 12:07:11,125 WARN org.apache.hadoop.hdfs.server.datanode.DataNode: checkDiskError: exception:

ShadowFiend

赞同来自:

kafka连sparkstreaming 直连方式不设置offset 和另一种连接方式有没有差别,有2个topic要消费,写在一起吗

xiao5

赞同来自:

请教spark的优化流程,或者从哪下手或者参数?

dark

赞同来自:

董老师,你好:    请问,数据库连接对象、es的连接对象能否放入到广播中吗?我现在是把连接字符串放入到广播中,然后在rdd中创建数据库连接对象。对象使用的是单例。您是否有什么好的方式创建这些对象呢?

镜中夜 - just do it

赞同来自:

spark支持所有Scala的集合操作?

镜中夜 - just do it

赞同来自:

alluxio的一个作用就是spark的cache操作么?

yslbg2008

赞同来自:

var data=sc.textFile("/.....")data.cache() data.filter(......) 如果文本数据太大,加载到内存会不会爆呢??      

kaiball9999 - Focus on bigdata

赞同来自:

driver 不需要计算,要的内存是用来干嘛的?一般是不是需要比较小的内存就ok?

wwmjeff

赞同来自:

1.算pi值的那个例子中,有slices,这个跟配置文件中的线程数有关系吗? 2.pom.xml中写的是参数配置吗?如果写在pom里了,在执行jar包的时候还用加上参数吗 谢谢

xuyifei

赞同来自:

请问董老师,SparkContext里的Logging  Trait 里 调用了这个trait的一个变量的时候,object里的语句都会执行的,这是为什么

shareshow

赞同来自:

如何把单机程序转为并行程序呢?

wwmjeff

赞同来自:

如果不用maven,直接在程序中设置参数以及打包可以运行吗?  

ioridong

赞同来自:

调试spark程序时候,在win上idea里写scala代码,hadoop及spark安装在虚拟机里,那么在win宿主机上怎么调试?

xiaohe001

赞同来自:

老师不能不录制个IDEA搭建的视频,包括Maven中每个配置的含义,对于我们这些非Java程序员很有必要啊

ShadowFiend

赞同来自:

一张表,如果其中有些数据会删除或者增加变动,也可以用cache吗,做实时计算的时候

Dong - Hulu

赞同来自:

时间可能比较赶,我比较想看到 老师在IDEA上直接操作 多种模式,因为本人在IDEA提交spark程序的时候,yarn-client模式会报错,其他模式没有问题!

andyzhang

赞同来自:

可以单独设置RDD中每个partition的缓存策略吗?

andyzhang

赞同来自:

我看官网还有一个deploy-model参数,这个参数与master参数之间是什么关系呢?

xbyang18

赞同来自:

老师PPT的在哪里下载了?

yjyzsl

赞同来自:

在简易电影受众系统中得分最高的10部电影的例子,在TopKMovieAnalyzer.scala中 val topKScoreMostMovie = ratings.map{x =>   (x._2, (x._3.toInt, 1)) }.reduceByKey { (v1, v2) =>   (v1._1 + v2._1, v1._2 + v2._2) }.map { x =>   (x._2._1.toFloat / x._2._2.toFloat, x._1) }.sortByKey(false).     take(10).     foreach(println) reduceByKey { (v1, v2) => (v1._1 + v2._1, v1._2 + v2._2)}这个是不是对电影评分和评分人数量进行一个求和的统计,但是不明白为什么是这么写的 v1,v2是指什么,v1._1 + v2._1是不是表示是电影评分的叠加,v1._2 + v2._2是不是表示评分人数的叠加。

命运

赞同来自:

同学们,如何提高spark读取hbase的效率?
1476677037887.png
 

luckyzkk

赞同来自:

请教老师,如果分配给driver和executor超过每台物理机的内存,会怎样?

ironicshuang

赞同来自:

老师,能否每次课把代码课上讲过的代码发给我们呢?比如第二课将spark rdd的那几个算子的代码。

要回复问题请先登录注册