From d7e89dd121b1fca8b0e2409631ca9451a37be811 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Mon, 2 Oct 2023 21:03:18 +0100 Subject: [PATCH] [ML] Log warnings for jobs unassigned for a long time If a job is unassigned for a long time (say more than 15 minutes) then that's a sign of a potential problem with the cluster. In Cloud it may be an indication of a failure of autoscaling. In self-managed it may be an indication of a failed node not being replaced. Either way, warning that the situation exists in periodic log messages should make it easier for operators to detect the situation and attempt to remedy it. --- .../xpack/ml/MlAssignmentNotifier.java | 112 ++++++++++++++++++ .../xpack/ml/MlAssignmentNotifierTests.java | 76 +++++++++++- 2 files changed, 186 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java index c9f338848fe57..2b5c5c85b89e0 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java @@ -9,6 +9,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -22,27 +23,55 @@ import org.elasticsearch.xpack.core.ml.action.OpenJobAction; import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; +import org.elasticsearch.xpack.core.ml.utils.MlTaskParams; import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor; import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor; +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Objects; public class MlAssignmentNotifier implements ClusterStateListener { private static final Logger logger = LogManager.getLogger(MlAssignmentNotifier.class); + static final Duration MIN_LOG_CHECK_INTERVAL = Duration.ofSeconds(30); + static final Duration LONG_TIME_UNASSIGNED_INTERVAL = Duration.ofMinutes(15); + static final Duration MIN_REPORT_INTERVAL = Duration.ofHours(6); + private final AnomalyDetectionAuditor anomalyDetectionAuditor; private final DataFrameAnalyticsAuditor dataFrameAnalyticsAuditor; private final ThreadPool threadPool; + private final Clock clock; + private Map unassignedInfoByTask = Map.of(); + private volatile Instant lastLogCheck; MlAssignmentNotifier( AnomalyDetectionAuditor anomalyDetectionAuditor, DataFrameAnalyticsAuditor dataFrameAnalyticsAuditor, ThreadPool threadPool, ClusterService clusterService + ) { + this(anomalyDetectionAuditor, dataFrameAnalyticsAuditor, threadPool, clusterService, Clock.systemUTC()); + } + + MlAssignmentNotifier( + AnomalyDetectionAuditor anomalyDetectionAuditor, + DataFrameAnalyticsAuditor dataFrameAnalyticsAuditor, + ThreadPool threadPool, + ClusterService clusterService, + Clock clock ) { this.anomalyDetectionAuditor = anomalyDetectionAuditor; this.dataFrameAnalyticsAuditor = dataFrameAnalyticsAuditor; this.threadPool = threadPool; + this.clock = clock; + this.lastLogCheck = clock.instant(); clusterService.addListener(this); } @@ -54,9 +83,16 @@ private String executorName() { public void clusterChanged(ClusterChangedEvent event) { if (event.localNodeMaster() == false) { + unassignedInfoByTask = Map.of(); return; } + Instant now = clock.instant(); + if (lastLogCheck.plus(MIN_LOG_CHECK_INTERVAL).isBefore(now)) { + lastLogCheck = now; + threadPool.executor(executorName()).execute(() -> logLongTimeUnassigned(now, event.state())); + } + if (event.metadataChanged() == false) { return; } @@ -223,4 +259,80 @@ static String nodeName(DiscoveryNodes nodes, String nodeId) { } return nodeId; } + + private void logLongTimeUnassigned(Instant now, ClusterState state) { + PersistentTasksCustomMetadata tasks = state.getMetadata().custom(PersistentTasksCustomMetadata.TYPE); + if (tasks == null) { + return; + } + + List itemsToReport = findLongTimeUnassignedTasks(now, tasks); + + logger.warn("ML persistent tasks unassigned for a long time [{}]", String.join("|", itemsToReport)); + } + + /** + * Creates a list of items to be logged to report ML job tasks that: + * 1. Have been unassigned for a long time + * 2. Have not been logged recently (to avoid log spam) + *

+ * Only report on jobs, not datafeeds, on the assumption that jobs and their corresponding + * datafeeds get assigned together. This may miss some obscure edge cases, but will avoid + * the verbose and confusing messages that the duplication between jobs and datafeeds would + * generally cause. + *

+ * The time intervals used in this reporting reset each time the master node changes, as + * the data structure used to record the information is in memory on the current master node, + * not in cluster state. + */ + synchronized List findLongTimeUnassignedTasks(Instant now, PersistentTasksCustomMetadata tasks) { + + assert tasks != null; + + final List itemsToReport = new ArrayList<>(); + final Map oldUnassignedInfoByTask = unassignedInfoByTask; + final Map newUnassignedInfoByTask = new HashMap<>(); + + for (PersistentTask task : tasks.tasks()) { + if (task.getExecutorNode() == null) { + final String taskName = task.getTaskName(); + if (MlTasks.JOB_TASK_NAME.equals(taskName) || MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME.equals(taskName)) { + final String mlId = ((MlTaskParams) task.getParams()).getMlId(); + final TaskNameAndId key = new TaskNameAndId(taskName, mlId); + final UnassignedTimeAndReportTime previousInfo = oldUnassignedInfoByTask.get(key); + final Instant firstUnassignedTime; + final Instant lastReportedTime; + if (previousInfo != null) { + firstUnassignedTime = previousInfo.unassignedTime(); + if (firstUnassignedTime.plus(LONG_TIME_UNASSIGNED_INTERVAL).isBefore(now) + && (previousInfo.reportTime() == null || previousInfo.reportTime().plus(MIN_REPORT_INTERVAL).isBefore(now))) { + lastReportedTime = now; + itemsToReport.add( + Strings.format( + "[%s]/[%s] unassigned for [%d] seconds", + taskName, + mlId, + ChronoUnit.SECONDS.between(firstUnassignedTime, now) + ) + ); + } else { + lastReportedTime = previousInfo.reportTime(); + } + } else { + firstUnassignedTime = now; + lastReportedTime = null; + } + newUnassignedInfoByTask.put(key, new UnassignedTimeAndReportTime(firstUnassignedTime, lastReportedTime)); + } + } + } + + unassignedInfoByTask = newUnassignedInfoByTask; + + return itemsToReport; + } + + private record TaskNameAndId(String taskName, String mlId) {}; + + private record UnassignedTimeAndReportTime(Instant unassignedTime, Instant reportTime) {}; } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlAssignmentNotifierTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlAssignmentNotifierTests.java index 7960bf3ea7068..a393f691ae004 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlAssignmentNotifierTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlAssignmentNotifierTests.java @@ -17,17 +17,25 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetadata; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor; import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor; import org.junit.Before; import java.net.InetAddress; +import java.time.Duration; +import java.time.Instant; import java.util.Collections; +import java.util.List; import java.util.concurrent.ExecutorService; import static org.elasticsearch.xpack.ml.job.task.OpenJobPersistentTasksExecutorTests.addJobTask; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.empty; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -47,10 +55,9 @@ public void setupMocks() { dataFrameAnalyticsAuditor = mock(DataFrameAnalyticsAuditor.class); clusterService = mock(ClusterService.class); threadPool = mock(ThreadPool.class); - threadPool = mock(ThreadPool.class); ExecutorService executorService = mock(ExecutorService.class); - org.mockito.Mockito.doAnswer(invocation -> { + doAnswer(invocation -> { ((Runnable) invocation.getArguments()[0]).run(); return null; }).when(executorService).execute(any(Runnable.class)); @@ -233,4 +240,69 @@ public void testAuditUnassignedMlTasks() { verify(anomalyDetectionAuditor, times(2)).includeNodeInfo(); } } + + public void testFindLongTimeUnassignedTasks() { + MlAssignmentNotifier notifier = new MlAssignmentNotifier( + anomalyDetectionAuditor, + dataFrameAnalyticsAuditor, + threadPool, + clusterService + ); + + Instant now = Instant.now(); + Instant eightHoursAgo = now.minus(Duration.ofHours(8)); + Instant sevenHoursAgo = eightHoursAgo.plus(Duration.ofHours(1)); + Instant twoHoursAgo = sevenHoursAgo.plus(Duration.ofHours(5)); + + PersistentTasksCustomMetadata.Builder tasksBuilder = PersistentTasksCustomMetadata.builder(); + addJobTask("job1", "node1", JobState.OPENED, tasksBuilder); + addJobTask("job2", "node1", JobState.OPENED, tasksBuilder); + addJobTask("job3", null, JobState.OPENED, tasksBuilder); + addJobTask("job4", null, JobState.OPENED, tasksBuilder); + addJobTask("job5", null, JobState.OPENED, tasksBuilder); + List itemsToReport = notifier.findLongTimeUnassignedTasks(eightHoursAgo, tasksBuilder.build()); + // Nothing reported because unassigned jobs only just detected + assertThat(itemsToReport, empty()); + + tasksBuilder = PersistentTasksCustomMetadata.builder(); + addJobTask("job1", null, JobState.OPENED, tasksBuilder); + addJobTask("job2", "node1", JobState.OPENED, tasksBuilder); + addJobTask("job3", null, JobState.OPENED, tasksBuilder); + addJobTask("job4", "node2", JobState.OPENED, tasksBuilder); + addJobTask("job5", null, JobState.OPENED, tasksBuilder); + itemsToReport = notifier.findLongTimeUnassignedTasks(sevenHoursAgo, tasksBuilder.build()); + // Jobs 3 and 5 still unassigned so should get reported, job 4 now assigned, job 1 only just detected unassigned + assertThat( + itemsToReport, + containsInAnyOrder("[xpack/ml/job]/[job3] unassigned for [3600] seconds", "[xpack/ml/job]/[job5] unassigned for [3600] seconds") + ); + + tasksBuilder = PersistentTasksCustomMetadata.builder(); + addJobTask("job1", null, JobState.OPENED, tasksBuilder); + addJobTask("job2", null, JobState.OPENED, tasksBuilder); + addJobTask("job3", null, JobState.OPENED, tasksBuilder); + addJobTask("job4", "node2", JobState.OPENED, tasksBuilder); + addJobTask("job5", null, JobState.OPENED, tasksBuilder); + itemsToReport = notifier.findLongTimeUnassignedTasks(twoHoursAgo, tasksBuilder.build()); + // Jobs 3 and 5 still unassigned but reported less than 6 hours ago, job 1 still unassigned so gets reported now, + // job 2 only just detected unassigned + assertThat(itemsToReport, contains("[xpack/ml/job]/[job1] unassigned for [18000] seconds")); + + tasksBuilder = PersistentTasksCustomMetadata.builder(); + addJobTask("job1", null, JobState.OPENED, tasksBuilder); + addJobTask("job2", null, JobState.OPENED, tasksBuilder); + addJobTask("job3", null, JobState.OPENED, tasksBuilder); + addJobTask("job4", null, JobState.OPENED, tasksBuilder); + addJobTask("job5", "node1", JobState.OPENED, tasksBuilder); + itemsToReport = notifier.findLongTimeUnassignedTasks(now, tasksBuilder.build()); + // Job 3 still unassigned and reported more than 6 hours ago, job 1 still unassigned but reported less than 6 hours ago, + // job 2 still unassigned so gets reported now, job 4 only just detected unassigned, job 5 now assigned + assertThat( + itemsToReport, + containsInAnyOrder( + "[xpack/ml/job]/[job2] unassigned for [7200] seconds", + "[xpack/ml/job]/[job3] unassigned for [28800] seconds" + ) + ); + } }