Skip to content

Commit

Permalink
[fc][test] FC warmup part 1: delay fc startup until metadata is updat…
Browse files Browse the repository at this point in the history
…ed (#697)

* currently fast client start() is a non blocking call and when metadata update fails for the first time it schedules the update and returns leading to the read requests throwing exception until the metadata becomes ready in the background. This change makes it a blocking start until the metadata is ready. In case of InterruptedException, it will fall back to the old ways.
* testFastClientWithoutServers as an integration test doesn't make sense anymore as it will keep on waiting, so removed it. The newly added unit test mimics this (a more general version where metadata update fails for any reason). Need to add more unit tests for metadata. Not in scope for this PR.
  • Loading branch information
m-nagarajan authored Oct 16, 2023
1 parent 1e08181 commit 486cb94
Show file tree
Hide file tree
Showing 5 changed files with 202 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -62,9 +63,10 @@ public class RequestBasedMetadata extends AbstractStoreMetadata {
private static final Logger LOGGER = LogManager.getLogger(RequestBasedMetadata.class);
private static final String VERSION_PARTITION_SEPARATOR = "_";
private static final long ZSTD_DICT_FETCH_TIMEOUT = 10;
private static final long DEFAULT_REFRESH_INTERVAL_IN_SECONDS = 60;
private final long refreshIntervalInSeconds;
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
static final long DEFAULT_REFRESH_INTERVAL_IN_SECONDS = 60;
static final long WARMUP_REFRESH_INTERVAL_IN_SECONDS = 5;
private long refreshIntervalInSeconds;
private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

private final AtomicInteger currentVersion = new AtomicInteger();
private final AtomicInteger latestSuperSetValueSchemaId = new AtomicInteger();
Expand All @@ -83,6 +85,7 @@ public class RequestBasedMetadata extends AbstractStoreMetadata {
private RouterBackedSchemaReader metadataResponseSchemaReader;
private volatile boolean isServiceDiscovered;
private volatile boolean isReady;
private CountDownLatch isReadyLatch = new CountDownLatch(1);

public RequestBasedMetadata(ClientConfig clientConfig, D2TransportClient transportClient) {
super(clientConfig);
Expand Down Expand Up @@ -150,9 +153,19 @@ public void start() {

// build a base for future metadata updates then start periodic refresh
refresh();
try {
// wait till metadata is warmed up
isReadyLatch.await();
} catch (InterruptedException e) {
// if there is an InterruptedException, let the periodic refresh continue with updating the metadata
LOGGER.error(
"Metadata warmup failed and will be retried every {} seconds. Read requests will throw exception until then",
WARMUP_REFRESH_INTERVAL_IN_SECONDS,
e);
}
}

private void discoverD2Service() {
void discoverD2Service() {
if (isServiceDiscovered) {
return;
}
Expand All @@ -173,7 +186,7 @@ private void discoverD2Service() {
* @param onDemandRefresh
* @return if the fetched metadata was an updated version
*/
private synchronized void updateCache(boolean onDemandRefresh) throws InterruptedException {
synchronized void updateCache(boolean onDemandRefresh) throws InterruptedException {
long currentTimeMs = System.currentTimeMillis();
// call the METADATA endpoint
try {
Expand Down Expand Up @@ -240,6 +253,8 @@ private synchronized void updateCache(boolean onDemandRefresh) throws Interrupte
for (Map.Entry<CharSequence, Integer> entry: metadataResponse.getHelixGroupInfo().entrySet()) {
helixGroupInfo.put(entry.getKey().toString(), entry.getValue());
}
routingStrategy.updateHelixGroupInfo(helixGroupInfo);

latestSuperSetValueSchemaId.set(metadataResponse.getLatestSuperSetValueSchemaId());
// Wait for dictionary fetch to finish if there is one
try {
Expand Down Expand Up @@ -283,13 +298,22 @@ private synchronized void updateCache(boolean onDemandRefresh) throws Interrupte
private void refresh() {
try {
updateCache(false);
routingStrategy.updateHelixGroupInfo(helixGroupInfo);
isReady = true;
if (!isReady) {
isReadyLatch.countDown();
isReady = true;
LOGGER.info("Metadata warmup completed successfully");
}
} catch (Exception e) {
// Catch all errors so periodic refresh doesn't break on transient errors.
LOGGER.error("Encountered unexpected error during periodic refresh", e);
LOGGER.error(
"Metadata periodic refresh encountered unexpected error, will be retried in {} seconds",
isReady ? refreshIntervalInSeconds : WARMUP_REFRESH_INTERVAL_IN_SECONDS,
e);
} finally {
scheduler.schedule(this::refresh, refreshIntervalInSeconds, TimeUnit.SECONDS);
scheduler.schedule(
this::refresh,
isReady ? refreshIntervalInSeconds : WARMUP_REFRESH_INTERVAL_IN_SECONDS,
TimeUnit.SECONDS);
}
}

Expand Down Expand Up @@ -408,4 +432,28 @@ public boolean isReady() {
public synchronized void setD2ServiceDiscovery(D2ServiceDiscovery d2ServiceDiscovery) {
this.d2ServiceDiscovery = d2ServiceDiscovery;
}

public void setScheduler(ScheduledExecutorService scheduler) {
this.scheduler = scheduler;
}

public ScheduledExecutorService getScheduler() {
return scheduler;
}

public void setIsReadyLatch(CountDownLatch isReadyLatch) {
this.isReadyLatch = isReadyLatch;
}

public CountDownLatch getIsReadyLatch() {
return this.isReadyLatch;
}

public void setRefreshIntervalInSeconds(long refreshIntervalInSeconds) {
this.refreshIntervalInSeconds = refreshIntervalInSeconds;
}

public long getRefreshIntervalInSeconds() {
return refreshIntervalInSeconds;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,19 +81,19 @@ public void setUp() {
BATCH_GET_VALUE_RESPONSE.put("test_key_2", "test_value_2");
}

private void setUpClient() {
private void setUpClient() throws InterruptedException {
setUpClient(false);
}

private void setUpClient(boolean useStreamingBatchGetAsDefault) {
private void setUpClient(boolean useStreamingBatchGetAsDefault) throws InterruptedException {
setUpClient(useStreamingBatchGetAsDefault, false, false, false);
}

private void setUpClient(
boolean useStreamingBatchGetAsDefault,
boolean transportClientThrowsException,
boolean transportClientThrowsPartialException,
boolean transportClientPartialIncomplete) {
boolean transportClientPartialIncomplete) throws InterruptedException {
setUpClient(
useStreamingBatchGetAsDefault,
transportClientThrowsException,
Expand All @@ -118,7 +118,7 @@ private void setUpClient(
boolean transportClientThrowsPartialException, // only applicable for useStreamingBatchGetAsDefault
boolean transportClientPartialIncomplete, // only applicable for useStreamingBatchGetAsDefault
boolean mockTransportClient,
long routingLeakedRequestCleanupThresholdMS) {
long routingLeakedRequestCleanupThresholdMS) throws InterruptedException {
clientConfigBuilder = new ClientConfig.ClientConfigBuilder<>().setStoreName(STORE_NAME)
.setR2Client(mock(Client.class))
.setUseStreamingBatchGetAsDefault(useStreamingBatchGetAsDefault)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package com.linkedin.venice.fastclient.meta;

import static com.linkedin.venice.fastclient.meta.RequestBasedMetadata.WARMUP_REFRESH_INTERVAL_IN_SECONDS;
import static com.linkedin.venice.fastclient.meta.RequestBasedMetadataTestUtils.KEY_SCHEMA;
import static com.linkedin.venice.fastclient.meta.RequestBasedMetadataTestUtils.VALUE_SCHEMA;
import static com.linkedin.venice.fastclient.meta.RequestBasedMetadataTestUtils.getMockMetaData;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand All @@ -12,25 +15,69 @@
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.fastclient.ClientConfig;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.utils.DataProviderUtils;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Time;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.testng.annotations.Test;


public class RequestBasedMetadataTest {
private static final int CURRENT_VERSION = 1;
private static final int TEST_TIMEOUT = 10 * Time.MS_PER_SECOND;

@Test
public void testMetadata() throws IOException {
@Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class, timeOut = TEST_TIMEOUT)
public void testMetadataWarmup(boolean firstUpdateFails) throws IOException, InterruptedException {
String storeName = "testStore";

ClientConfig clientConfig = RequestBasedMetadataTestUtils.getMockClientConfig(storeName);
RequestBasedMetadata requestBasedMetadata = null;
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
try {
requestBasedMetadata = getMockMetaData(clientConfig, storeName, false, true, firstUpdateFails, scheduler);
requestBasedMetadata.start();
CountDownLatch isReadyLatch = requestBasedMetadata.getIsReadyLatch();

// 1. verify based on isReadyLatch
assertEquals(isReadyLatch.getCount(), 0);

// 2. verify based on the scheduled retries
RequestBasedMetadata finalRequestBasedMetadata = requestBasedMetadata;
if (firstUpdateFails) {
// schedule retry after WARMUP_REFRESH_INTERVAL_IN_SECONDS
verify(requestBasedMetadata.getScheduler())
.schedule(any(Runnable.class), eq(WARMUP_REFRESH_INTERVAL_IN_SECONDS), eq(TimeUnit.SECONDS));
}
// after success, both cases should schedule retry after configured refresh interval:
// testing that start() is only finished after warmup is done
long periodicRetryAfterSuccessfulWarmup = requestBasedMetadata.getRefreshIntervalInSeconds();
TestUtils.waitForNonDeterministicAssertion(2, TimeUnit.SECONDS, () -> {
verify(finalRequestBasedMetadata.getScheduler())
.schedule(any(Runnable.class), eq(periodicRetryAfterSuccessfulWarmup), eq(TimeUnit.SECONDS));
});
} finally {
scheduler.shutdownNow();
if (requestBasedMetadata != null) {
requestBasedMetadata.close();
}
}
}

@Test(timeOut = TEST_TIMEOUT)
public void testMetadata() throws IOException, InterruptedException {
String storeName = "testStore";

ClientConfig clientConfig = RequestBasedMetadataTestUtils.getMockClientConfig(storeName);
RequestBasedMetadata requestBasedMetadata = null;

try {
requestBasedMetadata = RequestBasedMetadataTestUtils.getMockMetaData(clientConfig, storeName, true);
requestBasedMetadata = getMockMetaData(clientConfig, storeName, true);
requestBasedMetadata.start();
assertEquals(requestBasedMetadata.getStoreName(), storeName);
assertEquals(requestBasedMetadata.getCurrentStoreVersion(), CURRENT_VERSION);
assertEquals(
Expand Down Expand Up @@ -60,15 +107,16 @@ public void testMetadata() throws IOException {
}
}

@Test
public void testMetadataForwardCompat() throws IOException {
@Test(timeOut = TEST_TIMEOUT)
public void testMetadataForwardCompat() throws IOException, InterruptedException {
String storeName = "testStore";
RequestBasedMetadata requestBasedMetadata = null;
try {
RouterBackedSchemaReader routerBackedSchemaReader =
RequestBasedMetadataTestUtils.getMockRouterBackedSchemaReader();
ClientConfig clientConfig = RequestBasedMetadataTestUtils.getMockClientConfig(storeName);
requestBasedMetadata = getMockMetaData(clientConfig, storeName, routerBackedSchemaReader, true);
requestBasedMetadata.start();
int metadataResponseSchemaId = AvroProtocolDefinition.SERVER_METADATA_RESPONSE.getCurrentProtocolVersion();
verify(routerBackedSchemaReader, times(1)).getValueSchema(metadataResponseSchemaId);
// A new metadata response schema should be fetched for subsequent refreshes
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
package com.linkedin.venice.fastclient.meta;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.linkedin.venice.client.exceptions.VeniceClientException;
import com.linkedin.venice.client.schema.RouterBackedSchemaReader;
import com.linkedin.venice.client.store.D2ServiceDiscovery;
import com.linkedin.venice.client.store.transport.D2TransportClient;
Expand All @@ -30,6 +35,9 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;


public class RequestBasedMetadataTestUtils {
Expand Down Expand Up @@ -136,28 +144,99 @@ public static RouterBackedSchemaReader getMockRouterBackedSchemaReader() {
return metadataResponseSchemaReader;
}

public static RequestBasedMetadata getMockMetaData(ClientConfig clientConfig, String storeName) {
return getMockMetaData(clientConfig, storeName, getMockRouterBackedSchemaReader(), false);
public static RequestBasedMetadata getMockMetaData(ClientConfig clientConfig, String storeName)
throws InterruptedException {
return getMockMetaData(clientConfig, storeName, getMockRouterBackedSchemaReader(), false, false, false, null);
}

public static RequestBasedMetadata getMockMetaData(
ClientConfig clientConfig,
String storeName,
boolean metadataChange) {
return getMockMetaData(clientConfig, storeName, getMockRouterBackedSchemaReader(), metadataChange);
boolean metadataChange) throws InterruptedException {
return getMockMetaData(
clientConfig,
storeName,
getMockRouterBackedSchemaReader(),
metadataChange,
false,
false,
null);
}

public static RequestBasedMetadata getMockMetaData(
ClientConfig clientConfig,
String storeName,
RouterBackedSchemaReader routerBackedSchemaReader,
boolean metadataChange) {
boolean metadataChange) throws InterruptedException {
return getMockMetaData(clientConfig, storeName, routerBackedSchemaReader, metadataChange, false, false, null);
}

public static RequestBasedMetadata getMockMetaData(
ClientConfig clientConfig,
String storeName,
boolean metadataChange,
boolean mockMetadataUpdateFailure,
boolean firstUpdateFails,
ScheduledExecutorService scheduler) throws InterruptedException {
return getMockMetaData(
clientConfig,
storeName,
getMockRouterBackedSchemaReader(),
metadataChange,
mockMetadataUpdateFailure,
firstUpdateFails,
scheduler);
}

public static RequestBasedMetadata getMockMetaData(
ClientConfig clientConfig,
String storeName,
RouterBackedSchemaReader routerBackedSchemaReader,
boolean metadataChange,
boolean mockMetadataUpdateFailure,
boolean firstUpdateFails,
ScheduledExecutorService scheduler) throws InterruptedException {
D2TransportClient d2TransportClient = getMockD2TransportClient(storeName, metadataChange);
D2ServiceDiscovery d2ServiceDiscovery = getMockD2ServiceDiscovery(d2TransportClient, storeName);
RequestBasedMetadata requestBasedMetadata = new RequestBasedMetadata(clientConfig, d2TransportClient);
RequestBasedMetadata requestBasedMetadata;
if (mockMetadataUpdateFailure) {
requestBasedMetadata = mock(RequestBasedMetadata.class);
doAnswer(invocation -> {
return null;
}).when(requestBasedMetadata).discoverD2Service();

if (firstUpdateFails) {
doAnswer(invocation -> {
throw new VeniceClientException("update cache exception");
}).doAnswer(invocation -> {
return null;
}).when(requestBasedMetadata).updateCache(anyBoolean());
} else {
doAnswer(invocation -> {
return null;
}).when(requestBasedMetadata).updateCache(anyBoolean());
}

doCallRealMethod().when(requestBasedMetadata).setIsReadyLatch(any());
doCallRealMethod().when(requestBasedMetadata).getIsReadyLatch();
doCallRealMethod().when(requestBasedMetadata).setScheduler(any());
doCallRealMethod().when(requestBasedMetadata).getScheduler();
doCallRealMethod().when(requestBasedMetadata).setRefreshIntervalInSeconds(anyLong());
doCallRealMethod().when(requestBasedMetadata).getRefreshIntervalInSeconds();
requestBasedMetadata.setIsReadyLatch(new CountDownLatch(1));
ScheduledExecutorService mockScheduler = mock(ScheduledExecutorService.class);
requestBasedMetadata.setScheduler(mockScheduler);
requestBasedMetadata.setRefreshIntervalInSeconds(RequestBasedMetadata.DEFAULT_REFRESH_INTERVAL_IN_SECONDS);
doAnswer(invocation -> {
scheduler.schedule((Runnable) invocation.getArgument(0), invocation.getArgument(1), invocation.getArgument(2));
return null;
}).when(mockScheduler).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
doCallRealMethod().when(requestBasedMetadata).start();
} else {
requestBasedMetadata = new RequestBasedMetadata(clientConfig, d2TransportClient);
}
requestBasedMetadata.setMetadataResponseSchemaReader(routerBackedSchemaReader);
requestBasedMetadata.setD2ServiceDiscovery(d2ServiceDiscovery);
requestBasedMetadata.start();
return requestBasedMetadata;
}
}
Loading

0 comments on commit 486cb94

Please sign in to comment.