简介
- insertInto 写入表
- save(path) 写为文件
- rdd.saveAsHadoopFile 写为文件
优化
众所周知,hadoop写入文件有两种算法,算法一和算法二。分别介绍下算法一和算法二的区别以及使用场景。建议将算法设置为V2.
算法一
hadoop 3.0 以下版本默认采用该算法。该段转载自:过往记忆
设置参数为:
1
| mapreduce.fileoutputcommitter.algorithm.version = 1
|
此种算法写入文件的过程为:
- 先写入临时目录
- 将临时目录中的数据移动到对应的task中
- 将task中的数据移动到最终的输出目录
算法二
设置参数为:
1
| mapreduce.fileoutputcommitter.algorithm.version = 2
|
此种算法写入文件的过程为:
- 先写入临时目录
将临时目录中的数据移动到对应的task中
- 将临时目录中的数据直接移动到最终的输出目录
设置
conf/spark-defaults.conf
全局设置
1
| spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2
|
程序:spark.conf
Job级别
1
| spark.conf.set("mapreduce.fileoutputcommitter.algorithm.version", "2")
|
程序:DataSet.write
Job级别
1
| dataset.write.option("mapreduce.fileoutputcommitter.algorithm.version", "2")
|
insertInto
直接将dataframe
写入hive
表中。
样例
1
| df.write.mode("append").insertInto("tbl_name")
|
问题
- 如果设置的shuffle数量太多,会造成小文件的问题
- 如果设置的shuffle数量太少,会造成写入性能慢的问题
最佳实践
- 设置合适的shuffle数量进行写入
- 写入之后再合并小文件
save文件
将dataframe
先写入文件中,在将hive
表与文件关联起来。
样例
1 2 3 4 5
| df.write.partitionBy("org_id", "log_date").mode("append").option("compression", "snappy").format("orc").save(resPath)
spark.sql("msck repair table odl.test").show()
|
问题
- 如果设置的shuffle数量太多,会造成小文件的问题
- 如果设置的shuffle数量太少,会造成写入性能慢的问题
saveAsHadoopFile
按照自定义分区标准,将rdd
写为文件,再load
到hive
表中。
样例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| import org.apache.hadoop.io.{LongWritable, Text} import org.apache.hadoop.mapred.TextInputFormat import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat import org.apache.spark.HashPartitioner
class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] { override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = { key.asInstanceOf[String] + "/" + name }
override def generateActualKey(key: Any, value: Any): AnyRef = null
}
def writeHadoopFile(spark: SparkSession, path: String, resPath: String, partitions: Int): Unit = { val srcDf = spark.read.option("header", "true").csv(path) srcDf.rdd.map(row => { val org_id = row.getAs("org_id").asInstanceOf[String] val log_date = CommonUtil.dateFormat(row.getAs("log_date").asInstanceOf[String], "yyyy") var partitionName = "" if (org_id != null) { partitionName = partitionName + "org_id=" + org_id + "/" if (log_date != null) { partitionName = partitionName + "log_date=" + log_date + "/" } } (partitionName, row.mkString("\t")) }).partitionBy(new HashPartitioner(partitions)) .saveAsHadoopFile(resPath, classOf[String], classOf[String], classOf[RDDMultipleTextOutputFormat])
} spark.sql("LOAD DATA [LOCAL] INPATH 'filepath' [OVERWRITE] INTO TABLE tablename [PARTITION (partcol1=val1, partcol2=val2 ...)]").show()
|
问题
- 本地执行没有问题,但在集群上运行时会出现
文件已存在
的问题,暂未解决。