diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterApplierService.java b/server/src/main/java/org/opensearch/cluster/service/ClusterApplierService.java index 47080cfbde692..b4cb9a9c5a2d4 100644 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterApplierService.java +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterApplierService.java @@ -61,8 +61,8 @@ import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.common.util.concurrent.PrioritizedOpenSearchThreadPoolExecutor; import org.opensearch.common.util.concurrent.ThreadContext; -import org.opensearch.common.util.concurrent.ThreadContextAccess; import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; +import org.opensearch.identity.SystemSubject; import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry; import org.opensearch.telemetry.metrics.tags.Tags; import org.opensearch.threadpool.Scheduler; @@ -396,31 +396,33 @@ private void submitStateUpdateTask( } final ThreadContext threadContext = threadPool.getThreadContext(); final Supplier supplier = threadContext.newRestorableContext(true); - try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { - ThreadContextAccess.doPrivilegedVoid(threadContext::markAsSystemContext); - final UpdateTask updateTask = new UpdateTask( - config.priority(), - source, - new SafeClusterApplyListener(listener, supplier, logger), - executor - ); - if (config.timeout() != null) { - threadPoolExecutor.execute( - updateTask, - config.timeout(), - () -> threadPool.generic() - .execute(() -> listener.onFailure(source, new ProcessClusterEventTimeoutException(config.timeout(), source))) + SystemSubject.getInstance().runAs(() -> { + try { + final UpdateTask updateTask = new UpdateTask( + config.priority(), + source, + new SafeClusterApplyListener(listener, supplier, logger), + executor ); - } else { - threadPoolExecutor.execute(updateTask); - } - } catch (OpenSearchRejectedExecutionException e) { - // ignore cases where we are shutting down..., there is really nothing interesting - // to be done here... - if (!lifecycle.stoppedOrClosed()) { - throw e; + if (config.timeout() != null) { + threadPoolExecutor.execute( + updateTask, + config.timeout(), + () -> threadPool.generic() + .execute(() -> listener.onFailure(source, new ProcessClusterEventTimeoutException(config.timeout(), source))) + ); + } else { + threadPoolExecutor.execute(updateTask); + } + } catch (OpenSearchRejectedExecutionException e) { + // ignore cases where we are shutting down..., there is really nothing interesting + // to be done here... + if (!lifecycle.stoppedOrClosed()) { + throw e; + } } - } + return null; + }); } /** asserts that the current thread is NOT the cluster state update thread */ diff --git a/server/src/main/java/org/opensearch/cluster/service/MasterService.java b/server/src/main/java/org/opensearch/cluster/service/MasterService.java index 713de8cdd0fda..188401b268ac3 100644 --- a/server/src/main/java/org/opensearch/cluster/service/MasterService.java +++ b/server/src/main/java/org/opensearch/cluster/service/MasterService.java @@ -66,11 +66,11 @@ import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.common.util.concurrent.PrioritizedOpenSearchThreadPoolExecutor; import org.opensearch.common.util.concurrent.ThreadContext; -import org.opensearch.common.util.concurrent.ThreadContextAccess; import org.opensearch.core.Assertions; import org.opensearch.core.common.text.Text; import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; import org.opensearch.discovery.Discovery; +import org.opensearch.identity.SystemSubject; import org.opensearch.node.Node; import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry; import org.opensearch.telemetry.metrics.tags.Tags; @@ -1022,21 +1022,23 @@ public void submitStateUpdateTasks( } final ThreadContext threadContext = threadPool.getThreadContext(); final Supplier supplier = threadContext.newRestorableContext(true); - try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { - ThreadContextAccess.doPrivilegedVoid(threadContext::markAsSystemContext); - - List safeTasks = tasks.entrySet() - .stream() - .map(e -> taskBatcher.new UpdateTask(config.priority(), source, e.getKey(), safe(e.getValue(), supplier), executor)) - .collect(Collectors.toList()); - taskBatcher.submitTasks(safeTasks, config.timeout()); - } catch (OpenSearchRejectedExecutionException e) { - // ignore cases where we are shutting down..., there is really nothing interesting - // to be done here... - if (!lifecycle.stoppedOrClosed()) { - throw e; + + SystemSubject.getInstance().runAs(() -> { + try { + List safeTasks = tasks.entrySet() + .stream() + .map(e -> taskBatcher.new UpdateTask(config.priority(), source, e.getKey(), safe(e.getValue(), supplier), executor)) + .collect(Collectors.toList()); + taskBatcher.submitTasks(safeTasks, config.timeout()); + } catch (OpenSearchRejectedExecutionException e) { + // ignore cases where we are shutting down..., there is really nothing interesting + // to be done here... + if (!lifecycle.stoppedOrClosed()) { + throw e; + } } - } + return null; + }); } public ClusterStateStats getClusterStateStats() { diff --git a/server/src/main/java/org/opensearch/http/AbstractHttpServerTransport.java b/server/src/main/java/org/opensearch/http/AbstractHttpServerTransport.java index 991fbf12072be..c6a5005888649 100644 --- a/server/src/main/java/org/opensearch/http/AbstractHttpServerTransport.java +++ b/server/src/main/java/org/opensearch/http/AbstractHttpServerTransport.java @@ -52,6 +52,7 @@ import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.identity.SystemSubject; import org.opensearch.rest.RestChannel; import org.opensearch.rest.RestRequest; import org.opensearch.telemetry.tracing.Span; @@ -383,9 +384,9 @@ public void incomingRequest(final HttpRequest httpRequest, final HttpChannel htt // Visible for testing void dispatchRequest(final RestRequest restRequest, final RestChannel channel, final Throwable badRequestCause) { - RestChannel traceableRestChannel = channel; final ThreadContext threadContext = threadPool.getThreadContext(); - try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { + SystemSubject.getInstance().runAs(() -> { + RestChannel traceableRestChannel = channel; final Span span = tracer.startSpan(SpanBuilder.from(restRequest)); try (final SpanScope spanScope = tracer.withSpanInScope(span)) { if (channel != null) { @@ -397,8 +398,8 @@ void dispatchRequest(final RestRequest restRequest, final RestChannel channel, f dispatcher.dispatchRequest(restRequest, traceableRestChannel, threadContext); } } - } - + return null; + }); } private void handleIncomingRequest(final HttpRequest httpRequest, final HttpChannel httpChannel, final Exception exception) { diff --git a/server/src/main/java/org/opensearch/identity/IdentityService.java b/server/src/main/java/org/opensearch/identity/IdentityService.java index 03f937180f4ba..d0a890c123fb3 100644 --- a/server/src/main/java/org/opensearch/identity/IdentityService.java +++ b/server/src/main/java/org/opensearch/identity/IdentityService.java @@ -32,6 +32,7 @@ public class IdentityService { public IdentityService(final Settings settings, final ThreadPool threadPool, final List identityPlugins) { this.settings = settings; + SystemSubject.getInstance().initialize(threadPool); if (identityPlugins.size() == 0) { log.debug("Identity plugins size is 0"); diff --git a/server/src/main/java/org/opensearch/identity/SystemSubject.java b/server/src/main/java/org/opensearch/identity/SystemSubject.java new file mode 100644 index 0000000000000..c67722a215fdf --- /dev/null +++ b/server/src/main/java/org/opensearch/identity/SystemSubject.java @@ -0,0 +1,69 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.identity; + +import org.opensearch.common.annotation.InternalApi; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.common.util.concurrent.ThreadContextAccess; +import org.opensearch.threadpool.ThreadPool; + +import java.security.Principal; +import java.util.concurrent.Callable; + +/** + * A special reserved {@link Subject} that represents the internal system subject + * + * @opensearch.internal + */ +@InternalApi +public class SystemSubject implements Subject { + private static final SystemSubject INSTANCE = new SystemSubject(); + + private static final SystemPrincipal SYSTEM_PRINCIPAL = new SystemPrincipal(); + + private ThreadPool threadPool; + + private SystemSubject() {} + + public static SystemSubject getInstance() { + return SystemSubject.INSTANCE; + } + + void initialize(ThreadPool threadPool) { + this.threadPool = threadPool; + } + + @Override + public Principal getPrincipal() { + return SYSTEM_PRINCIPAL; + } + + @Override + public T runAs(Callable callable) throws RuntimeException { + ThreadContext threadContext = threadPool.getThreadContext(); + try (ThreadContext.StoredContext ctx = threadPool.getThreadContext().stashContext()) { + ThreadContextAccess.doPrivilegedVoid(threadContext::markAsSystemContext); + try { + return callable.call(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + private static class SystemPrincipal implements Principal { + + private SystemPrincipal() {} + + @Override + public String getName() { + return "system"; + } + } +} diff --git a/server/src/main/java/org/opensearch/index/seqno/GlobalCheckpointSyncAction.java b/server/src/main/java/org/opensearch/index/seqno/GlobalCheckpointSyncAction.java index fedf239871368..1ce5f3da66589 100644 --- a/server/src/main/java/org/opensearch/index/seqno/GlobalCheckpointSyncAction.java +++ b/server/src/main/java/org/opensearch/index/seqno/GlobalCheckpointSyncAction.java @@ -43,11 +43,10 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; import org.opensearch.common.settings.Settings; -import org.opensearch.common.util.concurrent.ThreadContext; -import org.opensearch.common.util.concurrent.ThreadContextAccess; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.index.shard.ShardId; +import org.opensearch.identity.SystemSubject; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardClosedException; import org.opensearch.index.translog.Translog; @@ -97,15 +96,14 @@ public GlobalCheckpointSyncAction( } public void updateGlobalCheckpointForShard(final ShardId shardId) { - final ThreadContext threadContext = threadPool.getThreadContext(); - try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { - ThreadContextAccess.doPrivilegedVoid(threadContext::markAsSystemContext); + SystemSubject.getInstance().runAs(() -> { execute(new Request(shardId), ActionListener.wrap(r -> {}, e -> { if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) == null) { logger.info(new ParameterizedMessage("{} global checkpoint sync failed", shardId), e); } })); - } + return null; + }); } @Override diff --git a/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseBackgroundSyncAction.java b/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseBackgroundSyncAction.java index e8ebf11ef0e5c..2add4743b9b75 100644 --- a/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseBackgroundSyncAction.java +++ b/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseBackgroundSyncAction.java @@ -47,13 +47,12 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; import org.opensearch.common.settings.Settings; -import org.opensearch.common.util.concurrent.ThreadContext; -import org.opensearch.common.util.concurrent.ThreadContextAccess; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.tasks.TaskId; +import org.opensearch.identity.SystemSubject; import org.opensearch.index.IndexNotFoundException; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardClosedException; @@ -120,10 +119,7 @@ protected void doExecute(Task task, Request request, ActionListener { final Request request = new Request(shardId, retentionLeases); final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "retention_lease_background_sync", request); transportService.sendChildRequest( @@ -170,7 +166,8 @@ public void handleException(TransportException e) { } } ); - } + return null; + }); } @Override diff --git a/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseSyncAction.java b/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseSyncAction.java index 9e8437ca78879..6ede0416be12a 100644 --- a/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseSyncAction.java +++ b/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseSyncAction.java @@ -49,13 +49,12 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; import org.opensearch.common.settings.Settings; -import org.opensearch.common.util.concurrent.ThreadContext; -import org.opensearch.common.util.concurrent.ThreadContextAccess; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.tasks.TaskId; +import org.opensearch.identity.SystemSubject; import org.opensearch.index.IndexNotFoundException; import org.opensearch.index.IndexingPressureService; import org.opensearch.index.shard.IndexShard; @@ -135,10 +134,7 @@ final void sync( RetentionLeases retentionLeases, ActionListener listener ) { - final ThreadContext threadContext = threadPool.getThreadContext(); - try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { - // we have to execute under the system context so that if security is enabled the sync is authorized - ThreadContextAccess.doPrivilegedVoid(threadContext::markAsSystemContext); + SystemSubject.getInstance().runAs(() -> { final Request request = new Request(shardId, retentionLeases); final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "retention_lease_sync", request); transportService.sendChildRequest( @@ -181,7 +177,8 @@ public void handleException(TransportException e) { } } ); - } + return null; + }); } @Override diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java index d1e2884956f5c..d11c8f3bf54a1 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java @@ -23,10 +23,9 @@ import org.opensearch.common.annotation.PublicApi; import org.opensearch.common.inject.Inject; import org.opensearch.common.settings.Settings; -import org.opensearch.common.util.concurrent.ThreadContext; -import org.opensearch.common.util.concurrent.ThreadContextAccess; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.identity.SystemSubject; import org.opensearch.index.IndexNotFoundException; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardClosedException; @@ -111,10 +110,7 @@ public ReplicationMode getReplicationMode(IndexShard indexShard) { final void publish(IndexShard indexShard, ReplicationCheckpoint checkpoint) { String primaryAllocationId = indexShard.routingEntry().allocationId().getId(); long primaryTerm = indexShard.getPendingPrimaryTerm(); - final ThreadContext threadContext = threadPool.getThreadContext(); - try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { - // we have to execute under the system context so that if security is enabled the sync is authorized - ThreadContextAccess.doPrivilegedVoid(threadContext::markAsSystemContext); + SystemSubject.getInstance().runAs(() -> { PublishCheckpointRequest request = new PublishCheckpointRequest(checkpoint); final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "segrep_publish_checkpoint", request); final ReplicationTimer timer = new ReplicationTimer(); @@ -182,7 +178,8 @@ public void handleException(TransportException e) { checkpoint ) ); - } + return null; + }); } @Override diff --git a/server/src/main/java/org/opensearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/opensearch/transport/RemoteClusterConnection.java index 8f0ee52ac3acd..734c08086eada 100644 --- a/server/src/main/java/org/opensearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/opensearch/transport/RemoteClusterConnection.java @@ -40,10 +40,10 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.ThreadContext; -import org.opensearch.common.util.concurrent.ThreadContextAccess; import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.identity.SystemSubject; import org.opensearch.threadpool.ThreadPool; import java.io.Closeable; @@ -135,10 +135,7 @@ void collectNodes(ActionListener> listener) { final ThreadContext threadContext = threadPool.getThreadContext(); final ContextPreservingActionListener> contextPreservingActionListener = new ContextPreservingActionListener<>(threadContext.newRestorableContext(false), listener); - try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { - // we stash any context here since this is an internal execution and should not leak any existing context information - ThreadContextAccess.doPrivilegedVoid(threadContext::markAsSystemContext); - + SystemSubject.getInstance().runAs(() -> { final ClusterStateRequest request = new ClusterStateRequest(); request.clear(); request.nodes(true); @@ -173,7 +170,8 @@ public String executor() { } } ); - } + return null; + }); }; try { // just in case if we are not connected for some reason we try to connect and if we fail we have to notify the listener diff --git a/server/src/main/java/org/opensearch/transport/SniffConnectionStrategy.java b/server/src/main/java/org/opensearch/transport/SniffConnectionStrategy.java index 1d94228218fd0..448445ecc79de 100644 --- a/server/src/main/java/org/opensearch/transport/SniffConnectionStrategy.java +++ b/server/src/main/java/org/opensearch/transport/SniffConnectionStrategy.java @@ -47,7 +47,6 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.concurrent.ThreadContext; -import org.opensearch.common.util.concurrent.ThreadContextAccess; import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.Strings; @@ -56,6 +55,7 @@ import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.identity.SystemSubject; import org.opensearch.threadpool.ThreadPool; import java.io.IOException; @@ -347,10 +347,7 @@ private void collectRemoteNodes(Iterator> seedNodes, Act threadContext.newRestorableContext(false), new SniffClusterStateResponseHandler(connection, listener, seedNodes) ); - try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { - // we stash any context here since this is an internal execution and should not leak any - // existing context information. - ThreadContextAccess.doPrivilegedVoid(threadContext::markAsSystemContext); + SystemSubject.getInstance().runAs(() -> { transportService.sendRequest( connection, ClusterStateAction.NAME, @@ -358,7 +355,8 @@ private void collectRemoteNodes(Iterator> seedNodes, Act TransportRequestOptions.EMPTY, responseHandler ); - } + return null; + }); }, e -> { final Transport.Connection connection = openConnectionStep.result(); final DiscoveryNode node = connection.getNode(); diff --git a/server/src/test/java/org/opensearch/identity/SystemSubjectTests.java b/server/src/test/java/org/opensearch/identity/SystemSubjectTests.java new file mode 100644 index 0000000000000..3f6290934307f --- /dev/null +++ b/server/src/test/java/org/opensearch/identity/SystemSubjectTests.java @@ -0,0 +1,28 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.identity; + +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; + +public class SystemSubjectTests extends OpenSearchTestCase { + public void testSystemSubject() { + ThreadPool threadPool = new TestThreadPool(getTestName()); + SystemSubject systemSubject = SystemSubject.getInstance(); + assertEquals("system", systemSubject.getPrincipal().getName()); + systemSubject.initialize(threadPool); + assertFalse(threadPool.getThreadContext().isSystemContext()); + systemSubject.runAs(() -> { + assertTrue(threadPool.getThreadContext().isSystemContext()); + return false; + }); + terminate(threadPool); + } +} diff --git a/test/framework/src/main/java/org/opensearch/cluster/service/FakeThreadPoolClusterManagerService.java b/test/framework/src/main/java/org/opensearch/cluster/service/FakeThreadPoolClusterManagerService.java index 64f3dbc4fd967..f88f92e685e40 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/service/FakeThreadPoolClusterManagerService.java +++ b/test/framework/src/main/java/org/opensearch/cluster/service/FakeThreadPoolClusterManagerService.java @@ -43,9 +43,8 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.common.util.concurrent.PrioritizedOpenSearchThreadPoolExecutor; -import org.opensearch.common.util.concurrent.ThreadContext; -import org.opensearch.common.util.concurrent.ThreadContextAccess; import org.opensearch.core.action.ActionListener; +import org.opensearch.identity.SystemSubject; import org.opensearch.node.Node; import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry; import org.opensearch.threadpool.ThreadPool; @@ -133,11 +132,10 @@ public void run() { final Runnable task = pendingTasks.remove(taskIndex); taskInProgress = true; scheduledNextTask = false; - final ThreadContext threadContext = threadPool.getThreadContext(); - try (ThreadContext.StoredContext ignored = threadContext.stashContext()) { - ThreadContextAccess.doPrivilegedVoid(threadContext::markAsSystemContext); + SystemSubject.getInstance().runAs(() -> { task.run(); - } + return null; + }); if (waitForPublish == false) { taskInProgress = false; }