Undo all controller changes.
FelixGV committed Nov 14, 2024
1 parent 24669c1 commit ed3a795
Showing 5 changed files with 70 additions and 68 deletions.
File 1 of 5
@@ -109,6 +109,7 @@ private AdminConsumptionTask getAdminConsumptionTaskForCluster(String clusterName
config.getAdminConsumptionCycleTimeoutMs(),
config.getAdminConsumptionMaxWorkerThreadPoolSize(),
pubSubTopicRepository,
+ pubSubMessageDeserializer,
config.getRegionName());
}

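For context, the new constructor argument can be built the same way the test change at the bottom of this commit builds it. A minimal sketch, reusing only types that appear in that test; the class and method names here are illustrative:

import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.pubsub.api.PubSubMessageDeserializer;
import com.linkedin.venice.serialization.avro.OptimizedKafkaValueSerializer;
import com.linkedin.venice.utils.pools.LandFillObjectPool;

public class DeserializerWiringSketch {
  // Mirrors the updated test helper: one value serializer plus two pools of
  // reusable KafkaMessageEnvelope objects, passed straight into the new
  // AdminConsumptionTask constructor parameter.
  static PubSubMessageDeserializer buildDeserializer() {
    return new PubSubMessageDeserializer(
        new OptimizedKafkaValueSerializer(),
        new LandFillObjectPool<>(KafkaMessageEnvelope::new),
        new LandFillObjectPool<>(KafkaMessageEnvelope::new));
  }
}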
File 2 of 5
@@ -26,6 +26,7 @@
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter;
import com.linkedin.venice.pubsub.api.PubSubMessage;
+ import com.linkedin.venice.pubsub.api.PubSubMessageDeserializer;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.pubsub.manager.TopicManager;
@@ -37,14 +38,12 @@
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
- import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
- import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
@@ -155,11 +154,13 @@ public String toString() {
private volatile long failingOffset = UNASSIGNED_VALUE;
private boolean topicExists;
/**
- * A {@link Map} of stores to admin operations belonging to each store. The corresponding kafka offset and other
- * metadata of each admin operation are included in the {@link AdminOperationWrapper}.
+ * A {@link Map} of stores to admin operations belonging to each store. The corresponding kafka offset of each admin
+ * operation is also attached as the first element of the {@link Pair}.
*/
private final Map<String, Queue<AdminOperationWrapper>> storeAdminOperationsMapWithOffset;

+ private final ConcurrentHashMap<String, AdminExecutionTask> storeToScheduledTask;

/**
* Map of store names that have encountered some sort of exception during consumption to {@link AdminErrorInfo}
* that has the details about the exception and the offset of the problematic admin message.
@@ -238,6 +239,10 @@ public ExecutorService getExecutorService() {
*/
private long lastUpdateTimeForConsumptionOffsetLag = 0;

+ private final PubSubTopicRepository pubSubTopicRepository;

+ private final PubSubMessageDeserializer pubSubMessageDeserializer;

/**
* The local region name of the controller.
*/
@@ -258,6 +263,7 @@ public AdminConsumptionTask(
long processingCycleTimeoutInMs,
int maxWorkerThreadPoolSize,
PubSubTopicRepository pubSubTopicRepository,
+ PubSubMessageDeserializer pubSubMessageDeserializer,
String regionName) {
this.clusterName = clusterName;
this.topic = AdminTopicUtils.getTopicNameFromClusterName(clusterName);
@@ -280,6 +286,7 @@

this.storeAdminOperationsMapWithOffset = new ConcurrentHashMap<>();
this.problematicStores = new ConcurrentHashMap<>();
+ this.storeToScheduledTask = new ConcurrentHashMap<>();
// since we use an unbounded queue the core pool size is really the max pool size
this.executorService = new ThreadPoolExecutor(
maxWorkerThreadPoolSize,
@@ -290,6 +297,8 @@
new DaemonThreadFactory(String.format("Venice-Admin-Execution-Task-%s", clusterName)));
this.undelegatedRecords = new LinkedList<>();
this.stats.setAdminConsumptionFailedOffset(failingOffset);
+ this.pubSubTopicRepository = pubSubTopicRepository;
+ this.pubSubMessageDeserializer = pubSubMessageDeserializer;
this.pubSubTopic = pubSubTopicRepository.getTopic(topic);
this.regionName = regionName;
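The "unbounded queue" comment above matches standard ThreadPoolExecutor behavior: threads beyond the core count are only created when the work queue rejects a task, and an unbounded LinkedBlockingQueue never rejects, so the core pool size is the effective maximum. A minimal, self-contained illustration (the pool sizes here are arbitrary):

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class UnboundedQueuePoolSketch {
  public static void main(String[] args) throws InterruptedException {
    // core=2, max=8: with an unbounded queue the pool never grows past 2,
    // because extra threads are only spawned when the queue refuses a task.
    ThreadPoolExecutor pool =
        new ThreadPoolExecutor(2, 8, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    for (int i = 0; i < 100; i++) {
      pool.submit(() -> {
        try {
          Thread.sleep(50);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
    }
    Thread.sleep(200);
    System.out.println(pool.getPoolSize()); // prints at most 2
    pool.shutdownNow();
  }
}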

@@ -367,29 +376,28 @@ public void run() {
}

while (!undelegatedRecords.isEmpty()) {
- PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> record = undelegatedRecords.peek();
- if (record == null) {
- break;
- }
try {
- long executionId = delegateMessage(record);
+ long executionId = delegateMessage(undelegatedRecords.peek());
if (executionId == lastDelegatedExecutionId) {
- updateLastOffset(record.getOffset());
+ updateLastOffset(undelegatedRecords.peek().getOffset());
}
undelegatedRecords.remove();
} catch (DataValidationException dve) {
// Very unlikely but DataValidationException could be thrown here.
LOGGER.error(
"Admin consumption task is blocked due to DataValidationException with offset {}",
- record.getOffset(),
+ undelegatedRecords.peek().getOffset(),
dve);
- failingOffset = record.getOffset();
+ failingOffset = undelegatedRecords.peek().getOffset();
stats.recordFailedAdminConsumption();
stats.recordAdminTopicDIVErrorReportCount();
break;
} catch (Exception e) {
- LOGGER.error("Admin consumption task is blocked due to Exception with offset {}", record.getOffset(), e);
- failingOffset = record.getOffset();
+ LOGGER.error(
+ "Admin consumption task is blocked due to Exception with offset {}",
+ undelegatedRecords.peek().getOffset(),
+ e);
+ failingOffset = undelegatedRecords.peek().getOffset();
stats.recordFailedAdminConsumption();
break;
}
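The loop above drains undelegatedRecords with a peek-then-remove pattern: the head record is only dequeued after it is successfully delegated, so on failure the loop breaks and the same record is retried on the next run() cycle. A stripped-down sketch of that control flow; the process() call stands in for delegateMessage() and everything else is illustrative:

import java.util.LinkedList;
import java.util.Queue;

public class HeadRetryQueueSketch {
  private final Queue<String> undelegated = new LinkedList<>();

  /** Returns true if the queue was fully drained; a failed head record stays queued. */
  boolean drain() {
    while (!undelegated.isEmpty()) {
      try {
        process(undelegated.peek()); // may throw
        undelegated.remove(); // dequeue only after success
      } catch (Exception e) {
        return false; // head record remains for the next attempt
      }
    }
    return true;
  }

  private void process(String record) {
    // placeholder for the real delegation logic
  }
}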
@@ -453,6 +461,7 @@ private void unSubscribe() {
storeAdminOperationsMapWithOffset.clear();
problematicStores.clear();
undelegatedRecords.clear();
+ storeToScheduledTask.clear();
failingOffset = UNASSIGNED_VALUE;
offsetToSkip = UNASSIGNED_VALUE;
offsetToSkipDIV = UNASSIGNED_VALUE;
@@ -483,51 +492,39 @@ private void unSubscribe() {
private void executeMessagesAndCollectResults() throws InterruptedException {
lastSucceededExecutionIdMap =
new ConcurrentHashMap<>(executionIdAccessor.getLastSucceededExecutionIdMap(clusterName));
- /** This set is used to track which store has a task scheduled, so that we schedule at most one per store. */
- Set<String> storesWithScheduledTask = new HashSet<>();
- /** List of tasks to be executed by the worker threads. */
List<Callable<Void>> tasks = new ArrayList<>();
- /**
- * Note that tasks and stores are parallel lists (the elements at each index correspond to one another), and both
- * lists are also parallel to the results list, declared later in this function.
- */
List<String> stores = new ArrayList<>();
// Create a task for each store that has admin messages pending to be processed.
boolean skipOffsetCommandHasBeenProcessed = false;
for (Map.Entry<String, Queue<AdminOperationWrapper>> entry: storeAdminOperationsMapWithOffset.entrySet()) {
- String storeName = entry.getKey();
- Queue<AdminOperationWrapper> storeQueue = entry.getValue();
- if (!storeQueue.isEmpty()) {
- AdminOperationWrapper nextOp = storeQueue.peek();
- if (nextOp == null) {
- continue;
- }
- long adminMessageOffset = nextOp.getOffset();
- if (checkOffsetToSkip(nextOp.getOffset(), false)) {
- storeQueue.remove();
+ if (!entry.getValue().isEmpty()) {
+ long adminMessageOffset = entry.getValue().peek().getOffset();
+ if (checkOffsetToSkip(adminMessageOffset, false)) {
+ entry.getValue().remove();
skipOffsetCommandHasBeenProcessed = true;
}
AdminExecutionTask newTask = new AdminExecutionTask(
LOGGER,
clusterName,
- storeName,
+ entry.getKey(),
lastSucceededExecutionIdMap,
lastPersistedExecutionId,
- storeQueue,
+ entry.getValue(),
admin,
executionIdAccessor,
isParentController,
stats,
- regionName);
+ regionName,
+ storeToScheduledTask);
// Check if there is previously created scheduled task still occupying one thread from the pool.
- if (storesWithScheduledTask.add(storeName)) {
+ if (storeToScheduledTask.putIfAbsent(entry.getKey(), newTask) == null) {
// Log the store name and the offset of the task being added into the task list
LOGGER.info(
"Adding admin message from store {} with offset {} to the task list",
- storeName,
+ entry.getKey(),
adminMessageOffset);
tasks.add(newTask);
- stores.add(storeName);
+ stores.add(entry.getKey());
}
}
}
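The hunk above introduces at-most-one-scheduled-task-per-store semantics: ConcurrentHashMap.putIfAbsent only succeeds for the first task registered per store, and the finally block added to AdminExecutionTask further down removes the entry when the task finishes. A minimal sketch of the pattern with illustrative names:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PerKeyTaskDedupSketch {
  private final ConcurrentHashMap<String, Runnable> scheduled = new ConcurrentHashMap<>();
  private final ExecutorService pool = Executors.newFixedThreadPool(4);

  /** Schedules work for a key unless a task for that key is still in flight. */
  void maybeSchedule(String key) {
    Runnable task = () -> {
      try {
        // ... process the pending operations for this key ...
      } finally {
        scheduled.remove(key); // let a later cycle schedule this key again
      }
    };
    // putIfAbsent returns null only when no task is currently registered.
    if (scheduled.putIfAbsent(key, task) == null) {
      pool.submit(task);
    }
  }
}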
Expand All @@ -553,46 +550,44 @@ private void executeMessagesAndCollectResults() throws InterruptedException {
try {
result.get();
problematicStores.remove(storeName);
- if (internalQueuesEmptied) {
- Queue<AdminOperationWrapper> storeQueue = storeAdminOperationsMapWithOffset.get(storeName);
- if (storeQueue != null && !storeQueue.isEmpty()) {
- internalQueuesEmptied = false;
- }
+ if (internalQueuesEmptied && storeAdminOperationsMapWithOffset.containsKey(storeName)
+ && !storeAdminOperationsMapWithOffset.get(storeName).isEmpty()) {
+ internalQueuesEmptied = false;
}
} catch (ExecutionException | CancellationException e) {
internalQueuesEmptied = false;
AdminErrorInfo errorInfo = new AdminErrorInfo();
- Queue<AdminOperationWrapper> storeQueue = storeAdminOperationsMapWithOffset.get(storeName);
- int perStorePendingMessagesCount = storeQueue == null ? 0 : storeQueue.size();
+ int perStorePendingMessagesCount = storeAdminOperationsMapWithOffset.get(storeName).size();
pendingAdminMessagesCount += perStorePendingMessagesCount;
storesWithPendingAdminMessagesCount += perStorePendingMessagesCount > 0 ? 1 : 0;
if (e instanceof CancellationException) {
- long lastSucceededId = lastSucceededExecutionIdMap.getOrDefault(storeName, UNASSIGNED_VALUE);
- long newLastSucceededId = newLastSucceededExecutionIdMap.getOrDefault(storeName, UNASSIGNED_VALUE);
+ long lastSucceededId = lastSucceededExecutionIdMap.getOrDefault(storeName, -1L);
+ long newLastSucceededId = newLastSucceededExecutionIdMap.getOrDefault(storeName, -1L);

- if (lastSucceededId == UNASSIGNED_VALUE) {
+ if (lastSucceededId == -1) {
LOGGER.error("Could not find last successful execution ID for store {}", storeName);
}

if (lastSucceededId == newLastSucceededId && perStorePendingMessagesCount > 0) {
// only mark the store problematic if no progress is made and there are still message(s) in the queue.
errorInfo.exception = new VeniceException(
"Could not finish processing admin message for store " + storeName + " in time");
- errorInfo.offset = getNextOperationOffsetIfAvailable(storeName);
+ errorInfo.offset = storeAdminOperationsMapWithOffset.get(storeName).peek().getOffset();
problematicStores.put(storeName, errorInfo);
LOGGER.warn(errorInfo.exception.getMessage());
}
} else {
errorInfo.exception = e;
- errorInfo.offset = getNextOperationOffsetIfAvailable(storeName);
+ errorInfo.offset = storeAdminOperationsMapWithOffset.get(storeName).peek().getOffset();
problematicStores.put(storeName, errorInfo);
}
} catch (Throwable e) {
- long errorMsgOffset = getNextOperationOffsetIfAvailable(storeName);
- if (errorMsgOffset == UNASSIGNED_VALUE) {
- LOGGER.error("Could not get the offset of the problematic admin message for store {}", storeName);
+ long errorMsgOffset = -1;
+ try {
+ errorMsgOffset = storeAdminOperationsMapWithOffset.get(storeName).peek().getOffset();
+ } catch (Exception ex) {
+ LOGGER.error("Could not get the offset of the problematic admin message for store {}", storeName, ex);
}

LOGGER.error(
"Unexpected exception thrown while processing admin message for store {} at offset {}",
storeName,
@@ -637,15 +632,6 @@ private void executeMessagesAndCollectResults() throws InterruptedException {
}
}

- /**
- * @return the offset of the next enqueued operation for the given store name, or {@link #UNASSIGNED_VALUE} if unavailable.
- */
- private long getNextOperationOffsetIfAvailable(String storeName) {
- Queue<AdminOperationWrapper> storeQueue = storeAdminOperationsMapWithOffset.get(storeName);
- AdminOperationWrapper nextOperation = storeQueue == null ? null : storeQueue.peek();
- return nextOperation == null ? UNASSIGNED_VALUE : nextOperation.getOffset();
- }

private void internalClose() {
unSubscribe();
executorService.shutdownNow();
File 3 of 5
@@ -84,6 +84,8 @@ public class AdminExecutionTask implements Callable<Void> {
private final ConcurrentHashMap<String, Long> lastSucceededExecutionIdMap;
private final long lastPersistedExecutionId;

+ private final Map<String, AdminExecutionTask> storeToScheduledTask;

AdminExecutionTask(
Logger LOGGER,
String clusterName,
@@ -95,7 +97,8 @@ public class AdminExecutionTask implements Callable<Void> {
ExecutionIdAccessor executionIdAccessor,
boolean isParentController,
AdminConsumptionStats stats,
- String regionName) {
+ String regionName,
+ Map<String, AdminExecutionTask> storeToScheduledTask) {
this.LOGGER = LOGGER;
this.clusterName = clusterName;
this.storeName = storeName;
@@ -107,6 +110,7 @@
this.isParentController = isParentController;
this.stats = stats;
this.regionName = regionName;
+ this.storeToScheduledTask = storeToScheduledTask;
}

@Override
@@ -155,6 +159,8 @@ public Void call() {
LOGGER.error("Error {}", logMessage, e);
}
throw e;
+ } finally {
+ storeToScheduledTask.remove(storeName);
}
return null;
}
File 4 of 5
@@ -4,11 +4,11 @@


public class AdminOperationWrapper {
- private final AdminOperation adminOperation;
- private final long offset;
- private final long producerTimestamp;
- private final long localBrokerTimestamp;
- private final long delegateTimestamp;
+ private AdminOperation adminOperation;
+ private long offset;
+ private long producerTimestamp;
+ private long localBrokerTimestamp;
+ private long delegateTimestamp;

private Long startProcessingTimestamp = null;

File 5 of 5
@@ -59,6 +59,7 @@
import com.linkedin.venice.controller.stats.AdminConsumptionStats;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.guid.GuidUtils;
+ import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.kafka.protocol.state.PartitionState;
import com.linkedin.venice.kafka.protocol.state.ProducerPartitionState;
import com.linkedin.venice.kafka.validation.SegmentStatus;
@@ -71,13 +72,15 @@
import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter;
+ import com.linkedin.venice.pubsub.api.PubSubMessageDeserializer;
import com.linkedin.venice.pubsub.api.PubSubProduceResult;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.pubsub.api.exceptions.PubSubOpTimeoutException;
import com.linkedin.venice.pubsub.manager.TopicManager;
import com.linkedin.venice.serialization.DefaultSerializer;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
+ import com.linkedin.venice.serialization.avro.OptimizedKafkaValueSerializer;
import com.linkedin.venice.unit.kafka.InMemoryKafkaBroker;
import com.linkedin.venice.unit.kafka.SimplePartitioner;
import com.linkedin.venice.unit.kafka.consumer.MockInMemoryConsumer;
@@ -96,6 +99,7 @@
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
+ import com.linkedin.venice.utils.pools.LandFillObjectPool;
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterOptions;
import java.io.IOException;
@@ -230,6 +234,10 @@ private AdminConsumptionTask getAdminConsumptionTask(
new MockInMemoryConsumer(inMemoryKafkaBroker, pollStrategy, mockKafkaConsumer);

PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();
+ PubSubMessageDeserializer pubSubMessageDeserializer = new PubSubMessageDeserializer(
+ new OptimizedKafkaValueSerializer(),
+ new LandFillObjectPool<>(KafkaMessageEnvelope::new),
+ new LandFillObjectPool<>(KafkaMessageEnvelope::new));

return new AdminConsumptionTask(
clusterName,
@@ -246,6 +254,7 @@ private AdminConsumptionTask getAdminConsumptionTask(
adminConsumptionCycleTimeoutMs,
maxWorkerThreadPoolSize,
pubSubTopicRepository,
+ pubSubMessageDeserializer,
"dc-0");
}
