diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 0b03ba8239cae..0c3638de7e810 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -50,6 +50,7 @@ import org.opensearch.search.backpressure.settings.SearchTaskSettings; import org.opensearch.tasks.TaskManager; import org.opensearch.tasks.TaskResourceTrackingService; +import org.opensearch.tasks.consumer.TopNSearchTasksLogger; import org.opensearch.watcher.ResourceWatcherService; import org.opensearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction; import org.opensearch.action.admin.indices.close.TransportCloseIndexAction; @@ -601,6 +602,8 @@ public void apply(Settings value, Settings current, Settings previous) { IndexingPressure.MAX_INDEXING_BYTES, TaskResourceTrackingService.TASK_RESOURCE_TRACKING_ENABLED, TaskManager.TASK_RESOURCE_CONSUMERS_ENABLED, + TopNSearchTasksLogger.LOG_TOP_QUERIES_SIZE_SETTING, + TopNSearchTasksLogger.LOG_TOP_QUERIES_FREQUENCY_SETTING, ClusterManagerTaskThrottler.THRESHOLD_SETTINGS, ClusterManagerTaskThrottler.BASE_DELAY_SETTINGS, ClusterManagerTaskThrottler.MAX_DELAY_SETTINGS, diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 3331404e1d1cc..f399932c2b9a9 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -60,6 +60,7 @@ import org.opensearch.search.backpressure.SearchBackpressureService; import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; import org.opensearch.search.pipeline.SearchPipelineService; +import org.opensearch.tasks.consumer.TopNSearchTasksLogger; import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory; import 
org.opensearch.watcher.ResourceWatcherService; import org.opensearch.Assertions; @@ -860,6 +861,8 @@ protected Node( settingsModule.getClusterSettings(), taskHeaders ); + TopNSearchTasksLogger taskConsumer = new TopNSearchTasksLogger(settings, settingsModule.getClusterSettings()); + transportService.getTaskManager().registerTaskResourceConsumer(taskConsumer); if (FeatureFlags.isEnabled(FeatureFlags.EXTENSIONS)) { this.extensionsManager.initializeServicesAndRestHandler( actionModule, diff --git a/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java b/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java index b6ed95b041766..246078e7a8eda 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java +++ b/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java @@ -197,7 +197,7 @@ void doRun() { } for (TaskCancellation taskCancellation : getTaskCancellations(cancellableTasks)) { - logger.debug( + logger.warn( "[{} mode] cancelling task [{}] due to high resource consumption [{}]", mode.getName(), taskCancellation.getTask().getId(), diff --git a/server/src/main/java/org/opensearch/tasks/TaskManager.java b/server/src/main/java/org/opensearch/tasks/TaskManager.java index ff760219716e6..23841196b5cde 100644 --- a/server/src/main/java/org/opensearch/tasks/TaskManager.java +++ b/server/src/main/java/org/opensearch/tasks/TaskManager.java @@ -60,7 +60,6 @@ import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.common.util.concurrent.ConcurrentMapLong; import org.opensearch.common.util.concurrent.ThreadContext; -import org.opensearch.tasks.consumer.TopNSearchTasksLogger; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TcpChannel; @@ -69,6 +68,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import 
java.util.Iterator; import java.util.List; import java.util.Map; @@ -148,7 +148,11 @@ public TaskManager(Settings settings, ThreadPool threadPool, Set taskHea this.taskHeaders = new ArrayList<>(taskHeaders); this.maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings); this.taskResourceConsumersEnabled = TASK_RESOURCE_CONSUMERS_ENABLED.get(settings); - this.taskResourceConsumer = Set.of(new TopNSearchTasksLogger(settings)); + taskResourceConsumer = new HashSet<>(); + } + + public void registerTaskResourceConsumer(Consumer consumer) { + taskResourceConsumer.add(consumer); } public void setTaskResultsService(TaskResultsService taskResultsService) { diff --git a/server/src/main/java/org/opensearch/tasks/consumer/TopNSearchTasksLogger.java b/server/src/main/java/org/opensearch/tasks/consumer/TopNSearchTasksLogger.java index dd7e200d7f4b2..21cde24bd541d 100644 --- a/server/src/main/java/org/opensearch/tasks/consumer/TopNSearchTasksLogger.java +++ b/server/src/main/java/org/opensearch/tasks/consumer/TopNSearchTasksLogger.java @@ -12,6 +12,7 @@ import org.apache.logging.log4j.Logger; import org.opensearch.action.search.SearchShardTask; import org.opensearch.common.collect.Tuple; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; @@ -30,36 +31,41 @@ */ public class TopNSearchTasksLogger implements Consumer { public static final String TASK_DETAILS_LOG_PREFIX = "task.detailslog"; - public static final String LOG_TOP_QUERIES_SIZE = "cluster.task.consumers.top_n.size"; - public static final String LOG_TOP_QUERIES_FREQUENCY = "cluster.task.consumers.top_n.frequency"; + private static final String LOG_TOP_QUERIES_SIZE = "cluster.task.consumers.top_n.size"; + private static final String LOG_TOP_QUERIES_FREQUENCY = "cluster.task.consumers.top_n.frequency"; private static final Logger SEARCH_TASK_DETAILS_LOGGER = 
LogManager.getLogger(TASK_DETAILS_LOG_PREFIX + ".search"); // number of memory expensive search tasks that are logged - private static final Setting LOG_TOP_QUERIES_SIZE_SETTING = Setting.intSetting( + public static final Setting LOG_TOP_QUERIES_SIZE_SETTING = Setting.intSetting( LOG_TOP_QUERIES_SIZE, 10, + 1, + 100, Setting.Property.Dynamic, Setting.Property.NodeScope ); // frequency in which memory expensive search tasks are logged - private static final Setting LOG_TOP_QUERIES_FREQUENCY_SETTING = Setting.timeSetting( + public static final Setting LOG_TOP_QUERIES_FREQUENCY_SETTING = Setting.timeSetting( LOG_TOP_QUERIES_FREQUENCY, TimeValue.timeValueSeconds(60L), + TimeValue.timeValueSeconds(60L), Setting.Property.Dynamic, Setting.Property.NodeScope ); - private final int topQueriesSize; - private final long topQueriesLogFrequencyInNanos; + private volatile int topQueriesSize; + private volatile long topQueriesLogFrequencyInNanos; private final Queue> topQueries; private long lastReportedTimeInNanos = System.nanoTime(); - public TopNSearchTasksLogger(Settings settings) { + public TopNSearchTasksLogger(Settings settings, ClusterSettings clusterSettings) { this.topQueriesSize = LOG_TOP_QUERIES_SIZE_SETTING.get(settings); this.topQueriesLogFrequencyInNanos = LOG_TOP_QUERIES_FREQUENCY_SETTING.get(settings).getNanos(); this.topQueries = new PriorityQueue<>(topQueriesSize, Comparator.comparingLong(Tuple::v1)); + clusterSettings.addSettingsUpdateConsumer(LOG_TOP_QUERIES_SIZE_SETTING, this::setLogTopQueriesSize); + clusterSettings.addSettingsUpdateConsumer(LOG_TOP_QUERIES_FREQUENCY_SETTING, this::setTopQueriesLogFrequencyInNanos); } /** @@ -78,11 +84,12 @@ private synchronized void recordSearchTask(SearchShardTask searchTask) { publishTopNEvents(); lastReportedTimeInNanos = System.nanoTime(); } - if (topQueries.size() >= topQueriesSize && topQueries.peek().v1() < memory_in_bytes) { + int topQSize = topQueriesSize; + if (topQueries.size() >= topQSize && 
topQueries.peek().v1() < memory_in_bytes) { // evict the element topQueries.poll(); } - if (topQueries.size() < topQueriesSize) { + if (topQueries.size() < topQSize) { topQueries.offer(new Tuple<>(memory_in_bytes, searchTask)); } } @@ -97,4 +104,12 @@ private void logTopResourceConsumingQueries() { SEARCH_TASK_DETAILS_LOGGER.info(new SearchShardTaskDetailsLogMessage(topQuery.v2())); } } + + private void setLogTopQueriesSize(int topQueriesSize) { + this.topQueriesSize = topQueriesSize; + } + + void setTopQueriesLogFrequencyInNanos(TimeValue timeValue) { + this.topQueriesLogFrequencyInNanos = timeValue.getNanos(); + } } diff --git a/server/src/test/java/org/opensearch/tasks/consumer/TopNSearchTasksLoggerTests.java b/server/src/test/java/org/opensearch/tasks/consumer/TopNSearchTasksLoggerTests.java index a8fd3623ef09d..73f9f7c0e08cc 100644 --- a/server/src/test/java/org/opensearch/tasks/consumer/TopNSearchTasksLoggerTests.java +++ b/server/src/test/java/org/opensearch/tasks/consumer/TopNSearchTasksLoggerTests.java @@ -12,12 +12,15 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.core.LogEvent; +import org.junit.After; import org.junit.AfterClass; import org.junit.BeforeClass; import org.opensearch.action.search.SearchShardTask; import org.opensearch.common.logging.Loggers; import org.opensearch.common.logging.MockAppender; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.tasks.ResourceStats; import org.opensearch.tasks.ResourceStatsType; import org.opensearch.tasks.ResourceUsageMetric; @@ -26,8 +29,9 @@ import java.util.Collections; -import static org.opensearch.tasks.consumer.TopNSearchTasksLogger.LOG_TOP_QUERIES_FREQUENCY; -import static org.opensearch.tasks.consumer.TopNSearchTasksLogger.LOG_TOP_QUERIES_SIZE; +import static 
org.opensearch.tasks.consumer.TopNSearchTasksLogger.LOG_TOP_QUERIES_SIZE_SETTING; +import static org.opensearch.tasks.consumer.TopNSearchTasksLogger.LOG_TOP_QUERIES_FREQUENCY_SETTING; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; public class TopNSearchTasksLoggerTests extends OpenSearchSingleNodeTestCase { static MockAppender appender; @@ -42,6 +46,17 @@ public static void init() throws IllegalAccessException { Loggers.addAppender(searchLogger, appender); } + @After + public void cleanupAfterTest() { + assertAcked( + client().admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings(Settings.builder().putNull("*")) + .setTransientSettings(Settings.builder().putNull("*")) + ); + } + @AfterClass public static void cleanup() { Loggers.removeAppender(searchLogger, appender); @@ -49,8 +64,16 @@ public static void cleanup() { } public void testLoggerWithTasks() { - final Settings settings = Settings.builder().put(LOG_TOP_QUERIES_SIZE, 1).put(LOG_TOP_QUERIES_FREQUENCY, "0ms").build(); - topNSearchTasksLogger = new TopNSearchTasksLogger(settings); + final Settings settings = Settings.builder() + .put(LOG_TOP_QUERIES_SIZE_SETTING.getKey(), 1) + .put(LOG_TOP_QUERIES_FREQUENCY_SETTING.getKey(), "60s") + .build(); + topNSearchTasksLogger = new TopNSearchTasksLogger( + settings, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) + ); + // This setting override is for testing purposes only + topNSearchTasksLogger.setTopQueriesLogFrequencyInNanos(TimeValue.timeValueMillis(0)); generateTasks(5); LogEvent logEvent = appender.getLastEventAndReset(); assertNotNull(logEvent); @@ -59,16 +82,28 @@ public void testLoggerWithTasks() { } public void testLoggerWithoutTasks() { - final Settings settings = Settings.builder().put(LOG_TOP_QUERIES_SIZE, 1).put(LOG_TOP_QUERIES_FREQUENCY, "500ms").build(); - topNSearchTasksLogger = new TopNSearchTasksLogger(settings); + final Settings settings = Settings.builder() +
.put(LOG_TOP_QUERIES_SIZE_SETTING.getKey(), 1) + .put(LOG_TOP_QUERIES_FREQUENCY_SETTING.getKey(), "60s") + .build(); + topNSearchTasksLogger = new TopNSearchTasksLogger( + settings, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) + ); assertNull(appender.getLastEventAndReset()); } public void testLoggerWithHighFrequency() { // setting the frequency to a really large value and confirming that nothing gets written to log file. - final Settings settings = Settings.builder().put(LOG_TOP_QUERIES_SIZE, 1).put(LOG_TOP_QUERIES_FREQUENCY, "10m").build(); - topNSearchTasksLogger = new TopNSearchTasksLogger(settings); + final Settings settings = Settings.builder() + .put(LOG_TOP_QUERIES_SIZE_SETTING.getKey(), 1) + .put(LOG_TOP_QUERIES_FREQUENCY_SETTING.getKey(), "10m") + .build(); + topNSearchTasksLogger = new TopNSearchTasksLogger( + settings, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) + ); generateTasks(5); generateTasks(2);