Skip to content

Commit

Permalink
Refactor PartitionOffsetFetcher and TopicManager
Browse files Browse the repository at this point in the history
Rename caller protected methods to use fetch prefix
Fix test
  • Loading branch information
sushantmane committed Oct 30, 2023
1 parent 9163931 commit 73ed929
Show file tree
Hide file tree
Showing 6 changed files with 247 additions and 253 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ <T> T fetchMetadata(
Map<PubSubMetadataCacheKey, ValueAndExpiryTime<T>> metadataCache,
Supplier<T> valueSupplier) {
final long now = System.nanoTime();
final ValueAndExpiryTime<T> cachedValue =
ValueAndExpiryTime<T> cachedValue =
metadataCache.computeIfAbsent(key, k -> new ValueAndExpiryTime<>(valueSupplier.get(), now + ttlNs));

// For a given key in the given cache, we will only issue one async request at the same time.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import com.linkedin.venice.utils.RetryUtils;
import com.linkedin.venice.utils.Time;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.lazy.Lazy;
import io.tehuti.metrics.MetricsRepository;
import it.unimi.dsi.fastutil.ints.Int2LongMap;
import java.io.Closeable;
Expand Down Expand Up @@ -78,9 +77,8 @@ public class TopicManager implements Closeable {
private final long kafkaOperationTimeoutMs;
private final long topicMinLogCompactionLagMs;
private final PubSubAdminAdapterFactory<PubSubAdminAdapter> pubSubAdminAdapterFactory;
// TODO: Use single PubSubAdminAdapter for both read and write operations
private final Lazy<PubSubAdminAdapter> pubSubWriteOnlyAdminAdapter;
private final Lazy<PubSubAdminAdapter> pubSubReadOnlyAdminAdapter;
private final PubSubAdminAdapter writeOnlyPubSubAdminAdapter;
private final PubSubAdminAdapter readOnlyPubSubAdminAdapter;
private final PartitionOffsetFetcher partitionOffsetFetcher;

// It's expensive to grab the topic config over and over again, and it changes infrequently. So we temporarily cache
Expand All @@ -90,7 +88,7 @@ public class TopicManager implements Closeable {

public TopicManager(TopicManagerRepository.Builder builder, String pubSubBootstrapServers) {
String pubSubServersForLogger = Utils.getSanitizedStringForLogger(pubSubBootstrapServers);
this.logger = LogManager.getLogger(this.getClass().getSimpleName() + " [" + pubSubServersForLogger + "]");
this.logger = LogManager.getLogger(this.getClass().getSimpleName() + "[" + pubSubServersForLogger + "]");
this.kafkaOperationTimeoutMs = builder.getKafkaOperationTimeoutMs();
this.topicMinLogCompactionLagMs = builder.getTopicMinLogCompactionLagMs();
this.pubSubAdminAdapterFactory = builder.getPubSubAdminAdapterFactory();
Expand All @@ -101,41 +99,31 @@ public TopicManager(TopicManagerRepository.Builder builder, String pubSubBootstr

Optional<MetricsRepository> optionalMetricsRepository = Optional.ofNullable(builder.getMetricsRepository());

this.pubSubReadOnlyAdminAdapter = Lazy.of(() -> {
PubSubAdminAdapter pubSubReadOnlyAdmin =
pubSubAdminAdapterFactory.create(pubSubProperties.get(pubSubBootstrapServers), pubSubTopicRepository);
pubSubReadOnlyAdmin = createInstrumentedPubSubAdmin(
optionalMetricsRepository,
"ReadOnlyKafkaAdminStats",
pubSubReadOnlyAdmin,
pubSubBootstrapServers);
logger.info(
"{} is using read-only pubsub admin client of class: {}",
this.getClass().getSimpleName(),
pubSubReadOnlyAdmin.getClassName());
return pubSubReadOnlyAdmin;
});

this.pubSubWriteOnlyAdminAdapter = Lazy.of(() -> {
PubSubAdminAdapter pubSubWriteOnlyAdmin =
pubSubAdminAdapterFactory.create(pubSubProperties.get(pubSubBootstrapServers), pubSubTopicRepository);
pubSubWriteOnlyAdmin = createInstrumentedPubSubAdmin(
optionalMetricsRepository,
"WriteOnlyKafkaAdminStats",
pubSubWriteOnlyAdmin,
pubSubBootstrapServers);
logger.info(
"{} is using write-only pubsub admin client of class: {}",
this.getClass().getSimpleName(),
pubSubWriteOnlyAdmin.getClassName());
return pubSubWriteOnlyAdmin;
});
this.readOnlyPubSubAdminAdapter = createInstrumentedPubSubAdmin(
optionalMetricsRepository,
"ReadOnlyPubSubAdminStats",
pubSubAdminAdapterFactory.create(pubSubProperties.get(pubSubBootstrapServers), pubSubTopicRepository),
pubSubBootstrapServers);
logger.info(
"{} is using read-only pubsub admin client of class: {}",
this.getClass().getSimpleName(),
readOnlyPubSubAdminAdapter.getClassName());

this.writeOnlyPubSubAdminAdapter = createInstrumentedPubSubAdmin(
optionalMetricsRepository,
"WriteOnlyPubSubAdminStats",
pubSubAdminAdapterFactory.create(pubSubProperties.get(pubSubBootstrapServers), pubSubTopicRepository),
pubSubBootstrapServers);
logger.info(
"{} is using write-only pubsub admin client of class: {}",
this.getClass().getSimpleName(),
writeOnlyPubSubAdminAdapter.getClassName());

this.partitionOffsetFetcher = PartitionOffsetFetcherFactory.createDefaultPartitionOffsetFetcher(
builder.getPubSubConsumerAdapterFactory(),
pubSubProperties.get(pubSubBootstrapServers),
pubSubBootstrapServers,
pubSubReadOnlyAdminAdapter,
readOnlyPubSubAdminAdapter,
kafkaOperationTimeoutMs,
optionalMetricsRepository);
}
Expand Down Expand Up @@ -261,7 +249,7 @@ public void createTopic(

try {
RetryUtils.executeWithMaxAttemptAndExponentialBackoff(
() -> pubSubWriteOnlyAdminAdapter.get()
() -> writeOnlyPubSubAdminAdapter
.createTopic(topicName, numPartitions, replication, pubSubTopicConfiguration),
10,
Duration.ofMillis(200),
Expand Down Expand Up @@ -326,7 +314,7 @@ public boolean updateTopicRetention(
Optional<Long> retentionTimeMs = pubSubTopicConfiguration.retentionInMs();
if (!retentionTimeMs.isPresent() || expectedRetentionInMs != retentionTimeMs.get()) {
pubSubTopicConfiguration.setRetentionInMs(Optional.of(expectedRetentionInMs));
pubSubWriteOnlyAdminAdapter.get().setTopicConfig(topicName, pubSubTopicConfiguration);
writeOnlyPubSubAdminAdapter.setTopicConfig(topicName, pubSubTopicConfiguration);
logger.info(
"Updated topic: {} with retention.ms: {} in cluster [{}]",
topicName,
Expand Down Expand Up @@ -354,7 +342,7 @@ public synchronized void updateTopicCompactionPolicy(
PubSubTopic topic,
boolean expectedLogCompacted,
long minLogCompactionLagMs) throws PubSubTopicDoesNotExistException {
long expectedMinLogCompactionLagMs = 0l;
long expectedMinLogCompactionLagMs = 0L;
if (expectedLogCompacted) {
if (minLogCompactionLagMs > 0) {
expectedMinLogCompactionLagMs = minLogCompactionLagMs;
Expand All @@ -370,7 +358,7 @@ public synchronized void updateTopicCompactionPolicy(
|| expectedLogCompacted && expectedMinLogCompactionLagMs != currentMinLogCompactionLagMs) {
pubSubTopicConfiguration.setLogCompacted(expectedLogCompacted);
pubSubTopicConfiguration.setMinLogCompactionLagMs(expectedMinLogCompactionLagMs);
pubSubWriteOnlyAdminAdapter.get().setTopicConfig(topic, pubSubTopicConfiguration);
writeOnlyPubSubAdminAdapter.setTopicConfig(topic, pubSubTopicConfiguration);
logger.info(
"Kafka compaction policy for topic: {} has been updated from {} to {}, min compaction lag updated from {} to {}",
topic,
Expand Down Expand Up @@ -398,7 +386,7 @@ public boolean updateTopicMinInSyncReplica(PubSubTopic topicName, int minISR)
// config doesn't exist config is different
if (!currentMinISR.isPresent() || !currentMinISR.get().equals(minISR)) {
pubSubTopicConfiguration.setMinInSyncReplicas(Optional.of(minISR));
pubSubWriteOnlyAdminAdapter.get().setTopicConfig(topicName, pubSubTopicConfiguration);
writeOnlyPubSubAdminAdapter.setTopicConfig(topicName, pubSubTopicConfiguration);
logger.info("Updated topic: {} with min.insync.replicas: {}", topicName, minISR);
return true;
}
Expand All @@ -407,7 +395,7 @@ public boolean updateTopicMinInSyncReplica(PubSubTopic topicName, int minISR)
}

public Map<PubSubTopic, Long> getAllTopicRetentions() {
return pubSubReadOnlyAdminAdapter.get().getAllTopicRetentions();
return readOnlyPubSubAdminAdapter.getAllTopicRetentions();
}

/**
Expand Down Expand Up @@ -448,15 +436,13 @@ public boolean isRetentionBelowTruncatedThreshold(long retention, long truncated
* This operation is a little heavy, since it will pull the configs for all the topics.
*/
public PubSubTopicConfiguration getTopicConfig(PubSubTopic topicName) throws PubSubTopicDoesNotExistException {
final PubSubTopicConfiguration pubSubTopicConfiguration =
pubSubReadOnlyAdminAdapter.get().getTopicConfig(topicName);
PubSubTopicConfiguration pubSubTopicConfiguration = readOnlyPubSubAdminAdapter.getTopicConfig(topicName);
topicConfigCache.put(topicName, pubSubTopicConfiguration);
return pubSubTopicConfiguration;
}

public PubSubTopicConfiguration getTopicConfigWithRetry(PubSubTopic topicName) {
final PubSubTopicConfiguration pubSubTopicConfiguration =
pubSubReadOnlyAdminAdapter.get().getTopicConfigWithRetry(topicName);
PubSubTopicConfiguration pubSubTopicConfiguration = readOnlyPubSubAdminAdapter.getTopicConfigWithRetry(topicName);
topicConfigCache.put(topicName, pubSubTopicConfiguration);
return pubSubTopicConfiguration;
}
Expand All @@ -474,8 +460,8 @@ public PubSubTopicConfiguration getCachedTopicConfig(PubSubTopic topicName) {
}

public Map<PubSubTopic, PubSubTopicConfiguration> getSomeTopicConfigs(Set<PubSubTopic> topicNames) {
final Map<PubSubTopic, PubSubTopicConfiguration> topicConfigs =
pubSubReadOnlyAdminAdapter.get().getSomeTopicConfigs(topicNames);
Map<PubSubTopic, PubSubTopicConfiguration> topicConfigs =
readOnlyPubSubAdminAdapter.getSomeTopicConfigs(topicNames);
for (Map.Entry<PubSubTopic, PubSubTopicConfiguration> topicConfig: topicConfigs.entrySet()) {
topicConfigCache.put(topicConfig.getKey(), topicConfig.getValue());
}
Expand All @@ -494,7 +480,7 @@ public void ensureTopicIsDeletedAndBlock(PubSubTopic pubSubTopic) {

logger.info("Deleting topic: {}", pubSubTopic);
try {
pubSubWriteOnlyAdminAdapter.get().deleteTopic(pubSubTopic, Duration.ofMillis(kafkaOperationTimeoutMs));
writeOnlyPubSubAdminAdapter.deleteTopic(pubSubTopic, Duration.ofMillis(kafkaOperationTimeoutMs));
logger.info("Topic: {} has been deleted", pubSubTopic);
} catch (PubSubOpTimeoutException e) {
logger.warn("Failed to delete topic: {} after {} ms", pubSubTopic, kafkaOperationTimeoutMs);
Expand All @@ -506,7 +492,7 @@ public void ensureTopicIsDeletedAndBlock(PubSubTopic pubSubTopic) {
}

// let's make sure the topic is deleted
if (pubSubWriteOnlyAdminAdapter.get().containsTopic(pubSubTopic)) {
if (writeOnlyPubSubAdminAdapter.containsTopic(pubSubTopic)) {
throw new PubSubTopicExistsException("Topic: " + pubSubTopic.getName() + " still exists after deletion");
}
}
Expand Down Expand Up @@ -535,14 +521,14 @@ public void ensureTopicIsDeletedAndBlockWithRetry(PubSubTopic pubSubTopic) {
}

public synchronized Set<PubSubTopic> listTopics() {
return pubSubReadOnlyAdminAdapter.get().listAllTopics();
return readOnlyPubSubAdminAdapter.listAllTopics();
}

/**
* A quick check to see whether the topic exists.
*/
public boolean containsTopic(PubSubTopic topic) {
return pubSubReadOnlyAdminAdapter.get().containsTopic(topic);
return readOnlyPubSubAdminAdapter.containsTopic(topic);
}

/**
Expand All @@ -553,7 +539,7 @@ public boolean containsTopicWithExpectationAndRetry(
PubSubTopic topic,
int maxAttempts,
final boolean expectedResult) {
return pubSubReadOnlyAdminAdapter.get().containsTopicWithExpectationAndRetry(topic, maxAttempts, expectedResult);
return readOnlyPubSubAdminAdapter.containsTopicWithExpectationAndRetry(topic, maxAttempts, expectedResult);
}

public boolean containsTopicWithExpectationAndRetry(
Expand All @@ -563,14 +549,13 @@ public boolean containsTopicWithExpectationAndRetry(
Duration initialBackoff,
Duration maxBackoff,
Duration maxDuration) {
return pubSubReadOnlyAdminAdapter.get()
.containsTopicWithExpectationAndRetry(
topic,
maxAttempts,
expectedResult,
initialBackoff,
maxBackoff,
maxDuration);
return readOnlyPubSubAdminAdapter.containsTopicWithExpectationAndRetry(
topic,
maxAttempts,
expectedResult,
initialBackoff,
maxBackoff,
maxDuration);
}

/**
Expand Down Expand Up @@ -666,9 +651,19 @@ public String getPubSubBootstrapServers() {

@Override
public synchronized void close() {
long startTime = System.currentTimeMillis();
logger.info("Closing topic manager for pubsub cluster with bootstrap servers: {}", this.pubSubBootstrapServers);
Utils.closeQuietlyWithErrorLogged(partitionOffsetFetcher);
pubSubReadOnlyAdminAdapter.ifPresent(Utils::closeQuietlyWithErrorLogged);
pubSubWriteOnlyAdminAdapter.ifPresent(Utils::closeQuietlyWithErrorLogged);
if (readOnlyPubSubAdminAdapter != null) {
Utils.closeQuietlyWithErrorLogged(readOnlyPubSubAdminAdapter);
}
if (writeOnlyPubSubAdminAdapter != null) {
Utils.closeQuietlyWithErrorLogged(writeOnlyPubSubAdminAdapter);
}
logger.info(
"Closed topic manager for pubsub cluster with bootstrap servers: {} in {}ms",
this.pubSubBootstrapServers,
System.currentTimeMillis() - startTime);
}

// For testing only
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.pubsub.PubSubConsumerAdapterFactory;
import com.linkedin.venice.pubsub.api.PubSubAdminAdapter;
import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter;
import com.linkedin.venice.pubsub.api.PubSubMessageDeserializer;
import com.linkedin.venice.serialization.avro.KafkaValueSerializer;
import com.linkedin.venice.utils.SystemTime;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.lazy.Lazy;
import com.linkedin.venice.utils.pools.LandFillObjectPool;
import io.tehuti.metrics.MetricsRepository;
import java.util.Optional;
Expand All @@ -18,18 +18,18 @@ public static PartitionOffsetFetcher createDefaultPartitionOffsetFetcher(
PubSubConsumerAdapterFactory pubSubConsumerAdapterFactory,
VeniceProperties veniceProperties,
String pubSubBootstrapServers,
Lazy<PubSubAdminAdapter> kafkaAdminWrapper,
PubSubAdminAdapter pubSubAdminAdapter,
long kafkaOperationTimeoutMs,
Optional<MetricsRepository> optionalMetricsRepository) {
PubSubMessageDeserializer pubSubMessageDeserializer = new PubSubMessageDeserializer(
new KafkaValueSerializer(),
new LandFillObjectPool<>(KafkaMessageEnvelope::new),
new LandFillObjectPool<>(KafkaMessageEnvelope::new));
PubSubConsumerAdapter pubSubConsumerAdapter =
pubSubConsumerAdapterFactory.create(veniceProperties, false, pubSubMessageDeserializer, pubSubBootstrapServers);
PartitionOffsetFetcher partitionOffsetFetcher = new PartitionOffsetFetcherImpl(
kafkaAdminWrapper,
Lazy.of(
() -> pubSubConsumerAdapterFactory
.create(veniceProperties, false, pubSubMessageDeserializer, pubSubBootstrapServers)),
pubSubAdminAdapter,
pubSubConsumerAdapter,
kafkaOperationTimeoutMs,
pubSubBootstrapServers);
if (optionalMetricsRepository.isPresent()) {
Expand Down
Loading

0 comments on commit 73ed929

Please sign in to comment.