Skip to content

Commit

Permalink
add tests to pass coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
ZacAttack committed Feb 5, 2024
1 parent 8f3c92c commit 3c03f23
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
import com.linkedin.venice.utils.ObjectMapperFactory;
import com.linkedin.venice.utils.Utils;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -68,6 +71,10 @@ public void close() {
hybridQuotaMonitorTask.close();
}

protected HybridQuotaMonitorTask getHybridQuotaMonitorTask() {
return hybridQuotaMonitorTask;
}

public void setCurrentStatus(HybridStoreQuotaStatus currentStatus) {
this.currentStatus = currentStatus;
}
Expand All @@ -84,8 +91,8 @@ private static String buildStreamReprocessingHybridStoreQuotaRequestPath(String
return TYPE_STREAM_REPROCESSING_HYBRID_STORE_QUOTA + "/" + versionTopic;
}

private static class HybridQuotaMonitorTask implements Runnable, Closeable {
private static ObjectMapper mapper = ObjectMapperFactory.getInstance();
protected static class HybridQuotaMonitorTask implements Runnable, Closeable {
private ObjectMapper mapper = ObjectMapperFactory.getInstance();

private final AtomicBoolean isRunning;
private final String storeName;
Expand All @@ -109,37 +116,43 @@ public HybridQuotaMonitorTask(
this.reinitProvider = reinitProvider;
}

protected void setMapper(ObjectMapper mapper) {
this.mapper = mapper;
}

protected void checkStatus() throws ExecutionException, InterruptedException, TimeoutException, IOException {
// Get hybrid store quota status
CompletableFuture<TransportClientResponse> responseFuture = transportClient.get(requestPath);
TransportClientResponse response = responseFuture.get(POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
HybridStoreQuotaStatusResponse quotaStatusResponse =
mapper.readValue(response.getBody(), HybridStoreQuotaStatusResponse.class);
if (quotaStatusResponse.isError()) {
if (quotaStatusResponse.getErrorType().equals(ErrorType.STORE_NOT_FOUND)) {
LOGGER.warn("Store not found, reinitializing client! Error: {}", quotaStatusResponse.getError());
// TODO: It'd make sense to call shutdown on the transport client, but it's a shared object so that's
// a bit dangerous.
transportClient = reinitProvider.apply();
return;
}
LOGGER.error("Router was not able to get hybrid quota status: {}", quotaStatusResponse.getError());
return;
}
hybridStoreQuotaMonitorService.setCurrentStatus(quotaStatusResponse.getQuotaStatus());
switch (quotaStatusResponse.getQuotaStatus()) {
case QUOTA_VIOLATED:
LOGGER.info("Hybrid job failed with quota violation for store: {}", storeName);
break;
default:
LOGGER.info("Current hybrid job state: {} for store: {}", quotaStatusResponse.getQuotaStatus(), storeName);
}
}

@Override
public void run() {
LOGGER.info("Running {}", this.getClass().getSimpleName());
while (isRunning.get()) {
try {
// Get hybrid store quota status
CompletableFuture<TransportClientResponse> responseFuture = transportClient.get(requestPath);
TransportClientResponse response = responseFuture.get(POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
HybridStoreQuotaStatusResponse quotaStatusResponse =
mapper.readValue(response.getBody(), HybridStoreQuotaStatusResponse.class);
if (quotaStatusResponse.isError()) {
if (quotaStatusResponse.getErrorType().equals(ErrorType.STORE_NOT_FOUND)) {
LOGGER.warn("Store not found, reinitializing client! Error: {}", quotaStatusResponse.getError());
// TODO: It'd make sense to call shutdown on the transport client, but it's a shared object so that's
// a bit dangerous.
transportClient = reinitProvider.apply();
continue;
}
LOGGER.error("Router was not able to get hybrid quota status: {}", quotaStatusResponse.getError());
continue;
}
hybridStoreQuotaMonitorService.setCurrentStatus(quotaStatusResponse.getQuotaStatus());
switch (quotaStatusResponse.getQuotaStatus()) {
case QUOTA_VIOLATED:
LOGGER.info("Hybrid job failed with quota violation for store: {}", storeName);
break;
default:
LOGGER
.info("Current hybrid job state: {} for store: {}", quotaStatusResponse.getQuotaStatus(), storeName);
}

checkStatus();
Utils.sleep(POLL_CYCLE_DELAY_MS);
} catch (Exception e) {
if (isRunning.get() && !ExceptionUtils.recursiveClassEquals(e, InterruptedException.class)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package com.linkedin.venice.pushmonitor;

import static org.testng.Assert.*;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.venice.client.store.transport.TransportClient;
import com.linkedin.venice.client.store.transport.TransportClientResponse;
import com.linkedin.venice.exceptions.ErrorType;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.routerapi.HybridStoreQuotaStatusResponse;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;


public class RouterBasedHybridStoreQuotaMonitorTest {
private static final String STORE_NAME = "fake_Store";
private static final String TOPIC_NAME = "fake_Store_v1";

@Test
public void testGetCurrentStatus() throws IOException, ExecutionException, InterruptedException, TimeoutException {
TransportClient mockTransportclient = Mockito.mock(TransportClient.class);
TransportClientResponse mockResponse = Mockito.mock(TransportClientResponse.class);
ObjectMapper mockMapper = Mockito.mock(ObjectMapper.class);
HybridStoreQuotaStatusResponse mockQuotaStatusResponse = Mockito.mock(HybridStoreQuotaStatusResponse.class);
Mockito.when(mockResponse.getBody()).thenReturn(STORE_NAME.getBytes());
Mockito.when(mockTransportclient.get(Mockito.anyString()))
.thenReturn(CompletableFuture.completedFuture(mockResponse));
Mockito
.when(mockMapper.readValue(Mockito.eq(STORE_NAME.getBytes()), Mockito.eq(HybridStoreQuotaStatusResponse.class)))
.thenReturn(mockQuotaStatusResponse);
Mockito.when(mockQuotaStatusResponse.isError()).thenReturn(true);
Mockito.when(mockQuotaStatusResponse.getErrorType()).thenReturn(ErrorType.STORE_NOT_FOUND);

final boolean[] isReinitCalled = { false };
RouterBasedHybridStoreQuotaMonitor.TransportClientReinitProvider transportClientReinitProvider = () -> {
isReinitCalled[0] = true;
return mockTransportclient;
};
RouterBasedHybridStoreQuotaMonitor routerBasedHybridStoreQuotaMonitor = new RouterBasedHybridStoreQuotaMonitor(
mockTransportclient,
STORE_NAME,
Version.PushType.STREAM,
TOPIC_NAME,
transportClientReinitProvider);

routerBasedHybridStoreQuotaMonitor.getHybridQuotaMonitorTask().setMapper(mockMapper);
routerBasedHybridStoreQuotaMonitor.getHybridQuotaMonitorTask().checkStatus();

Assert.assertTrue(isReinitCalled[0]);
}
}

0 comments on commit 3c03f23

Please sign in to comment.