Skip to content

Commit

Permalink
Revert "Support task resource tracking in OpenSearch (#2639)" (#3046)
Browse files Browse the repository at this point in the history
This reverts commit 6517eec.

Signed-off-by: dblock <[email protected]>
Signed-off-by: Nicholas Walter Knize <[email protected]>
  • Loading branch information
dblock authored Apr 25, 2022
1 parent 1c13249 commit a389d1e
Show file tree
Hide file tree
Showing 30 changed files with 61 additions and 1,421 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -470,9 +470,6 @@ public void onTaskUnregistered(Task task) {}

@Override
public void waitForTaskCompletion(Task task) {}

@Override
public void taskExecutionStarted(Task task, Boolean closeableInvoked) {}
});
}
// Need to run the task in a separate thread because node client's .execute() is blocked by our task listener
Expand Down Expand Up @@ -654,9 +651,6 @@ public void waitForTaskCompletion(Task task) {
waitForWaitingToStart.countDown();
}

@Override
public void taskExecutionStarted(Task task, Boolean closeableInvoked) {}

@Override
public void onTaskRegistered(Task task) {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskInfo;
import org.opensearch.tasks.TaskResourceTrackingService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

Expand All @@ -61,15 +60,8 @@ public static long waitForCompletionTimeout(TimeValue timeout) {

private static final TimeValue DEFAULT_WAIT_FOR_COMPLETION_TIMEOUT = timeValueSeconds(30);

private final TaskResourceTrackingService taskResourceTrackingService;

@Inject
public TransportListTasksAction(
ClusterService clusterService,
TransportService transportService,
ActionFilters actionFilters,
TaskResourceTrackingService taskResourceTrackingService
) {
public TransportListTasksAction(ClusterService clusterService, TransportService transportService, ActionFilters actionFilters) {
super(
ListTasksAction.NAME,
clusterService,
Expand All @@ -80,7 +72,6 @@ public TransportListTasksAction(
TaskInfo::new,
ThreadPool.Names.MANAGEMENT
);
this.taskResourceTrackingService = taskResourceTrackingService;
}

@Override
Expand Down Expand Up @@ -110,8 +101,6 @@ protected void processTasks(ListTasksRequest request, Consumer<Task> operation)
}
taskManager.waitForTaskCompletion(task, timeoutNanos);
});
} else {
operation = operation.andThen(taskResourceTrackingService::refreshResourceStats);
}
super.processTasks(request, operation);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,6 @@ public SearchShardTask(long id, String type, String action, String description,
super(id, type, action, description, parentTaskId, headers);
}

@Override
public boolean supportsResourceTracking() {
return true;
}

@Override
public boolean shouldCancelChildrenOnCancellation() {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,6 @@ public final String getDescription() {
return descriptionSupplier.get();
}

@Override
public boolean supportsResourceTracking() {
return true;
}

/**
* Attach a {@link SearchProgressListener} to this task.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.opensearch.action.ActionResponse;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lease.Releasables;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskCancelledException;
import org.opensearch.tasks.TaskId;
Expand Down Expand Up @@ -89,39 +88,31 @@ public final Task execute(Request request, ActionListener<Response> listener) {
*/
final Releasable unregisterChildNode = registerChildNode(request.getParentTask());
final Task task;

try {
task = taskManager.register("transport", actionName, request);
} catch (TaskCancelledException e) {
unregisterChildNode.close();
throw e;
}

ThreadContext.StoredContext storedContext = taskManager.taskExecutionStarted(task);
try {
execute(task, request, new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
try {
Releasables.close(unregisterChildNode, () -> taskManager.unregister(task));
} finally {
listener.onResponse(response);
}
execute(task, request, new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
try {
Releasables.close(unregisterChildNode, () -> taskManager.unregister(task));
} finally {
listener.onResponse(response);
}
}

@Override
public void onFailure(Exception e) {
try {
Releasables.close(unregisterChildNode, () -> taskManager.unregister(task));
} finally {
listener.onFailure(e);
}
@Override
public void onFailure(Exception e) {
try {
Releasables.close(unregisterChildNode, () -> taskManager.unregister(task));
} finally {
listener.onFailure(e);
}
});
} finally {
storedContext.close();
}

}
});
return task;
}

Expand All @@ -138,30 +129,25 @@ public final Task execute(Request request, TaskListener<Response> listener) {
unregisterChildNode.close();
throw e;
}
ThreadContext.StoredContext storedContext = taskManager.taskExecutionStarted(task);
try {
execute(task, request, new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
try {
Releasables.close(unregisterChildNode, () -> taskManager.unregister(task));
} finally {
listener.onResponse(task, response);
}
execute(task, request, new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
try {
Releasables.close(unregisterChildNode, () -> taskManager.unregister(task));
} finally {
listener.onResponse(task, response);
}
}

@Override
public void onFailure(Exception e) {
try {
Releasables.close(unregisterChildNode, () -> taskManager.unregister(task));
} finally {
listener.onFailure(task, e);
}
@Override
public void onFailure(Exception e) {
try {
Releasables.close(unregisterChildNode, () -> taskManager.unregister(task));
} finally {
listener.onFailure(task, e);
}
});
} finally {
storedContext.close();
}
}
});
return task;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@
import org.opensearch.script.ScriptMetadata;
import org.opensearch.snapshots.SnapshotsInfoService;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskResourceTrackingService;
import org.opensearch.tasks.TaskResultsService;

import java.util.ArrayList;
Expand Down Expand Up @@ -395,7 +394,6 @@ protected void configure() {
bind(NodeMappingRefreshAction.class).asEagerSingleton();
bind(MappingUpdatedAction.class).asEagerSingleton();
bind(TaskResultsService.class).asEagerSingleton();
bind(TaskResourceTrackingService.class).asEagerSingleton();
bind(AllocationDeciders.class).toInstance(allocationDeciders);
bind(ShardsAllocator.class).toInstance(shardsAllocator);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.opensearch.index.ShardIndexingPressureMemoryManager;
import org.opensearch.index.ShardIndexingPressureSettings;
import org.opensearch.index.ShardIndexingPressureStore;
import org.opensearch.tasks.TaskResourceTrackingService;
import org.opensearch.watcher.ResourceWatcherService;
import org.opensearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction;
import org.opensearch.action.admin.indices.close.TransportCloseIndexAction;
Expand Down Expand Up @@ -567,8 +566,7 @@ public void apply(Settings value, Settings current, Settings previous) {
ShardIndexingPressureMemoryManager.THROUGHPUT_DEGRADATION_LIMITS,
ShardIndexingPressureMemoryManager.SUCCESSFUL_REQUEST_ELAPSED_TIMEOUT,
ShardIndexingPressureMemoryManager.MAX_OUTSTANDING_REQUESTS,
IndexingPressure.MAX_INDEXING_BYTES,
TaskResourceTrackingService.TASK_RESOURCE_TRACKING_ENABLED
IndexingPressure.MAX_INDEXING_BYTES
)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.node.Node;
import org.opensearch.threadpool.RunnableTaskExecutionListener;
import org.opensearch.threadpool.TaskAwareRunnable;

import java.util.List;
import java.util.Optional;
Expand All @@ -57,7 +55,6 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

public class OpenSearchExecutors {
Expand Down Expand Up @@ -175,39 +172,14 @@ public static OpenSearchThreadPoolExecutor newFixed(
);
}

public static OpenSearchThreadPoolExecutor newAutoQueueFixed(
String name,
int size,
int initialQueueCapacity,
int minQueueSize,
int maxQueueSize,
int frameSize,
TimeValue targetedResponseTime,
ThreadFactory threadFactory,
ThreadContext contextHolder
) {
return newAutoQueueFixed(
name,
size,
initialQueueCapacity,
minQueueSize,
maxQueueSize,
frameSize,
targetedResponseTime,
threadFactory,
contextHolder,
null
);
}

/**
* Return a new executor that will automatically adjust the queue size based on queue throughput.
*
* @param size number of fixed threads to use for executing tasks
* @param size number of fixed threads to use for executing tasks
* @param initialQueueCapacity initial size of the executor queue
* @param minQueueSize minimum queue size that the queue can be adjusted to
* @param maxQueueSize maximum queue size that the queue can be adjusted to
* @param frameSize number of tasks during which stats are collected before adjusting queue size
* @param minQueueSize minimum queue size that the queue can be adjusted to
* @param maxQueueSize maximum queue size that the queue can be adjusted to
* @param frameSize number of tasks during which stats are collected before adjusting queue size
*/
public static OpenSearchThreadPoolExecutor newAutoQueueFixed(
String name,
Expand All @@ -218,8 +190,7 @@ public static OpenSearchThreadPoolExecutor newAutoQueueFixed(
int frameSize,
TimeValue targetedResponseTime,
ThreadFactory threadFactory,
ThreadContext contextHolder,
AtomicReference<RunnableTaskExecutionListener> runnableTaskListener
ThreadContext contextHolder
) {
if (initialQueueCapacity <= 0) {
throw new IllegalArgumentException(
Expand All @@ -230,17 +201,6 @@ public static OpenSearchThreadPoolExecutor newAutoQueueFixed(
ConcurrentCollections.<Runnable>newBlockingQueue(),
initialQueueCapacity
);

Function<Runnable, WrappedRunnable> runnableWrapper;
if (runnableTaskListener != null) {
runnableWrapper = (runnable) -> {
TaskAwareRunnable taskAwareRunnable = new TaskAwareRunnable(contextHolder, runnable, runnableTaskListener);
return new TimedRunnable(taskAwareRunnable);
};
} else {
runnableWrapper = TimedRunnable::new;
}

return new QueueResizingOpenSearchThreadPoolExecutor(
name,
size,
Expand All @@ -250,7 +210,7 @@ public static OpenSearchThreadPoolExecutor newAutoQueueFixed(
queue,
minQueueSize,
maxQueueSize,
runnableWrapper,
TimedRunnable::new,
frameSize,
targetedResponseTime,
threadFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@

import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_COUNT;
import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_SIZE;
import static org.opensearch.tasks.TaskResourceTrackingService.TASK_ID;

/**
* A ThreadContext is a map of string headers and a transient map of keyed objects that are associated with
Expand Down Expand Up @@ -135,23 +134,16 @@ public StoredContext stashContext() {
* This is needed so the DeprecationLogger in another thread can see the value of X-Opaque-ID provided by a user.
* Otherwise when context is stash, it should be empty.
*/

ThreadContextStruct threadContextStruct = DEFAULT_CONTEXT;

if (context.requestHeaders.containsKey(Task.X_OPAQUE_ID)) {
threadContextStruct = threadContextStruct.putHeaders(
ThreadContextStruct threadContextStruct = DEFAULT_CONTEXT.putHeaders(
MapBuilder.<String, String>newMapBuilder()
.put(Task.X_OPAQUE_ID, context.requestHeaders.get(Task.X_OPAQUE_ID))
.immutableMap()
);
threadLocal.set(threadContextStruct);
} else {
threadLocal.set(DEFAULT_CONTEXT);
}

if (context.transientHeaders.containsKey(TASK_ID)) {
threadContextStruct = threadContextStruct.putTransient(TASK_ID, context.transientHeaders.get(TASK_ID));
}

threadLocal.set(threadContextStruct);

return () -> {
// If the node and thus the threadLocal get closed while this task
// is still executing, we don't want this runnable to fail with an
Expand Down
Loading

0 comments on commit a389d1e

Please sign in to comment.