diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java index 268a6ed6f85b8..b8517f53ff294 100644 --- a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java @@ -98,6 +98,7 @@ import org.opensearch.indices.IndicesService; import org.opensearch.indices.SystemIndices; import org.opensearch.node.NodeClosedException; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; import org.opensearch.tasks.Task; import org.opensearch.telemetry.tracing.Tracer; import org.opensearch.threadpool.ThreadPool; @@ -180,7 +181,8 @@ public TransportShardBulkAction( false, indexingPressureService, systemIndices, - tracer + tracer, + AdmissionControlActionType.INDEXING ); this.updateHelper = updateHelper; this.mappingUpdatedAction = mappingUpdatedAction; diff --git a/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java index 11046e44b61e0..95f998e2d89c2 100644 --- a/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java @@ -38,7 +38,6 @@ import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListenerResponseHandler; import org.opensearch.action.UnavailableShardsException; -import org.opensearch.action.bulk.TransportShardBulkAction; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.ActiveShardCount; import org.opensearch.action.support.ChannelActionListener; @@ -203,6 +202,40 @@ protected TransportReplicationAction( String executor, boolean syncGlobalCheckpointAfterOperation, boolean forceExecutionOnPrimary + ) { + this( + settings, + actionName, + transportService, + clusterService, + indicesService, + threadPool, + shardStateAction, + actionFilters, + requestReader, + replicaRequestReader, + executor, + syncGlobalCheckpointAfterOperation, + forceExecutionOnPrimary, + null + ); + } + + protected TransportReplicationAction( + Settings settings, + String actionName, + TransportService transportService, + ClusterService clusterService, + IndicesService indicesService, + ThreadPool threadPool, + ShardStateAction shardStateAction, + ActionFilters actionFilters, + Writeable.Reader requestReader, + Writeable.Reader replicaRequestReader, + String executor, + boolean syncGlobalCheckpointAfterOperation, + boolean forceExecutionOnPrimary, + AdmissionControlActionType admissionControlActionType ) { super(actionName, actionFilters, transportService.getTaskManager()); this.threadPool = threadPool; @@ -221,27 +254,8 @@ protected TransportReplicationAction( transportService.registerRequestHandler(actionName, ThreadPool.Names.SAME, requestReader, this::handleOperationRequest); - // Register only TransportShardBulkAction for admission control ( primary indexing action ) - if (transportPrimaryAction.equals(TransportShardBulkAction.ACTION_NAME + PRIMARY_ACTION_SUFFIX)) { - transportService.registerRequestHandler( - transportPrimaryAction, - executor, - forceExecutionOnPrimary, - true, - AdmissionControlActionType.INDEXING, - in -> new ConcreteShardRequest<>(requestReader, in), - this::handlePrimaryRequest - ); - } else { - transportService.registerRequestHandler( - transportPrimaryAction, - executor, - forceExecutionOnPrimary, - true, - in -> new ConcreteShardRequest<>(requestReader, in), - this::handlePrimaryRequest - ); - } + // This method will register Primary Request Handler Based on AdmissionControlActionType + registerPrimaryRequestHandler(requestReader, admissionControlActionType); // we must never reject on because of thread pool capacity on replicas transportService.registerRequestHandler( @@ -262,6 +276,38 @@ protected TransportReplicationAction( clusterSettings.addSettingsUpdateConsumer(REPLICATION_RETRY_TIMEOUT, (v) -> retryTimeout = v); } + /** + * This method will register handler as based on admissionControlActionType and AdmissionControlHandler will be + * invoked for registered action + * @param requestReader instance of the request reader + * @param admissionControlActionType type of AdmissionControlActionType + */ + private void registerPrimaryRequestHandler( + Writeable.Reader requestReader, + AdmissionControlActionType admissionControlActionType + ) { + if (admissionControlActionType != null) { + transportService.registerRequestHandler( + transportPrimaryAction, + executor, + forceExecutionOnPrimary, + true, + admissionControlActionType, + in -> new ConcreteShardRequest<>(requestReader, in), + this::handlePrimaryRequest + ); + } else { + transportService.registerRequestHandler( + transportPrimaryAction, + executor, + forceExecutionOnPrimary, + true, + in -> new ConcreteShardRequest<>(requestReader, in), + this::handlePrimaryRequest + ); + } + } + @Override protected void doExecute(Task task, Request request, ActionListener listener) { assert request.shardId() != null : "request shardId must be set"; diff --git a/server/src/main/java/org/opensearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/opensearch/action/support/replication/TransportWriteAction.java index 9ebfa8cfd0df8..27f9e6dee83de 100644 --- a/server/src/main/java/org/opensearch/action/support/replication/TransportWriteAction.java +++ b/server/src/main/java/org/opensearch/action/support/replication/TransportWriteAction.java @@ -59,6 +59,7 @@ import org.opensearch.index.translog.Translog.Location; import org.opensearch.indices.IndicesService; import org.opensearch.indices.SystemIndices; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; import org.opensearch.telemetry.tracing.Span; import org.opensearch.telemetry.tracing.SpanBuilder; import org.opensearch.telemetry.tracing.SpanScope; @@ -104,7 +105,8 @@ protected TransportWriteAction( boolean forceExecutionOnPrimary, IndexingPressureService indexingPressureService, SystemIndices systemIndices, - Tracer tracer + Tracer tracer, + AdmissionControlActionType admissionControlActionType ) { // We pass ThreadPool.Names.SAME to the super class as we control the dispatching to the // ThreadPool.Names.WRITE/ThreadPool.Names.SYSTEM_WRITE thread pools in this class. @@ -121,7 +123,8 @@ protected TransportWriteAction( replicaRequest, ThreadPool.Names.SAME, true, - forceExecutionOnPrimary + forceExecutionOnPrimary, + admissionControlActionType ); this.executorFunction = executorFunction; this.indexingPressureService = indexingPressureService; @@ -129,6 +132,43 @@ protected TransportWriteAction( this.tracer = tracer; } + protected TransportWriteAction( + Settings settings, + String actionName, + TransportService transportService, + ClusterService clusterService, + IndicesService indicesService, + ThreadPool threadPool, + ShardStateAction shardStateAction, + ActionFilters actionFilters, + Writeable.Reader request, + Writeable.Reader replicaRequest, + Function executorFunction, + boolean forceExecutionOnPrimary, + IndexingPressureService indexingPressureService, + SystemIndices systemIndices, + Tracer tracer + ) { + this( + settings, + actionName, + transportService, + clusterService, + indicesService, + threadPool, + shardStateAction, + actionFilters, + request, + replicaRequest, + executorFunction, + forceExecutionOnPrimary, + indexingPressureService, + systemIndices, + tracer, + null + ); + } + protected String executor(IndexShard shard) { return executorFunction.apply(shard); } diff --git a/server/src/main/java/org/opensearch/common/network/NetworkModule.java b/server/src/main/java/org/opensearch/common/network/NetworkModule.java index c04c5cdca0a5e..2edf3967c61b0 100644 --- a/server/src/main/java/org/opensearch/common/network/NetworkModule.java +++ b/server/src/main/java/org/opensearch/common/network/NetworkModule.java @@ -154,9 +154,6 @@ public NetworkModule( List transportInterceptors ) { this.settings = settings; - if (transportInterceptors != null) { - transportInterceptors.forEach(this::registerTransportInterceptor); - } for (NetworkPlugin plugin : plugins) { Map> httpTransportFactory = plugin.getHttpTransports( settings, @@ -193,6 +190,10 @@ public NetworkModule( registerTransportInterceptor(interceptor); } } + // Adding last because interceptors are triggered from last to first order from the list + if (transportInterceptors != null) { + transportInterceptors.forEach(this::registerTransportInterceptor); + } } /** Adds a transport implementation that can be selected by setting {@link #TRANSPORT_TYPE_KEY}. */ diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 614ee56cf8458..0e253fe7c3ed8 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -700,8 +700,7 @@ public void apply(Settings value, Settings current, Settings previous) { AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE, CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, CpuBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT, - CpuBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT, - IndicesService.CLUSTER_RESTRICT_INDEX_REPLICATION_TYPE_SETTING + CpuBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT ) ) ); diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControlStats.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControlStats.java index 82a9c479d9d41..39909c571c63e 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControlStats.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControlStats.java @@ -59,28 +59,9 @@ public List getAdmissionControllerStatsList() { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject("admission_control"); - this.admissionControllerStatsList.forEach(stats -> { - try { - builder.field(stats.getAdmissionControllerName(), stats); - } catch (IOException e) { - throw new RuntimeException(e); - } - }); - return builder.endObject(); - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; + for (AdmissionControllerStats admissionControllerStats : this.admissionControllerStatsList) { + builder.field(admissionControllerStats.getAdmissionControllerName(), admissionControllerStats); } - if (obj == null || getClass() != obj.getClass()) return false; - AdmissionControlStats admissionControlStats = (AdmissionControlStats) obj; - return this.getAdmissionControllerStatsList().size() == admissionControlStats.getAdmissionControllerStatsList().size(); - } - - @Override - public int hashCode() { - return super.hashCode(); + return builder.endObject(); } } diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControllerStats.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControllerStats.java index 28a6d063b3e2a..3895cac3eaa07 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControllerStats.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControllerStats.java @@ -14,11 +14,9 @@ import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.ratelimitting.admissioncontrol.controllers.AdmissionController; -import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; import java.io.IOException; import java.util.Map; -import java.util.Objects; /** * Class for admission controller ( such as CPU ) stats which includes rejection count for each action type @@ -64,48 +62,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws { builder.startObject("rejection_count"); { - this.rejectionCount.forEach((actionType, count) -> { - try { - builder.field(actionType, count); - } catch (IOException e) { - throw new RuntimeException(e); - } - }); + for (Map.Entry rejectionCountEntry : this.rejectionCount.entrySet()) { + builder.field(rejectionCountEntry.getKey(), rejectionCountEntry.getValue()); + } } builder.endObject(); } builder.endObject(); return builder.endObject(); } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (obj == null || getClass() != obj.getClass()) return false; - AdmissionControllerStats admissionControllerStats = (AdmissionControllerStats) obj; - return Objects.equals(this.getAdmissionControllerName(), admissionControllerStats.getAdmissionControllerName()) - && Objects.equals( - this.rejectionCount.containsKey(AdmissionControlActionType.SEARCH.getType()), - admissionControllerStats.rejectionCount.containsKey(AdmissionControlActionType.SEARCH.getType()) - ) - && Objects.equals( - this.rejectionCount.get(AdmissionControlActionType.SEARCH.getType()), - admissionControllerStats.rejectionCount.get(AdmissionControlActionType.SEARCH.getType()) - ) - && Objects.equals( - this.rejectionCount.containsKey(AdmissionControlActionType.INDEXING.getType()), - admissionControllerStats.rejectionCount.containsKey(AdmissionControlActionType.INDEXING.getType()) - ) - && Objects.equals( - this.rejectionCount.get(AdmissionControlActionType.INDEXING.getType()), - admissionControllerStats.rejectionCount.get(AdmissionControlActionType.INDEXING.getType()) - ); - } - - @Override - public int hashCode() { - return super.hashCode(); - } } diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportHandler.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportHandler.java index 647da15c084b5..1e8f309234f90 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportHandler.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportHandler.java @@ -63,8 +63,6 @@ public void messageReceived(T request, TransportChannel channel, Task task) thro log.warn(openSearchRejectedExecutionException.getMessage()); channel.sendResponse(openSearchRejectedExecutionException); return; - } catch (final Exception e) { - throw e; } } actualHandler.messageReceived(request, channel, task); diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java index 01faab5688d26..b8ab5c935fa34 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -547,14 +547,33 @@ public void testSerialization() throws IOException { } AdmissionControlStats admissionControlStats = nodeStats.getAdmissionControlStats(); AdmissionControlStats deserializedAdmissionControlStats = deserializedNodeStats.getAdmissionControlStats(); - assertEquals(admissionControlStats, deserializedAdmissionControlStats); - AdmissionControllerStats admissionControllerStats = admissionControlStats.getAdmissionControllerStatsList().get(0); - AdmissionControllerStats deserializedAdmissionControllerStats = deserializedAdmissionControlStats - .getAdmissionControllerStatsList() - .get(0); - assertEquals(1, (long) admissionControllerStats.getRejectionCount().get(AdmissionControlActionType.SEARCH.getType())); - assertEquals(2, (long) admissionControllerStats.getRejectionCount().get(AdmissionControlActionType.INDEXING.getType())); - assertEquals(admissionControllerStats, deserializedAdmissionControllerStats); + if (admissionControlStats == null) { + assertNull(deserializedAdmissionControlStats); + } else { + assertEquals( + admissionControlStats.getAdmissionControllerStatsList().size(), + deserializedAdmissionControlStats.getAdmissionControllerStatsList().size() + ); + AdmissionControllerStats admissionControllerStats = admissionControlStats.getAdmissionControllerStatsList().get(0); + AdmissionControllerStats deserializedAdmissionControllerStats = deserializedAdmissionControlStats + .getAdmissionControllerStatsList() + .get(0); + assertEquals( + admissionControllerStats.getAdmissionControllerName(), + deserializedAdmissionControllerStats.getAdmissionControllerName() + ); + assertEquals(1, (long) admissionControllerStats.getRejectionCount().get(AdmissionControlActionType.SEARCH.getType())); + assertEquals( + admissionControllerStats.getRejectionCount().get(AdmissionControlActionType.SEARCH.getType()), + deserializedAdmissionControllerStats.getRejectionCount().get(AdmissionControlActionType.SEARCH.getType()) + ); + + assertEquals(2, (long) admissionControllerStats.getRejectionCount().get(AdmissionControlActionType.INDEXING.getType())); + assertEquals( + admissionControllerStats.getRejectionCount().get(AdmissionControlActionType.INDEXING.getType()), + deserializedAdmissionControllerStats.getRejectionCount().get(AdmissionControlActionType.INDEXING.getType()) + ); + } } } } diff --git a/server/src/test/java/org/opensearch/common/network/NetworkModuleTests.java b/server/src/test/java/org/opensearch/common/network/NetworkModuleTests.java index ab51cafb039c2..de4bdcac6c2b2 100644 --- a/server/src/test/java/org/opensearch/common/network/NetworkModuleTests.java +++ b/server/src/test/java/org/opensearch/common/network/NetworkModuleTests.java @@ -474,13 +474,29 @@ public List getTransportInterceptors( try { transportInterceptor.interceptHandler("foo/bar/boom", null, true, null); } catch (Exception e) { - assertEquals(0, called.get()); + assertEquals(1, called.get()); assertEquals(1, called1.get()); } + + coreTransportInterceptors = new ArrayList<>(); + coreTransportInterceptors.add(interceptor); + module = newNetworkModule(settings, coreTransportInterceptors, new NetworkPlugin() { + @Override + public List getTransportInterceptors( + NamedWriteableRegistry namedWriteableRegistry, + ThreadContext threadContext + ) { + assertNotNull(threadContext); + return Collections.singletonList(interceptor1); + } + }); + + transportInterceptor = module.getTransportInterceptor(); + try { transportInterceptor.interceptHandler("foo/baz/boom", null, false, null); } catch (Exception e) { - assertEquals(0, called.get()); + assertEquals(1, called.get()); assertEquals(2, called1.get()); } }