最近有一套大数据架构,中间的一个数据流是:

Flink 通过 CDC实时的把数据写入对象存储,采用的是hudi connector。
Flink开启了compaction机制,且频率较快,因为数据量比较大,所以可以理解为几乎持续在做合并。
在Hue中使用Spark SQL对表进行查询分析。

在这个链路下,碰到一个问题,问题是:

t1时刻:flink cdc不断的给对象存储写文件,假设写了10个文件,且开启了定期合并
t2时刻:建出一个SparkSession。执行一个SQL对表进行count,SQL可以成功执行
t3时刻:flink 启动定期合并,这时候会删除之前的10个,产生一个新的文件。
t4时刻:继续使用之前那个SparkSession对象执行SQL,会报错,报错被merge掉的文件找不到。

看起来就是对于一个已经存在的SparkSession,它并没有去感知对象存储中的文件发生的变化,开始以为是hudi的表格式的问题,于是去看了一下hudi的代码,对应到hudi(0.11.1)的代码是在:

hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala

在这个文件中,定义了用来记录一张表有哪些文件列表的meta信息,使用了lazy的形式进行加载:

/**
* NOTE: PLEASE READ THIS CAREFULLY
*
* Even though [[HoodieFileIndex]] initializes eagerly listing all of the files w/in the given Hudi table,
* this variable itself is _lazy_ (and have to stay that way) which guarantees that it's not initialized, until
* it's actually accessed
*/
protected lazy val fileIndex: HoodieFileIndex =
HoodieFileIndex(sparkSession, metaClient, Some(tableStructSchema), optParams,
FileStatusCache.getOrCreate(sparkSession))

由于 fileIndex 是lazy的,所以在定义的时候并不会去创建HoodieFileIndex对象,而是在使用的时候才会去初始化,对应到使用的地方是listLatestBaseFiles方法:

protected def listLatestBaseFiles(globbedPaths: Seq[Path], partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Map[Path, Seq[FileStatus]] = {
val partitionDirs = if (globbedPaths.isEmpty) {
fileIndex.listFiles(partitionFilters, dataFilters)
} else {
val inMemoryFileIndex = HoodieInMemoryFileIndex.create(sparkSession, globbedPaths)
inMemoryFileIndex.listFiles(partitionFilters, dataFilters)
}

val fsView = new HoodieTableFileSystemView(metaClient, timeline, partitionDirs.flatMap(_.files).toArray)
val latestBaseFiles = fsView.getLatestBaseFiles.iterator().asScala.toList.map(_.getFileStatus)

latestBaseFiles.groupBy(getPartitionPath)
}

通过断点发现,对fileIndex的初始化只有第一次的时候,才会进入,第二次后就不会进入,也就是第二次SQL开始,fileIndex始终拿到的是第一次初始化的值,从而导致了后续的SQL出现问题。

这时候直观感觉可能不是Hudi的问题,可能是问题出在Spark(3.2.1)端,基于错误的堆栈逐步查找,最后看到Spark的执行计划这部分,基本能找到原因了,原因是在Spark里面,对于同一个SparkSession,它会Cache住相同SQL的执行计划。

对应到代码的位置是:

org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

中的FindDataSourceTable

private def readDataSourceTable(
table: CatalogTable, extraOptions: CaseInsensitiveStringMap): LogicalPlan = {
val qualifiedTableName = QualifiedTableName(table.database, table.identifier.table)
val catalog = sparkSession.sessionState.catalog
val dsOptions = DataSourceUtils.generateDatasourceOptions(extraOptions, table)
catalog.getCachedPlan(qualifiedTableName, () => {
val dataSource =
DataSource(
sparkSession,
// In older version(prior to 2.1) of Spark, the table schema can be empty and should be
// inferred at runtime. We should still support it.
userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema),
partitionColumns = table.partitionColumnNames,
bucketSpec = table.bucketSpec,
className = table.provider.get,
options = dsOptions,
catalogTable = Some(table))
LogicalRelation(dataSource.resolveRelation(checkFilesExist = false), table)
})
}

org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala的:

private val tableRelationCache: Cache[QualifiedTableName, LogicalPlan] = {
var builder = CacheBuilder.newBuilder()
.maximumSize(cacheSize)

if (cacheTTL > 0) {
builder = builder.expireAfterWrite(cacheTTL, TimeUnit.SECONDS)
}

builder.build[QualifiedTableName, LogicalPlan]()
}

可以看到,Spark对于相同的SQL,会基于DB和table产生qualifiedTableName,然后把这个里面的meta给cache下来,从而导致后面拿到的meta都是cache后的,因此一旦出现类似flink这种做了一个merge的动作。

对于一个已经存在的SparkSession是无法感知到对应的变化的,问题明确了,要解决问题的话,就比较容易了,一般来说可以有两种办法去解决,一种是关闭cache,另一种是缩短这个cache时间。

对于缩短cache时间的话,是调整spark.sql.metadataCacheTTLSeconds这个key,对应到引擎中的描述是:

val METADATA_CACHE_TTL_SECONDS = buildStaticConf("spark.sql.metadataCacheTTLSeconds")
.doc("Time-to-live (TTL) value for the metadata caches: partition file metadata cache and " +
"session catalog cache. This configuration only has an effect when this value having " +
"a positive value (> 0). It also requires setting " +
s"'${StaticSQLConf.CATALOG_IMPLEMENTATION.key}' to `hive`, setting " +
s"'${SQLConf.HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE.key}' > 0 and setting " +
s"'${SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key}' to `true` " +
"to be applied to the partition file metadata cache.")
.version("3.1.0")
.timeConf(TimeUnit.SECONDS)
.createWithDefault(-1)

如果要关闭缓存的话,则可以将spark.sql.filesourceTableRelationCacheSize设置成0,这里本质上是将cache的容量调整为0,自然一个key都放不进去,约等于关闭了cache。


扫码手机观看或分享: