From bc6dbd777c611a8cf1db8c61ad7c5cfb60f13938 Mon Sep 17 00:00:00 2001
From: Lokesh Jain
Date: Sat, 19 Oct 2024 00:32:59 +0530
Subject: [PATCH] Optimise the index computation

---
 .../utils/SparkMetadataWriterUtils.java       | 216 ++++++++----------
 .../SparkHoodieBackedTableMetadataWriter.java | 106 +++------
 .../functional/HoodieFunctionalIndex.java     |  49 ++--
 .../command/index/TestFunctionalIndex.scala   |   4 +-
 4 files changed, 145 insertions(+), 230 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
index 739ccbc8cb2f..07247f904875 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
@@ -21,8 +21,9 @@
 import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.EngineType;
-import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.function.SerializableFunction;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -30,39 +31,43 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
 import org.apache.hudi.common.util.HoodieRecordUtils;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.index.functional.HoodieFunctionalIndex;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileWriterFactory;
 import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.util.JavaScalaConverters;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.FlatMapGroupsFunction;
+import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.functions;
 import org.apache.spark.sql.types.StructType;
-
-import javax.annotation.Nullable;
+import org.jetbrains.annotations.NotNull;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.stream.Collectors;
 
 import scala.Function1;
+import scala.collection.Seq;
 
 import static org.apache.hudi.common.config.HoodieCommonConfig.MAX_DFS_STREAM_BUFFER_SIZE;
-import static org.apache.hudi.common.model.HoodieRecord.HOODIE_META_COLUMNS;
 import static org.apache.hudi.common.util.ConfigUtils.getReaderConfigs;
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
 import static org.apache.hudi.metadata.HoodieMetadataPayload.createBloomFilterMetadataRecord;
@@ -74,81 +79,93 @@
  */
 public class SparkMetadataWriterUtils {
 
-  public static List<HoodieRecord> getFunctionalIndexRecordsUsingColumnStats(HoodieTableMetaClient metaClient,
-                                                                             Schema readerSchema,
-                                                                             String filePath,
-                                                                             Long fileSize,
-                                                                             String partition,
-                                                                             HoodieFunctionalIndex<Column, Column> functionalIndex,
-                                                                             String columnToIndex,
-                                                                             SQLContext sqlContext) {
-    List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList = new ArrayList<>();
-    buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex, columnToIndex, sqlContext, columnRangeMetadataList, fileSize, new StoragePath(filePath),
-        FSUtils.isBaseFile(new StoragePath(filePath.substring(filePath.lastIndexOf("/") + 1))));
-    return createColumnStatsRecords(partition, columnRangeMetadataList, false, functionalIndex.getIndexName(), COLUMN_STATS.getRecordType()).collect(Collectors.toList());
+  public static Column[] getFunctionalIndexColumns() {
+    return new Column[] {
+        functions.col(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_PARTITION),
+        functions.col(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_FILE_PATH),
+        functions.col(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_FILE_SIZE)
+    };
   }
 
-  public static List<HoodieRecord> getFunctionalIndexRecordsUsingBloomFilter(HoodieTableMetaClient metaClient,
-                                                                             Schema readerSchema,
-                                                                             String filePath,
-                                                                             String partition,
-                                                                             HoodieFunctionalIndex<Column, Column> functionalIndex,
-                                                                             String columnToIndex,
-                                                                             SQLContext sqlContext,
-                                                                             HoodieWriteConfig metadataWriteConfig,
-                                                                             String instantTime) {
-    List<HoodieRecord> bloomFilterMetadataList = new ArrayList<>();
-    // log file handling
-    buildBloomFilterMetadata(metaClient, readerSchema, functionalIndex, columnToIndex, sqlContext, bloomFilterMetadataList, new StoragePath(filePath), metadataWriteConfig, partition,
-        instantTime, FSUtils.isBaseFile(new StoragePath(filePath.substring(filePath.lastIndexOf("/") + 1))));
-    return bloomFilterMetadataList;
+  public static String[] getFunctionalIndexColumnNames() {
+    return new String[] {
+        HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_PARTITION,
+        HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_FILE_PATH,
+        HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_FILE_SIZE
+    };
   }
 
-  private static void buildColumnRangeMetadata(HoodieTableMetaClient metaClient,
-                                               Schema readerSchema,
-                                               HoodieFunctionalIndex<Column, Column> functionalIndex,
-                                               String columnToIndex,
-                                               SQLContext sqlContext,
-                                               List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList,
-                                               long fileSize,
-                                               StoragePath filePath,
-                                               boolean isBaseFile) {
-    Dataset<Row> fileDf = readRecordsAsRow(new StoragePath[] {filePath}, sqlContext, metaClient, readerSchema, isBaseFile);
-    Column indexedColumn = functionalIndex.apply(Collections.singletonList(fileDf.col(columnToIndex)));
-    fileDf = fileDf.withColumn(columnToIndex, indexedColumn);
-    HoodieColumnRangeMetadata<Comparable> columnRangeMetadata = computeColumnRangeMetadata(fileDf, columnToIndex, filePath.toString(), fileSize);
-    columnRangeMetadataList.add(columnRangeMetadata);
+  @NotNull
+  public static List<Row> getRowsWithFunctionalIndexMetadata(List<Row> rowsForFilePath, String partition, String filePath, long fileSize) {
+    return rowsForFilePath.stream().map(row -> {
+      Seq<Object> indexMetadata = JavaScalaConverters.convertJavaListToScalaSeq(Arrays.asList(partition, filePath, fileSize));
+      Row functionalIndexRow = Row.fromSeq(indexMetadata);
+      List<Row> rows = new ArrayList<>(2);
+      rows.add(row);
+      rows.add(functionalIndexRow);
+      Seq<Row> rowSeq = JavaScalaConverters.convertJavaListToScalaSeq(rows);
+      return Row.merge(rowSeq);
+    }).collect(Collectors.toList());
   }
 
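The helper above decorates every row read from a file with that file's partition, path and size by merging the data row with a one-off metadata row. A minimal, self-contained sketch of the same Row.merge mechanics, with purely illustrative values and the same JavaScalaConverters utility the patch imports:

    import java.util.Arrays;
    import java.util.List;

    import org.apache.hudi.util.JavaScalaConverters;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;

    import scala.collection.Seq;

    public class RowMergeSketch {
      public static void main(String[] args) {
        // A data row as it comes back from the base/log file reader (illustrative values).
        Row dataRow = RowFactory.create("20241014190257965", "key1", 1000L);
        // A one-off row carrying the functional index metadata for that row's file.
        Row indexMetadataRow = RowFactory.create("ts=1000", "/tmp/tbl/ts=1000/file1.parquet", 434L);
        // Row.merge concatenates the fields, so each merged row lines up with the reader
        // schema extended by the three functional index metadata columns.
        List<Row> rows = Arrays.asList(dataRow, indexMetadataRow);
        Seq<Row> rowSeq = JavaScalaConverters.convertJavaListToScalaSeq(rows);
        Row merged = Row.merge(rowSeq);
        System.out.println(merged);
      }
    }

Because the merge simply appends fields, the metadata row has to be built in the same column order that the extended schema declares later in this patch.
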
-  private static void buildBloomFilterMetadata(HoodieTableMetaClient metaClient,
-                                               Schema readerSchema,
-                                               HoodieFunctionalIndex<Column, Column> functionalIndex,
-                                               String columnToIndex,
-                                               SQLContext sqlContext,
-                                               List<HoodieRecord> bloomFilterMetadataList,
-                                               StoragePath filePath,
-                                               HoodieWriteConfig writeConfig,
-                                               String partitionName,
-                                               String instantTime,
-                                               boolean isBaseFile) {
-    Dataset<Row> fileDf = readRecordsAsRow(new StoragePath[] {filePath}, sqlContext, metaClient, readerSchema, isBaseFile);
-    Column indexedColumn = functionalIndex.apply(Collections.singletonList(fileDf.col(columnToIndex)));
-    fileDf = fileDf.withColumn(columnToIndex, indexedColumn);
-    BloomFilter bloomFilter = HoodieFileWriterFactory.createBloomFilter(writeConfig);
-    fileDf.foreach(row -> {
-      byte[] key = row.getAs(columnToIndex).toString().getBytes();
-      bloomFilter.add(key);
-    });
-    ByteBuffer bloomByteBuffer = ByteBuffer.wrap(getUTF8Bytes(bloomFilter.serializeToString()));
-    bloomFilterMetadataList.add(createBloomFilterMetadataRecord(partitionName, filePath.toString(), instantTime, writeConfig.getBloomFilterType(), bloomByteBuffer, false));
+  public static HoodieData<HoodieRecord> getFunctionalIndexRecordsUsingColumnStats(Dataset<Row> dataset,
+                                                                                   HoodieFunctionalIndex<Column, Column> functionalIndex,
+                                                                                   String columnToIndex) {
+    // Aggregate col stats related data for the column to index
+    Dataset<Row> columnRangeMetadataDataset = dataset
+        .select(columnToIndex, SparkMetadataWriterUtils.getFunctionalIndexColumnNames())
+        .groupBy(SparkMetadataWriterUtils.getFunctionalIndexColumns())
+        .agg(functions.count(functions.when(functions.col(columnToIndex).isNull(), 1)).alias("nullCount"),
+            functions.min(columnToIndex).alias("minValue"),
+            functions.max(columnToIndex).alias("maxValue"),
+            functions.count(columnToIndex).alias("valueCount"));
+    // Generate column stat records using the aggregated data
+    return HoodieJavaRDD.of(columnRangeMetadataDataset.javaRDD()).flatMap((SerializableFunction<Row, Iterator<HoodieRecord>>)
+        row -> {
+          int baseAggregatePosition = SparkMetadataWriterUtils.getFunctionalIndexColumnNames().length;
+          long nullCount = row.getLong(baseAggregatePosition);
+          Comparable minValue = (Comparable) row.get(baseAggregatePosition + 1);
+          Comparable maxValue = (Comparable) row.get(baseAggregatePosition + 2);
+          long valueCount = row.getLong(baseAggregatePosition + 3);
+
+          String partitionName = row.getString(0);
+          String filePath = row.getString(1);
+          long totalFileSize = row.getLong(2);
+          // Total uncompressed size is harder to get directly. This is just an approximation to maintain the order.
+          long totalUncompressedSize = totalFileSize * 2;
+
+          HoodieColumnRangeMetadata<Comparable> rangeMetadata = HoodieColumnRangeMetadata.create(
+              filePath,
+              columnToIndex,
+              minValue,
+              maxValue,
+              nullCount,
+              valueCount,
+              totalFileSize,
+              totalUncompressedSize
+          );
+          return createColumnStatsRecords(partitionName, Collections.singletonList(rangeMetadata), false, functionalIndex.getIndexName(),
+              COLUMN_STATS.getRecordType()).collect(Collectors.toList()).iterator();
+        }
+    );
   }
 
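The new column-stats path replaces the old per-file collectAsList() aggregation with a single groupBy over the appended (partition, file path, file size) columns, so one Spark job produces one stats row per file. A small, self-contained sketch of that aggregation shape on a toy DataFrame (column names here are placeholders, not the Hudi metadata column names):

    import static org.apache.spark.sql.functions.col;
    import static org.apache.spark.sql.functions.count;
    import static org.apache.spark.sql.functions.max;
    import static org.apache.spark.sql.functions.min;
    import static org.apache.spark.sql.functions.when;

    import java.util.Arrays;
    import java.util.List;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;

    public class ColumnStatsAggSketch {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[1]").appName("col-stats-sketch").getOrCreate();
        List<Row> rows = Arrays.asList(
            RowFactory.create(1000L, "ts=1000", "f1.parquet", 434L),
            RowFactory.create(1001L, "ts=1000", "f1.parquet", 434L),
            RowFactory.create(null, "ts=1001", "f2.parquet", 435L));
        StructType schema = new StructType()
            .add("ts", DataTypes.LongType)
            .add("partition", DataTypes.StringType)
            .add("filePath", DataTypes.StringType)
            .add("fileSize", DataTypes.LongType);
        Dataset<Row> df = spark.createDataFrame(rows, schema);
        // One aggregate row per (partition, filePath, fileSize) group; the flatMap in the
        // patch turns each such row into HoodieColumnRangeMetadata and then col-stats records.
        Dataset<Row> stats = df
            .groupBy(col("partition"), col("filePath"), col("fileSize"))
            .agg(count(when(col("ts").isNull(), 1)).alias("nullCount"),
                min("ts").alias("minValue"),
                max("ts").alias("maxValue"),
                count("ts").alias("valueCount"));
        stats.show(false);
        spark.stop();
      }
    }

For the f1.parquet group this yields min 1000, max 1001 and valueCount 2, while the f2.parquet group reports nullCount 1 and valueCount 0, which is exactly the shape the metadata records carry.
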
-  private static Dataset<Row> readRecordsAsRow(StoragePath[] paths, SQLContext sqlContext,
-                                               HoodieTableMetaClient metaClient, Schema schema,
-                                               boolean isBaseFile) {
-    List<HoodieRecord> records = isBaseFile ? getBaseFileRecords(new HoodieBaseFile(paths[0].toString()), metaClient, schema)
-        : getUnmergedLogFileRecords(Arrays.stream(paths).map(StoragePath::toString).collect(Collectors.toList()), metaClient, schema);
-    return dropMetaFields(toDataset(records, schema, sqlContext));
+  public static HoodieData<HoodieRecord> getFunctionalIndexRecordsUsingBloomFilter(Dataset<Row> dataset, String columnToIndex,
+                                                                                   HoodieWriteConfig metadataWriteConfig, String instantTime) {
+    // Group data using functional index metadata and then create bloom filter on the group
+    Dataset<HoodieRecord> bloomFilterRecords = dataset.select(columnToIndex, SparkMetadataWriterUtils.getFunctionalIndexColumnNames())
+        .groupByKey((MapFunction<Row, Pair>) row -> Pair.of(row.getString(0), row.getString(1)), Encoders.kryo(Pair.class))
+        .flatMapGroups((FlatMapGroupsFunction<Pair, Row, HoodieRecord>) ((pair, iterator) -> {
+          String partition = pair.getLeft().toString();
+          String filePath = pair.getRight().toString();
+          BloomFilter bloomFilter = HoodieFileWriterFactory.createBloomFilter(metadataWriteConfig);
+          iterator.forEachRemaining(row -> {
+            byte[] key = row.getAs(columnToIndex).toString().getBytes();
+            bloomFilter.add(key);
+          });
+          ByteBuffer bloomByteBuffer = ByteBuffer.wrap(getUTF8Bytes(bloomFilter.serializeToString()));
+          return Collections.singletonList((HoodieRecord) createBloomFilterMetadataRecord(partition, filePath, instantTime, metadataWriteConfig.getBloomFilterType(), bloomByteBuffer, false)).iterator();
+        }), Encoders.kryo(HoodieRecord.class));
+    return HoodieJavaRDD.of(bloomFilterRecords.javaRDD());
   }
 
   public static List<Row> readRecordsAsRows(StoragePath[] paths, SQLContext sqlContext,
@@ -188,18 +205,6 @@ private static List<HoodieRecord> getBaseFileRecords(HoodieBaseFile baseFile, Ho
     }
   }
 
-  private static Dataset<Row> toDataset(List<HoodieRecord> records, Schema schema, SQLContext sqlContext) {
-    List<GenericRecord> avroRecords = records.stream()
-        .map(r -> (GenericRecord) r.getData())
-        .collect(Collectors.toList());
-    if (avroRecords.isEmpty()) {
-      return sqlContext.emptyDataFrame().toDF();
-    }
-    JavaSparkContext jsc = new JavaSparkContext(sqlContext.sparkContext());
-    JavaRDD<GenericRecord> javaRDD = jsc.parallelize(avroRecords);
-    return AvroConversionUtils.createDataFrame(javaRDD.rdd(), schema.toString(), sqlContext.sparkSession());
-  }
-
   private static List<Row> toRows(List<HoodieRecord> records, Schema schema, SQLContext sqlContext, String path) {
     StructType structType = AvroConversionUtils.convertAvroSchemaToStructType(schema);
     Function1<GenericRecord, Row> converterToRow = AvroConversionUtils.createConverterToRow(schema, structType);
@@ -210,45 +215,4 @@ private static List<Row> toRows(List<HoodieRecord> records, Schema schema, SQLCo
         .collect(Collectors.toList());
     return avroRecords;
   }
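getFunctionalIndexRecordsUsingBloomFilter above relies on groupByKey/flatMapGroups to fold all rows of one file into a single bloom filter without collecting them to the driver. The sketch below shows just those grouping mechanics on a toy dataset; a HashSet stands in for the Hudi BloomFilter so the example stays dependency-free, and the column positions mirror the select order used in the method:

    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;

    import org.apache.spark.api.java.function.FlatMapGroupsFunction;
    import org.apache.spark.api.java.function.MapFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class PerFileGroupingSketch {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[1]").appName("bloom-grouping-sketch").getOrCreate();
        Dataset<Row> df = spark.sql(
            "SELECT * FROM VALUES ('ts=1000', 'f1.parquet', 1000L), ('ts=1000', 'f1.parquet', 1001L), "
                + "('ts=1001', 'f2.parquet', 1002L) AS t(partition, filePath, ts)");
        // One group per (partition, filePath); each group is folded into a single summary value,
        // which is where the patch builds one bloom filter and one metadata record per file.
        Dataset<String> perFile = df
            .groupByKey((MapFunction<Row, String>) row -> row.getString(0) + "|" + row.getString(1), Encoders.STRING())
            .flatMapGroups((FlatMapGroupsFunction<String, Row, String>) (key, rows) -> {
              Set<String> keys = new HashSet<>();
              rows.forEachRemaining(row -> keys.add(String.valueOf(row.getLong(2))));
              return Collections.singletonList(key + " -> " + keys.size() + " distinct keys").iterator();
            }, Encoders.STRING());
        perFile.show(false);
        spark.stop();
      }
    }

The design point is the same as in the patch: the filter for a file is built entirely inside the executor that owns that group, and only the small serialized summary leaves it.
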
-
-  private static <T extends Comparable<T>> HoodieColumnRangeMetadata<T> computeColumnRangeMetadata(Dataset<Row> rowDataset,
-                                                                                                    String columnName,
-                                                                                                    String filePath,
-                                                                                                    long fileSize) {
-    long totalSize = fileSize;
-    // Get nullCount, minValue, and maxValue
-    Dataset<Row> aggregated = rowDataset.agg(
-        functions.count(functions.when(functions.col(columnName).isNull(), 1)).alias("nullCount"),
-        functions.min(columnName).alias("minValue"),
-        functions.max(columnName).alias("maxValue"),
-        functions.count(columnName).alias("valueCount")
-    );
-
-    Row result = aggregated.collectAsList().get(0);
-    long nullCount = result.getLong(0);
-    @Nullable T minValue = (T) result.get(1);
-    @Nullable T maxValue = (T) result.get(2);
-    long valueCount = result.getLong(3);
-
-    // Total uncompressed size is harder to get directly. This is just an approximation to maintain the order.
-    long totalUncompressedSize = totalSize * 2;
-
-    return HoodieColumnRangeMetadata.create(
-        filePath,
-        columnName,
-        minValue,
-        maxValue,
-        nullCount,
-        valueCount,
-        totalSize,
-        totalUncompressedSize
-    );
-  }
-
-  private static Dataset<Row> dropMetaFields(Dataset<Row> df) {
-    return df.select(
-        Arrays.stream(df.columns())
-            .filter(c -> !HOODIE_META_COLUMNS.contains(c))
-            .map(df::col).toArray(Column[]::new));
-  }
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index 4d5b1c8fc76c..7ecb1b7ed539 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -23,6 +23,7 @@
 import org.apache.hudi.client.BaseHoodieWriteClient;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.client.utils.SparkMetadataWriterUtils;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -54,6 +55,10 @@
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,8 +69,6 @@
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import static org.apache.hudi.client.utils.SparkMetadataWriterUtils.getFunctionalIndexRecordsUsingBloomFilter;
-import static org.apache.hudi.client.utils.SparkMetadataWriterUtils.getFunctionalIndexRecordsUsingColumnStats;
 import static org.apache.hudi.client.utils.SparkMetadataWriterUtils.readRecordsAsRows;
 import static org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy.EAGER;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS;
@@ -181,94 +184,41 @@ protected HoodieData<HoodieRecord> getFunctionalIndexRecords(List
-    List<HoodieRecord> allRecords = customThreadPool.submit(() ->
-        partitionFilePathPairs.parallelStream()
-            .flatMap(entry -> {
-              HoodieFunctionalIndex<Column, Column> functionalIndex =
-                  new HoodieSparkFunctionalIndex(indexDefinition.getIndexName(), indexDefinition.getIndexFunction(),
-                      indexDefinition.getSourceFields(), indexDefinition.getIndexOptions());
-              String partition = entry.getKey();
-              Pair<String, Long> filePathSizePair = entry.getValue();
-              List<HoodieRecord> recordsForPartition = Collections.emptyList();
-
-              if (indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_COLUMN_STATS)) {
-                recordsForPartition = getFunctionalIndexRecordsUsingColumnStats(metaClient, readerSchema, filePathSizePair.getKey(),
-                    filePathSizePair.getValue(), partition,
-                    functionalIndex, columnToIndex, sqlContext);
-              } else if (indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_BLOOM_FILTERS)) {
-                recordsForPartition = getFunctionalIndexRecordsUsingBloomFilter(metaClient, readerSchema, filePathSizePair.getKey(),
-                    partition, functionalIndex, columnToIndex,
-                    sqlContext, metadataWriteConfig, instantTime);
-              }
-              return recordsForPartition.stream();
-            })
-            .collect(Collectors.toList())).join();
-    customThreadPool.shutdown();
-    return HoodieJavaRDD.of(allRecords, sparkEngineContext, parallelism);*/
+    // Read records and append functional index metadata to every row
     HoodieData<Row> rowData = sparkEngineContext.parallelize(partitionFilePathPairs, parallelism)
         .flatMap((SerializableFunction<Pair<String, Pair<String, Long>>, Iterator<Row>>) entry -> {
           String partition = entry.getKey();
           Pair<String, Long> filePathSizePair = entry.getValue();
           String filePath = filePathSizePair.getKey();
+          long fileSize = filePathSizePair.getValue();
           List<Row> rowsForFilePath = readRecordsAsRows(new StoragePath[] {new StoragePath(filePath)}, sqlContext, metaClient, readerSchema,
               FSUtils.isBaseFile(new StoragePath(filePath.substring(filePath.lastIndexOf("/") + 1))));
-          return rowsForFilePath.iterator();
+          List<Row> rowsWithIndexMetadata = SparkMetadataWriterUtils.getRowsWithFunctionalIndexMetadata(rowsForFilePath, partition, filePath, fileSize);
+          return rowsWithIndexMetadata.iterator();
         });
-    Dataset<Row> rowDataset = sparkEngineContext.getSqlContext().createDataFrame(HoodieJavaRDD.getJavaRDD(rowData).rdd(), AvroConversionUtils.convertAvroSchemaToStructType(readerSchema));
-    // rowDataset.schema()
-    // StructType(
-    //   StructField(_hoodie_commit_time,StringType,true),
-    //   StructField(_hoodie_commit_seqno,StringType,true),
-    //   StructField(_hoodie_record_key,StringType,true),
-    //   StructField(_hoodie_partition_path,StringType,true),
-    //   StructField(_hoodie_file_name,StringType,true),
-    //   StructField(ts,LongType,true))
-
-
-    /*
-    rowDataset.show()
-    +-------------------+--------------------+------------------+----------------------+--------------------+----+
-    |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|  ts|
-    +-------------------+--------------------+------------------+----------------------+--------------------+----+
-    |  20241014190257965|20241014190257965...|                 1|               ts=1000|8f4cdb11-a3b4-4ec...|1000|
-    |  20241014190322300|20241014190322300...|                 2|               ts=1001|a6025154-c6d3-49f...|1001|
-    |  20241014190326221|20241014190326221...|                 3|               ts=1002|cf434900-9108-460...|1002|
-    +-------------------+--------------------+------------------+----------------------+--------------------+----+
-
-    */
-    HoodieFunctionalIndex<Column, Column> functionalIndex2 =
+    // Generate dataset with functional index metadata
+    StructType structType = AvroConversionUtils.convertAvroSchemaToStructType(readerSchema);
+    Dataset<Row> rowDataset = sparkEngineContext.getSqlContext().createDataFrame(HoodieJavaRDD.getJavaRDD(rowData).rdd(),
+        structType.add(StructField.apply(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_PARTITION, DataTypes.StringType, false, Metadata.empty()))
+            .add(StructField.apply(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_FILE_PATH, DataTypes.StringType, false, Metadata.empty()))
+            .add(StructField.apply(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_FILE_SIZE, DataTypes.LongType, false, Metadata.empty()))
+    );
+    // Apply functional index and generate the column to index
+    HoodieFunctionalIndex<Column, Column> functionalIndex =
         new HoodieSparkFunctionalIndex(indexDefinition.getIndexName(), indexDefinition.getIndexFunction(), indexDefinition.getSourceFields(), indexDefinition.getIndexOptions());
-    Column indexedColumn = functionalIndex2.apply(Collections.singletonList(rowDataset.col(columnToIndex)));
+    Column indexedColumn = functionalIndex.apply(Collections.singletonList(rowDataset.col(columnToIndex)));
     rowDataset = rowDataset.withColumn(columnToIndex, indexedColumn);
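At this point the source column has been replaced by the indexed expression, so both downstream paths (column stats and bloom filters) operate on derived values. The removed debug output nearby suggests a from_unixtime-style index on ts; the sketch below shows what such a resolved column expression amounts to in plain Spark. The mapping from index function name to Spark function is assumed from the SPARK_FROM_UNIXTIME constant and is not the actual HoodieSparkFunctionalIndex implementation:

    import static org.apache.spark.sql.functions.col;
    import static org.apache.spark.sql.functions.from_unixtime;

    import org.apache.spark.sql.Column;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class ApplyIndexFunctionSketch {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[1]").appName("apply-index-sketch").getOrCreate();
        Dataset<Row> rowDataset = spark.sql("SELECT * FROM VALUES (1000L), (1001L), (1002L) AS t(ts)");
        // For an index declared over from_unixtime(ts), the resolved expression is the plain
        // Spark column function; overwriting the source column means later stats and bloom
        // filters are computed on the indexed value, not on the raw epoch seconds.
        Column indexedColumn = from_unixtime(col("ts"), "yyyy-MM-dd");
        rowDataset = rowDataset.withColumn("ts", indexedColumn);
        rowDataset.show(false); // 1000 -> 1970-01-01, matching the removed debug output
        spark.stop();
      }
    }
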
-    /*
-    rowDataset.show()
-    +-------------------+---------------------+------------------+----------------------+-------------------------------------------------------------------------+----------+
-    |_hoodie_commit_time|_hoodie_commit_seqno |_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name                                                        |ts        |
-    +-------------------+---------------------+------------------+----------------------+-------------------------------------------------------------------------+----------+
-    |20241014190257965  |20241014190257965_0_0|1                 |ts=1000               |8f4cdb11-a3b4-4ec4-bb05-f77668f486e4-0_0-27-65_20241014190257965.parquet |1970-01-01|
-    |20241014190322300  |20241014190322300_0_0|2                 |ts=1001               |a6025154-c6d3-49f6-ae88-b3c7f0617ef2-0_0-62-111_20241014190322300.parquet|1970-01-01|
-    |20241014190326221  |20241014190326221_0_0|3                 |ts=1002               |cf434900-9108-460e-b120-aad545533358-0_0-97-157_20241014190326221.parquet|1970-01-01|
-    +-------------------+---------------------+------------------+----------------------+-------------------------------------------------------------------------+----------+
-    */
-    return sparkEngineContext.parallelize(partitionFilePathPairs, parallelism)
-        .flatMap((SerializableFunction<Pair<String, Pair<String, Long>>, Iterator<HoodieRecord>>) entry -> {
-          HoodieFunctionalIndex<Column, Column> functionalIndex =
-              new HoodieSparkFunctionalIndex(indexDefinition.getIndexName(), indexDefinition.getIndexFunction(), indexDefinition.getSourceFields(), indexDefinition.getIndexOptions());
-          String partition = entry.getKey();
-          Pair<String, Long> filePathSizePair = entry.getValue();
-          List<HoodieRecord> recordsForPartition = Collections.emptyList();
-          if (indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_COLUMN_STATS)) {
-            recordsForPartition = getFunctionalIndexRecordsUsingColumnStats(metaClient, readerSchema, filePathSizePair.getKey(), filePathSizePair.getValue(), partition,
-                functionalIndex, columnToIndex, sqlContext);
-          } else if (indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_BLOOM_FILTERS)) {
-            recordsForPartition = getFunctionalIndexRecordsUsingBloomFilter(metaClient, readerSchema, filePathSizePair.getKey(), partition,
-                functionalIndex, columnToIndex, sqlContext, metadataWriteConfig, instantTime);
-          }
-          return recordsForPartition.iterator();
-        });
+
+    // Generate functional index records
+    if (indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_COLUMN_STATS)) {
+      return SparkMetadataWriterUtils.getFunctionalIndexRecordsUsingColumnStats(rowDataset, functionalIndex, columnToIndex);
+    } else if (indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_BLOOM_FILTERS)) {
+      return SparkMetadataWriterUtils.getFunctionalIndexRecordsUsingBloomFilter(rowDataset, columnToIndex, metadataWriteConfig, instantTime);
+    } else {
+      throw new UnsupportedOperationException(indexDefinition.getIndexType() + " is not yet supported");
+    }
   }
 
   @Override
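The three metadata columns the writer appends are declared as constants in the HoodieFunctionalIndex change that follows and are added to the Avro-derived reader schema as non-nullable fields, in the same order in which getRowsWithFunctionalIndexMetadata appends the values. A small sketch of that schema extension, with the reader schema trimmed to a single field for brevity:

    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.Metadata;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;

    public class ExtendedSchemaSketch {
      public static void main(String[] args) {
        // Stand-in for AvroConversionUtils.convertAvroSchemaToStructType(readerSchema).
        StructType readerSchema = new StructType().add("ts", DataTypes.LongType, true);
        // Same extension the writer applies before createDataFrame: partition, file path, file size.
        StructType extended = readerSchema
            .add(StructField.apply("_hoodie_functional_index_partition", DataTypes.StringType, false, Metadata.empty()))
            .add(StructField.apply("_hoodie_functional_index_file_path", DataTypes.StringType, false, Metadata.empty()))
            .add(StructField.apply("_hoodie_functional_index_file_size", DataTypes.LongType, false, Metadata.empty()));
        System.out.println(extended.treeString());
      }
    }

StructField.apply is the same case-class constructor the patch calls, so nullability and field metadata are set explicitly for each appended column.
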
"hour"; - public static final String SPARK_FROM_UNIXTIME = "from_unixtime"; - public static final String SPARK_UNIX_TIMESTAMP = "unix_timestamp"; - public static final String SPARK_TO_DATE = "to_date"; - public static final String SPARK_TO_TIMESTAMP = "to_timestamp"; - public static final String SPARK_DATE_ADD = "date_add"; - public static final String SPARK_DATE_SUB = "date_sub"; - public static final String SPARK_CONCAT = "concat"; - public static final String SPARK_SUBSTRING = "substring"; - public static final String SPARK_UPPER = "upper"; - public static final String SPARK_LOWER = "lower"; - public static final String SPARK_TRIM = "trim"; - public static final String SPARK_LTRIM = "ltrim"; - public static final String SPARK_RTRIM = "rtrim"; - public static final String SPARK_LENGTH = "length"; - public static final String SPARK_REGEXP_REPLACE = "regexp_replace"; - public static final String SPARK_REGEXP_EXTRACT = "regexp_extract"; - public static final String SPARK_SPLIT = "split"; - public static final String SPARK_IDENTITY = "identity"; + String HOODIE_FUNCTIONAL_INDEX_FILE_PATH = "_hoodie_functional_index_file_path"; + String HOODIE_FUNCTIONAL_INDEX_PARTITION = "_hoodie_functional_index_partition"; + String HOODIE_FUNCTIONAL_INDEX_FILE_SIZE = "_hoodie_functional_index_file_size"; + String SPARK_DATE_FORMAT = "date_format"; + String SPARK_DAY = "day"; + String SPARK_MONTH = "month"; + String SPARK_YEAR = "year"; + String SPARK_HOUR = "hour"; + String SPARK_FROM_UNIXTIME = "from_unixtime"; + String SPARK_UNIX_TIMESTAMP = "unix_timestamp"; + String SPARK_TO_DATE = "to_date"; + String SPARK_TO_TIMESTAMP = "to_timestamp"; + String SPARK_DATE_ADD = "date_add"; + String SPARK_DATE_SUB = "date_sub"; + String SPARK_CONCAT = "concat"; + String SPARK_SUBSTRING = "substring"; + String SPARK_UPPER = "upper"; + String SPARK_LOWER = "lower"; + String SPARK_TRIM = "trim"; + String SPARK_LTRIM = "ltrim"; + String SPARK_RTRIM = "rtrim"; + String SPARK_LENGTH = "length"; + String SPARK_REGEXP_REPLACE = "regexp_replace"; + String SPARK_REGEXP_EXTRACT = "regexp_extract"; + String SPARK_SPLIT = "split"; + String SPARK_IDENTITY = "identity"; /** * Get the name of the index. 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
index ac63a8f36ff0..8378810f0bc0 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
@@ -30,16 +30,14 @@ import org.apache.hudi.hive.{HiveSyncTool, HoodieHiveSyncClient}
 import org.apache.hudi.metadata.MetadataPartitionType
 import org.apache.hudi.sync.common.HoodieSyncConfig.{META_SYNC_BASE_PATH, META_SYNC_DATABASE_NAME, META_SYNC_NO_PARTITION_METADATA, META_SYNC_TABLE_NAME}
 import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
-
 import org.apache.spark.sql.catalyst.analysis.Analyzer
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.hudi.command.{CreateIndexCommand, ShowIndexesCommand}
 import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
-import org.scalatest.Ignore
 
-@Ignore
+
 class TestFunctionalIndex extends HoodieSparkSqlTestBase {
 
   override protected def beforeAll(): Unit = {
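Re-enabling TestFunctionalIndex exercises the optimised path end to end through Spark SQL. The snippet below sketches the kind of statements that drive it against a Hudi-enabled SparkSession; the table layout and option names are illustrative and follow the functional index syntax used in the Hudi test suite, so details may differ from any particular test case:

    import org.apache.spark.sql.SparkSession;

    public class CreateFunctionalIndexSketch {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .master("local[2]")
            .appName("functional-index-sketch")
            // Hudi's SQL extension is what parses CREATE INDEX / SHOW INDEXES.
            .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
            .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .getOrCreate();

        spark.sql("CREATE TABLE hudi_tbl (id INT, name STRING, ts BIGINT) USING hudi "
            + "TBLPROPERTIES (primaryKey = 'id', preCombineField = 'ts') LOCATION '/tmp/hudi_tbl'");
        spark.sql("INSERT INTO hudi_tbl VALUES (1, 'a1', 1000), (2, 'a2', 200000000)");

        // A functional index over from_unixtime(ts): with this patch the metadata writer
        // builds the column stats on the derived value via the Dataset-based aggregation.
        spark.sql("CREATE INDEX idx_ts_datestr ON hudi_tbl USING column_stats(ts) "
            + "OPTIONS (func = 'from_unixtime', format = 'yyyy-MM-dd')");
        spark.sql("SHOW INDEXES FROM hudi_tbl").show(false);

        spark.stop();
      }
    }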