Skip to content

Commit

Permalink
[controller] Create meta store RT topic before writing meta store up… (
Browse files Browse the repository at this point in the history
…linkedin#666)

* [controller] Creates meta store RT topic before writing meta store updates

In function setUpMetaStoreAndMayProduceSnapshot, one of the steps is to update the store flag to enable meta system store and the store update operation needs to write to the meta store's RT topic. However, the RT topic is created later in the code, thus causing errors in writing the update.
To fix the issue, this change moves the RT topic creation before writing meta store updates.
  • Loading branch information
lluwm authored Sep 26, 2023
1 parent 6440600 commit 867a4d1
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7408,14 +7408,18 @@ private void setUpDaVinciPushStatusStore(String clusterName, String storeName) {
}
}

private void setUpMetaStoreAndMayProduceSnapshot(String clusterName, String regularStoreName) {
void setUpMetaStoreAndMayProduceSnapshot(String clusterName, String regularStoreName) {
checkControllerLeadershipFor(clusterName);
ReadWriteStoreRepository repository = getHelixVeniceClusterResources(clusterName).getStoreMetadataRepository();
Store store = repository.getStore(regularStoreName);
if (store == null) {
throwStoreDoesNotExist(clusterName, regularStoreName);
}

// Make sure RT topic exists before producing. There's no write to parent region meta store RT, but we still create
// the RT topic to be consistent in case it was not auto-materialized
getRealTimeTopic(clusterName, VeniceSystemStoreType.META_STORE.getSystemStoreName(regularStoreName));

// Update the store flag to enable meta system store.
if (!store.isStoreMetaSystemStoreEnabled()) {
storeMetadataUpdate(clusterName, regularStoreName, (s) -> {
Expand All @@ -7424,10 +7428,6 @@ private void setUpMetaStoreAndMayProduceSnapshot(String clusterName, String regu
});
}

// Make sure RT topic exists before producing. There's no write to parent region meta store RT, but we still create
// the RT topic to be consistent in case it was not auto-materialized
getRealTimeTopic(clusterName, VeniceSystemStoreType.META_STORE.getSystemStoreName(regularStoreName));

Optional<MetaStoreWriter> metaStoreWriter = getHelixVeniceClusterResources(clusterName).getMetaStoreWriter();
if (!metaStoreWriter.isPresent()) {
LOGGER.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@

import com.linkedin.venice.helix.HelixExternalViewRepository;
import com.linkedin.venice.meta.PartitionAssignment;
import com.linkedin.venice.meta.ReadWriteStoreRepository;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.utils.HelixUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.mockito.InOrder;
import org.testng.annotations.Test;


Expand Down Expand Up @@ -47,4 +50,39 @@ public void testDropResources() {
verify(adminClient, times(1)).enablePartition(true, clusterName, "node_1", kafkaTopic, partitions);

}

/**
* This test verify that in function {@link VeniceHelixAdmin#setUpMetaStoreAndMayProduceSnapshot},
* meta store RT topic creation has to happen before any writings to meta store's rt topic.
* As of today, topic creation and checks to make sure that RT exists are handled in function
* {@link VeniceHelixAdmin#getRealTimeTopic}. On the other hand, as {@link VeniceHelixAdmin#storeMetadataUpdate}
* writes to the same RT topic, it should happen after the above function. The following test enforces
* such order at the statement level.
*
* Notice that if function semantics change over time, as long as the above invariant can be obtained,
* it is okay to relax on the ordering enforcement or delete the unit test if necessary.
*/
@Test
public void enforceRealTimeTopicCreationBeforeWriting() {
VeniceHelixAdmin veniceHelixAdmin = mock(VeniceHelixAdmin.class);
doReturn("test_rt").when(veniceHelixAdmin).getRealTimeTopic(anyString(), anyString());
doCallRealMethod().when(veniceHelixAdmin).setUpMetaStoreAndMayProduceSnapshot(anyString(), anyString());

InOrder inorder = inOrder(veniceHelixAdmin);

HelixVeniceClusterResources veniceClusterResources = mock(HelixVeniceClusterResources.class);
ReadWriteStoreRepository repo = mock(ReadWriteStoreRepository.class);
Store store = mock(Store.class);

doReturn(veniceClusterResources).when(veniceHelixAdmin).getHelixVeniceClusterResources(anyString());
doReturn(repo).when(veniceClusterResources).getStoreMetadataRepository();
doReturn(store).when(repo).getStore(anyString());
doReturn(Boolean.FALSE).when(store).isDaVinciPushStatusStoreEnabled();

veniceHelixAdmin.setUpMetaStoreAndMayProduceSnapshot(anyString(), anyString());

// Enforce that getRealTimeTopic happens before storeMetadataUpdate. See the above comments for the reasons.
inorder.verify(veniceHelixAdmin).getRealTimeTopic(anyString(), anyString());
inorder.verify(veniceHelixAdmin).storeMetadataUpdate(anyString(), anyString(), any());
}
}

0 comments on commit 867a4d1

Please sign in to comment.