Spark的任务执行逻辑与Hive会有些差异,对于不同的语句,不同的数据源,表现形式也是不一样的,以insert overwrite举例:

insert overwrite table source select * from target

类似这种SQL,这种SQL下,结果是会把target的内容删除,再把source的数据拿过来覆盖,这里会有个问题,就是假设作业失败了,那么target的数据是否会被删除?

看一个例子:

spark-sql中执行:

create database demo; #创建一个hdfs的db
use demo;
set hive.exec.dynamic.partition.mode=nonstrict;

# 创建源表和插入数据
create table dp_dpartition(id int, name string) partitioned by(ct string, step string);
INSERT INTO TABLE dp_dpartition VALUES (1, 'test01', "2020", "01"), (2, 'test02', "2022", "02"), (3, 'test03', "2023", "03");
INSERT INTO TABLE dp_dpartition VALUES (1, 'test01', "2020", "04"), (2, 'test02', "2022", "05"), (3, 'test03', "2023", "06");

# 创建目标表,可以使用orc或者parquet格式
create table dp_dpartition2(id int, name string) partitioned by(ct string, step string) using orc;

spark-shell中执行:

val stem="select * from demo.dp_dpartition";
spark.sql(stem).repartition(1).createOrReplaceTempView("source_table")

再删除source的table path,模拟源不存在的情况,造成作业失败:

hadoop  fs -rm -r /warehouse/tablespace/managed/hive/demo.db/dp_dpartition 在命令行执行删除源目录,模拟源表缺失

继续在spark-shell中执行:

spark.sql(s"insert overwrite table demo.dp_dpartition2 select * from source_table")

查看目录:

hadoop  fs -ls  /warehouse/tablespace/managed/hive/demo.db/dp_dpartition2

查看目录,能发现target表会被删,也就是在Spark下,对于非text 格式的表,一旦作业失败,无论是什么原因,都会把target的数据清空,这是Spark开源行为。

从现象来说,其实不是很合理,比较作业失败了,target又被删了,新数据又没有进来,这个空档区间的数据就没了,所以需要上层业务额外做处理,例如做好前置检查和重试。

对应的代码是 src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala中:

// For dynamic partition overwrite, we never remove partitions but only update existing
// ones.
if (mode == SaveMode.Overwrite && !dynamicPartitionOverwrite) {
val deletedPartitions = initialMatchingPartitions.toSet -- updatedPartitions
if (deletedPartitions.nonEmpty) {
AlterTableDropPartitionCommand(
catalogTable.get.identifier, deletedPartitions.toSeq,
ifExists = true, purge = false,
retainData = true /* already deleted */).run(sparkSession)
}
}

也可以通过设置spark的配置文件:

hive.exec.orc.split.strategy=BI
spark.sql.hive.convertMetastoreOrc=false
spark.sql.orc.impl=hive(默认值native)

解决。

在insert前,会通过AlterTableDropPartitionCommand进行删除, 还有一种情况需要注意,就是如果存储采用对象存储,那么在spark下staging path和table path会是同一个,所以对于作业失败后,需要清除staging path需要额外处理。


扫码手机观看或分享: