Spark: Exclude reading _pos column if it's not in the scan list (#11390)
huaxingao authored Nov 8, 2024
1 parent 82a2362 commit 1c576c5
Showing 19 changed files with 94 additions and 28 deletions.
20 changes: 16 additions & 4 deletions data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
@@ -69,7 +69,8 @@ protected DeleteFilter(
List<DeleteFile> deletes,
Schema tableSchema,
Schema requestedSchema,
DeleteCounter counter) {
DeleteCounter counter,
boolean needRowPosCol) {
this.filePath = filePath;
this.counter = counter;

@@ -93,13 +94,23 @@ protected DeleteFilter(

this.posDeletes = posDeleteBuilder.build();
this.eqDeletes = eqDeleteBuilder.build();
this.requiredSchema = fileProjection(tableSchema, requestedSchema, posDeletes, eqDeletes);
this.requiredSchema =
fileProjection(tableSchema, requestedSchema, posDeletes, eqDeletes, needRowPosCol);
this.posAccessor = requiredSchema.accessorForField(MetadataColumns.ROW_POSITION.fieldId());
this.hasIsDeletedColumn =
requiredSchema.findField(MetadataColumns.IS_DELETED.fieldId()) != null;
this.isDeletedColumnPosition = requiredSchema.columns().indexOf(MetadataColumns.IS_DELETED);
}

protected DeleteFilter(
String filePath,
List<DeleteFile> deletes,
Schema tableSchema,
Schema requestedSchema,
DeleteCounter counter) {
this(filePath, deletes, tableSchema, requestedSchema, counter, true);
}

protected DeleteFilter(
String filePath, List<DeleteFile> deletes, Schema tableSchema, Schema requestedSchema) {
this(filePath, deletes, tableSchema, requestedSchema, new DeleteCounter());
@@ -251,13 +262,14 @@ private static Schema fileProjection(
Schema tableSchema,
Schema requestedSchema,
List<DeleteFile> posDeletes,
List<DeleteFile> eqDeletes) {
List<DeleteFile> eqDeletes,
boolean needRowPosCol) {
if (posDeletes.isEmpty() && eqDeletes.isEmpty()) {
return requestedSchema;
}

Set<Integer> requiredIds = Sets.newLinkedHashSet();
if (!posDeletes.isEmpty()) {
if (needRowPosCol && !posDeletes.isEmpty()) {
requiredIds.add(MetadataColumns.ROW_POSITION.fieldId());
}

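A minimal, self-contained sketch of what the new flag changes in fileProjection above: when needRowPosCol is false, position deletes on their own no longer force the _pos (row position) metadata column into the file projection. The class name, method name, and field-id constant below are illustrative stand-ins, not the Iceberg implementation.

import java.util.LinkedHashSet;
import java.util.Set;

// Sketch of the projection decision controlled by needRowPosCol.
public class RowPosProjectionSketch {

  // Hypothetical stand-in for MetadataColumns.ROW_POSITION.fieldId()
  private static final int ROW_POSITION_FIELD_ID = Integer.MAX_VALUE - 2;

  static Set<Integer> requiredMetadataFieldIds(boolean hasPosDeletes, boolean needRowPosCol) {
    Set<Integer> requiredIds = new LinkedHashSet<>();
    // Previously, any position delete pulled _pos into the projection;
    // now only callers that still filter rows themselves request it.
    if (needRowPosCol && hasPosDeletes) {
      requiredIds.add(ROW_POSITION_FIELD_ID);
    }
    return requiredIds;
  }

  public static void main(String[] args) {
    System.out.println(requiredMetadataFieldIds(true, true));  // [2147483645] -> _pos is read
    System.out.println(requiredMetadataFieldIds(true, false)); // []           -> _pos is skipped
  }
}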
---- (next changed file) ----
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.spark.source;

import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.FileFormat;
@@ -31,10 +32,12 @@
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.vectorized.ColumnarBatch;

abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBatch, T> {
@@ -81,9 +84,21 @@ private CloseableIterable<ColumnarBatch> newParquetIterable(
SparkDeleteFilter deleteFilter) {
// get required schema if there are deletes
Schema requiredSchema = deleteFilter != null ? deleteFilter.requiredSchema() : expectedSchema();
boolean hasPositionDelete = deleteFilter != null ? deleteFilter.hasPosDeletes() : false;
Schema projectedSchema = requiredSchema;
if (hasPositionDelete) {
// We need to add MetadataColumns.ROW_POSITION in the schema for
// ReadConf.generateOffsetToStartPos(Schema schema). This is not needed any
// more after #10107 is merged.
List<Types.NestedField> columns = Lists.newArrayList(requiredSchema.columns());
if (!columns.contains(MetadataColumns.ROW_POSITION)) {
columns.add(MetadataColumns.ROW_POSITION);
projectedSchema = new Schema(columns);
}
}

return Parquet.read(inputFile)
.project(requiredSchema)
.project(projectedSchema)
.split(start, length)
.createBatchedReaderFunc(
fileSchema ->
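The comment in the hunk above explains why _pos is still added back to the schema handed to Parquet.read when there are position deletes: ReadConf.generateOffsetToStartPos(Schema) needs to see it, and the workaround goes away once #10107 lands. As a rough illustration only, and assuming from the method name that it maps row-group offsets to the file position of each group's first row, the hypothetical helper below shows what such a map looks like; it is not Iceberg's ReadConf.

import java.util.LinkedHashMap;
import java.util.Map;

// Conceptual sketch: the starting row position of each row group is the running
// sum of the row counts of the groups before it.
public class OffsetToStartPosSketch {

  static Map<Long, Long> offsetToStartPos(long[] rowGroupOffsets, long[] rowGroupRowCounts) {
    Map<Long, Long> startPositions = new LinkedHashMap<>();
    long nextRowPos = 0L;
    for (int i = 0; i < rowGroupOffsets.length; i++) {
      startPositions.put(rowGroupOffsets[i], nextRowPos);
      nextRowPos += rowGroupRowCounts[i];
    }
    return startPositions;
  }

  public static void main(String[] args) {
    // Hypothetical file with three row groups at byte offsets 4, 1000, 2000 holding 100/250/50 rows
    System.out.println(offsetToStartPos(new long[] {4, 1000, 2000}, new long[] {100, 250, 50}));
    // {4=0, 1000=100, 2000=350}
  }
}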
---- (next changed file) ----
@@ -257,8 +257,9 @@ protected static Object convertConstant(Type type, Object value) {
protected class SparkDeleteFilter extends DeleteFilter<InternalRow> {
private final InternalRowWrapper asStructLike;

SparkDeleteFilter(String filePath, List<DeleteFile> deletes, DeleteCounter counter) {
super(filePath, deletes, tableSchema, expectedSchema, counter);
SparkDeleteFilter(
String filePath, List<DeleteFile> deletes, DeleteCounter counter, boolean needRowPosCol) {
super(filePath, deletes, tableSchema, expectedSchema, counter, needRowPosCol);
this.asStructLike =
new InternalRowWrapper(
SparkSchemaUtil.convert(requiredSchema()), requiredSchema().asStruct());
---- (next changed file) ----
@@ -96,7 +96,7 @@ protected CloseableIterator<ColumnarBatch> open(FileScanTask task) {
SparkDeleteFilter deleteFilter =
task.deletes().isEmpty()
? null
: new SparkDeleteFilter(filePath, task.deletes(), counter());
: new SparkDeleteFilter(filePath, task.deletes(), counter(), false);

return newBatchIterable(
inputFile,
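The batch-reading open(FileScanTask) above is the one kind of call site in this commit that passes false for needRowPosCol, presumably because the vectorized batch path can derive each row's file position from the batch's starting offset instead of reading a materialized _pos column. A conceptual sketch of that idea, with a hypothetical helper rather than Iceberg's batch reader:

import java.util.Set;
import java.util.TreeSet;

// Conceptual sketch: with the batch's starting row position and size known,
// deleted row indexes can be computed without a _pos column in the data.
public class BatchPositionDeleteSketch {

  static boolean[] liveRows(long batchStartPos, int batchSize, Set<Long> deletedPositions) {
    boolean[] isLive = new boolean[batchSize];
    for (int i = 0; i < batchSize; i++) {
      // The file position of row i in the batch is implicit: batch start + index.
      isLive[i] = !deletedPositions.contains(batchStartPos + i);
    }
    return isLive;
  }

  public static void main(String[] args) {
    Set<Long> deleted = new TreeSet<>(Set.of(101L, 103L));
    // Batch starting at file position 100 with 5 rows: positions 101 and 103 are dropped.
    System.out.println(java.util.Arrays.toString(liveRows(100L, 5, deleted)));
    // [true, false, true, false, true]
  }
}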
---- (next changed file) ----
@@ -111,13 +111,14 @@ private CloseableIterable<InternalRow> openChangelogScanTask(ChangelogScanTask t

CloseableIterable<InternalRow> openAddedRowsScanTask(AddedRowsScanTask task) {
String filePath = task.file().path().toString();
SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, task.deletes(), counter());
SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, task.deletes(), counter(), true);
return deletes.filter(rows(task, deletes.requiredSchema()));
}

private CloseableIterable<InternalRow> openDeletedDataFileScanTask(DeletedDataFileScanTask task) {
String filePath = task.file().path().toString();
SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, task.existingDeletes(), counter());
SparkDeleteFilter deletes =
new SparkDeleteFilter(filePath, task.existingDeletes(), counter(), true);
return deletes.filter(rows(task, deletes.requiredSchema()));
}

---- (next changed file) ----
@@ -41,7 +41,7 @@ public EqualityDeleteRowReader(
@Override
protected CloseableIterator<InternalRow> open(FileScanTask task) {
SparkDeleteFilter matches =
new SparkDeleteFilter(task.file().path().toString(), task.deletes(), counter());
new SparkDeleteFilter(task.file().path().toString(), task.deletes(), counter(), true);

// schema or rows returned by readers
Schema requiredSchema = matches.requiredSchema();
---- (next changed file) ----
@@ -83,7 +83,8 @@ protected Stream<ContentFile<?>> referencedFiles(FileScanTask task) {
protected CloseableIterator<InternalRow> open(FileScanTask task) {
String filePath = task.file().path().toString();
LOG.debug("Opening data file {}", filePath);
SparkDeleteFilter deleteFilter = new SparkDeleteFilter(filePath, task.deletes(), counter());
SparkDeleteFilter deleteFilter =
new SparkDeleteFilter(filePath, task.deletes(), counter(), true);

// schema or rows returned by readers
Schema requiredSchema = deleteFilter.requiredSchema();
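The row-oriented reader above, like the changelog and equality-delete readers earlier in the diff, keeps passing true: it filters records one at a time, so each record's file position has to come out of the projected _pos column. A conceptual sketch of row-wise position-delete filtering, with illustrative types rather than DeleteFilter itself:

import java.util.Iterator;
import java.util.List;
import java.util.Set;

// Conceptual sketch: each row carries its projected _pos value, and rows whose
// position appears in the delete set are dropped.
public class RowWisePositionFilterSketch {

  record Row(long pos, String data) {} // pos stands in for the projected _pos column

  static Iterator<Row> filterDeleted(List<Row> rows, Set<Long> deletedPositions) {
    return rows.stream().filter(row -> !deletedPositions.contains(row.pos())).iterator();
  }

  public static void main(String[] args) {
    List<Row> rows = List.of(new Row(0, "a"), new Row(1, "b"), new Row(2, "c"));
    filterDeleted(rows, Set.of(1L)).forEachRemaining(System.out::println);
    // prints Row[pos=0, data=a] and Row[pos=2, data=c]; the row at position 1 is dropped
  }
}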
---- (next changed file) ----
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.spark.source;

import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.FileFormat;
@@ -31,10 +32,12 @@
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.vectorized.ColumnarBatch;

abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBatch, T> {
@@ -81,9 +84,21 @@ private CloseableIterable<ColumnarBatch> newParquetIterable(
SparkDeleteFilter deleteFilter) {
// get required schema if there are deletes
Schema requiredSchema = deleteFilter != null ? deleteFilter.requiredSchema() : expectedSchema();
boolean hasPositionDelete = deleteFilter != null ? deleteFilter.hasPosDeletes() : false;
Schema projectedSchema = requiredSchema;
if (hasPositionDelete) {
// We need to add MetadataColumns.ROW_POSITION in the schema for
// ReadConf.generateOffsetToStartPos(Schema schema). This is not needed any
// more after #10107 is merged.
List<Types.NestedField> columns = Lists.newArrayList(requiredSchema.columns());
if (!columns.contains(MetadataColumns.ROW_POSITION)) {
columns.add(MetadataColumns.ROW_POSITION);
projectedSchema = new Schema(columns);
}
}

return Parquet.read(inputFile)
.project(requiredSchema)
.project(projectedSchema)
.split(start, length)
.createBatchedReaderFunc(
fileSchema ->
---- (next changed file) ----
@@ -262,8 +262,9 @@ protected static Object convertConstant(Type type, Object value) {
protected class SparkDeleteFilter extends DeleteFilter<InternalRow> {
private final InternalRowWrapper asStructLike;

SparkDeleteFilter(String filePath, List<DeleteFile> deletes, DeleteCounter counter) {
super(filePath, deletes, tableSchema, expectedSchema, counter);
SparkDeleteFilter(
String filePath, List<DeleteFile> deletes, DeleteCounter counter, boolean needRowPosCol) {
super(filePath, deletes, tableSchema, expectedSchema, counter, needRowPosCol);
this.asStructLike =
new InternalRowWrapper(
SparkSchemaUtil.convert(requiredSchema()), requiredSchema().asStruct());
---- (next changed file) ----
@@ -96,7 +96,7 @@ protected CloseableIterator<ColumnarBatch> open(FileScanTask task) {
SparkDeleteFilter deleteFilter =
task.deletes().isEmpty()
? null
: new SparkDeleteFilter(filePath, task.deletes(), counter());
: new SparkDeleteFilter(filePath, task.deletes(), counter(), false);

return newBatchIterable(
inputFile,
---- (next changed file) ----
@@ -111,13 +111,14 @@ private CloseableIterable<InternalRow> openChangelogScanTask(ChangelogScanTask t

CloseableIterable<InternalRow> openAddedRowsScanTask(AddedRowsScanTask task) {
String filePath = task.file().path().toString();
SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, task.deletes(), counter());
SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, task.deletes(), counter(), true);
return deletes.filter(rows(task, deletes.requiredSchema()));
}

private CloseableIterable<InternalRow> openDeletedDataFileScanTask(DeletedDataFileScanTask task) {
String filePath = task.file().path().toString();
SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, task.existingDeletes(), counter());
SparkDeleteFilter deletes =
new SparkDeleteFilter(filePath, task.existingDeletes(), counter(), true);
return deletes.filter(rows(task, deletes.requiredSchema()));
}

---- (next changed file) ----
@@ -41,7 +41,7 @@ public EqualityDeleteRowReader(
@Override
protected CloseableIterator<InternalRow> open(FileScanTask task) {
SparkDeleteFilter matches =
new SparkDeleteFilter(task.file().path().toString(), task.deletes(), counter());
new SparkDeleteFilter(task.file().path().toString(), task.deletes(), counter(), true);

// schema or rows returned by readers
Schema requiredSchema = matches.requiredSchema();
---- (next changed file) ----
@@ -83,7 +83,8 @@ protected Stream<ContentFile<?>> referencedFiles(FileScanTask task) {
protected CloseableIterator<InternalRow> open(FileScanTask task) {
String filePath = task.file().path().toString();
LOG.debug("Opening data file {}", filePath);
SparkDeleteFilter deleteFilter = new SparkDeleteFilter(filePath, task.deletes(), counter());
SparkDeleteFilter deleteFilter =
new SparkDeleteFilter(filePath, task.deletes(), counter(), true);

// schema or rows returned by readers
Schema requiredSchema = deleteFilter.requiredSchema();
---- (next changed file) ----
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.spark.source;

import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.FileFormat;
@@ -31,10 +32,12 @@
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.vectorized.ColumnarBatch;

abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBatch, T> {
@@ -81,9 +84,21 @@ private CloseableIterable<ColumnarBatch> newParquetIterable(
SparkDeleteFilter deleteFilter) {
// get required schema if there are deletes
Schema requiredSchema = deleteFilter != null ? deleteFilter.requiredSchema() : expectedSchema();
boolean hasPositionDelete = deleteFilter != null ? deleteFilter.hasPosDeletes() : false;
Schema projectedSchema = requiredSchema;
if (hasPositionDelete) {
// We need to add MetadataColumns.ROW_POSITION in the schema for
// ReadConf.generateOffsetToStartPos(Schema schema). This is not needed any
// more after #10107 is merged.
List<Types.NestedField> columns = Lists.newArrayList(requiredSchema.columns());
if (!columns.contains(MetadataColumns.ROW_POSITION)) {
columns.add(MetadataColumns.ROW_POSITION);
projectedSchema = new Schema(columns);
}
}

return Parquet.read(inputFile)
.project(requiredSchema)
.project(projectedSchema)
.split(start, length)
.createBatchedReaderFunc(
fileSchema ->
---- (next changed file) ----
@@ -249,8 +249,9 @@ protected static Object convertConstant(Type type, Object value) {
protected class SparkDeleteFilter extends DeleteFilter<InternalRow> {
private final InternalRowWrapper asStructLike;

SparkDeleteFilter(String filePath, List<DeleteFile> deletes, DeleteCounter counter) {
super(filePath, deletes, tableSchema, expectedSchema, counter);
SparkDeleteFilter(
String filePath, List<DeleteFile> deletes, DeleteCounter counter, boolean needRowPosCol) {
super(filePath, deletes, tableSchema, expectedSchema, counter, needRowPosCol);
this.asStructLike =
new InternalRowWrapper(
SparkSchemaUtil.convert(requiredSchema()), requiredSchema().asStruct());
---- (next changed file) ----
@@ -96,7 +96,7 @@ protected CloseableIterator<ColumnarBatch> open(FileScanTask task) {
SparkDeleteFilter deleteFilter =
task.deletes().isEmpty()
? null
: new SparkDeleteFilter(filePath, task.deletes(), counter());
: new SparkDeleteFilter(filePath, task.deletes(), counter(), false);

return newBatchIterable(
inputFile,
---- (next changed file) ----
@@ -111,13 +111,14 @@ private CloseableIterable<InternalRow> openChangelogScanTask(ChangelogScanTask t

CloseableIterable<InternalRow> openAddedRowsScanTask(AddedRowsScanTask task) {
String filePath = task.file().path().toString();
SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, task.deletes(), counter());
SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, task.deletes(), counter(), true);
return deletes.filter(rows(task, deletes.requiredSchema()));
}

private CloseableIterable<InternalRow> openDeletedDataFileScanTask(DeletedDataFileScanTask task) {
String filePath = task.file().path().toString();
SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, task.existingDeletes(), counter());
SparkDeleteFilter deletes =
new SparkDeleteFilter(filePath, task.existingDeletes(), counter(), true);
return deletes.filter(rows(task, deletes.requiredSchema()));
}

---- (next changed file) ----
@@ -41,7 +41,7 @@ public EqualityDeleteRowReader(
@Override
protected CloseableIterator<InternalRow> open(FileScanTask task) {
SparkDeleteFilter matches =
new SparkDeleteFilter(task.file().path().toString(), task.deletes(), counter());
new SparkDeleteFilter(task.file().path().toString(), task.deletes(), counter(), true);

// schema or rows returned by readers
Schema requiredSchema = matches.requiredSchema();
---- (next changed file) ----
@@ -83,7 +83,8 @@ protected Stream<ContentFile<?>> referencedFiles(FileScanTask task) {
protected CloseableIterator<InternalRow> open(FileScanTask task) {
String filePath = task.file().path().toString();
LOG.debug("Opening data file {}", filePath);
SparkDeleteFilter deleteFilter = new SparkDeleteFilter(filePath, task.deletes(), counter());
SparkDeleteFilter deleteFilter =
new SparkDeleteFilter(filePath, task.deletes(), counter(), true);

// schema or rows returned by readers
Schema requiredSchema = deleteFilter.requiredSchema();
