Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dvc][common][test] leader complete status to follower part 1: leader produce HB and SOS to local VT with new header #741

Merged
merged 13 commits into from
Nov 16, 2023
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package com.linkedin.davinci.consumer;

import static com.linkedin.venice.writer.VeniceWriter.EMPTY_MSG_HEADERS;

import com.linkedin.venice.pubsub.adapter.kafka.ApacheKafkaOffsetPosition;
import com.linkedin.venice.pubsub.api.PubSubMessage;
import com.linkedin.venice.pubsub.api.PubSubMessageHeaders;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import java.util.Objects;

Expand Down Expand Up @@ -69,4 +72,9 @@ public int getPayloadSize() {
/** Returns whether this message marks the end of the bootstrap phase (backed by the {@code isEndOfBootstrap} field). */
public boolean isEndOfBootstrap() {
return isEndOfBootstrap;
}

/**
 * This message type carries no pub-sub headers, so the shared immutable
 * {@code EMPTY_MSG_HEADERS} instance is always returned (never null).
 */
@Override
public PubSubMessageHeaders getPubSubMessageHeaders() {
return EMPTY_MSG_HEADERS;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1676,7 +1676,7 @@ protected void reportIfCatchUpVersionTopicOffset(PartitionConsumptionState pcs)
if (isCurrentVersion.getAsBoolean()) {
amplificationFactorAdapter
.executePartitionConsumptionState(pcs.getUserPartition(), PartitionConsumptionState::lagHasCaughtUp);
statusReportAdapter.reportCompleted(pcs, true);
reportCompletedAndSendHeartBeat(pcs, partition, true);
}
}
}
Expand Down Expand Up @@ -2075,19 +2075,47 @@ protected DelegateConsumerRecordResult delegateConsumerRecord(
kafkaClusterId,
beforeProcessingRecordTimestampNs);
} else {
if (controlMessageType == START_OF_SEGMENT
&& Arrays.equals(consumerRecord.getKey().getKey(), KafkaKey.HEART_BEAT.getKey())) {
LeaderProducerCallback callback = createProducerCallback(
consumerRecord,
partitionConsumptionState,
leaderProducedRecordContext,
subPartition,
kafkaUrl,
beforeProcessingRecordTimestampNs);
LeaderMetadataWrapper leaderMetadataWrapper =
new LeaderMetadataWrapper(consumerRecord.getOffset(), kafkaClusterId);
PubSubTopicPartition topicPartition = new PubSubTopicPartitionImpl(getVersionTopic(), subPartition);
veniceWriter.get().sendHeartbeat(topicPartition, callback, leaderMetadataWrapper);
if (controlMessageType == START_OF_SEGMENT) {
if (Arrays.equals(consumerRecord.getKey().getKey(), KafkaKey.HEART_BEAT.getKey())) {
LeaderProducerCallback callback = createProducerCallback(
consumerRecord,
partitionConsumptionState,
leaderProducedRecordContext,
subPartition,
kafkaUrl,
beforeProcessingRecordTimestampNs);
LeaderMetadataWrapper leaderMetadataWrapper =
new LeaderMetadataWrapper(consumerRecord.getOffset(), kafkaClusterId);
PubSubTopicPartition topicPartition = new PubSubTopicPartitionImpl(getVersionTopic(), subPartition);
// Leaders forward HB SOS message to local VT with updated isLeaderCompleted header
veniceWriter.get()
.sendHeartbeat(
topicPartition,
callback,
leaderMetadataWrapper,
true,
consumerRecord.getValue().producerMetadata.messageTimestamp,
partitionConsumptionState.isCompletionReported());
} else {
// Leaders forward SOS message to local VT: add isLeaderCompleted Header
produceToLocalKafka(
m-nagarajan marked this conversation as resolved.
Show resolved Hide resolved
consumerRecord,
partitionConsumptionState,
leaderProducedRecordContext,
(callback, leaderMetadataWrapper) -> veniceWriter.get()
.put(
consumerRecord.getKey(),
consumerRecord.getValue(),
callback,
consumerRecord.getTopicPartition().getPartitionNumber(),
leaderMetadataWrapper,
true,
partitionConsumptionState.isCompletionReported()),
subPartition,
kafkaUrl,
kafkaClusterId,
beforeProcessingRecordTimestampNs);
}
} else {
/**
* Based on current design handling this case (specially EOS) is tricky as we don't produce the SOS/EOS
Expand Down Expand Up @@ -2569,6 +2597,20 @@ public long getRegionHybridOffsetLag(int regionId) {
return StatsErrorCode.ACTIVE_ACTIVE_NOT_ENABLED.code;
}

/**
 * Sends a one-off heartbeat to the given partition, forwarding the partition's
 * completion-reported flag (per this change, carried as the leader-completed
 * header) so followers learn the leader's status without waiting for the
 * periodic heartbeat.
 *
 * NOTE(review): the timestamp argument is 0 here — presumably the writer fills
 * in its own time; confirm against VeniceWriter.sendHeartbeat.
 */
@Override
protected void sendInstantHeartBeat(
PartitionConsumptionState partitionConsumptionState,
PubSubTopicPartition pubSubTopicPartition) {
veniceWriter.get()
.sendHeartbeat(
pubSubTopicPartition,
null, // no producer callback for this fire-and-forget heartbeat
DEFAULT_LEADER_METADATA_WRAPPER,
true,
0,
partitionConsumptionState.isCompletionReported());
}

/**
* Unsubscribe from all the topics being consumed for the partition in partitionConsumptionState
*/
Expand Down Expand Up @@ -3182,7 +3224,14 @@ protected void maybeSendIngestionHeartbeat() {
}
PubSubTopicPartition topicPartition = new PubSubTopicPartitionImpl(leaderTopic, pcs.getPartition());
try {
veniceWriter.get().sendHeartbeat(topicPartition, null, DEFAULT_LEADER_METADATA_WRAPPER);
veniceWriter.get()
.sendHeartbeat(
topicPartition,
null,
DEFAULT_LEADER_METADATA_WRAPPER,
true,
0,
pcs.isCompletionReported());
} catch (Exception e) {
String errorMessage = String.format(
"Failed to send ingestion heartbeat for topic: %s, partition: %s",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.linkedin.davinci.kafka.consumer;

import static com.linkedin.venice.writer.VeniceWriter.EMPTY_MSG_HEADERS;
import static java.util.Collections.reverseOrder;
import static java.util.Comparator.comparing;
import static java.util.stream.Collectors.toList;
Expand All @@ -13,6 +14,7 @@
import com.linkedin.venice.kafka.protocol.enums.MessageType;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.pubsub.api.PubSubMessage;
import com.linkedin.venice.pubsub.api.PubSubMessageHeaders;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.utils.DaemonThreadFactory;
import com.linkedin.venice.utils.PartitionUtils;
Expand Down Expand Up @@ -647,5 +649,10 @@ public int getPayloadSize() {
/** Always {@code false}: this message type never represents an end-of-bootstrap marker. */
public boolean isEndOfBootstrap() {
return false;
}

/**
 * This message type carries no pub-sub headers, so the shared immutable
 * {@code EMPTY_MSG_HEADERS} instance is always returned (never null).
 */
@Override
public PubSubMessageHeaders getPubSubMessageHeaders() {
return EMPTY_MSG_HEADERS;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1694,7 +1694,7 @@ private void checkConsumptionStateWhenStart(
amplificationFactorAdapter.executePartitionConsumptionState(
newPartitionConsumptionState.getUserPartition(),
PartitionConsumptionState::lagHasCaughtUp);
statusReportAdapter.reportCompleted(newPartitionConsumptionState, true);
reportCompletedAndSendHeartBeat(newPartitionConsumptionState, partition, true);
isCompletedReport = true;
}
// Clear offset lag in metadata, it is only used in restart.
Expand Down Expand Up @@ -3473,9 +3473,7 @@ private ReadyToServeCheck getDefaultReadyToServeChecker() {
partition,
partitionConsumptionState.getLatestProcessedLocalVersionTopicOffset());
} else {
statusReportAdapter.reportCompleted(partitionConsumptionState);
LOGGER.info("{} Partition {} is ready to serve", consumerTaskId, partition);

reportCompletedAndSendHeartBeat(partitionConsumptionState, partition);
warmupSchemaCache(store);
}
if (suppressLiveUpdates) {
Expand All @@ -3495,6 +3493,34 @@ private ReadyToServeCheck getDefaultReadyToServeChecker() {
};
}

void reportCompletedAndSendHeartBeat(PartitionConsumptionState partitionConsumptionState, int partition) {
m-nagarajan marked this conversation as resolved.
Show resolved Hide resolved
reportCompletedAndSendHeartBeat(partitionConsumptionState, partition, false);
}

/**
 * Reports the partition as completed through the status report adapter and, when
 * this replica is currently in the LEADER state, immediately produces a heartbeat
 * SOS to the local version topic (see {@link StoreIngestionTask#sendInstantHeartBeat})
 * so followers learn of the leader's completion right away.
 */
void reportCompletedAndSendHeartBeat(
PartitionConsumptionState partitionConsumptionState,
int partition,
boolean forceCompletion) {
statusReportAdapter.reportCompleted(partitionConsumptionState, forceCompletion);
LOGGER.info("{} Partition {} is ready to serve", consumerTaskId, partition);
// Leaders notify followers immediately rather than waiting for the periodic heartbeat.
boolean replicaIsLeader =
partitionConsumptionState.getLeaderFollowerState().equals(LeaderFollowerStateType.LEADER);
if (replicaIsLeader) {
sendInstantHeartBeat(partitionConsumptionState, new PubSubTopicPartitionImpl(versionTopic, partition));
}
}

/**
 * Once a leader is marked completed, immediately send a heartbeat message to the
 * local VT so that followers don't have to wait for the periodic heartbeat to
 * learn that the leader is completed.
 *
 * @param partitionConsumptionState state of the partition whose completion status is forwarded
 * @param pubSubTopicPartition      version-topic partition to send the heartbeat to
 */
protected abstract void sendInstantHeartBeat(
PartitionConsumptionState partitionConsumptionState,
PubSubTopicPartition pubSubTopicPartition);

/**
* Try to warm-up the schema repo cache before reporting completion as new value schema could cause latency degradation
* while trying to compile it in the read-path.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3572,7 +3572,7 @@ public void testMaybeSendIngestionHeartbeat(boolean isActiveActive) {
ingestionTask.maybeSendIngestionHeartbeat();
// Second invocation should be skipped since it shouldn't be time for another heartbeat yet.
ingestionTask.maybeSendIngestionHeartbeat();
verify(veniceWriter, times(1)).sendHeartbeat(any(), any(), any());
verify(veniceWriter, times(1)).sendHeartbeat(any(), any(), any(), anyBoolean(), anyLong(), anyBoolean());
}

private VeniceStoreVersionConfig getDefaultMockVeniceStoreVersionConfig(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.linkedin.venice.pubsub;

import com.linkedin.venice.pubsub.api.PubSubMessage;
import com.linkedin.venice.pubsub.api.PubSubMessageHeaders;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import java.util.Objects;

Expand All @@ -13,19 +14,33 @@ public class ImmutablePubSubMessage<K, V> implements PubSubMessage<K, V, Long> {
private final long timestamp;
private final int payloadSize;

private final PubSubMessageHeaders pubSubMessageHeaders;
m-nagarajan marked this conversation as resolved.
Show resolved Hide resolved

/**
 * Backward-compatible constructor for callers that do not supply headers;
 * delegates with {@code null} headers.
 */
public ImmutablePubSubMessage(
K key,
V value,
PubSubTopicPartition topicPartition,
long offset,
long timestamp,
int payloadSize) {
this(key, value, topicPartition, offset, timestamp, payloadSize, null);
}

/**
 * Full constructor.
 *
 * @param key                  message key
 * @param value                message value
 * @param topicPartition       partition this message came from; must not be null
 * @param offset               position of the message within the partition
 * @param timestamp            message timestamp
 * @param payloadSize          size of the serialized payload
 * @param pubSubMessageHeaders headers attached to the message; may be null
 */
public ImmutablePubSubMessage(
K key,
V value,
PubSubTopicPartition topicPartition,
long offset,
long timestamp,
int payloadSize,
PubSubMessageHeaders pubSubMessageHeaders) {
this.key = key;
this.value = value;
this.topicPartition = Objects.requireNonNull(topicPartition);
this.offset = offset;
this.timestamp = timestamp;
this.payloadSize = payloadSize;
this.pubSubMessageHeaders = pubSubMessageHeaders;
}

@Override
Expand Down Expand Up @@ -62,4 +77,9 @@ public int getPayloadSize() {
/** Always {@code false}: an immutable pub-sub message never marks end of bootstrap. */
public boolean isEndOfBootstrap() {
return false;
}

/**
 * Returns the headers supplied at construction time; may be null when this
 * message was built via the headers-less constructor.
 */
@Override
public PubSubMessageHeaders getPubSubMessageHeaders() {
// Null when the delegating (headers-less) constructor was used.
return pubSubMessageHeaders;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,6 @@ default int getPartition() {
* @return whether this message marks the end of bootstrap.
*/
boolean isEndOfBootstrap();

PubSubMessageHeaders getPubSubMessageHeaders();
m-nagarajan marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ public class PubSubMessageDeserializer {
private static final Logger LOGGER = LogManager.getLogger(PubSubMessageDeserializer.class);

public static final String VENICE_TRANSPORT_PROTOCOL_HEADER = "vtp";
/** Header to denote whether the leader is completed or not */
m-nagarajan marked this conversation as resolved.
Show resolved Hide resolved
public static final String VENICE_LEADER_COMPLETION_STATUS_HEADER = "lcs";

private final KafkaKeySerializer keySerializer = new KafkaKeySerializer();
private final KafkaValueSerializer valueSerializer;
Expand Down Expand Up @@ -59,6 +61,8 @@ public PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> deserialize(
KafkaMessageEnvelope value = null;
if (key.isControlMessage()) {
for (PubSubMessageHeader header: headers.toList()) {
// only process VENICE_TRANSPORT_PROTOCOL_HEADER here. Other headers will be stored in
// ImmutablePubSubMessage and used down the ingestion path later
if (header.key().equals(VENICE_TRANSPORT_PROTOCOL_HEADER)) {
try {
Schema providedProtocolSchema = AvroCompatibilityHelper.parse(new String(header.value()));
Expand All @@ -85,7 +89,8 @@ public PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> deserialize(
topicPartition,
position,
timestamp,
keyBytes.length + valueBytes.length);
keyBytes.length + valueBytes.length,
headers);
}

private KafkaMessageEnvelope getEnvelope(byte keyHeaderByte) {
Expand Down
Loading
Loading