Spark中Job执行失败后,对Target目录的处理机制
Spark的任务执行逻辑与Hive会有些差异,对于不同的语句,不同的数据源,表现形式也是不一样的,以insert overwrite举例:
insert overwrite table source select * from target |
类似这种SQL,这种SQL下,结果是会把target的内容删除,再把source的数据拿过来覆盖,这里会有个问题,就是假设作业失败了,那么target的数据是否会被删除?
看一个例子:
spark-sql中执行:
create database demo; #创建一个hdfs的db |
spark-shell中执行:
val stem="select * from demo.dp_dpartition"; |
再删除source的table path,模拟源不存在的情况,造成作业失败:
hadoop fs -rm -r /warehouse/tablespace/managed/hive/demo.db/dp_dpartition 在命令行执行删除源目录,模拟源表缺失 |
继续在spark-shell中执行:
spark.sql(s"insert overwrite table demo.dp_dpartition2 select * from source_table") |
查看目录:
hadoop fs -ls /warehouse/tablespace/managed/hive/demo.db/dp_dpartition2 |
查看目录,能发现target表会被删,也就是在Spark下,对于非text 格式的表,一旦作业失败,无论是什么原因,都会把target的数据清空,这是Spark开源行为。
从现象来说,其实不是很合理,比较作业失败了,target又被删了,新数据又没有进来,这个空档区间的数据就没了,所以需要上层业务额外做处理,例如做好前置检查和重试。
对应的代码是 src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
中:
// For dynamic partition overwrite, we never remove partitions but only update existing |
也可以通过设置spark的配置文件:
hive.exec.orc.split.strategy=BI |
解决。
在insert前,会通过AlterTableDropPartitionCommand
进行删除, 还有一种情况需要注意,就是如果存储采用对象存储,那么在spark下staging path和table path会是同一个,所以对于作业失败后,需要清除staging path需要额外处理。
扫码手机观看或分享: