Skip to content

Commit

Permalink
Request level coordinator slow logs
Browse files Browse the repository at this point in the history
Signed-off-by: David Zane <[email protected]>
  • Loading branch information
dzane17 committed Oct 16, 2023
1 parent dd63559 commit 52026c8
Show file tree
Hide file tree
Showing 9 changed files with 574 additions and 4 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
## [Unreleased 2.x]
### Added
- Per request phase latency ([#10351](https://github.com/opensearch-project/OpenSearch/issues/10351))
- Request level coordinator slow logs ([#10650](https://github.com/opensearch-project/OpenSearch/pull/10650))

### Dependencies
- Bump `com.google.api.grpc:proto-google-common-protos` from 2.10.0 to 2.25.1 ([#10208](https://github.com/opensearch-project/OpenSearch/pull/10208), [#10298](https://github.com/opensearch-project/OpenSearch/pull/10298))
Expand Down Expand Up @@ -114,4 +115,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Security

[Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.12...2.x
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.12...2.x
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,10 @@ private void onPhaseStart(SearchPhase phase) {
this.searchRequestOperationsListener.ifPresent(searchRequestOperations -> { searchRequestOperations.onPhaseStart(this); });
}

/**
 * Signals the end of the request to the request operations listener, if one is present.
 */
private void onRequestEnd() {
    searchRequestOperationsListener.ifPresent(ops -> ops.onRequestEnd(this));
}

private void executePhase(SearchPhase phase) {
try {
onPhaseStart(phase);
Expand Down Expand Up @@ -696,9 +700,10 @@ public void sendSearchResponse(InternalSearchResponse internalSearchResponse, At
searchContextId = null;
}
}
onPhaseEnd();
onRequestEnd();
listener.onResponse(buildSearchResponse(internalSearchResponse, failures, scrollId, searchContextId));
}
onPhaseEnd();
setCurrentPhase(null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ public interface SearchRequestOperationsListener {

void onPhaseFailure(SearchPhaseContext context);

/**
 * Hook for the start of a search request. The default implementation does nothing,
 * so implementations only override it when they need request-level tracking.
 *
 * @param startTimeNanos start time of the request, in nanoseconds
 */
default void onRequestStart(long startTimeNanos) {}

/**
 * Hook for the end of a search request. The default implementation does nothing,
 * so implementations only override it when they need request-level tracking.
 *
 * @param context the search phase context of the request that ended
 */
default void onRequestEnd(SearchPhaseContext context) {}

/**
* Holder of Composite Listeners
*
Expand Down Expand Up @@ -73,5 +77,27 @@ public void onPhaseFailure(SearchPhaseContext context) {
}
}
}

/**
 * Forwards the request-start notification to each listener in turn.
 * An exception thrown by one listener is logged at WARN and the remaining
 * listeners are still notified.
 *
 * @param startTimeNanos value passed unchanged to every listener
 */
@Override
public void onRequestStart(long startTimeNanos) {
    for (final SearchRequestOperationsListener delegate : listeners) {
        try {
            delegate.onRequestStart(startTimeNanos);
        } catch (Exception failure) {
            // Isolate failures: one misbehaving listener must not stop the others.
            logger.warn(() -> new ParameterizedMessage("onRequestStart listener [{}] failed", delegate), failure);
        }
    }
}

/**
 * Forwards the request-end notification to each listener in turn.
 * An exception thrown by one listener is logged at WARN and the remaining
 * listeners are still notified.
 *
 * @param context value passed unchanged to every listener
 */
@Override
public void onRequestEnd(SearchPhaseContext context) {
    for (final SearchRequestOperationsListener delegate : listeners) {
        try {
            delegate.onRequestEnd(context);
        } catch (Exception failure) {
            // Isolate failures: one misbehaving listener must not stop the others.
            logger.warn(() -> new ParameterizedMessage("onRequestEnd listener [{}] failed", delegate), failure);
        }
    }
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,269 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.action.search;

import com.fasterxml.jackson.core.io.JsonStringEncoder;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.logging.OpenSearchLogMessage;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.index.SlowLogLevel;
import org.opensearch.tasks.Task;

import java.nio.charset.Charset;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * Request-level slow log for search requests, implemented as a
 * {@link SearchRequestOperationsListener}.
 * <p>
 * The listener remembers the request start time ({@link #onRequestStart(long)}),
 * records how long each phase took ({@link #onPhaseEnd(SearchPhaseContext)}) and,
 * when the request ends ({@link #onRequestEnd(SearchPhaseContext)}), emits a
 * {@link SearchRequestSlowLogMessage} at the most severe level whose threshold
 * the total request time exceeded.
 * <p>
 * Thresholds and the slow log level are read from the cluster settings at
 * construction time and kept current through settings update consumers.
 * <p>
 * NOTE(review): the start time and the phase map are per-request state held in
 * instance fields, so an instance is presumably meant to serve one request at a
 * time - confirm against the caller. The thresholds and level are plain fields
 * written by the settings update consumers; if those run on another thread,
 * visibility to the search callbacks is not guaranteed - confirm.
 *
 * @opensearch.internal
 */
public final class SearchRequestSlowLog implements SearchRequestOperationsListener {
    private static final Charset UTF_8 = Charset.forName("UTF-8");

    /** Start time handed to {@link #onRequestStart(long)}; compared against {@link System#nanoTime()}. */
    private long absoluteStartNanos;

    // Thresholds in nanoseconds. A negative value (the default, -1) disables the corresponding level.
    private long warnThreshold;
    private long infoThreshold;
    private long debugThreshold;
    private long traceThreshold;

    /** Phase name to time taken by that phase, in milliseconds. */
    Map<String, Long> phaseTookMap;

    private final Logger logger;

    /** Common prefix of all setting keys declared here; also the prefix of the logger name. */
    static final String CLUSTER_SEARCH_REQUEST_SLOWLOG_PREFIX = "cluster.search.request.slowlog.threshold";

    /** Dynamic, node-scoped WARN threshold; defaults to -1 (disabled). */
    public static final Setting<TimeValue> CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_WARN_SETTING = Setting.timeSetting(
        CLUSTER_SEARCH_REQUEST_SLOWLOG_PREFIX + ".warn",
        TimeValue.timeValueNanos(-1),
        TimeValue.timeValueMillis(-1),
        Setting.Property.Dynamic,
        Setting.Property.NodeScope
    );

    /** Dynamic, node-scoped INFO threshold; defaults to -1 (disabled). */
    public static final Setting<TimeValue> CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_INFO_SETTING = Setting.timeSetting(
        CLUSTER_SEARCH_REQUEST_SLOWLOG_PREFIX + ".info",
        TimeValue.timeValueNanos(-1),
        TimeValue.timeValueMillis(-1),
        Setting.Property.Dynamic,
        Setting.Property.NodeScope
    );

    /** Dynamic, node-scoped DEBUG threshold; defaults to -1 (disabled). */
    public static final Setting<TimeValue> CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_DEBUG_SETTING = Setting.timeSetting(
        CLUSTER_SEARCH_REQUEST_SLOWLOG_PREFIX + ".debug",
        TimeValue.timeValueNanos(-1),
        TimeValue.timeValueMillis(-1),
        Setting.Property.Dynamic,
        Setting.Property.NodeScope
    );

    /** Dynamic, node-scoped TRACE threshold; defaults to -1 (disabled). */
    public static final Setting<TimeValue> CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_TRACE_SETTING = Setting.timeSetting(
        CLUSTER_SEARCH_REQUEST_SLOWLOG_PREFIX + ".trace",
        TimeValue.timeValueNanos(-1),
        TimeValue.timeValueMillis(-1),
        Setting.Property.Dynamic,
        Setting.Property.NodeScope
    );

    /** Dynamic, node-scoped slow log level, parsed with {@link SlowLogLevel#parse}; defaults to TRACE. */
    public static final Setting<SlowLogLevel> CLUSTER_SEARCH_REQUEST_SLOWLOG_LEVEL = new Setting<>(
        CLUSTER_SEARCH_REQUEST_SLOWLOG_PREFIX + ".level",
        SlowLogLevel.TRACE.name(),
        SlowLogLevel::parse,
        Setting.Property.Dynamic,
        Setting.Property.NodeScope
    );

    /** Rendering parameters for the request source: non-pretty output. */
    private static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams(Collections.singletonMap("pretty", "false"));
    private SlowLogLevel level;

    /**
     * Creates the slow log, reads the current thresholds and level from the cluster
     * settings and registers update consumers so later setting changes are picked up.
     *
     * @param clusterService source of the cluster settings
     */
    public SearchRequestSlowLog(ClusterService clusterService) {
        this.logger = LogManager.getLogger(CLUSTER_SEARCH_REQUEST_SLOWLOG_PREFIX + ".SearchRequestSlowLog");
        // The logger itself is opened up to TRACE; level filtering happens in onRequestEnd via the level setting.
        Loggers.setLevel(this.logger, SlowLogLevel.TRACE.name());

        this.warnThreshold = clusterService.getClusterSettings().get(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_WARN_SETTING).nanos();
        clusterService.getClusterSettings()
            .addSettingsUpdateConsumer(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_WARN_SETTING, this::setWarnThreshold);
        this.infoThreshold = clusterService.getClusterSettings().get(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_INFO_SETTING).nanos();
        clusterService.getClusterSettings()
            .addSettingsUpdateConsumer(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_INFO_SETTING, this::setInfoThreshold);
        this.debugThreshold = clusterService.getClusterSettings().get(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_DEBUG_SETTING).nanos();
        clusterService.getClusterSettings()
            .addSettingsUpdateConsumer(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_DEBUG_SETTING, this::setDebugThreshold);
        this.traceThreshold = clusterService.getClusterSettings().get(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_TRACE_SETTING).nanos();
        clusterService.getClusterSettings()
            .addSettingsUpdateConsumer(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_TRACE_SETTING, this::setTraceThreshold);

        this.level = clusterService.getClusterSettings().get(CLUSTER_SEARCH_REQUEST_SLOWLOG_LEVEL);
        clusterService.getClusterSettings().addSettingsUpdateConsumer(CLUSTER_SEARCH_REQUEST_SLOWLOG_LEVEL, this::setLevel);

        this.phaseTookMap = new HashMap<>();
    }

    private void setLevel(SlowLogLevel level) {
        this.level = level;
    }

    /** No-op: nothing is tracked when a phase starts. */
    @Override
    public void onPhaseStart(SearchPhaseContext context) {}

    /**
     * Records, in milliseconds, how long the current phase of {@code context} took,
     * keyed by the phase name. A later phase with the same name overwrites the entry.
     */
    @Override
    public void onPhaseEnd(SearchPhaseContext context) {
        long tookInNanos = System.nanoTime() - context.getCurrentPhase().getStartTimeInNanos();
        phaseTookMap.put(context.getCurrentPhase().getName(), TimeUnit.NANOSECONDS.toMillis(tookInNanos));
    }

    /** No-op: phase failures are not tracked by the slow log. */
    @Override
    public void onPhaseFailure(SearchPhaseContext context) {}

    /** Stores the request start time used later to compute the total time taken. */
    @Override
    public void onRequestStart(long startTimeNanos) {
        this.absoluteStartNanos = startTimeNanos;
    }

    private long getRequestStart() {
        return absoluteStartNanos;
    }

    /**
     * Computes the total time since the recorded start and logs a slow log message
     * at the first level, checked in the order WARN, INFO, DEBUG, TRACE, for which
     * the threshold is enabled (non-negative), the time taken is strictly greater
     * than the threshold, and the configured level permits it. At most one message
     * is logged per call; the message object is only built when a level matches.
     */
    @Override
    public void onRequestEnd(SearchPhaseContext context) {
        long tookInNanos = System.nanoTime() - absoluteStartNanos;

        if (warnThreshold >= 0 && tookInNanos > warnThreshold && level.isLevelEnabledFor(SlowLogLevel.WARN)) {
            logger.warn(new SearchRequestSlowLogMessage(context, tookInNanos, phaseTookMap));
        } else if (infoThreshold >= 0 && tookInNanos > infoThreshold && level.isLevelEnabledFor(SlowLogLevel.INFO)) {
            logger.info(new SearchRequestSlowLogMessage(context, tookInNanos, phaseTookMap));
        } else if (debugThreshold >= 0 && tookInNanos > debugThreshold && level.isLevelEnabledFor(SlowLogLevel.DEBUG)) {
            logger.debug(new SearchRequestSlowLogMessage(context, tookInNanos, phaseTookMap));
        } else if (traceThreshold >= 0 && tookInNanos > traceThreshold && level.isLevelEnabledFor(SlowLogLevel.TRACE)) {
            logger.trace(new SearchRequestSlowLogMessage(context, tookInNanos, phaseTookMap));
        }
    }

    /**
     * Search slow log message: carries both a field map and a plaintext rendering
     * of the same request details.
     *
     * @opensearch.internal
     */
    static final class SearchRequestSlowLogMessage extends OpenSearchLogMessage {

        SearchRequestSlowLogMessage(SearchPhaseContext context, long tookInNanos, Map<String, Long> phaseTookMap) {
            super(prepareMap(context, tookInNanos, phaseTookMap), message(context, tookInNanos, phaseTookMap));
        }

        /**
         * Builds the field map of the message. The request source is rendered
         * non-pretty and JSON-escaped; a missing source is reported as "{}".
         * The "id" field holds the X-Opaque-Id task header and may be null.
         */
        private static Map<String, Object> prepareMap(SearchPhaseContext context, long tookInNanos, Map<String, Long> phaseTookMap) {
            Map<String, Object> messageFields = new HashMap<>();
            messageFields.put("took", TimeValue.timeValueNanos(tookInNanos));
            messageFields.put("took_millis", TimeUnit.NANOSECONDS.toMillis(tookInNanos));
            messageFields.put("phase_took", phaseTookMap);
            messageFields.put("search_type", context.getRequest().searchType());
            messageFields.put("total_shards", context.getNumShards());

            if (context.getRequest().source() != null) {
                String source = escapeJson(context.getRequest().source().toString(FORMAT_PARAMS));

                messageFields.put("source", source);
            } else {
                messageFields.put("source", "{}");
            }

            messageFields.put("id", context.getTask().getHeader(Task.X_OPAQUE_ID));
            return messageFields;
        }

        // Message will be used in plaintext logs
        private static String message(SearchPhaseContext context, long tookInNanos, Map<String, Long> phaseTookMap) {
            StringBuilder sb = new StringBuilder();
            sb.append("took[").append(TimeValue.timeValueNanos(tookInNanos)).append("], ");
            sb.append("took_millis[").append(TimeUnit.NANOSECONDS.toMillis(tookInNanos)).append("], ");
            sb.append("phase_took_millis[").append(phaseTookMap.toString()).append("], ");
            sb.append("search_type[").append(context.getRequest().searchType()).append("], ");
            sb.append("total_shards[").append(context.getNumShards()).append("], ");
            if (context.getRequest().source() != null) {
                sb.append("source[").append(context.getRequest().source().toString(FORMAT_PARAMS)).append("], ");
            } else {
                sb.append("source[], ");
            }
            // "id" is the final field, so neither branch appends a trailing ", " separator.
            if (context.getTask().getHeader(Task.X_OPAQUE_ID) != null) {
                sb.append("id[").append(context.getTask().getHeader(Task.X_OPAQUE_ID)).append("]");
            } else {
                sb.append("id[]");
            }
            return sb.toString();
        }

        /** Escapes {@code text} for embedding inside a JSON string value. */
        private static String escapeJson(String text) {
            byte[] sourceEscaped = JsonStringEncoder.getInstance().quoteAsUTF8(text);
            return new String(sourceEscaped, UTF_8);
        }
    }

    private void setWarnThreshold(TimeValue warnThreshold) {
        this.warnThreshold = warnThreshold.nanos();
    }

    private void setInfoThreshold(TimeValue infoThreshold) {
        this.infoThreshold = infoThreshold.nanos();
    }

    private void setDebugThreshold(TimeValue debugThreshold) {
        this.debugThreshold = debugThreshold.nanos();
    }

    private void setTraceThreshold(TimeValue traceThreshold) {
        this.traceThreshold = traceThreshold.nanos();
    }

    /** @return the WARN threshold in nanoseconds; negative when disabled */
    protected long getWarnThreshold() {
        return warnThreshold;
    }

    /** @return the INFO threshold in nanoseconds; negative when disabled */
    protected long getInfoThreshold() {
        return infoThreshold;
    }

    /** @return the DEBUG threshold in nanoseconds; negative when disabled */
    protected long getDebugThreshold() {
        return debugThreshold;
    }

    /** @return the TRACE threshold in nanoseconds; negative when disabled */
    protected long getTraceThreshold() {
        return traceThreshold;
    }

    /** @return the currently configured slow log level */
    SlowLogLevel getLevel() {
        return level;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ private void executeRequest(
} else {
searchRequestOperationsListener = null;
}
searchRequestOperationsListener.onRequestStart(relativeStartNanos);

PipelinedRequest searchRequest;
ActionListener<SearchResponse> listener;
Expand Down Expand Up @@ -1216,6 +1217,8 @@ private List<SearchRequestOperationsListener> createSearchListenerList(SearchReq
searchListenersList.add(timeProvider);
}

searchListenersList.add(new SearchRequestSlowLog(clusterService));

return searchListenersList;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.opensearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction;
import org.opensearch.action.admin.indices.close.TransportCloseIndexAction;
import org.opensearch.action.search.CreatePitController;
import org.opensearch.action.search.SearchRequestSlowLog;
import org.opensearch.action.search.TransportSearchAction;
import org.opensearch.action.support.AutoCreateIndex;
import org.opensearch.action.support.DestructiveOperations;
Expand Down Expand Up @@ -677,6 +678,13 @@ public void apply(Settings value, Settings current, Settings previous) {
TaskCancellationMonitoringSettings.IS_ENABLED_SETTING,
TaskCancellationMonitoringSettings.DURATION_MILLIS_SETTING,

// Search request slow log settings
SearchRequestSlowLog.CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_WARN_SETTING,
SearchRequestSlowLog.CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_INFO_SETTING,
SearchRequestSlowLog.CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_DEBUG_SETTING,
SearchRequestSlowLog.CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_TRACE_SETTING,
SearchRequestSlowLog.CLUSTER_SEARCH_REQUEST_SLOWLOG_LEVEL,

// Remote cluster state settings
RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING,
RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public static SlowLogLevel parse(String level) {
return valueOf(level.toUpperCase(Locale.ROOT));
}

boolean isLevelEnabledFor(SlowLogLevel levelToBeUsed) {
/**
 * Tells whether a message at {@code levelToBeUsed} may be logged when this level
 * is the configured one: it may when this level's specificity does not exceed
 * the specificity of {@code levelToBeUsed}.
 *
 * @param levelToBeUsed the level the caller wants to log at
 * @return {@code true} if logging at {@code levelToBeUsed} is allowed
 */
public boolean isLevelEnabledFor(SlowLogLevel levelToBeUsed) {
    // e.g. configured info(2), message at warn(3): 3 >= 2, so logging is allowed
    return levelToBeUsed.specificity >= specificity;
}
Expand Down
Loading

0 comments on commit 52026c8

Please sign in to comment.