diff --git a/bom/build.gradle.kts b/bom/build.gradle.kts index b068c557f40..d7cb70f8dde 100644 --- a/bom/build.gradle.kts +++ b/bom/build.gradle.kts @@ -88,6 +88,7 @@ dependencies { api(project(":nessie-versioned-storage-cassandra-tests")) api(project(":nessie-versioned-storage-cassandra2")) api(project(":nessie-versioned-storage-cassandra2-tests")) + api(project(":nessie-versioned-storage-cleanup")) api(project(":nessie-versioned-storage-common")) api(project(":nessie-versioned-storage-common-proto")) api(project(":nessie-versioned-storage-common-serialize")) diff --git a/gradle/projects.main.properties b/gradle/projects.main.properties index 6abd8850df9..d9051f58800 100644 --- a/gradle/projects.main.properties +++ b/gradle/projects.main.properties @@ -71,6 +71,7 @@ nessie-versioned-storage-cassandra=versioned/storage/cassandra nessie-versioned-storage-cassandra-tests=versioned/storage/cassandra-tests nessie-versioned-storage-cassandra2=versioned/storage/cassandra2 nessie-versioned-storage-cassandra2-tests=versioned/storage/cassandra2-tests +nessie-versioned-storage-cleanup=versioned/storage/cleanup nessie-versioned-storage-common=versioned/storage/common nessie-versioned-storage-common-proto=versioned/storage/common-proto nessie-versioned-storage-common-serialize=versioned/storage/common-serialize diff --git a/versioned/storage/cleanup/build.gradle.kts b/versioned/storage/cleanup/build.gradle.kts new file mode 100644 index 00000000000..fa457c18720 --- /dev/null +++ b/versioned/storage/cleanup/build.gradle.kts @@ -0,0 +1,61 @@ +/* + * Copyright (C) 2022 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +plugins { id("nessie-conventions-server") } + +publishingHelper { mavenName = "Nessie - Storage - Cleanup unreferenced objects" } + +description = "Identify and purge unreferenced objects in the Nessie repository." + +dependencies { + implementation(project(":nessie-model")) + implementation(project(":nessie-versioned-storage-common")) + implementation(project(":nessie-versioned-spi")) + implementation(project(":nessie-versioned-transfer-related")) + + compileOnly(libs.jakarta.validation.api) + compileOnly(libs.jakarta.annotation.api) + compileOnly(libs.microprofile.openapi) + + compileOnly(platform(libs.jackson.bom)) + compileOnly("com.fasterxml.jackson.core:jackson-annotations") + + compileOnly(libs.errorprone.annotations) + implementation(libs.guava) + implementation(libs.agrona) + implementation(libs.slf4j.api) + + compileOnly(project(":nessie-versioned-storage-testextension")) + + compileOnly(project(":nessie-immutables")) + annotationProcessor(project(":nessie-immutables", configuration = "processor")) + + testImplementation(project(":nessie-versioned-storage-testextension")) + testImplementation(project(":nessie-versioned-storage-inmemory")) + testImplementation(project(":nessie-versioned-tests")) + testImplementation(project(path = ":nessie-protobuf-relocated", configuration = "shadow")) + testImplementation(platform(libs.junit.bom)) + testImplementation(libs.bundles.junit.testing) + testRuntimeOnly(libs.logback.classic) + + testCompileOnly(project(":nessie-immutables")) + testAnnotationProcessor(project(":nessie-immutables", configuration = "processor")) + + testCompileOnly(libs.microprofile.openapi) + + testCompileOnly(platform(libs.jackson.bom)) + testCompileOnly("com.fasterxml.jackson.core:jackson-annotations") +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/Cleanup.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/Cleanup.java new file mode 100644 index 00000000000..57f1bf1cfc4 --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/Cleanup.java @@ -0,0 +1,111 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +import static org.projectnessie.versioned.storage.cleanup.PurgeFilter.ReferencedObjectsPurgeFilter.referencedObjectsPurgeFilter; +import static org.projectnessie.versioned.storage.cleanup.ReferencedObjectsContext.objectsResolverContext; + +import org.projectnessie.versioned.storage.common.persist.Obj; +import org.projectnessie.versioned.storage.common.persist.Persist; + +/** + * Primary point of entry to remove unreferenced objects from Nessie's backend database. + * + *

Simplified example code flow:

+ * var params =
+ *   CleanupParams.builder().build();
+ * var cleanup =
+ *   createCleanup(params);
+ *
+ * var referencedObjectsContext =
+ *   cleanup.buildReferencedObjectsContext(persist,
+ *   TimeUnit.MILLISECONDS.toMicros(
+ *     Instant.now().minus(3, ChronoUnit.DAYS)
+ *       .toEpochMilli()));
+ * var referencedObjectsResolver =
+ *   cleanup.createReferencedObjectsResolver(referencedObjectsContext);
+ *
+ * // Must handle MustRestartWithBiggerFilterException
+ * var resolveResult =
+ *   referencedObjectsResolver.resolve();
+ *
+ * var purgeObjects =
+ *   cleanup.createPurgeObjects(resolveResult.purgeObjectsContext());
+ * var purgeResult =
+ *   purgeObjects.purge();
+ * 
+ */ +public class Cleanup { + private final CleanupParams cleanupParams; + + private Cleanup(CleanupParams cleanupParams) { + this.cleanupParams = cleanupParams; + } + + public static Cleanup createCleanup(CleanupParams params) { + return new Cleanup(params); + } + + /** + * Create the context holder used when identifying referenced objects and purging unreferenced + * objects. + * + *

Choosing an appropriate value for {@code maxObjReferenced} is crucial. Technically, this + * value must be at max the current timestamp - but logically {@code maxObjReferenced} should be + * the timestamp of a few days ago to not delete unreferenced objects too early and give users a + * chance to reset branches to another commit ID in case some table/view metadata is broken. + * + *

Uses an instance of {@link + * org.projectnessie.versioned.storage.cleanup.PurgeFilter.ReferencedObjectsPurgeFilter} using a + * bloom filter based {@link ReferencedObjectsFilter}, both configured using {@link + * CleanupParams}'s attributes. + * + * @param persist the persistence/repository to run against + * @param maxObjReferenced only {@link Obj}s with a {@link Obj#referenced()} older than {@code + * maxObjReferenced} will be deleted. Production workloads should set this to something like + * "now minus 7 days" to have the chance to reset branches, just in case. Technically, this + * value must not be greater than "now". "Now" should be inquired using {@code + * Persist.config().clock().instant()}. + */ + public ReferencedObjectsContext buildReferencedObjectsContext( + Persist persist, long maxObjReferenced) { + var referencedObjects = new ReferencedObjectsFilterImpl(cleanupParams); + var purgeFilter = referencedObjectsPurgeFilter(referencedObjects, maxObjReferenced); + return objectsResolverContext(persist, cleanupParams, referencedObjects, purgeFilter); + } + + /** + * Creates a new objects-resolver instance to identify referenced objects, which must be + * retained. + * + * @param objectsResolverContext context, preferably created using {@link + * #buildReferencedObjectsContext(Persist, long)} + */ + public ReferencedObjectsResolver createReferencedObjectsResolver( + ReferencedObjectsContext objectsResolverContext) { + return new ReferencedObjectsResolverImpl( + objectsResolverContext, cleanupParams.rateLimitFactory()); + } + + /** + * Creates a new objects-purger instance to delete unreferenced objects. + * + * @param purgeObjectsContext return value of {@link ReferencedObjectsResolver#resolve()}. + */ + public PurgeObjects createPurgeObjects(PurgeObjectsContext purgeObjectsContext) { + return new PurgeObjectsImpl(purgeObjectsContext, cleanupParams.rateLimitFactory()); + } +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/CleanupParams.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/CleanupParams.java new file mode 100644 index 00000000000..1814259ef69 --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/CleanupParams.java @@ -0,0 +1,195 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +import static org.projectnessie.versioned.storage.common.logic.InternalRef.REF_REFS; +import static org.projectnessie.versioned.storage.common.logic.InternalRef.REF_REPO; +import static org.projectnessie.versioned.transfer.related.CompositeTransferRelatedObjects.createCompositeTransferRelatedObjects; + +import java.util.List; +import java.util.function.IntFunction; +import org.immutables.value.Value; +import org.projectnessie.nessie.immutables.NessieImmutable; +import org.projectnessie.versioned.storage.common.objtypes.CommitObj; +import org.projectnessie.versioned.storage.common.persist.Obj; +import org.projectnessie.versioned.storage.common.persist.ObjId; +import org.projectnessie.versioned.transfer.related.TransferRelatedObjects; + +/** + * Technically and implementation oriented parameters for Nessie's backend database cleanup, + * considered for internal use only. + * + *

Any API or functionality that exposes Nessie's backend database cleanup must provide a + * functionally oriented way for configuration and generate a {@link CleanupParams} from it. + */ +@NessieImmutable +public interface CleanupParams { + // Following defaults result in a serialized bloom filter size of about 3000000 bytes. + long DEFAULT_EXPECTED_OBJ_COUNT = 1_000_000L; + double DEFAULT_FALSE_POSITIVE_PROBABILITY = 0.00001d; + double DEFAULT_ALLOWED_FALSE_POSITIVE_PROBABILITY = 0.0001d; + boolean DEFAULT_ALLOW_DUPLICATE_COMMIT_TRAVERSALS = false; + int DEFAULT_PENDING_OBJS_BATCH_SIZE = 20; + int DEFAULT_RECENT_OBJ_IDS_FILTER_SIZE = 100_000; + + static ImmutableCleanupParams.Builder builder() { + return ImmutableCleanupParams.builder(); + } + + /** + * Number of expected {@link Obj}s, defaults to {@value #DEFAULT_EXPECTED_OBJ_COUNT}, used to size + * the bloom filter identifying the referenced {@link Obj}s. If {@link + * ReferencedObjectsResolver#resolve()} throws {@link MustRestartWithBiggerFilterException}, it is + * recommended to increase this value. + */ + @Value.Default + default long expectedObjCount() { + return DEFAULT_EXPECTED_OBJ_COUNT; + } + + /** + * Returns an updated instance of {@code this} value with {@link #expectedObjCount()} increased by + * {@value #DEFAULT_EXPECTED_OBJ_COUNT} as a convenience function to handle {@link + * MustRestartWithBiggerFilterException} thrown by {@link ReferencedObjectsResolver#resolve()} . + */ + default CleanupParams withIncreasedExpectedObjCount() { + return builder() + .from(this) + .expectedObjCount(expectedObjCount() + DEFAULT_EXPECTED_OBJ_COUNT) + .build(); + } + + /** + * Related to {@link #expectedObjCount()}, used to size the bloom filter identifying the + * referenced {@link Obj}s, defaults to {@value #DEFAULT_FALSE_POSITIVE_PROBABILITY}. + */ + @Value.Default + default double falsePositiveProbability() { + return DEFAULT_FALSE_POSITIVE_PROBABILITY; + } + + /** + * Maximum allowed FPP, checked when adding to the bloom filter identifying the referenced {@link + * Obj}s, defaults to {@value #DEFAULT_ALLOWED_FALSE_POSITIVE_PROBABILITY}. If this value is + * exceeded, a {@link MustRestartWithBiggerFilterException} will be thrown from {@link + * ReferencedObjectsResolver#resolve()}. + */ + @Value.Default + default double allowedFalsePositiveProbability() { + return DEFAULT_ALLOWED_FALSE_POSITIVE_PROBABILITY; + } + + /** Helper functionality to identify related {@link Obj}s, see {@link TransferRelatedObjects}. */ + @Value.Default + default TransferRelatedObjects relatedObjects() { + return createCompositeTransferRelatedObjects(); + } + + /** + * {@link ReferencedObjectsResolver} tries to not walk a commit more than once by memoizing the + * visited {@link CommitObj#id() commit IDs}, default is {@link + * #DEFAULT_ALLOW_DUPLICATE_COMMIT_TRAVERSALS}. Setting this to {@code true} disables this + * optimization. + */ + @Value.Default + default boolean allowDuplicateCommitTraversals() { + return DEFAULT_ALLOW_DUPLICATE_COMMIT_TRAVERSALS; + } + + /** + * Rate limit for commit objects per second during {@link ReferencedObjectsResolver#resolve()}, + * default is unlimited. Any positive value enables rate limiting, any value {@code <=0} disables + * rate limiting. + */ + @Value.Default + default int resolveCommitRatePerSecond() { + return 0; + } + + /** + * Rate limit for (non commit) objects per second during {@link + * ReferencedObjectsResolver#resolve()}, default is unlimited. Any positive value enables rate + * limiting, any value {@code <=0} disables rate limiting. + */ + @Value.Default + default int resolveObjRatePerSecond() { + return 0; + } + + /** + * Rate limit for scanning objects per second during {@link PurgeObjects#purge()}, default is + * unlimited. Any positive value enables rate limiting, any value {@code <=0} disables rate + * limiting. + */ + @Value.Default + default int purgeScanObjRatePerSecond() { + return 0; + } + + /** + * Rate limit for purging objects per second during {@link PurgeObjects#purge()}, default is + * unlimited. Any positive value enables rate limiting, any value {@code <=0} disables rate + * limiting. + */ + @Value.Default + default int purgeDeleteObjRatePerSecond() { + return 0; + } + + /** + * {@link ReferencedObjectsResolver} attempts to fetch objects from the backend database in + * batches, this parameter defines the batch size, defaults to {@link + * #DEFAULT_PENDING_OBJS_BATCH_SIZE}. + */ + @Value.Default + default int pendingObjsBatchSize() { + return DEFAULT_PENDING_OBJS_BATCH_SIZE; + } + + /** + * Size of the "recent object IDs" filter to prevent processing the same {@link ObjId}s. This * + * happens, when the values referenced from the commit index are iterated, because it iterates * + * over all keys, not only the keys added by a particular commit. + * + *

The value defaults to {@value #DEFAULT_RECENT_OBJ_IDS_FILTER_SIZE}. It should be higher than + * the maximum number of keys in a commit. + */ + @Value.Default + default int recentObjIdsFilterSize() { + return DEFAULT_RECENT_OBJ_IDS_FILTER_SIZE; + } + + /** Rate limiter factory for the rate limits defined above, useful for testing purposes. */ + @Value.Default + default IntFunction rateLimitFactory() { + return RateLimit::create; + } + + /** Defines the names of the Nessie internal references, do not change. */ + @Value.Default + default List internalReferenceNames() { + return List.of(REF_REFS.name(), REF_REPO.name()); + } + + /** + * Optionally enable a dry-run mode, which does not delete any objects from the backend database, + * defaults to {@code false}. + */ + @Value.Default + default boolean dryRun() { + return false; + } +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/HeapSizes.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/HeapSizes.java new file mode 100644 index 00000000000..70b2c7c0976 --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/HeapSizes.java @@ -0,0 +1,162 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +import static java.lang.String.format; + +final class HeapSizes { + private HeapSizes() {} + + /* + org.agrona.collections.ObjectHashSet object internals: + OFF SZ TYPE DESCRIPTION VALUE + 0 8 (object header: mark) N/A + 8 4 (object header: class) N/A + 12 4 float ObjectHashSet.loadFactor N/A + 16 4 int ObjectHashSet.resizeThreshold N/A + 20 4 int ObjectHashSet.size N/A + 24 1 boolean ObjectHashSet.shouldAvoidAllocation N/A + 25 7 (alignment/padding gap) + 32 8 java.lang.Object[] ObjectHashSet.values N/A + 40 8 org.agrona.collections.ObjectHashSet.ObjectIterator ObjectHashSet.iterator N/A + 48 8 java.util.function.IntConsumer ObjectHashSet.resizeNotifier N/A + 56 8 (object alignment gap) + Instance size: 64 bytes + Space losses: 7 bytes internal + 8 bytes external = 15 bytes total + */ + static final long HEAP_SIZE_OBJECT_HASH_SET = 64L; + /* + org.projectnessie.versioned.storage.common.persist.ObjId$ObjId256 object internals: + OFF SZ TYPE DESCRIPTION VALUE + 0 8 (object header: mark) N/A + 8 4 (object header: class) N/A + 12 4 (alignment/padding gap) + 16 8 long ObjId256.l0 N/A + 24 8 long ObjId256.l1 N/A + 32 8 long ObjId256.l2 N/A + 40 8 long ObjId256.l3 N/A + Instance size: 48 bytes + Space losses: 4 bytes internal + 0 bytes external = 4 bytes total + */ + static final long HEAP_SIZE_OBJ_ID = 48L; + /* + long[] : 16 + 8*length + */ + static final long HEAP_SIZE_PRIMITIVE_OBJ_ARRAY = 16L; + + /* + com.google.common.hash.BloomFilter object internals: + OFF SZ TYPE DESCRIPTION VALUE + 0 8 (object header: mark) N/A + 8 8 (object header: class) N/A + 16 4 int BloomFilter.numHashFunctions N/A + 20 4 (alignment/padding gap) + 24 8 com.google.common.hash.BloomFilterStrategies.LockFreeBitArray BloomFilter.bits N/A + 32 8 com.google.common.hash.Funnel BloomFilter.funnel N/A + 40 8 com.google.common.hash.BloomFilter.Strategy BloomFilter.strategy N/A + Instance size: 48 bytes + Space losses: 4 bytes internal + 0 bytes external = 4 bytes total + */ + static final long HEAP_SIZE_BLOOM_FILTER = 48L; + /* + com.google.common.hash.BloomFilterStrategies$LockFreeBitArray object internals: + OFF SZ TYPE DESCRIPTION VALUE + 0 8 (object header: mark) N/A + 8 8 (object header: class) N/A + 16 8 java.util.concurrent.atomic.AtomicLongArray LockFreeBitArray.data N/A + 24 8 com.google.common.hash.LongAddable LockFreeBitArray.bitCount N/A + Instance size: 32 bytes + Space losses: 0 bytes internal + 0 bytes external = 0 bytes total + */ + static final long HEAP_SIZE_BIT_ARRAY = 32L; + /* + We assume that com.google.common.hash.LongAddables uses the pure-Java implementation, not Guava's + heap-expensive LongAdder implementation based on its Striped64 with 144 bytes per cell. + + java.util.concurrent.atomic.AtomicLong object internals (com.google.common.hash.LongAddables.PureJavaLongAddable): + OFF SZ TYPE DESCRIPTION VALUE + 0 8 (object header: mark) N/A + 8 4 (object header: class) N/A + 12 4 (alignment/padding gap) + 16 8 long AtomicLong.value N/A + 24 8 (object alignment gap) + Instance size: 32 bytes + Space losses: 4 bytes internal + 8 bytes external = 12 bytes total + */ + static final long HEAP_SIZE_LONG_ADDER = 40L; + /* + java.util.concurrent.atomic.AtomicLongArray object internals: + OFF SZ TYPE DESCRIPTION VALUE + 0 8 (object header: mark) N/A + 8 4 (object header: class) N/A + 12 4 (alignment/padding gap) + 16 8 long[] AtomicLongArray.array N/A + 24 8 (object alignment gap) + Instance size: 32 bytes + Space losses: 4 bytes internal + 8 bytes external = 12 bytes total + */ + static final long HEAP_SIZE_ATOMIC_LONG_ARRAY = 32L; + /* + long[] : 16 + 8*length + */ + static final long HEAP_SIZE_PRIMITIVE_LONG_ARRAY = 16L; + + /* + java.util.LinkedHashMap object internals: + OFF SZ TYPE DESCRIPTION VALUE + 0 8 (object header: mark) N/A + 8 4 (object header: class) N/A + 12 4 int HashMap.size N/A + 16 8 java.util.Set AbstractMap.keySet N/A + 24 8 java.util.Collection AbstractMap.values N/A + 32 4 int HashMap.modCount N/A + 36 4 int HashMap.threshold N/A + 40 4 float HashMap.loadFactor N/A + 44 4 int LinkedHashMap.putMode N/A + 48 8 java.util.HashMap.Node[] HashMap.table N/A + 56 8 java.util.Set HashMap.entrySet N/A + 64 1 boolean LinkedHashMap.accessOrder N/A + 65 7 (alignment/padding gap) + 72 8 java.util.LinkedHashMap.Entry LinkedHashMap.head N/A + 80 8 java.util.LinkedHashMap.Entry LinkedHashMap.tail N/A + 88 8 (object alignment gap) + Instance size: 96 bytes + Space losses: 7 bytes internal + 8 bytes external = 15 bytes total + */ + static final long HEAP_SIZE_LINKED_HASH_MAP = 96L; + /* + java.util.LinkedHashMap$Entry object internals: + OFF SZ TYPE DESCRIPTION VALUE + 0 8 (object header: mark) N/A + 8 8 (object header: class) N/A + 16 4 int Node.hash N/A + 20 4 (alignment/padding gap) + 24 8 java.lang.Object Node.key N/A + 32 8 java.lang.Object Node.value N/A + 40 8 java.util.HashMap.Node Node.next N/A + 48 8 java.util.LinkedHashMap.Entry Entry.before N/A + 56 8 java.util.LinkedHashMap.Entry Entry.after N/A + Instance size: 64 bytes + Space losses: 4 bytes internal + 0 bytes external = 4 bytes total + */ + static final long HEAP_SIZE_LINKED_HASH_MAP_ENTRY = 64L; + + static final long HEAP_SIZE_POINTER = 8L; + + static String memSizeToStringMB(long bytes) { + return format("%.1f M", ((double) bytes) / 1024L / 1024L); + } +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/MustRestartWithBiggerFilterException.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/MustRestartWithBiggerFilterException.java new file mode 100644 index 00000000000..7349b9983c4 --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/MustRestartWithBiggerFilterException.java @@ -0,0 +1,27 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +/** + * Thrown when the bloom filter's FPP is above the configured threshold when adding IDs. If this + * exception is encountered, the current garbage-collection run must be aborted and + * restarted with a bigger {@link CleanupParams#expectedObjCount()} value. + */ +public class MustRestartWithBiggerFilterException extends Exception { + public MustRestartWithBiggerFilterException(String msg, Throwable cause) { + super(msg, cause); + } +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/MustRestartWithBiggerFilterRuntimeException.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/MustRestartWithBiggerFilterRuntimeException.java new file mode 100644 index 00000000000..a981b366b09 --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/MustRestartWithBiggerFilterRuntimeException.java @@ -0,0 +1,26 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +/** + * Internally used unchecked exception to eventually be "wrapped" by a checked {@link + * MustRestartWithBiggerFilterException}. + */ +class MustRestartWithBiggerFilterRuntimeException extends RuntimeException { + public MustRestartWithBiggerFilterRuntimeException(String msg) { + super(msg); + } +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeFilter.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeFilter.java new file mode 100644 index 00000000000..ad5cb5409da --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeFilter.java @@ -0,0 +1,71 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +import jakarta.validation.constraints.NotNull; +import java.util.List; +import org.projectnessie.nessie.immutables.NessieImmutable; +import org.projectnessie.versioned.storage.common.persist.Obj; + +/** Filter to decide whether an {@link Obj} must be kept or whether it can be deleted. */ +public interface PurgeFilter { + boolean mustKeep(@NotNull Obj obj); + + @NessieImmutable + interface CompositePurgeFilter extends PurgeFilter { + List filters(); + + static CompositePurgeFilter compositePurgeFilter(PurgeFilter... filters) { + return ImmutableCompositePurgeFilter.of(List.of(filters)); + } + + static CompositePurgeFilter compositePurgeFilter(List filters) { + return ImmutableCompositePurgeFilter.of(filters); + } + + @Override + default boolean mustKeep(Obj obj) { + for (PurgeFilter filter : filters()) { + if (filter.mustKeep(obj)) { + return true; + } + } + return false; + } + } + + /** + * Recommended default purge filter, which considers a {@link ReferencedObjectsFilter} and a + * maximum value of {@link Obj#referenced()}. + */ + @NessieImmutable + interface ReferencedObjectsPurgeFilter extends PurgeFilter { + ReferencedObjectsFilter referencedObjects(); + + long maxObjReferenced(); + + static ReferencedObjectsPurgeFilter referencedObjectsPurgeFilter( + ReferencedObjectsFilter referencedObjects, long maxObjReferenced) { + return ImmutableReferencedObjectsPurgeFilter.of(referencedObjects, maxObjReferenced); + } + + @Override + default boolean mustKeep(Obj obj) { + return obj.referenced() > maxObjReferenced() + || referencedObjects().isProbablyReferenced(obj.id()); + } + } +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeObjects.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeObjects.java new file mode 100644 index 00000000000..57f230258c1 --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeObjects.java @@ -0,0 +1,31 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +public interface PurgeObjects { + PurgeResult purge(); + + /** Return the current statistics, returns a result after {@link #purge()} threw an exception. */ + PurgeStats getStats(); + + /** + * Returns the estimated maximum heap pressure of this object tree. Considers the data + * structured that are required for the purge operation to work, a subset of the structures + * required for {@link ReferencedObjectsResolver#resolve()}. It is wrong to use the sum of {@link + * ReferencedObjectsResolver#estimatedHeapPressure()} and this value. + */ + long estimatedHeapPressure(); +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeObjectsContext.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeObjectsContext.java new file mode 100644 index 00000000000..1d986a69573 --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeObjectsContext.java @@ -0,0 +1,58 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +import jakarta.validation.constraints.NotNull; +import org.projectnessie.nessie.immutables.NessieImmutable; +import org.projectnessie.versioned.storage.common.persist.Persist; + +/** + * Holds the data structures and parameters that are needed for the {@linkplain PurgeObjects purge + * operation}. + * + *

Once the {@linkplain ReferencedObjectsResolver referenced objects have been resolved}, the + * data structures that are not needed for the purge operation should become eligible for Java GC, + * which is why this context object exists and holds less information than {@link + * ReferencedObjectsContext}. + */ +@NessieImmutable +public interface PurgeObjectsContext { + @NotNull + Persist persist(); + + @NotNull + ReferencedObjectsFilter referencedObjects(); + + @NotNull + PurgeFilter purgeFilter(); + + int scanObjRatePerSecond(); + + int deleteObjRatePerSecond(); + + static PurgeObjectsContext purgeObjectsContext( + ReferencedObjectsContext referencedObjectsContext) { + return ImmutablePurgeObjectsContext.of( + referencedObjectsContext.persist(), + referencedObjectsContext.referencedObjects(), + referencedObjectsContext.purgeFilter(), + referencedObjectsContext.params().purgeScanObjRatePerSecond(), + referencedObjectsContext.params().purgeDeleteObjRatePerSecond(), + referencedObjectsContext.params().dryRun()); + } + + boolean dryRun(); +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeObjectsImpl.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeObjectsImpl.java new file mode 100644 index 00000000000..f443da1c164 --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeObjectsImpl.java @@ -0,0 +1,139 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +import static com.google.common.base.Preconditions.checkState; +import static org.projectnessie.versioned.storage.cleanup.HeapSizes.memSizeToStringMB; + +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.IntFunction; +import org.projectnessie.versioned.storage.common.persist.CloseableIterator; +import org.projectnessie.versioned.storage.common.persist.Obj; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class PurgeObjectsImpl implements PurgeObjects { + private static final Logger LOGGER = LoggerFactory.getLogger(PurgeObjectsImpl.class); + + private final PurgeObjectsContext purgeObjectsContext; + private final PurgeStatsBuilder stats; + private final AtomicBoolean used = new AtomicBoolean(); + private final RateLimit scanRateLimiter; + private final RateLimit purgeRateLimiter; + + public PurgeObjectsImpl( + PurgeObjectsContext purgeObjectsContext, IntFunction rateLimitIntFunction) { + this.purgeObjectsContext = purgeObjectsContext; + this.stats = new PurgeStatsBuilder(); + this.scanRateLimiter = rateLimitIntFunction.apply(purgeObjectsContext.scanObjRatePerSecond()); + this.purgeRateLimiter = + rateLimitIntFunction.apply(purgeObjectsContext.deleteObjRatePerSecond()); + } + + @Override + public PurgeResult purge() { + checkState(used.compareAndSet(false, true), "resolve() has already been called."); + + var purgeFilter = purgeObjectsContext.purgeFilter(); + var persist = purgeObjectsContext.persist(); + var clock = persist.config().clock(); + + LOGGER.info( + "Purging unreferenced objects in repository '{}', scanning {} objects per second, deleting {} objects per second, estimated context heap pressure: {}", + persist.config().repositoryId(), + scanRateLimiter, + purgeRateLimiter, + memSizeToStringMB(estimatedHeapPressure())); + + PurgeStats finalStats = null; + try { + stats.started = clock.instant(); + try (CloseableIterator iter = persist.scanAllObjects(Set.of())) { + while (iter.hasNext()) { + scanRateLimiter.acquire(); + stats.numScannedObjs++; + var obj = iter.next(); + if (purgeFilter.mustKeep(obj)) { + continue; + } + + purgeRateLimiter.acquire(); + purgeObj(obj); + } + } catch (RuntimeException e) { + stats.failure = e; + } finally { + stats.ended = clock.instant(); + finalStats = stats.build(); + } + + LOGGER.info( + "Successfully finished purging unreferenced objects after {} in repository '{}', purge stats: {}, estimated context heap pressure: {}", + finalStats.duration(), + persist.config().repositoryId(), + finalStats, + memSizeToStringMB(estimatedHeapPressure())); + } catch (RuntimeException e) { + if (finalStats != null) { + LOGGER.warn( + "Error while purging unreferenced objects after {} in repository '{}', purge stats: {}, estimated context heap pressure: {}", + finalStats.duration(), + persist.config().repositoryId(), + finalStats, + memSizeToStringMB(estimatedHeapPressure()), + e); + } else { + LOGGER.warn( + "Error while purging unreferenced objects in repository '{}'", + persist.config().repositoryId(), + stats.failure); + } + throw e; + } + + return ImmutablePurgeResult.of(stats.build()); + } + + @Override + public PurgeStats getStats() { + return stats.build(); + } + + @Override + public long estimatedHeapPressure() { + return purgeObjectsContext.referencedObjects().estimatedHeapPressure(); + } + + private void purgeObj(Obj obj) { + // TODO delete in parallel (multiple threads) + stats.numPurgedObjs++; + + var persist = purgeObjectsContext.persist(); + + var objType = obj.type(); + LOGGER.trace( + "Deleting obj {} of type {}/{} in repository '{}'", + obj.id(), + objType.name(), + objType.shortName(), + persist.config().repositoryId()); + + if (!purgeObjectsContext.dryRun()) { + persist.deleteWithReferenced(obj); + } + } +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeResult.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeResult.java new file mode 100644 index 00000000000..2e94a6e237b --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeResult.java @@ -0,0 +1,23 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +import org.projectnessie.nessie.immutables.NessieImmutable; + +@NessieImmutable +public interface PurgeResult { + PurgeStats stats(); +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeStats.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeStats.java new file mode 100644 index 00000000000..f597503e25d --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeStats.java @@ -0,0 +1,43 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +import java.time.Duration; +import java.time.Instant; +import java.util.Optional; +import org.projectnessie.nessie.immutables.NessieImmutable; + +@NessieImmutable +public interface PurgeStats { + Instant started(); + + Instant ended(); + + default Duration duration() { + return Duration.between(started(), ended()); + } + + /** Number of objects handled while scanning the Nessie repository. */ + long numScannedObjs(); + + /** + * Number of purged (deleted) objects. For a {@linkplain CleanupParams#dryRun() dry-run}, this + * value indicates the number of objects that would have been deleted. + */ + long numPurgedObjs(); + + Optional failure(); +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeStatsBuilder.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeStatsBuilder.java new file mode 100644 index 00000000000..b318b257f7f --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeStatsBuilder.java @@ -0,0 +1,34 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +import java.time.Instant; +import java.util.Optional; + +final class PurgeStatsBuilder { + Instant started; + Instant ended; + + Exception failure; + + long numScannedObjs; + long numPurgedObjs; + + PurgeStats build() { + return ImmutablePurgeStats.of( + started, ended, numScannedObjs, numPurgedObjs, Optional.ofNullable(failure)); + } +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/RateLimit.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/RateLimit.java new file mode 100644 index 00000000000..8df6112a16c --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/RateLimit.java @@ -0,0 +1,50 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +import com.google.common.util.concurrent.RateLimiter; + +public interface RateLimit { + void acquire(); + + @SuppressWarnings("UnstableApiUsage") + static RateLimit create(int ratePerSecond) { + if (ratePerSecond <= 0) { + return new RateLimit() { + @Override + public void acquire() {} + + @Override + public String toString() { + return "unlimited"; + } + }; + } + return new RateLimit() { + final RateLimiter limiter = RateLimiter.create(ratePerSecond); + + @Override + public void acquire() { + limiter.acquire(); + } + + @Override + public String toString() { + return "up to " + ratePerSecond; + } + }; + } +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/RecentObjIdFilter.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/RecentObjIdFilter.java new file mode 100644 index 00000000000..171a9ac74da --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/RecentObjIdFilter.java @@ -0,0 +1,27 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +import org.projectnessie.versioned.storage.common.persist.ObjId; + +public interface RecentObjIdFilter { + boolean add(ObjId id); + + boolean contains(ObjId id); + + /** Returns the estimated maximum heap pressure of this object tree. */ + long estimatedHeapPressure(); +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/RecentObjIdFilterImpl.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/RecentObjIdFilterImpl.java new file mode 100644 index 00000000000..bac4ff59fea --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/RecentObjIdFilterImpl.java @@ -0,0 +1,73 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +import static org.projectnessie.versioned.storage.cleanup.HeapSizes.HEAP_SIZE_LINKED_HASH_MAP; +import static org.projectnessie.versioned.storage.cleanup.HeapSizes.HEAP_SIZE_LINKED_HASH_MAP_ENTRY; +import static org.projectnessie.versioned.storage.cleanup.HeapSizes.HEAP_SIZE_OBJ_ID; +import static org.projectnessie.versioned.storage.cleanup.HeapSizes.HEAP_SIZE_POINTER; +import static org.projectnessie.versioned.storage.cleanup.HeapSizes.HEAP_SIZE_PRIMITIVE_LONG_ARRAY; + +import java.util.LinkedHashMap; +import java.util.Map; +import org.projectnessie.versioned.storage.common.persist.ObjId; + +final class RecentObjIdFilterImpl implements RecentObjIdFilter { + private static final Object PRESENT = new Object(); + + private final LinkedHashMap recentObjIds; + private final long estimatedHeapPressure; + + public RecentObjIdFilterImpl(int recentObjIdsFilterSize) { + int capacity = (int) Math.ceil(recentObjIdsFilterSize / 0.75d); + this.estimatedHeapPressure = calculateEstimatedHeapPressure(recentObjIdsFilterSize, capacity); + + this.recentObjIds = + new LinkedHashMap<>(capacity) { + @Override + protected boolean removeEldestEntry(Map.Entry eldest) { + return size() >= recentObjIdsFilterSize; + } + }; + } + + @Override + public boolean contains(ObjId id) { + return recentObjIds.containsKey(id); + } + + @Override + public boolean add(ObjId id) { + return recentObjIds.put(id, PRESENT) == null; + } + + @Override + public long estimatedHeapPressure() { + return estimatedHeapPressure; + } + + private long calculateEstimatedHeapPressure(int size, int capacity) { + int tableSize = -1 >>> Integer.numberOfLeadingZeros(capacity - 1); + + return HEAP_SIZE_LINKED_HASH_MAP + + + // LHM entries + (HEAP_SIZE_LINKED_HASH_MAP_ENTRY + HEAP_SIZE_OBJ_ID) * size + // LHM table/node-array + + HEAP_SIZE_PRIMITIVE_LONG_ARRAY + + HEAP_SIZE_POINTER * tableSize; + } +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsContext.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsContext.java new file mode 100644 index 00000000000..6c7dcb9cece --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsContext.java @@ -0,0 +1,59 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +import static org.projectnessie.versioned.storage.cleanup.VisitedCommitFilter.ALLOW_DUPLICATE_TRAVERSALS; + +import jakarta.validation.constraints.NotNull; +import org.projectnessie.nessie.immutables.NessieImmutable; +import org.projectnessie.versioned.storage.common.persist.Persist; + +/** + * Holds the data structures and parameters that are needed to {@linkplain ReferencedObjectsResolver + * resolving referenced objects}. + */ +@NessieImmutable +public interface ReferencedObjectsContext { + @NotNull + Persist persist(); + + @NotNull + ReferencedObjectsFilter referencedObjects(); + + @NotNull + CleanupParams params(); + + @NotNull + PurgeFilter purgeFilter(); + + @NotNull + VisitedCommitFilter visitedCommitFilter(); + + static ReferencedObjectsContext objectsResolverContext( + Persist persist, + CleanupParams params, + ReferencedObjectsFilter referencedObjects, + PurgeFilter purgeFilter) { + return ImmutableReferencedObjectsContext.of( + persist, + referencedObjects, + params, + purgeFilter, + params.allowDuplicateCommitTraversals() + ? ALLOW_DUPLICATE_TRAVERSALS + : new VisitedCommitFilterImpl()); + } +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsFilter.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsFilter.java new file mode 100644 index 00000000000..07e359c96ab --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsFilter.java @@ -0,0 +1,41 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +import jakarta.validation.constraints.NotNull; +import org.projectnessie.versioned.storage.common.persist.ObjId; + +/** + * Mark {@linkplain ObjId object IDs} as referenced and allow checking whether object IDs are marked + * as referenced. + * + *

The implementation is usually backed by a probabilistic data structure (bloom filter), which + * means that there is a {@linkplain #expectedFpp() chance} that an unreferenced object is not + * collected, but all referenced objects are guaranteed to remain. + */ +public interface ReferencedObjectsFilter { + boolean markReferenced(@NotNull ObjId objId); + + boolean isProbablyReferenced(@NotNull ObjId objId); + + boolean withinExpectedFpp(); + + long approximateElementCount(); + + double expectedFpp(); + + long estimatedHeapPressure(); +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsFilterImpl.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsFilterImpl.java new file mode 100644 index 00000000000..506c78574b6 --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsFilterImpl.java @@ -0,0 +1,117 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +import static org.projectnessie.versioned.storage.cleanup.HeapSizes.HEAP_SIZE_ATOMIC_LONG_ARRAY; +import static org.projectnessie.versioned.storage.cleanup.HeapSizes.HEAP_SIZE_BIT_ARRAY; +import static org.projectnessie.versioned.storage.cleanup.HeapSizes.HEAP_SIZE_BLOOM_FILTER; +import static org.projectnessie.versioned.storage.cleanup.HeapSizes.HEAP_SIZE_LONG_ADDER; +import static org.projectnessie.versioned.storage.cleanup.HeapSizes.HEAP_SIZE_PRIMITIVE_LONG_ARRAY; + +import com.google.common.hash.BloomFilter; +import com.google.common.hash.PrimitiveSink; +import java.util.concurrent.atomic.AtomicLong; +import org.projectnessie.versioned.storage.common.persist.ObjId; + +@SuppressWarnings("UnstableApiUsage") +final class ReferencedObjectsFilterImpl implements ReferencedObjectsFilter { + + private final BloomFilter filter; + private final double allowedFalsePositiveProbability; + private final AtomicLong remainingElements; + private final long estimatedHeapPressure; + + ReferencedObjectsFilterImpl(CleanupParams params) { + this.filter = createBloomFilter(params); + this.remainingElements = new AtomicLong(params.expectedObjCount()); + this.allowedFalsePositiveProbability = params.allowedFalsePositiveProbability(); + this.estimatedHeapPressure = calculateEstimatedHeapPressure(params); + } + + static BloomFilter createBloomFilter(CleanupParams params) { + return BloomFilter.create( + ReferencedObjectsFilterImpl::funnel, + params.expectedObjCount(), + params.falsePositiveProbability()); + } + + private static void funnel(ObjId id, PrimitiveSink primitiveSink) { + var idSize = id.size(); + var i = 0; + for (; idSize >= 8; idSize -= 8) { + primitiveSink.putLong(id.longAt(i++)); + } + i <<= 3; + for (; idSize > 0; idSize--) { + primitiveSink.putByte(id.byteAt(i++)); + } + } + + @Override + public boolean markReferenced(ObjId objId) { + if (filter.put(objId)) { + if (remainingElements.decrementAndGet() >= 0L || withinExpectedFpp()) { + return true; + } + throw new MustRestartWithBiggerFilterRuntimeException( + "Bloom filter exceeded the configured expected FPP"); + } + return false; + } + + @Override + public boolean isProbablyReferenced(ObjId objId) { + return filter.mightContain(objId); + } + + @Override + public boolean withinExpectedFpp() { + return expectedFpp() <= allowedFalsePositiveProbability; + } + + @Override + public long approximateElementCount() { + return filter.approximateElementCount(); + } + + @Override + public double expectedFpp() { + return filter.expectedFpp(); + } + + @Override + public long estimatedHeapPressure() { + return estimatedHeapPressure; + } + + private static long calculateEstimatedHeapPressure(CleanupParams params) { + var bits = optimalNumOfBits(params.expectedObjCount(), params.falsePositiveProbability()); + var arrayLen = bits / 64 + 1; + return HEAP_SIZE_BLOOM_FILTER + + HEAP_SIZE_BIT_ARRAY + + HEAP_SIZE_LONG_ADDER + + HEAP_SIZE_ATOMIC_LONG_ARRAY + + HEAP_SIZE_PRIMITIVE_LONG_ARRAY * arrayLen; + } + + // See com.google.common.hash.BloomFilter.optimalNumOfBits + private static long optimalNumOfBits(long expectedInsertions, double fpp) { + if (fpp == 0) { + fpp = Double.MIN_VALUE; + } + return (long) (-expectedInsertions * Math.log(fpp) / (Math.log(2) * Math.log(2))); + } +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsResolver.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsResolver.java new file mode 100644 index 00000000000..ce3906933b8 --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsResolver.java @@ -0,0 +1,45 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +import org.projectnessie.versioned.storage.common.persist.Persist; + +public interface ReferencedObjectsResolver { + /** + * Identifies all referenced objects in the {@linkplain Persist Nessie repository}. + * + * @return result containing the information for the follow-up {@linkplain PurgeObjects#purge() + * purge operation} and stats. + * @throws MustRestartWithBiggerFilterException thrown if this operation identifies more than the + * configured {@linkplain CleanupParams#expectedObjCount() expected object count}. This + * exception must be handled by calling code + */ + ResolveResult resolve() throws MustRestartWithBiggerFilterException; + + /** + * Return the current statistics, returns a valid result, even if {@link #resolve()} threw an + * exception. + */ + ResolveStats getStats(); + + /** + * Returns the estimated maximum heap pressure of this object tree. Considers the data + * structured that are required for the resolve operation to work, a superset of the structures + * required for {@link PurgeObjects#purge()}. It is wrong to use the sum of {@link + * PurgeObjects#estimatedHeapPressure()} and this value. + */ + long estimatedHeapPressure(); +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsResolverImpl.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsResolverImpl.java new file mode 100644 index 00000000000..e0f3f833d41 --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsResolverImpl.java @@ -0,0 +1,369 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +import static com.google.common.base.Preconditions.checkState; +import static org.projectnessie.versioned.storage.cleanup.HeapSizes.memSizeToStringMB; +import static org.projectnessie.versioned.storage.cleanup.PurgeObjectsContext.purgeObjectsContext; +import static org.projectnessie.versioned.storage.common.logic.CommitLogQuery.commitLogQuery; +import static org.projectnessie.versioned.storage.common.logic.Logics.commitLogic; +import static org.projectnessie.versioned.storage.common.logic.Logics.indexesLogic; +import static org.projectnessie.versioned.storage.common.logic.Logics.referenceLogic; +import static org.projectnessie.versioned.storage.common.logic.Logics.repositoryLogic; +import static org.projectnessie.versioned.storage.common.logic.ReferencesQuery.referencesQuery; +import static org.projectnessie.versioned.storage.common.objtypes.StandardObjType.VALUE; +import static org.projectnessie.versioned.storage.common.persist.ObjId.EMPTY_OBJ_ID; + +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.IntFunction; +import org.agrona.collections.ObjectHashSet; +import org.projectnessie.model.Content; +import org.projectnessie.versioned.storage.common.indexes.StoreIndexElement; +import org.projectnessie.versioned.storage.common.logic.CommitLogic; +import org.projectnessie.versioned.storage.common.objtypes.CommitObj; +import org.projectnessie.versioned.storage.common.objtypes.CommitOp; +import org.projectnessie.versioned.storage.common.objtypes.ContentValueObj; +import org.projectnessie.versioned.storage.common.persist.Obj; +import org.projectnessie.versioned.storage.common.persist.ObjId; +import org.projectnessie.versioned.storage.common.persist.Persist; +import org.projectnessie.versioned.storage.common.persist.Reference; +import org.projectnessie.versioned.store.DefaultStoreWorker; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class ReferencedObjectsResolverImpl implements ReferencedObjectsResolver { + private static final Logger LOGGER = LoggerFactory.getLogger(ReferencedObjectsResolverImpl.class); + + private final ObjectHashSet pendingObjs = new ObjectHashSet<>(); + private final Deque pendingHeads = new ArrayDeque<>(); + + /** + * Set of recently handled 'ObjId's to prevent re-processing the same objects multiple times. This + * happens, when the values referenced from the commit index are iterated, because it iterates + * over all keys, not only the keys added by a particular commit. + */ + private final RecentObjIdFilter recentObjIds; + + private final ReferencedObjectsContext referencedObjectsContext; + + private final ResolveStatsBuilder stats; + private final RateLimit commitRateLimiter; + private final RateLimit objRateLimiter; + + private final AtomicBoolean used = new AtomicBoolean(); + + ReferencedObjectsResolverImpl( + ReferencedObjectsContext referencedObjectsContext, + IntFunction rateLimitIntFunction) { + this.referencedObjectsContext = referencedObjectsContext; + this.stats = new ResolveStatsBuilder(); + this.commitRateLimiter = + rateLimitIntFunction.apply(referencedObjectsContext.params().resolveCommitRatePerSecond()); + this.objRateLimiter = + rateLimitIntFunction.apply(referencedObjectsContext.params().resolveObjRatePerSecond()); + this.recentObjIds = + new RecentObjIdFilterImpl(referencedObjectsContext.params().recentObjIdsFilterSize()); + } + + @Override + public long estimatedHeapPressure() { + return referencedObjectsContext.referencedObjects().estimatedHeapPressure() + + referencedObjectsContext.visitedCommitFilter().estimatedHeapPressure() + + recentObjIds.estimatedHeapPressure(); + } + + @Override + public ResolveResult resolve() throws MustRestartWithBiggerFilterException { + checkState(used.compareAndSet(false, true), "resolve() has already been called."); + + LOGGER.info( + "Identifying referenced objects in repository '{}', processing {} commits per second, processing {} objects per second, estimated context heap pressure: {}", + referencedObjectsContext.persist().config().repositoryId(), + commitRateLimiter, + objRateLimiter, + memSizeToStringMB(estimatedHeapPressure())); + + var persist = referencedObjectsContext.persist(); + var params = referencedObjectsContext.params(); + + ResolveStats finalStats = null; + try { + finalStats = doResolve(persist, params); + + LOGGER.info( + "Successfully finished identifying referenced objects after {} in repository '{}', resolve stats: {}, estimated context heap pressure: {}", + finalStats.duration(), + persist.config().repositoryId(), + finalStats, + memSizeToStringMB(estimatedHeapPressure())); + } catch (MustRestartWithBiggerFilterRuntimeException mustRestart) { + LOGGER.warn( + "Must restart identifying referenced objects for repository '{}', current parameters: expected object count: {}, FPP: {}, allowed FPP: {}, resolve stats: {}, estimated context heap pressure: {}", + persist.config().repositoryId(), + params.expectedObjCount(), + params.falsePositiveProbability(), + params.allowedFalsePositiveProbability(), + finalStats, + memSizeToStringMB(estimatedHeapPressure())); + throw new MustRestartWithBiggerFilterException(mustRestart.getMessage(), mustRestart); + } catch (RuntimeException e) { + if (finalStats != null) { + LOGGER.warn( + "Error while identifying referenced objects after {} in repository '{}', stats: {}, estimated context heap pressure: {}", + finalStats.duration(), + persist.config().repositoryId(), + finalStats, + memSizeToStringMB(estimatedHeapPressure()), + e); + } else { + LOGGER.warn( + "Error while identifying referenced objects after {} in repository '{}'", + persist.config().repositoryId(), + e); + } + throw e; + } + + return ImmutableResolveResult.of(stats.build(), purgeObjectsContext(referencedObjectsContext)); + } + + private ResolveStats doResolve(Persist persist, CleanupParams params) { + var clock = persist.config().clock(); + + ResolveStats finalStats; + try { + stats.started = clock.instant(); + + checkState( + repositoryLogic(persist).repositoryExists(), + "The provided repository has not been initialized."); + + params.relatedObjects().repositoryRelatedObjects().forEach(this::pendingObj); + + var referenceLogic = referenceLogic(persist); + var commitLogic = commitLogic(persist); + + for (String internalReferenceName : params.internalReferenceNames()) { + var intRef = persist.fetchReference(internalReferenceName); + checkState(intRef != null, "Internal reference %s not found!", internalReferenceName); + handleReference(intRef); + processPendingHeads(commitLogic); + } + + for (var referencesIter = referenceLogic.queryReferences(referencesQuery()); + referencesIter.hasNext(); ) { + var reference = referencesIter.next(); + handleReference(reference); + processPendingHeads(commitLogic); + } + + processPendingHeads(commitLogic); + + while (!pendingObjs.isEmpty()) { + processPendingObjs(); + } + } catch (RuntimeException e) { + stats.mustRestart = e instanceof MustRestartWithBiggerFilterRuntimeException; + stats.failure = e; + throw e; + } finally { + stats.ended = clock.instant(); + finalStats = stats.build(); + } + return finalStats; + } + + private void processPendingHeads(CommitLogic commitLogic) { + while (!pendingHeads.isEmpty()) { + var head = pendingHeads.removeFirst(); + commitLogic.commitLog(commitLogQuery(head)).forEachRemaining(this::handleCommit); + } + } + + @Override + public ResolveStats getStats() { + return stats.build(); + } + + private void handleReference(Reference reference) { + stats.numReferences++; + + var persist = referencedObjectsContext.persist(); + + if (reference.deleted()) { + LOGGER.trace( + "Skipping deleted reference {} in repository '{}'", + reference.name(), + persist.config().repositoryId()); + return; + } + + LOGGER.debug( + "Walking reference {} in repository '{}' starting at commit {}", + reference.name(), + persist.config().repositoryId(), + reference.pointer()); + + referencedObjectsContext + .params() + .relatedObjects() + .referenceRelatedObjects(reference) + .forEach(this::pendingObj); + + commitChain(reference.pointer()); + + var extendedInfo = reference.extendedInfoObj(); + if (extendedInfo != null) { + referencedObjectsContext.referencedObjects().markReferenced(extendedInfo); + } + } + + private void commitChain(ObjId head) { + if (EMPTY_OBJ_ID.equals(head)) { + // Prevent visiting the same commit more often than once + return; + } + + stats.numCommitChainHeads++; + + if (referencedObjectsContext.visitedCommitFilter().alreadyVisited(head)) { + // Prevent visiting the same commit more often than once + return; + } + + pendingHeads.addLast(head); + } + + private void handleCommit(CommitObj commit) { + stats.numCommits++; + + if (!referencedObjectsContext.visitedCommitFilter().mustVisit(commit.id())) { + // Prevent visiting the same commit more often than once + return; + } + + commitRateLimiter.acquire(); + + var persist = referencedObjectsContext.persist(); + + LOGGER.debug( + "Handling commit {} in repository '{}'", commit.id(), persist.config().repositoryId()); + + stats.numUniqueCommits++; + + referencedObjectsContext.referencedObjects().markReferenced(commit.id()); + + referencedObjectsContext + .params() + .relatedObjects() + .commitRelatedObjects(commit) + .forEach(this::pendingObj); + + var indexesLogic = indexesLogic(referencedObjectsContext.persist()); + var index = indexesLogic.buildCompleteIndexOrEmpty(commit); + for (StoreIndexElement indexElement : index) { + var content = indexElement.content(); + if (content.action().exists()) { + var value = content.value(); + pendingObj(value); + } + } + + commit.secondaryParents().forEach(this::commitChain); + } + + private void pendingObj(ObjId objId) { + if (recentObjIds.contains(objId)) { + return; + } + + if (!pendingObjs.add(objId)) { + return; + } + + stats.numQueuedObjs++; + + if (pendingObjs.size() >= referencedObjectsContext.params().pendingObjsBatchSize()) { + processPendingObjs(); + } + } + + private void processPendingObjs() { + stats.numQueuedObjsBulkFetches++; + + var persist = referencedObjectsContext.persist(); + + LOGGER.debug( + "Fetching {} pending objects in repository '{}'", + pendingObjs.size(), + persist.config().repositoryId()); + + var objs = persist.fetchObjsIfExist(pendingObjs.toArray(ObjId[]::new)); + // Must clear 'pendingObjs' here, because handleObj can add more objects to it + pendingObjs.clear(); + + for (Obj obj : objs) { + if (obj != null) { + handleObj(obj); + } + } + } + + private void handleObj(Obj obj) { + objRateLimiter.acquire(); + + if (!recentObjIds.add(obj.id())) { + // already handled + return; + } + + stats.numObjs++; + + var persist = referencedObjectsContext.persist(); + + var objType = obj.type(); + + LOGGER.debug( + "Handling obj {} of type {}/{} in repository '{}'", + obj.id(), + objType.name(), + objType.shortName(), + persist.config().repositoryId()); + + referencedObjectsContext.referencedObjects().markReferenced(obj.id()); + + if (VALUE.equals(objType)) { + var contentValueObj = (ContentValueObj) obj; + var content = + DefaultStoreWorker.instance() + .valueFromStore(contentValueObj.payload(), contentValueObj.data()); + + handleContent(content); + } + } + + private void handleContent(Content content) { + stats.numContents++; + + referencedObjectsContext + .params() + .relatedObjects() + .contentRelatedObjects(content) + .forEach(this::pendingObj); + } +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ResolveResult.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ResolveResult.java new file mode 100644 index 00000000000..120c791c761 --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ResolveResult.java @@ -0,0 +1,26 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +import org.projectnessie.nessie.immutables.NessieImmutable; + +@NessieImmutable +public interface ResolveResult { + ResolveStats stats(); + + /** Context required for a purge. */ + PurgeObjectsContext purgeObjectsContext(); +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ResolveStats.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ResolveStats.java new file mode 100644 index 00000000000..dd612168495 --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ResolveStats.java @@ -0,0 +1,60 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +import java.time.Duration; +import java.time.Instant; +import java.util.Optional; +import org.projectnessie.nessie.immutables.NessieImmutable; + +@NessieImmutable +public interface ResolveStats { + Instant started(); + + Instant ended(); + + default Duration duration() { + return Duration.between(started(), ended()); + } + + boolean mustRestart(); + + /** Number of processed references, including Nessie internal references. */ + long numReferences(); + + /** Number of commit chain "heads". */ + long numCommitChainHeads(); + + /** Number of processed commit objects, including Nessie internal commits. */ + long numCommits(); + + /** Number of processed unique commit objects, including Nessie internal commits. */ + long numUniqueCommits(); + + /** Number of non-commit objects. */ + long numObjs(); + + /** Number of {@link org.projectnessie.model.Content} objects. */ + long numContents(); + + /** Number of non-commit objects that had been queued for batched commit object handling. */ + long numQueuedObjs(); + + /** Number of bulk non-commit object fetches. */ + long numQueuedObjsBulkFetches(); + + Optional failure(); +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ResolveStatsBuilder.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ResolveStatsBuilder.java new file mode 100644 index 00000000000..bc2fef3ada4 --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ResolveStatsBuilder.java @@ -0,0 +1,52 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +import java.time.Instant; +import java.util.Optional; + +final class ResolveStatsBuilder { + Instant started; + Instant ended; + + boolean mustRestart; + Exception failure; + + long numReferences; + long numCommitChainHeads; + long numCommits; + long numUniqueCommits; + long numObjs; + long numContents; + long numQueuedObjs; + long numQueuedObjsBulkFetches; + + ResolveStats build() { + return ImmutableResolveStats.of( + started, + ended, + mustRestart, + numReferences, + numCommitChainHeads, + numCommits, + numUniqueCommits, + numObjs, + numContents, + numQueuedObjs, + numQueuedObjsBulkFetches, + Optional.ofNullable(failure)); + } +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/VisitedCommitFilter.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/VisitedCommitFilter.java new file mode 100644 index 00000000000..5570d8fc1bc --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/VisitedCommitFilter.java @@ -0,0 +1,53 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +import org.projectnessie.versioned.storage.common.objtypes.CommitObj; +import org.projectnessie.versioned.storage.common.persist.ObjId; + +/** + * Filter to prevent processing the same {@linkplain CommitObj Nessie commit} more than once. + * + *

There are two implementations of this interface: {@linkplain #ALLOW_DUPLICATE_TRAVERSALS one} + * that does not prevent duplicate processing, and {@linkplain VisitedCommitFilterImpl the + * default one} that does. The parameter {@link CleanupParams#allowDuplicateCommitTraversals()} is + * used to decide which implementation is being used. + */ +public interface VisitedCommitFilter { + boolean mustVisit(ObjId commitObjId); + + boolean alreadyVisited(ObjId commitObjId); + + long estimatedHeapPressure(); + + VisitedCommitFilter ALLOW_DUPLICATE_TRAVERSALS = + new VisitedCommitFilter() { + @Override + public boolean mustVisit(ObjId commitObjId) { + return true; + } + + @Override + public boolean alreadyVisited(ObjId commitObjId) { + return false; + } + + @Override + public long estimatedHeapPressure() { + return 0; + } + }; +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/VisitedCommitFilterImpl.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/VisitedCommitFilterImpl.java new file mode 100644 index 00000000000..c5efb936aa4 --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/VisitedCommitFilterImpl.java @@ -0,0 +1,45 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +import static org.agrona.collections.Hashing.DEFAULT_LOAD_FACTOR; +import static org.projectnessie.versioned.storage.cleanup.HeapSizes.HEAP_SIZE_OBJECT_HASH_SET; +import static org.projectnessie.versioned.storage.cleanup.HeapSizes.HEAP_SIZE_OBJ_ID; +import static org.projectnessie.versioned.storage.cleanup.HeapSizes.HEAP_SIZE_PRIMITIVE_OBJ_ARRAY; + +import org.agrona.collections.ObjectHashSet; +import org.projectnessie.versioned.storage.common.persist.ObjId; + +final class VisitedCommitFilterImpl implements VisitedCommitFilter { + private final ObjectHashSet visited = new ObjectHashSet<>(64, DEFAULT_LOAD_FACTOR); + + @Override + public boolean mustVisit(ObjId commitObjId) { + return visited.add(commitObjId); + } + + @Override + public boolean alreadyVisited(ObjId commitObjId) { + return visited.contains(commitObjId); + } + + @Override + public long estimatedHeapPressure() { + var sz = visited.size(); + var cap = visited.capacity(); + return HEAP_SIZE_OBJECT_HASH_SET + HEAP_SIZE_PRIMITIVE_OBJ_ARRAY * cap + HEAP_SIZE_OBJ_ID * sz; + } +} diff --git a/versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestCleanup.java b/versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestCleanup.java new file mode 100644 index 00000000000..3821674cf72 --- /dev/null +++ b/versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestCleanup.java @@ -0,0 +1,516 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +import static java.util.Objects.requireNonNull; +import static java.util.UUID.randomUUID; +import static org.projectnessie.nessie.relocated.protobuf.ByteString.copyFromUtf8; +import static org.projectnessie.versioned.storage.cleanup.Cleanup.createCleanup; +import static org.projectnessie.versioned.storage.common.indexes.StoreKey.key; +import static org.projectnessie.versioned.storage.common.logic.CreateCommit.Add.commitAdd; +import static org.projectnessie.versioned.storage.common.logic.CreateCommit.newCommitBuilder; +import static org.projectnessie.versioned.storage.common.logic.Logics.commitLogic; +import static org.projectnessie.versioned.storage.common.logic.Logics.referenceLogic; +import static org.projectnessie.versioned.storage.common.logic.Logics.repositoryLogic; +import static org.projectnessie.versioned.storage.common.objtypes.CommitHeaders.newCommitHeaders; +import static org.projectnessie.versioned.storage.common.objtypes.CommitType.NORMAL; +import static org.projectnessie.versioned.storage.common.objtypes.ContentValueObj.contentValue; +import static org.projectnessie.versioned.storage.common.objtypes.StringObj.stringData; +import static org.projectnessie.versioned.storage.common.persist.ObjId.EMPTY_OBJ_ID; +import static org.projectnessie.versioned.testworker.OnRefOnly.onRef; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import org.assertj.core.api.SoftAssertions; +import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; +import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.projectnessie.nessie.immutables.NessieImmutable; +import org.projectnessie.versioned.storage.common.logic.CommitLogic; +import org.projectnessie.versioned.storage.common.logic.InternalRef; +import org.projectnessie.versioned.storage.common.objtypes.Compression; +import org.projectnessie.versioned.storage.common.persist.ObjId; +import org.projectnessie.versioned.storage.common.persist.Persist; +import org.projectnessie.versioned.storage.testextension.NessiePersist; +import org.projectnessie.versioned.storage.testextension.PersistExtension; +import org.projectnessie.versioned.store.DefaultStoreWorker; + +@ExtendWith({PersistExtension.class, SoftAssertionsExtension.class}) +public class TestCleanup { + @InjectSoftAssertions protected SoftAssertions soft; + + @NessiePersist protected Persist persist; + + @Test + void mustRestartWithBiggerFilterThrown() { + soft.assertThat(repositoryLogic(persist).repositoryExists()).isTrue(); + + var maxObjReferenced = persist.config().currentTimeMicros(); + + var cleanupParams = CleanupParams.builder().expectedObjCount(1).build(); + var cleanup = createCleanup(cleanupParams); + var referencedObjectsContext = cleanup.buildReferencedObjectsContext(persist, maxObjReferenced); + var referencedObjectsResolver = + cleanup.createReferencedObjectsResolver(referencedObjectsContext); + + soft.assertThatThrownBy(referencedObjectsResolver::resolve) + .isInstanceOf(MustRestartWithBiggerFilterException.class); + + var newCleanupParams = cleanupParams.withIncreasedExpectedObjCount(); + + soft.assertThat(cleanupParams.expectedObjCount()) + .isLessThan(newCleanupParams.expectedObjCount()); + soft.assertThat( + CleanupParams.builder() + .from(cleanupParams) + .expectedObjCount(newCleanupParams.expectedObjCount()) + .build()) + .isEqualTo(newCleanupParams); + + cleanup = createCleanup(newCleanupParams); + referencedObjectsContext = cleanup.buildReferencedObjectsContext(persist, maxObjReferenced); + referencedObjectsResolver = cleanup.createReferencedObjectsResolver(referencedObjectsContext); + + soft.assertThatCode(referencedObjectsResolver::resolve).doesNotThrowAnyException(); + } + + @Test + void estimatedHeapPressure() throws Exception { + soft.assertThat(repositoryLogic(persist).repositoryExists()).isTrue(); + + var maxObjReferenced = persist.config().currentTimeMicros(); + + var cleanup = createCleanup(CleanupParams.builder().build()); + var referencedObjectsContext = cleanup.buildReferencedObjectsContext(persist, maxObjReferenced); + var referencedObjectsResolver = + cleanup.createReferencedObjectsResolver(referencedObjectsContext); + + soft.assertThat(referencedObjectsResolver.estimatedHeapPressure()).isGreaterThan(1L); + + var resolveResult = referencedObjectsResolver.resolve(); + var purge = cleanup.createPurgeObjects(resolveResult.purgeObjectsContext()); + + soft.assertThat(purge.estimatedHeapPressure()) + .isGreaterThan(1L) + .isLessThan(referencedObjectsResolver.estimatedHeapPressure()); + } + + @Test + void againstEmptyRepository() throws Exception { + soft.assertThat(repositoryLogic(persist).repositoryExists()).isTrue(); + + var resolveAndPurge = resolveAndPurge(persist.config().currentTimeMicros()); + soft.assertThat(resolveAndPurge.resolveResult().stats()) + .extracting( + ResolveStats::failure, + ResolveStats::numReferences, + ResolveStats::numCommitChainHeads, + ResolveStats::numCommits, + ResolveStats::numUniqueCommits, + ResolveStats::numQueuedObjs, + ResolveStats::numObjs) + .containsExactly( + Optional.empty(), + // refs + 3L, + // HEADs ("main" has EMPTY_OBJ_ID) + 2L, + // commits + 3L, + // unique commits + 3L, + // queued objs + 2L, + // objs + 2L); + soft.assertThat(resolveAndPurge.purgeResult().stats()) + .extracting(PurgeStats::failure, PurgeStats::numScannedObjs, PurgeStats::numPurgedObjs) + .containsExactly(Optional.empty(), 5L, 0L); + } + + @Test + void purgeDeleteRefObjs() throws Exception { + soft.assertThat(repositoryLogic(persist).repositoryExists()).isTrue(); + + var referenceLogic = referenceLogic(persist); + var commitLogic = commitLogic(persist); + + for (int i = 0; i < 10; i++) { + referenceLogic.createReference("kept-" + i, EMPTY_OBJ_ID, null); + } + for (int i = 0; i < 10; i++) { + referenceLogic.createReference("deleted-" + i, EMPTY_OBJ_ID, null); + } + + var resolveAndPurge = resolveAndPurge(persist.config().currentTimeMicros()); + soft.assertThat(resolveAndPurge.resolveResult().stats()) + .extracting( + ResolveStats::failure, + ResolveStats::numReferences, + ResolveStats::numCommitChainHeads, + ResolveStats::numCommits, + ResolveStats::numUniqueCommits, + ResolveStats::numQueuedObjs, + ResolveStats::numObjs) + .containsExactly( + Optional.empty(), + // 3 references (empty repo) + 20 created references + 3L + 20L, + // 2 queued commits (2 internal refs, "main" + all created refs hava EMPTY_OBJ_ID + 2L, + // 3 commits (empty repo) + 20 created references CommitObjs + 3L + 20L, + 3L + 20L, + // 2 objs (empty repo) + 20 created RefObj's + 2L + 20L, + 2L + 20L); + soft.assertThat(resolveAndPurge.purgeResult().stats()) + .extracting(PurgeStats::failure, PurgeStats::numScannedObjs, PurgeStats::numPurgedObjs) + .containsExactly( + Optional.empty(), + // 5 (empty repo) + 20 CommitObj + 20 RefObj + 10 CommitObj + 5L + 20L + 20L, + // Nothing to delete + 0L); + + for (int i = 0; i < 10; i++) { + referenceLogic.deleteReference("deleted-" + i, EMPTY_OBJ_ID); + } + + resolveAndPurge = resolveAndPurge(persist.config().currentTimeMicros()); + soft.assertThat(resolveAndPurge.resolveResult().stats()) + .extracting( + ResolveStats::failure, + ResolveStats::numReferences, + ResolveStats::numCommitChainHeads, + ResolveStats::numCommits, + ResolveStats::numUniqueCommits, + ResolveStats::numQueuedObjs, + ResolveStats::numObjs) + .containsExactly( + Optional.empty(), + // 3 references (empty repo) + 20 created references + 3L + 10L, + // 2 queued commits (2 internal refs, "main" + all created refs hava EMPTY_OBJ_ID + 2L, + // 3 commits (empty repo) + 20 created references CommitObjs + 10 deleted references + // CommitObjs + 3L + 20L + 10L, + 3L + 20L + 10L, + // 2 objs (empty repo) + 20 created RefObj's + 2L + 20L, + 2L + 20L); + soft.assertThat(resolveAndPurge.purgeResult().stats()) + .extracting(PurgeStats::failure, PurgeStats::numScannedObjs, PurgeStats::numPurgedObjs) + .containsExactly( + Optional.empty(), + // 5 (empty repo) + 20 CommitObj + 20 RefObj + 10 CommitObj + 5L + 20L + 20L + 10L, + // RefObj's are NOT deleted, because those are referenced via the `int/refs` commit log + // chain + 0L); + + // Shorten the "int/refs" history / make RefObj's eligible for cleanup + + var refRefs = requireNonNull(persist.fetchReference(InternalRef.REF_REFS.name())); + var newRefRefs = referenceLogic.rewriteCommitLog(refRefs, (num, commit) -> true); + soft.assertThat(newRefRefs.pointer()).isNotEqualTo(refRefs.pointer()); + var refRefsHead = requireNonNull(commitLogic.fetchCommit(newRefRefs.pointer())); + soft.assertThat(refRefsHead.directParent()).isEqualTo(EMPTY_OBJ_ID); + + resolveAndPurge = resolveAndPurge(persist.config().currentTimeMicros()); + soft.assertThat(resolveAndPurge.resolveResult().stats()) + .extracting( + ResolveStats::failure, + ResolveStats::numReferences, + ResolveStats::numCommitChainHeads, + ResolveStats::numCommits, + ResolveStats::numUniqueCommits, + ResolveStats::numQueuedObjs, + ResolveStats::numObjs) + .containsExactly( + Optional.empty(), + // 3 references (empty repo) + 20 created references + 3L + 10L, + // 2 queued commits (2 internal refs, "main" + all created refs hava EMPTY_OBJ_ID + 2L, + // 2 CommitObjs (one less than "empty repo": the commit to create the "main" reference + // has been "squashed") + 2L, + 2L, + // 2 objs (empty repo) + 10 "existing" RefObj's + 2L + 10L, + 2L + 10L); + soft.assertThat(resolveAndPurge.purgeResult().stats()) + .extracting(PurgeStats::failure, PurgeStats::numScannedObjs, PurgeStats::numPurgedObjs) + .containsExactly( + Optional.empty(), + // 5 (empty repo) + 20 CommitObj + 20 RefObj + 10 CommitObj + 1 re-written CommitObj + 5L + 20L + 20L + 10L + 1L, + // RefObj's are deleted, because those are referenced via the `int/refs` commit log + // chain, CommitObj's from the create/delete reference operations: + // 10 RefObj's + 30 CommitObj + 2 CommitObj + 10L + 30L + 2L); + } + + @Test + void againstEmptyRepositoryWithGarbage() throws Exception { + soft.assertThat(repositoryLogic(persist).repositoryExists()).isTrue(); + + var referenceLogic = referenceLogic(persist); + var commitLogic = commitLogic(persist); + + var unreferenced = new ArrayList(); + var keptUnreferenced = new ArrayList(); + var referencedCommits = new ArrayList(); + var referenced = new ArrayList(); + var contents = 0; + + for (int i = 0; i < 25; i++) { + var obj = + stringData("foo/bar", Compression.NONE, null, List.of(), copyFromUtf8("string " + i)); + soft.assertThat(persist.storeObj(obj)).isTrue(); + unreferenced.add(obj.id()); + } + for (int i = 0; i < 25; i++) { + var cid = randomUUID(); + var obj = + contentValue( + cid.toString(), + 127, + DefaultStoreWorker.instance() + .toStoreOnReferenceState(onRef("dummy " + i, cid.toString()))); + soft.assertThat(persist.storeObj(obj)).isTrue(); + unreferenced.add(obj.id()); + } + + // 10 new references + // 10 new RefObj + for (int i = 0; i < 10; i++) { + var head = EMPTY_OBJ_ID; + for (int i1 = 0; i1 < 20; i1++) { + var cid1 = randomUUID(); + var cid2 = randomUUID(); + var obj1 = + contentValue( + cid1.toString(), + 127, + DefaultStoreWorker.instance() + .toStoreOnReferenceState(onRef("obj " + i + " " + i1 + " 1", cid1.toString()))); + var obj2 = + contentValue( + cid2.toString(), + 127, + DefaultStoreWorker.instance() + .toStoreOnReferenceState(onRef("obj " + i + " " + i1 + " 2", cid2.toString()))); + var commit = + commitLogic.doCommit( + newCommitBuilder() + .commitType(NORMAL) + .parentCommitId(head) + .addAdds( + commitAdd( + key("store", "key", Integer.toString(i), Integer.toString(i1), "1"), + 42, + obj1.id(), + null, + cid1)) + .addAdds( + commitAdd( + key("store", "key", Integer.toString(i), Integer.toString(i1), "2"), + 42, + obj2.id(), + null, + cid2)) + .headers(newCommitHeaders().add("created", "foo-" + i + "-" + i1).build()) + .message("commit " + i1 + " on " + i) + .build(), + List.of(obj1, obj2)); + head = requireNonNull(commit).id(); + + referencedCommits.add(head); + referenced.add(obj1.id()); + referenced.add(obj2.id()); + contents += 2; + } + + var extendedInfo = + stringData("ref/foo", Compression.NONE, null, List.of(), copyFromUtf8("ext-info " + i)); + soft.assertThat(persist.storeObj(extendedInfo)).isTrue(); + referenced.add(extendedInfo.id()); + + referenceLogic.createReference("refs/heads/myref-" + i, head, extendedInfo.id()); + } + + var maxObjReferenced = persist.config().currentTimeMicros(); + + // Unreferenced, but newer than 'maxObjReferenced' + for (int i = 100; i < 125; i++) { + var obj = + stringData("foo/bar", Compression.NONE, null, List.of(), copyFromUtf8("string " + i)); + soft.assertThat(persist.storeObj(obj)).isTrue(); + keptUnreferenced.add(obj.id()); + } + for (int i = 100; i < 125; i++) { + var obj = contentValue("cid-" + i, 42, copyFromUtf8("string " + i)); + soft.assertThat(persist.storeObj(obj)).isTrue(); + keptUnreferenced.add(obj.id()); + } + + var resolveAndPurge = resolveAndPurge(maxObjReferenced); + + soft.assertThat(resolveAndPurge.resolveResult().stats()) + .extracting( + ResolveStats::failure, + ResolveStats::numReferences, + ResolveStats::numCommitChainHeads, + ResolveStats::numCommits, + ResolveStats::numUniqueCommits, + ResolveStats::numQueuedObjs, + ResolveStats::numObjs) + .containsExactly( + Optional.empty(), + // refs + 3L + 10L, + // heads ("main" has EMPTY_OBJ_ID) + 2L + 10L, + // commits + 3L + 10L + referencedCommits.size(), + // unique commits + 3L + 10L + referencedCommits.size(), + // objects + non-existing UniqueObj + 2L + referenced.size() + contents, + 2L + referenced.size()); + + soft.assertThat(resolveAndPurge.purgeResult().stats()) + .extracting(PurgeStats::failure, PurgeStats::numScannedObjs, PurgeStats::numPurgedObjs) + .containsExactly( + Optional.empty(), 5L + 100L + 20L + referencedCommits.size() + referenced.size(), 50L); + + soft.assertThat(persist.fetchObjsIfExist(unreferenced.toArray(new ObjId[0]))) + .containsOnlyNulls(); + soft.assertThat(persist.fetchObjsIfExist(keptUnreferenced.toArray(new ObjId[0]))) + .doesNotContainNull(); + soft.assertThat(persist.fetchObjsIfExist(referenced.toArray(new ObjId[0]))) + .doesNotContainNull(); + } + + @Test + void withSecondaryParents() throws Exception { + soft.assertThat(repositoryLogic(persist).repositoryExists()).isTrue(); + + var referenceLogic = referenceLogic(persist); + var commitLogic = commitLogic(persist); + + var secondaryHead = buildNewCommitChain(commitLogic, "secondary"); + var referenceHead = buildNewCommitChain(commitLogic, "main"); + + var mergeCommit = + commitLogic.doCommit( + newCommitBuilder() + .commitType(NORMAL) + .parentCommitId(referenceHead) + .addSecondaryParents(secondaryHead) + .message("merge commit") + .headers(newCommitHeaders().add("created", "foo merge").build()) + .build(), + List.of()); + + referenceLogic.createReference("refs/heads/my-merge-1", requireNonNull(mergeCommit).id(), null); + referenceLogic.createReference("refs/heads/my-merge-2", requireNonNull(mergeCommit).id(), null); + + var maxObjReferenced = persist.config().currentTimeMicros(); + var resolveAndPurge = resolveAndPurge(maxObjReferenced); + + soft.assertThat(resolveAndPurge.resolveResult().stats()) + .extracting( + ResolveStats::failure, + ResolveStats::numReferences, + ResolveStats::numCommitChainHeads, + ResolveStats::numCommits, + ResolveStats::numUniqueCommits, + ResolveStats::numQueuedObjs, + ResolveStats::numObjs) + .containsExactly( + Optional.empty(), + // references + 3L + 1L + 1L, + // commit heads (all refs HEADs + secondary parent + incl duplicates & EMPTY_OBJ_ID) + 3L + 2L, + // commits (internals + 2x create-ref + 5+5 + 1) + 3L + 2L + 5L + 5L + 1L, + 3L + 2L + 5L + 5L + 1L, + // objects (internals, 2x RefObj + 5+5 contents + 10 non-existing UniqueObj + 2L + 2L + 5L + 5L + 10L, + 2L + 2L + 5L + 5L); + + soft.assertThat(resolveAndPurge.purgeResult().stats()) + .extracting(PurgeStats::failure, PurgeStats::numScannedObjs, PurgeStats::numPurgedObjs) + .containsExactly(Optional.empty(), 5L + 13L + 12L, 0L); + } + + private ObjId buildNewCommitChain(CommitLogic commitLogic, String discrim) throws Exception { + var head = EMPTY_OBJ_ID; + for (int i = 0; i < 5; i++) { + var cid1 = randomUUID(); + var obj1 = + contentValue( + cid1.toString(), + 127, + DefaultStoreWorker.instance() + .toStoreOnReferenceState(onRef("obj " + i + " " + discrim, cid1.toString()))); + var commit = + commitLogic.doCommit( + newCommitBuilder() + .commitType(NORMAL) + .parentCommitId(head) + .addAdds( + commitAdd( + key("store", "key", Integer.toString(i), discrim), + 42, + obj1.id(), + null, + cid1)) + .headers(newCommitHeaders().add("created", "foo-" + i + "-" + discrim).build()) + .message("commit " + i + " " + discrim) + .build(), + List.of(obj1)); + head = requireNonNull(commit).id(); + } + return head; + } + + ResolvePurgeResult resolveAndPurge(long maxObjReferenced) throws Exception { + var cleanup = createCleanup(CleanupParams.builder().build()); + var referencedObjectsContext = cleanup.buildReferencedObjectsContext(persist, maxObjReferenced); + var referencedObjectsResolver = + cleanup.createReferencedObjectsResolver(referencedObjectsContext); + var resolveResult = referencedObjectsResolver.resolve(); + var purgeObjects = cleanup.createPurgeObjects(resolveResult.purgeObjectsContext()); + var purgeResult = purgeObjects.purge(); + + return ImmutableResolvePurgeResult.of(resolveResult, purgeResult); + } + + @NessieImmutable + interface ResolvePurgeResult { + ResolveResult resolveResult(); + + PurgeResult purgeResult(); + } +} diff --git a/versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestPurgeStatsBuilder.java b/versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestPurgeStatsBuilder.java new file mode 100644 index 00000000000..179cd070527 --- /dev/null +++ b/versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestPurgeStatsBuilder.java @@ -0,0 +1,49 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import org.assertj.core.api.SoftAssertions; +import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; +import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(SoftAssertionsExtension.class) +public class TestPurgeStatsBuilder { + @InjectSoftAssertions SoftAssertions soft; + + @Test + void purgeStatsBuilder() { + var builder = new PurgeStatsBuilder(); + + var expected = ImmutablePurgeStats.builder(); + + builder.started = Instant.EPOCH; + expected.started(Instant.EPOCH); + builder.ended = Instant.EPOCH.plus(42, ChronoUnit.DAYS); + expected.ended(Instant.EPOCH.plus(42, ChronoUnit.DAYS)); + builder.numPurgedObjs = 1; + expected.numPurgedObjs(1); + builder.numScannedObjs = 2; + expected.numScannedObjs(2); + builder.failure = new Exception("hello"); + expected.failure(builder.failure); + + soft.assertThat(builder.build()).isEqualTo(expected.build()); + } +} diff --git a/versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestRateLimiting.java b/versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestRateLimiting.java new file mode 100644 index 00000000000..1d31a14426e --- /dev/null +++ b/versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestRateLimiting.java @@ -0,0 +1,240 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +import static java.util.Objects.requireNonNull; +import static java.util.UUID.randomUUID; +import static org.projectnessie.nessie.relocated.protobuf.ByteString.copyFromUtf8; +import static org.projectnessie.versioned.storage.cleanup.Cleanup.createCleanup; +import static org.projectnessie.versioned.storage.common.indexes.StoreKey.key; +import static org.projectnessie.versioned.storage.common.logic.CreateCommit.Add.commitAdd; +import static org.projectnessie.versioned.storage.common.logic.CreateCommit.newCommitBuilder; +import static org.projectnessie.versioned.storage.common.logic.Logics.commitLogic; +import static org.projectnessie.versioned.storage.common.logic.Logics.referenceLogic; +import static org.projectnessie.versioned.storage.common.logic.Logics.repositoryLogic; +import static org.projectnessie.versioned.storage.common.objtypes.CommitHeaders.newCommitHeaders; +import static org.projectnessie.versioned.storage.common.objtypes.CommitType.NORMAL; +import static org.projectnessie.versioned.storage.common.objtypes.ContentValueObj.contentValue; +import static org.projectnessie.versioned.storage.common.objtypes.StringObj.stringData; +import static org.projectnessie.versioned.storage.common.persist.ObjId.EMPTY_OBJ_ID; +import static org.projectnessie.versioned.testworker.OnRefOnly.onRef; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; +import org.assertj.core.api.SoftAssertions; +import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; +import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.projectnessie.versioned.storage.common.objtypes.Compression; +import org.projectnessie.versioned.storage.common.persist.ObjId; +import org.projectnessie.versioned.storage.common.persist.Persist; +import org.projectnessie.versioned.storage.testextension.NessiePersist; +import org.projectnessie.versioned.storage.testextension.PersistExtension; +import org.projectnessie.versioned.store.DefaultStoreWorker; + +@ExtendWith({PersistExtension.class, SoftAssertionsExtension.class}) +public class TestRateLimiting { + @InjectSoftAssertions protected SoftAssertions soft; + + @NessiePersist protected Persist persist; + + @Test + void productionImplementations() { + soft.assertThat(RateLimit.create(0)).extracting(RateLimit::toString).isEqualTo("unlimited"); + soft.assertThat(RateLimit.create(-42)).extracting(RateLimit::toString).isEqualTo("unlimited"); + soft.assertThat(RateLimit.create(42)).extracting(RateLimit::toString).isEqualTo("up to 42"); + + soft.assertThatCode(() -> RateLimit.create(0).acquire()).doesNotThrowAnyException(); + soft.assertThatCode(() -> RateLimit.create(42).acquire()).doesNotThrowAnyException(); + } + + @Test + void againstEmptyRepositoryWithGarbage() throws Exception { + soft.assertThat(repositoryLogic(persist).repositoryExists()).isTrue(); + + var referenceLogic = referenceLogic(persist); + var commitLogic = commitLogic(persist); + + var unreferenced = new ArrayList(); + var keptUnreferenced = new ArrayList(); + var referencedCommits = new ArrayList(); + var referenced = new ArrayList(); + var contents = 0; + + for (int i = 0; i < 25; i++) { + var obj = + stringData("foo/bar", Compression.NONE, null, List.of(), copyFromUtf8("string " + i)); + soft.assertThat(persist.storeObj(obj)).isTrue(); + unreferenced.add(obj.id()); + } + for (int i = 0; i < 25; i++) { + var cid = randomUUID(); + var obj = + contentValue( + cid.toString(), + 127, + DefaultStoreWorker.instance() + .toStoreOnReferenceState(onRef("dummy " + i, cid.toString()))); + soft.assertThat(persist.storeObj(obj)).isTrue(); + unreferenced.add(obj.id()); + } + + // 10 new references + // 10 new RefObj + for (int i = 0; i < 10; i++) { + var head = EMPTY_OBJ_ID; + for (int i1 = 0; i1 < 20; i1++) { + var cid1 = randomUUID(); + var cid2 = randomUUID(); + var obj1 = + contentValue( + cid1.toString(), + 127, + DefaultStoreWorker.instance() + .toStoreOnReferenceState(onRef("obj " + i + " " + i1 + " 1", cid1.toString()))); + var obj2 = + contentValue( + cid2.toString(), + 127, + DefaultStoreWorker.instance() + .toStoreOnReferenceState(onRef("obj " + i + " " + i1 + " 2", cid2.toString()))); + var commit = + commitLogic.doCommit( + newCommitBuilder() + .commitType(NORMAL) + .parentCommitId(head) + .addAdds( + commitAdd( + key("store", "key", Integer.toString(i), Integer.toString(i1), "1"), + 42, + obj1.id(), + null, + cid1)) + .addAdds( + commitAdd( + key("store", "key", Integer.toString(i), Integer.toString(i1), "2"), + 42, + obj2.id(), + null, + cid2)) + .headers(newCommitHeaders().add("created", "foo-" + i + "-" + i1).build()) + .message("commit " + i1 + " on " + i) + .build(), + List.of(obj1, obj2)); + head = requireNonNull(commit).id(); + + referencedCommits.add(head); + referenced.add(obj1.id()); + referenced.add(obj2.id()); + contents += 2; + } + + var extendedInfo = + stringData("ref/foo", Compression.NONE, null, List.of(), copyFromUtf8("ext-info " + i)); + soft.assertThat(persist.storeObj(extendedInfo)).isTrue(); + referenced.add(extendedInfo.id()); + + referenceLogic.createReference("refs/heads/myref-" + i, head, extendedInfo.id()); + } + + var maxObjReferenced = persist.config().currentTimeMicros(); + + // Unreferenced, but newer than 'maxObjReferenced' + for (int i = 100; i < 125; i++) { + var obj = + stringData("foo/bar", Compression.NONE, null, List.of(), copyFromUtf8("string " + i)); + soft.assertThat(persist.storeObj(obj)).isTrue(); + keptUnreferenced.add(obj.id()); + } + for (int i = 100; i < 125; i++) { + var obj = contentValue("cid-" + i, 42, copyFromUtf8("string " + i)); + soft.assertThat(persist.storeObj(obj)).isTrue(); + keptUnreferenced.add(obj.id()); + } + + var acquires = new AtomicLong[4]; + + var cleanup = + createCleanup( + CleanupParams.builder() + .resolveCommitRatePerSecond(1) + .resolveObjRatePerSecond(2) + .purgeScanObjRatePerSecond(3) + .purgeDeleteObjRatePerSecond(4) + .rateLimitFactory( + i -> { + var a = acquires[i - 1] = new AtomicLong(); + return a::incrementAndGet; + }) + .build()); + var referencedObjectsContext = cleanup.buildReferencedObjectsContext(persist, maxObjReferenced); + var referencedObjectsResolver = + cleanup.createReferencedObjectsResolver(referencedObjectsContext); + var resolveResult = referencedObjectsResolver.resolve(); + + soft.assertThat(acquires) + .extracting(l -> l != null ? l.get() : -1L) + .containsExactlyInAnyOrder( + 3L + 10L + referencedCommits.size(), 2L + referenced.size(), -1L, -1L); + soft.assertThat(resolveResult.stats()) + .extracting( + ResolveStats::failure, + ResolveStats::numReferences, + ResolveStats::numCommitChainHeads, + ResolveStats::numCommits, + ResolveStats::numUniqueCommits, + ResolveStats::numQueuedObjs, + ResolveStats::numObjs) + .containsExactly( + Optional.empty(), + // refs + 3L + 10L, + // heads ("main" has EMPTY_OBJ_ID) + 2L + 10L, + // commits + 3L + 10L + referencedCommits.size(), + // unique commits + 3L + 10L + referencedCommits.size(), + // objects + 2L + referenced.size() + contents, + 2L + referenced.size()); + + var purgeObjects = cleanup.createPurgeObjects(resolveResult.purgeObjectsContext()); + var purgeResult = purgeObjects.purge(); + + soft.assertThat(acquires) + .extracting(AtomicLong::get) + .containsExactlyInAnyOrder( + 3L + 10L + referencedCommits.size(), + 2L + referenced.size(), + 5L + 100L + 20L + referencedCommits.size() + referenced.size(), + 50L); + soft.assertThat(purgeResult.stats()) + .extracting(PurgeStats::failure, PurgeStats::numScannedObjs, PurgeStats::numPurgedObjs) + .containsExactly( + Optional.empty(), 5L + 100L + 20L + referencedCommits.size() + referenced.size(), 50L); + + soft.assertThat(persist.fetchObjsIfExist(unreferenced.toArray(new ObjId[0]))) + .containsOnlyNulls(); + soft.assertThat(persist.fetchObjsIfExist(keptUnreferenced.toArray(new ObjId[0]))) + .doesNotContainNull(); + soft.assertThat(persist.fetchObjsIfExist(referenced.toArray(new ObjId[0]))) + .doesNotContainNull(); + } +} diff --git a/versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestReferencedObjectsFilterImpl.java b/versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestReferencedObjectsFilterImpl.java new file mode 100644 index 00000000000..1a8388ab828 --- /dev/null +++ b/versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestReferencedObjectsFilterImpl.java @@ -0,0 +1,110 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +import static org.projectnessie.versioned.storage.common.persist.ObjId.objIdFromByteArray; +import static org.projectnessie.versioned.storage.common.persist.ObjId.randomObjId; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import org.assertj.core.api.SoftAssertions; +import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; +import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.projectnessie.versioned.storage.common.persist.ObjId; + +@ExtendWith(SoftAssertionsExtension.class) +public class TestReferencedObjectsFilterImpl { + @InjectSoftAssertions SoftAssertions soft; + + @Test + public void emptyFilterContainsNothing() { + ReferencedObjectsFilterImpl filter = + new ReferencedObjectsFilterImpl(CleanupParams.builder().build()); + soft.assertThat(filter.isProbablyReferenced(ObjId.EMPTY_OBJ_ID)).isFalse(); + for (int i = 0; i < 100; i++) { + ObjId id = randomObjId(); + soft.assertThat(filter.isProbablyReferenced(id)).describedAs("id = %s", id).isFalse(); + } + } + + @Test + public void filterContainsAdded() { + ReferencedObjectsFilterImpl filter = + new ReferencedObjectsFilterImpl(CleanupParams.builder().build()); + + soft.assertThat(filter.estimatedHeapPressure()).isGreaterThan(1L); + + soft.assertThat(filter.markReferenced(ObjId.EMPTY_OBJ_ID)).isTrue(); + soft.assertThat(filter.markReferenced(ObjId.EMPTY_OBJ_ID)).isFalse(); + + Set ids = new HashSet<>(3000); + for (int i = 0; i < 1000; i++) { + ids.add(randomObjId()); + } + + for (int i = 0; i < 1000; i++) { + byte[] bytes = new byte[4 + ThreadLocalRandom.current().nextInt(33)]; + ThreadLocalRandom.current().nextBytes(bytes); + ids.add(objIdFromByteArray(bytes)); + } + + for (ObjId id : ids) { + // There is a theoretical chance that this assertion fails, but that change is extremely low. + // (We're adding 2000 object IDs to a bloom filter with an expected object count of 1M and a + // low FPP.) + soft.assertThat(filter.markReferenced(id)).isTrue(); + soft.assertThat(filter.markReferenced(id)).isFalse(); + } + + soft.assertThat(filter.isProbablyReferenced(ObjId.EMPTY_OBJ_ID)).isTrue(); + for (ObjId id : ids) { + soft.assertThat(filter.isProbablyReferenced(id)).describedAs("id = %s", id).isTrue(); + } + } + + @ParameterizedTest + @ValueSource(ints = {100, 1_000, 10_000}) + public void withinExpectedFpp(int expected) { + ReferencedObjectsFilterImpl filter = + new ReferencedObjectsFilterImpl(CleanupParams.builder().expectedObjCount(expected).build()); + + for (int i = 0; i < expected; i++) { + ObjId id = randomObjId(); + soft.assertThatCode(() -> filter.markReferenced(id)).doesNotThrowAnyException(); + soft.assertThat(filter.withinExpectedFpp()).isTrue(); + } + + // "withinExpectedFpp" should trigger at some point + boolean thrown = false; + for (int i = 0; i < expected / 2; i++) { + ObjId id = randomObjId(); + try { + filter.markReferenced(id); + soft.assertThat(filter.withinExpectedFpp()).isTrue(); + } catch (MustRestartWithBiggerFilterRuntimeException e) { + soft.assertThat(filter.withinExpectedFpp()).isFalse(); + thrown = true; + break; + } + } + soft.assertThat(thrown).isTrue(); + } +} diff --git a/versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestResolveStatsBuilder.java b/versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestResolveStatsBuilder.java new file mode 100644 index 00000000000..3e50f0af62f --- /dev/null +++ b/versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestResolveStatsBuilder.java @@ -0,0 +1,63 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import org.assertj.core.api.SoftAssertions; +import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; +import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(SoftAssertionsExtension.class) +public class TestResolveStatsBuilder { + @InjectSoftAssertions SoftAssertions soft; + + @Test + void resolveStatsBuilder() { + var builder = new ResolveStatsBuilder(); + + var expected = ImmutableResolveStats.builder(); + + builder.mustRestart = true; + expected.mustRestart(true); + builder.started = Instant.EPOCH; + expected.started(Instant.EPOCH); + builder.ended = Instant.EPOCH.plus(42, ChronoUnit.DAYS); + expected.ended(Instant.EPOCH.plus(42, ChronoUnit.DAYS)); + builder.numCommits = 1; + expected.numCommits(1); + builder.numContents = 2; + expected.numContents(2); + builder.numObjs = 3; + expected.numObjs(3); + builder.numReferences = 4; + expected.numReferences(4); + builder.numUniqueCommits = 5; + expected.numUniqueCommits(5); + builder.numCommitChainHeads = 7; + expected.numCommitChainHeads(7); + builder.numQueuedObjs = 8; + expected.numQueuedObjs(8); + builder.numQueuedObjsBulkFetches = 9; + expected.numQueuedObjsBulkFetches(9); + builder.failure = new Exception("hello"); + expected.failure(builder.failure); + + soft.assertThat(builder.build()).isEqualTo(expected.build()); + } +}