Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

API, Core: Support keeping at most N snapshots in ExpireSnapshots #11879

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .palantir/revapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1146,6 +1146,10 @@ acceptedBreaks:
\ org.apache.iceberg.TableMetadata)"
justification: "Removing deprecated code"
"1.7.0":
org.apache.iceberg:iceberg-api:
- code: "java.method.addedToInterface"
new: "method org.apache.iceberg.ExpireSnapshots org.apache.iceberg.ExpireSnapshots::keepAtMost(int)"
justification: "Add new method"
org.apache.iceberg:iceberg-core:
- code: "java.method.removed"
old: "method <T extends org.apache.iceberg.StructLike> org.apache.iceberg.deletes.PositionDeleteIndex\
Expand Down
13 changes: 13 additions & 0 deletions api/src/main/java/org/apache/iceberg/ExpireSnapshots.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,24 @@ public interface ExpireSnapshots extends PendingUpdate<List<Snapshot>> {
/**
* Expires all snapshots older than the given timestamp.
*
* <p>A snapshot kept by this method can still be expired by {@link #keepAtMost(int)}.
*
* @param timestampMillis a long timestamp, as returned by {@link System#currentTimeMillis()}
* @return this for method chaining
*/
ExpireSnapshots expireOlderThan(long timestampMillis);

/**
* Keeps at most {@code numSnapshots} ancestors of the current snapshot and expires older
* ancestors.
*
* <p>The value is an upper bound, not a guarantee: a snapshot kept by this method can still be
* expired by {@link #expireOlderThan(long)}.
*
* <p>NOTE(review): in the core implementation the effective limit is raised to the
* min-snapshots-to-keep value when that value is larger, so the minimum wins over this maximum.
*
* @param numSnapshots the maximum number of snapshots to keep; the core implementation rejects
*     values less than 1
* @return this for method chaining
*/
ExpireSnapshots keepAtMost(int numSnapshots);

/**
* Retains the most recent ancestors of the current snapshot.
*
Expand Down
33 changes: 29 additions & 4 deletions core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import static org.apache.iceberg.TableProperties.GC_ENABLED_DEFAULT;
import static org.apache.iceberg.TableProperties.MAX_REF_AGE_MS;
import static org.apache.iceberg.TableProperties.MAX_REF_AGE_MS_DEFAULT;
import static org.apache.iceberg.TableProperties.MAX_SNAPSHOTS_TO_KEEP;
import static org.apache.iceberg.TableProperties.MAX_SNAPSHOTS_TO_KEEP_DEFAULT;
import static org.apache.iceberg.TableProperties.MAX_SNAPSHOT_AGE_MS;
import static org.apache.iceberg.TableProperties.MAX_SNAPSHOT_AGE_MS_DEFAULT;
import static org.apache.iceberg.TableProperties.MIN_SNAPSHOTS_TO_KEEP;
Expand Down Expand Up @@ -80,6 +82,7 @@ public void accept(String file) {
private TableMetadata base;
private long defaultExpireOlderThan;
private int defaultMinNumSnapshots;
private int defaultMaxNumSnapshots;
private Consumer<String> deleteFunc = defaultDelete;
private ExecutorService deleteExecutorService = DEFAULT_DELETE_EXECUTOR_SERVICE;
private ExecutorService planExecutorService = ThreadPools.getWorkerPool();
Expand All @@ -102,6 +105,9 @@ public void accept(String file) {
this.defaultMinNumSnapshots =
PropertyUtil.propertyAsInt(
base.properties(), MIN_SNAPSHOTS_TO_KEEP, MIN_SNAPSHOTS_TO_KEEP_DEFAULT);
this.defaultMaxNumSnapshots =
PropertyUtil.propertyAsInt(
base.properties(), MAX_SNAPSHOTS_TO_KEEP, MAX_SNAPSHOTS_TO_KEEP_DEFAULT);

this.defaultMaxRefAgeMs =
PropertyUtil.propertyAsLong(base.properties(), MAX_REF_AGE_MS, MAX_REF_AGE_MS_DEFAULT);
Expand Down Expand Up @@ -131,6 +137,16 @@ public ExpireSnapshots expireOlderThan(long timestampMillis) {
return this;
}

/**
 * Sets the upper bound on the number of snapshots to keep, replacing the value that was loaded
 * from the table properties when this operation was created.
 *
 * @param numSnapshots the maximum number of snapshots to keep; must be at least 1
 * @return this for method chaining
 */
@Override
public ExpireSnapshots keepAtMost(int numSnapshots) {
  // Reject non-positive limits before any state is changed.
  Preconditions.checkArgument(
      numSnapshots >= 1,
      "Number of snapshots to keep at most must be at least 1, cannot be: %s",
      numSnapshots);

  this.defaultMaxNumSnapshots = numSnapshots;
  return this;
}

@Override
public ExpireSnapshots retainLast(int numSnapshots) {
Preconditions.checkArgument(
Expand Down Expand Up @@ -245,21 +261,30 @@ private Set<Long> computeAllBranchSnapshotsToRetain(Collection<SnapshotRef> refs
ref.maxSnapshotAgeMs() != null ? now - ref.maxSnapshotAgeMs() : defaultExpireOlderThan;
int minSnapshotsToKeep =
ref.minSnapshotsToKeep() != null ? ref.minSnapshotsToKeep() : defaultMinNumSnapshots;
int maxSnapshotsToKeep = Math.max(defaultMaxNumSnapshots, minSnapshotsToKeep);
branchSnapshotsToRetain.addAll(
computeBranchSnapshotsToRetain(
ref.snapshotId(), expireSnapshotsOlderThan, minSnapshotsToKeep));
ref.snapshotId(),
expireSnapshotsOlderThan,
minSnapshotsToKeep,
maxSnapshotsToKeep));
}
}

return branchSnapshotsToRetain;
}

private Set<Long> computeBranchSnapshotsToRetain(
long snapshot, long expireSnapshotsOlderThan, int minSnapshotsToKeep) {
long snapshot,
long expireSnapshotsOlderThan,
int minSnapshotsToKeep,
int maxSnapshotsToKeep) {
Set<Long> idsToRetain = Sets.newHashSet();
for (Snapshot ancestor : SnapshotUtil.ancestorsOf(snapshot, base::snapshot)) {
if (idsToRetain.size() < minSnapshotsToKeep
|| ancestor.timestampMillis() >= expireSnapshotsOlderThan) {
int retainSize = idsToRetain.size();
if (retainSize < minSnapshotsToKeep
|| (retainSize < maxSnapshotsToKeep
&& ancestor.timestampMillis() >= expireSnapshotsOlderThan)) {
idsToRetain.add(ancestor.snapshotId());
} else {
return idsToRetain;
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,9 @@ private TableProperties() {}
// Table property: minimum number of snapshots to keep when expiring snapshots. Used as the
// fallback when a ref does not set its own minimum.
public static final String MIN_SNAPSHOTS_TO_KEEP = "history.expire.min-snapshots-to-keep";
public static final int MIN_SNAPSHOTS_TO_KEEP_DEFAULT = 1;

// Table property: upper bound on the number of snapshots kept when expiring snapshots. The
// Integer.MAX_VALUE default leaves the count effectively uncapped.
public static final String MAX_SNAPSHOTS_TO_KEEP = "history.expire.max-snapshots-to-keep";
public static final int MAX_SNAPSHOTS_TO_KEEP_DEFAULT = Integer.MAX_VALUE;

// Table property: maximum ref age, in milliseconds per the "-ms" suffix. Defaults to
// Long.MAX_VALUE.
public static final String MAX_REF_AGE_MS = "history.expire.max-ref-age-ms";
public static final long MAX_REF_AGE_MS_DEFAULT = Long.MAX_VALUE;

Expand Down
Loading