[changelog] Add isCaughtUp() API to VeniceChangelogConsumer interface (linkedin#1016)

A simpler solution is to provide an API that users can call to check whether all subscriptions have caught up.
Also add specific record support for the regular consumer.
sixpluszero authored Jun 5, 2024
1 parent d678216 commit 53767c0
Showing 5 changed files with 200 additions and 7 deletions.
File: VeniceChangelogConsumer.java
@@ -197,6 +197,13 @@ public interface VeniceChangelogConsumer<K, V> {
*/
Collection<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> poll(long timeoutInMs);

/**
* Checks whether all subscribed partitions have caught up during bootstrap. A partition is considered caught up
* if (currentTimestamp - latestMessageTimestamp) for its latest polled message is at most one minute.
* @return True if all subscribed partitions have caught up.
*/
boolean isCaughtUp();

/**
* Release the internal resources.
*/
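For orientation, a minimal usage sketch of the new API (not part of this commit): a caller gates its bootstrap phase on isCaughtUp() while polling. The class name, generics, and setup are illustrative assumptions.

import com.linkedin.davinci.consumer.VeniceChangelogConsumer;

public final class BootstrapGate {
  // Hedged sketch: assumes `consumer` came from VeniceChangelogConsumerClientFactory.
  public static <K, V> void awaitCaughtUp(VeniceChangelogConsumer<K, V> consumer) throws Exception {
    consumer.subscribeAll().get();
    while (!consumer.isCaughtUp()) {
      // poll() drives the per-partition bootstrap flags that isCaughtUp() aggregates;
      // a real caller would process the returned change events instead of dropping them.
      consumer.poll(1000);
    }
  }
}

Once the loop exits, every subscribed partition has recently seen a message no more than about one minute old.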
File: VeniceChangelogConsumerClientFactory.java
@@ -62,13 +62,18 @@ public <K, V> VeniceChangelogConsumer<K, V> getChangelogConsumer(String storeName
return getChangelogConsumer(storeName, null);
}

public <K, V> VeniceChangelogConsumer<K, V> getChangelogConsumer(String storeName, String consumerId) {
return getChangelogConsumer(storeName, consumerId, null);
}

/**
* Creates a VeniceChangelogConsumer with a consumer id. This is used to create multiple consumers so that
* each consumer subscribes to only certain partitions. Multiple such consumers can work in parallel.
*/
public <K, V> VeniceChangelogConsumer<K, V> getChangelogConsumer(String storeName, String consumerId) {
public <K, V> VeniceChangelogConsumer<K, V> getChangelogConsumer(String storeName, String consumerId, Class clazz) {
return storeClientMap.computeIfAbsent(suffixConsumerIdToStore(storeName, consumerId), name -> {
ChangelogClientConfig newStoreChangelogClientConfig = getNewStoreChangelogClientConfig(storeName);
ChangelogClientConfig newStoreChangelogClientConfig =
getNewStoreChangelogClientConfig(storeName).setSpecificValue(clazz);
newStoreChangelogClientConfig.setConsumerName(name);
String viewClass = getViewClass(newStoreChangelogClientConfig, storeName);
String consumerName = suffixConsumerIdToStore(storeName + "-" + viewClass.getClass().getSimpleName(), consumerId);
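Condensed from the integration test added below, the call pattern for the new three-argument overload looks like this; globalChangelogClientConfig, metricsRepository, and storeName are assumed to be set up as in that test, and TestChangelogValue is its generated Avro SpecificRecord class.

VeniceChangelogConsumerClientFactory factory =
    new VeniceChangelogConsumerClientFactory(globalChangelogClientConfig, metricsRepository);
// Passing a SpecificRecord class makes poll() return values deserialized into
// that class rather than GenericRecord.
VeniceChangelogConsumer<Utf8, TestChangelogValue> consumer =
    factory.getChangelogConsumer(storeName, "0", TestChangelogValue.class);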
File: VeniceChangelogConsumerImpl.java
@@ -98,6 +98,8 @@ public class VeniceChangelogConsumerImpl<K, V> implements VeniceChangelogConsumer

protected final Map<Integer, AtomicLong> partitionToPutMessageCount = new VeniceConcurrentHashMap<>();
protected final Map<Integer, AtomicLong> partitionToDeleteMessageCount = new VeniceConcurrentHashMap<>();
protected final Map<Integer, Boolean> partitionToBootstrapState = new VeniceConcurrentHashMap<>();
protected final long startTimestamp;

protected final RecordDeserializer<K> keyDeserializer;
private final D2ControllerClient d2ControllerClient;
@@ -150,7 +152,8 @@ public VeniceChangelogConsumerImpl(
Schema keySchema = schemaReader.getKeySchema();
this.keyDeserializer = FastSerializerDeserializerFactory.getFastAvroGenericDeserializer(keySchema, keySchema);
this.chunkAssembler = new ChunkAssembler(storeName);

this.startTimestamp = System.currentTimeMillis();
LOGGER.info("VeniceChangelogConsumer created at timestamp: {}", startTimestamp);
this.storeRepository = new ThinClientMetaStoreBasedRepository(
changelogClientConfig.getInnerClientConfig(),
VeniceProperties.empty(),
@@ -185,6 +188,9 @@ public int getPartitionCount() {

@Override
public CompletableFuture<Void> subscribe(Set<Integer> partitions) {
for (int partition: partitions) {
getPartitionToBootstrapState().put(partition, false);
}
return internalSubscribe(partitions, null);
}

@@ -437,6 +443,15 @@ public CompletableFuture<Void> seekToTimestamp(Long timestamp) {
return this.seekToTimestamps(partitionsToSeek);
}

@Override
public boolean isCaughtUp() {
return getPartitionToBootstrapState().values().stream().allMatch(x -> x);
}

Map<Integer, Boolean> getPartitionToBootstrapState() {
return partitionToBootstrapState;
}

protected CompletableFuture<Void> internalSeek(
Set<Integer> partitions,
PubSubTopic targetTopic,
@@ -528,6 +543,7 @@ protected Collection<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> i
PubSubTopicPartition pubSubTopicPartition = entry.getKey();
List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>> messageList = entry.getValue();
for (PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> message: messageList) {
maybeUpdatePartitionToBootstrapMap(message, pubSubTopicPartition);
if (message.getKey().isControlMessage()) {
ControlMessage controlMessage = (ControlMessage) message.getValue().getPayloadUnion();
if (handleControlMessage(
@@ -563,6 +579,15 @@ protected Collection<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> i
return pubSubMessages;
}

void maybeUpdatePartitionToBootstrapMap(
PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> message,
PubSubTopicPartition pubSubTopicPartition) {
if (System.currentTimeMillis() - message.getValue().producerMetadata.messageTimestamp <= TimeUnit.MINUTES
.toMillis(1)) {
getPartitionToBootstrapState().put(pubSubTopicPartition.getPartitionNumber(), true);
}
}

protected Collection<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> internalPoll(
long timeoutInMs,
String topicSuffix) {
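The catch-up heuristic in maybeUpdatePartitionToBootstrapMap is time-based rather than offset-based: a partition is marked caught up as soon as one polled message carries a producer timestamp under a minute old. A self-contained sketch of the same comparison, with hypothetical timestamps:

import java.util.concurrent.TimeUnit;

public final class FreshnessCheckDemo {
  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    long stale = now - TimeUnit.MINUTES.toMillis(2);  // produced two minutes ago
    long fresh = now - TimeUnit.SECONDS.toMillis(30); // produced thirty seconds ago
    // Mirrors the check in maybeUpdatePartitionToBootstrapMap: only the fresh
    // message would flip the partition's bootstrap flag to true.
    System.out.println(now - stale <= TimeUnit.MINUTES.toMillis(1));  // false
    System.out.println(now - fresh <= TimeUnit.MINUTES.toMillis(1));  // true
  }
}

In the code above, nothing resets the flag after it flips to true, so a partition stays marked caught up until it is re-subscribed.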
File: unit tests for VeniceChangelogConsumerImpl
@@ -1,7 +1,9 @@
package com.linkedin.davinci.consumer;

import static com.linkedin.venice.kafka.protocol.enums.ControlMessageType.START_OF_SEGMENT;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -45,6 +47,7 @@
import com.linkedin.venice.serializer.FastSerializerDeserializerFactory;
import com.linkedin.venice.serializer.RecordSerializer;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import com.linkedin.venice.utils.lazy.Lazy;
import com.linkedin.venice.views.ChangeCaptureView;
import java.nio.ByteBuffer;
@@ -58,6 +61,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
@@ -226,6 +230,32 @@ public void testAfterImageConsumerSeek() throws ExecutionException, InterruptedException
Mockito.verify(mockPubSubConsumer).subscribe(pubSubTopicPartition, 10);
}

@Test
public void testBootstrapState() {
VeniceChangelogConsumerImpl veniceChangelogConsumer = mock(VeniceChangelogConsumerImpl.class);
Map<Integer, Boolean> bootstrapStateMap = new VeniceConcurrentHashMap<>();
bootstrapStateMap.put(0, false);
doReturn(bootstrapStateMap).when(veniceChangelogConsumer).getPartitionToBootstrapState();
doCallRealMethod().when(veniceChangelogConsumer).maybeUpdatePartitionToBootstrapMap(any(), any());
long currentTimestamp = System.currentTimeMillis();
PubSubTopicRepository topicRepository = new PubSubTopicRepository();
PubSubTopicPartition pubSubTopicPartition = new PubSubTopicPartitionImpl(topicRepository.getTopic("foo_v1"), 0);
ControlMessage controlMessage = new ControlMessage();
controlMessage.controlMessageType = START_OF_SEGMENT.getValue();
KafkaMessageEnvelope kafkaMessageEnvelope = new KafkaMessageEnvelope();
kafkaMessageEnvelope.producerMetadata = new ProducerMetadata();
kafkaMessageEnvelope.producerMetadata.messageTimestamp = currentTimestamp - TimeUnit.MINUTES.toMillis(2);
kafkaMessageEnvelope.payloadUnion = controlMessage;
PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> message =
new ImmutablePubSubMessage<>(KafkaKey.HEART_BEAT, kafkaMessageEnvelope, pubSubTopicPartition, 0, 0, 0);

veniceChangelogConsumer.maybeUpdatePartitionToBootstrapMap(message, pubSubTopicPartition);
Assert.assertFalse(bootstrapStateMap.get(0));
kafkaMessageEnvelope.producerMetadata.messageTimestamp = currentTimestamp - TimeUnit.SECONDS.toMillis(30);
veniceChangelogConsumer.maybeUpdatePartitionToBootstrapMap(message, pubSubTopicPartition);
Assert.assertTrue(bootstrapStateMap.get(0));
}

@Test
public void testConsumeAfterImage() throws ExecutionException, InterruptedException {
D2ControllerClient d2ControllerClient = mock(D2ControllerClient.class);
File: changelog consumer integration tests
@@ -617,7 +617,7 @@ public void testAAIngestionWithStoreView() throws Exception {
}

@Test(timeOut = TEST_TIMEOUT, priority = 3)
public void testSpecificRecordVeniceChangelogConsumer() throws Exception {
public void testSpecificRecordBootstrappingVeniceChangelogConsumer() throws Exception {
ControllerClient childControllerClient =
new ControllerClient(clusterName, childDatacenters.get(0).getControllerConnectString());
File inputDir = getTempDataDirectory();
@@ -677,7 +677,7 @@ public void testSpecificRecordVeniceChangelogConsumer() throws Exception {
List<PubSubMessage<Utf8, ChangeEvent<TestChangelogValue>, VeniceChangeCoordinate>> polledChangeEventsList =
new ArrayList<>();
TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> {
pollChangeEventsFromSpecificChangeCaptureConsumer(
pollChangeEventsFromSpecificBootstrappingChangeCaptureConsumer(
polledChangeEventsMap,
polledChangeEventsList,
specificChangelogConsumer);
@@ -710,7 +710,7 @@ }
}

TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> {
pollChangeEventsFromSpecificChangeCaptureConsumer(
pollChangeEventsFromSpecificBootstrappingChangeCaptureConsumer(
polledChangeEventsMap,
polledChangeEventsList,
specificChangelogConsumer);
@@ -729,6 +729,119 @@
}
}

@Test(timeOut = TEST_TIMEOUT, priority = 3)
public void testSpecificRecordVeniceChangelogConsumer() throws Exception {
ControllerClient childControllerClient =
new ControllerClient(clusterName, childDatacenters.get(0).getControllerConnectString());
File inputDir = getTempDataDirectory();
Schema recordSchema = TestWriteUtils.writeSimpleAvroFileWithStringToNameRecordV1Schema(inputDir);
String inputDirPath = "file://" + inputDir.getAbsolutePath();
String storeName = Utils.getUniqueString("store");
Properties props =
TestWriteUtils.defaultVPJProps(parentControllers.get(0).getControllerUrl(), inputDirPath, storeName);
String keySchemaStr = recordSchema.getField(DEFAULT_KEY_FIELD_PROP).schema().toString();
String valueSchemaStr = NAME_RECORD_V2_SCHEMA.toString();
UpdateStoreQueryParams storeParms = new UpdateStoreQueryParams().setActiveActiveReplicationEnabled(true)
.setHybridRewindSeconds(500)
.setHybridOffsetLagThreshold(8)
.setChunkingEnabled(true)
.setNativeReplicationEnabled(true)
.setPartitionCount(3);
MetricsRepository metricsRepository = new MetricsRepository();
ControllerClient setupControllerClient =
createStoreForJob(clusterName, keySchemaStr, valueSchemaStr, props, storeParms);
setupControllerClient
.retryableRequest(5, controllerClient1 -> setupControllerClient.updateStore(storeName, storeParms));
// Registering real data schema as schema v2.
setupControllerClient.retryableRequest(
5,
controllerClient1 -> setupControllerClient.addValueSchema(storeName, NAME_RECORD_V1_SCHEMA.toString()));
TestWriteUtils.runPushJob("Run push job", props);
TestMockTime testMockTime = new TestMockTime();
ZkServerWrapper localZkServer = multiRegionMultiClusterWrapper.getChildRegions().get(0).getZkServerWrapper();
try (PubSubBrokerWrapper localKafka = ServiceFactory.getPubSubBroker(
new PubSubBrokerConfigs.Builder().setZkWrapper(localZkServer)
.setMockTime(testMockTime)
.setRegionName("local-pubsub")
.build())) {
Properties consumerProperties = new Properties();
String localKafkaUrl = localKafka.getAddress();
consumerProperties.put(KAFKA_BOOTSTRAP_SERVERS, localKafkaUrl);
consumerProperties.put(CLUSTER_NAME, clusterName);
consumerProperties.put(ZOOKEEPER_ADDRESS, localZkServer.getAddress());
ChangelogClientConfig globalChangelogClientConfig =
new ChangelogClientConfig().setConsumerProperties(consumerProperties)
.setControllerD2ServiceName(D2_SERVICE_NAME)
.setD2ServiceName(VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME)
.setLocalD2ZkHosts(localZkServer.getAddress())
.setControllerRequestRetryCount(3)
.setSpecificValue(TestChangelogValue.class)
.setBootstrapFileSystemPath(Utils.getUniqueString(inputDirPath));
VeniceChangelogConsumerClientFactory veniceChangelogConsumerClientFactory =
new VeniceChangelogConsumerClientFactory(globalChangelogClientConfig, metricsRepository);
VeniceChangelogConsumer<Utf8, TestChangelogValue> specificChangelogConsumer =
veniceChangelogConsumerClientFactory.getChangelogConsumer(storeName, "0", TestChangelogValue.class);
specificChangelogConsumer.subscribeAll().get();
Assert.assertFalse(specificChangelogConsumer.isCaughtUp());

Map<String, PubSubMessage<Utf8, ChangeEvent<TestChangelogValue>, VeniceChangeCoordinate>> polledChangeEventsMap =
new HashMap<>();
List<PubSubMessage<Utf8, ChangeEvent<TestChangelogValue>, VeniceChangeCoordinate>> polledChangeEventsList =
new ArrayList<>();
TestUtils.waitForNonDeterministicAssertion(120, TimeUnit.SECONDS, true, () -> {
pollChangeEventsFromSpecificChangeCaptureConsumer(
polledChangeEventsMap,
polledChangeEventsList,
specificChangelogConsumer);
Assert.assertEquals(polledChangeEventsList.size(), 100);
Assert.assertTrue(specificChangelogConsumer.isCaughtUp());
});

Assert.assertTrue(
polledChangeEventsMap.get(Integer.toString(1)).getValue().getCurrentValue() instanceof SpecificRecord);
TestChangelogValue value = new TestChangelogValue();
value.firstName = "first_name_1";
value.lastName = "last_name_1";
Assert.assertEquals(polledChangeEventsMap.get(Integer.toString(1)).getValue().getCurrentValue(), value);
polledChangeEventsList.clear();
polledChangeEventsMap.clear();

GenericRecord genericRecord = new GenericData.Record(NAME_RECORD_V1_SCHEMA);
genericRecord.put("firstName", "Venice");
genericRecord.put("lastName", "Italy");

GenericRecord genericRecordV2 = new GenericData.Record(NAME_RECORD_V1_SCHEMA);
genericRecordV2.put("firstName", "Barcelona");
genericRecordV2.put("lastName", "Spain");

VeniceSystemFactory factory = new VeniceSystemFactory();
try (VeniceSystemProducer veniceProducer = factory
.getClosableProducer("venice", new MapConfig(getSamzaProducerConfig(childDatacenters, 0, storeName)), null)) {
veniceProducer.start();
// Run Samza job to send PUT and DELETE requests.
sendStreamingRecord(veniceProducer, storeName, Integer.toString(10000), genericRecord, null);
sendStreamingRecord(veniceProducer, storeName, Integer.toString(10000), genericRecordV2, null);
}

TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> {
pollChangeEventsFromSpecificChangeCaptureConsumer(
polledChangeEventsMap,
polledChangeEventsList,
specificChangelogConsumer);
Assert.assertEquals(polledChangeEventsList.size(), 2);
});
Assert.assertTrue(
polledChangeEventsMap.get(Integer.toString(10000)).getValue().getCurrentValue() instanceof SpecificRecord);
parentControllerClient.disableAndDeleteStore(storeName);
// Verify that topics and store are cleaned up
TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> {
MultiStoreTopicsResponse storeTopicsResponse = childControllerClient.getDeletableStoreTopics();
Assert.assertFalse(storeTopicsResponse.isError());
Assert.assertEquals(storeTopicsResponse.getTopics().size(), 0);
});
}
}

private void runSamzaStreamJob(
VeniceSystemProducer veniceProducer,
String storeName,
@@ -766,7 +879,7 @@ private void pollChangeEventsFromChangeCaptureConsumer(
}
}

private void pollChangeEventsFromSpecificChangeCaptureConsumer(
private void pollChangeEventsFromSpecificBootstrappingChangeCaptureConsumer(
Map<String, PubSubMessage<Utf8, ChangeEvent<TestChangelogValue>, VeniceChangeCoordinate>> polledChangeEvents,
List<PubSubMessage<Utf8, ChangeEvent<TestChangelogValue>, VeniceChangeCoordinate>> polledMessageList,
BootstrappingVeniceChangelogConsumer veniceChangelogConsumer) {
@@ -779,6 +892,19 @@ private void pollChangeEventsFromSpecificChangeCaptureConsumer(
polledMessageList.addAll(pubSubMessages);
}

private void pollChangeEventsFromSpecificChangeCaptureConsumer(
Map<String, PubSubMessage<Utf8, ChangeEvent<TestChangelogValue>, VeniceChangeCoordinate>> polledChangeEvents,
List<PubSubMessage<Utf8, ChangeEvent<TestChangelogValue>, VeniceChangeCoordinate>> polledMessageList,
VeniceChangelogConsumer veniceChangelogConsumer) {
Collection<PubSubMessage<Utf8, ChangeEvent<TestChangelogValue>, VeniceChangeCoordinate>> pubSubMessages =
veniceChangelogConsumer.poll(1000);
for (PubSubMessage<Utf8, ChangeEvent<TestChangelogValue>, VeniceChangeCoordinate> pubSubMessage: pubSubMessages) {
String key = pubSubMessage.getKey() == null ? null : pubSubMessage.getKey().toString();
polledChangeEvents.put(key, pubSubMessage);
}
polledMessageList.addAll(pubSubMessages);
}

private void pollChangeEventsFromChangeCaptureConsumer2(
Map<String, PubSubMessage<Utf8, ChangeEvent<Utf8>, VeniceChangeCoordinate>> polledChangeEvents,
VeniceChangelogConsumer veniceChangelogConsumer) {