From 3c03f23e1a403f393360ea03b1890d1f627b25a1 Mon Sep 17 00:00:00 2001 From: Zac Policzer Date: Mon, 5 Feb 2024 13:55:07 -0800 Subject: [PATCH] add tests to pass coverage --- .../RouterBasedHybridStoreQuotaMonitor.java | 69 +++++++++++-------- ...outerBasedHybridStoreQuotaMonitorTest.java | 56 +++++++++++++++ 2 files changed, 97 insertions(+), 28 deletions(-) create mode 100644 clients/venice-samza/src/test/java/com/linkedin/venice/pushmonitor/RouterBasedHybridStoreQuotaMonitorTest.java diff --git a/clients/venice-samza/src/main/java/com/linkedin/venice/pushmonitor/RouterBasedHybridStoreQuotaMonitor.java b/clients/venice-samza/src/main/java/com/linkedin/venice/pushmonitor/RouterBasedHybridStoreQuotaMonitor.java index 4f8fe2b67e..d048c31e8f 100644 --- a/clients/venice-samza/src/main/java/com/linkedin/venice/pushmonitor/RouterBasedHybridStoreQuotaMonitor.java +++ b/clients/venice-samza/src/main/java/com/linkedin/venice/pushmonitor/RouterBasedHybridStoreQuotaMonitor.java @@ -15,10 +15,13 @@ import com.linkedin.venice.utils.ObjectMapperFactory; import com.linkedin.venice.utils.Utils; import java.io.Closeable; +import java.io.IOException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -68,6 +71,10 @@ public void close() { hybridQuotaMonitorTask.close(); } + protected HybridQuotaMonitorTask getHybridQuotaMonitorTask() { + return hybridQuotaMonitorTask; + } + public void setCurrentStatus(HybridStoreQuotaStatus currentStatus) { this.currentStatus = currentStatus; } @@ -84,8 +91,8 @@ private static String buildStreamReprocessingHybridStoreQuotaRequestPath(String return TYPE_STREAM_REPROCESSING_HYBRID_STORE_QUOTA + "/" + versionTopic; } - private static class HybridQuotaMonitorTask implements Runnable, Closeable { - private static ObjectMapper mapper = ObjectMapperFactory.getInstance(); + protected static class HybridQuotaMonitorTask implements Runnable, Closeable { + private ObjectMapper mapper = ObjectMapperFactory.getInstance(); private final AtomicBoolean isRunning; private final String storeName; @@ -109,37 +116,43 @@ public HybridQuotaMonitorTask( this.reinitProvider = reinitProvider; } + protected void setMapper(ObjectMapper mapper) { + this.mapper = mapper; + } + + protected void checkStatus() throws ExecutionException, InterruptedException, TimeoutException, IOException { + // Get hybrid store quota status + CompletableFuture responseFuture = transportClient.get(requestPath); + TransportClientResponse response = responseFuture.get(POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS); + HybridStoreQuotaStatusResponse quotaStatusResponse = + mapper.readValue(response.getBody(), HybridStoreQuotaStatusResponse.class); + if (quotaStatusResponse.isError()) { + if (quotaStatusResponse.getErrorType().equals(ErrorType.STORE_NOT_FOUND)) { + LOGGER.warn("Store not found, reinitializing client! Error: {}", quotaStatusResponse.getError()); + // TODO: It'd make sense to call shutdown on the transport client, but it's a shared object so that's + // a bit dangerous. + transportClient = reinitProvider.apply(); + return; + } + LOGGER.error("Router was not able to get hybrid quota status: {}", quotaStatusResponse.getError()); + return; + } + hybridStoreQuotaMonitorService.setCurrentStatus(quotaStatusResponse.getQuotaStatus()); + switch (quotaStatusResponse.getQuotaStatus()) { + case QUOTA_VIOLATED: + LOGGER.info("Hybrid job failed with quota violation for store: {}", storeName); + break; + default: + LOGGER.info("Current hybrid job state: {} for store: {}", quotaStatusResponse.getQuotaStatus(), storeName); + } + } + @Override public void run() { LOGGER.info("Running {}", this.getClass().getSimpleName()); while (isRunning.get()) { try { - // Get hybrid store quota status - CompletableFuture responseFuture = transportClient.get(requestPath); - TransportClientResponse response = responseFuture.get(POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS); - HybridStoreQuotaStatusResponse quotaStatusResponse = - mapper.readValue(response.getBody(), HybridStoreQuotaStatusResponse.class); - if (quotaStatusResponse.isError()) { - if (quotaStatusResponse.getErrorType().equals(ErrorType.STORE_NOT_FOUND)) { - LOGGER.warn("Store not found, reinitializing client! Error: {}", quotaStatusResponse.getError()); - // TODO: It'd make sense to call shutdown on the transport client, but it's a shared object so that's - // a bit dangerous. - transportClient = reinitProvider.apply(); - continue; - } - LOGGER.error("Router was not able to get hybrid quota status: {}", quotaStatusResponse.getError()); - continue; - } - hybridStoreQuotaMonitorService.setCurrentStatus(quotaStatusResponse.getQuotaStatus()); - switch (quotaStatusResponse.getQuotaStatus()) { - case QUOTA_VIOLATED: - LOGGER.info("Hybrid job failed with quota violation for store: {}", storeName); - break; - default: - LOGGER - .info("Current hybrid job state: {} for store: {}", quotaStatusResponse.getQuotaStatus(), storeName); - } - + checkStatus(); Utils.sleep(POLL_CYCLE_DELAY_MS); } catch (Exception e) { if (isRunning.get() && !ExceptionUtils.recursiveClassEquals(e, InterruptedException.class)) { diff --git a/clients/venice-samza/src/test/java/com/linkedin/venice/pushmonitor/RouterBasedHybridStoreQuotaMonitorTest.java b/clients/venice-samza/src/test/java/com/linkedin/venice/pushmonitor/RouterBasedHybridStoreQuotaMonitorTest.java new file mode 100644 index 0000000000..0549c15f79 --- /dev/null +++ b/clients/venice-samza/src/test/java/com/linkedin/venice/pushmonitor/RouterBasedHybridStoreQuotaMonitorTest.java @@ -0,0 +1,56 @@ +package com.linkedin.venice.pushmonitor; + +import static org.testng.Assert.*; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.linkedin.venice.client.store.transport.TransportClient; +import com.linkedin.venice.client.store.transport.TransportClientResponse; +import com.linkedin.venice.exceptions.ErrorType; +import com.linkedin.venice.meta.Version; +import com.linkedin.venice.routerapi.HybridStoreQuotaStatusResponse; +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class RouterBasedHybridStoreQuotaMonitorTest { + private static final String STORE_NAME = "fake_Store"; + private static final String TOPIC_NAME = "fake_Store_v1"; + + @Test + public void testGetCurrentStatus() throws IOException, ExecutionException, InterruptedException, TimeoutException { + TransportClient mockTransportclient = Mockito.mock(TransportClient.class); + TransportClientResponse mockResponse = Mockito.mock(TransportClientResponse.class); + ObjectMapper mockMapper = Mockito.mock(ObjectMapper.class); + HybridStoreQuotaStatusResponse mockQuotaStatusResponse = Mockito.mock(HybridStoreQuotaStatusResponse.class); + Mockito.when(mockResponse.getBody()).thenReturn(STORE_NAME.getBytes()); + Mockito.when(mockTransportclient.get(Mockito.anyString())) + .thenReturn(CompletableFuture.completedFuture(mockResponse)); + Mockito + .when(mockMapper.readValue(Mockito.eq(STORE_NAME.getBytes()), Mockito.eq(HybridStoreQuotaStatusResponse.class))) + .thenReturn(mockQuotaStatusResponse); + Mockito.when(mockQuotaStatusResponse.isError()).thenReturn(true); + Mockito.when(mockQuotaStatusResponse.getErrorType()).thenReturn(ErrorType.STORE_NOT_FOUND); + + final boolean[] isReinitCalled = { false }; + RouterBasedHybridStoreQuotaMonitor.TransportClientReinitProvider transportClientReinitProvider = () -> { + isReinitCalled[0] = true; + return mockTransportclient; + }; + RouterBasedHybridStoreQuotaMonitor routerBasedHybridStoreQuotaMonitor = new RouterBasedHybridStoreQuotaMonitor( + mockTransportclient, + STORE_NAME, + Version.PushType.STREAM, + TOPIC_NAME, + transportClientReinitProvider); + + routerBasedHybridStoreQuotaMonitor.getHybridQuotaMonitorTask().setMapper(mockMapper); + routerBasedHybridStoreQuotaMonitor.getHybridQuotaMonitorTask().checkStatus(); + + Assert.assertTrue(isReinitCalled[0]); + } +}