From 70959144b48a14f91c7a5809276aa757bc264962 Mon Sep 17 00:00:00 2001 From: Sourav Maji Date: Tue, 30 Jan 2024 10:27:51 -0800 Subject: [PATCH] updated test --- .../helix/HelixReadWriteSchemaRepository.java | 4 ++++ .../venice/system/store/MetaStoreWriter.java | 4 +--- .../UnusedValueSchemaCleanupService.java | 15 ++++++++++----- .../venice/controller/VeniceHelixAdmin.java | 10 +++++----- .../TestUnusedValueSchemaCleanupService.java | 7 +++++++ 5 files changed, 27 insertions(+), 13 deletions(-) diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/helix/HelixReadWriteSchemaRepository.java b/internal/venice-common/src/main/java/com/linkedin/venice/helix/HelixReadWriteSchemaRepository.java index 162895fb905..a2e5c60755d 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/helix/HelixReadWriteSchemaRepository.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/helix/HelixReadWriteSchemaRepository.java @@ -509,6 +509,10 @@ private Map> getDerivedSchemaMap(String storeN @Override public void removeValueSchema(String storeName, int valueSchemaId) { preCheckStoreCondition(storeName); + if (getSupersetOrLatestValueSchema(storeName).getId() == valueSchemaId) { + logger.error("Should not remove latest schema id {} for store {}", valueSchemaId, storeName); + return; + } accessor.removeValueSchema(storeName, valueSchemaId); } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/system/store/MetaStoreWriter.java b/internal/venice-common/src/main/java/com/linkedin/venice/system/store/MetaStoreWriter.java index 6237cc27e72..79a97207ae2 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/system/store/MetaStoreWriter.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/system/store/MetaStoreWriter.java @@ -33,7 +33,6 @@ import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.EnumMap; @@ -216,8 +215,7 @@ public void writeInUseValueSchema(String storeName, int versionNumber, int value // Construct an update UpdateBuilder updateBuilder = new UpdateBuilderImpl(this.derivedComputeSchema); updateBuilder.setNewFieldValue("timestamp", System.currentTimeMillis()); - updateBuilder - .setElementsToAddToListField("storeValueSchemaIdsWrittenPerStoreVersion", Arrays.asList(valueSchemaId)); + updateBuilder.setElementsToAddToListField("storeValueSchemaIdsWrittenPerStoreVersion", List.of(valueSchemaId)); return updateBuilder.build(); }); } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/UnusedValueSchemaCleanupService.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/UnusedValueSchemaCleanupService.java index 1e8c75b2ac8..b0770f0c138 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/UnusedValueSchemaCleanupService.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/UnusedValueSchemaCleanupService.java @@ -72,22 +72,27 @@ private Runnable getRunnableForSchemaCleanup() { // assumes `getValueSchemas` returns ascending schema ids so that the older schemas are deleted first for (SchemaEntry schemaEntry: allSchemas) { - if (schemaEntry.getId() == store.getLatestSuperSetValueSchemaId()) { + int schemaId = schemaEntry.getId(); + // skip latest value schema or super-set schema id + if (schemaId == store.getLatestSuperSetValueSchemaId() || admin.getHelixVeniceClusterResources(clusterName) + .getSchemaRepository() + .getSupersetOrLatestValueSchema(storeName) + .getId() == schemaId) { continue; } // delete only if its not used and less than minimum of used schema id - if (!usedSchemaSet.contains(schemaEntry.getId()) && schemaEntry.getId() < minSchemaIdInUse) { - schemasToDelete.add(schemaEntry.getId()); + if (!usedSchemaSet.contains(schemaId) && schemaId < minSchemaIdInUse) { + schemasToDelete.add(schemaId); // maintain minimum of SCHEMA_COUNT_THRESHOLD schemas in repo if (schemasToDelete.size() > allSchemas.size() - minSchemaCountToKeep) { break; } } } - // delete from parent - admin.deleteValueSchemas(clusterName, store.getName(), schemasToDelete); // delete from child colos + admin.deleteValueSchemas(clusterName, store.getName(), schemasToDelete); + // delete from parent veniceParentHelixAdmin.deleteValueSchemas(clusterName, store.getName(), schemasToDelete); } } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java index 71fe61c076b..ce4c9edb342 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java @@ -328,7 +328,7 @@ public class VeniceHelixAdmin implements Admin, StoreCleaner { private static final long HELIX_RESOURCE_ASSIGNMENT_RETRY_INTERVAL_MS = 500; private static final long HELIX_RESOURCE_ASSIGNMENT_LOG_INTERVAL_MS = TimeUnit.MINUTES.toMillis(1); - private static final long USUSED_SCHEMA_DELETION_TIME_GAP = TimeUnit.DAYS.toMillis(15); + private static final long UNUSED_SCHEMA_DELETION_TIME_GAP = TimeUnit.DAYS.toMillis(15); private static final int INTERNAL_STORE_GET_RRT_TOPIC_ATTEMPTS = 3; private static final long INTERNAL_STORE_RTT_RETRY_BACKOFF_MS = TimeUnit.SECONDS.toMillis(5); private static final int PARTICIPANT_MESSAGE_STORE_SCHEMA_ID = 1; @@ -3960,7 +3960,7 @@ public Set getInUseValueSchemaIds(String clusterName, String storeName) Store store = getStore(clusterName, storeName); Set schemaIds = new HashSet<>(); - // Fetch value schema id used by all existing store verion + // Fetch value schema id used by all existing store version for (Version version: store.getVersions()) { Map map = new HashMap<>(2); map.put(MetaStoreWriter.KEY_STRING_VERSION_NUMBER, Integer.toString(version.getNumber())); @@ -3968,11 +3968,11 @@ public Set getInUseValueSchemaIds(String clusterName, String storeName) StoreMetaValue metaValue = getMetaStoreValue(key, storeName); if (metaValue == null) { - return Collections.emptySet(); + continue; } // Skip if its recorded recently - if (System.currentTimeMillis() < metaValue.timestamp + USUSED_SCHEMA_DELETION_TIME_GAP) { - return Collections.emptySet(); + if (System.currentTimeMillis() < metaValue.timestamp + UNUSED_SCHEMA_DELETION_TIME_GAP) { + continue; } schemaIds.addAll(metaValue.storeValueSchemaIdsWrittenPerStoreVersion); } diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestUnusedValueSchemaCleanupService.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestUnusedValueSchemaCleanupService.java index 185515d1d92..e1220545c9f 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestUnusedValueSchemaCleanupService.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestUnusedValueSchemaCleanupService.java @@ -3,6 +3,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; +import com.linkedin.venice.meta.ReadWriteSchemaRepository; import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.Version; import com.linkedin.venice.schema.SchemaEntry; @@ -50,11 +51,17 @@ public void testCleanupBackupVersionSleepValidation() throws Exception { String clusterName = "test_cluster"; stores.add(store.getName()); Set clusters = new HashSet<>(); + HelixVeniceClusterResources helixVeniceClusterResources = mock(HelixVeniceClusterResources.class); + ReadWriteSchemaRepository schemaRepository = mock(ReadWriteSchemaRepository.class); clusters.add(clusterName); doReturn(clusters).when(config).getClusters(); List storeList = new ArrayList<>(); storeList.add(store); doReturn(storeList).when(admin).getAllStores(any()); + doReturn(helixVeniceClusterResources).when(admin).getHelixVeniceClusterResources(anyString()); + doReturn(schemaRepository).when(helixVeniceClusterResources).getSchemaRepository(); + SchemaEntry schemaEntry = new SchemaEntry(4, SCHEMA); + doReturn(schemaEntry).when(schemaRepository).getSupersetOrLatestValueSchema(anyString()); Collection schemaEntries = new ArrayList<>(); schemaEntries.add(new SchemaEntry(1, SCHEMA)); schemaEntries.add(new SchemaEntry(2, SCHEMA));