Optimise the index computation
lokeshj1703 committed Oct 18, 2024
1 parent 4ebb134 commit bc6dbd7
Showing 4 changed files with 145 additions and 230 deletions.
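
At a glance, the diff replaces the earlier per-file flow, where each base or log file was read into its own Dataset and column ranges were computed with a driver-side collectAsList(), with a single grouped aggregation over a Dataset that already carries the functional-index partition, file path, and file size columns. The following is a minimal, self-contained sketch of that aggregation pattern only; it is not the Hudi code itself, and the SparkSession setup plus the part/filePath/fileSize/val column names are illustrative assumptions.

import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.count;
import static org.apache.spark.sql.functions.max;
import static org.apache.spark.sql.functions.min;
import static org.apache.spark.sql.functions.when;

public class ColumnStatsAggregationSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("column-stats-aggregation-sketch")
        .master("local[*]")
        .getOrCreate();

    // Hypothetical input: one row per record, tagged with the partition/file it came from.
    StructType schema = new StructType()
        .add("part", DataTypes.StringType)
        .add("filePath", DataTypes.StringType)
        .add("fileSize", DataTypes.LongType)
        .add("val", DataTypes.IntegerType);
    Dataset<Row> records = spark.createDataFrame(Arrays.asList(
        RowFactory.create("p1", "f1.parquet", 1024L, 5),
        RowFactory.create("p1", "f1.parquet", 1024L, 9),
        RowFactory.create("p1", "f2.parquet", 2048L, null)), schema);

    // One grouped aggregation computes per-file stats in a single Spark job,
    // instead of reading and aggregating each file separately.
    // count(when(val is null, 1)) counts nulls; count(val) counts non-null values.
    Dataset<Row> stats = records
        .groupBy(col("part"), col("filePath"), col("fileSize"))
        .agg(count(when(col("val").isNull(), 1)).alias("nullCount"),
            min("val").alias("minValue"),
            max("val").alias("maxValue"),
            count("val").alias("valueCount"));

    stats.show(false);
    spark.stop();
  }
}
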
@@ -21,48 +21,53 @@

import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.index.functional.HoodieFunctionalIndex;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileWriterFactory;
import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.util.JavaScalaConverters;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapGroupsFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.StructType;

import javax.annotation.Nullable;
import org.jetbrains.annotations.NotNull;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;

import scala.Function1;
import scala.collection.Seq;

import static org.apache.hudi.common.config.HoodieCommonConfig.MAX_DFS_STREAM_BUFFER_SIZE;
import static org.apache.hudi.common.model.HoodieRecord.HOODIE_META_COLUMNS;
import static org.apache.hudi.common.util.ConfigUtils.getReaderConfigs;
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
import static org.apache.hudi.metadata.HoodieMetadataPayload.createBloomFilterMetadataRecord;
@@ -74,81 +79,93 @@
*/
public class SparkMetadataWriterUtils {

public static List<HoodieRecord> getFunctionalIndexRecordsUsingColumnStats(HoodieTableMetaClient metaClient,
Schema readerSchema,
String filePath,
Long fileSize,
String partition,
HoodieFunctionalIndex<Column, Column> functionalIndex,
String columnToIndex,
SQLContext sqlContext) {
List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList = new ArrayList<>();
buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex, columnToIndex, sqlContext, columnRangeMetadataList, fileSize, new StoragePath(filePath),
FSUtils.isBaseFile(new StoragePath(filePath.substring(filePath.lastIndexOf("/") + 1))));
return createColumnStatsRecords(partition, columnRangeMetadataList, false, functionalIndex.getIndexName(), COLUMN_STATS.getRecordType()).collect(Collectors.toList());
public static Column[] getFunctionalIndexColumns() {
return new Column[] {
functions.col(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_PARTITION),
functions.col(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_FILE_PATH),
functions.col(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_FILE_SIZE)
};
}

public static List<HoodieRecord> getFunctionalIndexRecordsUsingBloomFilter(HoodieTableMetaClient metaClient,
Schema readerSchema,
String filePath,
String partition,
HoodieFunctionalIndex<Column, Column> functionalIndex,
String columnToIndex,
SQLContext sqlContext,
HoodieWriteConfig metadataWriteConfig,
String instantTime) {
List<HoodieRecord> bloomFilterMetadataList = new ArrayList<>();
// log file handling
buildBloomFilterMetadata(metaClient, readerSchema, functionalIndex, columnToIndex, sqlContext, bloomFilterMetadataList, new StoragePath(filePath), metadataWriteConfig, partition,
instantTime, FSUtils.isBaseFile(new StoragePath(filePath.substring(filePath.lastIndexOf("/") + 1))));
return bloomFilterMetadataList;
public static String[] getFunctionalIndexColumnNames() {
return new String[] {
HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_PARTITION,
HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_FILE_PATH,
HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_FILE_SIZE
};
}

private static void buildColumnRangeMetadata(HoodieTableMetaClient metaClient,
Schema readerSchema,
HoodieFunctionalIndex<Column, Column> functionalIndex,
String columnToIndex,
SQLContext sqlContext,
List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList,
long fileSize,
StoragePath filePath,
boolean isBaseFile) {
Dataset<Row> fileDf = readRecordsAsRow(new StoragePath[] {filePath}, sqlContext, metaClient, readerSchema, isBaseFile);
Column indexedColumn = functionalIndex.apply(Collections.singletonList(fileDf.col(columnToIndex)));
fileDf = fileDf.withColumn(columnToIndex, indexedColumn);
HoodieColumnRangeMetadata<Comparable> columnRangeMetadata = computeColumnRangeMetadata(fileDf, columnToIndex, filePath.toString(), fileSize);
columnRangeMetadataList.add(columnRangeMetadata);
@NotNull
public static List<Row> getRowsWithFunctionalIndexMetadata(List<Row> rowsForFilePath, String partition, String filePath, long fileSize) {
return rowsForFilePath.stream().map(row -> {
Seq<Object> indexMetadata = JavaScalaConverters.convertJavaListToScalaSeq(Arrays.asList(partition, filePath, fileSize));
Row functionalIndexRow = Row.fromSeq(indexMetadata);
List<Row> rows = new ArrayList<>(2);
rows.add(row);
rows.add(functionalIndexRow);
Seq<Row> rowSeq = JavaScalaConverters.convertJavaListToScalaSeq(rows);
return Row.merge(rowSeq);
}).collect(Collectors.toList());
}

private static void buildBloomFilterMetadata(HoodieTableMetaClient metaClient,
Schema readerSchema,
HoodieFunctionalIndex<Column, Column> functionalIndex,
String columnToIndex,
SQLContext sqlContext,
List<HoodieRecord> bloomFilterMetadataList,
StoragePath filePath,
HoodieWriteConfig writeConfig,
String partitionName,
String instantTime,
boolean isBaseFile) {
Dataset<Row> fileDf = readRecordsAsRow(new StoragePath[] {filePath}, sqlContext, metaClient, readerSchema, isBaseFile);
Column indexedColumn = functionalIndex.apply(Collections.singletonList(fileDf.col(columnToIndex)));
fileDf = fileDf.withColumn(columnToIndex, indexedColumn);
BloomFilter bloomFilter = HoodieFileWriterFactory.createBloomFilter(writeConfig);
fileDf.foreach(row -> {
byte[] key = row.getAs(columnToIndex).toString().getBytes();
bloomFilter.add(key);
});
ByteBuffer bloomByteBuffer = ByteBuffer.wrap(getUTF8Bytes(bloomFilter.serializeToString()));
bloomFilterMetadataList.add(createBloomFilterMetadataRecord(partitionName, filePath.toString(), instantTime, writeConfig.getBloomFilterType(), bloomByteBuffer, false));
public static HoodieData<HoodieRecord> getFunctionalIndexRecordsUsingColumnStats(Dataset<Row> dataset,
HoodieFunctionalIndex<Column, Column> functionalIndex,
String columnToIndex) {
// Aggregate col stats related data for the column to index
Dataset<Row> columnRangeMetadataDataset = dataset
.select(columnToIndex, SparkMetadataWriterUtils.getFunctionalIndexColumnNames())
.groupBy(SparkMetadataWriterUtils.getFunctionalIndexColumns())
.agg(functions.count(functions.when(functions.col(columnToIndex).isNull(), 1)).alias("nullCount"),
functions.min(columnToIndex).alias("minValue"),
functions.max(columnToIndex).alias("maxValue"),
functions.count(columnToIndex).alias("valueCount"));
// Generate column stat records using the aggregated data
return HoodieJavaRDD.of(columnRangeMetadataDataset.javaRDD()).flatMap((SerializableFunction<Row, Iterator<HoodieRecord>>)
row -> {
int baseAggregatePosition = SparkMetadataWriterUtils.getFunctionalIndexColumnNames().length;
long nullCount = row.getLong(baseAggregatePosition);
Comparable minValue = (Comparable) row.get(baseAggregatePosition + 1);
Comparable maxValue = (Comparable) row.get(baseAggregatePosition + 2);
long valueCount = row.getLong(baseAggregatePosition + 3);

String partitionName = row.getString(0);
String filePath = row.getString(1);
long totalFileSize = row.getLong(2);
// Total uncompressed size is harder to get directly. This is just an approximation to maintain the order.
long totalUncompressedSize = totalFileSize * 2;

HoodieColumnRangeMetadata<Comparable> rangeMetadata = HoodieColumnRangeMetadata.create(
filePath,
columnToIndex,
minValue,
maxValue,
nullCount,
valueCount,
totalFileSize,
totalUncompressedSize
);
return createColumnStatsRecords(partitionName, Collections.singletonList(rangeMetadata), false, functionalIndex.getIndexName(),
COLUMN_STATS.getRecordType()).collect(Collectors.toList()).iterator();
});
}

private static Dataset<Row> readRecordsAsRow(StoragePath[] paths, SQLContext sqlContext,
HoodieTableMetaClient metaClient, Schema schema,
boolean isBaseFile) {
List<HoodieRecord> records = isBaseFile ? getBaseFileRecords(new HoodieBaseFile(paths[0].toString()), metaClient, schema)
: getUnmergedLogFileRecords(Arrays.stream(paths).map(StoragePath::toString).collect(Collectors.toList()), metaClient, schema);
return dropMetaFields(toDataset(records, schema, sqlContext));
public static HoodieData<HoodieRecord> getFunctionalIndexRecordsUsingBloomFilter(Dataset<Row> dataset, String columnToIndex,
HoodieWriteConfig metadataWriteConfig, String instantTime) {
// Group data using functional index metadata and then create bloom filter on the group
Dataset<HoodieRecord> bloomFilterRecords = dataset.select(columnToIndex, SparkMetadataWriterUtils.getFunctionalIndexColumnNames())
.groupByKey((MapFunction<Row, Pair>) row -> Pair.of(row.getString(0), row.getString(1)), Encoders.kryo(Pair.class))
.flatMapGroups((FlatMapGroupsFunction<Pair, Row, HoodieRecord>) ((pair, iterator) -> {
String partition = pair.getLeft().toString();
String filePath = pair.getRight().toString();
BloomFilter bloomFilter = HoodieFileWriterFactory.createBloomFilter(metadataWriteConfig);
iterator.forEachRemaining(row -> {
byte[] key = row.getAs(columnToIndex).toString().getBytes();
bloomFilter.add(key);
});
ByteBuffer bloomByteBuffer = ByteBuffer.wrap(getUTF8Bytes(bloomFilter.serializeToString()));
return Collections.singletonList((HoodieRecord) createBloomFilterMetadataRecord(partition, filePath, instantTime, metadataWriteConfig.getBloomFilterType(), bloomByteBuffer, false)).iterator();
}), Encoders.kryo(HoodieRecord.class));
return HoodieJavaRDD.of(bloomFilterRecords.javaRDD());
}

public static List<Row> readRecordsAsRows(StoragePath[] paths, SQLContext sqlContext,
@@ -188,18 +205,6 @@ private static List<HoodieRecord> getBaseFileRecords(HoodieBaseFile baseFile, Ho
}
}

private static Dataset<Row> toDataset(List<HoodieRecord> records, Schema schema, SQLContext sqlContext) {
List<GenericRecord> avroRecords = records.stream()
.map(r -> (GenericRecord) r.getData())
.collect(Collectors.toList());
if (avroRecords.isEmpty()) {
return sqlContext.emptyDataFrame().toDF();
}
JavaSparkContext jsc = new JavaSparkContext(sqlContext.sparkContext());
JavaRDD<GenericRecord> javaRDD = jsc.parallelize(avroRecords);
return AvroConversionUtils.createDataFrame(javaRDD.rdd(), schema.toString(), sqlContext.sparkSession());
}

private static List<Row> toRows(List<HoodieRecord> records, Schema schema, SQLContext sqlContext, String path) {
StructType structType = AvroConversionUtils.convertAvroSchemaToStructType(schema);
Function1<GenericRecord, Row> converterToRow = AvroConversionUtils.createConverterToRow(schema, structType);
@@ -210,45 +215,4 @@ private static List<Row> toRows(List<HoodieRecord> records, Schema schema, SQLCo
.collect(Collectors.toList());
return avroRecords;
}

private static <T extends Comparable<T>> HoodieColumnRangeMetadata<Comparable> computeColumnRangeMetadata(Dataset<Row> rowDataset,
String columnName,
String filePath,
long fileSize) {
long totalSize = fileSize;
// Get nullCount, minValue, and maxValue
Dataset<Row> aggregated = rowDataset.agg(
functions.count(functions.when(functions.col(columnName).isNull(), 1)).alias("nullCount"),
functions.min(columnName).alias("minValue"),
functions.max(columnName).alias("maxValue"),
functions.count(columnName).alias("valueCount")
);

Row result = aggregated.collectAsList().get(0);
long nullCount = result.getLong(0);
@Nullable T minValue = (T) result.get(1);
@Nullable T maxValue = (T) result.get(2);
long valueCount = result.getLong(3);

// Total uncompressed size is harder to get directly. This is just an approximation to maintain the order.
long totalUncompressedSize = totalSize * 2;

return HoodieColumnRangeMetadata.<Comparable>create(
filePath,
columnName,
minValue,
maxValue,
nullCount,
valueCount,
totalSize,
totalUncompressedSize
);
}

private static Dataset<Row> dropMetaFields(Dataset<Row> df) {
return df.select(
Arrays.stream(df.columns())
.filter(c -> !HOODIE_META_COLUMNS.contains(c))
.map(df::col).toArray(Column[]::new));
}
}
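
The new bloom-filter path in getFunctionalIndexRecordsUsingBloomFilter relies on groupByKey plus flatMapGroups to fold all rows of a (partition, file path) pair into one metadata record on the executors. Below is a self-contained sketch of that grouping pattern only; it substitutes a comma-joined key list for Hudi's BloomFilter so it runs without Hudi on the classpath, and the part/filePath/key column names are illustrative assumptions rather than anything taken from the commit.

import java.util.Arrays;
import java.util.Collections;
import java.util.StringJoiner;

import org.apache.spark.api.java.function.FlatMapGroupsFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import scala.Tuple2;

public class PerFileGroupingSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("per-file-grouping-sketch")
        .master("local[*]")
        .getOrCreate();

    // Hypothetical input: (partition, filePath, key) per record.
    StructType schema = new StructType()
        .add("part", DataTypes.StringType)
        .add("filePath", DataTypes.StringType)
        .add("key", DataTypes.StringType);
    Dataset<Row> rows = spark.createDataFrame(Arrays.asList(
        RowFactory.create("p1", "f1", "a"),
        RowFactory.create("p1", "f1", "b"),
        RowFactory.create("p1", "f2", "c")), schema);

    // Group rows by (partition, filePath) and fold each group into one summary record,
    // the same shape as building one bloom filter per file inside flatMapGroups.
    Dataset<String> perFileSummaries = rows
        .groupByKey((MapFunction<Row, Tuple2<String, String>>) r -> new Tuple2<>(r.getString(0), r.getString(1)),
            Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
        .flatMapGroups((FlatMapGroupsFunction<Tuple2<String, String>, Row, String>) (fileKey, it) -> {
          StringJoiner keys = new StringJoiner(",");          // stand-in for a per-file bloom filter
          it.forEachRemaining(r -> keys.add(r.getString(2))); // stand-in for bloomFilter.add(key)
          String summary = fileKey._1() + "/" + fileKey._2() + " -> " + keys;
          return Collections.singletonList(summary).iterator(); // one output record per file
        }, Encoders.STRING());

    perFileSummaries.show(false);
    spark.stop();
  }
}
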