Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dvc][common][test] leader complete status to follower part 1: leader produce HB and SOS to local VT with new header #741

Merged
merged 13 commits into from
Nov 16, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static com.linkedin.davinci.kafka.consumer.LeaderFollowerStateType.STANDBY;
import static com.linkedin.venice.kafka.protocol.enums.ControlMessageType.END_OF_PUSH;
import static com.linkedin.venice.kafka.protocol.enums.ControlMessageType.START_OF_SEGMENT;
import static com.linkedin.venice.pubsub.api.PubSubMessageHeaders.ADD_LEADER_COMPLETED_HEADER;
import static com.linkedin.venice.writer.VeniceWriter.APP_DEFAULT_LOGICAL_TS;
import static com.linkedin.venice.writer.VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
Expand Down Expand Up @@ -1676,7 +1677,7 @@ protected void reportIfCatchUpVersionTopicOffset(PartitionConsumptionState pcs)
if (isCurrentVersion.getAsBoolean()) {
amplificationFactorAdapter
.executePartitionConsumptionState(pcs.getUserPartition(), PartitionConsumptionState::lagHasCaughtUp);
statusReportAdapter.reportCompleted(pcs, true);
reportCompleted(pcs, true);
}
}
}
Expand Down Expand Up @@ -2087,7 +2088,14 @@ protected DelegateConsumerRecordResult delegateConsumerRecord(
LeaderMetadataWrapper leaderMetadataWrapper =
new LeaderMetadataWrapper(consumerRecord.getOffset(), kafkaClusterId);
PubSubTopicPartition topicPartition = new PubSubTopicPartitionImpl(getVersionTopic(), subPartition);
veniceWriter.get().sendHeartbeat(topicPartition, callback, leaderMetadataWrapper);
// Leaders forward HB SOS message to local VT with updated isLeaderCompleted header
veniceWriter.get()
.sendHeartbeat(
topicPartition,
callback,
leaderMetadataWrapper,
ADD_LEADER_COMPLETED_HEADER,
consumerRecord.getValue().producerMetadata.messageTimestamp); // original producers timestamp
} else {
/**
* Based on current design handling this case (specially EOS) is tricky as we don't produce the SOS/EOS
Expand Down Expand Up @@ -3194,4 +3202,25 @@ protected void maybeSendIngestionHeartbeat() {
}
lastSendIngestionHeartbeatTimestamp = currentTimestamp;
}

/**
* Once leader is marked completed, immediately send a heart beat message to the local VT such that
* followers don't have to wait till the periodic heartbeat to know that the leader is completed
*
* Also update veniceWriter of this status, such that the SOS messages sent from the leader to the
* followers can have this status.
*/
void reportCompleted(PartitionConsumptionState partitionConsumptionState, boolean forceCompletion) {
super.reportCompleted(partitionConsumptionState, forceCompletion);
if (partitionConsumptionState.getLeaderFollowerState().equals(LeaderFollowerStateType.LEADER)) {
int partitionId = partitionConsumptionState.getPartition();
VeniceWriter _veniceWriter = veniceWriter.get();
_veniceWriter.setPartitionLeaderCompletionStatus(partitionId, true);
_veniceWriter.sendHeartbeat(
new PubSubTopicPartitionImpl(versionTopic, partitionId),
null,
DEFAULT_LEADER_METADATA_WRAPPER,
ADD_LEADER_COMPLETED_HEADER);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1694,7 +1694,7 @@ private void checkConsumptionStateWhenStart(
amplificationFactorAdapter.executePartitionConsumptionState(
newPartitionConsumptionState.getUserPartition(),
PartitionConsumptionState::lagHasCaughtUp);
statusReportAdapter.reportCompleted(newPartitionConsumptionState, true);
reportCompleted(newPartitionConsumptionState, true);
isCompletedReport = true;
}
// Clear offset lag in metadata, it is only used in restart.
Expand Down Expand Up @@ -3473,9 +3473,7 @@ private ReadyToServeCheck getDefaultReadyToServeChecker() {
partition,
partitionConsumptionState.getLatestProcessedLocalVersionTopicOffset());
} else {
statusReportAdapter.reportCompleted(partitionConsumptionState);
LOGGER.info("{} Partition {} is ready to serve", consumerTaskId, partition);

reportCompleted(partitionConsumptionState);
warmupSchemaCache(store);
}
if (suppressLiveUpdates) {
Expand All @@ -3495,6 +3493,15 @@ private ReadyToServeCheck getDefaultReadyToServeChecker() {
};
}

void reportCompleted(PartitionConsumptionState partitionConsumptionState) {
reportCompleted(partitionConsumptionState, false);
}

void reportCompleted(PartitionConsumptionState partitionConsumptionState, boolean forceCompletion) {
statusReportAdapter.reportCompleted(partitionConsumptionState, forceCompletion);
LOGGER.info("{} Partition {} is ready to serve", consumerTaskId, partitionConsumptionState.getPartition());
}

/**
* Try to warm-up the schema repo cache before reporting completion as new value schema could cause latency degradation
* while trying to compile it in the read-path.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.linkedin.venice.pubsub;

import com.linkedin.venice.pubsub.api.PubSubMessage;
import com.linkedin.venice.pubsub.api.PubSubMessageHeaders;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import java.util.Objects;

Expand All @@ -13,19 +14,33 @@ public class ImmutablePubSubMessage<K, V> implements PubSubMessage<K, V, Long> {
private final long timestamp;
private final int payloadSize;

private final PubSubMessageHeaders pubSubMessageHeaders;
m-nagarajan marked this conversation as resolved.
Show resolved Hide resolved

public ImmutablePubSubMessage(
K key,
V value,
PubSubTopicPartition topicPartition,
long offset,
long timestamp,
int payloadSize) {
this(key, value, topicPartition, offset, timestamp, payloadSize, null);
}

public ImmutablePubSubMessage(
K key,
V value,
PubSubTopicPartition topicPartition,
long offset,
long timestamp,
int payloadSize,
PubSubMessageHeaders pubSubMessageHeaders) {
this.key = key;
this.value = value;
this.topicPartition = Objects.requireNonNull(topicPartition);
this.offset = offset;
this.timestamp = timestamp;
this.payloadSize = payloadSize;
this.pubSubMessageHeaders = pubSubMessageHeaders;
}

@Override
Expand Down Expand Up @@ -62,4 +77,9 @@ public int getPayloadSize() {
public boolean isEndOfBootstrap() {
return false;
}

@Override
public PubSubMessageHeaders getPubSubMessageHeaders() {
return pubSubMessageHeaders;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package com.linkedin.venice.pubsub.api;

import static com.linkedin.venice.writer.VeniceWriter.EMPTY_MSG_HEADERS;


public interface PubSubMessage<K, V, OFFSET> {
/**
* @return the key part of this message
Expand Down Expand Up @@ -43,4 +46,8 @@ default int getPartition() {
* @return whether this message marks the end of bootstrap.
*/
boolean isEndOfBootstrap();

default PubSubMessageHeaders getPubSubMessageHeaders() {
return EMPTY_MSG_HEADERS;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.linkedin.venice.pubsub.api;

import static com.linkedin.venice.pubsub.api.PubSubMessageHeaders.VENICE_TRANSPORT_PROTOCOL_HEADER;

import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.kafka.protocol.enums.MessageType;
Expand All @@ -19,9 +21,6 @@
*/
public class PubSubMessageDeserializer {
private static final Logger LOGGER = LogManager.getLogger(PubSubMessageDeserializer.class);

public static final String VENICE_TRANSPORT_PROTOCOL_HEADER = "vtp";

private final KafkaKeySerializer keySerializer = new KafkaKeySerializer();
private final KafkaValueSerializer valueSerializer;
private final ObjectPool<KafkaMessageEnvelope> putEnvelopePool;
Expand Down Expand Up @@ -59,6 +58,8 @@ public PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> deserialize(
KafkaMessageEnvelope value = null;
if (key.isControlMessage()) {
for (PubSubMessageHeader header: headers.toList()) {
// only process VENICE_TRANSPORT_PROTOCOL_HEADER here. Other headers will be stored in
// ImmutablePubSubMessage and used down the ingestion path later
if (header.key().equals(VENICE_TRANSPORT_PROTOCOL_HEADER)) {
try {
Schema providedProtocolSchema = AvroCompatibilityHelper.parse(new String(header.value()));
Expand All @@ -85,7 +86,8 @@ public PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> deserialize(
topicPartition,
position,
timestamp,
keyBytes.length + valueBytes.length);
keyBytes.length + valueBytes.length,
headers);
}

private KafkaMessageEnvelope getEnvelope(byte keyHeaderByte) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ public class PubSubMessageHeaders {
// from the beginning.
private final Map<String, PubSubMessageHeader> headers = new LinkedHashMap<>();

public static final String VENICE_TRANSPORT_PROTOCOL_HEADER = "vtp";
/** Header to denote whether the leader is completed or not */
public static final String VENICE_LEADER_COMPLETION_STATUS_HEADER = "lcs";
public static final boolean ADD_LEADER_COMPLETED_HEADER = true;

public PubSubMessageHeaders add(PubSubMessageHeader header) {
headers.put(header.key(), header);
return this;
Expand Down
Loading