Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce logging in DEBUG for MasterService:run #14795

1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Print reason why parent task was cancelled ([#14604](https://github.com/opensearch-project/OpenSearch/issues/14604))
- Add matchesPluginSystemIndexPattern to SystemIndexRegistry ([#14750](https://github.com/opensearch-project/OpenSearch/pull/14750))
- Add Plugin interface for loading application based configuration templates (([#14659](https://github.com/opensearch-project/OpenSearch/issues/14659)))
- Reduce logging in DEBUG for MasterService:run ([#14795](https://github.com/opensearch-project/OpenSearch/pull/14795))

### Dependencies
- Bump `org.gradle.test-retry` from 1.5.8 to 1.5.9 ([#13442](https://github.com/opensearch-project/OpenSearch/pull/13442))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,10 +221,15 @@ protected void onTimeout(List<? extends BatchedTask> tasks, TimeValue timeout) {
}

@Override
protected void run(Object batchingKey, List<? extends BatchedTask> tasks, String tasksSummary) {
protected void run(
Object batchingKey,
List<? extends BatchedTask> tasks,
Supplier<String> tasksSummarySupplier,
String tasksShortSummary
) {
ClusterStateTaskExecutor<Object> taskExecutor = (ClusterStateTaskExecutor<Object>) batchingKey;
List<UpdateTask> updateTasks = (List<UpdateTask>) tasks;
runTasks(new TaskInputs(taskExecutor, updateTasks, tasksSummary));
runTasks(new TaskInputs(taskExecutor, updateTasks, tasksSummarySupplier, tasksShortSummary));
sumitasr marked this conversation as resolved.
Show resolved Hide resolved
}

class UpdateTask extends BatchedTask {
Expand Down Expand Up @@ -297,26 +302,33 @@ public static boolean assertNotMasterUpdateThread(String reason) {
}

private void runTasks(TaskInputs taskInputs) {
final String summary = taskInputs.summary;
final String longSummary = logger.isTraceEnabled() ? taskInputs.summarySupplier.get() : "";
final String shortSummary = taskInputs.shortSummary;

if (!lifecycle.started()) {
logger.debug("processing [{}]: ignoring, cluster-manager service not started", summary);
logger.debug("processing [{}]: ignoring, cluster-manager service not started", shortSummary);
return;
}

logger.debug("executing cluster state update for [{}]", summary);
if (logger.isTraceEnabled()) {
logger.trace("executing cluster state update for [{}]", longSummary);
} else {
logger.debug("executing cluster state update for [{}]", shortSummary);
}

final ClusterState previousClusterState = state();

if (!previousClusterState.nodes().isLocalNodeElectedClusterManager() && taskInputs.runOnlyWhenClusterManager()) {
logger.debug("failing [{}]: local node is no longer cluster-manager", summary);
logger.debug("failing [{}]: local node is no longer cluster-manager", shortSummary);
taskInputs.onNoLongerClusterManager();
return;
}

final long computationStartTime = threadPool.preciseRelativeTimeInNanos();
final TaskOutputs taskOutputs = calculateTaskOutputs(taskInputs, previousClusterState);
final TaskOutputs taskOutputs = calculateTaskOutputs(taskInputs, previousClusterState, longSummary);
taskOutputs.notifyFailedTasks();
final TimeValue computationTime = getTimeSince(computationStartTime);
logExecutionTime(computationTime, "compute cluster state update", summary);
logExecutionTime(computationTime, "compute cluster state update", shortSummary);

clusterManagerMetrics.recordLatency(
clusterManagerMetrics.clusterStateComputeHistogram,
Expand All @@ -328,25 +340,25 @@ private void runTasks(TaskInputs taskInputs) {
final long notificationStartTime = threadPool.preciseRelativeTimeInNanos();
taskOutputs.notifySuccessfulTasksOnUnchangedClusterState();
final TimeValue executionTime = getTimeSince(notificationStartTime);
logExecutionTime(executionTime, "notify listeners on unchanged cluster state", summary);
logExecutionTime(executionTime, "notify listeners on unchanged cluster state", shortSummary);
} else {
final ClusterState newClusterState = taskOutputs.newClusterState;
if (logger.isTraceEnabled()) {
logger.trace("cluster state updated, source [{}]\n{}", summary, newClusterState);
logger.trace("cluster state updated, source [{}]\n{}", longSummary, newClusterState);
} else {
logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), summary);
logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), shortSummary);
}
final long publicationStartTime = threadPool.preciseRelativeTimeInNanos();
try {
ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(summary, newClusterState, previousClusterState);
ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(shortSummary, newClusterState, previousClusterState);
// new cluster state, notify all listeners
final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta();
if (nodesDelta.hasChanges() && logger.isInfoEnabled()) {
String nodesDeltaSummary = nodesDelta.shortSummary();
if (nodesDeltaSummary.length() > 0) {
logger.info(
"{}, term: {}, version: {}, delta: {}",
summary,
shortSummary,
newClusterState.term(),
newClusterState.version(),
nodesDeltaSummary
Expand All @@ -357,7 +369,7 @@ private void runTasks(TaskInputs taskInputs) {
logger.debug("publishing cluster state version [{}]", newClusterState.version());
publish(clusterChangedEvent, taskOutputs, publicationStartTime);
} catch (Exception e) {
handleException(summary, publicationStartTime, newClusterState, e);
handleException(shortSummary, publicationStartTime, newClusterState, e);
}
}
}
Expand Down Expand Up @@ -452,8 +464,8 @@ private void handleException(String summary, long startTimeMillis, ClusterState
// TODO: do we want to call updateTask.onFailure here?
}

private TaskOutputs calculateTaskOutputs(TaskInputs taskInputs, ClusterState previousClusterState) {
ClusterTasksResult<Object> clusterTasksResult = executeTasks(taskInputs, previousClusterState);
private TaskOutputs calculateTaskOutputs(TaskInputs taskInputs, ClusterState previousClusterState, String longSummary) {
sumitasr marked this conversation as resolved.
Show resolved Hide resolved
ClusterTasksResult<Object> clusterTasksResult = executeTasks(taskInputs, previousClusterState, longSummary);
ClusterState newClusterState = patchVersions(previousClusterState, clusterTasksResult);
return new TaskOutputs(
taskInputs,
Expand Down Expand Up @@ -897,7 +909,7 @@ public void onTimeout() {
}
}

private ClusterTasksResult<Object> executeTasks(TaskInputs taskInputs, ClusterState previousClusterState) {
private ClusterTasksResult<Object> executeTasks(TaskInputs taskInputs, ClusterState previousClusterState, String longSummary) {
ClusterTasksResult<Object> clusterTasksResult;
try {
List<Object> inputs = taskInputs.updateTasks.stream().map(tUpdateTask -> tUpdateTask.task).collect(Collectors.toList());
Expand All @@ -913,7 +925,7 @@ private ClusterTasksResult<Object> executeTasks(TaskInputs taskInputs, ClusterSt
"failed to execute cluster state update (on version: [{}], uuid: [{}]) for [{}]\n{}{}{}",
previousClusterState.version(),
previousClusterState.stateUUID(),
taskInputs.summary,
longSummary,
previousClusterState.nodes(),
previousClusterState.routingTable(),
previousClusterState.getRoutingNodes()
Expand Down Expand Up @@ -955,14 +967,22 @@ private List<Batcher.UpdateTask> getNonFailedTasks(TaskInputs taskInputs, Cluste
* Represents a set of tasks to be processed together with their executor
*/
private class TaskInputs {
final String summary;

final List<Batcher.UpdateTask> updateTasks;
final ClusterStateTaskExecutor<Object> executor;

TaskInputs(ClusterStateTaskExecutor<Object> executor, List<Batcher.UpdateTask> updateTasks, String summary) {
this.summary = summary;
final Supplier<String> summarySupplier;
final String shortSummary;

TaskInputs(
ClusterStateTaskExecutor<Object> executor,
List<Batcher.UpdateTask> updateTasks,
Supplier<String> summarySupplier,
String shortSummary
) {
this.executor = executor;
this.updateTasks = updateTasks;
this.summarySupplier = summarySupplier;
this.shortSummary = shortSummary;
}

boolean runOnlyWhenClusterManager() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -195,22 +196,31 @@ void runIfNotProcessed(BatchedTask updateTask) {
}

if (toExecute.isEmpty() == false) {
final String tasksSummary = processTasksBySource.entrySet().stream().map(entry -> {
final Supplier<String> tasksSummarySupplier = () -> processTasksBySource.entrySet().stream().map(entry -> {
String tasks = updateTask.describeTasks(entry.getValue());
return tasks.isEmpty() ? entry.getKey() : entry.getKey() + "[" + tasks + "]";
}).reduce((s1, s2) -> s1 + ", " + s2).orElse("");

final String tasksShortSummary = buildShortSummary(updateTask.batchingKey);
taskBatcherListener.onBeginProcessing(toExecute);
run(updateTask.batchingKey, toExecute, tasksSummary);
run(updateTask.batchingKey, toExecute, tasksSummarySupplier, tasksShortSummary);
}
}
}

private String buildShortSummary(final Object batchingKey) {
sumitasr marked this conversation as resolved.
Show resolved Hide resolved
return "Tasks batched with key: " + batchingKey.toString().split("\\$")[0];
}

/**
* Action to be implemented by the specific batching implementation
* All tasks have the given batching key.
*/
protected abstract void run(Object batchingKey, List<? extends BatchedTask> tasks, String tasksSummary);
protected abstract void run(
Object batchingKey,
List<? extends BatchedTask> tasks,
Supplier<String> tasksSummary,
String tasksShortSummary
);

/**
* Represents a runnable task that supports batching.
Expand Down
Loading
Loading