0%

spark写文件优化

简介

  • insertInto 写入表
  • save(path) 写为文件
  • rdd.saveAsHadoopFile 写为文件

优化

众所周知,hadoop写入文件有两种算法,算法一和算法二。分别介绍下算法一和算法二的区别以及使用场景。建议将算法设置为V2.

算法一

hadoop 3.0 以下版本默认采用该算法。该段转载自:过往记忆

设置参数为:

1
mapreduce.fileoutputcommitter.algorithm.version = 1

此种算法写入文件的过程为:

  1. 先写入临时目录
  2. 将临时目录中的数据移动到对应的task中
  3. 将task中的数据移动到最终的输出目录

算法二

设置参数为:

1
mapreduce.fileoutputcommitter.algorithm.version = 2

此种算法写入文件的过程为:

  1. 先写入临时目录
  2. 将临时目录中的数据移动到对应的task中
  3. 将临时目录中的数据直接移动到最终的输出目录

设置

conf/spark-defaults.conf

全局设置

1
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2

程序:spark.conf

Job级别

1
spark.conf.set("mapreduce.fileoutputcommitter.algorithm.version", "2")

程序:DataSet.write

Job级别

1
dataset.write.option("mapreduce.fileoutputcommitter.algorithm.version", "2")

insertInto

直接将dataframe写入hive表中。

样例

1
df.write.mode("append").insertInto("tbl_name")

问题

  • 如果设置的shuffle数量太多,会造成小文件的问题
  • 如果设置的shuffle数量太少,会造成写入性能慢的问题

最佳实践

  • 设置合适的shuffle数量进行写入
  • 写入之后再合并小文件

save文件

dataframe先写入文件中,在将hive表与文件关联起来。

样例

1
2
3
4
5
// 1. 写为文件,将文件保存在表的目录下
df.write.partitionBy("org_id", "log_date").mode("append").option("compression", "snappy").format("orc").save(resPath)

// 2. 刷新表的元数据信息
spark.sql("msck repair table odl.test").show()

问题

  • 如果设置的shuffle数量太多,会造成小文件的问题
  • 如果设置的shuffle数量太少,会造成写入性能慢的问题

saveAsHadoopFile

按照自定义分区标准,将rdd写为文件,再loadhive表中。

样例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.spark.HashPartitioner

/**
* Spark多文件输出(MultipleTextOutputFormat)
* 覆写 MultipleTextOutputFormat方法
*/
class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
//管理key值,作为文件名,有/ 则为目录
override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = {
key.asInstanceOf[String] + "/" + name
}


//管理value值,是否将key写入文件
override def generateActualKey(key: Any, value: Any): AnyRef = null

}

/**
* 保存为文件
*/
def writeHadoopFile(spark: SparkSession, path: String, resPath: String, partitions: Int): Unit = {
val srcDf = spark.read.option("header", "true").csv(path)
srcDf.rdd.map(row => {
val org_id = row.getAs("org_id").asInstanceOf[String]
val log_date = CommonUtil.dateFormat(row.getAs("log_date").asInstanceOf[String], "yyyy")
var partitionName = ""
if (org_id != null) {
partitionName = partitionName + "org_id=" + org_id + "/"
if (log_date != null) {
partitionName = partitionName + "log_date=" + log_date + "/"
}
}
(partitionName, row.mkString("\t"))
}).partitionBy(new HashPartitioner(partitions))
.saveAsHadoopFile(resPath, classOf[String], classOf[String], classOf[RDDMultipleTextOutputFormat])

}
// 执行load文件操作
spark.sql("LOAD DATA [LOCAL] INPATH 'filepath' [OVERWRITE] INTO TABLE tablename [PARTITION (partcol1=val1, partcol2=val2 ...)]").show()

问题

  • 本地执行没有问题,但在集群上运行时会出现文件已存在的问题,暂未解决。