From 49ff76c10d893c4f61eec9d2923170abb04dd086 Mon Sep 17 00:00:00 2001 From: Zac Policzer Date: Mon, 13 Nov 2023 04:45:28 -0800 Subject: [PATCH] [changelog] Tweak after image consumer to not consume view topic (#744) --- .../consumer/ChangelogClientConfig.java | 14 +- ...lBootstrappingVeniceChangelogConsumer.java | 2 +- .../VeniceAfterImageConsumerImpl.java | 197 ++++++++++++++++++ .../VeniceChangelogConsumerClientFactory.java | 2 +- .../consumer/VeniceChangelogConsumerImpl.java | 95 ++++++--- .../VeniceChangelogConsumerImplTest.java | 92 +++++++- .../endToEnd/TestActiveActiveIngestion.java | 80 +++++-- 7 files changed, 420 insertions(+), 62 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/ChangelogClientConfig.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/ChangelogClientConfig.java index 2b2f2f05d2..a048097a6b 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/ChangelogClientConfig.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/ChangelogClientConfig.java @@ -20,6 +20,8 @@ public class ChangelogClientConfig { private String bootstrapFileSystemPath; + private long versionSwapDetectionIntervalTimeInMs = 600000L; + public ChangelogClientConfig(String storeName) { this.innerClientConfig = new ClientConfig<>(storeName); } @@ -131,6 +133,15 @@ public String getBootstrapFileSystemPath() { return this.bootstrapFileSystemPath; } + public long getVersionSwapDetectionIntervalTimeInMs() { + return versionSwapDetectionIntervalTimeInMs; + } + + public ChangelogClientConfig setVersionSwapDetectionIntervalTimeInMs(long intervalTimeInMs) { + this.versionSwapDetectionIntervalTimeInMs = intervalTimeInMs; + return this; + } + public static ChangelogClientConfig cloneConfig(ChangelogClientConfig config) { ChangelogClientConfig newConfig = new ChangelogClientConfig().setStoreName(config.getStoreName()) .setLocalD2ZkHosts(config.getLocalD2ZkHosts()) @@ -142,7 +153,8 @@ public static ChangelogClientConfig cloneConfig(Ch .setControllerD2ServiceName(config.controllerD2ServiceName) .setD2Client(config.getD2Client()) .setControllerRequestRetryCount(config.getControllerRequestRetryCount()) - .setBootstrapFileSystemPath(config.getBootstrapFileSystemPath()); + .setBootstrapFileSystemPath(config.getBootstrapFileSystemPath()) + .setVersionSwapDetectionIntervalTimeInMs(config.getVersionSwapDetectionIntervalTimeInMs()); return newConfig; } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/InternalLocalBootstrappingVeniceChangelogConsumer.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/InternalLocalBootstrappingVeniceChangelogConsumer.java index f22c7542cc..a475e943f1 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/InternalLocalBootstrappingVeniceChangelogConsumer.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/InternalLocalBootstrappingVeniceChangelogConsumer.java @@ -389,7 +389,7 @@ public CompletableFuture seekWithBootStrap(Set partitions) { localCheckpoint = VeniceChangeCoordinate.decodeStringAndConvertToVeniceChangeCoordinate(offsetString); } } catch (IOException | ClassNotFoundException e) { - throw new VeniceException("Failed to decode local hhange capture coordinate chekcpoint with exception: ", e); + throw new VeniceException("Failed to decode local change capture coordinate checkpoint with exception: ", e); } // Where we need to catch up to diff --git 
a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceAfterImageConsumerImpl.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceAfterImageConsumerImpl.java index 2c4bd05711..278302ec15 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceAfterImageConsumerImpl.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceAfterImageConsumerImpl.java @@ -1,17 +1,214 @@ package com.linkedin.davinci.consumer; +import com.linkedin.venice.exceptions.VeniceException; +import com.linkedin.venice.kafka.protocol.ControlMessage; +import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; +import com.linkedin.venice.kafka.protocol.enums.ControlMessageType; +import com.linkedin.venice.message.KafkaKey; +import com.linkedin.venice.meta.Version; +import com.linkedin.venice.pubsub.adapter.kafka.ApacheKafkaOffsetPosition; import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter; import com.linkedin.venice.pubsub.api.PubSubMessage; +import com.linkedin.venice.pubsub.api.PubSubTopic; +import com.linkedin.venice.pubsub.api.PubSubTopicPartition; +import com.linkedin.venice.utils.lazy.Lazy; import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; public class VeniceAfterImageConsumerImpl extends VeniceChangelogConsumerImpl { + private static final Logger LOGGER = LogManager.getLogger(VeniceAfterImageConsumerImpl.class); + // 10 minute default + protected long versionSwapDetectionIntervalTimeInMs; + // This consumer is used to find EOP messages without impacting consumption by other subscriptions. It's only used + // when seeking to EOP, either because the user requested that seek or because a new version was pushed. + // TODO: We shouldn't use this in the long run. Once the EOP position is queryable from Venice and the version + // swap message is produced to the VT, we should remove this as it will no longer be needed. 
+ private final Lazy> internalSeekConsumer; + private Thread versionSwapDetectionThread; + public VeniceAfterImageConsumerImpl(ChangelogClientConfig changelogClientConfig, PubSubConsumerAdapter consumer) { + this( + changelogClientConfig, + consumer, + Lazy.of( + () -> new VeniceChangelogConsumerImpl( + changelogClientConfig, + VeniceChangelogConsumerClientFactory.getConsumer( + changelogClientConfig.getConsumerProperties(), + changelogClientConfig.getStoreName() + "-" + "internal")))); + } + + protected VeniceAfterImageConsumerImpl( + ChangelogClientConfig changelogClientConfig, + PubSubConsumerAdapter consumer, + Lazy> seekConsumer) { super(changelogClientConfig, consumer); + versionSwapDetectionThread = new VersionSwapDetectionThread(); + internalSeekConsumer = seekConsumer; + versionSwapDetectionIntervalTimeInMs = changelogClientConfig.getVersionSwapDetectionIntervalTimeInMs(); } @Override public Collection, VeniceChangeCoordinate>> poll(long timeoutInMs) { + if (!versionSwapDetectionThread.isAlive()) { + versionSwapDetectionThread.start(); + } return internalPoll(timeoutInMs, ""); } + + @Override + public CompletableFuture seekToTimestamps(Map timestamps) { + return internalSeekToTimestamps(timestamps, ""); + } + + @Override + public CompletableFuture seekToTail(Set partitions) { + return internalSeekToTail(partitions, ""); + } + + @Override + public CompletableFuture seekToEndOfPush(Set partitions) { + return CompletableFuture.supplyAsync(() -> { + synchronized (internalSeekConsumer) { + try { + // TODO: This implementation basically just scans the version topic until it finds the EOP message. The + // approach we'd like to take instead is to record the offset of the EOP message in the VT and then seek + // directly to that offset. We'll do that in a future patch. 
+ internalSeekConsumer.get().subscribe(partitions).get(); + + // We need the internal consumer here because we have to intercept control messages that we would normally + // filter out from the user + PubSubConsumerAdapter consumerAdapter = internalSeekConsumer.get().getPubSubConsumer(); + + Map>> polledResults; + Map endOfPushConsumedPerPartitionMap = new HashMap<>(); + Set checkpoints = new HashSet<>(); + + // Initialize map with all false entries for each partition + for (Integer partition: partitions) { + endOfPushConsumedPerPartitionMap.put(partition, false); + } + + // Poll until we've seen an EOP for every subscribed partition + while (true) { + polledResults = consumerAdapter.poll(5000L); + // Loop through all polled messages + for (Map.Entry>> entry: polledResults + .entrySet()) { + PubSubTopicPartition pubSubTopicPartition = entry.getKey(); + List> messageList = entry.getValue(); + for (PubSubMessage message: messageList) { + if (message.getKey().isControlMessage()) { + ControlMessage controlMessage = (ControlMessage) message.getValue().getPayloadUnion(); + ControlMessageType controlMessageType = ControlMessageType.valueOf(controlMessage); + if (controlMessageType.equals(ControlMessageType.END_OF_PUSH)) { + // Note down the offset and mark that we've seen the EOP for this partition + endOfPushConsumedPerPartitionMap.put(pubSubTopicPartition.getPartitionNumber(), true); + VeniceChangeCoordinate coordinate = new VeniceChangeCoordinate( + pubSubTopicPartition.getPubSubTopic().getName(), + new ApacheKafkaOffsetPosition(message.getOffset()), + pubSubTopicPartition.getPartitionNumber()); + checkpoints.add(coordinate); + Set unsubSet = new HashSet<>(); + unsubSet.add(pubSubTopicPartition.getPartitionNumber()); + internalSeekConsumer.get().unsubscribe(unsubSet); + // No need to look at the rest of the messages for this partition that we might have polled + break; + } + } + } + } + if (endOfPushConsumedPerPartitionMap.values().stream().allMatch(e -> e)) { + // We polled all EOP messages, stop polling! 
+ break; + } + } + this.seekToCheckpoint(checkpoints).get(); + } catch (InterruptedException | ExecutionException e) { + throw new VeniceException( + "Seek to End of Push Failed for store: " + storeName + " partitions: " + partitions.toString(), + e); + } + } + return null; + }); + } + + @Override + protected CompletableFuture internalSeek( + Set partitions, + PubSubTopic targetTopic, + SeekFunction seekAction) { + if (!versionSwapDetectionThread.isAlive()) { + versionSwapDetectionThread.start(); + } + return super.internalSeek(partitions, targetTopic, seekAction); + } + + private class VersionSwapDetectionThread extends Thread { + VersionSwapDetectionThread() { + super("Version-Swap-Detection-Thread"); + } + + @Override + public void run() { + while (!Thread.interrupted()) { + try { + TimeUnit.MILLISECONDS.sleep(versionSwapDetectionIntervalTimeInMs); + } catch (InterruptedException e) { + // We've received an interrupt which is to be expected, so we'll just leave the loop and log + break; + } + + // Check the current version of the server + storeRepository.refresh(); + int currentVersion = storeRepository.getStore(storeName).getCurrentVersion(); + + // Check the current ingested version + Set subscriptions = pubSubConsumer.getAssignment(); + if (subscriptions.isEmpty()) { + continue; + } + int maxVersion = -1; + for (PubSubTopicPartition topicPartition: subscriptions) { + int version = Version.parseVersionFromVersionTopicName(topicPartition.getPubSubTopic().getName()); + if (version >= maxVersion) { + maxVersion = version; + } + } + + // Seek to end of push + if (currentVersion != maxVersion) { + // Get current subscriptions and seek to end of push + Set partitions = new HashSet<>(); + for (PubSubTopicPartition partitionSubscription: subscriptions) { + partitions.add(partitionSubscription.getPartitionNumber()); + } + try { + LOGGER.info( + "New Version detected! Seeking consumer to version: " + currentVersion + + " in VersionSwapDetectionThread: " + this.getId()); + seekToEndOfPush(partitions).get(); + } catch (InterruptedException | ExecutionException e) { + LOGGER.error( + "Seek to End of Push Failed for store: " + storeName + " partitions: " + partitions + + " on VersionSwapDetectionThread: " + this.getId() + ", will retry...", + e); + } + } + } + LOGGER.info("VersionSwapDetectionThread #" + this.getId() + " interrupted! 
Shutting down..."); + } + } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerClientFactory.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerClientFactory.java index 252e092d78..dd8c6c7b0c 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerClientFactory.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerClientFactory.java @@ -134,7 +134,7 @@ private String getViewClass(ChangelogClientConfig newStoreChangelogClientConfig, return viewClass; } - private PubSubConsumerAdapter getConsumer(Properties consumerProps, String consumerName) { + static PubSubConsumerAdapter getConsumer(Properties consumerProps, String consumerName) { PubSubMessageDeserializer pubSubMessageDeserializer = new PubSubMessageDeserializer( new OptimizedKafkaValueSerializer(), new LandFillObjectPool<>(KafkaMessageEnvelope::new), diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java index ca77001d21..d44fabe8bb 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java @@ -20,6 +20,7 @@ import com.linkedin.venice.controllerapi.StoreResponse; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.kafka.protocol.ControlMessage; +import com.linkedin.venice.kafka.protocol.Delete; import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; import com.linkedin.venice.kafka.protocol.Put; import com.linkedin.venice.kafka.protocol.VersionSwap; @@ -204,6 +205,10 @@ public CompletableFuture internalSubscribe(Set partitions, PubSub getPartitionListToSubscribe(partitions, Collections.EMPTY_SET, topicToSubscribe); for (PubSubTopicPartition topicPartition: topicPartitionList) { + // TODO: we do this because we don't populate the compressor into the change capture view topic, so we + // take this opportunity to populate it. This could be worth revisiting by either populating the compressor + // into view topics and consuming, or, expanding the interface to this function to have a compressor provider + // (and thereby let other view implementations figure out what would be right). 
if (!topicPartition.getPubSubTopic().getName().endsWith(ChangeCaptureView.CHANGE_CAPTURE_TOPIC_SUFFIX)) { compressorMap.put(topicPartition.getPartitionNumber(), getVersionCompressor(topicPartition)); } @@ -316,9 +321,12 @@ public CompletableFuture seekToEndOfPush() { @Override public CompletableFuture seekToTail(Set partitions) { + return internalSeekToTail(partitions, ChangeCaptureView.CHANGE_CAPTURE_TOPIC_SUFFIX); + } + + public CompletableFuture internalSeekToTail(Set partitions, String topicSuffix) { // Get the latest change capture topic - PubSubTopic topic = - pubSubTopicRepository.getTopic(getCurrentServingVersionTopic() + ChangeCaptureView.CHANGE_CAPTURE_TOPIC_SUFFIX); + PubSubTopic topic = pubSubTopicRepository.getTopic(getCurrentServingVersionTopic() + topicSuffix); return internalSeek(partitions, topic, p -> { Long partitionEndOffset = pubSubConsumer.endOffset(p); pubSubConsumerSeek(p, partitionEndOffset); @@ -376,12 +384,15 @@ public CompletableFuture subscribeAll() { @Override public CompletableFuture seekToTimestamps(Map timestamps) { + return internalSeekToTimestamps(timestamps, ChangeCaptureView.CHANGE_CAPTURE_TOPIC_SUFFIX); + } + + public CompletableFuture internalSeekToTimestamps(Map timestamps, String topicSuffix) { // Get the latest change capture topic storeRepository.refresh(); Store store = storeRepository.getStore(storeName); int currentVersion = store.getCurrentVersion(); - String topicName = - Version.composeKafkaTopic(storeName, currentVersion) + ChangeCaptureView.CHANGE_CAPTURE_TOPIC_SUFFIX; + String topicName = Version.composeKafkaTopic(storeName, currentVersion) + topicSuffix; PubSubTopic topic = pubSubTopicRepository.getTopic(topicName); Map topicPartitionLongMap = new HashMap<>(); for (Map.Entry timestampPair: timestamps.entrySet()) { @@ -408,7 +419,7 @@ public CompletableFuture seekToTimestamp(Long timestamp) { return this.seekToTimestamps(partitionsToSeek); } - public CompletableFuture internalSeek( + protected CompletableFuture internalSeek( Set partitions, PubSubTopic targetTopic, SeekFunction seekAction) { @@ -531,25 +542,11 @@ protected boolean handleControlMessage( Version.parseVersionFromKafkaTopicName(pubSubTopicPartition.getPubSubTopic().getName()), storeName); // Jump to next topic - // TODO: Today we don't publish the version swap message to the version topic. This necessitates relying on the - // change capture topic in order to navigate version pushes. We should pass the topicSuffix argument here once - // that - // support lands. - switchToNewTopic( - pubSubTopicPartition.getPubSubTopic(), - ChangeCaptureView.CHANGE_CAPTURE_TOPIC_SUFFIX, - pubSubTopicPartition.getPartitionNumber()); + switchToNewTopic(pubSubTopicPartition.getPubSubTopic(), topicSuffix, pubSubTopicPartition.getPartitionNumber()); return true; } if (controlMessageType.equals(ControlMessageType.VERSION_SWAP)) { - // TODO: Today we don't publish the version swap message to the version topic. This necessitates relying on the - // change capture topic in order to navigate version pushes. We should pass the topicSuffix argument here once - // that - // support lands. 
- return handleVersionSwapControlMessage( - controlMessage, - pubSubTopicPartition, - ChangeCaptureView.CHANGE_CAPTURE_TOPIC_SUFFIX); + return handleVersionSwapControlMessage(controlMessage, pubSubTopicPartition, topicSuffix); } return false; } @@ -663,12 +660,31 @@ protected Optional, VeniceChangeCoordinate>> con byte[] keyBytes = message.getKey().getKey(); MessageType messageType = MessageType.valueOf(message.getValue()); RecordChangeEvent recordChangeEvent; - // Internal store ingestion tasks only persist PUT messages to either VT or view topics + Object assembledObject = null; + List replicationCheckpoint = null; + if (messageType.equals(MessageType.DELETE)) { + Delete delete = (Delete) message.getValue().payloadUnion; + + // Deletes have a previous and current value of null. So just fill it in! + ChangeEvent changeEvent = new ChangeEvent<>(null, null); + pubSubChangeEventMessage = Optional.of( + new ImmutableChangeCapturePubSubMessage<>( + keyDeserializer.deserialize(keyBytes), + changeEvent, + pubSubTopicPartition, + message.getOffset(), + message.getPubSubMessageTime(), + message.getPayloadSize(), + false)); + + replicationCheckpoint = extractOffsetVectorFromMessage( + delete.getReplicationMetadataVersionId(), + delete.getReplicationMetadataPayload()); + } if (messageType.equals(MessageType.PUT)) { Put put = (Put) message.getValue().payloadUnion; // Select appropriate deserializers Lazy deserializerProvider; - Object assembledObject = null; AbstractAvroChunkingAdapter chunkingAdapter; int readerSchemaId; ReadOnlySchemaRepository schemaRepo; @@ -702,14 +718,20 @@ protected Optional, VeniceChangeCoordinate>> con return Optional.empty(); } + if (assembledObject instanceof RecordChangeEvent) { + recordChangeEvent = (RecordChangeEvent) assembledObject; + replicationCheckpoint = recordChangeEvent.replicationCheckpointVector; + } else { + replicationCheckpoint = + extractOffsetVectorFromMessage(put.getReplicationMetadataVersionId(), put.getReplicationMetadataPayload()); + } + // Now that we've assembled the object, we need to extract the replication vector depending on if it's from VT // or from the record change event. Records from VT 'typically' don't have an offset vector, but they will in // repush scenarios (which we want to be opaque to the user and filter accordingly). 
- List replicationCheckpoint; int payloadSize = message.getPayloadSize(); if (assembledObject instanceof RecordChangeEvent) { recordChangeEvent = (RecordChangeEvent) assembledObject; - replicationCheckpoint = recordChangeEvent.replicationCheckpointVector; pubSubChangeEventMessage = Optional.of( convertChangeEventToPubSubMessage( recordChangeEvent, @@ -719,8 +741,6 @@ protected Optional, VeniceChangeCoordinate>> con message.getPubSubMessageTime(), payloadSize)); } else { - replicationCheckpoint = - extractOffsetVectorFromMessage(put.getReplicationMetadataVersionId(), put.getReplicationMetadataPayload()); ChangeEvent changeEvent = new ChangeEvent<>(null, (V) assembledObject); pubSubChangeEventMessage = Optional.of( new ImmutableChangeCapturePubSubMessage<>( @@ -732,12 +752,13 @@ protected Optional, VeniceChangeCoordinate>> con payloadSize, false)); } + } - // Determine if the event should be filtered or not - if (filterRecordByVersionSwapHighWatermarks(replicationCheckpoint, pubSubTopicPartition)) { - pubSubChangeEventMessage = Optional.empty(); - } + // Determine if the event should be filtered or not + if (filterRecordByVersionSwapHighWatermarks(replicationCheckpoint, pubSubTopicPartition)) { + return Optional.empty(); } + return pubSubChangeEventMessage; } @@ -852,6 +873,14 @@ private boolean filterRecordByVersionSwapHighWatermarks( protected void switchToNewTopic(PubSubTopic newTopic, String topicSuffix, Integer partition) { PubSubTopic mergedTopicName = pubSubTopicRepository.getTopic(newTopic.getName() + topicSuffix); Set partitions = Collections.singleton(partition); + for (PubSubTopicPartition currentSubscribedPartition: pubSubConsumer.getAssignment()) { + if (partition.equals(currentSubscribedPartition.getPartitionNumber())) { + if (mergedTopicName.getName().equals(currentSubscribedPartition.getPubSubTopic().getName())) { + // We're being asked to switch to a topic that we're already subscribed to, NoOp this + return; + } + } + } unsubscribe(partitions); try { internalSubscribe(partitions, mergedTopicName).get(); @@ -903,4 +932,8 @@ protected PubSubTopicPartition getTopicPartition(Integer partition) { } return topicPartition.get(); } + + protected PubSubConsumerAdapter getPubSubConsumer() { + return pubSubConsumer; + } } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImplTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImplTest.java index 41dfc130b8..95695e9b9a 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImplTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImplTest.java @@ -44,6 +44,7 @@ import com.linkedin.venice.serializer.FastSerializerDeserializerFactory; import com.linkedin.venice.serializer.RecordSerializer; import com.linkedin.venice.utils.Utils; +import com.linkedin.venice.utils.lazy.Lazy; import com.linkedin.venice.views.ChangeCaptureView; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -54,6 +55,8 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; @@ -114,7 +117,8 @@ public void testConsumeBeforeAndAfterImage() throws ExecutionException, Interrup oldChangeCaptureTopic, partition, oldVersionTopic, - newVersionTopic); + 
newVersionTopic, + false); ChangelogClientConfig changelogClientConfig = new ChangelogClientConfig<>().setD2ControllerClient(d2ControllerClient) .setSchemaReader(schemaReader) @@ -138,8 +142,6 @@ public void testConsumeBeforeAndAfterImage() throws ExecutionException, Interrup verify(mockPubSubConsumer).subscribe(oldVersionTopicPartition, OffsetRecord.LOWEST_OFFSET); veniceChangelogConsumer.subscribe(new HashSet<>(Arrays.asList(0))).get(); - veniceChangelogConsumer.seekToEndOfPush(); - verify(mockPubSubConsumer, times(2)).subscribe(oldVersionTopicPartition, OffsetRecord.LOWEST_OFFSET); List, VeniceChangeCoordinate>> pubSubMessages = (List, VeniceChangeCoordinate>>) veniceChangelogConsumer.poll(100); @@ -165,6 +167,61 @@ public void testConsumeBeforeAndAfterImage() throws ExecutionException, Interrup verify(mockPubSubConsumer).resume(any()); } + @Test + public void testAfterImageConsumerSeek() throws ExecutionException, InterruptedException { + D2ControllerClient d2ControllerClient = mock(D2ControllerClient.class); + StoreResponse storeResponse = mock(StoreResponse.class); + StoreInfo storeInfo = mock(StoreInfo.class); + doReturn(1).when(storeInfo).getCurrentVersion(); + doReturn(2).when(storeInfo).getPartitionCount(); + doReturn(storeInfo).when(storeResponse).getStore(); + doReturn(storeResponse).when(d2ControllerClient).getStore(storeName); + MultiSchemaResponse multiRMDSchemaResponse = mock(MultiSchemaResponse.class); + MultiSchemaResponse.Schema rmdSchemaFromMultiSchemaResponse = mock(MultiSchemaResponse.Schema.class); + doReturn(rmdSchema.toString()).when(rmdSchemaFromMultiSchemaResponse).getSchemaStr(); + doReturn(new MultiSchemaResponse.Schema[] { rmdSchemaFromMultiSchemaResponse }).when(multiRMDSchemaResponse) + .getSchemas(); + doReturn(multiRMDSchemaResponse).when(d2ControllerClient).getAllReplicationMetadataSchemas(storeName); + + PubSubConsumerAdapter mockPubSubConsumer = mock(PubSubConsumerAdapter.class); + PubSubTopic oldVersionTopic = pubSubTopicRepository.getTopic(Version.composeKafkaTopic(storeName, 1)); + + prepareVersionTopicRecordsToBePolled(0L, 5L, mockPubSubConsumer, oldVersionTopic, 0, true); + ChangelogClientConfig changelogClientConfig = + new ChangelogClientConfig<>().setD2ControllerClient(d2ControllerClient) + .setSchemaReader(schemaReader) + .setStoreName(storeName) + .setViewName(""); + + VeniceChangelogConsumerImpl mockInternalSeekConsumer = Mockito.mock(VeniceChangelogConsumerImpl.class); + Mockito.when(mockInternalSeekConsumer.subscribe(any())).thenReturn(CompletableFuture.completedFuture(null)); + Mockito.when(mockInternalSeekConsumer.getPubSubConsumer()).thenReturn(mockPubSubConsumer); + prepareChangeCaptureRecordsToBePolled(0L, 10L, mockPubSubConsumer, oldVersionTopic, 0, oldVersionTopic, null, true); + VeniceAfterImageConsumerImpl veniceChangelogConsumer = new VeniceAfterImageConsumerImpl<>( + changelogClientConfig, + mockPubSubConsumer, + Lazy.of(() -> mockInternalSeekConsumer)); + veniceChangelogConsumer.versionSwapDetectionIntervalTimeInMs = 1; + ThinClientMetaStoreBasedRepository mockRepository = mock(ThinClientMetaStoreBasedRepository.class); + Store store = mock(Store.class); + Version mockVersion = new VersionImpl(storeName, 1, "foo"); + Mockito.when(store.getCurrentVersion()).thenReturn(1); + Mockito.when(store.getCompressionStrategy()).thenReturn(CompressionStrategy.NO_OP); + Mockito.when(mockRepository.getStore(anyString())).thenReturn(store); + Mockito.when(store.getVersion(Mockito.anyInt())).thenReturn(Optional.of(mockVersion)); + 
veniceChangelogConsumer.setStoreRepository(mockRepository); + veniceChangelogConsumer.subscribe(new HashSet<>(Arrays.asList(0))).get(); + + Set partitionSet = new HashSet<>(); + partitionSet.add(0); + veniceChangelogConsumer.seekToEndOfPush(partitionSet).get(); + + Mockito.verify(mockInternalSeekConsumer).subscribe(partitionSet); + Mockito.verify(mockInternalSeekConsumer).unsubscribe(partitionSet); + PubSubTopicPartition pubSubTopicPartition = new PubSubTopicPartitionImpl(oldVersionTopic, 0); + Mockito.verify(mockPubSubConsumer).subscribe(pubSubTopicPartition, 10); + } + @Test public void testConsumeAfterImage() throws ExecutionException, InterruptedException { D2ControllerClient d2ControllerClient = mock(D2ControllerClient.class); @@ -212,9 +269,15 @@ public void testConsumeAfterImage() throws ExecutionException, InterruptedExcept Utf8 messageStr = pubSubMessage.getValue().getCurrentValue(); Assert.assertEquals(messageStr.toString(), "newValue" + i); } - // Verify version swap from version topic to its corresponding change capture topic happened. - verify(mockPubSubConsumer).subscribe(new PubSubTopicPartitionImpl(oldVersionTopic, 0), OffsetRecord.LOWEST_OFFSET); - prepareChangeCaptureRecordsToBePolled(0L, 10L, mockPubSubConsumer, oldChangeCaptureTopic, 0, oldVersionTopic, null); + prepareChangeCaptureRecordsToBePolled( + 0L, + 10L, + mockPubSubConsumer, + oldChangeCaptureTopic, + 0, + oldVersionTopic, + null, + false); pubSubMessages = (List, VeniceChangeCoordinate>>) veniceChangelogConsumer.poll(100); Assert.assertFalse(pubSubMessages.isEmpty()); @@ -237,7 +300,8 @@ private void prepareChangeCaptureRecordsToBePolled( PubSubTopic changeCaptureTopic, int partition, PubSubTopic oldVersionTopic, - PubSubTopic newVersionTopic) { + PubSubTopic newVersionTopic, + boolean addEndOfPushMessage) { List> pubSubMessageList = new ArrayList<>(); // Add a start of push message @@ -255,6 +319,11 @@ private void prepareChangeCaptureRecordsToBePolled( Arrays.asList(i, i)); pubSubMessageList.add(pubSubMessage); } + + if (addEndOfPushMessage) { + pubSubMessageList.add(constructEndOfPushMessage(changeCaptureTopic, partition, endIdx + 1)); + } + if (newVersionTopic != null) { pubSubMessageList.add( constructVersionSwapMessage( @@ -266,7 +335,7 @@ private void prepareChangeCaptureRecordsToBePolled( } PubSubTopicPartition topicPartition = new PubSubTopicPartitionImpl(changeCaptureTopic, partition); pubSubMessagesMap.put(topicPartition, pubSubMessageList); - doReturn(pubSubMessagesMap).when(pubSubConsumer).poll(100); + doReturn(pubSubMessagesMap).when(pubSubConsumer).poll(Mockito.anyLong()); } private void prepareVersionTopicRecordsToBePolled( @@ -285,7 +354,7 @@ private void prepareVersionTopicRecordsToBePolled( consumerRecordList.add(pubSubMessage); } if (prepareEndOfPush) { - consumerRecordList.add(constructEndOfPushMessage(versionTopic, partition)); + consumerRecordList.add(constructEndOfPushMessage(versionTopic, partition, 0L)); } PubSubTopicPartition pubSubTopicPartition = new PubSubTopicPartitionImpl(versionTopic, partition); consumerRecordsMap.put(pubSubTopicPartition, consumerRecordList); @@ -367,7 +436,8 @@ private PubSubMessage constructConsumerRec private PubSubMessage constructEndOfPushMessage( PubSubTopic versionTopic, - int partition) { + int partition, + Long offset) { KafkaKey kafkaKey = new KafkaKey(MessageType.CONTROL_MESSAGE, null); EndOfPush endOfPush = new EndOfPush(); KafkaMessageEnvelope kafkaMessageEnvelope = new KafkaMessageEnvelope(); @@ -376,7 +446,7 @@ private PubSubMessage 
constructEndOfPushMe controlMessage.controlMessageType = ControlMessageType.END_OF_PUSH.getValue(); kafkaMessageEnvelope.payloadUnion = controlMessage; PubSubTopicPartition pubSubTopicPartition = new PubSubTopicPartitionImpl(versionTopic, partition); - return new ImmutablePubSubMessage<>(kafkaKey, kafkaMessageEnvelope, pubSubTopicPartition, 0, 0, 0); + return new ImmutablePubSubMessage<>(kafkaKey, kafkaMessageEnvelope, pubSubTopicPartition, offset, 0, 0); } private PubSubMessage constructStartOfPushMessage( diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveIngestion.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveIngestion.java index eff285c97f..dee38e4274 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveIngestion.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveIngestion.java @@ -33,6 +33,7 @@ import com.linkedin.davinci.consumer.BootstrappingVeniceChangelogConsumer; import com.linkedin.davinci.consumer.ChangeEvent; import com.linkedin.davinci.consumer.ChangelogClientConfig; +import com.linkedin.davinci.consumer.VeniceAfterImageConsumerImpl; import com.linkedin.davinci.consumer.VeniceChangeCoordinate; import com.linkedin.davinci.consumer.VeniceChangelogConsumer; import com.linkedin.davinci.consumer.VeniceChangelogConsumerClientFactory; @@ -209,11 +210,11 @@ private void pollChangeEventsFromChangeCaptureConsumer2( } } - /* private int pollAfterImageEventsFromChangeCaptureConsumer( + private int pollAfterImageEventsFromChangeCaptureConsumer( Map polledChangeEvents, VeniceChangelogConsumer veniceChangelogConsumer) { int polledMessagesNum = 0; - Collection, Long>> pubSubMessages = veniceChangelogConsumer.poll(100); + Collection, Long>> pubSubMessages = veniceChangelogConsumer.poll(1000); for (PubSubMessage, Long> pubSubMessage: pubSubMessages) { Utf8 afterImageEvent = pubSubMessage.getValue().getCurrentValue(); String key = pubSubMessage.getKey().toString(); @@ -221,7 +222,7 @@ private void pollChangeEventsFromChangeCaptureConsumer2( polledMessagesNum++; } return polledMessagesNum; - }*/ + } @Test(timeOut = TEST_TIMEOUT, dataProviderClass = DataProviderUtils.class) public void testLeaderLagWithIgnoredData() throws Exception { @@ -554,6 +555,7 @@ public void testActiveActiveStoreRestart() throws Exception { @Test(timeOut = TEST_TIMEOUT, priority = 3) public void testAAIngestionWithStoreView() throws Exception { + // Set up the store Long timestamp = System.currentTimeMillis(); ControllerClient childControllerClient = new ControllerClient(clusterName, childDatacenters.get(0).getControllerConnectString()); @@ -614,7 +616,10 @@ public void testAAIngestionWithStoreView() throws Exception { Assert.assertEquals(viewConfigMap.get("changeCaptureView").getViewParameters().size(), 1); }); + // Write records to the store for version v1; the push job will contain 100 records. 
TestWriteUtils.runPushJob("Run push job", props); + + // Write records from nearline Map samzaConfig = getSamzaConfig(storeName); VeniceSystemFactory factory = new VeniceSystemFactory(); // Use a unique key for DELETE with RMD validation @@ -635,14 +640,25 @@ public void testAAIngestionWithStoreView() throws Exception { .setControllerD2ServiceName(D2_SERVICE_NAME) .setD2ServiceName(VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME) .setLocalD2ZkHosts(localZkServer.getAddress()) - .setControllerRequestRetryCount(3); + .setControllerRequestRetryCount(3) + .setVersionSwapDetectionIntervalTimeInMs(10L); VeniceChangelogConsumerClientFactory veniceChangelogConsumerClientFactory = new VeniceChangelogConsumerClientFactory(globalChangelogClientConfig, metricsRepository); - // ChangelogClientConfig globalAfterImageClientConfig = - // ChangelogClientConfig.cloneConfig(globalChangelogClientConfig).setViewName(""); - // VeniceChangelogConsumerClientFactory veniceAfterImageConsumerClientFactory = - // new VeniceChangelogConsumerClientFactory(globalAfterImageClientConfig, metricsRepository); + ChangelogClientConfig globalAfterImageClientConfig = + ChangelogClientConfig.cloneConfig(globalChangelogClientConfig).setViewName(""); + VeniceChangelogConsumerClientFactory veniceAfterImageConsumerClientFactory = + new VeniceChangelogConsumerClientFactory(globalAfterImageClientConfig, metricsRepository); + + VeniceChangelogConsumer versionTopicConsumer = + veniceAfterImageConsumerClientFactory.getChangelogConsumer(storeName); + Assert.assertTrue(versionTopicConsumer instanceof VeniceAfterImageConsumerImpl); + versionTopicConsumer.subscribeAll().get(); + + // Let's consume those 100 records off of version 1 + Map versionTopicEvents = new HashMap<>(); + pollAfterImageEventsFromChangeCaptureConsumer(versionTopicEvents, versionTopicConsumer); + Assert.assertEquals(versionTopicEvents.size(), 100); VeniceChangelogConsumer veniceChangelogConsumer = veniceChangelogConsumerClientFactory.getChangelogConsumer(storeName); @@ -700,7 +716,14 @@ public void testAAIngestionWithStoreView() throws Exception { Assert.assertNull(changeEvent.getCurrentValue()); } }); - // run repush + + versionTopicEvents.clear(); + TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, true, () -> { + pollAfterImageEventsFromChangeCaptureConsumer(versionTopicEvents, versionTopicConsumer); + Assert.assertEquals(versionTopicEvents.size(), 21); + }); + + // Run repush. Repush will reapply all existing events to the new store version and trim all events from the RT props.setProperty(SOURCE_KAFKA, "true"); props.setProperty(KAFKA_INPUT_BROKER_URL, clusterWrapper.getPubSubBrokerWrapper().getAddress()); props.setProperty(KAFKA_INPUT_MAX_RECORDS_PER_MAPPER, "5"); @@ -763,10 +786,7 @@ public void testAAIngestionWithStoreView() throws Exception { Assert.assertNull(client.get(Integer.toString(deleteWithRmdKeyIndex)).get()); }); } - // TODO disabling verification of veniceAfterImageConsumer until its behavior is defined/fixed. - // VeniceChangelogConsumer veniceAfterImageConsumer = - // veniceAfterImageConsumerClientFactory.getChangelogConsumer(storeName); - // veniceAfterImageConsumer.subscribeAll().get(); + // Validate change events for version 2. 
allChangeEvents.putAll(polledChangeEvents); polledChangeEvents.clear(); @@ -785,6 +805,7 @@ public void testAAIngestionWithStoreView() throws Exception { polledChangeEvents.get(persistWithRmdKey).getValue().getCurrentValue().toString(), "stream_" + persistWithRmdKey); }); + /** * Test Repush with TTL */ @@ -858,7 +879,8 @@ public void testAAIngestionWithStoreView() throws Exception { // } // Drain the remaining events on version 3 and verify that we got everything. We don't verify the count - // because at this stage, the total events which will get polled + // because at this stage, the total events which will get polled will be determined by how far back the rewind + // managed to get (and test run duration might be variable) TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, true, () -> { pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); @@ -947,7 +969,7 @@ public void testAAIngestionWithStoreView() throws Exception { veniceChangelogConsumer.seekToBeginningOfPush().join(); TestUtils.waitForNonDeterministicAssertion(5, TimeUnit.SECONDS, () -> { pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); - Assert.assertEquals(polledChangeEvents.size(), 15); + Assert.assertEquals(polledChangeEvents.size(), 30); }); // Save a checkpoint and clear the map @@ -964,7 +986,8 @@ public void testAAIngestionWithStoreView() throws Exception { // Poll Change events again, verify we get everything TestUtils.waitForNonDeterministicAssertion(5, TimeUnit.SECONDS, () -> { pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); - Assert.assertEquals(polledChangeEvents.size(), 8); + // Repush with TTL will include delete events in the topic + Assert.assertEquals(polledChangeEvents.size(), 16); }); allChangeEvents.putAll(polledChangeEvents); polledChangeEvents.clear(); @@ -992,6 +1015,29 @@ public void testAAIngestionWithStoreView() throws Exception { Assert.assertEquals(polledChangeEvents.size(), 0); }); + versionTopicEvents.clear(); + TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, true, () -> { + pollAfterImageEventsFromChangeCaptureConsumer(versionTopicEvents, versionTopicConsumer); + // At this point, the consumer should have auto-tracked to version 4, and since we didn't apply any nearline + // writes to version 4, there should be no events to consume + Assert.assertEquals(versionTopicEvents.size(), 0); + }); + + versionTopicConsumer.seekToEndOfPush().get(); + TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, true, () -> { + pollAfterImageEventsFromChangeCaptureConsumer(versionTopicEvents, versionTopicConsumer); + // Again, no events to consume here. + Assert.assertEquals(versionTopicEvents.size(), 0); + }); + + versionTopicConsumer.seekToBeginningOfPush().get(); + TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, true, () -> { + pollAfterImageEventsFromChangeCaptureConsumer(versionTopicEvents, versionTopicConsumer); + // Reconsuming the events from the version topic, which at this point should just contain the same 16 + // events we consumed with the before/after image consumer earlier. + Assert.assertEquals(versionTopicEvents.size(), 16); + }); + // Verify version swap count matches with version count - 1 (since we don't transmit from version 0 to version 1). 
// This will include messages for all partitions, so (4 versions - 1) * 3 partitions = 9 messages TestUtils.waitForNonDeterministicAssertion( @@ -1002,7 +1048,7 @@ public void testAAIngestionWithStoreView() throws Exception { // are // applied to a version) TestUtils.waitForNonDeterministicAssertion( - 5, + 8, TimeUnit.SECONDS, () -> Assert.assertEquals(TestView.getInstance().getRecordCountForStore(storeName), 86)); parentControllerClient.disableAndDeleteStore(storeName);
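
Reviewer note: below is a minimal usage sketch of the after-image consumer path this patch introduces, assembled only from APIs the patch itself touches (setVersionSwapDetectionIntervalTimeInMs, getChangelogConsumer, subscribeAll, seekToEndOfPush, poll). It is illustrative, not part of the change: the store name and ZK address are hypothetical stand-ins, cluster-specific settings such as consumer properties and D2 service names are elided, and a running Venice cluster with a current store version is assumed.

    import java.util.Collections;
    import java.util.concurrent.TimeUnit;
    import com.linkedin.davinci.consumer.ChangeEvent;
    import com.linkedin.davinci.consumer.ChangelogClientConfig;
    import com.linkedin.davinci.consumer.VeniceChangeCoordinate;
    import com.linkedin.davinci.consumer.VeniceChangelogConsumer;
    import com.linkedin.davinci.consumer.VeniceChangelogConsumerClientFactory;
    import com.linkedin.venice.pubsub.api.PubSubMessage;
    import io.tehuti.metrics.MetricsRepository;
    import org.apache.avro.util.Utf8;

    public final class AfterImageConsumerSketch {
      public static void main(String[] args) throws Exception {
        // Hypothetical stand-ins; real callers supply their own cluster coordinates and
        // the remaining ChangelogClientConfig settings (consumer properties, D2 names).
        String storeName = "test_store";
        String localD2ZkHosts = "localhost:2181";

        ChangelogClientConfig config = new ChangelogClientConfig(storeName)
            .setViewName("") // An empty view name selects the after-image consumer.
            .setLocalD2ZkHosts(localD2ZkHosts)
            .setControllerRequestRetryCount(3)
            // New in this patch: how often the background thread checks for a version swap.
            .setVersionSwapDetectionIntervalTimeInMs(TimeUnit.MINUTES.toMillis(1));

        VeniceChangelogConsumer<Utf8, Utf8> consumer =
            new VeniceChangelogConsumerClientFactory(config, new MetricsRepository())
                .getChangelogConsumer(storeName);
        consumer.subscribeAll().get();

        // Skip the batch push on partition 0. With this patch the seek scans the version
        // topic for the END_OF_PUSH control message on an internal consumer, checkpoints
        // there, and no longer consumes the change capture view topic.
        consumer.seekToEndOfPush(Collections.singleton(0)).get();

        // poll() also starts the version swap detection thread, which re-seeks to the new
        // version's end of push whenever the store's current version changes.
        for (PubSubMessage<Utf8, ChangeEvent<Utf8>, VeniceChangeCoordinate> message: consumer.poll(1000)) {
          System.out.println(message.getKey() + " -> " + message.getValue().getCurrentValue());
        }
      }
    }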