Skip to content

Commit

Permalink
Add TopNSearchTasksLogger settings to Cluster Settings (#6716) (#7242)
Browse files Browse the repository at this point in the history
Signed-off-by: PritLadani <[email protected]>
(cherry picked from commit 9f6c067)
  • Loading branch information
PritLadani authored Apr 20, 2023
1 parent dc060a6 commit c00e0f7
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.opensearch.search.backpressure.settings.SearchTaskSettings;
import org.opensearch.tasks.TaskManager;
import org.opensearch.tasks.TaskResourceTrackingService;
import org.opensearch.tasks.consumer.TopNSearchTasksLogger;
import org.opensearch.watcher.ResourceWatcherService;
import org.opensearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction;
import org.opensearch.action.admin.indices.close.TransportCloseIndexAction;
Expand Down Expand Up @@ -601,6 +602,8 @@ public void apply(Settings value, Settings current, Settings previous) {
IndexingPressure.MAX_INDEXING_BYTES,
TaskResourceTrackingService.TASK_RESOURCE_TRACKING_ENABLED,
TaskManager.TASK_RESOURCE_CONSUMERS_ENABLED,
TopNSearchTasksLogger.LOG_TOP_QUERIES_SIZE_SETTING,
TopNSearchTasksLogger.LOG_TOP_QUERIES_FREQUENCY_SETTING,
ClusterManagerTaskThrottler.THRESHOLD_SETTINGS,
ClusterManagerTaskThrottler.BASE_DELAY_SETTINGS,
ClusterManagerTaskThrottler.MAX_DELAY_SETTINGS,
Expand Down
3 changes: 3 additions & 0 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.opensearch.search.backpressure.SearchBackpressureService;
import org.opensearch.search.backpressure.settings.SearchBackpressureSettings;
import org.opensearch.search.pipeline.SearchPipelineService;
import org.opensearch.tasks.consumer.TopNSearchTasksLogger;
import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
import org.opensearch.watcher.ResourceWatcherService;
import org.opensearch.Assertions;
Expand Down Expand Up @@ -860,6 +861,8 @@ protected Node(
settingsModule.getClusterSettings(),
taskHeaders
);
TopNSearchTasksLogger taskConsumer = new TopNSearchTasksLogger(settings, settingsModule.getClusterSettings());
transportService.getTaskManager().registerTaskResourceConsumer(taskConsumer);
if (FeatureFlags.isEnabled(FeatureFlags.EXTENSIONS)) {
this.extensionsManager.initializeServicesAndRestHandler(
actionModule,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ void doRun() {
}

for (TaskCancellation taskCancellation : getTaskCancellations(cancellableTasks)) {
logger.debug(
logger.warn(
"[{} mode] cancelling task [{}] due to high resource consumption [{}]",
mode.getName(),
taskCancellation.getTask().getId(),
Expand Down
8 changes: 6 additions & 2 deletions server/src/main/java/org/opensearch/tasks/TaskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.common.util.concurrent.ConcurrentMapLong;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.tasks.consumer.TopNSearchTasksLogger;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TcpChannel;

Expand All @@ -69,6 +68,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -148,7 +148,11 @@ public TaskManager(Settings settings, ThreadPool threadPool, Set<String> taskHea
this.taskHeaders = new ArrayList<>(taskHeaders);
this.maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings);
this.taskResourceConsumersEnabled = TASK_RESOURCE_CONSUMERS_ENABLED.get(settings);
this.taskResourceConsumer = Set.of(new TopNSearchTasksLogger(settings));
taskResourceConsumer = new HashSet<>();
}

/**
 * Adds the given consumer to this manager's set of task resource consumers.
 *
 * @param consumer the task consumer to register
 */
public void registerTaskResourceConsumer(final Consumer<Task> consumer) {
    this.taskResourceConsumer.add(consumer);
}

public void setTaskResultsService(TaskResultsService taskResultsService) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.action.search.SearchShardTask;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
Expand All @@ -30,36 +31,41 @@
*/
public class TopNSearchTasksLogger implements Consumer<Task> {
public static final String TASK_DETAILS_LOG_PREFIX = "task.detailslog";
public static final String LOG_TOP_QUERIES_SIZE = "cluster.task.consumers.top_n.size";
public static final String LOG_TOP_QUERIES_FREQUENCY = "cluster.task.consumers.top_n.frequency";
private static final String LOG_TOP_QUERIES_SIZE = "cluster.task.consumers.top_n.size";
private static final String LOG_TOP_QUERIES_FREQUENCY = "cluster.task.consumers.top_n.frequency";

private static final Logger SEARCH_TASK_DETAILS_LOGGER = LogManager.getLogger(TASK_DETAILS_LOG_PREFIX + ".search");

// number of memory-expensive search tasks that are logged
private static final Setting<Integer> LOG_TOP_QUERIES_SIZE_SETTING = Setting.intSetting(
public static final Setting<Integer> LOG_TOP_QUERIES_SIZE_SETTING = Setting.intSetting(
LOG_TOP_QUERIES_SIZE,
10,
1,
100,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

// frequency at which memory-expensive search tasks are logged
private static final Setting<TimeValue> LOG_TOP_QUERIES_FREQUENCY_SETTING = Setting.timeSetting(
public static final Setting<TimeValue> LOG_TOP_QUERIES_FREQUENCY_SETTING = Setting.timeSetting(
LOG_TOP_QUERIES_FREQUENCY,
TimeValue.timeValueSeconds(60L),
TimeValue.timeValueSeconds(60L),
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

private final int topQueriesSize;
private final long topQueriesLogFrequencyInNanos;
private volatile int topQueriesSize;
private volatile long topQueriesLogFrequencyInNanos;
private final Queue<Tuple<Long, SearchShardTask>> topQueries;
private long lastReportedTimeInNanos = System.nanoTime();

public TopNSearchTasksLogger(Settings settings) {
public TopNSearchTasksLogger(Settings settings, ClusterSettings clusterSettings) {
this.topQueriesSize = LOG_TOP_QUERIES_SIZE_SETTING.get(settings);
this.topQueriesLogFrequencyInNanos = LOG_TOP_QUERIES_FREQUENCY_SETTING.get(settings).getNanos();
this.topQueries = new PriorityQueue<>(topQueriesSize, Comparator.comparingLong(Tuple::v1));
clusterSettings.addSettingsUpdateConsumer(LOG_TOP_QUERIES_SIZE_SETTING, this::setLogTopQueriesSize);
clusterSettings.addSettingsUpdateConsumer(LOG_TOP_QUERIES_FREQUENCY_SETTING, this::setTopQueriesLogFrequencyInNanos);
}

/**
Expand All @@ -78,11 +84,12 @@ private synchronized void recordSearchTask(SearchShardTask searchTask) {
publishTopNEvents();
lastReportedTimeInNanos = System.nanoTime();
}
if (topQueries.size() >= topQueriesSize && topQueries.peek().v1() < memory_in_bytes) {
int topQSize = topQueriesSize;
if (topQueries.size() >= topQSize && topQueries.peek().v1() < memory_in_bytes) {
// evict the element
topQueries.poll();
}
if (topQueries.size() < topQueriesSize) {
if (topQueries.size() < topQSize) {
topQueries.offer(new Tuple<>(memory_in_bytes, searchTask));
}
}
Expand All @@ -97,4 +104,12 @@ private void logTopResourceConsumingQueries() {
SEARCH_TASK_DETAILS_LOGGER.info(new SearchShardTaskDetailsLogMessage(topQuery.v2()));
}
}

/**
 * Stores a new value for the number of top queries to track.
 *
 * @param topQueriesSize the new top-N size
 */
private void setLogTopQueriesSize(final int topQueriesSize) {
    this.topQueriesSize = topQueriesSize;
}

/**
 * Stores a new logging frequency, converting the supplied time value to nanoseconds.
 *
 * @param timeValue the new logging frequency
 */
void setTopQueriesLogFrequencyInNanos(final TimeValue timeValue) {
    final long frequencyInNanos = timeValue.getNanos();
    this.topQueriesLogFrequencyInNanos = frequencyInNanos;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,15 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.LogEvent;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.opensearch.action.search.SearchShardTask;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.logging.MockAppender;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.tasks.ResourceStats;
import org.opensearch.tasks.ResourceStatsType;
import org.opensearch.tasks.ResourceUsageMetric;
Expand All @@ -26,8 +29,9 @@

import java.util.Collections;

import static org.opensearch.tasks.consumer.TopNSearchTasksLogger.LOG_TOP_QUERIES_FREQUENCY;
import static org.opensearch.tasks.consumer.TopNSearchTasksLogger.LOG_TOP_QUERIES_SIZE;
import static org.opensearch.tasks.consumer.TopNSearchTasksLogger.LOG_TOP_QUERIES_SIZE_SETTING;
import static org.opensearch.tasks.consumer.TopNSearchTasksLogger.LOG_TOP_QUERIES_FREQUENCY_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

public class TopNSearchTasksLoggerTests extends OpenSearchSingleNodeTestCase {
static MockAppender appender;
Expand All @@ -42,15 +46,34 @@ public static void init() throws IllegalAccessException {
Loggers.addAppender(searchLogger, appender);
}

/**
 * Per-test teardown: sends a cluster update-settings request that nulls out every
 * persistent and transient setting ("*") and asserts the request was acknowledged.
 */
@After
public void cleanupAfterTest() {
    final Settings.Builder clearPersistent = Settings.builder().putNull("*");
    final Settings.Builder clearTransient = Settings.builder().putNull("*");
    assertAcked(
        client().admin()
            .cluster()
            .prepareUpdateSettings()
            .setPersistentSettings(clearPersistent)
            .setTransientSettings(clearTransient)
    );
}

/**
 * Class-level teardown: detaches the mock appender from the search logger and then stops it.
 */
@AfterClass
public static void cleanup() {
// Detach first so the logger no longer references the appender when it is stopped.
Loggers.removeAppender(searchLogger, appender);
appender.stop();
}

public void testLoggerWithTasks() {
final Settings settings = Settings.builder().put(LOG_TOP_QUERIES_SIZE, 1).put(LOG_TOP_QUERIES_FREQUENCY, "0ms").build();
topNSearchTasksLogger = new TopNSearchTasksLogger(settings);
final Settings settings = Settings.builder()
.put(LOG_TOP_QUERIES_SIZE_SETTING.getKey(), 1)
.put(LOG_TOP_QUERIES_FREQUENCY_SETTING.getKey(), "60s")
.build();
topNSearchTasksLogger = new TopNSearchTasksLogger(
settings,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
);
// This setting override is just for testing purposes
topNSearchTasksLogger.setTopQueriesLogFrequencyInNanos(TimeValue.timeValueMillis(0));
generateTasks(5);
LogEvent logEvent = appender.getLastEventAndReset();
assertNotNull(logEvent);
Expand All @@ -59,16 +82,28 @@ public void testLoggerWithTasks() {
}

public void testLoggerWithoutTasks() {
final Settings settings = Settings.builder().put(LOG_TOP_QUERIES_SIZE, 1).put(LOG_TOP_QUERIES_FREQUENCY, "500ms").build();
topNSearchTasksLogger = new TopNSearchTasksLogger(settings);
final Settings settings = Settings.builder()
.put(LOG_TOP_QUERIES_SIZE_SETTING.getKey(), 1)
.put(LOG_TOP_QUERIES_FREQUENCY_SETTING.getKey(), "60s")
.build();
topNSearchTasksLogger = new TopNSearchTasksLogger(
settings,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
);

assertNull(appender.getLastEventAndReset());
}

public void testLoggerWithHighFrequency() {
// setting the frequency to a really large value and confirming that nothing gets written to log file.
final Settings settings = Settings.builder().put(LOG_TOP_QUERIES_SIZE, 1).put(LOG_TOP_QUERIES_FREQUENCY, "10m").build();
topNSearchTasksLogger = new TopNSearchTasksLogger(settings);
final Settings settings = Settings.builder()
.put(LOG_TOP_QUERIES_SIZE_SETTING.getKey(), 1)
.put(LOG_TOP_QUERIES_FREQUENCY_SETTING.getKey(), "10m")
.build();
topNSearchTasksLogger = new TopNSearchTasksLogger(
settings,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
);
generateTasks(5);
generateTasks(2);

Expand Down

0 comments on commit c00e0f7

Please sign in to comment.