diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java index c3c7c21bc7..68b57f9715 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java @@ -7408,7 +7408,7 @@ private void setUpDaVinciPushStatusStore(String clusterName, String storeName) { } } - private void setUpMetaStoreAndMayProduceSnapshot(String clusterName, String regularStoreName) { + void setUpMetaStoreAndMayProduceSnapshot(String clusterName, String regularStoreName) { checkControllerLeadershipFor(clusterName); ReadWriteStoreRepository repository = getHelixVeniceClusterResources(clusterName).getStoreMetadataRepository(); Store store = repository.getStore(regularStoreName); @@ -7416,6 +7416,10 @@ private void setUpMetaStoreAndMayProduceSnapshot(String clusterName, String regu throwStoreDoesNotExist(clusterName, regularStoreName); } + // Make sure RT topic exists before producing. There's no write to parent region meta store RT, but we still create + // the RT topic to be consistent in case it was not auto-materialized + getRealTimeTopic(clusterName, VeniceSystemStoreType.META_STORE.getSystemStoreName(regularStoreName)); + // Update the store flag to enable meta system store. if (!store.isStoreMetaSystemStoreEnabled()) { storeMetadataUpdate(clusterName, regularStoreName, (s) -> { @@ -7424,10 +7428,6 @@ private void setUpMetaStoreAndMayProduceSnapshot(String clusterName, String regu }); } - // Make sure RT topic exists before producing. There's no write to parent region meta store RT, but we still create - // the RT topic to be consistent in case it was not auto-materialized - getRealTimeTopic(clusterName, VeniceSystemStoreType.META_STORE.getSystemStoreName(regularStoreName)); - Optional metaStoreWriter = getHelixVeniceClusterResources(clusterName).getMetaStoreWriter(); if (!metaStoreWriter.isPresent()) { LOGGER.info( diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdmin.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdmin.java index 609e0504b5..4541920c40 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdmin.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdmin.java @@ -5,12 +5,15 @@ import com.linkedin.venice.helix.HelixExternalViewRepository; import com.linkedin.venice.meta.PartitionAssignment; +import com.linkedin.venice.meta.ReadWriteStoreRepository; +import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.Version; import com.linkedin.venice.utils.HelixUtils; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.mockito.InOrder; import org.testng.annotations.Test; @@ -47,4 +50,39 @@ public void testDropResources() { verify(adminClient, times(1)).enablePartition(true, clusterName, "node_1", kafkaTopic, partitions); } + + /** + * This test verify that in function {@link VeniceHelixAdmin#setUpMetaStoreAndMayProduceSnapshot}, + * meta store RT topic creation has to happen before any writings to meta store's rt topic. + * As of today, topic creation and checks to make sure that RT exists are handled in function + * {@link VeniceHelixAdmin#getRealTimeTopic}. On the other hand, as {@link VeniceHelixAdmin#storeMetadataUpdate} + * writes to the same RT topic, it should happen after the above function. The following test enforces + * such order at the statement level. + * + * Notice that if function semantics change over time, as long as the above invariant can be obtained, + * it is okay to relax on the ordering enforcement or delete the unit test if necessary. + */ + @Test + public void enforceRealTimeTopicCreationBeforeWriting() { + VeniceHelixAdmin veniceHelixAdmin = mock(VeniceHelixAdmin.class); + doReturn("test_rt").when(veniceHelixAdmin).getRealTimeTopic(anyString(), anyString()); + doCallRealMethod().when(veniceHelixAdmin).setUpMetaStoreAndMayProduceSnapshot(anyString(), anyString()); + + InOrder inorder = inOrder(veniceHelixAdmin); + + HelixVeniceClusterResources veniceClusterResources = mock(HelixVeniceClusterResources.class); + ReadWriteStoreRepository repo = mock(ReadWriteStoreRepository.class); + Store store = mock(Store.class); + + doReturn(veniceClusterResources).when(veniceHelixAdmin).getHelixVeniceClusterResources(anyString()); + doReturn(repo).when(veniceClusterResources).getStoreMetadataRepository(); + doReturn(store).when(repo).getStore(anyString()); + doReturn(Boolean.FALSE).when(store).isDaVinciPushStatusStoreEnabled(); + + veniceHelixAdmin.setUpMetaStoreAndMayProduceSnapshot(anyString(), anyString()); + + // Enforce that getRealTimeTopic happens before storeMetadataUpdate. See the above comments for the reasons. + inorder.verify(veniceHelixAdmin).getRealTimeTopic(anyString(), anyString()); + inorder.verify(veniceHelixAdmin).storeMetadataUpdate(anyString(), anyString(), any()); + } }