From 8f3c92c0a31358e7819c0e2c507c131a93873fe9 Mon Sep 17 00:00:00 2001 From: Zac Policzer Date: Thu, 1 Feb 2024 12:22:27 -0800 Subject: [PATCH 1/4] [producer][samza] Reinitalize transport client if a store has moved (resulting in store not found errors) This is to allow store moves to not break system producers. --- .../RouterBasedHybridStoreQuotaMonitor.java | 26 ++++++++++++++++--- .../venice/samza/VeniceSystemProducer.java | 20 +++++++++++--- 2 files changed, 38 insertions(+), 8 deletions(-) diff --git a/clients/venice-samza/src/main/java/com/linkedin/venice/pushmonitor/RouterBasedHybridStoreQuotaMonitor.java b/clients/venice-samza/src/main/java/com/linkedin/venice/pushmonitor/RouterBasedHybridStoreQuotaMonitor.java index 1d0d7f0ec0..4f8fe2b67e 100644 --- a/clients/venice-samza/src/main/java/com/linkedin/venice/pushmonitor/RouterBasedHybridStoreQuotaMonitor.java +++ b/clients/venice-samza/src/main/java/com/linkedin/venice/pushmonitor/RouterBasedHybridStoreQuotaMonitor.java @@ -6,6 +6,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.linkedin.venice.client.store.transport.TransportClient; import com.linkedin.venice.client.store.transport.TransportClientResponse; +import com.linkedin.venice.exceptions.ErrorType; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.meta.Version; import com.linkedin.venice.routerapi.HybridStoreQuotaStatusResponse; @@ -41,7 +42,8 @@ public RouterBasedHybridStoreQuotaMonitor( TransportClient transportClient, String storeName, Version.PushType pushType, - String topicName) { + String topicName, + TransportClientReinitProvider reinitProvider) { final String requestPath; if (Version.PushType.STREAM.equals(pushType)) { requestPath = buildStreamHybridStoreQuotaRequestPath(storeName); @@ -54,7 +56,7 @@ public RouterBasedHybridStoreQuotaMonitor( + " can monitor hybrid store quota."); } executor = Executors.newSingleThreadExecutor(new DaemonThreadFactory("RouterBasedHybridQuotaMonitor")); - hybridQuotaMonitorTask = new HybridQuotaMonitorTask(transportClient, storeName, requestPath, this); + hybridQuotaMonitorTask = new HybridQuotaMonitorTask(transportClient, storeName, requestPath, this, reinitProvider); } public void start() { @@ -87,7 +89,9 @@ private static class HybridQuotaMonitorTask implements Runnable, Closeable { private final AtomicBoolean isRunning; private final String storeName; - private final TransportClient transportClient; + private TransportClient transportClient; + + private TransportClientReinitProvider reinitProvider; private final String requestPath; private final RouterBasedHybridStoreQuotaMonitor hybridStoreQuotaMonitorService; @@ -95,12 +99,14 @@ public HybridQuotaMonitorTask( TransportClient transportClient, String storeName, String requestPath, - RouterBasedHybridStoreQuotaMonitor hybridStoreQuotaMonitorService) { + RouterBasedHybridStoreQuotaMonitor hybridStoreQuotaMonitorService, + TransportClientReinitProvider reinitProvider) { this.transportClient = transportClient; this.storeName = storeName; this.requestPath = requestPath; this.hybridStoreQuotaMonitorService = hybridStoreQuotaMonitorService; this.isRunning = new AtomicBoolean(true); + this.reinitProvider = reinitProvider; } @Override @@ -114,6 +120,13 @@ public void run() { HybridStoreQuotaStatusResponse quotaStatusResponse = mapper.readValue(response.getBody(), HybridStoreQuotaStatusResponse.class); if (quotaStatusResponse.isError()) { + if (quotaStatusResponse.getErrorType().equals(ErrorType.STORE_NOT_FOUND)) { + LOGGER.warn("Store not found, reinitializing client! Error: {}", quotaStatusResponse.getError()); + // TODO: It'd make sense to call shutdown on the transport client, but it's a shared object so that's + // a bit dangerous. + transportClient = reinitProvider.apply(); + continue; + } LOGGER.error("Router was not able to get hybrid quota status: {}", quotaStatusResponse.getError()); continue; } @@ -144,4 +157,9 @@ public void close() { isRunning.set(false); } } + + @FunctionalInterface + public interface TransportClientReinitProvider { + TransportClient apply(); + } } diff --git a/clients/venice-samza/src/main/java/com/linkedin/venice/samza/VeniceSystemProducer.java b/clients/venice-samza/src/main/java/com/linkedin/venice/samza/VeniceSystemProducer.java index 7dc177f35b..64fe020659 100644 --- a/clients/venice-samza/src/main/java/com/linkedin/venice/samza/VeniceSystemProducer.java +++ b/clients/venice-samza/src/main/java/com/linkedin/venice/samza/VeniceSystemProducer.java @@ -422,6 +422,7 @@ public synchronized void start() { this.isStarted = true; final TransportClient transportClient; + RouterBasedHybridStoreQuotaMonitor.TransportClientReinitProvider reinitProvider; if (discoveryUrl.isPresent()) { this.controllerClient = ControllerClientFactory.discoverAndConstructControllerClient(storeName, discoveryUrl.get(), sslFactory, 1); @@ -448,10 +449,11 @@ public synchronized void start() { } if (sslFactory.isPresent()) { - transportClient = new HttpsTransportClient(discoveryUrl.get(), 0, 0, false, sslFactory.get()); + reinitProvider = () -> new HttpsTransportClient(discoveryUrl.get(), 0, 0, false, sslFactory.get()); } else { - transportClient = new HttpTransportClient(discoveryUrl.get(), 0, 0); + reinitProvider = () -> new HttpTransportClient(discoveryUrl.get(), 0, 0); } + transportClient = reinitProvider.apply(); } else { this.primaryControllerColoD2Client = getStartedD2Client(primaryControllerColoD2ZKHost); this.childColoD2Client = getStartedD2Client(veniceChildD2ZkHost); @@ -461,6 +463,7 @@ public synchronized void start() { () -> D2ControllerClient .discoverCluster(primaryControllerColoD2Client, primaryControllerD2ServiceName, this.storeName), 10); + String clusterName = discoveryResponse.getCluster(); LOGGER.info("Found cluster: {} for store: {}", clusterName, storeName); @@ -496,6 +499,15 @@ public synchronized void start() { primaryControllerColoD2Client, sslFactory); transportClient = new D2TransportClient(discoveryResponse.getD2Service(), childColoD2Client); + + reinitProvider = () -> { + D2ServiceDiscoveryResponse d2DiscoveryResponse = (D2ServiceDiscoveryResponse) controllerRequestWithRetry( + () -> D2ControllerClient + .discoverCluster(primaryControllerColoD2Client, primaryControllerD2ServiceName, this.storeName), + 10); + LOGGER.info("Found cluster: {} for store: {}", clusterName, storeName); + return new D2TransportClient(d2DiscoveryResponse.getD2Service(), childColoD2Client); + }; } // Request all the necessary info from Venice Controller @@ -568,8 +580,8 @@ public synchronized void start() { if ((pushType.equals(Version.PushType.STREAM) || pushType.equals(Version.PushType.STREAM_REPROCESSING)) && hybridStoreDiskQuotaEnabled) { - hybridStoreQuotaMonitor = - Optional.of(new RouterBasedHybridStoreQuotaMonitor(transportClient, storeName, pushType, topicName)); + hybridStoreQuotaMonitor = Optional + .of(new RouterBasedHybridStoreQuotaMonitor(transportClient, storeName, pushType, topicName, reinitProvider)); hybridStoreQuotaMonitor.get().start(); } } From 3c03f23e1a403f393360ea03b1890d1f627b25a1 Mon Sep 17 00:00:00 2001 From: Zac Policzer Date: Mon, 5 Feb 2024 13:55:07 -0800 Subject: [PATCH 2/4] add tests to pass coverage --- .../RouterBasedHybridStoreQuotaMonitor.java | 69 +++++++++++-------- ...outerBasedHybridStoreQuotaMonitorTest.java | 56 +++++++++++++++ 2 files changed, 97 insertions(+), 28 deletions(-) create mode 100644 clients/venice-samza/src/test/java/com/linkedin/venice/pushmonitor/RouterBasedHybridStoreQuotaMonitorTest.java diff --git a/clients/venice-samza/src/main/java/com/linkedin/venice/pushmonitor/RouterBasedHybridStoreQuotaMonitor.java b/clients/venice-samza/src/main/java/com/linkedin/venice/pushmonitor/RouterBasedHybridStoreQuotaMonitor.java index 4f8fe2b67e..d048c31e8f 100644 --- a/clients/venice-samza/src/main/java/com/linkedin/venice/pushmonitor/RouterBasedHybridStoreQuotaMonitor.java +++ b/clients/venice-samza/src/main/java/com/linkedin/venice/pushmonitor/RouterBasedHybridStoreQuotaMonitor.java @@ -15,10 +15,13 @@ import com.linkedin.venice.utils.ObjectMapperFactory; import com.linkedin.venice.utils.Utils; import java.io.Closeable; +import java.io.IOException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -68,6 +71,10 @@ public void close() { hybridQuotaMonitorTask.close(); } + protected HybridQuotaMonitorTask getHybridQuotaMonitorTask() { + return hybridQuotaMonitorTask; + } + public void setCurrentStatus(HybridStoreQuotaStatus currentStatus) { this.currentStatus = currentStatus; } @@ -84,8 +91,8 @@ private static String buildStreamReprocessingHybridStoreQuotaRequestPath(String return TYPE_STREAM_REPROCESSING_HYBRID_STORE_QUOTA + "/" + versionTopic; } - private static class HybridQuotaMonitorTask implements Runnable, Closeable { - private static ObjectMapper mapper = ObjectMapperFactory.getInstance(); + protected static class HybridQuotaMonitorTask implements Runnable, Closeable { + private ObjectMapper mapper = ObjectMapperFactory.getInstance(); private final AtomicBoolean isRunning; private final String storeName; @@ -109,37 +116,43 @@ public HybridQuotaMonitorTask( this.reinitProvider = reinitProvider; } + protected void setMapper(ObjectMapper mapper) { + this.mapper = mapper; + } + + protected void checkStatus() throws ExecutionException, InterruptedException, TimeoutException, IOException { + // Get hybrid store quota status + CompletableFuture responseFuture = transportClient.get(requestPath); + TransportClientResponse response = responseFuture.get(POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS); + HybridStoreQuotaStatusResponse quotaStatusResponse = + mapper.readValue(response.getBody(), HybridStoreQuotaStatusResponse.class); + if (quotaStatusResponse.isError()) { + if (quotaStatusResponse.getErrorType().equals(ErrorType.STORE_NOT_FOUND)) { + LOGGER.warn("Store not found, reinitializing client! Error: {}", quotaStatusResponse.getError()); + // TODO: It'd make sense to call shutdown on the transport client, but it's a shared object so that's + // a bit dangerous. + transportClient = reinitProvider.apply(); + return; + } + LOGGER.error("Router was not able to get hybrid quota status: {}", quotaStatusResponse.getError()); + return; + } + hybridStoreQuotaMonitorService.setCurrentStatus(quotaStatusResponse.getQuotaStatus()); + switch (quotaStatusResponse.getQuotaStatus()) { + case QUOTA_VIOLATED: + LOGGER.info("Hybrid job failed with quota violation for store: {}", storeName); + break; + default: + LOGGER.info("Current hybrid job state: {} for store: {}", quotaStatusResponse.getQuotaStatus(), storeName); + } + } + @Override public void run() { LOGGER.info("Running {}", this.getClass().getSimpleName()); while (isRunning.get()) { try { - // Get hybrid store quota status - CompletableFuture responseFuture = transportClient.get(requestPath); - TransportClientResponse response = responseFuture.get(POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS); - HybridStoreQuotaStatusResponse quotaStatusResponse = - mapper.readValue(response.getBody(), HybridStoreQuotaStatusResponse.class); - if (quotaStatusResponse.isError()) { - if (quotaStatusResponse.getErrorType().equals(ErrorType.STORE_NOT_FOUND)) { - LOGGER.warn("Store not found, reinitializing client! Error: {}", quotaStatusResponse.getError()); - // TODO: It'd make sense to call shutdown on the transport client, but it's a shared object so that's - // a bit dangerous. - transportClient = reinitProvider.apply(); - continue; - } - LOGGER.error("Router was not able to get hybrid quota status: {}", quotaStatusResponse.getError()); - continue; - } - hybridStoreQuotaMonitorService.setCurrentStatus(quotaStatusResponse.getQuotaStatus()); - switch (quotaStatusResponse.getQuotaStatus()) { - case QUOTA_VIOLATED: - LOGGER.info("Hybrid job failed with quota violation for store: {}", storeName); - break; - default: - LOGGER - .info("Current hybrid job state: {} for store: {}", quotaStatusResponse.getQuotaStatus(), storeName); - } - + checkStatus(); Utils.sleep(POLL_CYCLE_DELAY_MS); } catch (Exception e) { if (isRunning.get() && !ExceptionUtils.recursiveClassEquals(e, InterruptedException.class)) { diff --git a/clients/venice-samza/src/test/java/com/linkedin/venice/pushmonitor/RouterBasedHybridStoreQuotaMonitorTest.java b/clients/venice-samza/src/test/java/com/linkedin/venice/pushmonitor/RouterBasedHybridStoreQuotaMonitorTest.java new file mode 100644 index 0000000000..0549c15f79 --- /dev/null +++ b/clients/venice-samza/src/test/java/com/linkedin/venice/pushmonitor/RouterBasedHybridStoreQuotaMonitorTest.java @@ -0,0 +1,56 @@ +package com.linkedin.venice.pushmonitor; + +import static org.testng.Assert.*; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.linkedin.venice.client.store.transport.TransportClient; +import com.linkedin.venice.client.store.transport.TransportClientResponse; +import com.linkedin.venice.exceptions.ErrorType; +import com.linkedin.venice.meta.Version; +import com.linkedin.venice.routerapi.HybridStoreQuotaStatusResponse; +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class RouterBasedHybridStoreQuotaMonitorTest { + private static final String STORE_NAME = "fake_Store"; + private static final String TOPIC_NAME = "fake_Store_v1"; + + @Test + public void testGetCurrentStatus() throws IOException, ExecutionException, InterruptedException, TimeoutException { + TransportClient mockTransportclient = Mockito.mock(TransportClient.class); + TransportClientResponse mockResponse = Mockito.mock(TransportClientResponse.class); + ObjectMapper mockMapper = Mockito.mock(ObjectMapper.class); + HybridStoreQuotaStatusResponse mockQuotaStatusResponse = Mockito.mock(HybridStoreQuotaStatusResponse.class); + Mockito.when(mockResponse.getBody()).thenReturn(STORE_NAME.getBytes()); + Mockito.when(mockTransportclient.get(Mockito.anyString())) + .thenReturn(CompletableFuture.completedFuture(mockResponse)); + Mockito + .when(mockMapper.readValue(Mockito.eq(STORE_NAME.getBytes()), Mockito.eq(HybridStoreQuotaStatusResponse.class))) + .thenReturn(mockQuotaStatusResponse); + Mockito.when(mockQuotaStatusResponse.isError()).thenReturn(true); + Mockito.when(mockQuotaStatusResponse.getErrorType()).thenReturn(ErrorType.STORE_NOT_FOUND); + + final boolean[] isReinitCalled = { false }; + RouterBasedHybridStoreQuotaMonitor.TransportClientReinitProvider transportClientReinitProvider = () -> { + isReinitCalled[0] = true; + return mockTransportclient; + }; + RouterBasedHybridStoreQuotaMonitor routerBasedHybridStoreQuotaMonitor = new RouterBasedHybridStoreQuotaMonitor( + mockTransportclient, + STORE_NAME, + Version.PushType.STREAM, + TOPIC_NAME, + transportClientReinitProvider); + + routerBasedHybridStoreQuotaMonitor.getHybridQuotaMonitorTask().setMapper(mockMapper); + routerBasedHybridStoreQuotaMonitor.getHybridQuotaMonitorTask().checkStatus(); + + Assert.assertTrue(isReinitCalled[0]); + } +} From 612d3b0083c0ef59a042db7e2636f8ccc1fc5115 Mon Sep 17 00:00:00 2001 From: Zac Policzer Date: Mon, 5 Feb 2024 13:57:39 -0800 Subject: [PATCH 3/4] change test method name to be something more appropriate to reality --- .../pushmonitor/RouterBasedHybridStoreQuotaMonitorTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/clients/venice-samza/src/test/java/com/linkedin/venice/pushmonitor/RouterBasedHybridStoreQuotaMonitorTest.java b/clients/venice-samza/src/test/java/com/linkedin/venice/pushmonitor/RouterBasedHybridStoreQuotaMonitorTest.java index 0549c15f79..ac72649435 100644 --- a/clients/venice-samza/src/test/java/com/linkedin/venice/pushmonitor/RouterBasedHybridStoreQuotaMonitorTest.java +++ b/clients/venice-samza/src/test/java/com/linkedin/venice/pushmonitor/RouterBasedHybridStoreQuotaMonitorTest.java @@ -22,7 +22,8 @@ public class RouterBasedHybridStoreQuotaMonitorTest { private static final String TOPIC_NAME = "fake_Store_v1"; @Test - public void testGetCurrentStatus() throws IOException, ExecutionException, InterruptedException, TimeoutException { + public void testTransportClientReinit() + throws IOException, ExecutionException, InterruptedException, TimeoutException { TransportClient mockTransportclient = Mockito.mock(TransportClient.class); TransportClientResponse mockResponse = Mockito.mock(TransportClientResponse.class); ObjectMapper mockMapper = Mockito.mock(ObjectMapper.class); From 0a568c977f834aae35230495dbfeb633dbcfc706 Mon Sep 17 00:00:00 2001 From: Zac Policzer Date: Mon, 5 Feb 2024 16:47:33 -0800 Subject: [PATCH 4/4] improve coverage --- .../RouterBasedHybridStoreQuotaMonitor.java | 1 - ...outerBasedHybridStoreQuotaMonitorTest.java | 34 +++++++++++++++++++ 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/clients/venice-samza/src/main/java/com/linkedin/venice/pushmonitor/RouterBasedHybridStoreQuotaMonitor.java b/clients/venice-samza/src/main/java/com/linkedin/venice/pushmonitor/RouterBasedHybridStoreQuotaMonitor.java index d048c31e8f..2dfebadf33 100644 --- a/clients/venice-samza/src/main/java/com/linkedin/venice/pushmonitor/RouterBasedHybridStoreQuotaMonitor.java +++ b/clients/venice-samza/src/main/java/com/linkedin/venice/pushmonitor/RouterBasedHybridStoreQuotaMonitor.java @@ -132,7 +132,6 @@ protected void checkStatus() throws ExecutionException, InterruptedException, Ti // TODO: It'd make sense to call shutdown on the transport client, but it's a shared object so that's // a bit dangerous. transportClient = reinitProvider.apply(); - return; } LOGGER.error("Router was not able to get hybrid quota status: {}", quotaStatusResponse.getError()); return; diff --git a/clients/venice-samza/src/test/java/com/linkedin/venice/pushmonitor/RouterBasedHybridStoreQuotaMonitorTest.java b/clients/venice-samza/src/test/java/com/linkedin/venice/pushmonitor/RouterBasedHybridStoreQuotaMonitorTest.java index ac72649435..b0e0eef746 100644 --- a/clients/venice-samza/src/test/java/com/linkedin/venice/pushmonitor/RouterBasedHybridStoreQuotaMonitorTest.java +++ b/clients/venice-samza/src/test/java/com/linkedin/venice/pushmonitor/RouterBasedHybridStoreQuotaMonitorTest.java @@ -54,4 +54,38 @@ public void testTransportClientReinit() Assert.assertTrue(isReinitCalled[0]); } + + @Test + public void testStatusChange() throws IOException, ExecutionException, InterruptedException, TimeoutException { + TransportClient mockTransportclient = Mockito.mock(TransportClient.class); + TransportClientResponse mockResponse = Mockito.mock(TransportClientResponse.class); + ObjectMapper mockMapper = Mockito.mock(ObjectMapper.class); + HybridStoreQuotaStatusResponse mockQuotaStatusResponse = Mockito.mock(HybridStoreQuotaStatusResponse.class); + Mockito.when(mockResponse.getBody()).thenReturn(STORE_NAME.getBytes()); + Mockito.when(mockTransportclient.get(Mockito.anyString())) + .thenReturn(CompletableFuture.completedFuture(mockResponse)); + Mockito + .when(mockMapper.readValue(Mockito.eq(STORE_NAME.getBytes()), Mockito.eq(HybridStoreQuotaStatusResponse.class))) + .thenReturn(mockQuotaStatusResponse); + Mockito.when(mockQuotaStatusResponse.isError()).thenReturn(false); + Mockito.when(mockQuotaStatusResponse.getQuotaStatus()).thenReturn(HybridStoreQuotaStatus.QUOTA_VIOLATED); + + final boolean[] isReinitCalled = { false }; + RouterBasedHybridStoreQuotaMonitor.TransportClientReinitProvider transportClientReinitProvider = () -> { + isReinitCalled[0] = true; + return mockTransportclient; + }; + RouterBasedHybridStoreQuotaMonitor routerBasedHybridStoreQuotaMonitor = new RouterBasedHybridStoreQuotaMonitor( + mockTransportclient, + STORE_NAME, + Version.PushType.STREAM, + TOPIC_NAME, + transportClientReinitProvider); + + routerBasedHybridStoreQuotaMonitor.getHybridQuotaMonitorTask().setMapper(mockMapper); + routerBasedHybridStoreQuotaMonitor.getHybridQuotaMonitorTask().checkStatus(); + + Assert.assertFalse(isReinitCalled[0]); + Assert.assertEquals(routerBasedHybridStoreQuotaMonitor.getCurrentStatus(), HybridStoreQuotaStatus.QUOTA_VIOLATED); + } }