Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[controller][server] TopicManager refactoring and performance improvements #743

Merged
merged 8 commits into from
Feb 15, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -1523,10 +1523,7 @@ private void internalClose(boolean doFlush) {
}

if (topicManagerRepository != null) {
Collection<TopicManager> topicManagers = topicManagerRepository.getAllTopicManagers();
for (TopicManager topicManager: topicManagers) {
topicManager.invalidateCache(versionTopic);
}
topicManagerRepository.invalidateTopicManagerCaches(versionTopic);
}

close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -714,7 +714,7 @@ public void close() {
logger.warn("{} is already closed", this);
sushantmane marked this conversation as resolved.
Show resolved Hide resolved
}
try {
closeAsync().get(600, TimeUnit.SECONDS);
closeAsync().get(2, TimeUnit.MINUTES);
sushantmane marked this conversation as resolved.
Show resolved Hide resolved
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("Interrupted while closing: {}", this, e);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.linkedin.venice.pubsub.manager;

import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import java.io.Closeable;
import java.util.ArrayList;
Expand Down Expand Up @@ -47,10 +48,20 @@ public TopicManager getTopicManager(String pubSubAddress) {
return topicManagers.computeIfAbsent(pubSubAddress, this::createTopicManager);
}

public Collection<TopicManager> getAllTopicManagers() {
Collection<TopicManager> getAllTopicManagers() {
return topicManagers.values();
}

/**
 * Invalidates the cache entry for the given PubSub topic on every {@code TopicManager} returned by
 * {@code getAllTopicManagers()}, by calling {@code invalidateCache(pubSubTopic)} on each of them.
 *
 * @param pubSubTopic the PubSub topic whose cached state should be invalidated
 */
public void invalidateTopicManagerCaches(PubSubTopic pubSubTopic) {
  getAllTopicManagers().forEach(manager -> manager.invalidateCache(pubSubTopic));
}

@Override
public void close() {
long startTime = System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,12 @@ <K, T> void updateCacheAsync(
return;
}

// check if the value has already been updated by another thread; if so, release the lock and return
if (cachedValue != null && cachedValue.getExpiryTimeNs() > System.nanoTime()) {
cachedValue.releaseUpdateLock();
return;
}

completableFutureSupplier.get().whenComplete((value, throwable) -> {
if (throwable != null) {
cache.remove(key);
Expand Down Expand Up @@ -713,6 +719,11 @@ boolean tryAcquireUpdateLock() {
return isUpdateInProgress.compareAndSet(false, true);
}

/**
 * Releases the update lock by switching {@code isUpdateInProgress} from {@code true} back to
 * {@code false} via {@code compareAndSet}.
 *
 * @return {@code true} if the flag was {@code true} and has been reset; {@code false} if the flag
 *         was not set, in which case nothing is changed
 */
boolean releaseUpdateLock() {
  final boolean released = isUpdateInProgress.compareAndSet(true, false);
  return released;
}

void updateValue(T value) {
this.value = value;
this.expiryTimeNs = System.nanoTime() + cachedEntryTtlInNs; // update the expiry time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -131,8 +131,8 @@ public void tearDownMethod() {
}
}

@Test(timeOut = 5 * Time.MS_PER_MINUTE, invocationCount = 1)
public void testAsyncApis() throws ExecutionException, InterruptedException, TimeoutException {
@Test(timeOut = 5 * Time.MS_PER_MINUTE)
public void testConcurrentApiExecution() throws ExecutionException, InterruptedException, TimeoutException {
int numPartitions = 3;
int replicationFactor = 1;
boolean isEternalTopic = true;
Expand All @@ -152,122 +152,98 @@ public void testAsyncApis() throws ExecutionException, InterruptedException, Tim
long tsOfLastDataMessage = messages.get(messages.size() - 1).getValue().getProducerMetadata().getMessageTimestamp();

PubSubTopicPartition nonExistentTopicPartition = new PubSubTopicPartitionImpl(nonExistentTopic, 0);
final AtomicInteger successfulRequests = new AtomicInteger(0);

// get partition count for an existing topic
Runnable t1 = getAssertionTask(
() -> assertEquals(topicManager.getPartitionCount(testTopic), numPartitions),
successfulRequests);
Runnable t1 = () -> assertEquals(topicManager.getPartitionCount(testTopic), numPartitions);

// get partition count for a non-existent topic
Runnable t2 = getAssertionTask(
() -> assertThrows(
PubSubTopicDoesNotExistException.class,
() -> topicManager.getPartitionCount(nonExistentTopic)),
successfulRequests);
Runnable t2 = () -> assertThrows(
PubSubTopicDoesNotExistException.class,
() -> topicManager.getPartitionCount(nonExistentTopic));

// contains topic
Runnable t3 = getAssertionTask(() -> assertTrue(topicManager.containsTopic(testTopic)), successfulRequests);
Runnable t3 = () -> assertTrue(topicManager.containsTopic(testTopic));

// contains topic for non-existent topic
Runnable t4 = getAssertionTask(() -> assertFalse(topicManager.containsTopic(nonExistentTopic)), successfulRequests);
Runnable t4 = () -> assertFalse(topicManager.containsTopic(nonExistentTopic));

// contains topic cached
Runnable t5 = getAssertionTask(() -> assertTrue(topicManager.containsTopicCached(testTopic)), successfulRequests);
Runnable t5 = () -> assertTrue(topicManager.containsTopicCached(testTopic));

// contains topic cached for non-existent topic
Runnable t6 =
getAssertionTask(() -> assertFalse(topicManager.containsTopicCached(nonExistentTopic)), successfulRequests);
Runnable t6 = () -> assertFalse(topicManager.containsTopicCached(nonExistentTopic));

// get latest offset with retries for an existing topic
Runnable t7 = getAssertionTask(
() -> assertEquals(topicManager.getLatestOffsetWithRetries(topicPartition, 1), numMessages),
successfulRequests);
Runnable t7 = () -> assertEquals(topicManager.getLatestOffsetWithRetries(topicPartition, 1), numMessages);

// get latest offset with retries for a non-existent topic
Runnable t8 = getAssertionTask(
() -> assertThrows(
PubSubTopicDoesNotExistException.class,
() -> topicManager.getLatestOffsetWithRetries(nonExistentTopicPartition, 1)),
successfulRequests);
Runnable t8 = () -> assertThrows(
PubSubTopicDoesNotExistException.class,
() -> topicManager.getLatestOffsetWithRetries(nonExistentTopicPartition, 1));

// get latest offset cached for an existing topic
Runnable t9 = getAssertionTask(
() -> assertEquals(topicManager.getLatestOffsetCached(testTopic, 0), numMessages),
successfulRequests);
Runnable t9 = () -> assertEquals(topicManager.getLatestOffsetCached(testTopic, 0), numMessages);

// get latest offset cached for a non-existent topic
Runnable t10 = getAssertionTask(
() -> assertEquals(
topicManager.getLatestOffsetCached(nonExistentTopic, 0),
StatsErrorCode.LAG_MEASUREMENT_FAILURE.code),
successfulRequests);
Runnable t10 = () -> assertEquals(
topicManager.getLatestOffsetCached(nonExistentTopic, 0),
StatsErrorCode.LAG_MEASUREMENT_FAILURE.code);

// get producer timestamp of last data message with retries for an existing topic
Runnable t11 = getAssertionTask(
() -> assertEquals(
topicManager.getProducerTimestampOfLastDataMessageWithRetries(topicPartition, 1),
tsOfLastDataMessage),
successfulRequests);
Runnable t11 = () -> assertEquals(
topicManager.getProducerTimestampOfLastDataMessageWithRetries(topicPartition, 1),
tsOfLastDataMessage);

// get producer timestamp of last data message with retries for a non-existent topic
Runnable t12 = getAssertionTask(
() -> assertThrows(
PubSubTopicDoesNotExistException.class,
() -> topicManager.getProducerTimestampOfLastDataMessageWithRetries(nonExistentTopicPartition, 1)),
successfulRequests);
Runnable t12 = () -> assertThrows(
PubSubTopicDoesNotExistException.class,
() -> topicManager.getProducerTimestampOfLastDataMessageWithRetries(nonExistentTopicPartition, 1));

// get producer timestamp of last data message cached for an existing topic
Runnable t13 = getAssertionTask(
() -> assertEquals(
topicManager.getProducerTimestampOfLastDataMessageCached(topicPartition),
tsOfLastDataMessage),
successfulRequests);
Runnable t13 = () -> assertEquals(
topicManager.getProducerTimestampOfLastDataMessageCached(topicPartition),
tsOfLastDataMessage);

// get producer timestamp of last data message cached for a non-existent topic
Runnable t14 = getAssertionTask(
() -> assertEquals(
topicManager.getProducerTimestampOfLastDataMessageCached(nonExistentTopicPartition),
StatsErrorCode.LAG_MEASUREMENT_FAILURE.code),
successfulRequests);
Runnable t14 = () -> assertEquals(
topicManager.getProducerTimestampOfLastDataMessageCached(nonExistentTopicPartition),
StatsErrorCode.LAG_MEASUREMENT_FAILURE.code);

// get offset by time for an existing topic
Runnable t15 = getAssertionTask(
() -> assertEquals(topicManager.getOffsetByTime(topicPartition, System.currentTimeMillis()), numMessages),
successfulRequests);
Runnable t15 =
() -> assertEquals(topicManager.getOffsetByTime(topicPartition, System.currentTimeMillis()), numMessages);

// get offset by time for a non-existent topic
Runnable t16 = getAssertionTask(
() -> assertThrows(
PubSubTopicDoesNotExistException.class,
() -> topicManager.getOffsetByTime(nonExistentTopicPartition, tsOfLastDataMessage)),
successfulRequests);
Runnable t16 = () -> assertThrows(
PubSubTopicDoesNotExistException.class,
() -> topicManager.getOffsetByTime(nonExistentTopicPartition, tsOfLastDataMessage));

// get offset by time for an existing topic: first message
Runnable t17 = getAssertionTask(
() -> assertEquals(topicManager.getOffsetByTime(topicPartition, timeBeforeProduce), 0),
successfulRequests);
Runnable t17 = () -> assertEquals(topicManager.getOffsetByTime(topicPartition, timeBeforeProduce), 0);

List<Runnable> tasks = Arrays.asList(t1, t2, t3, t4, t5, t6, t7, t8, t9, t10, t11, t12, t13, t14, t15, t16, t17);
// invalidate cache for an existing topic
Runnable t18 = () -> topicManager.invalidateCache(testTopic);

// invalidate cache for a non-existent topic
Runnable t19 = () -> topicManager.invalidateCache(nonExistentTopic);

List<Runnable> tasks =
Arrays.asList(t1, t2, t3, t4, t5, t6, t7, t8, t9, t10, t11, t12, t13, t14, t15, t16, t17, t18, t19);

final AtomicInteger successfulRequests = new AtomicInteger(0);
ExecutorService executorService = Executors.newFixedThreadPool(8);
List<Future> vwFutures = new ArrayList<>();
List<CompletableFuture<Void>> futures = new ArrayList<>();
int totalTasks = 100;
for (int i = 0; i < totalTasks; i++) {
Future future = executorService.submit(tasks.get(i % tasks.size()));
vwFutures.add(future);
int finalI = i;
futures.add(
CompletableFuture.runAsync(() -> tasks.get(finalI % tasks.size()).run(), executorService)
.thenAccept(v -> successfulRequests.incrementAndGet()));
}

int failedRequests = 0;
for (Future future: vwFutures) {
try {
future.get(3, TimeUnit.MINUTES);
} catch (Exception e) {
failedRequests++;
}
}
assertEquals(failedRequests, 0);
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(5, TimeUnit.MINUTES);
assertEquals(successfulRequests.get(), totalTasks);
}

/**
 * Wraps {@code runnable} in a task that increments {@code successfulRequests} when the wrapped
 * runnable returns normally.
 *
 * <p>If the wrapped runnable throws an {@link AssertionError}, the stack trace is printed and a new
 * {@link AssertionError} (with the original as its cause) is thrown. If it throws any other
 * {@link Exception}, the stack trace is printed and the exception is rethrown wrapped in a
 * {@link RuntimeException}. In both cases the counter is left untouched.
 *
 * @param runnable the assertion logic to execute
 * @param successfulRequests counter incremented once per successful execution
 * @return a runnable that executes {@code runnable} and records its success
 */
private static Runnable getAssertionTask(Runnable runnable, AtomicInteger successfulRequests) {
  return () -> {
    try {
      runnable.run();
    } catch (AssertionError assertionFailure) {
      assertionFailure.printStackTrace();
      throw new AssertionError("Assertion failed: " + assertionFailure.getMessage(), assertionFailure);
    } catch (Exception unexpected) {
      unexpected.printStackTrace();
      throw new RuntimeException(unexpected);
    }
    // Reached only when the wrapped runnable returned normally: both catch blocks rethrow.
    successfulRequests.incrementAndGet();
  };
}

@Test(timeOut = 3 * Time.MS_PER_MINUTE)
public void testMetadataApisForNonExistentTopics() throws ExecutionException, InterruptedException, TimeoutException {
PubSubTopic nonExistentTopic = pubSubTopicRepository.getTopic(Utils.getUniqueString("nonExistentTopic"));
Expand Down
Loading