[changelog] Tweak after image consumer to not consume view topic (#744)
ZacAttack authored Nov 13, 2023
1 parent a420879 commit 49ff76c
Showing 7 changed files with 420 additions and 62 deletions.
@@ -20,6 +20,8 @@ public class ChangelogClientConfig<T extends SpecificRecord> {

private String bootstrapFileSystemPath;

private long versionSwapDetectionIntervalTimeInMs = 600000L;

public ChangelogClientConfig(String storeName) {
this.innerClientConfig = new ClientConfig<>(storeName);
}
@@ -131,6 +133,15 @@ public String getBootstrapFileSystemPath() {
return this.bootstrapFileSystemPath;
}

public long getVersionSwapDetectionIntervalTimeInMs() {
return versionSwapDetectionIntervalTimeInMs;
}

public ChangelogClientConfig setVersionSwapDetectionIntervalTimeInMs(long intervalTimeInMs) {
this.versionSwapDetectionIntervalTimeInMs = intervalTimeInMs;
return this;
}

public static <V extends SpecificRecord> ChangelogClientConfig<V> cloneConfig(ChangelogClientConfig<V> config) {
ChangelogClientConfig<V> newConfig = new ChangelogClientConfig<V>().setStoreName(config.getStoreName())
.setLocalD2ZkHosts(config.getLocalD2ZkHosts())
@@ -142,7 +153,8 @@ public static <V extends SpecificRecord> ChangelogClientConfig<V> cloneConfig(Ch
.setControllerD2ServiceName(config.controllerD2ServiceName)
.setD2Client(config.getD2Client())
.setControllerRequestRetryCount(config.getControllerRequestRetryCount())
.setBootstrapFileSystemPath(config.getBootstrapFileSystemPath());
.setBootstrapFileSystemPath(config.getBootstrapFileSystemPath())
.setVersionSwapDetectionIntervalTimeInMs(config.getVersionSwapDetectionIntervalTimeInMs());
return newConfig;
}
}
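
For context, a minimal configuration sketch (not part of this commit) showing how a caller might tune the new knob. The store name, the class name, the five-minute value, and the package declaration are illustrative assumptions; only the constructor and the setter shown above are exercised.

package com.linkedin.davinci.consumer;

import java.util.concurrent.TimeUnit;

public class ChangelogClientConfigSketch {
  public static ChangelogClientConfig buildConfig() {
    // Lower the version swap detection interval from the 10 minute default (600000 ms) to 5 minutes
    // for stores where the after image consumer should pick up new versions more quickly.
    return new ChangelogClientConfig("my-store") // hypothetical store name
        .setVersionSwapDetectionIntervalTimeInMs(TimeUnit.MINUTES.toMillis(5));
  }
}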
@@ -389,7 +389,7 @@ public CompletableFuture<Void> seekWithBootStrap(Set<Integer> partitions) {
localCheckpoint = VeniceChangeCoordinate.decodeStringAndConvertToVeniceChangeCoordinate(offsetString);
}
} catch (IOException | ClassNotFoundException e) {
throw new VeniceException("Failed to decode local hhange capture coordinate chekcpoint with exception: ", e);
throw new VeniceException("Failed to decode local change capture coordinate checkpoint with exception: ", e);
}

// Where we need to catch up to
@@ -1,17 +1,214 @@
package com.linkedin.davinci.consumer;

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.kafka.protocol.ControlMessage;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.kafka.protocol.enums.ControlMessageType;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pubsub.adapter.kafka.ApacheKafkaOffsetPosition;
import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter;
import com.linkedin.venice.pubsub.api.PubSubMessage;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.utils.lazy.Lazy;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;


public class VeniceAfterImageConsumerImpl<K, V> extends VeniceChangelogConsumerImpl<K, V> {
private static final Logger LOGGER = LogManager.getLogger(VeniceAfterImageConsumerImpl.class);
  // 10 minute default
  protected long versionSwapDetectionIntervalTimeInMs;
  // This consumer is used to find EOP messages without impacting consumption by other subscriptions. It's only
  // used when seeking to EOP, either because the user requested that seek or because a new version was pushed.
  // TODO: We shouldn't use this in the long run. Once the EOP position is queryable from Venice and the version
  // swap is produced to the VT, we should remove this as it will no longer be needed.
  private final Lazy<VeniceChangelogConsumerImpl<K, V>> internalSeekConsumer;
private Thread versionSwapDetectionThread;

public VeniceAfterImageConsumerImpl(ChangelogClientConfig changelogClientConfig, PubSubConsumerAdapter consumer) {
this(
changelogClientConfig,
consumer,
Lazy.of(
() -> new VeniceChangelogConsumerImpl<K, V>(
changelogClientConfig,
VeniceChangelogConsumerClientFactory.getConsumer(
changelogClientConfig.getConsumerProperties(),
changelogClientConfig.getStoreName() + "-" + "internal"))));
}

protected VeniceAfterImageConsumerImpl(
ChangelogClientConfig changelogClientConfig,
PubSubConsumerAdapter consumer,
Lazy<VeniceChangelogConsumerImpl<K, V>> seekConsumer) {
super(changelogClientConfig, consumer);
versionSwapDetectionThread = new VersionSwapDetectionThread();
internalSeekConsumer = seekConsumer;
versionSwapDetectionIntervalTimeInMs = changelogClientConfig.getVersionSwapDetectionIntervalTimeInMs();
}

@Override
public Collection<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> poll(long timeoutInMs) {
if (!versionSwapDetectionThread.isAlive()) {
versionSwapDetectionThread.start();
}
return internalPoll(timeoutInMs, "");
}

@Override
public CompletableFuture<Void> seekToTimestamps(Map<Integer, Long> timestamps) {
return internalSeekToTimestamps(timestamps, "");
}

@Override
public CompletableFuture<Void> seekToTail(Set<Integer> partitions) {
return internalSeekToTail(partitions, "");
}

@Override
public CompletableFuture<Void> seekToEndOfPush(Set<Integer> partitions) {
return CompletableFuture.supplyAsync(() -> {
synchronized (internalSeekConsumer) {
try {
          // TODO: This implementation basically just scans the version topic until it finds the EOP message.
          // The approach we'd like to take instead is to record the offset of the EOP message in the VT and
          // then seek directly to that offset. We'll do that in a future patch.
internalSeekConsumer.get().subscribe(partitions).get();

          // We need to get the internal consumer here because we have to intercept the control messages that we
          // would normally filter out before handing records to the user
PubSubConsumerAdapter consumerAdapter = internalSeekConsumer.get().getPubSubConsumer();

Map<PubSubTopicPartition, List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>>> polledResults;
Map<Integer, Boolean> endOfPushConsumedPerPartitionMap = new HashMap<>();
Set<VeniceChangeCoordinate> checkpoints = new HashSet<>();

// Initialize map with all false entries for each partition
for (Integer partition: partitions) {
endOfPushConsumedPerPartitionMap.put(partition, false);
}

// poll until we get EOP for all partitions
while (true) {
polledResults = consumerAdapter.poll(5000L);
// Loop through all polled messages
for (Map.Entry<PubSubTopicPartition, List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>>> entry: polledResults
.entrySet()) {
PubSubTopicPartition pubSubTopicPartition = entry.getKey();
List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>> messageList = entry.getValue();
for (PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> message: messageList) {
if (message.getKey().isControlMessage()) {
ControlMessage controlMessage = (ControlMessage) message.getValue().getPayloadUnion();
ControlMessageType controlMessageType = ControlMessageType.valueOf(controlMessage);
if (controlMessageType.equals(ControlMessageType.END_OF_PUSH)) {
                    // note down the partition and offset and mark that we've consumed the EOP for this partition
endOfPushConsumedPerPartitionMap.put(pubSubTopicPartition.getPartitionNumber(), true);
VeniceChangeCoordinate coordinate = new VeniceChangeCoordinate(
pubSubTopicPartition.getPubSubTopic().getName(),
new ApacheKafkaOffsetPosition(message.getOffset()),
pubSubTopicPartition.getPartitionNumber());
checkpoints.add(coordinate);
Set<Integer> unsubSet = new HashSet<>();
unsubSet.add(pubSubTopicPartition.getPartitionNumber());
internalSeekConsumer.get().unsubscribe(unsubSet);
// No need to look at the rest of the messages for this partition that we might have polled
break;
}
}
}
}
if (endOfPushConsumedPerPartitionMap.values().stream().allMatch(e -> e)) {
// We polled all EOP messages, stop polling!
break;
}
}
this.seekToCheckpoint(checkpoints).get();
} catch (InterruptedException | ExecutionException e) {
throw new VeniceException(
"Seek to End of Push Failed for store: " + storeName + " partitions: " + partitions.toString(),
e);
}
}
return null;
});
}

@Override
protected CompletableFuture<Void> internalSeek(
Set<Integer> partitions,
PubSubTopic targetTopic,
SeekFunction seekAction) {
if (!versionSwapDetectionThread.isAlive()) {
versionSwapDetectionThread.start();
}
return super.internalSeek(partitions, targetTopic, seekAction);
}

private class VersionSwapDetectionThread extends Thread {
VersionSwapDetectionThread() {
super("Version-Swap-Detection-Thread");
}

@Override
public void run() {
while (!Thread.interrupted()) {
try {
TimeUnit.MILLISECONDS.sleep(versionSwapDetectionIntervalTimeInMs);
} catch (InterruptedException e) {
          // An interrupt is expected here on shutdown, so just leave the loop; the shutdown is logged below
break;
}

// Check the current version of the server
storeRepository.refresh();
int currentVersion = storeRepository.getStore(storeName).getCurrentVersion();

// Check the current ingested version
Set<PubSubTopicPartition> subscriptions = pubSubConsumer.getAssignment();
if (subscriptions.isEmpty()) {
continue;
}
int maxVersion = -1;
for (PubSubTopicPartition topicPartition: subscriptions) {
int version = Version.parseVersionFromVersionTopicName(topicPartition.getPubSubTopic().getName());
if (version >= maxVersion) {
maxVersion = version;
}
}

// Seek to end of push
if (currentVersion != maxVersion) {
// get current subscriptions and seek to endOfPush
Set<Integer> partitions = new HashSet<>();
for (PubSubTopicPartition partitionSubscription: subscriptions) {
partitions.add(partitionSubscription.getPartitionNumber());
}
try {
LOGGER.info(
"New Version detected! Seeking consumer to version: " + currentVersion
+ " in VersionSwaDetectionThread: " + this.getId());
seekToEndOfPush(partitions).get();
} catch (InterruptedException | ExecutionException e) {
LOGGER.error(
"Seek to End of Push Failed for store: " + storeName + " partitions: " + partitions
+ " on VersionSwapDetectionThread: " + this.getId() + "will retry...",
e);
}
}
}
LOGGER.info("VersionSwapDetectionThread thread #" + this.getId() + "interrupted! Shutting down...");
}
}
}
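
A minimal usage sketch (not part of this commit), assuming a consumer instance built elsewhere via the client factory. The class name, key/value type parameters, partition numbers, and poll timeout are illustrative; only methods visible in this diff (subscribe, seekToEndOfPush, poll) are exercised.

package com.linkedin.davinci.consumer;

import com.linkedin.venice.pubsub.api.PubSubMessage;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutionException;

public class AfterImageConsumerUsageSketch {
  public static <K, V> void consume(VeniceAfterImageConsumerImpl<K, V> consumer)
      throws InterruptedException, ExecutionException {
    Set<Integer> partitions = new HashSet<>(Arrays.asList(0, 1)); // illustrative partitions
    consumer.subscribe(partitions).get();
    // Jump straight to the after image state of the current version.
    consumer.seekToEndOfPush(partitions).get();
    while (!Thread.interrupted()) {
      // poll() also lazily starts the version swap detection thread shown above, which re-seeks
      // to the new version's end of push when a version swap is detected.
      Collection<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> records = consumer.poll(1000L);
      for (PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate> record: records) {
        // record.getValue() carries the ChangeEvent wrapping the after image value
      }
    }
  }
}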
@@ -134,7 +134,7 @@ private String getViewClass(ChangelogClientConfig newStoreChangelogClientConfig,
return viewClass;
}

private PubSubConsumerAdapter getConsumer(Properties consumerProps, String consumerName) {
static PubSubConsumerAdapter getConsumer(Properties consumerProps, String consumerName) {
PubSubMessageDeserializer pubSubMessageDeserializer = new PubSubMessageDeserializer(
new OptimizedKafkaValueSerializer(),
new LandFillObjectPool<>(KafkaMessageEnvelope::new),
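
As a sketch of what the widened visibility enables (not part of this commit): code in the same package, such as the after-image consumer's constructor shown earlier in this diff, can now obtain a dedicated PubSubConsumerAdapter directly. The class and method names below are hypothetical; only getConsumer, getConsumerProperties, and getStoreName from this diff are assumed.

package com.linkedin.davinci.consumer;

import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter;

public class InternalSeekConsumerSketch {
  static PubSubConsumerAdapter buildInternalSeekAdapter(ChangelogClientConfig config) {
    // Mirrors the VeniceAfterImageConsumerImpl constructor above: the "-internal" suffix keeps the
    // seek consumer's name distinct from the main consumer's.
    return VeniceChangelogConsumerClientFactory.getConsumer(
        config.getConsumerProperties(),
        config.getStoreName() + "-internal");
  }
}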