Fix partitionName and hence metadata log reader subclass, and use hbase hfile reader for sec index
codope committed Jun 5, 2024
1 parent 65a32e7 commit 8a9986a
Showing 11 changed files with 238 additions and 111 deletions.
@@ -25,9 +25,12 @@
import org.apache.avro.Schema;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import static org.apache.hudi.common.util.ValidationUtils.checkArgument;

/**
* Record merger that accumulates metadata records.
*/
@@ -37,7 +40,12 @@ public class HoodieMetadataRecordMerger extends HoodiePreCombineAvroRecordMerger

@Override
public List<Pair<HoodieRecord, Schema>> fullOuterMerge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws IOException {
// TODO: Implement this method for secondary keys. Currently, it just mimics the superclass.
return Collections.singletonList(super.merge(older, oldSchema, newer, newSchema, props).get());
// If the new record is not a delete record, then combine the two records.
if (newer.isDelete(newSchema, props)) {
return Collections.singletonList(Pair.of(newer, newSchema));
}
checkArgument(older.getRecordKey().equals(newer.getRecordKey()), "Record key must be the same for both records");
checkArgument(oldSchema.equals(newSchema), "Schema must be the same for both records");
return Arrays.asList(Pair.of(older, oldSchema), Pair.of(newer, newSchema));
}
}
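For orientation, a minimal sketch of the merge contract introduced above, assuming `older` and `newer` are two HoodieRecord instances with the same record key and the same Avro `schema` (all variable names here are illustrative, not part of the commit):

    HoodieMetadataRecordMerger merger = new HoodieMetadataRecordMerger();
    TypedProperties props = new TypedProperties();
    List<Pair<HoodieRecord, Schema>> merged =
        merger.fullOuterMerge(older, schema, newer, schema, props);
    if (newer.isDelete(schema, props)) {
      // The delete wins: only the newer (delete) record is returned.
      assert merged.size() == 1 && merged.get(0).getLeft() == newer;
    } else {
      // Both sides are kept, older first, newer second.
      assert merged.size() == 2;
      assert merged.get(0).getLeft() == older && merged.get(1).getLeft() == newer;
    }

If the record keys or schemas differ, the new checkArgument calls throw instead of silently merging.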
@@ -320,14 +320,14 @@ public Map<String, List<HoodieRecordGlobalLocation>> readRecordIndex(List<String
* @param secondaryKeys The list of secondary keys to read
*/
@Override
public Map<String, List<HoodieRecordGlobalLocation>> readSecondaryIndex(List<String> secondaryKeys) {
public Map<String, List<HoodieRecordGlobalLocation>> readSecondaryIndex(List<String> secondaryKeys, String partitionName) {
ValidationUtils.checkState(dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.RECORD_INDEX),
"Record index is not initialized in MDT");
ValidationUtils.checkState(
dataMetaClient.getTableConfig().getMetadataPartitions().stream().anyMatch(partitionName -> partitionName.startsWith(MetadataPartitionType.SECONDARY_INDEX.getPartitionPath())),
"Secondary index is not initialized in MDT");
dataMetaClient.getTableConfig().getMetadataPartitions().contains(partitionName),
"Secondary index is not initialized in MDT for: " + partitionName);
// Fetch secondary-index records
Map<String, List<HoodieRecord<HoodieMetadataPayload>>> secondaryKeyRecords = getSecondaryIndexRecords(secondaryKeys, MetadataPartitionType.SECONDARY_INDEX.getPartitionPath());
Map<String, List<HoodieRecord<HoodieMetadataPayload>>> secondaryKeyRecords = getSecondaryIndexRecords(secondaryKeys, partitionName);
// Now collect the record-keys and fetch the RLI records
List<String> recordKeys = new ArrayList<>();
secondaryKeyRecords.forEach((key, records) -> records.forEach(record -> {
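A hedged sketch of the updated call site, assuming `metadataTable` is an initialized HoodieTableMetadata and the second argument is the concrete secondary index's metadata partition (the literal name below is hypothetical):

    // Hypothetical partition name; real names carry the index name and start with
    // HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX.
    String partitionName = "secondary_index_idx_rider";
    List<String> secondaryKeys = Arrays.asList("rider-A", "rider-B");
    // Maps each secondary key to the global locations of the matching records;
    // keys with no match are simply absent from the result.
    Map<String, List<HoodieRecordGlobalLocation>> locations =
        metadataTable.readSecondaryIndex(secondaryKeys, partitionName);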
@@ -320,7 +320,7 @@ public Map<String, List<HoodieRecordGlobalLocation>> readRecordIndex(List<String
}

@Override
public Map<String, List<HoodieRecordGlobalLocation>> readSecondaryIndex(List<String> secondaryKeys) {
public Map<String, List<HoodieRecordGlobalLocation>> readSecondaryIndex(List<String> secondaryKeys, String partitionName) {
throw new HoodieMetadataException("Unsupported operation: readSecondaryIndex!");
}

@@ -21,6 +21,7 @@
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodieListData;
@@ -77,6 +78,7 @@
import java.util.stream.Collectors;

import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FULL_SCAN_LOG_FILES;
import static org.apache.hudi.common.config.HoodieReaderConfig.USE_NATIVE_HFILE_READER;
import static org.apache.hudi.common.util.CollectionUtils.toStream;
import static org.apache.hudi.common.util.ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS;
@@ -585,7 +587,10 @@ private Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> openRead
try {
HoodieTimer timer = HoodieTimer.start();
// Open base file reader
Pair<HoodieSeekingFileReader<?>, Long> baseFileReaderOpenTimePair = getBaseFileReader(slice, timer);
// If the partition is a secondary index partition, use the HBase HFile reader instead of native HFile reader.
// TODO (HUDI-7831): Support reading secondary index records using native HFile reader.
boolean shouldUseNativeHFileReader = !partitionName.startsWith(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX);
Pair<HoodieSeekingFileReader<?>, Long> baseFileReaderOpenTimePair = getBaseFileReader(slice, timer, shouldUseNativeHFileReader);
HoodieSeekingFileReader<?> baseFileReader = baseFileReaderOpenTimePair.getKey();
final long baseFileOpenMs = baseFileReaderOpenTimePair.getValue();

@@ -604,16 +609,20 @@ private Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> openRead
}
}

private Pair<HoodieSeekingFileReader<?>, Long> getBaseFileReader(FileSlice slice, HoodieTimer timer) throws IOException {
private Pair<HoodieSeekingFileReader<?>, Long> getBaseFileReader(FileSlice slice, HoodieTimer timer, boolean shouldUseNativeHFileReader) throws IOException {
HoodieSeekingFileReader<?> baseFileReader;
long baseFileOpenMs;
// If the base file is present then create a reader
Option<HoodieBaseFile> baseFile = slice.getBaseFile();
if (baseFile.isPresent()) {
StoragePath baseFilePath = baseFile.get().getStoragePath();
HoodieConfig readerConfig = DEFAULT_HUDI_CONFIG_FOR_READER;
if (!shouldUseNativeHFileReader) {
readerConfig.setValue(USE_NATIVE_HFILE_READER, "false");
}
baseFileReader = (HoodieSeekingFileReader<?>) HoodieIOFactory.getIOFactory(metadataMetaClient.getStorage())
.getReaderFactory(HoodieRecordType.AVRO)
.getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, baseFilePath);
.getFileReader(readerConfig, baseFilePath);
baseFileOpenMs = timer.endTimer();
LOG.info(String.format("Opened metadata base file from %s at instant %s in %d ms", baseFilePath,
baseFile.get().getCommitTime(), baseFileOpenMs));
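Taken together, the two hunks above reduce to the following reader selection; a condensed sketch with an illustrative partition name (the other variables are those already in scope in openReaders/getBaseFileReader):

    String partitionName = "secondary_index_idx_rider";  // hypothetical secondary index partition
    // Secondary index partitions fall back to the HBase HFile reader until HUDI-7831 adds
    // support in the native reader; all other metadata partitions keep the native reader.
    boolean useNativeHFileReader =
        !partitionName.startsWith(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX);
    HoodieConfig readerConfig = DEFAULT_HUDI_CONFIG_FOR_READER;
    if (!useNativeHFileReader) {
      readerConfig.setValue(USE_NATIVE_HFILE_READER, "false");
    }
    HoodieSeekingFileReader<?> baseFileReader = (HoodieSeekingFileReader<?>)
        HoodieIOFactory.getIOFactory(metadataMetaClient.getStorage())
            .getReaderFactory(HoodieRecordType.AVRO)
            .getFileReader(readerConfig, baseFilePath);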
@@ -880,12 +889,10 @@ protected Map<String, List<HoodieRecord<HoodieMetadataPayload>>> getSecondaryInd
// Define the combOp function (merges elements across partitions)
Functions.Function2<Map<String, List<HoodieRecord<HoodieMetadataPayload>>>, Map<String, List<HoodieRecord<HoodieMetadataPayload>>>, Map<String, List<HoodieRecord<HoodieMetadataPayload>>>> combOp =
(map1, map2) -> {
map2.forEach((secondaryKey, secondaryRecords) -> {
map1.merge(secondaryKey, secondaryRecords, (oldRecords, newRecords) -> {
newRecords.addAll(oldRecords);
return newRecords;
});
});
map2.forEach((secondaryKey, secondaryRecords) -> map1.merge(secondaryKey, secondaryRecords, (oldRecords, newRecords) -> {
newRecords.addAll(oldRecords);
return newRecords;
}));
return map1;
};
// Use aggregate to merge results within and across partitions
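The combOp leans on java.util.Map#merge; a tiny stand-alone fragment (plain JDK types instead of HoodieRecord, purely illustrative) showing the merge semantics used here:

    Map<String, List<String>> map1 = new HashMap<>();
    map1.put("sec-key-1", new ArrayList<>(Arrays.asList("rec-1")));
    Map<String, List<String>> map2 = new HashMap<>();
    map2.put("sec-key-1", new ArrayList<>(Arrays.asList("rec-2")));
    map2.put("sec-key-2", new ArrayList<>(Arrays.asList("rec-3")));
    // Keys missing from map1 are inserted; keys present in both get their lists concatenated.
    map2.forEach((key, records) -> map1.merge(key, records, (oldRecords, newRecords) -> {
      newRecords.addAll(oldRecords);
      return newRecords;
    }));
    // map1 now holds "sec-key-1" -> [rec-2, rec-1] and "sec-key-2" -> [rec-3]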
@@ -43,7 +43,7 @@
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX;

/**
* Metadata log-block records reading implementation, internally relying on
@@ -253,7 +253,7 @@ public HoodieMetadataLogRecordReader build() {
}

private boolean shouldUseMetadataMergedLogRecordScanner() {
return PARTITION_NAME_SECONDARY_INDEX.equals(partitionName);
return partitionName.startsWith(PARTITION_NAME_SECONDARY_INDEX_PREFIX);
}
}
}
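Because the rest of this commit passes the concrete index's partition name down, there can be several secondary index partitions, so an exact match against one constant no longer covers them. A small hedged sketch, with hypothetical partition names and a stand-in for the private method above:

    // Stand-in for shouldUseMetadataMergedLogRecordScanner(); the partition names below are hypothetical.
    static boolean usesMergedScanner(String partitionName) {
      // "secondary_index_idx_rider"  -> true   (the old exact-match check hit only one constant)
      // "secondary_index_idx_driver" -> true
      // "record_index"               -> false  (other metadata partitions are unaffected)
      return partitionName.startsWith(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX);
    }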
@@ -227,7 +227,7 @@ Map<Pair<String, String>, HoodieMetadataColumnStats> getColumnStats(final List<P
* Returns the location of records which the provided secondary keys map to.
* Records that are not found are ignored and won't be part of the map object that is returned.
*/
Map<String, List<HoodieRecordGlobalLocation>> readSecondaryIndex(List<String> secondaryKeys);
Map<String, List<HoodieRecordGlobalLocation>> readSecondaryIndex(List<String> secondaryKeys, String partitionName);

/**
* Fetch records by key prefixes. Key prefix passed is expected to match the same prefix as stored in Metadata table partitions. For eg, in case of col stats partition,
@@ -59,7 +59,7 @@ public void testFullOuterMerge() throws IOException {
List<HoodieRecord> updateRecordList = dataGen.generateUpdates("0001", newRecordList);
HoodieMetadataRecordMerger recordMerger = new HoodieMetadataRecordMerger();
List<Pair<HoodieRecord, Schema>> mergedRecords = recordMerger.fullOuterMerge(newRecordList.get(0), AVRO_SCHEMA, updateRecordList.get(0), AVRO_SCHEMA, new TypedProperties());
assertEquals(1, mergedRecords.size());
assertEquals(updateRecordList.get(0), mergedRecords.get(0).getLeft());
assertEquals(2, mergedRecords.size());
assertEquals(updateRecordList.get(0), mergedRecords.get(1).getLeft());
}
}
@@ -44,10 +44,14 @@ class SecondaryIndexSupport(spark: SparkSession,
prunedPartitionsAndFileSlices: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])],
shouldPushDownFilesFilter: Boolean
): Option[Set[String]] = {
lazy val (_, secondaryKeys) = if (isIndexAvailable) filterQueriesWithSecondaryKey(queryFilters, getSecondaryKeyConfig(queryReferencedColumns, metaClient)) else (List.empty, List.empty)
val secondaryKeyConfigOpt = getSecondaryKeyConfig(queryReferencedColumns, metaClient)
if (secondaryKeyConfigOpt.isEmpty) {
Option.empty
}
lazy val (_, secondaryKeys) = if (isIndexAvailable) filterQueriesWithSecondaryKey(queryFilters, secondaryKeyConfigOpt.map(_._2)) else (List.empty, List.empty)
if (isIndexAvailable && queryFilters.nonEmpty && secondaryKeys.nonEmpty) {
val allFiles = fileIndex.inputFiles.map(strPath => new StoragePath(strPath)).toSeq
Some(getCandidateFilesFromSecondaryIndex(allFiles, secondaryKeys))
Some(getCandidateFilesFromSecondaryIndex(allFiles, secondaryKeys, secondaryKeyConfigOpt.get._1))
} else {
Option.empty
}
@@ -71,8 +75,8 @@
* @param secondaryKeys - List of secondary keys.
* @return Sequence of file names which need to be queried
*/
private def getCandidateFilesFromSecondaryIndex(allFiles: Seq[StoragePath], secondaryKeys: List[String]): Set[String] = {
val recordKeyLocationsMap = metadataTable.readSecondaryIndex(JavaConverters.seqAsJavaListConverter(secondaryKeys).asJava)
private def getCandidateFilesFromSecondaryIndex(allFiles: Seq[StoragePath], secondaryKeys: List[String], secondaryIndexName: String): Set[String] = {
val recordKeyLocationsMap = metadataTable.readSecondaryIndex(JavaConverters.seqAsJavaListConverter(secondaryKeys).asJava, secondaryIndexName)
val fileIdToPartitionMap: mutable.Map[String, String] = mutable.Map.empty
val candidateFiles: mutable.Set[String] = mutable.Set.empty
for (locations <- JavaConverters.collectionAsScalaIterableConverter(recordKeyLocationsMap.values()).asScala) {
@@ -96,12 +100,12 @@
* TODO: Handle multiple secondary indexes (similar to functional index)
*/
private def getSecondaryKeyConfig(queryReferencedColumns: Seq[String],
metaClient: HoodieTableMetaClient): Option[String] = {
metaClient: HoodieTableMetaClient): Option[(String, String)] = {
val indexDefinitions = metaClient.getIndexMetadata.get.getIndexDefinitions.asScala
indexDefinitions.values
.find(indexDef => indexDef.getIndexType.equals(PARTITION_NAME_SECONDARY_INDEX) &&
queryReferencedColumns.contains(indexDef.getSourceFields.get(0)))
.map(indexDef => indexDef.getSourceFields.get(0))
.map(indexDef => (indexDef.getIndexName, indexDef.getSourceFields.get(0)))
}
}
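getSecondaryKeyConfig now returns both the index name and its source field, so the index name can later be handed to readSecondaryIndex as the metadata partition to query. A rough Java rendering of the same lookup through the HoodieTableMetaClient index metadata (assuming queryReferencedColumns is a Java collection of column names; this mirrors, not replaces, the Scala above):

    Optional<Pair<String, String>> secondaryKeyConfig =
        metaClient.getIndexMetadata().get().getIndexDefinitions().values().stream()
            // Single-source-field secondary indexes only, as in the existing TODO.
            .filter(indexDef -> indexDef.getIndexType().equals(PARTITION_NAME_SECONDARY_INDEX)
                && queryReferencedColumns.contains(indexDef.getSourceFields().get(0)))
            .findFirst()
            .map(indexDef -> Pair.of(indexDef.getIndexName(), indexDef.getSourceFields().get(0)));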

@@ -20,32 +20,37 @@
package org.apache.hudi.functional

import org.apache.hudi.DataSourceWriteOptions.{HIVE_STYLE_PARTITIONING, PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD}
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
import org.apache.hudi.common.model.FileSlice
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.metadata.HoodieMetadataFileSystemView
import org.apache.hudi.testutils.HoodieSparkClientTestBase
import org.apache.hudi.util.JFunction
import org.apache.hudi.{DataSourceReadOptions, HoodieFileIndex}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Expression, Literal}
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.{AfterEach, BeforeEach}

import java.util.concurrent.atomic.AtomicInteger
import scala.collection.JavaConverters

class SecondaryIndexTestBase extends HoodieSparkClientTestBase {

var spark: SparkSession = _
var instantTime: AtomicInteger = _
val targetColumnsToIndex: Seq[String] = Seq("rider", "driver")
val metadataOpts: Map[String, String] = Map(
HoodieMetadataConfig.ENABLE.key -> "true",
HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key -> "true",
HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS.key -> targetColumnsToIndex.mkString(",")
HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key -> "true"
)
val commonOpts: Map[String, String] = Map(
"hoodie.insert.shuffle.parallelism" -> "4",
"hoodie.upsert.shuffle.parallelism" -> "4",
HoodieWriteConfig.TBL_NAME.key -> "hoodie_test_si",
RECORDKEY_FIELD.key -> "_row_key",
PARTITIONPATH_FIELD.key -> "partition,trip_type",
HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
RECORDKEY_FIELD.key -> "record_key_col",
PARTITIONPATH_FIELD.key -> "partition_key_col",
HIVE_STYLE_PARTITIONING.key -> "true",
PRECOMBINE_FIELD.key -> "timestamp"
PRECOMBINE_FIELD.key -> "ts"
) ++ metadataOpts
var mergedDfList: List[DataFrame] = List.empty

@@ -64,4 +69,54 @@ class SecondaryIndexTestBase extends HoodieSparkClientTestBase {
cleanupResources()
}

def verifyQueryPredicate(hudiOpts: Map[String, String], columnName: String): Unit = {
mergedDfList = spark.read.format("hudi").options(hudiOpts).load(basePath).repartition(1).cache() :: mergedDfList
val secondaryKey = mergedDfList.last.limit(1).collect().map(row => row.getAs(columnName).toString)
val dataFilter = EqualTo(attribute(columnName), Literal(secondaryKey(0)))
verifyFilePruning(hudiOpts, dataFilter)
}

private def attribute(partition: String): AttributeReference = {
AttributeReference(partition, StringType, nullable = true)()
}


private def verifyFilePruning(opts: Map[String, String], dataFilter: Expression): Unit = {
// with data skipping
val commonOpts = opts + ("path" -> basePath)
metaClient = HoodieTableMetaClient.reload(metaClient)
var fileIndex = HoodieFileIndex(spark, metaClient, None, commonOpts, includeLogFiles = true)
val filteredPartitionDirectories = fileIndex.listFiles(Seq(), Seq(dataFilter))
val filteredFilesCount = filteredPartitionDirectories.flatMap(s => s.files).size
assertTrue(filteredFilesCount < getLatestDataFilesCount(opts))

// with no data skipping
fileIndex = HoodieFileIndex(spark, metaClient, None, commonOpts + (DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "false"), includeLogFiles = true)
val filesCountWithNoSkipping = fileIndex.listFiles(Seq(), Seq(dataFilter)).flatMap(s => s.files).size
assertTrue(filesCountWithNoSkipping == getLatestDataFilesCount(opts))
}

private def getLatestDataFilesCount(opts: Map[String, String], includeLogFiles: Boolean = true) = {
var totalLatestDataFiles = 0L
getTableFileSystemView(opts).getAllLatestFileSlicesBeforeOrOn(metaClient.getActiveTimeline.lastInstant().get().getTimestamp)
.values()
.forEach(JFunction.toJavaConsumer[java.util.stream.Stream[FileSlice]]
(slices => slices.forEach(JFunction.toJavaConsumer[FileSlice](
slice => totalLatestDataFiles += (if (includeLogFiles) slice.getLogFiles.count() else 0)
+ (if (slice.getBaseFile.isPresent) 1 else 0)))))
totalLatestDataFiles
}

private def getTableFileSystemView(opts: Map[String, String]): HoodieMetadataFileSystemView = {
new HoodieMetadataFileSystemView(metaClient, metaClient.getActiveTimeline, metadataWriter(getWriteConfig(opts)).getTableMetadata)
}

private def getWriteConfig(hudiOpts: Map[String, String]): HoodieWriteConfig = {
val props = TypedProperties.fromMap(JavaConverters.mapAsJavaMapConverter(hudiOpts).asJava)
HoodieWriteConfig.newBuilder()
.withProps(props)
.withPath(basePath)
.build()
}

}