Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DRAFT] Reduce consumer lock contention in PartitionOffsetFetcherImpl #682

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ <T> T fetchMetadata(
Map<PubSubMetadataCacheKey, ValueAndExpiryTime<T>> metadataCache,
Supplier<T> valueSupplier) {
final long now = System.nanoTime();
final ValueAndExpiryTime<T> cachedValue =
ValueAndExpiryTime<T> cachedValue =
metadataCache.computeIfAbsent(key, k -> new ValueAndExpiryTime<>(valueSupplier.get(), now + ttlNs));

// For a given key in the given cache, we will only issue one async request at the same time.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import com.linkedin.venice.utils.RetryUtils;
import com.linkedin.venice.utils.Time;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.lazy.Lazy;
import io.tehuti.metrics.MetricsRepository;
import it.unimi.dsi.fastutil.ints.Int2LongMap;
import java.io.Closeable;
Expand Down Expand Up @@ -78,9 +77,8 @@ public class TopicManager implements Closeable {
private final long kafkaOperationTimeoutMs;
private final long topicMinLogCompactionLagMs;
private final PubSubAdminAdapterFactory<PubSubAdminAdapter> pubSubAdminAdapterFactory;
// TODO: Use single PubSubAdminAdapter for both read and write operations
private final Lazy<PubSubAdminAdapter> pubSubWriteOnlyAdminAdapter;
private final Lazy<PubSubAdminAdapter> pubSubReadOnlyAdminAdapter;
private final PubSubAdminAdapter writeOnlyPubSubAdminAdapter;
private final PubSubAdminAdapter readOnlyPubSubAdminAdapter;
private final PartitionOffsetFetcher partitionOffsetFetcher;

// It's expensive to grab the topic config over and over again, and it changes infrequently. So we temporarily cache
Expand All @@ -90,7 +88,7 @@ public class TopicManager implements Closeable {

public TopicManager(TopicManagerRepository.Builder builder, String pubSubBootstrapServers) {
String pubSubServersForLogger = Utils.getSanitizedStringForLogger(pubSubBootstrapServers);
this.logger = LogManager.getLogger(this.getClass().getSimpleName() + " [" + pubSubServersForLogger + "]");
this.logger = LogManager.getLogger(this.getClass().getSimpleName() + "[" + pubSubServersForLogger + "]");
this.kafkaOperationTimeoutMs = builder.getKafkaOperationTimeoutMs();
this.topicMinLogCompactionLagMs = builder.getTopicMinLogCompactionLagMs();
this.pubSubAdminAdapterFactory = builder.getPubSubAdminAdapterFactory();
Expand All @@ -101,41 +99,31 @@ public TopicManager(TopicManagerRepository.Builder builder, String pubSubBootstr

Optional<MetricsRepository> optionalMetricsRepository = Optional.ofNullable(builder.getMetricsRepository());

this.pubSubReadOnlyAdminAdapter = Lazy.of(() -> {
PubSubAdminAdapter pubSubReadOnlyAdmin =
pubSubAdminAdapterFactory.create(pubSubProperties.get(pubSubBootstrapServers), pubSubTopicRepository);
pubSubReadOnlyAdmin = createInstrumentedPubSubAdmin(
optionalMetricsRepository,
"ReadOnlyKafkaAdminStats",
pubSubReadOnlyAdmin,
pubSubBootstrapServers);
logger.info(
"{} is using read-only pubsub admin client of class: {}",
this.getClass().getSimpleName(),
pubSubReadOnlyAdmin.getClassName());
return pubSubReadOnlyAdmin;
});

this.pubSubWriteOnlyAdminAdapter = Lazy.of(() -> {
PubSubAdminAdapter pubSubWriteOnlyAdmin =
pubSubAdminAdapterFactory.create(pubSubProperties.get(pubSubBootstrapServers), pubSubTopicRepository);
pubSubWriteOnlyAdmin = createInstrumentedPubSubAdmin(
optionalMetricsRepository,
"WriteOnlyKafkaAdminStats",
pubSubWriteOnlyAdmin,
pubSubBootstrapServers);
logger.info(
"{} is using write-only pubsub admin client of class: {}",
this.getClass().getSimpleName(),
pubSubWriteOnlyAdmin.getClassName());
return pubSubWriteOnlyAdmin;
});
this.readOnlyPubSubAdminAdapter = createInstrumentedPubSubAdmin(
optionalMetricsRepository,
"ReadOnlyPubSubAdminStats",
pubSubAdminAdapterFactory.create(pubSubProperties.get(pubSubBootstrapServers), pubSubTopicRepository),
pubSubBootstrapServers);
logger.info(
"{} is using read-only pubsub admin client of class: {}",
this.getClass().getSimpleName(),
readOnlyPubSubAdminAdapter.getClassName());

this.writeOnlyPubSubAdminAdapter = createInstrumentedPubSubAdmin(
optionalMetricsRepository,
"WriteOnlyPubSubAdminStats",
pubSubAdminAdapterFactory.create(pubSubProperties.get(pubSubBootstrapServers), pubSubTopicRepository),
pubSubBootstrapServers);
logger.info(
"{} is using write-only pubsub admin client of class: {}",
this.getClass().getSimpleName(),
writeOnlyPubSubAdminAdapter.getClassName());

this.partitionOffsetFetcher = PartitionOffsetFetcherFactory.createDefaultPartitionOffsetFetcher(
builder.getPubSubConsumerAdapterFactory(),
pubSubProperties.get(pubSubBootstrapServers),
pubSubBootstrapServers,
pubSubReadOnlyAdminAdapter,
readOnlyPubSubAdminAdapter,
kafkaOperationTimeoutMs,
optionalMetricsRepository);
}
Expand Down Expand Up @@ -261,7 +249,7 @@ public void createTopic(

try {
RetryUtils.executeWithMaxAttemptAndExponentialBackoff(
() -> pubSubWriteOnlyAdminAdapter.get()
() -> writeOnlyPubSubAdminAdapter
.createTopic(topicName, numPartitions, replication, pubSubTopicConfiguration),
10,
Duration.ofMillis(200),
Expand Down Expand Up @@ -326,7 +314,7 @@ public boolean updateTopicRetention(
Optional<Long> retentionTimeMs = pubSubTopicConfiguration.retentionInMs();
if (!retentionTimeMs.isPresent() || expectedRetentionInMs != retentionTimeMs.get()) {
pubSubTopicConfiguration.setRetentionInMs(Optional.of(expectedRetentionInMs));
pubSubWriteOnlyAdminAdapter.get().setTopicConfig(topicName, pubSubTopicConfiguration);
writeOnlyPubSubAdminAdapter.setTopicConfig(topicName, pubSubTopicConfiguration);
logger.info(
"Updated topic: {} with retention.ms: {} in cluster [{}]",
topicName,
Expand Down Expand Up @@ -354,7 +342,7 @@ public synchronized void updateTopicCompactionPolicy(
PubSubTopic topic,
boolean expectedLogCompacted,
long minLogCompactionLagMs) throws PubSubTopicDoesNotExistException {
long expectedMinLogCompactionLagMs = 0l;
long expectedMinLogCompactionLagMs = 0L;
if (expectedLogCompacted) {
if (minLogCompactionLagMs > 0) {
expectedMinLogCompactionLagMs = minLogCompactionLagMs;
Expand All @@ -370,7 +358,7 @@ public synchronized void updateTopicCompactionPolicy(
|| expectedLogCompacted && expectedMinLogCompactionLagMs != currentMinLogCompactionLagMs) {
pubSubTopicConfiguration.setLogCompacted(expectedLogCompacted);
pubSubTopicConfiguration.setMinLogCompactionLagMs(expectedMinLogCompactionLagMs);
pubSubWriteOnlyAdminAdapter.get().setTopicConfig(topic, pubSubTopicConfiguration);
writeOnlyPubSubAdminAdapter.setTopicConfig(topic, pubSubTopicConfiguration);
logger.info(
"Kafka compaction policy for topic: {} has been updated from {} to {}, min compaction lag updated from {} to {}",
topic,
Expand Down Expand Up @@ -398,7 +386,7 @@ public boolean updateTopicMinInSyncReplica(PubSubTopic topicName, int minISR)
// config doesn't exist config is different
if (!currentMinISR.isPresent() || !currentMinISR.get().equals(minISR)) {
pubSubTopicConfiguration.setMinInSyncReplicas(Optional.of(minISR));
pubSubWriteOnlyAdminAdapter.get().setTopicConfig(topicName, pubSubTopicConfiguration);
writeOnlyPubSubAdminAdapter.setTopicConfig(topicName, pubSubTopicConfiguration);
logger.info("Updated topic: {} with min.insync.replicas: {}", topicName, minISR);
return true;
}
Expand All @@ -407,7 +395,7 @@ public boolean updateTopicMinInSyncReplica(PubSubTopic topicName, int minISR)
}

/**
 * Fetches the retention time (retention.ms) for every topic in the pubsub cluster, keyed by topic.
 * Delegates to the read-only admin adapter; this issues a remote metadata call, so it is not cheap.
 */
public Map<PubSubTopic, Long> getAllTopicRetentions() {
  return readOnlyPubSubAdminAdapter.getAllTopicRetentions();
}

/**
Expand Down Expand Up @@ -448,15 +436,13 @@ public boolean isRetentionBelowTruncatedThreshold(long retention, long truncated
* This operation is a little heavy, since it will pull the configs for all the topics.
*/
/**
 * Fetches the configuration of the given topic from the read-only admin adapter and refreshes the
 * local topic-config cache with the result, so subsequent cached reads see the latest value.
 *
 * @param topicName the topic whose configuration is requested
 * @return the freshly fetched topic configuration
 * @throws PubSubTopicDoesNotExistException if the topic does not exist in the cluster
 */
public PubSubTopicConfiguration getTopicConfig(PubSubTopic topicName) throws PubSubTopicDoesNotExistException {
  PubSubTopicConfiguration pubSubTopicConfiguration = readOnlyPubSubAdminAdapter.getTopicConfig(topicName);
  topicConfigCache.put(topicName, pubSubTopicConfiguration);
  return pubSubTopicConfiguration;
}

/**
 * Same as {@link #getTopicConfig(PubSubTopic)} but uses the adapter's retrying variant, and likewise
 * refreshes the local topic-config cache with the fetched value.
 *
 * @param topicName the topic whose configuration is requested
 * @return the freshly fetched topic configuration
 */
public PubSubTopicConfiguration getTopicConfigWithRetry(PubSubTopic topicName) {
  PubSubTopicConfiguration pubSubTopicConfiguration = readOnlyPubSubAdminAdapter.getTopicConfigWithRetry(topicName);
  topicConfigCache.put(topicName, pubSubTopicConfiguration);
  return pubSubTopicConfiguration;
}
Expand All @@ -474,8 +460,8 @@ public PubSubTopicConfiguration getCachedTopicConfig(PubSubTopic topicName) {
}

public Map<PubSubTopic, PubSubTopicConfiguration> getSomeTopicConfigs(Set<PubSubTopic> topicNames) {
final Map<PubSubTopic, PubSubTopicConfiguration> topicConfigs =
pubSubReadOnlyAdminAdapter.get().getSomeTopicConfigs(topicNames);
Map<PubSubTopic, PubSubTopicConfiguration> topicConfigs =
readOnlyPubSubAdminAdapter.getSomeTopicConfigs(topicNames);
for (Map.Entry<PubSubTopic, PubSubTopicConfiguration> topicConfig: topicConfigs.entrySet()) {
topicConfigCache.put(topicConfig.getKey(), topicConfig.getValue());
}
Expand All @@ -494,7 +480,7 @@ public void ensureTopicIsDeletedAndBlock(PubSubTopic pubSubTopic) {

logger.info("Deleting topic: {}", pubSubTopic);
try {
pubSubWriteOnlyAdminAdapter.get().deleteTopic(pubSubTopic, Duration.ofMillis(kafkaOperationTimeoutMs));
writeOnlyPubSubAdminAdapter.deleteTopic(pubSubTopic, Duration.ofMillis(kafkaOperationTimeoutMs));
logger.info("Topic: {} has been deleted", pubSubTopic);
} catch (PubSubOpTimeoutException e) {
logger.warn("Failed to delete topic: {} after {} ms", pubSubTopic, kafkaOperationTimeoutMs);
Expand All @@ -506,7 +492,7 @@ public void ensureTopicIsDeletedAndBlock(PubSubTopic pubSubTopic) {
}

// let's make sure the topic is deleted
if (pubSubWriteOnlyAdminAdapter.get().containsTopic(pubSubTopic)) {
if (writeOnlyPubSubAdminAdapter.containsTopic(pubSubTopic)) {
throw new PubSubTopicExistsException("Topic: " + pubSubTopic.getName() + " still exists after deletion");
}
}
Expand Down Expand Up @@ -535,14 +521,14 @@ public void ensureTopicIsDeletedAndBlockWithRetry(PubSubTopic pubSubTopic) {
}

/**
 * Lists every topic currently present in the pubsub cluster via the read-only admin adapter.
 * NOTE(review): method is {@code synchronized} while the other read paths are not — presumably to
 * serialize full-listing calls; confirm whether this is still required with a non-lazy adapter.
 */
public synchronized Set<PubSubTopic> listTopics() {
  return readOnlyPubSubAdminAdapter.listAllTopics();
}

/**
* A quick check to see whether the topic exists.
*/
/**
 * A quick, single-shot existence check for the given topic (no retries).
 *
 * @param topic the topic to check
 * @return true if the topic currently exists in the cluster
 */
public boolean containsTopic(PubSubTopic topic) {
  return readOnlyPubSubAdminAdapter.containsTopic(topic);
}

/**
Expand All @@ -553,7 +539,7 @@ public boolean containsTopicWithExpectationAndRetry(
PubSubTopic topic,
int maxAttempts,
final boolean expectedResult) {
return pubSubReadOnlyAdminAdapter.get().containsTopicWithExpectationAndRetry(topic, maxAttempts, expectedResult);
return readOnlyPubSubAdminAdapter.containsTopicWithExpectationAndRetry(topic, maxAttempts, expectedResult);
}

public boolean containsTopicWithExpectationAndRetry(
Expand All @@ -563,14 +549,13 @@ public boolean containsTopicWithExpectationAndRetry(
Duration initialBackoff,
Duration maxBackoff,
Duration maxDuration) {
return pubSubReadOnlyAdminAdapter.get()
.containsTopicWithExpectationAndRetry(
topic,
maxAttempts,
expectedResult,
initialBackoff,
maxBackoff,
maxDuration);
return readOnlyPubSubAdminAdapter.containsTopicWithExpectationAndRetry(
topic,
maxAttempts,
expectedResult,
initialBackoff,
maxBackoff,
maxDuration);
}

/**
Expand Down Expand Up @@ -666,9 +651,19 @@ public String getPubSubBootstrapServers() {

@Override
public synchronized void close() {
  // Track shutdown duration so slow broker disconnects show up in the logs.
  long startTime = System.currentTimeMillis();
  logger.info("Closing topic manager for pubsub cluster with bootstrap servers: {}", this.pubSubBootstrapServers);
  // Close the consumer-backed fetcher first, then the admin clients; each close is best-effort
  // (errors are logged, not thrown) so one failure does not prevent releasing the others.
  Utils.closeQuietlyWithErrorLogged(partitionOffsetFetcher);
  if (readOnlyPubSubAdminAdapter != null) {
    Utils.closeQuietlyWithErrorLogged(readOnlyPubSubAdminAdapter);
  }
  if (writeOnlyPubSubAdminAdapter != null) {
    Utils.closeQuietlyWithErrorLogged(writeOnlyPubSubAdminAdapter);
  }
  logger.info(
      "Closed topic manager for pubsub cluster with bootstrap servers: {} in {}ms",
      this.pubSubBootstrapServers,
      System.currentTimeMillis() - startTime);
}

// For testing only
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.pubsub.PubSubConsumerAdapterFactory;
import com.linkedin.venice.pubsub.api.PubSubAdminAdapter;
import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter;
import com.linkedin.venice.pubsub.api.PubSubMessageDeserializer;
import com.linkedin.venice.serialization.avro.KafkaValueSerializer;
import com.linkedin.venice.utils.SystemTime;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.lazy.Lazy;
import com.linkedin.venice.utils.pools.LandFillObjectPool;
import io.tehuti.metrics.MetricsRepository;
import java.util.Optional;
Expand All @@ -18,18 +18,18 @@ public static PartitionOffsetFetcher createDefaultPartitionOffsetFetcher(
PubSubConsumerAdapterFactory pubSubConsumerAdapterFactory,
VeniceProperties veniceProperties,
String pubSubBootstrapServers,
Lazy<PubSubAdminAdapter> kafkaAdminWrapper,
PubSubAdminAdapter pubSubAdminAdapter,
long kafkaOperationTimeoutMs,
Optional<MetricsRepository> optionalMetricsRepository) {
PubSubMessageDeserializer pubSubMessageDeserializer = new PubSubMessageDeserializer(
new KafkaValueSerializer(),
new LandFillObjectPool<>(KafkaMessageEnvelope::new),
new LandFillObjectPool<>(KafkaMessageEnvelope::new));
PubSubConsumerAdapter pubSubConsumerAdapter =
pubSubConsumerAdapterFactory.create(veniceProperties, false, pubSubMessageDeserializer, pubSubBootstrapServers);
PartitionOffsetFetcher partitionOffsetFetcher = new PartitionOffsetFetcherImpl(
kafkaAdminWrapper,
Lazy.of(
() -> pubSubConsumerAdapterFactory
.create(veniceProperties, false, pubSubMessageDeserializer, pubSubBootstrapServers)),
pubSubAdminAdapter,
pubSubConsumerAdapter,
kafkaOperationTimeoutMs,
pubSubBootstrapServers);
if (optionalMetricsRepository.isPresent()) {
Expand Down
Loading
Loading