Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.16] Reduce logging in DEBUG for MasterService:run #14880

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -22,6 +22,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add prefix mode verification setting for repository verification (([#14790](https://github.com/opensearch-project/OpenSearch/pull/14790)))
- Add SplitResponseProcessor to Search Pipelines (([#14800](https://github.com/opensearch-project/OpenSearch/issues/14800)))
- Optimize TransportNodesAction to not send DiscoveryNodes for NodeStats, NodesInfo and ClusterStats call ([14749](https://github.com/opensearch-project/OpenSearch/pull/14749))
- Reduce logging in DEBUG for MasterService:run ([#14795](https://github.com/opensearch-project/OpenSearch/pull/14795))

### Dependencies
- Update to Apache Lucene 9.11.1 ([#14042](https://github.com/opensearch-project/OpenSearch/pull/14042), [#14576](https://github.com/opensearch-project/OpenSearch/pull/14576))
Original file line number Diff line number Diff line change
@@ -84,6 +84,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

@@ -221,10 +222,10 @@ protected void onTimeout(List<? extends BatchedTask> tasks, TimeValue timeout) {
}

@Override
protected void run(Object batchingKey, List<? extends BatchedTask> tasks, String tasksSummary) {
protected void run(Object batchingKey, List<? extends BatchedTask> tasks, Function<Boolean, String> taskSummaryGenerator) {
ClusterStateTaskExecutor<Object> taskExecutor = (ClusterStateTaskExecutor<Object>) batchingKey;
List<UpdateTask> updateTasks = (List<UpdateTask>) tasks;
runTasks(new TaskInputs(taskExecutor, updateTasks, tasksSummary));
runTasks(new TaskInputs(taskExecutor, updateTasks, taskSummaryGenerator));
}

class UpdateTask extends BatchedTask {
@@ -297,26 +298,33 @@ public static boolean assertNotMasterUpdateThread(String reason) {
}

private void runTasks(TaskInputs taskInputs) {
final String summary = taskInputs.summary;
final String longSummary = logger.isTraceEnabled() ? taskInputs.taskSummaryGenerator.apply(true) : "";
final String shortSummary = taskInputs.taskSummaryGenerator.apply(false);

if (!lifecycle.started()) {
logger.debug("processing [{}]: ignoring, cluster-manager service not started", summary);
logger.debug("processing [{}]: ignoring, cluster-manager service not started", shortSummary);
return;
}

logger.debug("executing cluster state update for [{}]", summary);
if (logger.isTraceEnabled()) {
logger.trace("executing cluster state update for [{}]", longSummary);
} else {
logger.debug("executing cluster state update for [{}]", shortSummary);
}

final ClusterState previousClusterState = state();

if (!previousClusterState.nodes().isLocalNodeElectedClusterManager() && taskInputs.runOnlyWhenClusterManager()) {
logger.debug("failing [{}]: local node is no longer cluster-manager", summary);
logger.debug("failing [{}]: local node is no longer cluster-manager", shortSummary);
taskInputs.onNoLongerClusterManager();
return;
}

final long computationStartTime = threadPool.preciseRelativeTimeInNanos();
final TaskOutputs taskOutputs = calculateTaskOutputs(taskInputs, previousClusterState);
final TaskOutputs taskOutputs = calculateTaskOutputs(taskInputs, previousClusterState, shortSummary);
taskOutputs.notifyFailedTasks();
final TimeValue computationTime = getTimeSince(computationStartTime);
logExecutionTime(computationTime, "compute cluster state update", summary);
logExecutionTime(computationTime, "compute cluster state update", shortSummary);

clusterManagerMetrics.recordLatency(
clusterManagerMetrics.clusterStateComputeHistogram,
@@ -328,25 +336,25 @@ private void runTasks(TaskInputs taskInputs) {
final long notificationStartTime = threadPool.preciseRelativeTimeInNanos();
taskOutputs.notifySuccessfulTasksOnUnchangedClusterState();
final TimeValue executionTime = getTimeSince(notificationStartTime);
logExecutionTime(executionTime, "notify listeners on unchanged cluster state", summary);
logExecutionTime(executionTime, "notify listeners on unchanged cluster state", shortSummary);
} else {
final ClusterState newClusterState = taskOutputs.newClusterState;
if (logger.isTraceEnabled()) {
logger.trace("cluster state updated, source [{}]\n{}", summary, newClusterState);
logger.trace("cluster state updated, source [{}]\n{}", longSummary, newClusterState);
} else {
logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), summary);
logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), shortSummary);
}
final long publicationStartTime = threadPool.preciseRelativeTimeInNanos();
try {
ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(summary, newClusterState, previousClusterState);
ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(shortSummary, newClusterState, previousClusterState);
// new cluster state, notify all listeners
final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta();
if (nodesDelta.hasChanges() && logger.isInfoEnabled()) {
String nodesDeltaSummary = nodesDelta.shortSummary();
if (nodesDeltaSummary.length() > 0) {
logger.info(
"{}, term: {}, version: {}, delta: {}",
summary,
shortSummary,
newClusterState.term(),
newClusterState.version(),
nodesDeltaSummary
@@ -357,7 +365,7 @@ private void runTasks(TaskInputs taskInputs) {
logger.debug("publishing cluster state version [{}]", newClusterState.version());
publish(clusterChangedEvent, taskOutputs, publicationStartTime);
} catch (Exception e) {
handleException(summary, publicationStartTime, newClusterState, e);
handleException(shortSummary, publicationStartTime, newClusterState, e);
}
}
}
@@ -452,8 +460,8 @@ private void handleException(String summary, long startTimeMillis, ClusterState
// TODO: do we want to call updateTask.onFailure here?
}

private TaskOutputs calculateTaskOutputs(TaskInputs taskInputs, ClusterState previousClusterState) {
ClusterTasksResult<Object> clusterTasksResult = executeTasks(taskInputs, previousClusterState);
private TaskOutputs calculateTaskOutputs(TaskInputs taskInputs, ClusterState previousClusterState, String taskSummary) {
ClusterTasksResult<Object> clusterTasksResult = executeTasks(taskInputs, previousClusterState, taskSummary);
ClusterState newClusterState = patchVersions(previousClusterState, clusterTasksResult);
return new TaskOutputs(
taskInputs,
@@ -897,7 +905,7 @@ public void onTimeout() {
}
}

private ClusterTasksResult<Object> executeTasks(TaskInputs taskInputs, ClusterState previousClusterState) {
private ClusterTasksResult<Object> executeTasks(TaskInputs taskInputs, ClusterState previousClusterState, String taskSummary) {
ClusterTasksResult<Object> clusterTasksResult;
try {
List<Object> inputs = taskInputs.updateTasks.stream().map(tUpdateTask -> tUpdateTask.task).collect(Collectors.toList());
@@ -913,7 +921,7 @@ private ClusterTasksResult<Object> executeTasks(TaskInputs taskInputs, ClusterSt
"failed to execute cluster state update (on version: [{}], uuid: [{}]) for [{}]\n{}{}{}",
previousClusterState.version(),
previousClusterState.stateUUID(),
taskInputs.summary,
taskSummary,
previousClusterState.nodes(),
previousClusterState.routingTable(),
previousClusterState.getRoutingNodes()
@@ -955,14 +963,19 @@ private List<Batcher.UpdateTask> getNonFailedTasks(TaskInputs taskInputs, Cluste
* Represents a set of tasks to be processed together with their executor
*/
private class TaskInputs {
final String summary;

final List<Batcher.UpdateTask> updateTasks;
final ClusterStateTaskExecutor<Object> executor;
final Function<Boolean, String> taskSummaryGenerator;

TaskInputs(ClusterStateTaskExecutor<Object> executor, List<Batcher.UpdateTask> updateTasks, String summary) {
this.summary = summary;
TaskInputs(
ClusterStateTaskExecutor<Object> executor,
List<Batcher.UpdateTask> updateTasks,
final Function<Boolean, String> taskSummaryGenerator
) {
this.executor = executor;
this.updateTasks = updateTasks;
this.taskSummaryGenerator = taskSummaryGenerator;
}

boolean runOnlyWhenClusterManager() {
Original file line number Diff line number Diff line change
@@ -177,7 +177,6 @@ void runIfNotProcessed(BatchedTask updateTask) {
// to give other tasks with different batching key a chance to execute.
if (updateTask.processed.get() == false) {
final List<BatchedTask> toExecute = new ArrayList<>();
final Map<String, List<BatchedTask>> processTasksBySource = new HashMap<>();
// While removing task, need to remove task first from taskMap and then remove identity from identityMap.
// Changing this order might lead to duplicate task during submission.
LinkedHashSet<BatchedTask> pending = tasksPerBatchingKey.remove(updateTask.batchingKey);
@@ -187,30 +186,41 @@ void runIfNotProcessed(BatchedTask updateTask) {
if (task.processed.getAndSet(true) == false) {
logger.trace("will process {}", task);
toExecute.add(task);
processTasksBySource.computeIfAbsent(task.source, s -> new ArrayList<>()).add(task);
} else {
logger.trace("skipping {}, already processed", task);
}
}
}

if (toExecute.isEmpty() == false) {
final String tasksSummary = processTasksBySource.entrySet().stream().map(entry -> {
String tasks = updateTask.describeTasks(entry.getValue());
return tasks.isEmpty() ? entry.getKey() : entry.getKey() + "[" + tasks + "]";
}).reduce((s1, s2) -> s1 + ", " + s2).orElse("");

Function<Boolean, String> taskSummaryGenerator = (longSummaryRequired) -> {
if (longSummaryRequired == null || !longSummaryRequired) {
return buildShortSummary(updateTask.batchingKey, toExecute.size());
}
final Map<String, List<BatchedTask>> processTasksBySource = new HashMap<>();
for (final BatchedTask task : toExecute) {
processTasksBySource.computeIfAbsent(task.source, s -> new ArrayList<>()).add(task);
}
return processTasksBySource.entrySet().stream().map(entry -> {
String tasks = updateTask.describeTasks(entry.getValue());
return tasks.isEmpty() ? entry.getKey() : entry.getKey() + "[" + tasks + "]";
}).reduce((s1, s2) -> s1 + ", " + s2).orElse("");
};
taskBatcherListener.onBeginProcessing(toExecute);
run(updateTask.batchingKey, toExecute, tasksSummary);
run(updateTask.batchingKey, toExecute, taskSummaryGenerator);
}
}
}

private String buildShortSummary(final Object batchingKey, final int taskCount) {
return "Tasks batched with key: " + batchingKey.toString().split("\\$")[0] + " and count: " + taskCount;
}

/**
* Action to be implemented by the specific batching implementation
* All tasks have the given batching key.
*/
protected abstract void run(Object batchingKey, List<? extends BatchedTask> tasks, String tasksSummary);
protected abstract void run(Object batchingKey, List<? extends BatchedTask> tasks, Function<Boolean, String> taskSummaryGenerator);

/**
* Represents a runnable task that supports batching.
Loading