Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

API, CORE: Adds Row Lineage Fields #11930

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions api/src/main/java/org/apache/iceberg/ContentFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,11 @@ default Integer sortOrderId() {
return null;
}

/**
 * Returns the {@code _row_id} for the first row in the data file.
 *
 * @return the first row ID assigned to this file, or null if not set (the default)
 */
default Long firstRowId() {
  return null;
}

/**
* Returns the data sequence number of the file.
*
Expand Down
11 changes: 9 additions & 2 deletions api/src/main/java/org/apache/iceberg/DataFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,18 @@ public interface DataFile extends ContentFile<DataFile> {
"content_size_in_bytes",
LongType.get(),
"The length of referenced content stored in the file");
// `_row_id` values are longs: ContentFile#firstRowId() returns Long and the parallel
// ManifestFile.FIRST_ROW_ID field is a LongType, so this field must be a LongType as well
// (it was incorrectly declared as IntegerType).
Types.NestedField FIRST_ROW_ID =
    optional(
        146,
        "first_row_id",
        LongType.get(),
        "The `_row_id` for the first row in the data file");

int PARTITION_ID = 102;
String PARTITION_NAME = "partition";
String PARTITION_DOC = "Partition data tuple, schema based on the partition spec";

// NEXT ID TO ASSIGN: 146
// NEXT ID TO ASSIGN: 147

static StructType getType(StructType partitionType) {
// IDs start at 100 to leave room for changes to ManifestEntry
Expand All @@ -142,7 +148,8 @@ static StructType getType(StructType partitionType) {
SORT_ORDER_ID,
REFERENCED_DATA_FILE,
CONTENT_OFFSET,
CONTENT_SIZE);
CONTENT_SIZE,
FIRST_ROW_ID);
}

/**
Expand Down
16 changes: 14 additions & 2 deletions api/src/main/java/org/apache/iceberg/ManifestFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,13 @@ public interface ManifestFile {
"Summary for each partition");
Types.NestedField KEY_METADATA =
optional(519, "key_metadata", Types.BinaryType.get(), "Encryption key metadata blob");
// next ID to assign: 520
// Field 520: starting `_row_id` handed out to rows added by this manifest's ADDED data files.
// Long-typed to match ManifestFile#firstRowId() below.
Types.NestedField FIRST_ROW_ID =
    optional(
        520,
        "first_row_id",
        Types.LongType.get(),
        "The starting `_row_id` to assign to rows added by `ADDED` data files");
// next ID to assign: 521

Schema SCHEMA =
new Schema(
Expand All @@ -105,7 +111,8 @@ public interface ManifestFile {
EXISTING_ROWS_COUNT,
DELETED_ROWS_COUNT,
PARTITION_SUMMARIES,
KEY_METADATA);
KEY_METADATA,
FIRST_ROW_ID);

static Schema schema() {
return SCHEMA;
Expand Down Expand Up @@ -196,6 +203,11 @@ default ByteBuffer keyMetadata() {
return null;
}

/**
 * Returns the starting {@code _row_id} to assign to rows added by {@code ADDED} data files in
 * this manifest.
 *
 * @return the starting row ID for added rows, or null if not set (the default)
 */
default Long firstRowId() {
  return null;
}

/**
* Copies this {@link ManifestFile manifest file}. Readers can reuse manifest file instances; use
* this method to make defensive copies.
Expand Down
14 changes: 14 additions & 0 deletions api/src/main/java/org/apache/iceberg/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -171,4 +171,18 @@ default Iterable<DeleteFile> removedDeleteFiles(FileIO io) {
default Integer schemaId() {
return null;
}

/**
 * For row lineage. The first row identifier for the first newly added row within this snapshot.
 * All rows created during this snapshot will be assigned an id larger than this value. This
 * snapshot can contain rows added in previous snapshots with an id less than this value.
 *
 * <p>A null value indicates that row lineage was not enabled when this snapshot was created.
 *
 * @return the first row-lineage row-id to be used in this snapshot, or null if row lineage was
 *     not enabled
 */
default Long firstRowId() {
  return null;
}
}
20 changes: 18 additions & 2 deletions core/src/main/java/org/apache/iceberg/BaseFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public PartitionData copy() {
private String referencedDataFile = null;
private Long contentOffset = null;
private Long contentSizeInBytes = null;
private Long firstRowId = null;

// cached schema
private transient Schema avroSchema = null;
Expand Down Expand Up @@ -114,7 +115,8 @@ public PartitionData copy() {
DataFile.REFERENCED_DATA_FILE,
DataFile.CONTENT_OFFSET,
DataFile.CONTENT_SIZE,
MetadataColumns.ROW_POSITION);
MetadataColumns.ROW_POSITION,
DataFile.FIRST_ROW_ID);

/** Used by Avro reflection to instantiate this class when reading manifest files. */
BaseFile(Schema avroSchema) {
Expand Down Expand Up @@ -158,7 +160,8 @@ public PartitionData copy() {
ByteBuffer keyMetadata,
String referencedDataFile,
Long contentOffset,
Long contentSizeInBytes) {
Long contentSizeInBytes,
Long firstRowId) {
super(BASE_TYPE.fields().size());
this.partitionSpecId = specId;
this.content = content;
Expand Down Expand Up @@ -190,6 +193,7 @@ public PartitionData copy() {
this.referencedDataFile = referencedDataFile;
this.contentOffset = contentOffset;
this.contentSizeInBytes = contentSizeInBytes;
this.firstRowId = firstRowId;
}

/**
Expand Down Expand Up @@ -245,6 +249,7 @@ public PartitionData copy() {
this.referencedDataFile = toCopy.referencedDataFile;
this.contentOffset = toCopy.contentOffset;
this.contentSizeInBytes = toCopy.contentSizeInBytes;
this.firstRowId = toCopy.firstRowId;
}

/** Constructor for Java serialization. */
Expand Down Expand Up @@ -365,6 +370,9 @@ protected <T> void internalSet(int pos, T value) {
case 20:
this.fileOrdinal = (long) value;
return;
case 21:
this.firstRowId = (Long) value;
return;
default:
// ignore the object, it must be from a newer version of the format
}
Expand Down Expand Up @@ -419,6 +427,8 @@ private Object getByPos(int basePos) {
return contentSizeInBytes;
case 20:
return fileOrdinal;
case 21:
return firstRowId;
default:
throw new UnsupportedOperationException("Unknown field ordinal: " + basePos);
}
Expand Down Expand Up @@ -556,6 +566,11 @@ public Long contentSizeInBytes() {
return contentSizeInBytes;
}

@Override
public Long firstRowId() {
  // Null unless populated by the constructor, copy constructor, or internalSet (ordinal 21).
  return firstRowId;
}

private static <K, V> Map<K, V> copyMap(Map<K, V> map, Set<K> keys) {
return keys == null ? SerializableMap.copyOf(map) : SerializableMap.filteredCopyOf(map, keys);
}
Expand Down Expand Up @@ -610,6 +625,7 @@ public String toString() {
.add("referenced_data_file", referencedDataFile == null ? "null" : referencedDataFile)
.add("content_offset", contentOffset == null ? "null" : contentOffset)
.add("content_size_in_bytes", contentSizeInBytes == null ? "null" : contentSizeInBytes)
.add("first_row_id", firstRowId == null ? "null" : firstRowId)
.toString();
}
}
14 changes: 12 additions & 2 deletions core/src/main/java/org/apache/iceberg/BaseSnapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class BaseSnapshot implements Snapshot {
private final Map<String, String> summary;
private final Integer schemaId;
private final String[] v1ManifestLocations;
private final Long firstRowId;

// lazily initialized
private transient List<ManifestFile> allManifests = null;
Expand All @@ -61,7 +62,8 @@ class BaseSnapshot implements Snapshot {
String operation,
Map<String, String> summary,
Integer schemaId,
String manifestList) {
String manifestList,
Long firstRowId) {
this.sequenceNumber = sequenceNumber;
this.snapshotId = snapshotId;
this.parentId = parentId;
Expand All @@ -71,6 +73,7 @@ class BaseSnapshot implements Snapshot {
this.schemaId = schemaId;
this.manifestListLocation = manifestList;
this.v1ManifestLocations = null;
this.firstRowId = firstRowId;
}

BaseSnapshot(
Expand All @@ -81,7 +84,8 @@ class BaseSnapshot implements Snapshot {
String operation,
Map<String, String> summary,
Integer schemaId,
String[] v1ManifestLocations) {
String[] v1ManifestLocations,
Long firstRowId) {
this.sequenceNumber = sequenceNumber;
this.snapshotId = snapshotId;
this.parentId = parentId;
Expand All @@ -91,6 +95,7 @@ class BaseSnapshot implements Snapshot {
this.schemaId = schemaId;
this.manifestListLocation = null;
this.v1ManifestLocations = v1ManifestLocations;
this.firstRowId = firstRowId;
}

@Override
Expand Down Expand Up @@ -220,6 +225,11 @@ public String manifestListLocation() {
return manifestListLocation;
}

@Override
public Long firstRowId() {
  // Assigned in the constructors; null when row lineage was not enabled for this snapshot.
  return firstRowId;
}

private void cacheDeleteFileChanges(FileIO fileIO) {
Preconditions.checkArgument(fileIO != null, "Cannot cache delete file changes: FileIO is null");

Expand Down
5 changes: 4 additions & 1 deletion core/src/main/java/org/apache/iceberg/ContentFileParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class ContentFileParser {
private static final String REFERENCED_DATA_FILE = "referenced-data-file";
private static final String CONTENT_OFFSET = "content-offset";
private static final String CONTENT_SIZE = "content-size-in-bytes";
private static final String FIRST_ROW_ID = "first-row-id";

private ContentFileParser() {}

Expand Down Expand Up @@ -167,6 +168,7 @@ static ContentFile<?> fromJson(JsonNode jsonNode, PartitionSpec spec) {
String referencedDataFile = JsonUtil.getStringOrNull(REFERENCED_DATA_FILE, jsonNode);
Long contentOffset = JsonUtil.getLongOrNull(CONTENT_OFFSET, jsonNode);
Long contentSizeInBytes = JsonUtil.getLongOrNull(CONTENT_SIZE, jsonNode);
Long firstRowId = JsonUtil.getLongOrNull(FIRST_ROW_ID, jsonNode);

if (fileContent == FileContent.DATA) {
return new GenericDataFile(
Expand All @@ -178,7 +180,8 @@ static ContentFile<?> fromJson(JsonNode jsonNode, PartitionSpec spec) {
metrics,
keyMetadata,
splitOffsets,
sortOrderId);
sortOrderId,
firstRowId);
} else {
return new GenericDeleteFile(
specId,
Expand Down
10 changes: 9 additions & 1 deletion core/src/main/java/org/apache/iceberg/DataFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ public static class Builder {
private ByteBuffer keyMetadata = null;
private List<Long> splitOffsets = null;
private Integer sortOrderId = SortOrder.unsorted().orderId();
private Long firstRowId = null;

public Builder(PartitionSpec spec) {
this.spec = spec;
Expand Down Expand Up @@ -201,6 +202,7 @@ public Builder copy(DataFile toCopy) {
this.splitOffsets =
toCopy.splitOffsets() == null ? null : ImmutableList.copyOf(toCopy.splitOffsets());
this.sortOrderId = toCopy.sortOrderId();
this.firstRowId = toCopy.firstRowId();
return this;
}

Expand Down Expand Up @@ -315,6 +317,11 @@ public Builder withSortOrder(SortOrder newSortOrder) {
return this;
}

/**
 * Sets the {@code _row_id} of the first row in the data file being built.
 *
 * @param newFirstRowId the first row ID assigned to this file
 * @return this for method chaining
 */
public Builder withFirstRowId(long newFirstRowId) {
  this.firstRowId = newFirstRowId;
  return this;
}

public DataFile build() {
Preconditions.checkArgument(filePath != null, "File path is required");
if (format == null) {
Expand All @@ -340,7 +347,8 @@ public DataFile build() {
upperBounds),
keyMetadata,
splitOffsets,
sortOrderId);
sortOrderId,
firstRowId);
}
}
}
6 changes: 4 additions & 2 deletions core/src/main/java/org/apache/iceberg/GenericDataFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ class GenericDataFile extends BaseFile<DataFile> implements DataFile {
Metrics metrics,
ByteBuffer keyMetadata,
List<Long> splitOffsets,
Integer sortOrderId) {
Integer sortOrderId,
Long firstRowId) {
super(
specId,
FileContent.DATA,
Expand All @@ -67,7 +68,8 @@ class GenericDataFile extends BaseFile<DataFile> implements DataFile {
keyMetadata,
null /* no referenced data file */,
null /* no content offset */,
null /* no content size */);
null /* no content size */,
firstRowId);
}

/**
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/java/org/apache/iceberg/GenericDeleteFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ class GenericDeleteFile extends BaseFile<DeleteFile> implements DeleteFile {
keyMetadata,
referencedDataFile,
contentOffset,
contentSizeInBytes);
contentSizeInBytes,
null);
}

/**
Expand Down
Loading
Loading