diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
index 6035dbcbd497..4300811be08c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
@@ -131,6 +131,7 @@ public class HoodieMetadataPayload implements HoodieRecordPayload
+      val tableName = generateTableName
+      val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price int,
+           |  ts long
+           |) using hudi
+           |partitioned by (ts)
+           |tblproperties (
+           |  type = 'mor',
+           |  primaryKey = 'id',
+           |  preCombineField = 'price',
+           |  hoodie.metadata.index.partition.stats.enable = 'true',
+           |  hoodie.metadata.index.column.stats.enable = 'true',
+           |  hoodie.metadata.index.column.stats.column.list = 'price'
+           |)
+           |location '$tablePath'
+           |""".stripMargin
+      )
+
+      // insert data
+      writeAndValidatePartitionStats(tableName, tablePath)
+
+      // update data
+      spark.sql(s"update $tableName set price = price + 1 where id = 6")
+      checkAnswer(s"select id, name, price, ts from $tableName where price>3000")(
+        Seq(6, "a6", 4002, 30)
+      )
+
+      // validate partition stats index after update
+      checkAnswer(s"select key, ColumnStatsMetadata.minValue.member1.value, ColumnStatsMetadata.maxValue.member1.value from hudi_metadata('$tableName') where type=${MetadataPartitionType.PARTITION_STATS.getRecordType} and ColumnStatsMetadata.columnName='price'")(
+        Seq(getPartitionStatsIndexKey("ts=10", "price"), 1000, 2000),
+        Seq(getPartitionStatsIndexKey("ts=20", "price"), 2000, 3000),
+        Seq(getPartitionStatsIndexKey("ts=30", "price"), 3000, 4002)
+      )
+
+      // schedule compaction
+      spark.sql(s"refresh table $tableName")
+      spark.sql("set hoodie.compact.inline=false")
+      spark.sql("set hoodie.compact.inline.max.delta.commits=2")
+      spark.sql(s"schedule compaction on $tableName")
+      val compactionRows = spark.sql(s"show compaction on $tableName").collect()
+      val timestamps = compactionRows.map(_.getString(0))
+      assertTrue(timestamps.length == 1)
+
+      // update data
+      spark.sql(s"update $tableName set price = price + 1 where id = 6")
+      checkAnswer(s"select id, name, price, ts from $tableName where price>3000")(
+        Seq(6, "a6", 4003, 30)
+      )
+
+      // complete compaction
+      // set partition stats related configs
+      spark.sql(s"refresh table $tableName")
+      spark.sql("set hoodie.metadata.index.partition.stats.enable=true")
+      spark.sql("set hoodie.metadata.index.column.stats.enable=true")
+      spark.sql("set hoodie.metadata.index.column.stats.column.list=price")
+      spark.sql(s"run compaction on $tableName at ${timestamps(0)}")
+
+      // validate partition stats index
+      checkAnswer(s"select key, ColumnStatsMetadata.minValue.member1.value, ColumnStatsMetadata.maxValue.member1.value, ColumnStatsMetadata.isTightBound from hudi_metadata('$tableName') where type=${MetadataPartitionType.PARTITION_STATS.getRecordType} and ColumnStatsMetadata.columnName='price'")(
+        Seq(getPartitionStatsIndexKey("ts=10", "price"), 1000, 2000, false),
+        Seq(getPartitionStatsIndexKey("ts=20", "price"), 2000, 3000, false),
+        Seq(getPartitionStatsIndexKey("ts=30", "price"), 3000, 4003, false)
+      )
+    }
+  }
+
   test(s"Test partition stats index on int type field with update and file pruning") {
     Seq("cow", "mor").foreach { tableType =>
       Seq(true, false).foreach { shouldCompact =>
@@ -319,7 +400,7 @@ class TestPartitionStatsIndexWithSql extends HoodieSparkSqlTestBase {
         checkAnswer(s"select key, ColumnStatsMetadata.minValue.member1.value, ColumnStatsMetadata.maxValue.member1.value, ColumnStatsMetadata.isTightBound from hudi_metadata('$tableName') where type=${MetadataPartitionType.PARTITION_STATS.getRecordType} and ColumnStatsMetadata.columnName='price'")(
           Seq(getPartitionStatsIndexKey("ts=10", "price"), 1000, 2000, false),
           Seq(getPartitionStatsIndexKey("ts=20", "price"), 2000, 3000, false),
-          Seq(getPartitionStatsIndexKey("ts=30", "price"), 3000, 4001, false)
+          Seq(getPartitionStatsIndexKey("ts=30", "price"), 3000, 4001, true)
         )
       }
     }
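The validation queries asserted above can also be issued outside the test harness against any table that has the partition stats index built. Below is a minimal sketch (not part of the patch) that reads the same min/max bounds and the isTightBound flag through the hudi_metadata table-valued function; it assumes a running SparkSession named spark with the Hudi Spark bundle on the classpath and a tableName variable pointing at such a table.

    import org.apache.hudi.metadata.MetadataPartitionType

    // Query the partition stats records for the 'price' column from the metadata table.
    // The record type filter keeps only PARTITION_STATS entries; member1 holds int values.
    val partitionStats = spark.sql(
      s"""
         |select key,
         |       ColumnStatsMetadata.minValue.member1.value as min_price,
         |       ColumnStatsMetadata.maxValue.member1.value as max_price,
         |       ColumnStatsMetadata.isTightBound           as is_tight_bound
         |from hudi_metadata('$tableName')
         |where type = ${MetadataPartitionType.PARTITION_STATS.getRecordType}
         |  and ColumnStatsMetadata.columnName = 'price'
         |""".stripMargin)
    partitionStats.show(false)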