Skip to content

Commit

Permalink
updated test
Browse files Browse the repository at this point in the history
  • Loading branch information
Sourav Maji committed Jan 31, 2024
1 parent d1993a4 commit 83a3136
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ public void refresh() {
public void clear() {
}

private void preCheckStoreCondition(String storeName) {
void preCheckStoreCondition(String storeName) {
if (!storeRepository.hasStore(storeName)) {
throw new VeniceNoStoreException(storeName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,16 @@ public DerivedSchemaEntry addDerivedSchema(
public DerivedSchemaEntry removeDerivedSchema(String storeName, int valueSchemaId, int derivedSchemaId) {
VeniceSystemStoreType systemStoreType = VeniceSystemStoreType.getSystemStoreType(storeName);
if (HelixReadOnlyStoreRepositoryAdapter.forwardToRegularRepository(systemStoreType)) {
return readWriteRegularStoreSchemaRepository.removeDerivedSchema(storeName, valueSchemaId, derivedSchemaId);
return getReadWriteSchemaRepository().removeDerivedSchema(storeName, valueSchemaId, derivedSchemaId);
}
throw new VeniceException(
errorMsgForUnsupportedOperationsAgainstSystemStore(storeName, systemStoreType, "removeDerivedSchema"));
}

ReadWriteSchemaRepository getReadWriteSchemaRepository() {
return readWriteRegularStoreSchemaRepository;
}

@Override
public int preCheckValueSchemaAndGetNextAvailableId(
String storeName,
Expand Down Expand Up @@ -193,6 +197,7 @@ public void removeValueSchema(String storeName, int schemaID) {
VeniceSystemStoreType systemStoreType = VeniceSystemStoreType.getSystemStoreType(storeName);
if (HelixReadOnlyStoreRepositoryAdapter.forwardToRegularRepository(systemStoreType)) {
readWriteRegularStoreSchemaRepository.removeValueSchema(storeName, schemaID);
return;
}
throw new VeniceException(
errorMsgForUnsupportedOperationsAgainstSystemStore(storeName, systemStoreType, "removeValueSchema"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.linkedin.venice.common.VeniceSystemStoreType;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.helix.HelixReadOnlySchemaRepository;
import com.linkedin.venice.helix.HelixReadOnlyZKSharedSchemaRepository;
import com.linkedin.venice.kafka.TopicManager;
import com.linkedin.venice.meta.Instance;
Expand Down Expand Up @@ -270,7 +271,7 @@ public void writeStoreReplicaStatus(
}
}, () -> {
// Construct an update
UpdateBuilder updateBuilder = new UpdateBuilderImpl(this.derivedComputeSchema);
UpdateBuilder updateBuilder = new UpdateBuilderImpl(getDerivedComputeSchema());
updateBuilder.setNewFieldValue("timestamp", System.currentTimeMillis());
Map<String, StoreReplicaStatus> instanceStatusMap = new HashMap<>();
StoreReplicaStatus replicaStatus = new StoreReplicaStatus();
Expand All @@ -281,6 +282,10 @@ public void writeStoreReplicaStatus(
});
}

Schema getDerivedComputeSchema() {
return this.derivedComputeSchema;
}

/**
* Write {@link com.linkedin.venice.meta.StoreConfig} equivalent to the meta system store. This is still only invoked
* by child controllers only.
Expand Down Expand Up @@ -370,7 +375,7 @@ private void update(
* Fetch the derived compute schema id on demand for integration test since the meta system store is being created
* during cluster initialization.
*/
GeneratedSchemaID derivedSchemaId = zkSharedSchemaRepository
GeneratedSchemaID derivedSchemaId = getSchemaRepository()
.getDerivedSchemaId(VeniceSystemStoreType.META_STORE.getZkSharedStoreName(), derivedComputeSchema.toString());
if (!derivedSchemaId.isValid()) {
throw new VeniceException(
Expand All @@ -390,6 +395,10 @@ private void update(
});
}

HelixReadOnlySchemaRepository getSchemaRepository() {
return zkSharedSchemaRepository;
}

void writeMessageWithRetry(String metaStoreName, Consumer<VeniceWriter> writerConsumer) {
ReentrantLock lock = getOrCreateMetaStoreWriterLock(metaStoreName);
int messageProduceRetryCount = 0;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,21 @@
package com.linkedin.venice.helix;

import static org.mockito.Mockito.*;
import static org.testng.AssertJUnit.*;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.AssertJUnit.fail;

import com.linkedin.venice.meta.ReadWriteSchemaRepository;
import com.linkedin.venice.meta.ReadWriteStoreRepository;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.StoreDataChangedListener;
import com.linkedin.venice.schema.SchemaEntry;
import com.linkedin.venice.system.store.MetaStoreWriter;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import org.testng.annotations.Test;

Expand All @@ -29,4 +39,34 @@ public void testStoreDeleteHandler() {
}
}

@Test
public void testRemoveValueSchema() {
String storeName = "abc";
HelixReadOnlyZKSharedSchemaRepository readOnlyZKSharedSchemaRepository =
mock(HelixReadOnlyZKSharedSchemaRepository.class);
ReadWriteSchemaRepository repository = mock(ReadWriteSchemaRepository.class);
HelixReadWriteSchemaRepositoryAdapter adapter =
new HelixReadWriteSchemaRepositoryAdapter(readOnlyZKSharedSchemaRepository, repository);
adapter.removeValueSchema(storeName, 2);
MetaStoreWriter metaStoreWriter = mock(MetaStoreWriter.class);
ReadWriteStoreRepository storeRepository = mock(ReadWriteStoreRepository.class);
HelixSchemaAccessor schemaAccessor = mock(HelixSchemaAccessor.class);
HelixReadWriteSchemaRepository readWriteSchemaRepository =
new HelixReadWriteSchemaRepository(storeRepository, Optional.of(metaStoreWriter), schemaAccessor);
doReturn(true).when(storeRepository).hasStore(anyString());
Store store = mock(Store.class);
doReturn(store).when(storeRepository).getStore(anyString());
doReturn(store).when(storeRepository).getStoreOrThrow(anyString());
doReturn(1).when(store).getLatestSuperSetValueSchemaId();
SchemaEntry schemaEntry = mock(SchemaEntry.class);
doReturn(1).when(schemaEntry).getId();
doReturn(schemaEntry).when(schemaAccessor).getValueSchema(anyString(), anyString());
readWriteSchemaRepository.removeValueSchema(storeName, 1);

verify(repository, times(1)).removeValueSchema(storeName, 2);

readWriteSchemaRepository.removeValueSchema(storeName, 2);

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.linkedin.venice.helix;

import static org.mockito.Mockito.mock;

import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.testng.annotations.Test;


public class HelixSchemaAccessorTest {
@Test
public void testRemoveValueSchema() {
String storeName = "abc";
ZkClient zkClient = mock(ZkClient.class);
HelixAdapterSerializer serializer = mock(HelixAdapterSerializer.class);
HelixSchemaAccessor schemaAccessor = new HelixSchemaAccessor(zkClient, serializer, "clusterName");
schemaAccessor.removeValueSchema(storeName, 2);
}
}
Original file line number Diff line number Diff line change
@@ -1,21 +1,29 @@
package com.linkedin.venice.system.store;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertTrue;
import static org.testng.AssertJUnit.assertTrue;

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.helix.HelixReadOnlyZKSharedSchemaRepository;
import com.linkedin.venice.kafka.TopicManager;
import com.linkedin.venice.meta.Instance;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
import com.linkedin.venice.schema.GeneratedSchemaID;
import com.linkedin.venice.schema.writecompute.WriteComputeSchemaConverter;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.systemstore.schemas.StoreMetaKey;
import com.linkedin.venice.systemstore.schemas.StoreMetaValue;
import com.linkedin.venice.utils.VeniceResourceCloseResult;
Expand All @@ -41,6 +49,14 @@ public class MetaStoreWriterTest {
public void testMetaStoreWriterWillRestartUponProduceFailure() {
MetaStoreWriter metaStoreWriter = mock(MetaStoreWriter.class);
String metaStoreName = "testStore";
HelixReadOnlyZKSharedSchemaRepository schemaRepo = mock(HelixReadOnlyZKSharedSchemaRepository.class);
GeneratedSchemaID generatedSchemaID = mock(GeneratedSchemaID.class);
doReturn(true).when(generatedSchemaID).isValid();
doReturn(generatedSchemaID).when(schemaRepo).getDerivedSchemaId(any(), any());
doReturn(schemaRepo).when(metaStoreWriter).getSchemaRepository();
Schema derivedSchema = WriteComputeSchemaConverter.getInstance()
.convertFromValueRecordSchema(
AvroProtocolDefinition.METADATA_SYSTEM_SCHEMA_STORE.getCurrentProtocolVersionSchema());
ReentrantLock reentrantLock = new ReentrantLock();
when(metaStoreWriter.getOrCreateMetaStoreWriterLock(metaStoreName)).thenReturn(reentrantLock);
VeniceWriter badWriter = mock(VeniceWriter.class);
Expand All @@ -52,6 +68,11 @@ public void testMetaStoreWriterWillRestartUponProduceFailure() {
metaStoreWriter.writeMessageWithRetry(metaStoreName, vw -> vw.delete("a", null));
verify(badWriter, times(1)).delete(any(), any());
verify(goodWriter, times(1)).delete(any(), any());

doReturn(derivedSchema).when(metaStoreWriter).getDerivedComputeSchema();
when(metaStoreWriter.getOrCreateMetaStoreWriter(metaStoreName)).thenReturn(goodWriter);
doCallRealMethod().when(metaStoreWriter)
.writeStoreReplicaStatus(anyString(), anyString(), anyInt(), anyInt(), any(), eq(ExecutionStatus.COMPLETED));
}

@Test
Expand Down Expand Up @@ -87,6 +108,25 @@ public Object[][] testCloseDataProvider() {
return new Object[][] { { 5000, 30 }, { 4000, 2 }, { 3000, 11 }, { 2000, 0 } };
}

@Test
public void testUpdateReplicaStatus() {
TopicManager topicManager = mock(TopicManager.class);
VeniceWriterFactory writerFactory = mock(VeniceWriterFactory.class);
HelixReadOnlyZKSharedSchemaRepository schemaRepo = mock(HelixReadOnlyZKSharedSchemaRepository.class);
PubSubTopicRepository pubSubTopicRepository = mock(PubSubTopicRepository.class);
GeneratedSchemaID generatedSchemaID = mock(GeneratedSchemaID.class);
doReturn(true).when(generatedSchemaID).isValid();
doReturn(generatedSchemaID).when(schemaRepo).getDerivedSchemaId(any(), any());
MetaStoreWriter metaStoreWriter =
new MetaStoreWriter(topicManager, writerFactory, schemaRepo, pubSubTopicRepository, 10, 100);
ArgumentCaptor<StoreMetaKey> keyArgumentCaptor = ArgumentCaptor.forClass(StoreMetaKey.class);
ArgumentCaptor<StoreMetaValue> valueArgumentCaptor = ArgumentCaptor.forClass(StoreMetaValue.class);
ArgumentCaptor<Integer> schemaArgumentCaptor = ArgumentCaptor.forClass(Integer.class);

Instance instance = new Instance("host1", "host1", 1234);
metaStoreWriter.writeStoreReplicaStatus("cluster", "storeName", 1, 0, instance, ExecutionStatus.COMPLETED);
}

@Test(dataProvider = "testCloseDataProvider")
public void testClose(long closeTimeoutMs, int numOfConcurrentVwCloseOps)
throws IOException, ExecutionException, InterruptedException {
Expand Down

0 comments on commit 83a3136

Please sign in to comment.