Commit: clean up

adamxchen committed Feb 2, 2024
1 parent ad8bd7d commit 8ae7d24
Showing 18 changed files with 202 additions and 182 deletions.
@@ -54,7 +54,6 @@
 import static com.linkedin.venice.ConfigKeys.SERVER_DEDICATED_CONSUMER_POOL_FOR_AA_WC_LEADER_ENABLED;
 import static com.linkedin.venice.ConfigKeys.SERVER_DEDICATED_CONSUMER_POOL_SIZE_FOR_AA_WC_LEADER;
 import static com.linkedin.venice.ConfigKeys.SERVER_DEDICATED_DRAINER_FOR_SORTED_INPUT_ENABLED;
-import static com.linkedin.venice.ConfigKeys.SERVER_DELAY_REPORT_READY_TO_SERVE_MS;
 import static com.linkedin.venice.ConfigKeys.SERVER_DISK_FULL_THRESHOLD;
 import static com.linkedin.venice.ConfigKeys.SERVER_DISK_HEALTH_CHECK_INTERVAL_IN_SECONDS;
 import static com.linkedin.venice.ConfigKeys.SERVER_DISK_HEALTH_CHECK_SERVICE_ENABLED;
@@ -364,7 +363,6 @@ public class VeniceServerConfig extends VeniceClusterConfig {
 private final KafkaConsumerService.ConsumerAssignmentStrategy sharedConsumerAssignmentStrategy;
 private final int consumerPoolSizePerKafkaCluster;
 private final boolean leakedResourceCleanupEnabled;
-private final long delayReadyToServeMS;

 private final IngestionMode ingestionMode;
 private final int ingestionServicePort;
@@ -612,7 +610,6 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map<String, Map<Str
 + consumerPoolSizePerKafkaCluster);
 }
 leakedResourceCleanupEnabled = serverProperties.getBoolean(SERVER_LEAKED_RESOURCE_CLEANUP_ENABLED, true);
-delayReadyToServeMS = serverProperties.getLong(SERVER_DELAY_REPORT_READY_TO_SERVE_MS, 0);

 ingestionMode =
 IngestionMode.valueOf(serverProperties.getString(SERVER_INGESTION_MODE, IngestionMode.BUILT_IN.toString()));
@@ -1094,10 +1091,6 @@ public boolean isLeakedResourceCleanupEnabled() {
 return leakedResourceCleanupEnabled;
 }

-public long getDelayReadyToServeMS() {
-return delayReadyToServeMS;
-}
-
 public IngestionMode getIngestionMode() {
 return ingestionMode;
 }
@@ -414,7 +414,7 @@ protected void processMessageAndMaybeProduceToKafka(
 break;
 default:
 throw new VeniceMessageException(
-consumerTaskId + " : Invalid/Unrecognized operation type submitted: " + kafkaValue.messageType);
+ingestionTaskName + " : Invalid/Unrecognized operation type submitted: " + kafkaValue.messageType);
 }
 final ChunkedValueManifestContainer valueManifestContainer = new ChunkedValueManifestContainer();
 Lazy<ByteBufferValueRecord<ByteBuffer>> oldValueProvider = Lazy.of(
@@ -489,7 +489,7 @@ protected void processMessageAndMaybeProduceToKafka(
 break;
 default:
 throw new VeniceMessageException(
-consumerTaskId + " : Invalid/Unrecognized operation type submitted: " + kafkaValue.messageType);
+ingestionTaskName + " : Invalid/Unrecognized operation type submitted: " + kafkaValue.messageType);
 }

 if (mergeConflictResult.isUpdateIgnored()) {
@@ -838,7 +838,8 @@ protected void startConsumingAsLeader(PartitionConsumptionState partitionConsump
 */
 if (shouldNewLeaderSwitchToRemoteConsumption(partitionConsumptionState)) {
 partitionConsumptionState.setConsumeRemotely(true);
-LOGGER.info("{} enabled remote consumption from topic {} partition {}", consumerTaskId, leaderTopic, partition);
+LOGGER
+.info("{} enabled remote consumption from topic {} partition {}", ingestionTaskName, leaderTopic, partition);
 }

 partitionConsumptionState.setLeaderFollowerState(LEADER);
@@ -850,7 +851,7 @@
 LOGGER.info(
 "{} is promoted to leader for partition {} and it is going to start consuming from "
 + "topic {} with offset by Kafka URL mapping {}",
-consumerTaskId,
+ingestionTaskName,
 partition,
 leaderTopic,
 leaderOffsetByKafkaURL);
@@ -864,7 +865,7 @@

 LOGGER.info(
 "{}, as a leader, started consuming from topic {} partition {} with offset by Kafka URL mapping {}",
-consumerTaskId,
+ingestionTaskName,
 leaderTopic,
 partition,
 leaderOffsetByKafkaURL);
@@ -919,7 +920,7 @@ protected void leaderExecuteTopicSwitch(
 rewindStartTimestamp = calculateRewindStartTime(partitionConsumptionState);
 LOGGER.info(
 "{} leader calculated rewindStartTimestamp {} for {}",
-consumerTaskId,
+ingestionTaskName,
 rewindStartTimestamp,
 sourceTopicPartition);
 } else {
@@ -998,7 +999,7 @@ protected void leaderExecuteTopicSwitch(
 partitionConsumptionState.setConsumeRemotely(true);
 LOGGER.info(
 "{} enabled remote consumption and switch to topic {} partition {} with offset by Kafka URL mapping {}",
-consumerTaskId,
+ingestionTaskName,
 newSourceTopic,
 sourceTopicPartition,
 upstreamOffsetsByKafkaURLs);
@@ -1031,7 +1032,7 @@ protected void leaderExecuteTopicSwitch(
 syncConsumedUpstreamRTOffsetMapIfNeeded(partitionConsumptionState, upstreamOffsetsByKafkaURLs);
 LOGGER.info(
 "{} leader successfully switch feed topic from {} to {} on partition {} with offset by Kafka URL mapping {}",
-consumerTaskId,
+ingestionTaskName,
 currentLeaderTopic,
 newSourceTopic,
 partition,
@@ -1094,7 +1095,7 @@ protected boolean processTopicSwitch(
 rewindStartTimestamp = calculateRewindStartTime(partitionConsumptionState);
 LOGGER.info(
 "{} leader calculated rewindStartTimestamp {} for topic {} partition {}",
-consumerTaskId,
+ingestionTaskName,
 rewindStartTimestamp,
 newSourceTopicName,
 newSourceTopicPartition);
@@ -19,6 +19,7 @@
 import com.linkedin.venice.service.AbstractVeniceService;
 import com.linkedin.venice.throttle.EventThrottler;
 import com.linkedin.venice.utils.DaemonThreadFactory;
+import com.linkedin.venice.utils.RedundantExceptionFilter;
 import com.linkedin.venice.utils.SystemTime;
 import com.linkedin.venice.utils.Utils;
 import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
@@ -47,6 +48,8 @@
 */
 public class AggKafkaConsumerService extends AbstractVeniceService {
 private static final Logger LOGGER = LogManager.getLogger(AggKafkaConsumerService.class);
+private static final RedundantExceptionFilter REDUNDANT_LOGGING_FILTER =
+RedundantExceptionFilter.getRedundantExceptionFilter();

 private final PubSubConsumerAdapterFactory consumerFactory;
 private final VeniceServerConfig serverConfig;
@@ -73,6 +76,9 @@ public class AggKafkaConsumerService extends AbstractVeniceService {
 private final Function<String, Boolean> isAAOrWCEnabledFunc;
 private final ReadOnlyStoreRepository metadataRepository;

+private final static String STUCK_CONSUMER_MSG =
+"Didn't find any suspicious ingestion task, and please contact developers to investigate it further";
+
 public AggKafkaConsumerService(
 final PubSubConsumerAdapterFactory consumerFactory,
 TopicManagerRepository.SSLPropertiesSupplier sslPropertiesSupplier,
@@ -241,8 +247,9 @@ protected static Runnable getStuckConsumerDetectionAndRepairRunnable(
 }
 });
 if (!hasRepairedSomeIngestionTask.get()) {
-LOGGER.error(
-"Didn't find any suspicious ingestion task, and please contact developers to investigate it further");
+if (!REDUNDANT_LOGGING_FILTER.isRedundantException(STUCK_CONSUMER_MSG)) {
+LOGGER.warn(STUCK_CONSUMER_MSG);
+}
 stuckConsumerRepairStats.recordRepairFailure();
 }
 };
@@ -90,6 +90,7 @@ public void run() {
 Set<PubSubTopicPartition> topicPartitionsToUnsub = new HashSet<>();
 int payloadBytesConsumedInOnePoll;
 int polledPubSubMessagesCount = 0;
+Map<String, StorePollCounter> storePollCounterMap = new HashMap<>();
 try {
 while (running) {
 try {
@@ -126,7 +127,6 @@ public void run() {
 payloadBytesConsumedInOnePoll = 0;
 polledPubSubMessagesCount = 0;
 beforeProducingToWriteBufferTimestamp = System.currentTimeMillis();
-Map<String, StorePollCounter> storePollCounterMap = new HashMap<>();
 for (Map.Entry<PubSubTopicPartition, List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>>> entry: polledPubSubMessages
 .entrySet()) {
 PubSubTopicPartition pubSubTopicPartition = entry.getKey();
@@ -164,6 +164,7 @@ public void run() {
 recordsThrottler.accept(polledPubSubMessagesCount);
 cleaner.unsubscribe(topicPartitionsToUnsub);
 aggStats.recordTotalDetectedNoRunningIngestionTopicPartitionNum(topicPartitionsToUnsub.size());
+storePollCounterMap.clear();
 } else {
 // No result came back, here will add some delay
 addSomeDelay = true;
@@ -230,7 +231,7 @@ void removeDataReceiver(PubSubTopicPartition topicPartition) {
 /**
 * This class is used to count the number of messages and the byte size of the messages for a given store per poll.
 */
-class StorePollCounter {
+static class StorePollCounter {
 protected int msgCount;
 protected int byteSize;

@@ -506,7 +506,6 @@ public void handleStoreDeleted(Store store) {
 .setServerConfig(serverConfig)
 .setDiskUsage(diskUsage)
 .setAggKafkaConsumerService(aggKafkaConsumerService)
-.setStartReportingReadyToServeTimestamp(System.currentTimeMillis() + serverConfig.getDelayReadyToServeMS())
 .setPartitionStateSerializer(partitionStateSerializer)
 .setIsDaVinciClient(isDaVinciClient)
 .setRemoteIngestionRepairService(remoteIngestionRepairService)