Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
arjun4084346 committed Nov 14, 2024
1 parent 20785a8 commit d195293
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.linkedin.venice.controllerapi.UpdateClusterConfigQueryParams;
import com.linkedin.venice.controllerapi.UpdateStoragePersonaQueryParams;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.exceptions.VeniceNoStoreException;
import com.linkedin.venice.helix.HelixReadOnlyStoreConfigRepository;
import com.linkedin.venice.helix.HelixReadOnlyZKSharedSchemaRepository;
import com.linkedin.venice.helix.HelixReadOnlyZKSharedSystemStoreRepository;
Expand Down Expand Up @@ -292,6 +293,9 @@ Version incrementVersionIdempotent(

default String getRealTimeTopic(String clusterName, String storeName) {
Store store = getStore(clusterName, storeName);
if (store == null) {
throw new VeniceNoStoreException(storeName, clusterName);
}
return getRealTimeTopic(clusterName, store);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,19 +282,18 @@ private void populateDeprecatedTopicQueue(PriorityQueue<PubSubTopic> topics) {
Map<String, Map<PubSubTopic, Long>> allStoreTopics = getAllVeniceStoreTopicsRetentions(topicsWithRetention);
allStoreTopics.forEach((storeName, topicRetentions) -> {
int minNumOfUnusedVersionTopicsOverride = minNumberOfUnusedKafkaTopicsToPreserve;
Store store;
try {
store = admin.getStore(admin.discoverCluster(storeName).getFirst(), storeName);
} catch (VeniceNoStoreException e) {
return;
}
PubSubTopic realTimeTopic = pubSubTopicRepository.getTopic(Version.getRealTimeTopicName(store));
if (topicRetentions.containsKey(realTimeTopic)) {
if (admin.isTopicTruncatedBasedOnRetention(topicRetentions.get(realTimeTopic))) {
topics.offer(realTimeTopic);
minNumOfUnusedVersionTopicsOverride = 0;
List<PubSubTopic> realTimeTopics = topicRetentions.keySet()
.stream()
.filter(topic -> Version.isRealTimeTopic(topic.getName()))
.collect(Collectors.toList());
for (PubSubTopic realTimeTopic: realTimeTopics) {
if (topicRetentions.containsKey(realTimeTopic)) {
if (admin.isTopicTruncatedBasedOnRetention(topicRetentions.get(realTimeTopic))) {
topics.offer(realTimeTopic);
minNumOfUnusedVersionTopicsOverride = 0;
}
topicRetentions.remove(realTimeTopic);
}
topicRetentions.remove(realTimeTopic);
}
List<PubSubTopic> oldTopicsToDelete =
extractVersionTopicsToCleanup(admin, topicRetentions, minNumOfUnusedVersionTopicsOverride, delayFactor);
Expand Down

0 comments on commit d195293

Please sign in to comment.