From 83a3136eee1995aa1cb992a0480c664f90fcab99 Mon Sep 17 00:00:00 2001 From: Sourav Maji Date: Wed, 31 Jan 2024 12:55:24 -0800 Subject: [PATCH] updated test --- .../helix/HelixReadWriteSchemaRepository.java | 2 +- ...HelixReadWriteSchemaRepositoryAdapter.java | 7 ++- .../venice/system/store/MetaStoreWriter.java | 13 +++++- ...lixReadOnlyStoreRepositoryAdapterTest.java | 44 ++++++++++++++++++- .../venice/helix/HelixSchemaAccessorTest.java | 18 ++++++++ .../system/store/MetaStoreWriterTest.java | 42 +++++++++++++++++- 6 files changed, 119 insertions(+), 7 deletions(-) create mode 100644 internal/venice-common/src/test/java/com/linkedin/venice/helix/HelixSchemaAccessorTest.java diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/helix/HelixReadWriteSchemaRepository.java b/internal/venice-common/src/main/java/com/linkedin/venice/helix/HelixReadWriteSchemaRepository.java index a2e5c60755d..780a8974bf9 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/helix/HelixReadWriteSchemaRepository.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/helix/HelixReadWriteSchemaRepository.java @@ -563,7 +563,7 @@ public void refresh() { public void clear() { } - private void preCheckStoreCondition(String storeName) { + void preCheckStoreCondition(String storeName) { if (!storeRepository.hasStore(storeName)) { throw new VeniceNoStoreException(storeName); } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/helix/HelixReadWriteSchemaRepositoryAdapter.java b/internal/venice-common/src/main/java/com/linkedin/venice/helix/HelixReadWriteSchemaRepositoryAdapter.java index d6ffa0a8232..6d0caafa08d 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/helix/HelixReadWriteSchemaRepositoryAdapter.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/helix/HelixReadWriteSchemaRepositoryAdapter.java @@ -106,12 +106,16 @@ public DerivedSchemaEntry addDerivedSchema( public DerivedSchemaEntry removeDerivedSchema(String storeName, int valueSchemaId, int derivedSchemaId) { VeniceSystemStoreType systemStoreType = VeniceSystemStoreType.getSystemStoreType(storeName); if (HelixReadOnlyStoreRepositoryAdapter.forwardToRegularRepository(systemStoreType)) { - return readWriteRegularStoreSchemaRepository.removeDerivedSchema(storeName, valueSchemaId, derivedSchemaId); + return getReadWriteSchemaRepository().removeDerivedSchema(storeName, valueSchemaId, derivedSchemaId); } throw new VeniceException( errorMsgForUnsupportedOperationsAgainstSystemStore(storeName, systemStoreType, "removeDerivedSchema")); } + ReadWriteSchemaRepository getReadWriteSchemaRepository() { + return readWriteRegularStoreSchemaRepository; + } + @Override public int preCheckValueSchemaAndGetNextAvailableId( String storeName, @@ -193,6 +197,7 @@ public void removeValueSchema(String storeName, int schemaID) { VeniceSystemStoreType systemStoreType = VeniceSystemStoreType.getSystemStoreType(storeName); if (HelixReadOnlyStoreRepositoryAdapter.forwardToRegularRepository(systemStoreType)) { readWriteRegularStoreSchemaRepository.removeValueSchema(storeName, schemaID); + return; } throw new VeniceException( errorMsgForUnsupportedOperationsAgainstSystemStore(storeName, systemStoreType, "removeValueSchema")); diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/system/store/MetaStoreWriter.java b/internal/venice-common/src/main/java/com/linkedin/venice/system/store/MetaStoreWriter.java index 6237cc27e72..8b916fa84ed 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/system/store/MetaStoreWriter.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/system/store/MetaStoreWriter.java @@ -2,6 +2,7 @@ import com.linkedin.venice.common.VeniceSystemStoreType; import com.linkedin.venice.exceptions.VeniceException; +import com.linkedin.venice.helix.HelixReadOnlySchemaRepository; import com.linkedin.venice.helix.HelixReadOnlyZKSharedSchemaRepository; import com.linkedin.venice.kafka.TopicManager; import com.linkedin.venice.meta.Instance; @@ -270,7 +271,7 @@ public void writeStoreReplicaStatus( } }, () -> { // Construct an update - UpdateBuilder updateBuilder = new UpdateBuilderImpl(this.derivedComputeSchema); + UpdateBuilder updateBuilder = new UpdateBuilderImpl(getDerivedComputeSchema()); updateBuilder.setNewFieldValue("timestamp", System.currentTimeMillis()); Map instanceStatusMap = new HashMap<>(); StoreReplicaStatus replicaStatus = new StoreReplicaStatus(); @@ -281,6 +282,10 @@ public void writeStoreReplicaStatus( }); } + Schema getDerivedComputeSchema() { + return this.derivedComputeSchema; + } + /** * Write {@link com.linkedin.venice.meta.StoreConfig} equivalent to the meta system store. This is still only invoked * by child controllers only. @@ -370,7 +375,7 @@ private void update( * Fetch the derived compute schema id on demand for integration test since the meta system store is being created * during cluster initialization. */ - GeneratedSchemaID derivedSchemaId = zkSharedSchemaRepository + GeneratedSchemaID derivedSchemaId = getSchemaRepository() .getDerivedSchemaId(VeniceSystemStoreType.META_STORE.getZkSharedStoreName(), derivedComputeSchema.toString()); if (!derivedSchemaId.isValid()) { throw new VeniceException( @@ -390,6 +395,10 @@ private void update( }); } + HelixReadOnlySchemaRepository getSchemaRepository() { + return zkSharedSchemaRepository; + } + void writeMessageWithRetry(String metaStoreName, Consumer writerConsumer) { ReentrantLock lock = getOrCreateMetaStoreWriterLock(metaStoreName); int messageProduceRetryCount = 0; diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/helix/HelixReadOnlyStoreRepositoryAdapterTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/helix/HelixReadOnlyStoreRepositoryAdapterTest.java index f8c95cc8f0d..c54cc9dac8e 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/helix/HelixReadOnlyStoreRepositoryAdapterTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/helix/HelixReadOnlyStoreRepositoryAdapterTest.java @@ -1,11 +1,21 @@ package com.linkedin.venice.helix; -import static org.mockito.Mockito.*; -import static org.testng.AssertJUnit.*; +import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.testng.AssertJUnit.fail; +import com.linkedin.venice.meta.ReadWriteSchemaRepository; +import com.linkedin.venice.meta.ReadWriteStoreRepository; import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.StoreDataChangedListener; +import com.linkedin.venice.schema.SchemaEntry; +import com.linkedin.venice.system.store.MetaStoreWriter; import java.util.HashSet; +import java.util.Optional; import java.util.Set; import org.testng.annotations.Test; @@ -29,4 +39,34 @@ public void testStoreDeleteHandler() { } } + @Test + public void testRemoveValueSchema() { + String storeName = "abc"; + HelixReadOnlyZKSharedSchemaRepository readOnlyZKSharedSchemaRepository = + mock(HelixReadOnlyZKSharedSchemaRepository.class); + ReadWriteSchemaRepository repository = mock(ReadWriteSchemaRepository.class); + HelixReadWriteSchemaRepositoryAdapter adapter = + new HelixReadWriteSchemaRepositoryAdapter(readOnlyZKSharedSchemaRepository, repository); + adapter.removeValueSchema(storeName, 2); + MetaStoreWriter metaStoreWriter = mock(MetaStoreWriter.class); + ReadWriteStoreRepository storeRepository = mock(ReadWriteStoreRepository.class); + HelixSchemaAccessor schemaAccessor = mock(HelixSchemaAccessor.class); + HelixReadWriteSchemaRepository readWriteSchemaRepository = + new HelixReadWriteSchemaRepository(storeRepository, Optional.of(metaStoreWriter), schemaAccessor); + doReturn(true).when(storeRepository).hasStore(anyString()); + Store store = mock(Store.class); + doReturn(store).when(storeRepository).getStore(anyString()); + doReturn(store).when(storeRepository).getStoreOrThrow(anyString()); + doReturn(1).when(store).getLatestSuperSetValueSchemaId(); + SchemaEntry schemaEntry = mock(SchemaEntry.class); + doReturn(1).when(schemaEntry).getId(); + doReturn(schemaEntry).when(schemaAccessor).getValueSchema(anyString(), anyString()); + readWriteSchemaRepository.removeValueSchema(storeName, 1); + + verify(repository, times(1)).removeValueSchema(storeName, 2); + + readWriteSchemaRepository.removeValueSchema(storeName, 2); + + } + } diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/helix/HelixSchemaAccessorTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/helix/HelixSchemaAccessorTest.java new file mode 100644 index 00000000000..f6bda3ea824 --- /dev/null +++ b/internal/venice-common/src/test/java/com/linkedin/venice/helix/HelixSchemaAccessorTest.java @@ -0,0 +1,18 @@ +package com.linkedin.venice.helix; + +import static org.mockito.Mockito.mock; + +import org.apache.helix.zookeeper.impl.client.ZkClient; +import org.testng.annotations.Test; + + +public class HelixSchemaAccessorTest { + @Test + public void testRemoveValueSchema() { + String storeName = "abc"; + ZkClient zkClient = mock(ZkClient.class); + HelixAdapterSerializer serializer = mock(HelixAdapterSerializer.class); + HelixSchemaAccessor schemaAccessor = new HelixSchemaAccessor(zkClient, serializer, "clusterName"); + schemaAccessor.removeValueSchema(storeName, 2); + } +} diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/system/store/MetaStoreWriterTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/system/store/MetaStoreWriterTest.java index 1093d8b839a..609e73957ef 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/system/store/MetaStoreWriterTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/system/store/MetaStoreWriterTest.java @@ -1,21 +1,29 @@ package com.linkedin.venice.system.store; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doCallRealMethod; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.testng.Assert.assertTrue; +import static org.testng.AssertJUnit.assertTrue; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.helix.HelixReadOnlyZKSharedSchemaRepository; import com.linkedin.venice.kafka.TopicManager; +import com.linkedin.venice.meta.Instance; import com.linkedin.venice.pubsub.PubSubTopicRepository; +import com.linkedin.venice.pushmonitor.ExecutionStatus; +import com.linkedin.venice.schema.GeneratedSchemaID; +import com.linkedin.venice.schema.writecompute.WriteComputeSchemaConverter; +import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; import com.linkedin.venice.systemstore.schemas.StoreMetaKey; import com.linkedin.venice.systemstore.schemas.StoreMetaValue; import com.linkedin.venice.utils.VeniceResourceCloseResult; @@ -41,6 +49,14 @@ public class MetaStoreWriterTest { public void testMetaStoreWriterWillRestartUponProduceFailure() { MetaStoreWriter metaStoreWriter = mock(MetaStoreWriter.class); String metaStoreName = "testStore"; + HelixReadOnlyZKSharedSchemaRepository schemaRepo = mock(HelixReadOnlyZKSharedSchemaRepository.class); + GeneratedSchemaID generatedSchemaID = mock(GeneratedSchemaID.class); + doReturn(true).when(generatedSchemaID).isValid(); + doReturn(generatedSchemaID).when(schemaRepo).getDerivedSchemaId(any(), any()); + doReturn(schemaRepo).when(metaStoreWriter).getSchemaRepository(); + Schema derivedSchema = WriteComputeSchemaConverter.getInstance() + .convertFromValueRecordSchema( + AvroProtocolDefinition.METADATA_SYSTEM_SCHEMA_STORE.getCurrentProtocolVersionSchema()); ReentrantLock reentrantLock = new ReentrantLock(); when(metaStoreWriter.getOrCreateMetaStoreWriterLock(metaStoreName)).thenReturn(reentrantLock); VeniceWriter badWriter = mock(VeniceWriter.class); @@ -52,6 +68,11 @@ public void testMetaStoreWriterWillRestartUponProduceFailure() { metaStoreWriter.writeMessageWithRetry(metaStoreName, vw -> vw.delete("a", null)); verify(badWriter, times(1)).delete(any(), any()); verify(goodWriter, times(1)).delete(any(), any()); + + doReturn(derivedSchema).when(metaStoreWriter).getDerivedComputeSchema(); + when(metaStoreWriter.getOrCreateMetaStoreWriter(metaStoreName)).thenReturn(goodWriter); + doCallRealMethod().when(metaStoreWriter) + .writeStoreReplicaStatus(anyString(), anyString(), anyInt(), anyInt(), any(), eq(ExecutionStatus.COMPLETED)); } @Test @@ -87,6 +108,25 @@ public Object[][] testCloseDataProvider() { return new Object[][] { { 5000, 30 }, { 4000, 2 }, { 3000, 11 }, { 2000, 0 } }; } + @Test + public void testUpdateReplicaStatus() { + TopicManager topicManager = mock(TopicManager.class); + VeniceWriterFactory writerFactory = mock(VeniceWriterFactory.class); + HelixReadOnlyZKSharedSchemaRepository schemaRepo = mock(HelixReadOnlyZKSharedSchemaRepository.class); + PubSubTopicRepository pubSubTopicRepository = mock(PubSubTopicRepository.class); + GeneratedSchemaID generatedSchemaID = mock(GeneratedSchemaID.class); + doReturn(true).when(generatedSchemaID).isValid(); + doReturn(generatedSchemaID).when(schemaRepo).getDerivedSchemaId(any(), any()); + MetaStoreWriter metaStoreWriter = + new MetaStoreWriter(topicManager, writerFactory, schemaRepo, pubSubTopicRepository, 10, 100); + ArgumentCaptor keyArgumentCaptor = ArgumentCaptor.forClass(StoreMetaKey.class); + ArgumentCaptor valueArgumentCaptor = ArgumentCaptor.forClass(StoreMetaValue.class); + ArgumentCaptor schemaArgumentCaptor = ArgumentCaptor.forClass(Integer.class); + + Instance instance = new Instance("host1", "host1", 1234); + metaStoreWriter.writeStoreReplicaStatus("cluster", "storeName", 1, 0, instance, ExecutionStatus.COMPLETED); + } + @Test(dataProvider = "testCloseDataProvider") public void testClose(long closeTimeoutMs, int numOfConcurrentVwCloseOps) throws IOException, ExecutionException, InterruptedException {