Commit f69013f: rebase

huaxingao committed Jan 26, 2025
1 parent d431803 commit f69013f

Showing 10 changed files with 105 additions and 85 deletions.
@@ -18,7 +18,19 @@
*/
package org.apache.iceberg.spark;

/** Enumerates the types of Parquet readers. */
public enum ParquetReaderType {
/** ICEBERG type utilizes the Parquet reader from Apache Iceberg. */
ICEBERG,

/**
* COMET type changes the Parquet reader to the Apache DataFusion Comet Parquet reader. The Comet
* Parquet reader performs I/O and decompression in the JVM but decodes natively to improve
* performance. Additionally, Comet converts Spark's physical plan into a native physical plan and
* executes that plan natively.
*
* <p>TODO: Implement {@link org.apache.comet.parquet.SupportsComet} in SparkScan to convert Spark
* physical plan to native physical plan for native execution.
*/
COMET
}
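For context, the active reader type is chosen through the Spark session configuration; the TestParquetScan change at the bottom of this diff pins it to ICEBERG. Below is a minimal sketch of opting into the Comet reader instead, assuming only the SparkSQLProperties.PARQUET_READER_TYPE key that the test also uses (the wrapper class is hypothetical):

import org.apache.iceberg.spark.ParquetReaderType;
import org.apache.iceberg.spark.SparkSQLProperties;
import org.apache.spark.sql.SparkSession;

public class CometReaderTypeExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().master("local[2]").getOrCreate();

    // Switch Iceberg's vectorized Parquet path to the Comet-backed reader.
    // SparkSQLProperties.PARQUET_READER_TYPE is the same key TestParquetScan sets to ICEBERG.
    spark.conf().set(SparkSQLProperties.PARQUET_READER_TYPE, ParquetReaderType.COMET.toString());

    spark.stop();
  }
}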
@@ -38,37 +38,12 @@
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;

/**
* An Iceberg Parquet column reader backed by a Comet {@link ColumnReader}. This class should be used
* together with {@link CometVector}.
*
* <p>Example:
*
* <pre>
* CometColumnReader reader = ...
* reader.setBatchSize(batchSize);
*
* while (hasMoreRowsToRead) {
* if (endOfRowGroup) {
* reader.reset();
* PageReader pageReader = ...
* reader.setPageReader(pageReader);
* }
*
* int numRows = ...
* CometVector vector = reader.read(null, numRows);
*
* // consume the vector
* }
*
* reader.close();
* </pre>
*/
@SuppressWarnings({"checkstyle:VisibilityModifier", "ParameterAssignment"})
class CometColumnReader implements VectorizedReader<CometVector> {
public static final int DEFAULT_BATCH_SIZE = 5000;

private final DataType sparkType;
// the delegated column reader from Comet side
protected AbstractColumnReader delegate;
private final CometVector vector;
private final ColumnDescriptor descriptor;
@@ -94,7 +69,7 @@ public AbstractColumnReader getDelegate() {
}

/**
* This method is used to initialize/reset the ColumnReader. This needs to be called for each row
* This method is used to initialize/reset the CometColumnReader. This needs to be called for each row
* group after readNextRowGroup, so a new dictionary encoding can be set for each of the new row
* groups.
*/
@@ -145,6 +120,7 @@ public void setPageReader(PageReader pageReader) throws IOException {

@Override
public void close() {
// close reader on native side
if (delegate != null) {
delegate.close();
}
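The class-level Javadoc example deleted above still captures how this reader is meant to be driven. Here is a rough same-package sketch of that per-row-group lifecycle, assuming the reset()/setBatchSize()/setPageReader()/read() calls shown in the removed example and the getDescriptor() accessor used elsewhere in this diff; the sketch class itself is hypothetical:

import java.io.IOException;
import org.apache.parquet.column.page.PageReadStore;

class CometColumnReaderUsageSketch {
  // Reads one Parquet row group in batches; reset() and setPageReader() are called
  // once per row group, as the Javadoc above requires.
  static void readRowGroup(CometColumnReader reader, PageReadStore rowGroup, int batchSize)
      throws IOException {
    reader.setBatchSize(batchSize);
    reader.reset();
    reader.setPageReader(rowGroup.getPageReader(reader.getDescriptor()));

    long remaining = rowGroup.getRowCount();
    while (remaining > 0) {
      int numRows = (int) Math.min(batchSize, remaining);
      CometVector vector = reader.read(null, numRows);
      // consume the vector ...
      remaining -= numRows;
    }

    // close() is called once after all row groups are consumed (it also closes the
    // native-side reader, per the new comment in close() above), not per row group.
  }
}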
@@ -29,6 +29,7 @@
import org.apache.iceberg.parquet.VectorizedReader;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.util.Pair;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ColumnPath;
@@ -39,7 +40,7 @@
/**
* {@link VectorizedReader} that returns Spark's {@link ColumnarBatch} to support Spark's vectorized
* read path. The {@link ColumnarBatch} returned is created by passing in the Arrow vectors
* populated via delegated read calls to {@linkplain CometColumnReader VectorReader(s)}.
* populated via delegated read calls to {@link CometColumnReader VectorReader(s)}.
*/
@SuppressWarnings("checkstyle:VisibilityModifier")
class CometColumnarBatchReader implements VectorizedReader<ColumnarBatch> {
@@ -48,6 +49,7 @@ class CometColumnarBatchReader implements VectorizedReader<ColumnarBatch> {
private final boolean hasIsDeletedColumn;
private DeleteFilter<InternalRow> deletes = null;
private long rowStartPosInBatch = 0;
// The delegated batch reader on Comet side
private final BatchReader delegate;

CometColumnarBatchReader(List<VectorizedReader<?>> readers, Schema schema) {
@@ -126,64 +128,67 @@ public void close() {
}

private class ColumnBatchLoader {
private final int numRowsToRead;
// the rowId mapping to skip deleted rows for all column vectors inside a batch; it is null when
// there are no deletes
private int[] rowIdMapping;
// the array indicating whether a row is deleted; it is null when there is no "_deleted"
// metadata column
private boolean[] isDeleted;
private final int batchSize;

ColumnBatchLoader(int numRowsToRead) {
Preconditions.checkArgument(
numRowsToRead > 0, "Invalid number of rows to read: %s", numRowsToRead);
this.numRowsToRead = numRowsToRead;
if (hasIsDeletedColumn) {
isDeleted = new boolean[numRowsToRead];
}
this.batchSize = numRowsToRead;
}

ColumnarBatch loadDataToColumnBatch() {
ColumnVector[] arrowColumnVectors = readDataToColumnVectors();
ColumnarBatch columnarBatch = new ColumnarBatch(arrowColumnVectors);
ColumnarBatchUtil.applyDeletesToColumnarBatch(
columnarBatch, deletes, isDeleted, numRowsToRead, rowStartPosInBatch, hasIsDeletedColumn);

int numLiveRows = batchSize;
if (hasIsDeletedColumn) {
// reset the row id mapping array, so that it doesn't filter out the deleted rows
ColumnarBatchUtil.resetRowIdMapping(columnarBatch, numRowsToRead);
boolean[] isDeleted =
ColumnarBatchUtil.buildIsDeleted(
arrowColumnVectors, deletes, rowStartPosInBatch, batchSize);
readDeletedColumn(arrowColumnVectors, isDeleted);
} else {
Pair<int[], Integer> pair =
ColumnarBatchUtil.buildRowIdMapping(
arrowColumnVectors, deletes, rowStartPosInBatch, batchSize);
if (pair != null) {
int[] rowIdMapping = pair.first();
numLiveRows = pair.second();
for (int i = 0; i < arrowColumnVectors.length; i++) {
((CometVector) arrowColumnVectors[i]).setRowIdMapping(rowIdMapping);
}
}
}

if (hasIsDeletedColumn) {
readDeletedColumnIfNecessary(arrowColumnVectors);
if (deletes != null && deletes.hasEqDeletes()) {
arrowColumnVectors = ColumnarBatchUtil.removeExtraColumns(deletes, arrowColumnVectors);
}

return columnarBatch;
ColumnarBatch newColumnarBatch = new ColumnarBatch(arrowColumnVectors);
newColumnarBatch.setNumRows(numLiveRows);
return newColumnarBatch;
}

void readDeletedColumnIfNecessary(ColumnVector[] columnVectors) {
for (int i = 0; i < readers.length; i++) {
if (readers[i] instanceof CometDeleteColumnReader) {
CometDeleteColumnReader deleteColumnReader = new CometDeleteColumnReader<>(isDeleted);
deleteColumnReader.setBatchSize(numRowsToRead);
deleteColumnReader.read(deleteColumnReader.getVector(), numRowsToRead);
columnVectors[i] = deleteColumnReader.getVector();
}
}
}

protected ColumnVector[] readDataToColumnVectors() {
ColumnVector[] columnVectors = new ColumnVector[readers.length];
ColumnVector[] readDataToColumnVectors() {
CometVector[] columnVectors = new CometVector[readers.length];
// Fetch rows for all readers in the delegate
delegate.nextBatch(numRowsToRead);
delegate.nextBatch(batchSize);
for (int i = 0; i < readers.length; i++) {
CometVector bv = readers[i].getVector();
columnVectors[i] = readers[i].getVector();
columnVectors[i].resetRowIdMapping();
org.apache.comet.vector.CometVector vector = readers[i].getDelegate().currentBatch();
bv.setDelegate(vector);
columnVectors[i] = bv;
columnVectors[i].setDelegate(vector);
}

return columnVectors;
}

void readDeletedColumn(ColumnVector[] columnVectors, boolean[] isDeleted) {
for (int i = 0; i < readers.length; i++) {
if (readers[i] instanceof CometDeleteColumnReader) {
CometDeleteColumnReader deleteColumnReader = new CometDeleteColumnReader<>(isDeleted);
deleteColumnReader.setBatchSize(batchSize);
deleteColumnReader.read(deleteColumnReader.getVector(), batchSize);
columnVectors[i] = deleteColumnReader.getVector();
}
}
}
}
}
@@ -22,11 +22,10 @@
import org.apache.iceberg.types.Types;

class CometConstantColumnReader<T> extends CometColumnReader {
private final T value;

CometConstantColumnReader(T value, Types.NestedField field) {
super(field);
this.value = value;
// use delegate to set constant value on the native side to be consumed by native execution.
delegate = new ConstantColumnReader(getSparkType(), getDescriptor(), value, false);
}

@@ -22,6 +22,7 @@
import org.apache.comet.parquet.MetadataColumnReader;
import org.apache.comet.parquet.Native;
import org.apache.comet.parquet.TypeUtil;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
@@ -34,10 +35,7 @@ class CometDeleteColumnReader<T> extends CometColumnReader {
}

CometDeleteColumnReader(boolean[] isDeleted) {
super(
DataTypes.BooleanType,
TypeUtil.convertToParquet(
new StructField("deleted", DataTypes.BooleanType, false, Metadata.empty())));
super(MetadataColumns.IS_DELETED);
delegate = new DeleteColumnReader(isDeleted);
}

@@ -55,14 +53,15 @@ private static class DeleteColumnReader extends MetadataColumnReader {
super(
DataTypes.BooleanType,
TypeUtil.convertToParquet(
new StructField("deleted", DataTypes.BooleanType, false, Metadata.empty())),
new StructField("_deleted", DataTypes.BooleanType, false, Metadata.empty())),
false);
this.isDeleted = isDeleted;
}

@Override
public void readBatch(int total) {
Native.resetBatch(nativeHandle);
// set isDeleted on the native side to be consumed by native execution
Native.setIsDeleted(nativeHandle, isDeleted);

super.readBatch(total);
@@ -42,17 +42,13 @@ private static class PositionColumnReader extends MetadataColumnReader {
private long position;

PositionColumnReader(ColumnDescriptor descriptor) {
this(descriptor, 0L);
}

PositionColumnReader(ColumnDescriptor descriptor, long position) {
super(DataTypes.LongType, descriptor, false);
this.position = position;
}

@Override
public void readBatch(int total) {
Native.resetBatch(nativeHandle);
// set position on the native side to be consumed by native execution
Native.setPosition(nativeHandle, position, total);
position += total;

@@ -26,13 +26,6 @@
@SuppressWarnings("checkstyle:VisibilityModifier")
class CometVector extends CometDelegateVector {

// the rowId mapping to skip deleted rows for all column vectors inside a batch
// Here is an example:
// [0,1,2,3,4,5,6,7] -- Original status of the row id mapping array
// Position delete 2, 6
// [0,1,3,4,5,7,-,-] -- After applying position deletes [Set Num records to 6]
// Equality delete 1 <= x <= 3
// [0,4,5,7,-,-,-,-] -- After applying equality deletes [Set Num records to 4]
protected int[] rowIdMapping;

CometVector(DataType type, boolean useDecimal128) {
@@ -43,6 +36,10 @@ public void setRowIdMapping(int[] rowIdMapping) {
this.rowIdMapping = rowIdMapping;
}

public void resetRowIdMapping() {
this.rowIdMapping = null;
}

@Override
public boolean isNullAt(int rowId) {
return super.isNullAt(mapRowId(rowId));
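The worked example removed from the rowIdMapping comment above is still the clearest way to see what the mapping does. Here is a standalone illustration of that packing step (not code from this commit; the class and method names are hypothetical):

import java.util.Arrays;

// Illustration of the row-id mapping idea from the removed comment: deleted positions
// are skipped and live row ids are packed to the front, while the batch's row count is
// shrunk to the number of live rows.
public class RowIdMappingSketch {
  static int[] buildRowIdMapping(int numRows, boolean[] deleted) {
    int[] mapping = new int[numRows];
    int live = 0;
    for (int rowId = 0; rowId < numRows; rowId++) {
      if (!deleted[rowId]) {
        mapping[live++] = rowId;
      }
    }
    return Arrays.copyOf(mapping, live);
  }

  public static void main(String[] args) {
    // Positions 2 and 6 are position-deleted, as in the removed example.
    boolean[] deleted = new boolean[8];
    deleted[2] = true;
    deleted[6] = true;
    // Prints [0, 1, 3, 4, 5, 7]; the batch would then report 6 live rows.
    System.out.println(Arrays.toString(buildRowIdMapping(8, deleted)));
  }
}

Equality deletes shrink the mapping further in the same way, which is why the reworked loadDataToColumnBatch above sets the live-row count on the rebuilt ColumnarBatch.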
@@ -91,9 +91,16 @@ public VectorizedReader<?> message(
reorderedFields.add(deleteReader);
} else if (reader != null) {
reorderedFields.add(reader);
} else {
} else if (field.initialDefault() != null) {
CometColumnReader constantReader =
new CometConstantColumnReader<>(field.initialDefault(), field);
reorderedFields.add(constantReader);
} else if (field.isOptional()) {
CometColumnReader constantReader = new CometConstantColumnReader<>(null, field);
reorderedFields.add(constantReader);
} else {
throw new IllegalArgumentException(
String.format("Missing required field: %s", field.name()));
}
}
return vectorizedReader(reorderedFields);
@@ -70,6 +70,23 @@ public static ColumnarBatchReader buildReader(
deleteFilter));
}

public static CometColumnarBatchReader buildCometReader(
Schema expectedSchema,
MessageType fileSchema,
Map<Integer, ?> idToConstant,
DeleteFilter<InternalRow> deleteFilter) {
return (CometColumnarBatchReader)
TypeWithSchemaVisitor.visit(
expectedSchema.asStruct(),
fileSchema,
new CometVectorizedReaderBuilder(
expectedSchema,
fileSchema,
idToConstant,
readers -> new CometColumnarBatchReader(readers, expectedSchema),
deleteFilter));
}

// enables unsafe memory access to avoid costly checks to see if index is within bounds
// as long as it is not configured explicitly (see BoundsChecking in Arrow)
private static void enableUnsafeMemoryAccess() {
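A hypothetical same-package call site for the new buildCometReader factory, written to mirror the existing buildReader path above; every parameter is assumed to be supplied by the surrounding scan code rather than constructed here:

import java.util.Map;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.DeleteFilter;
import org.apache.parquet.schema.MessageType;
import org.apache.spark.sql.catalyst.InternalRow;

class CometReaderFactorySketch {
  // Thin wrapper that shows the expected argument types for buildCometReader.
  static CometColumnarBatchReader newCometBatchReader(
      Schema expectedSchema,
      MessageType fileSchema,
      Map<Integer, ?> idToConstant,
      DeleteFilter<InternalRow> deleteFilter) {
    return VectorizedSparkParquetReaders.buildCometReader(
        expectedSchema, fileSchema, idToConstant, deleteFilter);
  }
}

Which of the two build methods a scan calls is presumably driven by the ParquetReaderType setting introduced at the top of this diff.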
@@ -19,6 +19,7 @@
package org.apache.iceberg.spark.source;

import static org.apache.iceberg.Files.localOutput;
import static org.apache.iceberg.spark.SparkSQLProperties.PARQUET_READER_TYPE;
import static org.assertj.core.api.Assumptions.assumeThat;

import java.io.File;
@@ -35,14 +36,25 @@
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.spark.ParquetReaderType;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.BeforeAll;

public class TestParquetScan extends ScanTestBase {
protected boolean vectorized() {
return false;
}

@BeforeAll
public static void startSpark() {
ScanTestBase.spark = SparkSession.builder().master("local[2]").getOrCreate();
ScanTestBase.spark.conf().set(PARQUET_READER_TYPE, ParquetReaderType.ICEBERG.toString());
ScanTestBase.sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
}

@Override
protected void configureTable(Table table) {
table
