第六课_Spark+SQL程序设计基础(第二部分)

 第六课_Spark+SQL程序设计基础(第二部分)的相关问题都在下面进行提问回帖
1、大家在这个帖子上回复自己想要提的问题。(相同的问题,请点赞表示自己关注这个问题,不用重复提问)
2、提出的问题,老师在直播课的最后30分钟统一回答。
3、课后会整理出参考答案,给每个问题回帖。

流年12411 - 90后it

赞同来自:

spark-sql执行过程中报出的一些错误,经常报下列错误,不影响计算结果,董老师能不能帮忙解决一下: spark版本:1.6.3 集群配置:单节点 256G*48核,集群规模300+   on yarn模式 错误如下:
图片.png
图片.png
图片.png
 

谢国亮

赞同来自:

老师好,我尝试用pyspark在不同节点,以on-yarn的形式先后提交2个程序作业,但在yarn上,两个程序没有同时运行,而是等待1个程序运行结束后,yarn再启动另一个程序,请问如何设置才能确保在一个集群上同时运行多个程序,由yarn实现资源分配?谢谢

竹之信

赞同来自:

使用Spark SQL group by 后对某个字段执行  count(distinct()) 执行效率很低,有没有什么优化方法?

谢国亮

赞同来自:

老师好,在执行Dataframe.write.format('parquet').mode('append').partitionBy('year').saveAsTable('faster_data')后,会把Dataframe注册成spark的临时表吗?如何把'faster_data'写入hive中?

水晶紫爱睡觉

赞同来自:

运行spark 2.1.0源码中的代码报错运行org.apache.spark.examples.mllib.LDAExample时报错: Exception in thread "main" java.lang.NoClassDefFoundError: scala/collection/Seq     at org.apache.spark.examples.mllib.LDAExample.main(LDAExample.scala)     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)     at java.lang.reflect.Method.invoke(Method.java:497)     at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140) Caused by: java.lang.ClassNotFoundException: scala.collection.Seq     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)     ... 6 more  网上说导入scala-library.jar这个jar包,但是这个jar包明明在项目里面。

zqiu9958

赞同来自:

192.168.3.135节点的yarn故障,无法执行task,无限打印以下日志,google查无果.yarn nodemanager服务正常,其他节点正常 2017-04-25 12:14:40,002 WARN [main] org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager: EndpointShmManager(DatanodeInfoWithStorage[192.168.3.135:50010,DS-4cae65be-174a-42d5-a61c-0bd57ee957ef,DISK], parent=ShortCircuitShmManager(0a5b0b86)): error requesting short-circuit shared memory access: Failed to create shared file descriptor: open(/dev/shm/HadoopShortCircuitShm_DFSClient_attempt_1493010929204_0442_m_000014_0_749967435_1_1681479683, O_CREAT | O_EXCL | O_RDWR) failed: error 13 (Permission denied) 源码看不懂: private DfsClientShm requestNewShm(String clientName, DomainPeer peer) throws IOException { final DataOutputStream out = new DataOutputStream( new BufferedOutputStream(peer.getOutputStream())); new Sender(out).requestShortCircuitShm(clientName); ShortCircuitShmResponseProto resp = ShortCircuitShmResponseProto.parseFrom( PBHelper.vintPrefixed(peer.getInputStream())); String error = resp.hasError() ? resp.getError() : "(unknown)"; switch (resp.getStatus()) { case SUCCESS:case ERROR_UNSUPPORTED:default: // The datanode experienced some kind of unexpected error when trying to // create the short-circuit shared memory segment. LOG.warn(this + ": error requesting short-circuit shared memory " + "access: " + error); return null; }

hero_naicha

赞同来自:

在使用spark load csv文件的时候会报下面的Exception,应该是某个行的数据有问题,如何才能catich住这个Exception呢? Job aborted due to stage failure: Task 49 in stage 1.0 failed 4 times, most recent failure: Lost task 49.3 in stage 1.0 (TID 1414, ip-172-31-30-205.ap-southeast-1.compute.internal): com.univocity.parsers.common.TextParsingException: Length of parsed input (1000001) exceeds the maximum number of characters defined in your parser settings (1000000).   pseudo code: val spark = SparkSession       .builder()       .appName(jobName)       .enableHiveSupport()       .getOrCreate()     try {       for (table <- tablesList) {         val df = spark           .read           .format("com.databricks.spark.csv")           .option("header", "true")           .schema(Schema.getSchema(table))           .load(inputPath + s"${table}/${date}/")         df.createOrReplaceTempView(table)       }       val df = DFSql(date, timeConstraint)       saveToParquet(spark, df.outputevents, tempPath + "outputevents/")       saveToParquet(spark, df.outputsessions, tempPath + "outputsessions/")       saveToParquet(spark, df.outputstates, tempPath + "outputstates/")       saveToParquet(spark, df.outputeventparameters, tempPath + "outputeventparameters/")       saveToParquet(spark, df.outputuserattributes, tempPath + "outputuserattributes/")       moveToTransformedFolder(tempPath, outputPath, tablesList, date, dateFormat, lookBackDays.toInt, spark)     } catch {       case e: Exception => {         //do nothing       }     }     spark.stop()

过江小卒

赞同来自:

老师,有次课的数据是用python生成的,数据量是1g的,由于对python不熟,老师能讲下吗?

R.kelly

赞同来自:

董老师,请教下,生产集群由于某些原因导致集群时钟不一致,请问时钟校正一定要停集群服务有没有比这个代价更小的校正方案

会飞的象

赞同来自:

老师好:Spark-Sql的内部运行原理和Hive有什么不一样?

懒虫

赞同来自:

老师好,能不能直接取到parquet 文件的指定一行,如直接获取第105行 如果可以的话,我就可以使用bitmap来对parquet做各种索引了 可行吗?

陆仲达

赞同来自:

使用spark sql去访问mysql第一实例,和直接使用mysql本身的查询语句去查询数据,当sql相对复杂的情况下,spark sql的性能一定好吗,能提升明显吗? 因为数据库通常瓶颈在磁盘的io上。如果sql的表中数据上100亿,spark sql能解决问题吗,查询时间到5秒以内?

xmlnet

赞同来自:

老师好,请问sparkSession.read.jdbc 如何执行一个query返回dataframe, 我看网上很多的sample都是把全表加载到dataframe.

jhg22

赞同来自:

spark读取hive的表, hive-site.xml已经copy在conf目录,spark thrift server也启动正常,就是找不到hive表? def main(args: Array[String]): Unit = {     //select * from otvcloud_wd.t_isp_info limit 10;     val warehouseLocation = "/apps/hive/warehouse/xx.db/"     val spark = SparkSession       .builder()       .appName("Spark Hive Example")       .config("spark.sql.warehouse.dir", warehouseLocation)       .config("spark.sql.hive.thriftServer.singleSession", "true")       .enableHiveSupport()       .getOrCreate()     import spark.implicits._     import spark.sql     sql("show databases").show     sql("use otvcloud_wd")     sql("select * from t_area_info limit 10").show       }

nj_alan

赞同来自:

我有个表在AWS RDS中,表机构如下:  -------------+---------+--------  cust_id     | bigint  |  store_id    | bigint  |  week_id     | integer |  category_id | bigint  |  brand_id    | bigint  |  prod_id     | bigint  | 我要在这个表中基本不同的维度做聚合,比如prod_id或者category_id等等。 表中有20亿条记录,大概40个G。 基本每个维度的运算要2个小时。五个维度的话大概就要10个小时了。现在想用spark放在内存中然后用spark sql来做聚合运算。我的理解是首先把表缓存在内存中应该是要花点时间的然后存在内存之后的各种聚合运算应该是很快的。 每次通过jdbc访问然后cache到内存中总是报错。 请问老师如果优化呢?针对这个具体的案例有什么具体的建议? 对于集群的节点个数 和各个节点的配置有没有什么建议。 我都是在AWS 上建cluster的

水晶紫爱睡觉

赞同来自:

如何把hive-site.xml打包到jar包中??

run_psw

赞同来自:

请问老师 1、打包再jar中的时候,hive-site.xml 应该放在那个目录下面呢? 2、sql 如何直接将一个df 存入hive中呢?

frmark

赞同来自:

问一下老师,spark1.6的时候sparksql创建临时表是用registerTempTable和2.1的registerOrCreateTempView本质一样吗,都是sparkContext关闭的时候被drop掉吗?这个注册临时表是将元数据信息放到内存中吗,数据是不是还在hdfs上?

allen_cdh

赞同来自:

spark streaming里可以用Dstream转换成dataFrame API来操作吗?

930523

赞同来自:

老师你的ide是在window里面还是在linux里面?在windows里面的intellij IDE可以用spark local模式跑么,如果可以那它怎么与spark 通信的,需要指定spark所在服务器地址么?  

nimitz_007

赞同来自:

董老师,我调用spark.read.jdbc 读取一张mysql表,并用write.save()保存出错。我尝试读取一张200MB左右的表没有问题。如果我读一张700MB的表则会报错(必现)。报错信息如下:  WARN spark.HeartbeatReceiver: Removing executor 10 with no recent heartbeats: 166323 ms exceeds timeout 120000 ms 17/04/12 15:55:15 ERROR scheduler.TaskSchedulerImpl: Lost executor 10 on 172.21.102.93: Executor heartbeat timed out after 166323 ms 17/04/12 15:55:15 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 172.21.102.93): ExecutorLostFailure (executor 10 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 166323 ms 17/04/12 15:55:25 ERROR scheduler.TaskSchedulerImpl: Lost executor 10 on 172.21.102.93: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.失败四次后,任务被终止。我用的是spark-shell(standlone调试)。也试过用yarn-client模式都一样 请问这是为什么? 我的配置是 spark-shell --master spark://172.21.102.93:7077 --executor-memory 4g --driver-cores 1 --executor-cores 1 --driver-memory 8g我的机子内存超过20GB,cpu核心也够。spark版本2.1.0

zqiu9958

赞同来自:

多个数据集根据相同的key做join怎么办

过江小卒

赞同来自:

DataFrame userDf = sqlContext.createDataFrame(rowRdd,structType);
userDf.registerTempTable();
sqlContext.registerDataFrameAsTable(userDf,
"student"
);
这两种方法把一个DataFrame注册成一个table,有区别吗

heming621

赞同来自:

Spark SQL对数据倾斜(即数据本地性)或任务倾斜有相应的优化操作吗?

Dong - Hulu

赞同来自:

df = a. join b   df 2 = df join c     SQL

xmlnet

赞同来自:

老师好,请问sparkSession.read.jdbc读取jdbc源的时候,如何执行一个query语句?

陆仲达

赞同来自:

使用spark sql去访问mysql单一实例,和直接使用mysql本身的查询语句去查询数据,当sql相对复杂的情况下,spark sql的性能一定好吗,能提升明显吗? 因为数据库通常瓶颈在磁盘的io上。如果sql的表中数据上100亿,spark sql能解决问题吗,查询时间比直接查询mysql快吗?

run_psw

赞同来自:

老师,请问如何将hive-site 设置到hiveConf 实例里面

要回复问题请先登录注册