Skip to content

Commit

Permalink
Address review comment
Browse files Browse the repository at this point in the history
  • Loading branch information
sushantmane committed Feb 12, 2024
1 parent 6048dc9 commit 9c02126
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1523,10 +1523,7 @@ private void internalClose(boolean doFlush) {
}

if (topicManagerRepository != null) {
Collection<TopicManager> topicManagers = topicManagerRepository.getAllTopicManagers();
for (TopicManager topicManager: topicManagers) {
topicManager.invalidateCache(versionTopic);
}
topicManagerRepository.invalidateTopicManagerCaches(versionTopic);
}

close();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.linkedin.venice.pubsub.manager;

import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import java.io.Closeable;
import java.util.ArrayList;
Expand Down Expand Up @@ -47,10 +48,20 @@ public TopicManager getTopicManager(String pubSubAddress) {
return topicManagers.computeIfAbsent(pubSubAddress, this::createTopicManager);
}

public Collection<TopicManager> getAllTopicManagers() {
Collection<TopicManager> getAllTopicManagers() {
return topicManagers.values();
}

/**
* Invalidates the cache for the given PubSub topic across all TopicManagers in the repository.
* @param pubSubTopic the PubSub topic to invalidate
*/
public void invalidateTopicManagerCaches(PubSubTopic pubSubTopic) {
for (TopicManager topicManager: getAllTopicManagers()) {
topicManager.invalidateCache(pubSubTopic);
}
}

@Override
public void close() {
long startTime = System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,12 @@ <K, T> void updateCacheAsync(
return;
}

// check if the value has been already updated by another thread; if so, release the lock and return
if (cachedValue != null && cachedValue.getExpiryTimeNs() > System.nanoTime()) {
cachedValue.releaseUpdateLock();
return;
}

completableFutureSupplier.get().whenComplete((value, throwable) -> {
if (throwable != null) {
cache.remove(key);
Expand Down Expand Up @@ -713,6 +719,11 @@ boolean tryAcquireUpdateLock() {
return isUpdateInProgress.compareAndSet(false, true);
}

// release the lock
boolean releaseUpdateLock() {
return isUpdateInProgress.compareAndSet(true, false);
}

void updateValue(T value) {
this.value = value;
this.expiryTimeNs = System.nanoTime() + cachedEntryTtlInNs; // update the expiry time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -131,8 +131,8 @@ public void tearDownMethod() {
}
}

@Test(timeOut = 5 * Time.MS_PER_MINUTE, invocationCount = 1)
public void testAsyncApis() throws ExecutionException, InterruptedException, TimeoutException {
@Test(timeOut = 5 * Time.MS_PER_MINUTE)
public void testConcurrentApiExecution() throws ExecutionException, InterruptedException, TimeoutException {
int numPartitions = 3;
int replicationFactor = 1;
boolean isEternalTopic = true;
Expand All @@ -152,122 +152,98 @@ public void testAsyncApis() throws ExecutionException, InterruptedException, Tim
long tsOfLastDataMessage = messages.get(messages.size() - 1).getValue().getProducerMetadata().getMessageTimestamp();

PubSubTopicPartition nonExistentTopicPartition = new PubSubTopicPartitionImpl(nonExistentTopic, 0);
final AtomicInteger successfulRequests = new AtomicInteger(0);

// get partition count for an existing topic
Runnable t1 = getAssertionTask(
() -> assertEquals(topicManager.getPartitionCount(testTopic), numPartitions),
successfulRequests);
Runnable t1 = () -> assertEquals(topicManager.getPartitionCount(testTopic), numPartitions);

// get partition count for a non-existent topic
Runnable t2 = getAssertionTask(
() -> assertThrows(
PubSubTopicDoesNotExistException.class,
() -> topicManager.getPartitionCount(nonExistentTopic)),
successfulRequests);
Runnable t2 = () -> assertThrows(
PubSubTopicDoesNotExistException.class,
() -> topicManager.getPartitionCount(nonExistentTopic));

// contains topic
Runnable t3 = getAssertionTask(() -> assertTrue(topicManager.containsTopic(testTopic)), successfulRequests);
Runnable t3 = () -> assertTrue(topicManager.containsTopic(testTopic));

// contains topic for non-existent topic
Runnable t4 = getAssertionTask(() -> assertFalse(topicManager.containsTopic(nonExistentTopic)), successfulRequests);
Runnable t4 = () -> assertFalse(topicManager.containsTopic(nonExistentTopic));

// contains topic cached
Runnable t5 = getAssertionTask(() -> assertTrue(topicManager.containsTopicCached(testTopic)), successfulRequests);
Runnable t5 = () -> assertTrue(topicManager.containsTopicCached(testTopic));

// contains topic cached for non-existent topic
Runnable t6 =
getAssertionTask(() -> assertFalse(topicManager.containsTopicCached(nonExistentTopic)), successfulRequests);
Runnable t6 = () -> assertFalse(topicManager.containsTopicCached(nonExistentTopic));

// get latest offset with retries for an existing topic
Runnable t7 = getAssertionTask(
() -> assertEquals(topicManager.getLatestOffsetWithRetries(topicPartition, 1), numMessages),
successfulRequests);
Runnable t7 = () -> assertEquals(topicManager.getLatestOffsetWithRetries(topicPartition, 1), numMessages);

// get latest offset with retries for a non-existent topic
Runnable t8 = getAssertionTask(
() -> assertThrows(
PubSubTopicDoesNotExistException.class,
() -> topicManager.getLatestOffsetWithRetries(nonExistentTopicPartition, 1)),
successfulRequests);
Runnable t8 = () -> assertThrows(
PubSubTopicDoesNotExistException.class,
() -> topicManager.getLatestOffsetWithRetries(nonExistentTopicPartition, 1));

// get latest offset cached for an existing topic
Runnable t9 = getAssertionTask(
() -> assertEquals(topicManager.getLatestOffsetCached(testTopic, 0), numMessages),
successfulRequests);
Runnable t9 = () -> assertEquals(topicManager.getLatestOffsetCached(testTopic, 0), numMessages);

// get latest offset cached for a non-existent topic
Runnable t10 = getAssertionTask(
() -> assertEquals(
topicManager.getLatestOffsetCached(nonExistentTopic, 0),
StatsErrorCode.LAG_MEASUREMENT_FAILURE.code),
successfulRequests);
Runnable t10 = () -> assertEquals(
topicManager.getLatestOffsetCached(nonExistentTopic, 0),
StatsErrorCode.LAG_MEASUREMENT_FAILURE.code);

// get producer timestamp of last data message with retries for an existing topic
Runnable t11 = getAssertionTask(
() -> assertEquals(
topicManager.getProducerTimestampOfLastDataMessageWithRetries(topicPartition, 1),
tsOfLastDataMessage),
successfulRequests);
Runnable t11 = () -> assertEquals(
topicManager.getProducerTimestampOfLastDataMessageWithRetries(topicPartition, 1),
tsOfLastDataMessage);

// get producer timestamp of last data message with retries for a non-existent topic
Runnable t12 = getAssertionTask(
() -> assertThrows(
PubSubTopicDoesNotExistException.class,
() -> topicManager.getProducerTimestampOfLastDataMessageWithRetries(nonExistentTopicPartition, 1)),
successfulRequests);
Runnable t12 = () -> assertThrows(
PubSubTopicDoesNotExistException.class,
() -> topicManager.getProducerTimestampOfLastDataMessageWithRetries(nonExistentTopicPartition, 1));

// get producer timestamp of last data message cached for an existing topic
Runnable t13 = getAssertionTask(
() -> assertEquals(
topicManager.getProducerTimestampOfLastDataMessageCached(topicPartition),
tsOfLastDataMessage),
successfulRequests);
Runnable t13 = () -> assertEquals(
topicManager.getProducerTimestampOfLastDataMessageCached(topicPartition),
tsOfLastDataMessage);

// get producer timestamp of last data message cached for a non-existent topic
Runnable t14 = getAssertionTask(
() -> assertEquals(
topicManager.getProducerTimestampOfLastDataMessageCached(nonExistentTopicPartition),
StatsErrorCode.LAG_MEASUREMENT_FAILURE.code),
successfulRequests);
Runnable t14 = () -> assertEquals(
topicManager.getProducerTimestampOfLastDataMessageCached(nonExistentTopicPartition),
StatsErrorCode.LAG_MEASUREMENT_FAILURE.code);

// get offset by time for an existing topic
Runnable t15 = getAssertionTask(
() -> assertEquals(topicManager.getOffsetByTime(topicPartition, System.currentTimeMillis()), numMessages),
successfulRequests);
Runnable t15 =
() -> assertEquals(topicManager.getOffsetByTime(topicPartition, System.currentTimeMillis()), numMessages);

// get offset by time for a non-existent topic
Runnable t16 = getAssertionTask(
() -> assertThrows(
PubSubTopicDoesNotExistException.class,
() -> topicManager.getOffsetByTime(nonExistentTopicPartition, tsOfLastDataMessage)),
successfulRequests);
Runnable t16 = () -> assertThrows(
PubSubTopicDoesNotExistException.class,
() -> topicManager.getOffsetByTime(nonExistentTopicPartition, tsOfLastDataMessage));

// get offset by time for an existing topic: first message
Runnable t17 = getAssertionTask(
() -> assertEquals(topicManager.getOffsetByTime(topicPartition, timeBeforeProduce), 0),
successfulRequests);
Runnable t17 = () -> assertEquals(topicManager.getOffsetByTime(topicPartition, timeBeforeProduce), 0);

List<Runnable> tasks = Arrays.asList(t1, t2, t3, t4, t5, t6, t7, t8, t9, t10, t11, t12, t13, t14, t15, t16, t17);
// invalidate cache for an existing topic
Runnable t18 = () -> topicManager.invalidateCache(testTopic);

// invalidate cache for a non-existent topic
Runnable t19 = () -> topicManager.invalidateCache(nonExistentTopic);

List<Runnable> tasks =
Arrays.asList(t1, t2, t3, t4, t5, t6, t7, t8, t9, t10, t11, t12, t13, t14, t15, t16, t17, t18, t19);

final AtomicInteger successfulRequests = new AtomicInteger(0);
ExecutorService executorService = Executors.newFixedThreadPool(8);
List<Future> vwFutures = new ArrayList<>();
List<CompletableFuture<Void>> futures = new ArrayList<>();
int totalTasks = 100;
for (int i = 0; i < totalTasks; i++) {
Future future = executorService.submit(tasks.get(i % tasks.size()));
vwFutures.add(future);
int finalI = i;
futures.add(
CompletableFuture.runAsync(() -> tasks.get(finalI % tasks.size()).run(), executorService)
.thenAccept(v -> successfulRequests.incrementAndGet()));
}

int failedRequests = 0;
for (Future future: vwFutures) {
try {
future.get(3, TimeUnit.MINUTES);
} catch (Exception e) {
failedRequests++;
}
}
assertEquals(failedRequests, 0);
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(5, TimeUnit.MINUTES);
assertEquals(successfulRequests.get(), totalTasks);
}

private static Runnable getAssertionTask(Runnable runnable, AtomicInteger successfulRequests) {
return () -> {
try {
runnable.run();
successfulRequests.incrementAndGet();
} catch (AssertionError e) {
e.printStackTrace();
throw new AssertionError("Assertion failed: " + e.getMessage(), e);
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}
};
}

@Test(timeOut = 3 * Time.MS_PER_MINUTE)
public void testMetadataApisForNonExistentTopics() throws ExecutionException, InterruptedException, TimeoutException {
PubSubTopic nonExistentTopic = pubSubTopicRepository.getTopic(Utils.getUniqueString("nonExistentTopic"));
Expand Down

0 comments on commit 9c02126

Please sign in to comment.