Lesson 3: Spark Internals and Source Code Reading

Post your questions about Lesson 3 (Spark Internals and Source Code Reading) as replies below.
1. Reply in this thread with the questions you want to ask. (If someone has already asked your question, upvote it to show interest instead of posting it again.)
2. The instructor will answer the questions in the last 30 minutes of the live session.
3. Reference answers will be compiled after class and posted as a reply to each question.
Post-lesson survey for Lesson 3:
https://wj.qq.com/s/1271712/e86d
or scan the QR code to fill it in
[attach]5595[/attach]
 

MichaelFly

Upvoted by: zstorm baoshan liuzhixin137

Log format: log timestamp, appid, ........
Flume delivery to HDFS has some latency. How do we make sure records near the day boundary land in the correct partition?
For example: a record stamped 2017-04-12 23:59:59 with its appid may not reach HDFS until 2017-04-13 00:00:01, so data that belongs to 20170412 ends up in the 20170413 partition. How can this be solved with Flume? Thank you!
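A minimal config sketch of one common approach (assuming the event time sits at the start of each log line and Flume 1.x's regex_extractor interceptor is available): extract the log's own timestamp into the event's timestamp header, so the HDFS sink partitions by event time instead of arrival time. Agent, source, and sink names below are hypothetical.
[code=Java]# Extract "2017-04-12 23:59:59" from the head of each line into the `timestamp` header.
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_extractor
a1.sources.r1.interceptors.i1.regex = ^(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2})
a1.sources.r1.interceptors.i1.serializers = s1
a1.sources.r1.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
a1.sources.r1.interceptors.i1.serializers.s1.name = timestamp
a1.sources.r1.interceptors.i1.serializers.s1.pattern = yyyy-MM-dd HH:mm:ss

# The HDFS sink then resolves %Y%m%d from the event timestamp, not the local clock.
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://namenode/logs/%Y%m%d
a1.sinks.k1.hdfs.useLocalTimeStamp = false[/code]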

run_psw

Upvoted by: Angel儱唲 贰怪兽lyn

Teacher Dong, let me add a few more questions.
 
1. On reading source code: for a newcomer who has just started with a new framework such as Spark, how should one go about reading the Spark source code?
 
2. On Spark GC: GC should be most frequent while executors are running tasks. At that point, does a GC happen once after each task finishes, or how does it work? Could you use a running Spark program as a case study to walk through when and how GC happens? (See the GC-logging sketch after this list.)
 
3. Could you briefly introduce Spark's Tungsten project: what problem it was created to solve, where it stands today, and where it is heading?
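Not an answer to question 2, just a hedged sketch that may help when observing executor GC behaviour for yourself: GC logging can be switched on through the executor JVM options (the class and jar names are hypothetical), and the output appears in each executor's stdout in the Spark/YARN web UI.
[code=Java]# Print a line for every minor/full GC in every executor JVM.
spark-submit \
  --master yarn \
  --conf "spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
  --class com.example.MyApp \
  my-app.jar[/code]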

run_psw

Upvoted by: 雨果等夏天

Hello teacher! I have three questions for you:
 
1. When submitting a Spark job, how should one think about the --executor-cores, --num-executors and --executor-memory settings, especially with very large data volumes? Assume the code itself is already tuned, e.g. a simple but extremely large ETL job. (A parameter sketch follows this list.)
 
2. The number of tasks equals the partition count of which RDD in the stage? With map, every RDD in the stage should have the same partition count; what about operations like filter?
 
3. I hope you can share some experience that combines code tuning with this kind of resource allocation. I know this is the topic of the next lesson, but tuning experience probably isn't something that can be laid out completely on the spot, so I'm raising it in advance and hope you will give it some attention. Many people know the tuning techniques in principle, but applying them in practice is not that easy, especially the relationships between those parameters.
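Purely as an illustration of how the three settings from question 1 are passed to spark-submit (the numbers are placeholders rather than a recommendation, and the class and jar are hypothetical):
[code=Java]# Example shape only: 50 executors x 2 cores = at most 100 concurrently running tasks,
# each executor with an 8 GB heap.
spark-submit \
  --master yarn \
  --num-executors 50 \
  --executor-cores 2 \
  --executor-memory 8g \
  --class com.example.EtlJob \
  etl-job.jar[/code]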
 
 
 
 

呼噜噜

Upvoted by: heming621

Why do shuffle intermediate results have to be written to disk? Spark memory is split into execution memory and storage memory; doesn't execution memory hold intermediate results? And how should off-heap memory be understood? (See the off-heap config sketch at the end of this post.)
When I submit Spark on YARN, the application stays in the ACCEPTED state and, after running for a while, changes to FAILED, as shown below:

[attach]5576[/attach]
 
The submit shell script:
[attach]5577[/attach] — I don't see anything wrong with it.
 
 
How do I handle data loss with Flume's memory channel?
spark-submit currently supports three cluster managers: Mesos, YARN and Standalone. What are the pros and cons of each? Many people online recommend YARN; is that only because YARN does not require a separate Spark cluster, or because YARN currently has the best resource-scheduling performance?
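On the off-heap part of the first question above: since Spark 1.6, off-heap memory can be enabled explicitly with two settings. A minimal sketch, with an illustrative size only:
[code=Java]import org.apache.spark.SparkConf

// Tungsten then allocates this memory outside the JVM heap, so it adds no GC pressure.
val conf = new SparkConf()
  .setAppName("OffHeapDemo")                      // hypothetical app name
  .set("spark.memory.offHeap.enabled", "true")
  .set("spark.memory.offHeap.size", "2g")         // required when off-heap is enabled; size is illustrative[/code]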
 

zstorm - an IT guy born in the 1980s

Hello teacher,
1. When we process HBase data with Hive (Tez engine) the performance is good, but HBase has too many columns, which makes the processing hard to express. With a MapReduce program the logic is clear, but performance is very poor, and HBase Filters bring no improvement.
Question: how does Spark perform when processing HBase data (several TB, hundreds of columns), and does adding an HBase Filter help? (A column-pruning sketch follows question 2 below.)
2. While working through the exercises on Spark 2.1 I found the API differs from the courseware (the courseware appears to target 1.x). Could the courseware be updated? Here is the 2.x version I used:[code=Java]val sparkConf = new SparkConf().setMaster("local").setAppName("SparkPi")
val sc = new SparkContext(sparkConf)
val numSlices = 3
val n = 1000000000
// Spark 2.x named accumulators (replacing the 1.x sc.accumulator API)
val counter0 = sc.longAccumulator("counter0")
val counter1 = sc.longAccumulator("counter1")
val counter2 = sc.longAccumulator("counter2")
sc.parallelize(1 to n, numSlices)
  .map { i =>
    counter0.add(1)
    // Monte Carlo estimate of pi: count points falling inside the unit circle.
    val x = Math.random() * 2 - 1
    val y = Math.random() * 2 - 1
    val dist = x * x + y * y
    if (dist < 1) {
      counter1.add(1)
    } else {
      counter2.add(1)
    }
    if (dist < 1) 1 else 0
  }.reduce(_ + _)[/code]
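On question 1, not a benchmark claim but a hedged sketch of a common pattern: with hundreds of columns, restricting the scan to the columns actually needed in the TableInputFormat configuration (the pruning happens on the region servers) tends to matter more than any Spark-side filtering. The table and column names below are hypothetical, and sc stands for the SparkContext.
[code=Java]import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

val hbaseConf = HBaseConfiguration.create()
hbaseConf.set(TableInputFormat.INPUT_TABLE, "my_table")          // hypothetical table name
// Read only the columns the job really needs (space-separated "family:qualifier" pairs).
hbaseConf.set(TableInputFormat.SCAN_COLUMNS, "cf:col1 cf:col2")

// Each HBase region becomes one partition of the resulting RDD.
val hbaseRdd = sc.newAPIHadoopRDD(
  hbaseConf,
  classOf[TableInputFormat],
  classOf[ImmutableBytesWritable],
  classOf[Result])[/code]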
Hello teacher, a Spark development question.
We want to use Spark to compute a per-user 20-day moving average of transactions.
The RDD extracted from the data source contains fields such as user, date, and the day's transaction amount. For each user and each day we want the average over the preceding 20 days, similar to Oracle's OVER (PARTITION BY ...). Each record therefore needs that user's transaction amounts from the preceding days, and none of the transformations or actions we know of seem to fit. Could you suggest an approach? Thank you.
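A minimal sketch of one possible approach, assuming Spark SQL is available, the data has been converted to a DataFrame named tradeDF with columns user_id, trade_date and amount (all hypothetical names), and the 20-day window is taken as the current row plus the 19 preceding rows per user:
[code=Java]import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.avg

// Per-user window ordered by date: 19 preceding rows + the current row = 20 rows.
val w = Window
  .partitionBy("user_id")
  .orderBy("trade_date")
  .rowsBetween(-19, 0)

val withMa20 = tradeDF.withColumn("ma20", avg(tradeDF("amount")).over(w))[/code]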
Teacher Dong, hello! One more question:
 
Looking at shuffle macroscopically, from the data's point of view, can it be understood as the data in each partition of an RDD moving, or being transformed, from one partition into another?

fanniao - "The green hills remain; the clear waters flow on."

Two questions, please:
1. When Spark runs multiple jobs at the same time, how do we control how much memory and CPU each job gets?
2. Are resources controlled and configured the same way on YARN as in standalone mode?
Thanks.
In the Windows development environment:
[code=Java]var masterUrl1 = "local[1]"
var masterUrl2 = "spark://server01:7077"[/code]
masterUrl1 works fine when running locally. How do I use masterUrl2 to connect to the remote Spark cluster???
Under what circumstances should Kryo serialization be used in Spark?
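For reference, a sketch of turning Kryo on (typically worthwhile when large numbers of custom-class objects are shuffled or cached); the app name and record class are hypothetical:
[code=Java]import org.apache.spark.SparkConf

// Hypothetical record type, used only to illustrate class registration.
case class MyRecord(id: Long, name: String)

val conf = new SparkConf()
  .setAppName("KryoDemo")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Registering classes keeps Kryo from writing full class names into every serialized record.
  .registerKryoClasses(Array(classOf[MyRecord]))[/code]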
 
Could you tell me what causes the following exception when running Spark on YARN?
17/04/11 15:39:05 WARN scheduler.TaskSetManager: Lost task 20.3 in stage 11.0 (TID 2011, slave4): ExecutorLostFailure (executor 11 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 9.0 GB of 9 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
17/04/11 15:39:05 ERROR scheduler.TaskSetManager: Task 20 in stage 11.0 failed 4 times; aborting job
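The message itself points at the container exceeding its YARN memory limit; a common hedged first step (illustrative value, hypothetical class and jar) is to raise the per-executor memory overhead at submit time:
[code=Java]# The overhead is off-heap memory (network buffers, JVM metadata, etc.) that YARN
# accounts for on top of --executor-memory; 2048 MB here is purely illustrative.
spark-submit \
  --master yarn \
  --conf spark.yarn.executor.memoryOverhead=2048 \
  --class com.example.MyApp \
  my-app.jar[/code]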
 
 
More generally, what is a good approach to troubleshooting Spark exceptions?
 
When launching an application with spark-submit, regarding num-executors and executor-cores:
num-executors=100
executor-cores=1

num-executors=50
executor-cores=2

The total (num-executors × executor-cores) is the same in both cases; what is the difference?
During shuffle read, does the data pulled from the map side get written to disk? And when a global sort follows, does all the data have to be loaded into memory?
For different applications, how do I determine the CPU and memory consumption and the total allocation, and from that decide the number of executors and the cores/memory per executor?
Teacher, is one core in an executor one thread, and does each task need one thread to process it? Is that the mapping?
Hello teacher,
I have a List holding a list of files. To process the files in parallel I put the file-processing logic inside list.map; since it needs to read and write data it uses a SparkSession, e.g. list.map(f => function(sparkSession, f)), but using the SparkSession inside the map throws a "not serializable" error. Do you have any suggestions?
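A hedged sketch of one possible workaround (assuming the per-file work can be driven from the driver): keep the SparkSession on the driver and loop over the paths there, instead of shipping the session inside a distributed closure. The paths and the process function are hypothetical.
[code=Java]import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("FileLoop").getOrCreate()   // hypothetical app name
val files = List("/data/part1.txt", "/data/part2.txt")                 // hypothetical paths

// Hypothetical per-file logic; it runs on the driver, and only the Dataset
// operations inside it are distributed.
def process(spark: SparkSession, path: String): Unit = {
  val ds = spark.read.textFile(path)
  println(s"$path has ${ds.count()} lines")
}

// Plain driver-side loop: each iteration submits its own Spark jobs.
files.foreach(f => process(spark, f))[/code]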
Teacher, does shuffle read/write use execution memory or storage memory? That is, is it the memory shown on the Executors page of the monitoring UI?
Teacher Dong, hello,
I started Spark with ./spark-shell --master yarn and ran:
[code=Java]val userrdd = sc.textFile("/opt/data/movie/users.dat")[/code]
When I run userrdd.count, the following error appears. What could be the cause?
[code=Java]SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/hadoop-2.5.1/nm-local-dir/usercache/root/filecache/28/__spark_libs__2285737196423591043.zip/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/software/hadoop-2.5.1/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
17/04/16 18:54:54 INFO executor.CoarseGrainedExecutorBackend: Started daemon with process name: 24407@master
17/04/16 18:54:54 INFO util.SignalUtils: Registered signal handler for TERM
17/04/16 18:54:54 INFO util.SignalUtils: Registered signal handler for HUP
17/04/16 18:54:54 INFO util.SignalUtils: Registered signal handler for INT
17/04/16 18:54:55 INFO spark.SecurityManager: Changing view acls to: root
17/04/16 18:54:55 INFO spark.SecurityManager: Changing modify acls to: root
17/04/16 18:54:55 INFO spark.SecurityManager: Changing view acls groups to:
17/04/16 18:54:55 INFO spark.SecurityManager: Changing modify acls groups to:
17/04/16 18:54:55 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set()
17/04/16 18:54:56 INFO client.TransportClientFactory: Successfully created connection to /10.135.111.231:48282 after 104 ms (0 ms spent in bootstraps)
17/04/16 18:54:56 INFO spark.SecurityManager: Changing view acls to: root
17/04/16 18:54:56 INFO spark.SecurityManager: Changing modify acls to: root
17/04/16 18:54:56 INFO spark.SecurityManager: Changing view acls groups to:
17/04/16 18:54:56 INFO spark.SecurityManager: Changing modify acls groups to:
17/04/16 18:54:56 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set()
17/04/16 18:54:56 INFO client.TransportClientFactory: Successfully created connection to /10.135.111.231:48282 after 2 ms (0 ms spent in bootstraps)
17/04/16 18:54:56 INFO storage.DiskBlockManager: Created local directory at /opt/hadoop-2.5.1/nm-local-dir/usercache/root/appcache/application_1492328023585_0010/blockmgr-55d5a33c-e918-4dd8-af78-38c7d035c60b
17/04/16 18:54:56 INFO memory.MemoryStore: MemoryStore started with capacity 413.9 MB
17/04/16 18:54:57 INFO executor.CoarseGrainedExecutorBackend: Connecting to driver: spark://CoarseGrainedScheduler@10.135.111.231:48282
17/04/16 18:54:57 INFO executor.CoarseGrainedExecutorBackend: Successfully registered with driver
17/04/16 18:54:57 INFO executor.Executor: Starting executor ID 1 on host master
17/04/16 18:54:57 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 43664.
17/04/16 18:54:57 INFO netty.NettyBlockTransferService: Server created on master:43664
17/04/16 18:54:57 INFO storage.BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
17/04/16 18:54:57 INFO storage.BlockManagerMaster: Registering BlockManager BlockManagerId(1, master, 43664, None)
17/04/16 18:54:57 INFO storage.BlockManagerMaster: Registered BlockManager BlockManagerId(1, master, 43664, None)
17/04/16 18:54:57 INFO storage.BlockManager: Initialized BlockManager: BlockManagerId(1, master, 43664, None)
17/04/16 18:54:57 INFO executor.Executor: Using REPL class URI: spark://10.135.111.231:48282/classes
17/04/16 18:56:57 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 0
17/04/16 18:56:57 INFO executor.Executor: Running task 0.0 in stage 0.0 (TID 0)
17/04/16 18:56:58 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 1
17/04/16 18:56:58 INFO client.TransportClientFactory: Successfully created connection to /10.135.111.231:35165 after 2 ms (0 ms spent in bootstraps)
17/04/16 18:56:58 INFO memory.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 1889.0 B, free 413.9 MB)
17/04/16 18:56:58 INFO broadcast.TorrentBroadcast: Reading broadcast variable 1 took 179 ms
17/04/16 18:56:58 INFO memory.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 3.1 KB, free 413.9 MB)
17/04/16 18:56:58 INFO rdd.HadoopRDD: Input split: hdfs://master:9000/opt/data/movie/users.dat:0+67184
17/04/16 18:56:58 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 0
17/04/16 18:56:58 INFO memory.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 23.4 KB, free 413.9 MB)
17/04/16 18:56:58 INFO broadcast.TorrentBroadcast: Reading broadcast variable 0 took 11 ms
17/04/16 18:56:58 INFO memory.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 296.9 KB, free 413.6 MB)
17/04/16 18:56:59 INFO Configuration.deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
17/04/16 18:56:59 INFO Configuration.deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
17/04/16 18:56:59 INFO Configuration.deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
17/04/16 18:56:59 INFO Configuration.deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
17/04/16 18:56:59 INFO Configuration.deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
17/04/16 18:56:59 ERROR executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.UnsatisfiedLinkError: org.apache.hadoop.util.NativeCrc32.nativeComputeChunkedSums(IILjava/nio/ByteBuffer;ILjava/nio/ByteBuffer;IILjava/lang/String;JZ)V
at org.apache.hadoop.util.NativeCrc32.nativeComputeChunkedSums(Native Method)
at org.apache.hadoop.util.NativeCrc32.verifyChunkedSums(NativeCrc32.java:59)
at org.apache.hadoop.util.DataChecksum.verifyChunkedSums(DataChecksum.java:301)
at org.apache.hadoop.hdfs.RemoteBlockReader2.readNextPacket(RemoteBlockReader2.java:231)
at org.apache.hadoop.hdfs.RemoteBlockReader2.read(RemoteBlockReader2.java:152)
at org.apache.hadoop.hdfs.DFSInputStream$ByteArrayStrategy.doRead(DFSInputStream.java:775)
at org.apache.hadoop.hdfs.DFSInputStream.readBuffer(DFSInputStream.java:831)
at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:891)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:934)
at java.io.DataInputStream.read(DataInputStream.java:149)
at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.fillBuffer(UncompressedSplitLineReader.java:62)
at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.readLine(UncompressedSplitLineReader.java:94)
at org.apache.hadoop.mapred.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:208)
at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:246)
at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:48)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:266)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:211)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1760)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1157)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
17/04/16 18:57:00 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 1
17/04/16 18:57:00 INFO executor.Executor: Running task 1.0 in stage 0.0 (TID 1)
17/04/16 18:57:00 INFO rdd.HadoopRDD: Input split: hdfs://master:9000/opt/data/movie/users.dat:67184+67184
17/04/16 18:57:00 ERROR executor.Executor: Exception in task 1.0 in stage 0.0 (TID 1)
java.lang.UnsatisfiedLinkError: org.apache.hadoop.util.NativeCrc32.nativeComputeChunkedSums(IILjava/nio/ByteBuffer;ILjava/nio/ByteBuffer;IILjava/lang/String;JZ)V
at org.apache.hadoop.util.NativeCrc32.nativeComputeChunkedSums(Native Method)
at org.apache.hadoop.util.NativeCrc32.verifyChunkedSums(NativeCrc32.java:59)
at org.apache.hadoop.util.DataChecksum.verifyChunkedSums(DataChecksum.java:301)
at org.apache.hadoop.hdfs.RemoteBlockReader2.readNextPacket(RemoteBlockReader2.java:231)
at org.apache.hadoop.hdfs.RemoteBlockReader2.read(RemoteBlockReader2.java:152)
at org.apache.hadoop.hdfs.DFSInputStream$ByteArrayStrategy.doRead(DFSInputStream.java:775)
at org.apache.hadoop.hdfs.DFSInputStream.readBuffer(DFSInputStream.java:831)
at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:891)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:934)
at java.io.DataInputStream.read(DataInputStream.java:149)
at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.fillBuffer(UncompressedSplitLineReader.java:62)
at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.readLine(UncompressedSplitLineReader.java:94)
at org.apache.hadoop.mapred.LineRecordReader.(LineRecordReader.java:136)
at org.apache.hadoop.mapred.TextInputFormat.getRecordReader(TextInputFormat.java:67)
at org.apache.spark.rdd.HadoopRDD$$anon$1.liftedTree1$1(HadoopRDD.scala:252)
at org.apache.spark.rdd.HadoopRDD$$anon$1.(HadoopRDD.scala:251)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:211)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:102)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
... (the same UnsatisfiedLinkError stack trace repeats for task attempts 0.1-0.3 and 1.1-1.3, TID 2-7) ...[/code]
The above is the information from the log.
Please explain further why sort-based shuffle performs better than hash-based shuffle even after the file-consolidation optimization. Thank you.
My understanding:
Assume a shuffle with 1000 map tasks and 1000 reduce tasks.
Shuffle write phase:
With consolidated hash-based shuffle, each map-side core produces one intermediate file per reduce task, i.e. 1000 files; the data is only appended and no sorting is needed.
With sort-based shuffle, each map-side core produces as many intermediate data files as there are map tasks, i.e. 1000; the data is appended but has to be sorted, and an index file is generated as well.
 
Shuffle read phase:
The cost of the two is roughly the same, or sort-based shuffle is slightly higher, because it first has to consult the index file to locate the content before reading, whereas hash-based shuffle reads directly from a known file.
 
By this reasoning, hash-based shuffle avoids the cost of sorting and of producing index files and should in theory outperform sort-based shuffle, so why has the optimized hash-based shuffle now been abandoned?
 
 
 
Please help answer this one as well:
Since Spark produces many partitions, how do we control the number of database connections?
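A hedged sketch of the usual pattern (the JDBC URL, credentials and write logic are hypothetical, and rdd stands for the RDD being written): open one connection per partition rather than per record, and use coalesce to cap how many partitions, and therefore concurrent connections, exist at once.
[code=Java]import java.sql.DriverManager

// Cap the write to 10 partitions => at most 10 concurrent database connections.
rdd.coalesce(10).foreachPartition { iter =>
  // One connection per partition; URL, user and password are hypothetical.
  val conn = DriverManager.getConnection("jdbc:mysql://dbhost:3306/mydb", "user", "password")
  try {
    iter.foreach { record =>
      // write `record` with `conn`, ideally via a batched PreparedStatement
    }
  } finally {
    conn.close()
  }
}[/code]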
