From 8d84eea68d9ed7dedcccc8161569d5b4d2ee4eec Mon Sep 17 00:00:00 2001 From: gaobinlong Date: Tue, 27 Feb 2024 06:46:54 +0800 Subject: [PATCH] Fix get task API does not refresh resource stats (#11531) * Fix get task API does not refresh resource stats Signed-off-by: Gao Binlong * modify change log Signed-off-by: Gao Binlong --------- Signed-off-by: Gao Binlong --- CHANGELOG.md | 1 + .../tasks/get/TransportGetTaskAction.java | 8 ++- .../node/tasks/ResourceAwareTasksTests.java | 55 ++++++++++++++++++- .../node/tasks/TaskManagerTestCase.java | 16 ++++++ 4 files changed, 77 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 56e185a181e50..7f025d2304966 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -133,6 +133,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Prevent read beyond slice boundary in ByteArrayIndexInput ([#10481](https://github.com/opensearch-project/OpenSearch/issues/10481)) - Fix the "highlight.max_analyzer_offset" request parameter with "plain" highlighter ([#10919](https://github.com/opensearch-project/OpenSearch/pull/10919)) - Warn about deprecated and ignored index.mapper.dynamic index setting ([#11193](https://github.com/opensearch-project/OpenSearch/pull/11193)) +- Fix get task API does not refresh resource stats ([#11531](https://github.com/opensearch-project/OpenSearch/pull/11531)) ### Security diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java index e62c83490d810..ab6451382aa88 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java @@ -55,6 +55,7 @@ import org.opensearch.index.IndexNotFoundException; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskInfo; +import org.opensearch.tasks.TaskResourceTrackingService; import org.opensearch.tasks.TaskResult; import org.opensearch.tasks.TaskResultsService; import org.opensearch.threadpool.ThreadPool; @@ -84,6 +85,8 @@ public class TransportGetTaskAction extends HandledTransportAction 0); + }; + + taskTestContext.operationFinishedValidator = (task, threadId) -> { assertEquals(0, resourceTasks.size()); }; + + startResourceAwareNodesAction(testNodes[0], false, taskTestContext, new ActionListener() { + @Override + public void onResponse(NodesResponse listTasksResponse) { + responseReference.set(listTasksResponse); + taskTestContext.requestCompleteLatch.countDown(); + } + + @Override + public void onFailure(Exception e) { + throwableReference.set(e); + taskTestContext.requestCompleteLatch.countDown(); + } + }); + + // Waiting for whole request to complete and return successfully till client + taskTestContext.requestCompleteLatch.await(); + + assertTasksRequestFinishedSuccessfully(responseReference.get(), throwableReference.get()); + } + + public void testOnDemandRefreshWhileGetTask() throws InterruptedException { + setup(true, false); + + final AtomicReference throwableReference = new AtomicReference<>(); + final AtomicReference responseReference = new AtomicReference<>(); + + TaskTestContext taskTestContext = new TaskTestContext(); + + Map resourceTasks = testNodes[0].taskResourceTrackingService.getResourceAwareTasks(); + + taskTestContext.operationStartValidator = (task, threadId) -> { + assertFalse(resourceTasks.isEmpty()); + GetTaskResponse getTaskResponse = ActionTestUtils.executeBlocking( + testNodes[0].transportGetTaskAction, + new GetTaskRequest().setTaskId(new TaskId(testNodes[0].getNodeId(), new ArrayList<>(resourceTasks.values()).get(0).getId())) + ); + + TaskInfo taskInfo = getTaskResponse.getTask().getTask(); + + assertNotNull(taskInfo.getResourceStats()); + assertNotNull(taskInfo.getResourceStats().getResourceUsageInfo()); + assertNotNull(taskInfo.getResourceStats().getResourceUsageInfo().get("total")); + TaskResourceUsage taskResourceUsage = taskInfo.getResourceStats().getResourceUsageInfo().get("total"); assertCPUTime(taskResourceUsage.getCpuTimeInNanos()); assertTrue(taskResourceUsage.getMemoryInBytes() > 0); }; diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java index a3fa0f9cb16e4..8d87fd5135663 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java @@ -34,6 +34,7 @@ import org.opensearch.Version; import org.opensearch.action.FailedNodeException; import org.opensearch.action.admin.cluster.node.tasks.cancel.TransportCancelTasksAction; +import org.opensearch.action.admin.cluster.node.tasks.get.TransportGetTaskAction; import org.opensearch.action.admin.cluster.node.tasks.list.TransportListTasksAction; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.nodes.BaseNodeResponse; @@ -41,6 +42,7 @@ import org.opensearch.action.support.nodes.BaseNodesResponse; import org.opensearch.action.support.nodes.TransportNodesAction; import org.opensearch.action.support.replication.ClusterStateCreationUtils; +import org.opensearch.client.Client; import org.opensearch.cluster.ClusterModule; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.node.DiscoveryNode; @@ -57,6 +59,7 @@ import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.core.common.transport.BoundTransportAddress; import org.opensearch.core.indices.breaker.NoneCircuitBreakerService; +import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.tasks.TaskCancellationService; import org.opensearch.tasks.TaskManager; import org.opensearch.tasks.TaskResourceTrackingService; @@ -85,6 +88,7 @@ import static java.util.Collections.emptySet; import static org.opensearch.test.ClusterServiceUtils.createClusterService; import static org.opensearch.test.ClusterServiceUtils.setState; +import static org.mockito.Mockito.mock; /** * The test case for unit testing task manager and related transport actions @@ -249,6 +253,17 @@ protected TaskManager createTaskManager( taskResourceTrackingService ); transportCancelTasksAction = new TransportCancelTasksAction(clusterService, transportService, actionFilters); + Client mockClient = mock(Client.class); + NamedXContentRegistry namedXContentRegistry = mock(NamedXContentRegistry.class); + transportGetTaskAction = new TransportGetTaskAction( + threadPool, + transportService, + actionFilters, + clusterService, + mockClient, + namedXContentRegistry, + taskResourceTrackingService + ); transportService.acceptIncomingRequests(); } @@ -258,6 +273,7 @@ protected TaskManager createTaskManager( private final SetOnce discoveryNode = new SetOnce<>(); public final TransportListTasksAction transportListTasksAction; public final TransportCancelTasksAction transportCancelTasksAction; + public final TransportGetTaskAction transportGetTaskAction; @Override public void close() {