Spark写parquet文件时,经过shuffle和不shuffle数据量 不同,shuffle后parquet文件压缩比降低

最近在做测试时遇到一个奇怪的问题,不能理解,问题描述如下:   对相同的一份数据进行读取并写出为parquet文件时,对数据集进行shuffle和不进行shuffle后生成的parquet文件大小不同,且相差较多。具体操作如下: 原始数据集为snappy压缩的46G parquet文件,文件大小从11M~1.5G不等,共100个文件,对文件读取后写出为不压缩的parquet文件:
val productDF = spark.read.parquet("/ingest/product/20180202/22-43/")
//读取结果集后直接写出,不进行shuffle
productDF
  .write.mode(org.apache.spark.sql.SaveMode.Overwrite)
  .option("compression", "none")
  .parquet("/processed/product/20180215/04-37/read_repartition_write/nonewithoutshuffle")
//读取结果集后,repartition为500个文件,shuffle后写出
productDF
  .repartition(500)
  .write.mode(org.apache.spark.sql.SaveMode.Overwrite)
  .option("compression", "none")
  .parquet("/processed/product/20180215/04-37/read_repartition_write/nonewithshuffle"
两次程序执行写出的结果文件,大小不同。其中第一次不经过shuffle直接写出,生成parquent文件为80G,而第二次经过shuffle后生成文件总大小为283G。   80GB parquet 文件元数据实例:
AVAILABLE:                       INT64 UNCOMPRESSED DO:0 FPO:456753 SZ:1452623/1452623/1.00 VC:11000100 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: -1735747200000, max: 2524550400000, num_nulls: 7929352]
283 GB parquet 文件元数据示例:
AVAILABLE:                       INT64 UNCOMPRESSED DO:0 FPO:2800387 SZ:2593838/2593838/1.00 VC:3510100 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: -2209136400000, max: 10413820800000, num_nulls: 2244255]
It seems, that parquet itself (with encoding?) much reduce size of data even without uncompressed data. How ? :) 同时我对80G的文件再次进行读取并repartition(500),生成283G文件。有两个问题不太明白:
  • 第一个是为什么spark在repartitioning/shuffle写出parquet文件后文件总大小会增大
  • 第二个问题是如何有效地shuffle spark中的数据,才能有效地对parquent文件进行编码/压缩?

要回复问题请先登录注册