[HUDI-8347] Fix partition stats update with async compaction (#12106)
---------

Co-authored-by: sivabalan <[email protected]>
codope and nsivabalan authored Oct 15, 2024
1 parent 16d686c commit ece8d7c
Showing 4 changed files with 91 additions and 5 deletions.
@@ -131,6 +131,7 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
   public static final String COLUMN_STATS_FIELD_COLUMN_NAME = "columnName";
   public static final String COLUMN_STATS_FIELD_TOTAL_UNCOMPRESSED_SIZE = "totalUncompressedSize";
   public static final String COLUMN_STATS_FIELD_IS_DELETED = FIELD_IS_DELETED;
+  public static final String COLUMN_STATS_FIELD_IS_TIGHT_BOUND = "isTightBound";
 
   /**
    * HoodieMetadata record index payload field ids
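
For context on this first file: the new constant names the `isTightBound` field on the column stats payload. A minimal, hypothetical sketch of a record carrying the flag, using only builder setters visible in this commit and assuming the remaining fields fall back to their Avro schema defaults:

import org.apache.hudi.avro.model.HoodieMetadataColumnStats;

class TightBoundSketch {
  // Hypothetical values throughout; only setters visible in this commit are
  // used, and the remaining fields are assumed to take their Avro defaults.
  static HoodieMetadataColumnStats tightBoundStats() {
    return HoodieMetadataColumnStats.newBuilder()
        .setColumnName("price")    // column the stats describe
        .setIsDeleted(false)       // not a tombstone
        .setIsTightBound(true)     // min/max are exact, not a merged over-approximation
        .build();
  }
}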
@@ -2284,10 +2284,6 @@ public static String getPartitionStatsIndexKey(String partitionPath, String colu
   public static HoodieMetadataColumnStats mergeColumnStatsRecords(HoodieMetadataColumnStats prevColumnStats,
                                                                   HoodieMetadataColumnStats newColumnStats) {
     checkArgument(Objects.equals(prevColumnStats.getColumnName(), newColumnStats.getColumnName()));
-    // If new column stats is tight bound, then discard the previous column stats
-    if (newColumnStats.getIsTightBound()) {
-      return newColumnStats;
-    }
     // We're handling 2 cases in here
     //   - New record is a tombstone: in this case it simply overwrites previous state
     //   - Previous record is a tombstone: in that case new proper record would also
@@ -2296,6 +2292,11 @@ public static HoodieMetadataColumnStats mergeColumnStatsRecords(HoodieMetadataCo
       return newColumnStats;
     }
 
+    // If new column stats is tight bound, then discard the previous column stats
+    if (newColumnStats.getIsTightBound()) {
+      return newColumnStats;
+    }
+
     Comparable minValue =
         (Comparable) Stream.of(
             (Comparable) unwrapAvroValueWrapper(prevColumnStats.getMinValue()),
@@ -2322,6 +2323,7 @@ public static HoodieMetadataColumnStats mergeColumnStatsRecords(HoodieMetadataCo
         .setTotalSize(prevColumnStats.getTotalSize() + newColumnStats.getTotalSize())
         .setTotalUncompressedSize(prevColumnStats.getTotalUncompressedSize() + newColumnStats.getTotalUncompressedSize())
         .setIsDeleted(newColumnStats.getIsDeleted())
+        .setIsTightBound(newColumnStats.getIsTightBound())
        .build();
   }
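
The reorder above is the heart of the fix: tombstone handling must run before the tight-bound shortcut, so a tight-bound incoming record no longer bypasses delete handling. A hedged sketch of the resulting precedence; the enclosing utility class is not named in this diff, so `HoodieTableMetadataUtil` and the two stats variables are assumptions:

import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
import org.apache.hudi.metadata.HoodieTableMetadataUtil; // assumed enclosing class

class MergePrecedenceSketch {
  // Precedence after this commit, per the hunks above:
  //   1. Either record is a tombstone (isDeleted) -> newStats is returned outright.
  //   2. newStats is tight-bound                  -> newStats replaces the
  //      accumulated (loose) min/max instead of being union-merged.
  //   3. Otherwise                                -> min/max and counts are merged.
  static HoodieMetadataColumnStats merge(HoodieMetadataColumnStats prevStats,
                                         HoodieMetadataColumnStats newStats) {
    return HoodieTableMetadataUtil.mergeColumnStatsRecords(prevStats, newStats);
  }
}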
@@ -56,6 +56,7 @@
 import static org.apache.hudi.metadata.HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME;
 import static org.apache.hudi.metadata.HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME;
 import static org.apache.hudi.metadata.HoodieMetadataPayload.COLUMN_STATS_FIELD_IS_DELETED;
+import static org.apache.hudi.metadata.HoodieMetadataPayload.COLUMN_STATS_FIELD_IS_TIGHT_BOUND;
 import static org.apache.hudi.metadata.HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE;
 import static org.apache.hudi.metadata.HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE;
 import static org.apache.hudi.metadata.HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT;
@@ -309,6 +310,7 @@ private static void constructColumnStatsMetadataPayload(HoodieMetadataPayload pa
           .setTotalSize((Long) columnStatsRecord.get(COLUMN_STATS_FIELD_TOTAL_SIZE))
           .setTotalUncompressedSize((Long) columnStatsRecord.get(COLUMN_STATS_FIELD_TOTAL_UNCOMPRESSED_SIZE))
           .setIsDeleted((Boolean) columnStatsRecord.get(COLUMN_STATS_FIELD_IS_DELETED))
+          .setIsTightBound((Boolean) columnStatsRecord.get(COLUMN_STATS_FIELD_IS_TIGHT_BOUND))
           .build();
     }
   }
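
On the read side, the new field is pulled back out by the same constant used on the write path. A small hedged sketch; the record's concrete type is not visible in this diff, so an Avro `GenericRecord` is assumed here:

import org.apache.avro.generic.GenericRecord;

import static org.apache.hudi.metadata.HoodieMetadataPayload.COLUMN_STATS_FIELD_IS_TIGHT_BOUND;

class TightBoundReadSketch {
  // Mirrors the read path above: fetch the flag by the shared field-name
  // constant, then unwrap null-safely (records written before this field
  // existed may plausibly come back null rather than false).
  static boolean isTightBound(GenericRecord columnStatsRecord) {
    Boolean tightBound = (Boolean) columnStatsRecord.get(COLUMN_STATS_FIELD_IS_TIGHT_BOUND);
    return Boolean.TRUE.equals(tightBound);
  }
}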
@@ -278,6 +278,87 @@ class TestPartitionStatsIndexWithSql extends HoodieSparkSqlTestBase {
     }
   }
 
+  /**
+   * 1. Create MOR table with compaction enabled.
+   * 2. Do an insert and validate the partition stats index initialization.
+   * 3. Do an update and validate the partition stats index.
+   * 4. Schedule a compaction and validate the partition stats index.
+   * 5. Do an update and validate the partition stats index.
+   * 6. Complete the compaction and validate the partition stats index.
+   */
+  test("Test partition stats index with inflight compaction") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price int,
+           |  ts long
+           |) using hudi
+           |partitioned by (ts)
+           |tblproperties (
+           |  type = 'mor',
+           |  primaryKey = 'id',
+           |  preCombineField = 'price',
+           |  hoodie.metadata.index.partition.stats.enable = 'true',
+           |  hoodie.metadata.index.column.stats.enable = 'true',
+           |  hoodie.metadata.index.column.stats.column.list = 'price'
+           |)
+           |location '$tablePath'
+           |""".stripMargin
+      )
+
+      // insert data
+      writeAndValidatePartitionStats(tableName, tablePath)
+
+      // update data
+      spark.sql(s"update $tableName set price = price + 1 where id = 6")
+      checkAnswer(s"select id, name, price, ts from $tableName where price>3000")(
+        Seq(6, "a6", 4002, 30)
+      )
+
+      // validate partition stats index after update
+      checkAnswer(s"select key, ColumnStatsMetadata.minValue.member1.value, ColumnStatsMetadata.maxValue.member1.value from hudi_metadata('$tableName') where type=${MetadataPartitionType.PARTITION_STATS.getRecordType} and ColumnStatsMetadata.columnName='price'")(
+        Seq(getPartitionStatsIndexKey("ts=10", "price"), 1000, 2000),
+        Seq(getPartitionStatsIndexKey("ts=20", "price"), 2000, 3000),
+        Seq(getPartitionStatsIndexKey("ts=30", "price"), 3000, 4002)
+      )
+
+      // schedule compaction
+      spark.sql(s"refresh table $tableName")
+      spark.sql("set hoodie.compact.inline=false")
+      spark.sql("set hoodie.compact.inline.max.delta.commits=2")
+      spark.sql(s"schedule compaction on $tableName")
+      val compactionRows = spark.sql(s"show compaction on $tableName").collect()
+      val timestamps = compactionRows.map(_.getString(0))
+      assertTrue(timestamps.length == 1)
+
+      // update data
+      spark.sql(s"update $tableName set price = price + 1 where id = 6")
+      checkAnswer(s"select id, name, price, ts from $tableName where price>3000")(
+        Seq(6, "a6", 4003, 30)
+      )
+
+      // complete compaction
+      // set partition stats related configs
+      spark.sql(s"refresh table $tableName")
+      spark.sql("set hoodie.metadata.index.partition.stats.enable=true")
+      spark.sql("set hoodie.metadata.index.column.stats.enable=true")
+      spark.sql("set hoodie.metadata.index.column.stats.column.list=price")
+      spark.sql(s"run compaction on $tableName at ${timestamps(0)}")
+
+      // validate partition stats index
+      checkAnswer(s"select key, ColumnStatsMetadata.minValue.member1.value, ColumnStatsMetadata.maxValue.member1.value, ColumnStatsMetadata.isTightBound from hudi_metadata('$tableName') where type=${MetadataPartitionType.PARTITION_STATS.getRecordType} and ColumnStatsMetadata.columnName='price'")(
+        Seq(getPartitionStatsIndexKey("ts=10", "price"), 1000, 2000, false),
+        Seq(getPartitionStatsIndexKey("ts=20", "price"), 2000, 3000, false),
+        Seq(getPartitionStatsIndexKey("ts=30", "price"), 3000, 4003, false)
+      )
+    }
+  }
+
   test(s"Test partition stats index on int type field with update and file pruning") {
     Seq("cow", "mor").foreach { tableType =>
       Seq(true, false).foreach { shouldCompact =>
@@ -319,7 +400,7 @@ class TestPartitionStatsIndexWithSql extends HoodieSparkSqlTestBase {
           checkAnswer(s"select key, ColumnStatsMetadata.minValue.member1.value, ColumnStatsMetadata.maxValue.member1.value, ColumnStatsMetadata.isTightBound from hudi_metadata('$tableName') where type=${MetadataPartitionType.PARTITION_STATS.getRecordType} and ColumnStatsMetadata.columnName='price'")(
             Seq(getPartitionStatsIndexKey("ts=10", "price"), 1000, 2000, false),
             Seq(getPartitionStatsIndexKey("ts=20", "price"), 2000, 3000, false),
-            Seq(getPartitionStatsIndexKey("ts=30", "price"), 3000, 4001, false)
+            Seq(getPartitionStatsIndexKey("ts=30", "price"), 3000, 4001, true)
           )
         }
       }
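
For anyone reproducing the test scenario outside Spark SQL, a hedged sketch of the same switches applied as datasource writer options; the option keys are copied verbatim from the test above, while `df`, `basePath`, and the table name are assumed inputs:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

class PartitionStatsWriteSketch {
  static void write(Dataset<Row> df, String basePath) {
    df.write().format("hudi")
        .option("hoodie.table.name", "test_table")
        .option("hoodie.metadata.index.partition.stats.enable", "true")
        .option("hoodie.metadata.index.column.stats.enable", "true")
        .option("hoodie.metadata.index.column.stats.column.list", "price")
        .option("hoodie.compact.inline", "false") // compaction scheduled and run separately, as in the test
        .mode("append")
        .save(basePath);
  }
}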
