[SPARK-45054][SQL] HiveExternalCatalog.listPartitions should restore partition statistics

### What changes were proposed in this pull request?

Call `restorePartitionMetadata` in `listPartitions` to restore Spark SQL statistics.
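
For background: Spark persists its own statistics in the metastore as plain string parameters under the `spark.sql.statistics.` prefix (for example `spark.sql.statistics.totalSize` and `spark.sql.statistics.numRows`), and `restorePartitionMetadata` converts them back into catalog statistics when a partition is read. A minimal sketch of that restore step, using simplified stand-in types (`PartitionStats`, `StatsRestore`) rather than Spark's internal `CatalogTablePartition`/`CatalogStatistics`:

```scala
// Illustrative sketch only -- simplified stand-ins for Spark's internals.
case class PartitionStats(sizeInBytes: BigInt, rowCount: Option[BigInt])

object StatsRestore {
  // Spark-managed stats live under this parameter prefix in the metastore.
  private val Prefix = "spark.sql.statistics."

  // Rebuild typed statistics from a partition's raw string parameters.
  def restore(parameters: Map[String, String]): Option[PartitionStats] = {
    val stats = parameters.collect {
      case (key, value) if key.startsWith(Prefix) => key.stripPrefix(Prefix) -> value
    }
    stats.get("totalSize").map { size =>
      PartitionStats(
        sizeInBytes = BigInt(size),
        rowCount = stats.get("numRows").map(BigInt(_)))
    }
  }
}
```

For instance, `StatsRestore.restore(Map("spark.sql.statistics.totalSize" -> "1024"))` would yield `Some(PartitionStats(1024, None))`; a caller that skips this step sees no stats at all.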

### Why are the changes needed?

Currently, when `listPartitions` is called, it doesn't restore the Spark SQL statistics stored in the metastore, such as `spark.sql.statistics.totalSize`. This means callers that rely on the stats returned by this method may get wrong results.

In particular, when `spark.sql.statistics.size.autoUpdate.enabled` is turned on, during insert overwrite Spark first lists the partitions to obtain their old statistics, then compares them with the new statistics to determine which partitions need to be updated. Because the old statistics come back missing, this sometimes causes Spark to update all partitions instead of only the ones that were actually touched.
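
A rough sketch of why that happens (the `partitionsToUpdate` helper and its types here are hypothetical, not Spark's actual code path): when `listPartitions` returns partitions with their stats stripped, the old size is `None` for every partition, so every size comparison reports a change:

```scala
// Hypothetical illustration of the update-selection step during insert overwrite.
case class HivePartition(spec: Map[String, String], sizeInBytes: Option[BigInt])

// Pick the partitions whose stored size no longer matches the freshly
// computed size and therefore need a metastore stats update.
def partitionsToUpdate(
    listed: Seq[HivePartition],                 // result of listPartitions
    newSizes: Map[Map[String, String], BigInt]  // sizes computed after the write
  ): Seq[HivePartition] = {
  listed.filter { p =>
    // If stats were not restored, p.sizeInBytes is None and this predicate
    // is true for every partition, not just the ones that were overwritten.
    newSizes.get(p.spec).exists(newSize => !p.sizeInBytes.contains(newSize))
  }
}
```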

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added a new test.

### Was this patch authored or co-authored using generative AI tooling?

Closes #42777 from sunchao/list-partition-stat.

Authored-by: Chao Sun <[email protected]>
Signed-off-by: Chao Sun <[email protected]>
sunchao committed Sep 2, 2023
1 parent e4ebb37 commit e9962e8
Showing 2 changed files with 30 additions and 2 deletions.
ExternalCatalogSuite.scala

```diff
@@ -475,6 +475,31 @@ abstract class ExternalCatalogSuite extends SparkFunSuite {
     assert(catalog.listPartitions("db2", "tbl2", Some(Map("a" -> "unknown"))).isEmpty)
   }
 
+  test("SPARK-45054: list partitions should restore stats") {
+    val catalog = newBasicCatalog()
+    val stats = Some(CatalogStatistics(sizeInBytes = 1))
+    val newPart = CatalogTablePartition(Map("a" -> "1", "b" -> "2"), storageFormat, stats = stats)
+    catalog.alterPartitions("db2", "tbl2", Seq(newPart))
+    val parts = catalog.listPartitions("db2", "tbl2", Some(Map("a" -> "1")))
+
+    assert(parts.length == 1)
+    val part = parts.head
+    assert(part.stats.exists(_.sizeInBytes == 1))
+  }
+
+  test("SPARK-45054: list partitions by filter should restore stats") {
+    val catalog = newBasicCatalog()
+    val stats = Some(CatalogStatistics(sizeInBytes = 1))
+    val newPart = CatalogTablePartition(Map("a" -> "1", "b" -> "2"), storageFormat, stats = stats)
+    catalog.alterPartitions("db2", "tbl2", Seq(newPart))
+    val tz = TimeZone.getDefault.getID
+    val parts = catalog.listPartitionsByFilter("db2", "tbl2", Seq($"a".int === 1), tz)
+
+    assert(parts.length == 1)
+    val part = parts.head
+    assert(part.stats.exists(_.sizeInBytes == 1))
+  }
+
   test("SPARK-21457: list partitions with special chars") {
     val catalog = newBasicCatalog()
     assert(catalog.listPartitions("db2", "tbl1").isEmpty)
```
HiveExternalCatalog.scala

```diff
@@ -1275,13 +1275,14 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       db: String,
       table: String,
       partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] = withClient {
-    val partColNameMap = buildLowerCasePartColNameMap(getTable(db, table))
+    val catalogTable = getTable(db, table)
+    val partColNameMap = buildLowerCasePartColNameMap(catalogTable)
     val metaStoreSpec = partialSpec.map(toMetaStorePartitionSpec)
     val res = client.getPartitions(db, table, metaStoreSpec)
       .map { part => part.copy(spec = restorePartitionSpec(part.spec, partColNameMap))
       }
 
-    metaStoreSpec match {
+    val parts = metaStoreSpec match {
       // This might be a bug of Hive: When the partition value inside the partial partition spec
       // contains dot, and we ask Hive to list partitions w.r.t. the partial partition spec, Hive
       // treats dot as matching any single character and may return more partitions than we
@@ -1290,6 +1291,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
         res.filter(p => isPartialPartitionSpec(spec, toMetaStorePartitionSpec(p.spec)))
       case _ => res
     }
+    parts.map(restorePartitionMetadata(_, catalogTable))
   }
 
   override def listPartitionsByFilter(
@@ -1303,6 +1305,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     val clientPrunedPartitions =
      client.getPartitionsByFilter(rawHiveTable, predicates).map { part =>
        part.copy(spec = restorePartitionSpec(part.spec, partColNameMap))
+        restorePartitionMetadata(part, catalogTable)
      }
     prunePartitionsByFilter(catalogTable, clientPrunedPartitions, predicates, defaultTimeZoneId)
   }
```
