Skip to content

Commit

Permalink
updated test
Browse files Browse the repository at this point in the history
  • Loading branch information
Sourav Maji committed Jan 30, 2024
1 parent 38f9a14 commit 7095914
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,10 @@ private Map<Integer, List<DerivedSchemaEntry>> getDerivedSchemaMap(String storeN
@Override
public void removeValueSchema(String storeName, int valueSchemaId) {
preCheckStoreCondition(storeName);
if (getSupersetOrLatestValueSchema(storeName).getId() == valueSchemaId) {
logger.error("Should not remove latest schema id {} for store {}", valueSchemaId, storeName);
return;
}
accessor.removeValueSchema(storeName, valueSchemaId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
Expand Down Expand Up @@ -216,8 +215,7 @@ public void writeInUseValueSchema(String storeName, int versionNumber, int value
// Construct an update
UpdateBuilder updateBuilder = new UpdateBuilderImpl(this.derivedComputeSchema);
updateBuilder.setNewFieldValue("timestamp", System.currentTimeMillis());
updateBuilder
.setElementsToAddToListField("storeValueSchemaIdsWrittenPerStoreVersion", Arrays.asList(valueSchemaId));
updateBuilder.setElementsToAddToListField("storeValueSchemaIdsWrittenPerStoreVersion", List.of(valueSchemaId));
return updateBuilder.build();
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,22 +72,27 @@ private Runnable getRunnableForSchemaCleanup() {

// assumes `getValueSchemas` returns ascending schema ids so that the older schemas are deleted first
for (SchemaEntry schemaEntry: allSchemas) {
if (schemaEntry.getId() == store.getLatestSuperSetValueSchemaId()) {
int schemaId = schemaEntry.getId();
// skip latest value schema or super-set schema id
if (schemaId == store.getLatestSuperSetValueSchemaId() || admin.getHelixVeniceClusterResources(clusterName)
.getSchemaRepository()
.getSupersetOrLatestValueSchema(storeName)
.getId() == schemaId) {
continue;
}

// delete only if its not used and less than minimum of used schema id
if (!usedSchemaSet.contains(schemaEntry.getId()) && schemaEntry.getId() < minSchemaIdInUse) {
schemasToDelete.add(schemaEntry.getId());
if (!usedSchemaSet.contains(schemaId) && schemaId < minSchemaIdInUse) {
schemasToDelete.add(schemaId);
// maintain minimum of SCHEMA_COUNT_THRESHOLD schemas in repo
if (schemasToDelete.size() > allSchemas.size() - minSchemaCountToKeep) {
break;
}
}
}
// delete from parent
admin.deleteValueSchemas(clusterName, store.getName(), schemasToDelete);
// delete from child colos
admin.deleteValueSchemas(clusterName, store.getName(), schemasToDelete);
// delete from parent
veniceParentHelixAdmin.deleteValueSchemas(clusterName, store.getName(), schemasToDelete);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ public class VeniceHelixAdmin implements Admin, StoreCleaner {
private static final long HELIX_RESOURCE_ASSIGNMENT_RETRY_INTERVAL_MS = 500;
private static final long HELIX_RESOURCE_ASSIGNMENT_LOG_INTERVAL_MS = TimeUnit.MINUTES.toMillis(1);

private static final long USUSED_SCHEMA_DELETION_TIME_GAP = TimeUnit.DAYS.toMillis(15);
private static final long UNUSED_SCHEMA_DELETION_TIME_GAP = TimeUnit.DAYS.toMillis(15);
private static final int INTERNAL_STORE_GET_RRT_TOPIC_ATTEMPTS = 3;
private static final long INTERNAL_STORE_RTT_RETRY_BACKOFF_MS = TimeUnit.SECONDS.toMillis(5);
private static final int PARTICIPANT_MESSAGE_STORE_SCHEMA_ID = 1;
Expand Down Expand Up @@ -3960,19 +3960,19 @@ public Set<Integer> getInUseValueSchemaIds(String clusterName, String storeName)
Store store = getStore(clusterName, storeName);
Set<Integer> schemaIds = new HashSet<>();

// Fetch value schema id used by all existing store verion
// Fetch value schema id used by all existing store version
for (Version version: store.getVersions()) {
Map<String, String> map = new HashMap<>(2);
map.put(MetaStoreWriter.KEY_STRING_VERSION_NUMBER, Integer.toString(version.getNumber()));
StoreMetaKey key = MetaStoreDataType.VALUE_SCHEMAS_WRITTEN_PER_STORE_VERSION.getStoreMetaKey(map);
StoreMetaValue metaValue = getMetaStoreValue(key, storeName);

if (metaValue == null) {
return Collections.emptySet();
continue;
}
// Skip if its recorded recently
if (System.currentTimeMillis() < metaValue.timestamp + USUSED_SCHEMA_DELETION_TIME_GAP) {
return Collections.emptySet();
if (System.currentTimeMillis() < metaValue.timestamp + UNUSED_SCHEMA_DELETION_TIME_GAP) {
continue;
}
schemaIds.addAll(metaValue.storeValueSchemaIdsWrittenPerStoreVersion);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;

import com.linkedin.venice.meta.ReadWriteSchemaRepository;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.schema.SchemaEntry;
Expand Down Expand Up @@ -50,11 +51,17 @@ public void testCleanupBackupVersionSleepValidation() throws Exception {
String clusterName = "test_cluster";
stores.add(store.getName());
Set<String> clusters = new HashSet<>();
HelixVeniceClusterResources helixVeniceClusterResources = mock(HelixVeniceClusterResources.class);
ReadWriteSchemaRepository schemaRepository = mock(ReadWriteSchemaRepository.class);
clusters.add(clusterName);
doReturn(clusters).when(config).getClusters();
List<Store> storeList = new ArrayList<>();
storeList.add(store);
doReturn(storeList).when(admin).getAllStores(any());
doReturn(helixVeniceClusterResources).when(admin).getHelixVeniceClusterResources(anyString());
doReturn(schemaRepository).when(helixVeniceClusterResources).getSchemaRepository();
SchemaEntry schemaEntry = new SchemaEntry(4, SCHEMA);
doReturn(schemaEntry).when(schemaRepository).getSupersetOrLatestValueSchema(anyString());
Collection<SchemaEntry> schemaEntries = new ArrayList<>();
schemaEntries.add(new SchemaEntry(1, SCHEMA));
schemaEntries.add(new SchemaEntry(2, SCHEMA));
Expand Down

0 comments on commit 7095914

Please sign in to comment.