Skip to content

Commit

Permalink
[controller] Turn off A/A and inc push when turning off hybrid store …
Browse files Browse the repository at this point in the history
…config (linkedin#824)

This PR adjusts the existing behavior that throws exception when trying to turn off hybrid on an inc push enabled store. This was added 1.5 year ago as part of tactical fix to prevent store to lose RT right away especially for inc push store. Based on my code reading, we will not delete RT right away, if there is active hybrid version. If that is true, the change should be considered safe.

The new behavior is: When disabling hybrid store config:
If user also try to enable A/A or inc push together, we should fail loudly.
Otherwise, we will disable A/A and inc push store config.
  • Loading branch information
sixpluszero authored Jan 26, 2024
1 parent c1d289e commit c6319ba
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2313,7 +2313,20 @@ public void updateStore(String clusterName, String storeName, UpdateStoreQueryPa
// Check if the store is being converted to a hybrid store
boolean storeBeingConvertedToHybrid = !currStore.isHybrid() && updatedHybridStoreConfig != null
&& veniceHelixAdmin.isHybrid(updatedHybridStoreConfig);

// Check if the store is being converted to a batch store
boolean storeBeingConvertedToBatch = currStore.isHybrid() && !veniceHelixAdmin.isHybrid(updatedHybridStoreConfig);
if (storeBeingConvertedToBatch && activeActiveReplicationEnabled.orElse(false)) {
throw new VeniceHttpException(
HttpStatus.SC_BAD_REQUEST,
"Cannot convert store to batch-only and enable Active/Active together.",
ErrorType.BAD_REQUEST);
}
if (storeBeingConvertedToBatch && incrementalPushEnabled.orElse(false)) {
throw new VeniceHttpException(
HttpStatus.SC_BAD_REQUEST,
"Cannot convert store to batch-only and enable incremental push together.",
ErrorType.BAD_REQUEST);
}
// Update active-active replication config.
setStore.activeActiveReplicationEnabled = activeActiveReplicationEnabled
.map(addToUpdatedConfigList(updatedConfigsList, ACTIVE_ACTIVE_REPLICATION_ENABLED))
Expand All @@ -2325,6 +2338,12 @@ public void updateStore(String clusterName, String storeName, UpdateStoreQueryPa
setStore.activeActiveReplicationEnabled = true;
updatedConfigsList.add(ACTIVE_ACTIVE_REPLICATION_ENABLED);
}
// When turning off hybrid store, we will also turn off A/A store config.
if (storeBeingConvertedToBatch && setStore.activeActiveReplicationEnabled) {
setStore.activeActiveReplicationEnabled = false;
updatedConfigsList.add(ACTIVE_ACTIVE_REPLICATION_ENABLED);
}

// Update incremental push config.
setStore.incrementalPushEnabled =
incrementalPushEnabled.map(addToUpdatedConfigList(updatedConfigsList, INCREMENTAL_PUSH_ENABLED))
Expand All @@ -2337,18 +2356,12 @@ public void updateStore(String clusterName, String storeName, UpdateStoreQueryPa
setStore.incrementalPushEnabled = true;
updatedConfigsList.add(INCREMENTAL_PUSH_ENABLED);
}

// If store is already hybrid then check to make sure the end state is valid. We do this because we allow enabling
// incremental push without enabling hybrid already (we will automatically convert to hybrid store with default
// configs).
if (veniceHelixAdmin.isHybrid(currStore.getHybridStoreConfig())
&& !veniceHelixAdmin.isHybrid(updatedHybridStoreConfig) && setStore.incrementalPushEnabled) {
throw new VeniceHttpException(
HttpStatus.SC_BAD_REQUEST,
"Cannot convert store to batch-only, incremental push enabled stores require valid hybrid configs. "
+ "Please disable incremental push if you'd like to convert the store to batch-only",
ErrorType.BAD_REQUEST);
// When turning off hybrid store, we will also turn off incremental store config.
if (storeBeingConvertedToBatch && setStore.incrementalPushEnabled) {
setStore.incrementalPushEnabled = false;
updatedConfigsList.add(INCREMENTAL_PUSH_ENABLED);
}

if (updatedHybridStoreConfig == null) {
setStore.hybridStoreConfig = null;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.linkedin.venice.authorization.AuthorizerService;
import com.linkedin.venice.authorization.DefaultIdentityParser;
Expand All @@ -22,6 +23,7 @@
import com.linkedin.venice.helix.ZkRoutersClusterManager;
import com.linkedin.venice.helix.ZkStoreConfigAccessor;
import com.linkedin.venice.kafka.TopicManager;
import com.linkedin.venice.meta.HybridStoreConfig;
import com.linkedin.venice.meta.OfflinePushStrategy;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
Expand Down Expand Up @@ -84,6 +86,7 @@ public void setupInternalMocks() {
doReturn(true).when(topicManager).containsTopicAndAllPartitionsAreOnline(pubSubTopicRepository.getTopic(topicName));

internalAdmin = mock(VeniceHelixAdmin.class);
when(internalAdmin.isHybrid((HybridStoreConfig) any())).thenCallRealMethod();
doReturn(topicManager).when(internalAdmin).getTopicManager();
SchemaEntry mockEntry = new SchemaEntry(0, TEST_SCHEMA);
doReturn(mockEntry).when(internalAdmin).getKeySchema(anyString(), anyString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1846,6 +1846,68 @@ public void testUpdateStoreNativeReplicationSourceFabric() {
"Native replication source fabric does not match after updating the store!");
}

@Test
public void testDisableHybridConfigWhenActiveActiveOrIncPushConfigIsEnabled() {
String storeName = Utils.getUniqueString("testUpdateStore");
Store store = TestUtils.createTestStore(storeName, "test", System.currentTimeMillis());

store.setHybridStoreConfig(
new HybridStoreConfigImpl(
1000,
100,
-1,
DataReplicationPolicy.NON_AGGREGATE,
BufferReplayPolicy.REWIND_FROM_EOP));
store.setActiveActiveReplicationEnabled(true);
store.setIncrementalPushEnabled(true);
store.setNativeReplicationEnabled(true);
store.setChunkingEnabled(true);
doReturn(store).when(internalAdmin).getStore(clusterName, storeName);

doReturn(CompletableFuture.completedFuture(new SimplePubSubProduceResultImpl(topicName, partitionId, 1, -1)))
.when(veniceWriter)
.put(any(), any(), anyInt());

when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null)
.thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1));

parentAdmin.initStorageCluster(clusterName);
// When user disable hybrid but also try to manually turn on A/A or Incremental Push, update operation should fail
// loudly.
Assert.assertThrows(
() -> parentAdmin.updateStore(
clusterName,
storeName,
new UpdateStoreQueryParams().setHybridRewindSeconds(-1)
.setHybridOffsetLagThreshold(-1)
.setActiveActiveReplicationEnabled(true)));
Assert.assertThrows(
() -> parentAdmin.updateStore(
clusterName,
storeName,
new UpdateStoreQueryParams().setHybridRewindSeconds(-1)
.setHybridOffsetLagThreshold(-1)
.setIncrementalPushEnabled(true)));

parentAdmin.updateStore(
clusterName,
storeName,
new UpdateStoreQueryParams().setHybridOffsetLagThreshold(-1).setHybridRewindSeconds(-1));

ArgumentCaptor<byte[]> keyCaptor = ArgumentCaptor.forClass(byte[].class);
ArgumentCaptor<byte[]> valueCaptor = ArgumentCaptor.forClass(byte[].class);
ArgumentCaptor<Integer> schemaCaptor = ArgumentCaptor.forClass(Integer.class);

verify(veniceWriter, times(1)).put(keyCaptor.capture(), valueCaptor.capture(), schemaCaptor.capture());
byte[] valueBytes = valueCaptor.getValue();
int schemaId = schemaCaptor.getValue();
AdminOperation adminMessage = adminOperationSerializer.deserialize(ByteBuffer.wrap(valueBytes), schemaId);
UpdateStore updateStore = (UpdateStore) adminMessage.payloadUnion;
Assert.assertFalse(internalAdmin.isHybrid(updateStore.getHybridStoreConfig()));
Assert.assertFalse(updateStore.incrementalPushEnabled);
Assert.assertFalse(updateStore.activeActiveReplicationEnabled);
}

@Test
public void testSetStoreViewConfig() {
String storeName = Utils.getUniqueString("testUpdateStore");
Expand Down

0 comments on commit c6319ba

Please sign in to comment.