Skip to content

Commit

Permalink
make the logic to decide enabled self-contained in each listener
Browse files Browse the repository at this point in the history
Signed-off-by: Chenyang Ji <[email protected]>
  • Loading branch information
ansjcy committed Dec 18, 2023
1 parent 3448338 commit cd4f310
Show file tree
Hide file tree
Showing 10 changed files with 52 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,12 @@
package org.opensearch.action.search;

import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Setting;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;


/**
Expand All @@ -27,21 +26,9 @@
* @opensearch.internal
*/
public class SearchRequestListenerManager {

private final ClusterService clusterService;
public static final String SEARCH_PHASE_TOOK_ENABLED_KEY = "search.phase_took_enabled";
public static final Setting<Boolean> SEARCH_PHASE_TOOK_ENABLED = Setting.boolSetting(
SEARCH_PHASE_TOOK_ENABLED_KEY,
false,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);
private final List<SearchRequestOperationsListener> searchRequestListenersList;

public SearchRequestListenerManager(
ClusterService clusterService
) {
this.clusterService = clusterService;
public SearchRequestListenerManager() {
searchRequestListenersList = new ArrayList<>();
}

Expand Down Expand Up @@ -95,30 +82,18 @@ public List<SearchRequestOperationsListener> getListeners() {
* Create the {@link SearchRequestOperationsListener.CompositeListener}
* with the all listeners enabled at cluster-level and request-level.
*
* @param searchRequest The SearchRequest object. SearchRequestListenerManager will decide which request-level listeners to add based on states/flags of the request
* @param logger Logger to be attached to the {@link SearchRequestOperationsListener.CompositeListener}
* @param perRequestListeners the per-request listeners that can be optionally added to the returned CompositeListener list.
* @return SearchRequestOperationsListener.CompositeListener
*/
public SearchRequestOperationsListener.CompositeListener buildCompositeListener(
SearchRequest searchRequest,
Logger logger,
SearchRequestOperationsListener... perRequestListeners
) {
final List<SearchRequestOperationsListener> searchListenersList = searchRequestListenersList.stream().filter(SearchRequestOperationsListener::getEnabled).collect(Collectors.toList());
final List<SearchRequestOperationsListener> searchListenersList = Stream.concat(searchRequestListenersList.stream(), Arrays.stream(perRequestListeners))
.filter(SearchRequestOperationsListener::getEnabled)
.collect(Collectors.toList());

Arrays.stream(perRequestListeners).forEach((listener) -> {
if (listener != null && listener.getClass() == TransportSearchAction.SearchTimeProvider.class) {
TransportSearchAction.SearchTimeProvider timeProvider = (TransportSearchAction.SearchTimeProvider) listener;
// phase_took is enabled with request param and/or cluster setting
boolean phaseTookEnabled = (searchRequest.isPhaseTook() != null && searchRequest.isPhaseTook()) ||
clusterService.getClusterSettings().get(SEARCH_PHASE_TOOK_ENABLED);
if (phaseTookEnabled) {
timeProvider.setPhaseTook(true);
searchListenersList.add(timeProvider);
}
}
});
return new SearchRequestOperationsListener.CompositeListener(searchListenersList, logger);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ void onRequestStart(SearchRequestContext searchRequestContext) {}

void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {}


boolean getEnabled() {
return enabled;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,7 @@ public final class SearchRequestSlowLog extends SearchRequestOperationsListener

private static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams(Collections.singletonMap("pretty", "false"));

public SearchRequestSlowLog(
ClusterService clusterService
) {
public SearchRequestSlowLog(ClusterService clusterService) {
this(clusterService, LogManager.getLogger(CLUSTER_SEARCH_REQUEST_SLOWLOG_PREFIX)); // logger configured in log4j2.properties
}

Expand Down Expand Up @@ -235,22 +233,22 @@ private static String escapeJson(String text) {

void setWarnThreshold(TimeValue warnThreshold) {
this.warnThreshold = warnThreshold.nanos();
setEnabled();
setEnabledIfThresholdExceed();
}

void setInfoThreshold(TimeValue infoThreshold) {
this.infoThreshold = infoThreshold.nanos();
setEnabled();
setEnabledIfThresholdExceed();
}

void setDebugThreshold(TimeValue debugThreshold) {
this.debugThreshold = debugThreshold.nanos();
setEnabled();
setEnabledIfThresholdExceed();
}

void setTraceThreshold(TimeValue traceThreshold) {
this.traceThreshold = traceThreshold.nanos();
setEnabled();
setEnabledIfThresholdExceed();
}

void setLevel(SlowLogLevel level) {
Expand All @@ -277,7 +275,7 @@ SlowLogLevel getLevel() {
return level;
}

private void setEnabled() {
private void setEnabledIfThresholdExceed() {
super.setEnabled(this.warnThreshold >= 0
|| this.debugThreshold >= 0
|| this.infoThreshold >= 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,7 @@ public final class SearchRequestStats extends SearchRequestOperationsListener {
);

@Inject
public SearchRequestStats(
ClusterService clusterService
) {
public SearchRequestStats(ClusterService clusterService) {
clusterService.getClusterSettings().addSettingsUpdateConsumer(SEARCH_REQUEST_STATS_ENABLED, this::setEnabled);
for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) {
phaseStatsMap.put(searchPhaseName, new StatsHolder());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,14 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
Setting.Property.NodeScope
);

public static final String SEARCH_PHASE_TOOK_ENABLED_KEY = "search.phase_took_enabled";
public static final Setting<Boolean> SEARCH_PHASE_TOOK_ENABLED = Setting.boolSetting(
SEARCH_PHASE_TOOK_ENABLED_KEY,
false,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

private final NodeClient client;
private final ThreadPool threadPool;
private final ClusterService clusterService;
Expand Down Expand Up @@ -277,7 +285,6 @@ static final class SearchTimeProvider extends SearchRequestOperationsListener {
private final long absoluteStartMillis;
private final long relativeStartNanos;
private final LongSupplier relativeCurrentNanosProvider;
private boolean phaseTook = false;

/**
* Instantiates a new search time provider. The absolute start time is the real clock time
Expand All @@ -304,12 +311,8 @@ long buildTookInMillis() {
return TimeUnit.NANOSECONDS.toMillis(relativeCurrentNanosProvider.getAsLong() - relativeStartNanos);
}

public void setPhaseTook(boolean phaseTook) {
this.phaseTook = phaseTook;
}

SearchResponse.PhaseTook getPhaseTook() {
if (phaseTook) {
if (getEnabled()) {
Map<String, Long> phaseTookMap = new HashMap<>();
// Convert Map<SearchPhaseName, Long> to Map<String, Long> for SearchResponse()
for (SearchPhaseName searchPhaseName : phaseStatsMap.keySet()) {
Expand All @@ -323,6 +326,22 @@ SearchResponse.PhaseTook getPhaseTook() {

Map<SearchPhaseName, Long> phaseStatsMap = new EnumMap<>(SearchPhaseName.class);

/**
* Set if this listener is enabled based on the cluster level setting
* and per request enable flags.
*
* @param enabledAtClusterLevel if the SearchTimeProvider listener is enabled at cluster level
* @param searchRequest the original Search Request
* @opensearch.internal
*/

void setEnabled(boolean enabledAtClusterLevel, SearchRequest searchRequest) {
// phase_took is enabled wi th request param and/or cluster setting
super.setEnabled(
enabledAtClusterLevel || (searchRequest.isPhaseTook() != null && searchRequest.isPhaseTook())
);
}

@Override
void onPhaseStart(SearchPhaseContext context) {}

Expand Down Expand Up @@ -462,8 +481,11 @@ private void executeRequest(
relativeStartNanos,
System::nanoTime
);
timeProvider.setEnabled(
clusterService.getClusterSettings().get(SEARCH_PHASE_TOOK_ENABLED),
originalSearchRequest
);
SearchRequestOperationsListener.CompositeListener requestOperationsListeners = searchRequestListenerManager.buildCompositeListener(
originalSearchRequest,
logger,
timeProvider
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ public void apply(Settings value, Settings current, Settings previous) {
TransportSearchAction.SHARD_COUNT_LIMIT_SETTING,
TransportSearchAction.SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING,
TransportSearchAction.SEARCH_QUERY_METRICS_ENABLED_SETTING,
SearchRequestListenerManager.SEARCH_PHASE_TOOK_ENABLED,
TransportSearchAction.SEARCH_PHASE_TOOK_ENABLED,
SearchRequestStats.SEARCH_REQUEST_STATS_ENABLED,
RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE,
SniffConnectionStrategy.REMOTE_CONNECTIONS_PER_CLUSTER,
Expand Down
2 changes: 1 addition & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -788,7 +788,7 @@ protected Node(
final SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService);

// register all standard SearchRequestOperationsListeners to the SearchRequestListenerManager
final SearchRequestListenerManager searchRequestListenerManager = new SearchRequestListenerManager(clusterService);
final SearchRequestListenerManager searchRequestListenerManager = new SearchRequestListenerManager();
searchRequestListenerManager.addListeners(
searchRequestStats,
searchRequestSlowLog
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,15 @@

public class SearchRequestListenerManagerTests extends OpenSearchTestCase {
public void testAddAndGetListeners() {
ClusterService clusterService = new ClusterService(
Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
null
);
SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService);
SearchRequestListenerManager listenerManager = new SearchRequestListenerManager();
SearchRequestOperationsListener testListener = createTestSearchRequestOperationsListener();
listenerManager.addListeners(testListener);
assertEquals(1, listenerManager.getListeners().size());
assertEquals(testListener, listenerManager.getListeners().get(0));
}

public void testRemoveListeners() {
ClusterService clusterService = new ClusterService(
Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
null
);
SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService);
SearchRequestListenerManager listenerManager = new SearchRequestListenerManager();
SearchRequestOperationsListener testListener1 = createTestSearchRequestOperationsListener();
SearchRequestOperationsListener testListener2 = createTestSearchRequestOperationsListener();
listenerManager.addListeners(testListener1, testListener2);
Expand All @@ -50,22 +40,14 @@ public void testRemoveListeners() {
}

public void testStandardListenersEnabled() throws NoSuchFieldException, IllegalAccessException {
ClusterService clusterService = new ClusterService(
Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
null
);
SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService);
SearchRequestListenerManager listenerManager = new SearchRequestListenerManager();
SearchRequestOperationsListener testListener1 = createTestSearchRequestOperationsListener();
SearchRequestOperationsListener testListener2 = createTestSearchRequestOperationsListener();
testListener2.setEnabled(true);
listenerManager.addListeners(testListener1, testListener2);
SearchSourceBuilder source = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery());
SearchRequest searchRequest = new SearchRequest().source(source);
SearchRequestOperationsListener.CompositeListener compositeListener = listenerManager.buildCompositeListener(
searchRequest,
logger
);
SearchRequestOperationsListener.CompositeListener compositeListener = listenerManager.buildCompositeListener(logger);
Field listenersField = SearchRequestOperationsListener.CompositeListener.class.getDeclaredField("listeners");
listenersField.setAccessible(true);
List<SearchRequestOperationsListener> listeners = (List<SearchRequestOperationsListener>) listenersField.get(compositeListener);
Expand All @@ -77,12 +59,7 @@ public void testStandardListenersEnabled() throws NoSuchFieldException, IllegalA
}

public void testStandardListenersAndTimeProvider() throws NoSuchFieldException, IllegalAccessException {
ClusterService clusterService = new ClusterService(
Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
null
);
SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService);
SearchRequestListenerManager listenerManager = new SearchRequestListenerManager();
SearchRequestOperationsListener testListener1 = createTestSearchRequestOperationsListener();
testListener1.setEnabled(true);
SearchRequestOperationsListener timeProviderListener = new TransportSearchAction.SearchTimeProvider(
Expand All @@ -95,7 +72,6 @@ public void testStandardListenersAndTimeProvider() throws NoSuchFieldException,
SearchRequest searchRequest = new SearchRequest().source(source);
searchRequest.setPhaseTook(true);
SearchRequestOperationsListener.CompositeListener compositeListener = listenerManager.buildCompositeListener(
searchRequest,
logger,
timeProviderListener
);
Expand All @@ -110,12 +86,7 @@ public void testStandardListenersAndTimeProvider() throws NoSuchFieldException,
}

public void testStandardListenersDisabledAndTimeProvider() throws NoSuchFieldException, IllegalAccessException {
ClusterService clusterService = new ClusterService(
Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
null
);
SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService);
SearchRequestListenerManager listenerManager = new SearchRequestListenerManager();
SearchRequestOperationsListener testListener1 = createTestSearchRequestOperationsListener();
SearchRequestOperationsListener timeProviderListener = new TransportSearchAction.SearchTimeProvider(
0,
Expand All @@ -127,7 +98,6 @@ public void testStandardListenersDisabledAndTimeProvider() throws NoSuchFieldExc
SearchRequest searchRequest = new SearchRequest().source(source);
searchRequest.setPhaseTook(true);
SearchRequestOperationsListener.CompositeListener compositeListener = listenerManager.buildCompositeListener(
searchRequest,
logger,
timeProviderListener
);
Expand All @@ -142,12 +112,7 @@ public void testStandardListenersDisabledAndTimeProvider() throws NoSuchFieldExc
}

public void testStandardListenerAndTimeProviderDisabled() throws NoSuchFieldException, IllegalAccessException {
ClusterService clusterService = new ClusterService(
Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
null
);
SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService);
SearchRequestListenerManager listenerManager = new SearchRequestListenerManager();
SearchRequestOperationsListener testListener1 = createTestSearchRequestOperationsListener();
testListener1.setEnabled(true);
SearchRequestOperationsListener timeProviderListener = new TransportSearchAction.SearchTimeProvider(
Expand All @@ -160,7 +125,6 @@ public void testStandardListenerAndTimeProviderDisabled() throws NoSuchFieldExce
SearchRequest searchRequest = new SearchRequest().source(source);
searchRequest.setPhaseTook(false);
SearchRequestOperationsListener.CompositeListener compositeListener = listenerManager.buildCompositeListener(
searchRequest,
logger,
timeProviderListener
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public void testMultipleSlowLoggersUseSingleLog4jLogger() {
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
null
);
SearchRequestListenerManager listenerManager2 = new SearchRequestListenerManager(clusterService2);
SearchRequestListenerManager listenerManager2 = new SearchRequestListenerManager();
SearchRequestSlowLog searchRequestSlowLog2 = new SearchRequestSlowLog(clusterService2);

int numberOfLoggersAfter = context.getLoggers().size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2285,7 +2285,7 @@ public void onFailure(final Exception e) {
writableRegistry(),
searchService::aggReduceContextBuilder
);
SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService);
SearchRequestListenerManager listenerManager = new SearchRequestListenerManager();
actions.put(
SearchAction.INSTANCE,
new TransportSearchAction(
Expand Down

0 comments on commit cd4f310

Please sign in to comment.