/**
 * File index for this Hudi table.
 *
 * NOTE: PLEASE READ THIS CAREFULLY
 *
 * [[HoodieFileIndex]] eagerly lists all of the files within the given Hudi table as soon as
 * it is initialized. This member is therefore declared `lazy` (and has to stay that way),
 * which guarantees that nothing is initialized until the index is actually accessed.
 */
protected lazy val fileIndex: HoodieFileIndex = {
  // The schema is resolved before the file status cache is looked up, matching the
  // left-to-right order in which they are handed to the index.
  val schemaOpt = Some(tableStructSchema)
  val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
  HoodieFileIndex(sparkSession, metaClient, schemaOpt, optParams, fileStatusCache)
}
// File-system view built from every file listed under the given partition directories.
val fsView = new HoodieTableFileSystemView(
  metaClient,
  timeline,
  partitionDirs.flatMap(_.files).toArray)

// The latest base files reported by the view, reduced to their file statuses.
val latestBaseFiles = fsView.getLatestBaseFiles
  .iterator()
  .asScala
  .map(_.getFileStatus)
  .toList
/**
 * Produces the logical plan for a data source table, going through the session catalog's
 * cached plans keyed by the table's qualified name.
 *
 * @param table        catalog metadata of the table to read
 * @param extraOptions additional options, combined with the table's own settings via
 *                     `DataSourceUtils.generateDatasourceOptions`
 * @return the plan handed back by `getCachedPlan` for this table
 */
private def readDataSourceTable(
    table: CatalogTable,
    extraOptions: CaseInsensitiveStringMap): LogicalPlan = {
  val qualifiedTableName = QualifiedTableName(table.database, table.identifier.table)
  val catalog = sparkSession.sessionState.catalog
  // Computed eagerly, outside of the loader below.
  val dsOptions = DataSourceUtils.generateDatasourceOptions(extraOptions, table)

  // Builds the relation for `table` from scratch.
  // NOTE(review): presumably only invoked by `getCachedPlan` on a cache miss -- confirm.
  def loadRelation(): LogicalPlan = {
    // In older version(prior to 2.1) of Spark, the table schema can be empty and should be
    // inferred at runtime. We should still support it.
    val declaredSchema = if (table.schema.isEmpty) None else Some(table.schema)
    val source = DataSource(
      sparkSession,
      userSpecifiedSchema = declaredSchema,
      partitionColumns = table.partitionColumnNames,
      bucketSpec = table.bucketSpec,
      className = table.provider.get,
      options = dsOptions,
      catalogTable = Some(table))
    val relation = source.resolveRelation(checkFilesExist = false)
    LogicalRelation(relation, table)
  }

  catalog.getCachedPlan(qualifiedTableName, () => loadRelation())
}
/**
 * Static configuration `spark.sql.metadataCacheTTLSeconds`: a time-to-live, expressed in
 * seconds, for the metadata caches described in the entry's own doc text. The default is
 * `-1`; per that doc text only positive values have an effect.
 */
val METADATA_CACHE_TTL_SECONDS = buildStaticConf("spark.sql.metadataCacheTTLSeconds")
  .doc {
    // Keys of the related settings, resolved in the order they appear in the text below.
    val catalogImplKey = StaticSQLConf.CATALOG_IMPLEMENTATION.key
    val partitionFileCacheSizeKey = SQLConf.HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE.key
    val manageFilesourcePartitionsKey = SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key

    "Time-to-live (TTL) value for the metadata caches: partition file metadata cache and " +
      "session catalog cache. This configuration only has an effect when this value having " +
      "a positive value (> 0). It also requires setting " +
      s"'$catalogImplKey' to `hive`, setting " +
      s"'$partitionFileCacheSizeKey' > 0 and setting " +
      s"'$manageFilesourcePartitionsKey' to `true` " +
      "to be applied to the partition file metadata cache."
  }
  .version("3.1.0")
  .timeConf(TimeUnit.SECONDS)
  .createWithDefault(-1)