From 867a4d1158aaf24a1b321590c86f40fab4a8458d Mon Sep 17 00:00:00 2001 From: Lei Lu Date: Tue, 26 Sep 2023 15:49:10 -0700 Subject: [PATCH] =?UTF-8?q?[controller]=20Create=20meta=20store=20RT=20top?= =?UTF-8?q?ic=20before=20writing=20meta=20store=20up=E2=80=A6=20(#666)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * [controller] Creates meta store RT topic before writing meta store updates In function setUpMetaStoreAndMayProduceSnapshot, one of the steps is to update the store flag to enable meta system store and the store update operation needs to write to the meta store's RT topic. However, the RT topic is created later in the code, thus causing errors in writing the update. To fix the issue, this change moves the RT topic creation before writing meta store updates. --- .../venice/controller/VeniceHelixAdmin.java | 10 ++--- .../controller/TestVeniceHelixAdmin.java | 38 +++++++++++++++++++ 2 files changed, 43 insertions(+), 5 deletions(-) diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java index c3c7c21bc7..68b57f9715 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java @@ -7408,7 +7408,7 @@ private void setUpDaVinciPushStatusStore(String clusterName, String storeName) { } } - private void setUpMetaStoreAndMayProduceSnapshot(String clusterName, String regularStoreName) { + void setUpMetaStoreAndMayProduceSnapshot(String clusterName, String regularStoreName) { checkControllerLeadershipFor(clusterName); ReadWriteStoreRepository repository = getHelixVeniceClusterResources(clusterName).getStoreMetadataRepository(); Store store = repository.getStore(regularStoreName); @@ -7416,6 +7416,10 @@ private void setUpMetaStoreAndMayProduceSnapshot(String clusterName, String regu throwStoreDoesNotExist(clusterName, regularStoreName); } + // Make sure RT topic exists before producing. There's no write to parent region meta store RT, but we still create + // the RT topic to be consistent in case it was not auto-materialized + getRealTimeTopic(clusterName, VeniceSystemStoreType.META_STORE.getSystemStoreName(regularStoreName)); + // Update the store flag to enable meta system store. if (!store.isStoreMetaSystemStoreEnabled()) { storeMetadataUpdate(clusterName, regularStoreName, (s) -> { @@ -7424,10 +7428,6 @@ private void setUpMetaStoreAndMayProduceSnapshot(String clusterName, String regu }); } - // Make sure RT topic exists before producing. There's no write to parent region meta store RT, but we still create - // the RT topic to be consistent in case it was not auto-materialized - getRealTimeTopic(clusterName, VeniceSystemStoreType.META_STORE.getSystemStoreName(regularStoreName)); - Optional metaStoreWriter = getHelixVeniceClusterResources(clusterName).getMetaStoreWriter(); if (!metaStoreWriter.isPresent()) { LOGGER.info( diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdmin.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdmin.java index 609e0504b5..4541920c40 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdmin.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdmin.java @@ -5,12 +5,15 @@ import com.linkedin.venice.helix.HelixExternalViewRepository; import com.linkedin.venice.meta.PartitionAssignment; +import com.linkedin.venice.meta.ReadWriteStoreRepository; +import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.Version; import com.linkedin.venice.utils.HelixUtils; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.mockito.InOrder; import org.testng.annotations.Test; @@ -47,4 +50,39 @@ public void testDropResources() { verify(adminClient, times(1)).enablePartition(true, clusterName, "node_1", kafkaTopic, partitions); } + + /** + * This test verify that in function {@link VeniceHelixAdmin#setUpMetaStoreAndMayProduceSnapshot}, + * meta store RT topic creation has to happen before any writings to meta store's rt topic. + * As of today, topic creation and checks to make sure that RT exists are handled in function + * {@link VeniceHelixAdmin#getRealTimeTopic}. On the other hand, as {@link VeniceHelixAdmin#storeMetadataUpdate} + * writes to the same RT topic, it should happen after the above function. The following test enforces + * such order at the statement level. + * + * Notice that if function semantics change over time, as long as the above invariant can be obtained, + * it is okay to relax on the ordering enforcement or delete the unit test if necessary. + */ + @Test + public void enforceRealTimeTopicCreationBeforeWriting() { + VeniceHelixAdmin veniceHelixAdmin = mock(VeniceHelixAdmin.class); + doReturn("test_rt").when(veniceHelixAdmin).getRealTimeTopic(anyString(), anyString()); + doCallRealMethod().when(veniceHelixAdmin).setUpMetaStoreAndMayProduceSnapshot(anyString(), anyString()); + + InOrder inorder = inOrder(veniceHelixAdmin); + + HelixVeniceClusterResources veniceClusterResources = mock(HelixVeniceClusterResources.class); + ReadWriteStoreRepository repo = mock(ReadWriteStoreRepository.class); + Store store = mock(Store.class); + + doReturn(veniceClusterResources).when(veniceHelixAdmin).getHelixVeniceClusterResources(anyString()); + doReturn(repo).when(veniceClusterResources).getStoreMetadataRepository(); + doReturn(store).when(repo).getStore(anyString()); + doReturn(Boolean.FALSE).when(store).isDaVinciPushStatusStoreEnabled(); + + veniceHelixAdmin.setUpMetaStoreAndMayProduceSnapshot(anyString(), anyString()); + + // Enforce that getRealTimeTopic happens before storeMetadataUpdate. See the above comments for the reasons. + inorder.verify(veniceHelixAdmin).getRealTimeTopic(anyString(), anyString()); + inorder.verify(veniceHelixAdmin).storeMetadataUpdate(anyString(), anyString(), any()); + } }