From fbcc0d94d1183e15ea3c74c72894c527460b2ce3 Mon Sep 17 00:00:00 2001
From: Movva Ajaykumar <majaykumar51@gmail.com>
Date: Mon, 18 Mar 2024 13:11:52 +0530
Subject: [PATCH] Integrate IO Based AdmissionController to AdmissionControl
 Framework (#12702)

* Integrate IO Based AdmissionController to AdmissionControl Framework (#12583)
---------
Signed-off-by: Ajay Kumar Movva <movvaam@amazon.com>
Co-authored-by: Ajay Kumar Movva <movvaam@amazon.com>
---
 CHANGELOG.md                                  |   1 +
 .../AdmissionControlMultiNodeIT.java          | 292 -----------------
 .../common/settings/ClusterSettings.java      |   4 +
 .../tracker/NodeResourceUsageTracker.java     |   4 +
 .../AdmissionControlService.java              |  19 +-
 .../controllers/AdmissionController.java      |   1 -
 .../IoBasedAdmissionController.java           | 126 ++++++++
 .../IoBasedAdmissionControllerSettings.java   |  98 ++++++
 .../ResourceUsageCollectorServiceTests.java   |  96 +++---
 .../AdmissionControlServiceTests.java         |  31 +-
 .../AdmissionControlSingleNodeTests.java      | 298 +++++++++++++++---
 .../IoBasedAdmissionControllerTests.java      | 141 +++++++++
 ...asedAdmissionControllerSettingsTests.java} |   2 +-
 ...BasedAdmissionControllerSettingsTests.java | 160 ++++++++++
 14 files changed, 881 insertions(+), 392 deletions(-)
 delete mode 100644 server/src/internalClusterTest/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlMultiNodeIT.java
 create mode 100644 server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/IoBasedAdmissionController.java
 create mode 100644 server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/IoBasedAdmissionControllerSettings.java
 create mode 100644 server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/controllers/IoBasedAdmissionControllerTests.java
 rename server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/{CPUBasedAdmissionControlSettingsTests.java => CPUBasedAdmissionControllerSettingsTests.java} (98%)
 create mode 100644 server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/IoBasedAdmissionControllerSettingsTests.java

diff --git a/CHANGELOG.md b/CHANGELOG.md
index a56ea61554bfc..86cef7765de06 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -116,6 +116,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Make search query counters dynamic to support all query types ([#12601](https://github.com/opensearch-project/OpenSearch/pull/12601))
 - [Tiered caching] Add policies controlling which values can enter pluggable caches [EXPERIMENTAL] ([#12542](https://github.com/opensearch-project/OpenSearch/pull/12542))
 - [Tiered caching] Add Stale keys Management and CacheCleaner to IndicesRequestCache ([#12625](https://github.com/opensearch-project/OpenSearch/pull/12625))
+- [Admission Control] Integrated IO Based AdmissionController to AdmissionControl Framework ([#12583](https://github.com/opensearch-project/OpenSearch/pull/12583))
 
 ### Dependencies
 - Bump `peter-evans/find-comment` from 2 to 3 ([#12288](https://github.com/opensearch-project/OpenSearch/pull/12288))
diff --git a/server/src/internalClusterTest/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlMultiNodeIT.java b/server/src/internalClusterTest/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlMultiNodeIT.java
deleted file mode 100644
index 0af3d31f9e846..0000000000000
--- a/server/src/internalClusterTest/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlMultiNodeIT.java
+++ /dev/null
@@ -1,292 +0,0 @@
-/*
- * SPDX-License-Identifier: Apache-2.0
- *
- * The OpenSearch Contributors require contributions made to
- * this file be licensed under the Apache-2.0 license or a
- * compatible open source license.
- */
-
-package org.opensearch.ratelimitting.admissioncontrol;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.opensearch.action.admin.cluster.health.ClusterHealthRequest;
-import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
-import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
-import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
-import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
-import org.opensearch.action.admin.indices.stats.ShardStats;
-import org.opensearch.action.bulk.BulkRequest;
-import org.opensearch.action.bulk.BulkResponse;
-import org.opensearch.action.index.IndexRequest;
-import org.opensearch.action.search.SearchPhaseExecutionException;
-import org.opensearch.action.search.SearchResponse;
-import org.opensearch.cluster.metadata.IndexMetadata;
-import org.opensearch.cluster.node.DiscoveryNodes;
-import org.opensearch.cluster.routing.ShardRouting;
-import org.opensearch.common.UUIDs;
-import org.opensearch.common.collect.Tuple;
-import org.opensearch.common.settings.Settings;
-import org.opensearch.common.unit.TimeValue;
-import org.opensearch.node.resource.tracker.ResourceTrackerSettings;
-import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
-import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode;
-import org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings;
-import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControllerStats;
-import org.opensearch.test.OpenSearchIntegTestCase;
-import org.junit.After;
-import org.junit.Before;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Stream;
-
-import static org.opensearch.ratelimitting.admissioncontrol.AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE;
-import static org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT;
-import static org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT;
-import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
-
-@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 2, numClientNodes = 1)
-public class AdmissionControlMultiNodeIT extends OpenSearchIntegTestCase {
-
-    public static final Settings settings = Settings.builder()
-        .put(ResourceTrackerSettings.GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), TimeValue.timeValueMillis(500))
-        .put(ResourceTrackerSettings.GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), TimeValue.timeValueMillis(500))
-        .put(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.ENFORCED)
-        .put(SEARCH_CPU_USAGE_LIMIT.getKey(), 0)
-        .put(INDEXING_CPU_USAGE_LIMIT.getKey(), 0)
-        .build();
-
-    private static final Logger LOGGER = LogManager.getLogger(AdmissionControlMultiNodeIT.class);
-
-    public static final String INDEX_NAME = "test_index";
-
-    @Before
-    public void init() {
-        assertAcked(
-            prepareCreate(
-                INDEX_NAME,
-                Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
-            )
-        );
-        ensureGreen(INDEX_NAME);
-    }
-
-    @After
-    public void cleanup() {
-        client().admin().indices().prepareDelete(INDEX_NAME).get();
-    }
-
-    @Override
-    protected Settings nodeSettings(int nodeOrdinal) {
-        return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(settings).build();
-    }
-
-    public void testAdmissionControlRejectionOnEnforced() {
-        Tuple<String, String> primaryReplicaNodeNames = getPrimaryReplicaNodeNames(INDEX_NAME);
-        String primaryName = primaryReplicaNodeNames.v1();
-        String replicaName = primaryReplicaNodeNames.v2();
-        String coordinatingOnlyNode = getCoordinatingOnlyNode();
-        AdmissionControlService admissionControlServicePrimary = internalCluster().getInstance(AdmissionControlService.class, primaryName);
-        AdmissionControlService admissionControlServiceReplica = internalCluster().getInstance(AdmissionControlService.class, replicaName);
-        final BulkRequest bulkRequest = new BulkRequest();
-        for (int i = 0; i < 3; ++i) {
-            IndexRequest request = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID())
-                .source(Collections.singletonMap("key", randomAlphaOfLength(50)));
-            bulkRequest.add(request);
-        }
-        BulkResponse res = client(coordinatingOnlyNode).bulk(bulkRequest).actionGet();
-        assertEquals(429, res.getItems()[0].getFailure().getStatus().getStatus());
-        AdmissionControllerStats admissionControlPrimaryStats = admissionControlServicePrimary.stats()
-            .getAdmissionControllerStatsList()
-            .get(0);
-        assertEquals(admissionControlPrimaryStats.rejectionCount.get(AdmissionControlActionType.INDEXING.getType()).longValue(), 1);
-        Arrays.stream(res.getItems()).forEach(bulkItemResponse -> {
-            assertTrue(bulkItemResponse.getFailureMessage().contains("OpenSearchRejectedExecutionException"));
-        });
-        SearchResponse searchResponse;
-        try {
-            searchResponse = client(coordinatingOnlyNode).prepareSearch(INDEX_NAME).get();
-        } catch (Exception exception) {
-            assertTrue(((SearchPhaseExecutionException) exception).getDetailedMessage().contains("OpenSearchRejectedExecutionException"));
-        }
-        AdmissionControllerStats primaryStats = admissionControlServicePrimary.stats().getAdmissionControllerStatsList().get(0);
-        assertEquals(primaryStats.rejectionCount.get(AdmissionControlActionType.SEARCH.getType()).longValue(), 1);
-    }
-
-    public void testAdmissionControlEnforcedOnNonACEnabledActions() throws ExecutionException, InterruptedException {
-        String coordinatingOnlyNode = getCoordinatingOnlyNode();
-        ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
-
-        updateSettingsRequest.transientSettings(
-            Settings.builder()
-                .put(
-                    CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(),
-                    AdmissionControlMode.ENFORCED.getMode()
-                )
-        );
-        assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
-        NodesStatsRequest nodesStatsRequest = new NodesStatsRequest();
-        nodesStatsRequest.clear()
-            .indices(true)
-            .addMetrics(
-                NodesStatsRequest.Metric.JVM.metricName(),
-                NodesStatsRequest.Metric.OS.metricName(),
-                NodesStatsRequest.Metric.FS.metricName(),
-                NodesStatsRequest.Metric.PROCESS.metricName(),
-                NodesStatsRequest.Metric.ADMISSION_CONTROL.metricName()
-            );
-        NodesStatsResponse nodesStatsResponse = client(coordinatingOnlyNode).admin().cluster().nodesStats(nodesStatsRequest).actionGet();
-        ClusterHealthResponse clusterHealthResponse = client().admin().cluster().health(new ClusterHealthRequest()).actionGet();
-        assertEquals(200, clusterHealthResponse.status().getStatus());
-        assertFalse(nodesStatsResponse.hasFailures());
-    }
-
-    public void testAdmissionControlRejectionOnMonitor() {
-        Tuple<String, String> primaryReplicaNodeNames = getPrimaryReplicaNodeNames(INDEX_NAME);
-        String primaryName = primaryReplicaNodeNames.v1();
-        String replicaName = primaryReplicaNodeNames.v2();
-        String coordinatingOnlyNode = getCoordinatingOnlyNode();
-
-        AdmissionControlService admissionControlServicePrimary = internalCluster().getInstance(AdmissionControlService.class, primaryName);
-        AdmissionControlService admissionControlServiceReplica = internalCluster().getInstance(AdmissionControlService.class, replicaName);
-
-        ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
-
-        updateSettingsRequest.transientSettings(
-            Settings.builder()
-                .put(
-                    CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(),
-                    AdmissionControlMode.MONITOR.getMode()
-                )
-        );
-        assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
-
-        final BulkRequest bulkRequest = new BulkRequest();
-        for (int i = 0; i < 3; ++i) {
-            IndexRequest request = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID())
-                .source(Collections.singletonMap("key", randomAlphaOfLength(50)));
-            bulkRequest.add(request);
-        }
-        BulkResponse res = client(coordinatingOnlyNode).bulk(bulkRequest).actionGet();
-        assertFalse(res.hasFailures());
-        AdmissionControllerStats admissionControlPrimaryStats = admissionControlServicePrimary.stats()
-            .getAdmissionControllerStatsList()
-            .get(0);
-        AdmissionControllerStats admissionControlReplicaStats = admissionControlServiceReplica.stats()
-            .getAdmissionControllerStatsList()
-            .get(0);
-        long primaryRejectionCount = admissionControlPrimaryStats.rejectionCount.getOrDefault(
-            AdmissionControlActionType.INDEXING.getType(),
-            new AtomicLong(0).longValue()
-        );
-        long replicaRejectionCount = admissionControlReplicaStats.rejectionCount.getOrDefault(
-            AdmissionControlActionType.INDEXING.getType(),
-            new AtomicLong(0).longValue()
-        );
-        assertEquals(primaryRejectionCount, 1);
-        assertEquals(replicaRejectionCount, 0);
-        SearchResponse searchResponse;
-        searchResponse = client(coordinatingOnlyNode).prepareSearch(INDEX_NAME).get();
-        admissionControlPrimaryStats = admissionControlServicePrimary.stats().getAdmissionControllerStatsList().get(0);
-        admissionControlReplicaStats = admissionControlServiceReplica.stats().getAdmissionControllerStatsList().get(0);
-        primaryRejectionCount = admissionControlPrimaryStats.getRejectionCount()
-            .getOrDefault(AdmissionControlActionType.SEARCH.getType(), new AtomicLong(0).longValue());
-        replicaRejectionCount = admissionControlReplicaStats.getRejectionCount()
-            .getOrDefault(AdmissionControlActionType.SEARCH.getType(), new AtomicLong(0).longValue());
-        assertTrue(primaryRejectionCount == 1 || replicaRejectionCount == 1);
-        assertFalse(primaryRejectionCount == 1 && replicaRejectionCount == 1);
-    }
-
-    public void testAdmissionControlRejectionOnDisabled() {
-        Tuple<String, String> primaryReplicaNodeNames = getPrimaryReplicaNodeNames(INDEX_NAME);
-        String primaryName = primaryReplicaNodeNames.v1();
-        String replicaName = primaryReplicaNodeNames.v2();
-        String coordinatingOnlyNode = getCoordinatingOnlyNode();
-
-        AdmissionControlService admissionControlServicePrimary = internalCluster().getInstance(AdmissionControlService.class, primaryName);
-        AdmissionControlService admissionControlServiceReplica = internalCluster().getInstance(AdmissionControlService.class, replicaName);
-
-        ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
-
-        updateSettingsRequest.transientSettings(
-            Settings.builder()
-                .put(
-                    CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(),
-                    AdmissionControlMode.DISABLED.getMode()
-                )
-        );
-        assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
-
-        final BulkRequest bulkRequest = new BulkRequest();
-        for (int i = 0; i < 3; ++i) {
-            IndexRequest request = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID())
-                .source(Collections.singletonMap("key", randomAlphaOfLength(50)));
-            bulkRequest.add(request);
-        }
-        BulkResponse res = client(coordinatingOnlyNode).bulk(bulkRequest).actionGet();
-        assertFalse(res.hasFailures());
-        AdmissionControllerStats admissionControlPrimaryStats = admissionControlServicePrimary.stats()
-            .getAdmissionControllerStatsList()
-            .get(0);
-        AdmissionControllerStats admissionControlReplicaStats = admissionControlServiceReplica.stats()
-            .getAdmissionControllerStatsList()
-            .get(0);
-        long primaryRejectionCount = admissionControlPrimaryStats.rejectionCount.getOrDefault(
-            AdmissionControlActionType.INDEXING.getType(),
-            new AtomicLong(0).longValue()
-        );
-        long replicaRejectionCount = admissionControlReplicaStats.rejectionCount.getOrDefault(
-            AdmissionControlActionType.INDEXING.getType(),
-            new AtomicLong(0).longValue()
-        );
-        assertEquals(primaryRejectionCount, 0);
-        assertEquals(replicaRejectionCount, 0);
-        SearchResponse searchResponse;
-        searchResponse = client(coordinatingOnlyNode).prepareSearch(INDEX_NAME).get();
-        admissionControlPrimaryStats = admissionControlServicePrimary.stats().getAdmissionControllerStatsList().get(0);
-        admissionControlReplicaStats = admissionControlServiceReplica.stats().getAdmissionControllerStatsList().get(0);
-        primaryRejectionCount = admissionControlPrimaryStats.getRejectionCount()
-            .getOrDefault(AdmissionControlActionType.SEARCH.getType(), new AtomicLong(0).longValue());
-        replicaRejectionCount = admissionControlReplicaStats.getRejectionCount()
-            .getOrDefault(AdmissionControlActionType.SEARCH.getType(), new AtomicLong(0).longValue());
-        assertTrue(primaryRejectionCount == 0 && replicaRejectionCount == 0);
-    }
-
-    private Tuple<String, String> getPrimaryReplicaNodeNames(String indexName) {
-        IndicesStatsResponse response = client().admin().indices().prepareStats(indexName).get();
-        String primaryId = Stream.of(response.getShards())
-            .map(ShardStats::getShardRouting)
-            .filter(ShardRouting::primary)
-            .findAny()
-            .get()
-            .currentNodeId();
-        String replicaId = Stream.of(response.getShards())
-            .map(ShardStats::getShardRouting)
-            .filter(sr -> sr.primary() == false)
-            .findAny()
-            .get()
-            .currentNodeId();
-        DiscoveryNodes nodes = client().admin().cluster().prepareState().get().getState().nodes();
-        String primaryName = nodes.get(primaryId).getName();
-        String replicaName = nodes.get(replicaId).getName();
-        return new Tuple<>(primaryName, replicaName);
-    }
-
-    private String getCoordinatingOnlyNode() {
-        return client().admin()
-            .cluster()
-            .prepareState()
-            .get()
-            .getState()
-            .nodes()
-            .getCoordinatingOnlyNodes()
-            .values()
-            .iterator()
-            .next()
-            .getName();
-    }
-}
diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
index a424294371422..6b4be45929553 100644
--- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
+++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
@@ -140,6 +140,7 @@
 import org.opensearch.plugins.PluginsService;
 import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlSettings;
 import org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings;
+import org.opensearch.ratelimitting.admissioncontrol.settings.IoBasedAdmissionControllerSettings;
 import org.opensearch.repositories.fs.FsRepository;
 import org.opensearch.rest.BaseRestHandler;
 import org.opensearch.script.ScriptService;
@@ -708,6 +709,9 @@ public void apply(Settings value, Settings current, Settings previous) {
                 CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE,
                 CpuBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT,
                 CpuBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT,
+                IoBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE,
+                IoBasedAdmissionControllerSettings.SEARCH_IO_USAGE_LIMIT,
+                IoBasedAdmissionControllerSettings.INDEXING_IO_USAGE_LIMIT,
                 IndicesService.CLUSTER_INDEX_RESTRICT_REPLICATION_TYPE_SETTING,
                 // Concurrent segment search settings
                 SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING,
diff --git a/server/src/main/java/org/opensearch/node/resource/tracker/NodeResourceUsageTracker.java b/server/src/main/java/org/opensearch/node/resource/tracker/NodeResourceUsageTracker.java
index 546ae07cde221..621f90e80454c 100644
--- a/server/src/main/java/org/opensearch/node/resource/tracker/NodeResourceUsageTracker.java
+++ b/server/src/main/java/org/opensearch/node/resource/tracker/NodeResourceUsageTracker.java
@@ -8,6 +8,7 @@
 
 package org.opensearch.node.resource.tracker;
 
+import org.apache.lucene.util.Constants;
 import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
 import org.opensearch.common.settings.ClusterSettings;
 import org.opensearch.common.settings.Settings;
@@ -69,6 +70,9 @@ public IoUsageStats getIoUsageStats() {
      * Checks if all of the resource usage trackers are ready
      */
     public boolean isReady() {
+        if (Constants.LINUX) {
+            return memoryUsageTracker.isReady() && cpuUsageTracker.isReady() && ioUsageTracker.isReady();
+        }
         return memoryUsageTracker.isReady() && cpuUsageTracker.isReady();
     }
 
diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlService.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlService.java
index adca6992833bd..5b842ff0d3399 100644
--- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlService.java
+++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlService.java
@@ -10,11 +10,13 @@
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.lucene.util.Constants;
 import org.opensearch.cluster.service.ClusterService;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.node.ResourceUsageCollectorService;
 import org.opensearch.ratelimitting.admissioncontrol.controllers.AdmissionController;
 import org.opensearch.ratelimitting.admissioncontrol.controllers.CpuBasedAdmissionController;
+import org.opensearch.ratelimitting.admissioncontrol.controllers.IoBasedAdmissionController;
 import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
 import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControlStats;
 import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControllerStats;
@@ -26,6 +28,7 @@
 import java.util.concurrent.ConcurrentMap;
 
 import static org.opensearch.ratelimitting.admissioncontrol.controllers.CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER;
+import static org.opensearch.ratelimitting.admissioncontrol.controllers.IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER;
 
 /**
  * Admission control Service that bootstraps and manages all the Admission Controllers in OpenSearch.
@@ -58,15 +61,18 @@ public AdmissionControlService(
         this.clusterService = clusterService;
         this.settings = settings;
         this.resourceUsageCollectorService = resourceUsageCollectorService;
-        this.initialise();
+        this.initialize();
     }
 
     /**
      * Initialise and Register all the admissionControllers
      */
-    private void initialise() {
+    private void initialize() {
         // Initialise different type of admission controllers
         registerAdmissionController(CPU_BASED_ADMISSION_CONTROLLER);
+        if (Constants.LINUX) {
+            registerAdmissionController(IO_BASED_ADMISSION_CONTROLLER);
+        }
     }
 
     /**
@@ -101,6 +107,13 @@ private AdmissionController controllerFactory(String admissionControllerName) {
                     this.clusterService,
                     this.settings
                 );
+            case IO_BASED_ADMISSION_CONTROLLER:
+                return new IoBasedAdmissionController(
+                    admissionControllerName,
+                    this.resourceUsageCollectorService,
+                    this.clusterService,
+                    this.settings
+                );
             default:
                 throw new IllegalArgumentException("Not Supported AdmissionController : " + admissionControllerName);
         }
@@ -128,7 +141,7 @@ public AdmissionController getAdmissionController(String controllerName) {
      */
     public AdmissionControlStats stats() {
         List<AdmissionControllerStats> statsList = new ArrayList<>();
-        if (this.admissionControllers.size() > 0) {
+        if (!this.admissionControllers.isEmpty()) {
             this.admissionControllers.forEach((controllerName, admissionController) -> {
                 AdmissionControllerStats admissionControllerStats = new AdmissionControllerStats(admissionController);
                 statsList.add(admissionControllerStats);
diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/AdmissionController.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/AdmissionController.java
index 2246ce34dd399..f5bb5fa660e7f 100644
--- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/AdmissionController.java
+++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/AdmissionController.java
@@ -24,7 +24,6 @@
  * and admission control can be applied if configured limit has been reached
  */
 public abstract class AdmissionController {
-
     private final String admissionControllerName;
     final ResourceUsageCollectorService resourceUsageCollectorService;
     public final Map<String, AtomicLong> rejectionCountMap;
diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/IoBasedAdmissionController.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/IoBasedAdmissionController.java
new file mode 100644
index 0000000000000..ad6cc3ff378f0
--- /dev/null
+++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/IoBasedAdmissionController.java
@@ -0,0 +1,126 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.ratelimitting.admissioncontrol.controllers;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.opensearch.cluster.service.ClusterService;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
+import org.opensearch.node.NodeResourceUsageStats;
+import org.opensearch.node.ResourceUsageCollectorService;
+import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
+import org.opensearch.ratelimitting.admissioncontrol.settings.IoBasedAdmissionControllerSettings;
+
+import java.util.Locale;
+import java.util.Optional;
+
+/**
+ *  Class for IO Based Admission Controller in OpenSearch, which aims to provide IO utilisation admission control.
+ *  It provides methods to apply admission control if configured limit has been reached
+ */
+public class IoBasedAdmissionController extends AdmissionController {
+    public static final String IO_BASED_ADMISSION_CONTROLLER = "global_io_usage";
+    private static final Logger LOGGER = LogManager.getLogger(IoBasedAdmissionController.class);
+    public IoBasedAdmissionControllerSettings settings;
+
+    /**
+     * @param admissionControllerName       name of the admissionController
+     * @param resourceUsageCollectorService instance used to get resource usage stats of the node
+     * @param clusterService                instance of the clusterService
+     */
+    public IoBasedAdmissionController(
+        String admissionControllerName,
+        ResourceUsageCollectorService resourceUsageCollectorService,
+        ClusterService clusterService,
+        Settings settings
+    ) {
+        super(admissionControllerName, resourceUsageCollectorService, clusterService);
+        this.settings = new IoBasedAdmissionControllerSettings(clusterService.getClusterSettings(), settings);
+    }
+
+    /**
+     * Apply admission control based on the resource usage for an action
+     *
+     * @param action is the transport action
+     * @param admissionControlActionType type of admissionControlActionType
+     */
+    @Override
+    public void apply(String action, AdmissionControlActionType admissionControlActionType) {
+        if (this.isEnabledForTransportLayer(this.settings.getTransportLayerAdmissionControllerMode())) {
+            this.applyForTransportLayer(action, admissionControlActionType);
+        }
+    }
+
+    /**
+     * Apply transport layer admission control if configured limit has been reached
+     */
+    private void applyForTransportLayer(String actionName, AdmissionControlActionType admissionControlActionType) {
+        if (isLimitsBreached(actionName, admissionControlActionType)) {
+            this.addRejectionCount(admissionControlActionType.getType(), 1);
+            if (this.isAdmissionControllerEnforced(this.settings.getTransportLayerAdmissionControllerMode())) {
+                throw new OpenSearchRejectedExecutionException(
+                    String.format(
+                        Locale.ROOT,
+                        "Io usage admission controller rejected the request for action [%s] as IO limit reached",
+                        admissionControlActionType.name()
+                    )
+                );
+            }
+        }
+    }
+
+    /**
+     * Check if the configured resource usage limits are breached for the action
+     */
+    private boolean isLimitsBreached(String actionName, AdmissionControlActionType admissionControlActionType) {
+        // check if cluster state is ready
+        if (clusterService.state() != null && clusterService.state().nodes() != null) {
+            long ioUsageThreshold = this.getIoRejectionThreshold(admissionControlActionType);
+            Optional<NodeResourceUsageStats> nodePerformanceStatistics = this.resourceUsageCollectorService.getNodeStatistics(
+                this.clusterService.state().nodes().getLocalNodeId()
+            );
+            if (nodePerformanceStatistics.isPresent()) {
+                double ioUsage = nodePerformanceStatistics.get().getIoUsageStats().getIoUtilisationPercent();
+                if (ioUsage >= ioUsageThreshold) {
+                    LOGGER.warn(
+                        "IoBasedAdmissionController limit reached as the current IO "
+                            + "usage [{}] exceeds the allowed limit [{}] for transport action [{}] in admissionControlMode [{}]",
+                        ioUsage,
+                        ioUsageThreshold,
+                        actionName,
+                        this.settings.getTransportLayerAdmissionControllerMode()
+                    );
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Get IO rejection threshold based on action type
+     */
+    private long getIoRejectionThreshold(AdmissionControlActionType admissionControlActionType) {
+        switch (admissionControlActionType) {
+            case SEARCH:
+                return this.settings.getSearchIOUsageLimit();
+            case INDEXING:
+                return this.settings.getIndexingIOUsageLimit();
+            default:
+                throw new IllegalArgumentException(
+                    String.format(
+                        Locale.ROOT,
+                        "Admission control not Supported for AdmissionControlActionType: %s",
+                        admissionControlActionType.getType()
+                    )
+                );
+        }
+    }
+}
diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/IoBasedAdmissionControllerSettings.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/IoBasedAdmissionControllerSettings.java
new file mode 100644
index 0000000000000..e58ed28d21605
--- /dev/null
+++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/IoBasedAdmissionControllerSettings.java
@@ -0,0 +1,98 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.ratelimitting.admissioncontrol.settings;
+
+import org.opensearch.common.settings.ClusterSettings;
+import org.opensearch.common.settings.Setting;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlSettings;
+import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode;
+
+/**
+ * Settings related to IO based admission controller.
+ * @opensearch.internal
+ */
+public class IoBasedAdmissionControllerSettings {
+
+    /**
+     * Default parameters for the IoBasedAdmissionControllerSettings
+     */
+    public static class Defaults {
+        public static final long IO_USAGE_LIMIT = 95;
+    }
+
+    private AdmissionControlMode transportLayerMode;
+    private Long searchIOUsageLimit;
+    private Long indexingIOUsageLimit;
+
+    /**
+     * Feature level setting to operate in shadow-mode or in enforced-mode. If enforced field is set
+     * rejection will be performed, otherwise only rejection metrics will be populated.
+     */
+    public static final Setting<AdmissionControlMode> IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE = new Setting<>(
+        "admission_control.transport.io_usage.mode_override",
+        AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE,
+        AdmissionControlMode::fromName,
+        Setting.Property.Dynamic,
+        Setting.Property.NodeScope
+    );
+
+    /**
+     * This setting used to set the IO Limits for the search requests by default it will use default IO usage limit
+     */
+    public static final Setting<Long> SEARCH_IO_USAGE_LIMIT = Setting.longSetting(
+        "admission_control.search.io_usage.limit",
+        Defaults.IO_USAGE_LIMIT,
+        Setting.Property.Dynamic,
+        Setting.Property.NodeScope
+    );
+
+    /**
+     * This setting used to set the IO limits for the indexing requests by default it will use default IO usage limit
+     */
+    public static final Setting<Long> INDEXING_IO_USAGE_LIMIT = Setting.longSetting(
+        "admission_control.indexing.io_usage.limit",
+        Defaults.IO_USAGE_LIMIT,
+        Setting.Property.Dynamic,
+        Setting.Property.NodeScope
+    );
+
+    public IoBasedAdmissionControllerSettings(ClusterSettings clusterSettings, Settings settings) {
+        this.transportLayerMode = IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.get(settings);
+        clusterSettings.addSettingsUpdateConsumer(IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, this::setTransportLayerMode);
+        this.searchIOUsageLimit = SEARCH_IO_USAGE_LIMIT.get(settings);
+        this.indexingIOUsageLimit = INDEXING_IO_USAGE_LIMIT.get(settings);
+        clusterSettings.addSettingsUpdateConsumer(INDEXING_IO_USAGE_LIMIT, this::setIndexingIOUsageLimit);
+        clusterSettings.addSettingsUpdateConsumer(SEARCH_IO_USAGE_LIMIT, this::setSearchIOUsageLimit);
+    }
+
+    public void setIndexingIOUsageLimit(Long indexingIOUsageLimit) {
+        this.indexingIOUsageLimit = indexingIOUsageLimit;
+    }
+
+    public void setSearchIOUsageLimit(Long searchIOUsageLimit) {
+        this.searchIOUsageLimit = searchIOUsageLimit;
+    }
+
+    public AdmissionControlMode getTransportLayerAdmissionControllerMode() {
+        return transportLayerMode;
+    }
+
+    public void setTransportLayerMode(AdmissionControlMode transportLayerMode) {
+        this.transportLayerMode = transportLayerMode;
+    }
+
+    public Long getIndexingIOUsageLimit() {
+        return indexingIOUsageLimit;
+    }
+
+    public Long getSearchIOUsageLimit() {
+        return searchIOUsageLimit;
+    }
+}
diff --git a/server/src/test/java/org/opensearch/node/ResourceUsageCollectorServiceTests.java b/server/src/test/java/org/opensearch/node/ResourceUsageCollectorServiceTests.java
index f2ee0e61c4953..6dd90784ab65f 100644
--- a/server/src/test/java/org/opensearch/node/ResourceUsageCollectorServiceTests.java
+++ b/server/src/test/java/org/opensearch/node/ResourceUsageCollectorServiceTests.java
@@ -14,24 +14,21 @@
 import org.opensearch.cluster.node.DiscoveryNode;
 import org.opensearch.cluster.node.DiscoveryNodes;
 import org.opensearch.cluster.service.ClusterService;
-import org.opensearch.common.settings.ClusterSettings;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.common.unit.TimeValue;
 import org.opensearch.core.common.transport.TransportAddress;
-import org.opensearch.node.resource.tracker.NodeResourceUsageTracker;
 import org.opensearch.node.resource.tracker.ResourceTrackerSettings;
+import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode;
 import org.opensearch.test.OpenSearchSingleNodeTestCase;
-import org.opensearch.threadpool.TestThreadPool;
-import org.opensearch.threadpool.ThreadPool;
 import org.junit.After;
-import org.junit.Before;
 
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
-import static org.opensearch.test.ClusterServiceUtils.createClusterService;
+import static org.opensearch.ratelimitting.admissioncontrol.AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE;
+import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
 import static org.hamcrest.Matchers.greaterThan;
 
 /**
@@ -39,61 +36,50 @@
  * are working as expected
  */
 public class ResourceUsageCollectorServiceTests extends OpenSearchSingleNodeTestCase {
+    @Override
+    protected boolean resetNodeAfterTest() {
+        return true;
+    }
 
-    private ClusterService clusterService;
-    private ResourceUsageCollectorService collector;
-    private ThreadPool threadpool;
-    NodeResourceUsageTracker tracker;
-
-    @Before
-    public void setUp() throws Exception {
-        super.setUp();
-
-        threadpool = new TestThreadPool("resource_usage_collector_tests");
-
-        clusterService = createClusterService(threadpool);
-
-        Settings settings = Settings.builder()
-            .put(ResourceTrackerSettings.GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), new TimeValue(500, TimeUnit.MILLISECONDS))
+    @Override
+    protected Settings nodeSettings() {
+        return Settings.builder()
+            .put(super.nodeSettings())
+            .put(ResourceTrackerSettings.GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), TimeValue.timeValueMillis(500))
+            .put(ResourceTrackerSettings.GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), TimeValue.timeValueMillis(500))
+            .put(ResourceTrackerSettings.GLOBAL_IO_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), TimeValue.timeValueMillis(5000))
+            .put(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.ENFORCED)
             .build();
-        tracker = new NodeResourceUsageTracker(
-            null,
-            threadpool,
-            settings,
-            new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
-        );
-        collector = new ResourceUsageCollectorService(tracker, clusterService, threadpool);
-        tracker.start();
-        collector.start();
     }
 
     @After
-    public void tearDown() throws Exception {
-        super.tearDown();
-        threadpool.shutdownNow();
-        clusterService.close();
-        collector.stop();
-        tracker.stop();
-        collector.close();
-        tracker.close();
+    public void cleanup() {
+        assertAcked(
+            client().admin()
+                .cluster()
+                .prepareUpdateSettings()
+                .setPersistentSettings(Settings.builder().putNull("*"))
+                .setTransientSettings(Settings.builder().putNull("*"))
+        );
     }
 
     public void testResourceUsageStats() {
-        collector.collectNodeResourceUsageStats("node1", System.currentTimeMillis(), 97, 99, new IoUsageStats(98));
-        Map<String, NodeResourceUsageStats> nodeStats = collector.getAllNodeStatistics();
+        ResourceUsageCollectorService resourceUsageCollectorService = getInstanceFromNode(ResourceUsageCollectorService.class);
+        resourceUsageCollectorService.collectNodeResourceUsageStats("node1", System.currentTimeMillis(), 97, 99, new IoUsageStats(98));
+        Map<String, NodeResourceUsageStats> nodeStats = resourceUsageCollectorService.getAllNodeStatistics();
         assertTrue(nodeStats.containsKey("node1"));
         assertEquals(99.0, nodeStats.get("node1").cpuUtilizationPercent, 0.0);
         assertEquals(97.0, nodeStats.get("node1").memoryUtilizationPercent, 0.0);
         assertEquals(98, nodeStats.get("node1").getIoUsageStats().getIoUtilisationPercent(), 0.0);
 
-        Optional<NodeResourceUsageStats> nodeResourceUsageStatsOptional = collector.getNodeStatistics("node1");
+        Optional<NodeResourceUsageStats> nodeResourceUsageStatsOptional = resourceUsageCollectorService.getNodeStatistics("node1");
 
         assertNotNull(nodeResourceUsageStatsOptional.get());
         assertEquals(99.0, nodeResourceUsageStatsOptional.get().cpuUtilizationPercent, 0.0);
         assertEquals(97.0, nodeResourceUsageStatsOptional.get().memoryUtilizationPercent, 0.0);
         assertEquals(98, nodeResourceUsageStatsOptional.get().getIoUsageStats().getIoUtilisationPercent(), 0.0);
 
-        nodeResourceUsageStatsOptional = collector.getNodeStatistics("node2");
+        nodeResourceUsageStatsOptional = resourceUsageCollectorService.getNodeStatistics("node2");
         assertTrue(nodeResourceUsageStatsOptional.isEmpty());
     }
 
@@ -101,26 +87,29 @@ public void testScheduler() throws Exception {
         /**
          * Wait for cluster state to be ready so that localNode().getId() is ready and we add the values to the map
          */
-        assertBusy(() -> assertTrue(collector.getNodeStatistics(clusterService.localNode().getId()).isPresent()), 1, TimeUnit.MINUTES);
-        assertTrue(collector.getNodeStatistics(clusterService.localNode().getId()).isPresent());
+        ResourceUsageCollectorService resourceUsageCollectorService = getInstanceFromNode(ResourceUsageCollectorService.class);
+        ClusterService clusterService = getInstanceFromNode(ClusterService.class);
+        assertBusy(() -> assertEquals(1, resourceUsageCollectorService.getAllNodeStatistics().size()));
+
         /**
          * Wait for memory utilization to be reported greater than 0
          */
         assertBusy(
             () -> assertThat(
-                collector.getNodeStatistics(clusterService.localNode().getId()).get().getMemoryUtilizationPercent(),
+                resourceUsageCollectorService.getNodeStatistics(clusterService.localNode().getId()).get().getMemoryUtilizationPercent(),
                 greaterThan(0.0)
             ),
             5,
             TimeUnit.SECONDS
         );
-        assertTrue(collector.getNodeStatistics("Invalid").isEmpty());
+        assertTrue(resourceUsageCollectorService.getNodeStatistics("Invalid").isEmpty());
     }
 
     /*
      * Test that concurrently adding values and removing nodes does not cause exceptions
      */
     public void testConcurrentAddingAndRemovingNodes() throws Exception {
+        ResourceUsageCollectorService resourceUsageCollectorService = getInstanceFromNode(ResourceUsageCollectorService.class);
         String[] nodes = new String[] { "a", "b", "c", "d" };
 
         final CountDownLatch latch = new CountDownLatch(5);
@@ -134,9 +123,9 @@ public void testConcurrentAddingAndRemovingNodes() throws Exception {
             }
             for (int i = 0; i < randomIntBetween(100, 200); i++) {
                 if (randomBoolean()) {
-                    collector.removeNodeResourceUsageStats(randomFrom(nodes));
+                    resourceUsageCollectorService.removeNodeResourceUsageStats(randomFrom(nodes));
                 }
-                collector.collectNodeResourceUsageStats(
+                resourceUsageCollectorService.collectNodeResourceUsageStats(
                     randomFrom(nodes),
                     System.currentTimeMillis(),
                     randomIntBetween(1, 100),
@@ -161,7 +150,7 @@ public void testConcurrentAddingAndRemovingNodes() throws Exception {
         t3.join();
         t4.join();
 
-        final Map<String, NodeResourceUsageStats> nodeStats = collector.getAllNodeStatistics();
+        final Map<String, NodeResourceUsageStats> nodeStats = resourceUsageCollectorService.getAllNodeStatistics();
         for (String nodeId : nodes) {
             if (nodeStats.containsKey(nodeId)) {
                 assertThat(nodeStats.get(nodeId).memoryUtilizationPercent, greaterThan(0.0));
@@ -172,14 +161,15 @@ public void testConcurrentAddingAndRemovingNodes() throws Exception {
     }
 
     public void testNodeRemoval() {
-        collector.collectNodeResourceUsageStats(
+        ResourceUsageCollectorService resourceUsageCollectorService = getInstanceFromNode(ResourceUsageCollectorService.class);
+        resourceUsageCollectorService.collectNodeResourceUsageStats(
             "node1",
             System.currentTimeMillis(),
             randomIntBetween(1, 100),
             randomIntBetween(1, 100),
             new IoUsageStats(randomIntBetween(1, 100))
         );
-        collector.collectNodeResourceUsageStats(
+        resourceUsageCollectorService.collectNodeResourceUsageStats(
             "node2",
             System.currentTimeMillis(),
             randomIntBetween(1, 100),
@@ -199,8 +189,8 @@ public void testNodeRemoval() {
             .build();
         ClusterChangedEvent event = new ClusterChangedEvent("test", newState, previousState);
 
-        collector.clusterChanged(event);
-        final Map<String, NodeResourceUsageStats> nodeStats = collector.getAllNodeStatistics();
+        resourceUsageCollectorService.clusterChanged(event);
+        final Map<String, NodeResourceUsageStats> nodeStats = resourceUsageCollectorService.getAllNodeStatistics();
         assertTrue(nodeStats.containsKey("node1"));
         assertFalse(nodeStats.containsKey("node2"));
     }
diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlServiceTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlServiceTests.java
index 7a67ffc8c7c5d..4f615290f1805 100644
--- a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlServiceTests.java
+++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlServiceTests.java
@@ -8,6 +8,7 @@
 
 package org.opensearch.ratelimitting.admissioncontrol;
 
+import org.apache.lucene.util.Constants;
 import org.opensearch.cluster.service.ClusterService;
 import org.opensearch.common.settings.ClusterSettings;
 import org.opensearch.common.settings.Settings;
@@ -48,13 +49,21 @@ public void tearDown() throws Exception {
 
     public void testWhenAdmissionControllerRegistered() {
         admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService, threadPool, null);
-        assertEquals(admissionControlService.getAdmissionControllers().size(), 1);
+        if (Constants.LINUX) {
+            assertEquals(admissionControlService.getAdmissionControllers().size(), 2);
+        } else {
+            assertEquals(admissionControlService.getAdmissionControllers().size(), 1);
+        }
     }
 
     public void testRegisterInvalidAdmissionController() {
         String test = "TEST";
         admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService, threadPool, null);
-        assertEquals(admissionControlService.getAdmissionControllers().size(), 1);
+        if (Constants.LINUX) {
+            assertEquals(admissionControlService.getAdmissionControllers().size(), 2);
+        } else {
+            assertEquals(admissionControlService.getAdmissionControllers().size(), 1);
+        }
         IllegalArgumentException ex = expectThrows(
             IllegalArgumentException.class,
             () -> admissionControlService.registerAdmissionController(test)
@@ -66,7 +75,11 @@ public void testAdmissionControllerSettings() {
         admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService, threadPool, null);
         AdmissionControlSettings admissionControlSettings = admissionControlService.admissionControlSettings;
         List<AdmissionController> admissionControllerList = admissionControlService.getAdmissionControllers();
-        assertEquals(admissionControllerList.size(), 1);
+        if (Constants.LINUX) {
+            assertEquals(admissionControllerList.size(), 2);
+        } else {
+            assertEquals(admissionControllerList.size(), 1);
+        }
         CpuBasedAdmissionController cpuBasedAdmissionController = (CpuBasedAdmissionController) admissionControlService
             .getAdmissionController(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER);
         assertEquals(
@@ -132,7 +145,11 @@ public void testApplyAdmissionControllerEnabled() {
             .build();
         clusterService.getClusterSettings().applySettings(settings);
         List<AdmissionController> admissionControllerList = admissionControlService.getAdmissionControllers();
-        assertEquals(admissionControllerList.size(), 1);
+        if (Constants.LINUX) {
+            assertEquals(admissionControllerList.size(), 2);
+        } else {
+            assertEquals(admissionControllerList.size(), 1);
+        }
     }
 
     public void testApplyAdmissionControllerEnforced() {
@@ -153,6 +170,10 @@ public void testApplyAdmissionControllerEnforced() {
             .build();
         clusterService.getClusterSettings().applySettings(settings);
         List<AdmissionController> admissionControllerList = admissionControlService.getAdmissionControllers();
-        assertEquals(admissionControllerList.size(), 1);
+        if (Constants.LINUX) {
+            assertEquals(admissionControllerList.size(), 2);
+        } else {
+            assertEquals(admissionControllerList.size(), 1);
+        }
     }
 }
diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlSingleNodeTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlSingleNodeTests.java
index a1694b2c3cee2..5534dbcf2774b 100644
--- a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlSingleNodeTests.java
+++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlSingleNodeTests.java
@@ -8,6 +8,7 @@
 
 package org.opensearch.ratelimitting.admissioncontrol;
 
+import org.apache.lucene.util.Constants;
 import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
 import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
 import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
@@ -21,15 +22,24 @@
 import org.opensearch.common.unit.TimeValue;
 import org.opensearch.node.ResourceUsageCollectorService;
 import org.opensearch.node.resource.tracker.ResourceTrackerSettings;
+import org.opensearch.ratelimitting.admissioncontrol.controllers.CpuBasedAdmissionController;
+import org.opensearch.ratelimitting.admissioncontrol.controllers.IoBasedAdmissionController;
 import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
 import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode;
 import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControllerStats;
 import org.opensearch.test.OpenSearchSingleNodeTestCase;
 import org.junit.After;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import static org.opensearch.ratelimitting.admissioncontrol.AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE;
+import static org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE;
 import static org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT;
 import static org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT;
+import static org.opensearch.ratelimitting.admissioncontrol.settings.IoBasedAdmissionControllerSettings.INDEXING_IO_USAGE_LIMIT;
+import static org.opensearch.ratelimitting.admissioncontrol.settings.IoBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE;
+import static org.opensearch.ratelimitting.admissioncontrol.settings.IoBasedAdmissionControllerSettings.SEARCH_IO_USAGE_LIMIT;
 import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
 import static org.hamcrest.Matchers.is;
 
@@ -38,6 +48,8 @@
  */
 public class AdmissionControlSingleNodeTests extends OpenSearchSingleNodeTestCase {
 
+    public static final String INDEX_NAME = "test_index";
+
     @Override
     protected boolean resetNodeAfterTest() {
         return true;
@@ -45,6 +57,7 @@ protected boolean resetNodeAfterTest() {
 
     @After
     public void cleanup() {
+        client().admin().indices().prepareDelete(INDEX_NAME).get();
         assertAcked(
             client().admin()
                 .cluster()
@@ -60,7 +73,8 @@ protected Settings nodeSettings() {
             .put(super.nodeSettings())
             .put(ResourceTrackerSettings.GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), TimeValue.timeValueMillis(500))
             .put(ResourceTrackerSettings.GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), TimeValue.timeValueMillis(500))
-            .put(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.ENFORCED)
+            .put(ResourceTrackerSettings.GLOBAL_IO_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), TimeValue.timeValueMillis(5000))
+            .put(CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.ENFORCED.getMode())
             .put(SEARCH_CPU_USAGE_LIMIT.getKey(), 0)
             .put(INDEXING_CPU_USAGE_LIMIT.getKey(), 0)
             .build();
@@ -69,11 +83,10 @@ protected Settings nodeSettings() {
     public void testAdmissionControlRejectionEnforcedMode() throws Exception {
         ensureGreen();
         assertBusy(() -> assertEquals(1, getInstanceFromNode(ResourceUsageCollectorService.class).getAllNodeStatistics().size()));
-        // Thread.sleep(700);
-        client().admin().indices().prepareCreate("index").execute().actionGet();
+        client().admin().indices().prepareCreate(INDEX_NAME).execute().actionGet();
         BulkRequestBuilder bulk = client().prepareBulk();
         for (int i = 0; i < 3; i++) {
-            bulk.add(client().prepareIndex("index").setSource("foo", "bar " + i));
+            bulk.add(client().prepareIndex(INDEX_NAME).setSource("foo", "bar " + i));
         }
         // Verify that cluster state is updated
         ActionFuture<ClusterStateResponse> future2 = client().admin().cluster().state(new ClusterStateRequest());
@@ -83,24 +96,116 @@ public void testAdmissionControlRejectionEnforcedMode() throws Exception {
         BulkResponse res = client().bulk(bulk.request()).actionGet();
         assertEquals(429, res.getItems()[0].getFailure().getStatus().getStatus());
         AdmissionControlService admissionControlService = getInstanceFromNode(AdmissionControlService.class);
-        AdmissionControllerStats acStats = admissionControlService.stats().getAdmissionControllerStatsList().get(0);
-        assertEquals(1, (long) acStats.getRejectionCount().get(AdmissionControlActionType.INDEXING.getType()));
-        client().admin().indices().prepareRefresh("index").get();
+        Map<String, AdmissionControllerStats> acStats = this.getAdmissionControlStats(admissionControlService);
+        assertEquals(
+            1,
+            (long) acStats.get(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER)
+                .getRejectionCount()
+                .get(AdmissionControlActionType.INDEXING.getType())
+        );
+        if (Constants.LINUX) {
+            assertEquals(
+                0,
+                (long) acStats.get(IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER)
+                    .getRejectionCount()
+                    .getOrDefault(AdmissionControlActionType.INDEXING.getType(), 0L)
+            );
+        } else {
+            assertNull(acStats.get(IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER));
+        }
+        client().admin().indices().prepareRefresh(INDEX_NAME).get();
 
         // verify search request hits 429
-        SearchRequest searchRequest = new SearchRequest("index");
+        SearchRequest searchRequest = new SearchRequest(INDEX_NAME);
         try {
             client().search(searchRequest).actionGet();
         } catch (Exception e) {
             assertTrue(((SearchPhaseExecutionException) e).getDetailedMessage().contains("OpenSearchRejectedExecutionException"));
         }
-        acStats = admissionControlService.stats().getAdmissionControllerStatsList().get(0);
-        assertEquals(1, (long) acStats.getRejectionCount().get(AdmissionControlActionType.SEARCH.getType()));
+        acStats = this.getAdmissionControlStats(admissionControlService);
+        assertEquals(
+            1,
+            (long) acStats.get(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER)
+                .getRejectionCount()
+                .get(AdmissionControlActionType.SEARCH.getType())
+        );
+        if (Constants.LINUX) {
+            assertEquals(
+                0,
+                (long) acStats.get(IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER)
+                    .getRejectionCount()
+                    .getOrDefault(AdmissionControlActionType.SEARCH.getType(), 0L)
+            );
+        } else {
+            assertNull(acStats.get(IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER));
+        }
+        ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
+        updateSettingsRequest.transientSettings(
+            Settings.builder()
+                .put(super.nodeSettings())
+                .put(SEARCH_IO_USAGE_LIMIT.getKey(), 0)
+                .put(INDEXING_IO_USAGE_LIMIT.getKey(), 0)
+                .put(CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.DISABLED.getMode())
+                .put(IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.ENFORCED.getMode())
+        );
+        assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
+
+        bulk = client().prepareBulk();
+        for (int i = 0; i < 3; i++) {
+            bulk.add(client().prepareIndex(INDEX_NAME).setSource("foo", "bar " + i));
+        }
+        res = client().bulk(bulk.request()).actionGet();
+        if (Constants.LINUX) {
+            assertEquals(429, res.getItems()[0].getFailure().getStatus().getStatus());
+        }
+        admissionControlService = getInstanceFromNode(AdmissionControlService.class);
+        acStats = this.getAdmissionControlStats(admissionControlService);
+        assertEquals(
+            1,
+            (long) acStats.get(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER)
+                .getRejectionCount()
+                .get(AdmissionControlActionType.INDEXING.getType())
+        );
+        if (Constants.LINUX) {
+            assertEquals(
+                1,
+                (long) acStats.get(IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER)
+                    .getRejectionCount()
+                    .getOrDefault(AdmissionControlActionType.INDEXING.getType(), 0L)
+            );
+        } else {
+            assertNull(acStats.get(IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER));
+        }
+        client().admin().indices().prepareRefresh(INDEX_NAME).get();
+
+        // verify search request hits 429
+        searchRequest = new SearchRequest(INDEX_NAME);
+        try {
+            client().search(searchRequest).actionGet();
+        } catch (Exception e) {
+            assertTrue(((SearchPhaseExecutionException) e).getDetailedMessage().contains("OpenSearchRejectedExecutionException"));
+        }
+        acStats = this.getAdmissionControlStats(admissionControlService);
+        assertEquals(
+            1,
+            (long) acStats.get(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER)
+                .getRejectionCount()
+                .get(AdmissionControlActionType.SEARCH.getType())
+        );
+        if (Constants.LINUX) {
+            assertEquals(
+                1,
+                (long) acStats.get(IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER)
+                    .getRejectionCount()
+                    .getOrDefault(AdmissionControlActionType.SEARCH.getType(), 0L)
+            );
+        } else {
+            assertNull(acStats.get(IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER));
+        }
     }
 
     public void testAdmissionControlRejectionMonitorOnlyMode() throws Exception {
         assertBusy(() -> assertEquals(1, getInstanceFromNode(ResourceUsageCollectorService.class).getAllNodeStatistics().size()));
-        // Verify that cluster state is updated
         ActionFuture<ClusterStateResponse> future2 = client().admin().cluster().state(new ClusterStateRequest());
         assertThat(future2.isDone(), is(true));
 
@@ -108,66 +213,165 @@ public void testAdmissionControlRejectionMonitorOnlyMode() throws Exception {
         updateSettingsRequest.transientSettings(
             Settings.builder()
                 .put(super.nodeSettings())
-                .put(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.MONITOR.getMode())
+                .put(CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.MONITOR.getMode())
         );
         assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
 
         BulkRequestBuilder bulk = client().prepareBulk();
         for (int i = 0; i < 3; i++) {
-            bulk.add(client().prepareIndex("index").setSource("foo", "bar " + i));
+            bulk.add(client().prepareIndex(INDEX_NAME).setSource("foo", "bar " + i));
         }
         // verify bulk request success but admission control having rejections stats
         BulkResponse res = client().bulk(bulk.request()).actionGet();
         assertFalse(res.hasFailures());
         AdmissionControlService admissionControlService = getInstanceFromNode(AdmissionControlService.class);
-        AdmissionControllerStats acStats = admissionControlService.stats().getAdmissionControllerStatsList().get(0);
-        assertEquals(1, (long) acStats.getRejectionCount().get(AdmissionControlActionType.INDEXING.getType()));
-        client().admin().indices().prepareRefresh("index").get();
+        Map<String, AdmissionControllerStats> acStats = this.getAdmissionControlStats(admissionControlService);
+        assertEquals(
+            1,
+            (long) acStats.get(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER)
+                .getRejectionCount()
+                .get(AdmissionControlActionType.INDEXING.getType())
+        );
+        client().admin().indices().prepareRefresh(INDEX_NAME).get();
 
         // verify search request success but admission control having rejections stats
-        SearchRequest searchRequest = new SearchRequest("index");
+        SearchRequest searchRequest = new SearchRequest(INDEX_NAME);
         SearchResponse searchResponse = client().search(searchRequest).actionGet();
         assertEquals(3, searchResponse.getHits().getHits().length);
-        acStats = admissionControlService.stats().getAdmissionControllerStatsList().get(0);
-        assertEquals(1, (long) acStats.getRejectionCount().get(AdmissionControlActionType.SEARCH.getType()));
+        acStats = this.getAdmissionControlStats(admissionControlService);
+        assertEquals(
+            1,
+            (long) acStats.get(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER)
+                .getRejectionCount()
+                .get(AdmissionControlActionType.SEARCH.getType())
+        );
+
+        updateSettingsRequest = new ClusterUpdateSettingsRequest();
+        updateSettingsRequest.transientSettings(
+            Settings.builder()
+                .put(super.nodeSettings())
+                .put(SEARCH_IO_USAGE_LIMIT.getKey(), 0)
+                .put(INDEXING_IO_USAGE_LIMIT.getKey(), 0)
+                .put(SEARCH_CPU_USAGE_LIMIT.getKey(), 101)
+                .put(INDEXING_CPU_USAGE_LIMIT.getKey(), 101)
+                .put(CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.DISABLED.getMode())
+                .put(IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.MONITOR.getMode())
+        );
+        assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
+        bulk = client().prepareBulk();
+        for (int i = 0; i < 3; i++) {
+            bulk.add(client().prepareIndex(INDEX_NAME).setSource("foo", "bar " + i));
+        }
+        // verify bulk request success but admission control having rejections stats
+        res = client().bulk(bulk.request()).actionGet();
+        assertFalse(res.hasFailures());
+        acStats = this.getAdmissionControlStats(admissionControlService);
+        assertEquals(
+            1,
+            (long) acStats.get(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER)
+                .getRejectionCount()
+                .get(AdmissionControlActionType.INDEXING.getType())
+        );
+        if (Constants.LINUX) {
+            assertEquals(
+                1,
+                (long) acStats.get(IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER)
+                    .getRejectionCount()
+                    .getOrDefault(AdmissionControlActionType.INDEXING.getType(), 0L)
+            );
+        } else {
+            assertNull(acStats.get(IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER));
+        }
+        searchRequest = new SearchRequest(INDEX_NAME);
+        searchResponse = client().search(searchRequest).actionGet();
+        assertEquals(3, searchResponse.getHits().getHits().length);
+        acStats = this.getAdmissionControlStats(admissionControlService);
+        assertEquals(
+            1,
+            (long) acStats.get(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER)
+                .getRejectionCount()
+                .get(AdmissionControlActionType.SEARCH.getType())
+        );
+        if (Constants.LINUX) {
+            assertEquals(
+                1,
+                (long) acStats.get(IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER)
+                    .getRejectionCount()
+                    .getOrDefault(AdmissionControlActionType.SEARCH.getType(), 0L)
+            );
+        } else {
+            assertNull(acStats.get(IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER));
+        }
     }
 
     public void testAdmissionControlRejectionDisabledMode() throws Exception {
         assertBusy(() -> assertEquals(1, getInstanceFromNode(ResourceUsageCollectorService.class).getAllNodeStatistics().size()));
-        // Verify that cluster state is updated
         ActionFuture<ClusterStateResponse> future2 = client().admin().cluster().state(new ClusterStateRequest());
         assertThat(future2.isDone(), is(true));
 
         ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
         updateSettingsRequest.transientSettings(
-            Settings.builder().put(super.nodeSettings()).put(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.DISABLED)
+            Settings.builder()
+                .put(super.nodeSettings())
+                .put(CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.DISABLED.getMode())
         );
         assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
 
         BulkRequestBuilder bulk = client().prepareBulk();
         for (int i = 0; i < 3; i++) {
-            bulk.add(client().prepareIndex("index").setSource("foo", "bar " + i));
+            bulk.add(client().prepareIndex(INDEX_NAME).setSource("foo", "bar " + i));
         }
         // verify bulk request success and no rejections
         BulkResponse res = client().bulk(bulk.request()).actionGet();
         assertFalse(res.hasFailures());
         AdmissionControlService admissionControlService = getInstanceFromNode(AdmissionControlService.class);
-        AdmissionControllerStats acStats = admissionControlService.stats().getAdmissionControllerStatsList().get(0);
-        assertEquals(0, acStats.getRejectionCount().size());
-        client().admin().indices().prepareRefresh("index").get();
+        Map<String, AdmissionControllerStats> acStats = this.getAdmissionControlStats(admissionControlService);
+
+        assertEquals(0, acStats.get(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER).getRejectionCount().size());
+        client().admin().indices().prepareRefresh(INDEX_NAME).get();
 
         // verify search request success and no rejections
-        SearchRequest searchRequest = new SearchRequest("index");
+        SearchRequest searchRequest = new SearchRequest(INDEX_NAME);
         SearchResponse searchResponse = client().search(searchRequest).actionGet();
         assertEquals(3, searchResponse.getHits().getHits().length);
-        acStats = admissionControlService.stats().getAdmissionControllerStatsList().get(0);
-        assertEquals(0, acStats.getRejectionCount().size());
+        acStats = this.getAdmissionControlStats(admissionControlService);
+        assertEquals(0, acStats.get(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER).getRejectionCount().size());
+        updateSettingsRequest = new ClusterUpdateSettingsRequest();
+        updateSettingsRequest.transientSettings(
+            Settings.builder()
+                .put(super.nodeSettings())
+                .put(IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.DISABLED.getMode())
+        );
+        assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
+        bulk = client().prepareBulk();
+        for (int i = 0; i < 3; i++) {
+            bulk.add(client().prepareIndex(INDEX_NAME).setSource("foo", "bar " + i));
+        }
+        // verify bulk request success but admission control having rejections stats
+        res = client().bulk(bulk.request()).actionGet();
+        assertFalse(res.hasFailures());
+        acStats = this.getAdmissionControlStats(admissionControlService);
+        assertEquals(0, acStats.get(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER).getRejectionCount().size());
+        if (Constants.LINUX) {
+            assertEquals(0, acStats.get(IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER).getRejectionCount().size());
+        } else {
+            assertNull(acStats.get(IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER));
+        }
 
+        searchRequest = new SearchRequest(INDEX_NAME);
+        searchResponse = client().search(searchRequest).actionGet();
+        assertEquals(3, searchResponse.getHits().getHits().length);
+        acStats = this.getAdmissionControlStats(admissionControlService);
+        assertEquals(0, acStats.get(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER).getRejectionCount().size());
+        if (Constants.LINUX) {
+            assertEquals(0, acStats.get(IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER).getRejectionCount().size());
+        } else {
+            assertNull(acStats.get(IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER));
+        }
     }
 
     public void testAdmissionControlWithinLimits() throws Exception {
         assertBusy(() -> assertEquals(1, getInstanceFromNode(ResourceUsageCollectorService.class).getAllNodeStatistics().size()));
-        // Verify that cluster state is updated
         ActionFuture<ClusterStateResponse> future2 = client().admin().cluster().state(new ClusterStateRequest());
         assertThat(future2.isDone(), is(true));
 
@@ -175,29 +379,49 @@ public void testAdmissionControlWithinLimits() throws Exception {
         updateSettingsRequest.transientSettings(
             Settings.builder()
                 .put(super.nodeSettings())
-                .put(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.ENFORCED)
+                .put(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.ENFORCED.getMode())
                 .put(SEARCH_CPU_USAGE_LIMIT.getKey(), 101)
                 .put(INDEXING_CPU_USAGE_LIMIT.getKey(), 101)
+                .put(SEARCH_IO_USAGE_LIMIT.getKey(), 101)
+                .put(INDEXING_IO_USAGE_LIMIT.getKey(), 101)
         );
         assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
 
         BulkRequestBuilder bulk = client().prepareBulk();
         for (int i = 0; i < 3; i++) {
-            bulk.add(client().prepareIndex("index").setSource("foo", "bar " + i));
+            bulk.add(client().prepareIndex(INDEX_NAME).setSource("foo", "bar " + i));
         }
         // verify bulk request success and no rejections
         BulkResponse res = client().bulk(bulk.request()).actionGet();
         assertFalse(res.hasFailures());
         AdmissionControlService admissionControlService = getInstanceFromNode(AdmissionControlService.class);
-        AdmissionControllerStats acStats = admissionControlService.stats().getAdmissionControllerStatsList().get(0);
-        assertEquals(0, acStats.getRejectionCount().size());
-        client().admin().indices().prepareRefresh("index").get();
+        Map<String, AdmissionControllerStats> acStats = this.getAdmissionControlStats(admissionControlService);
+        assertEquals(0, acStats.get(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER).getRejectionCount().size());
+        if (Constants.LINUX) {
+            assertEquals(0, acStats.get(IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER).getRejectionCount().size());
+        } else {
+            assertNull(acStats.get(IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER));
+        }
+        client().admin().indices().prepareRefresh(INDEX_NAME).get();
 
         // verify search request success and no rejections
-        SearchRequest searchRequest = new SearchRequest("index");
+        SearchRequest searchRequest = new SearchRequest(INDEX_NAME);
         SearchResponse searchResponse = client().search(searchRequest).actionGet();
         assertEquals(3, searchResponse.getHits().getHits().length);
-        acStats = admissionControlService.stats().getAdmissionControllerStatsList().get(0);
-        assertEquals(0, acStats.getRejectionCount().size());
+        acStats = this.getAdmissionControlStats(admissionControlService);
+        assertEquals(0, acStats.get(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER).getRejectionCount().size());
+        if (Constants.LINUX) {
+            assertEquals(0, acStats.get(IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER).getRejectionCount().size());
+        } else {
+            assertNull(acStats.get(IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER));
+        }
+    }
+
+    Map<String, AdmissionControllerStats> getAdmissionControlStats(AdmissionControlService admissionControlService) {
+        Map<String, AdmissionControllerStats> acStats = new HashMap<>();
+        for (AdmissionControllerStats admissionControllerStats : admissionControlService.stats().getAdmissionControllerStatsList()) {
+            acStats.put(admissionControllerStats.getAdmissionControllerName(), admissionControllerStats);
+        }
+        return acStats;
     }
 }
diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/controllers/IoBasedAdmissionControllerTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/controllers/IoBasedAdmissionControllerTests.java
new file mode 100644
index 0000000000000..c5a2208f49ce6
--- /dev/null
+++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/controllers/IoBasedAdmissionControllerTests.java
@@ -0,0 +1,141 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.ratelimitting.admissioncontrol.controllers;
+
+import org.opensearch.cluster.service.ClusterService;
+import org.opensearch.common.settings.ClusterSettings;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.node.ResourceUsageCollectorService;
+import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
+import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode;
+import org.opensearch.ratelimitting.admissioncontrol.settings.IoBasedAdmissionControllerSettings;
+import org.opensearch.test.OpenSearchTestCase;
+import org.opensearch.threadpool.TestThreadPool;
+import org.opensearch.threadpool.ThreadPool;
+
+import org.mockito.Mockito;
+
+public class IoBasedAdmissionControllerTests extends OpenSearchTestCase {
+    private ClusterService clusterService;
+    private ThreadPool threadPool;
+    IoBasedAdmissionController admissionController = null;
+    String action = "TEST_ACTION";
+
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        threadPool = new TestThreadPool("admission_controller_settings_test");
+        clusterService = new ClusterService(
+            Settings.EMPTY,
+            new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
+            threadPool
+        );
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        super.tearDown();
+        threadPool.shutdownNow();
+    }
+
+    public void testCheckDefaultParameters() {
+        admissionController = new IoBasedAdmissionController(
+            IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER,
+            null,
+            clusterService,
+            Settings.EMPTY
+        );
+        assertEquals(admissionController.getName(), IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER);
+        assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 0);
+        assertEquals(admissionController.settings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.DISABLED);
+        assertFalse(
+            admissionController.isEnabledForTransportLayer(admissionController.settings.getTransportLayerAdmissionControllerMode())
+        );
+    }
+
+    public void testCheckUpdateSettings() {
+        admissionController = new IoBasedAdmissionController(
+            IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER,
+            null,
+            clusterService,
+            Settings.EMPTY
+        );
+        Settings settings = Settings.builder()
+            .put(
+                IoBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(),
+                AdmissionControlMode.ENFORCED.getMode()
+            )
+            .build();
+        clusterService.getClusterSettings().applySettings(settings);
+        assertEquals(admissionController.getName(), IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER);
+        assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 0);
+        assertEquals(admissionController.settings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.ENFORCED);
+        assertTrue(admissionController.isEnabledForTransportLayer(admissionController.settings.getTransportLayerAdmissionControllerMode()));
+    }
+
+    public void testApplyControllerWithDefaultSettings() {
+        ResourceUsageCollectorService rs = Mockito.mock(ResourceUsageCollectorService.class);
+        admissionController = new IoBasedAdmissionController(
+            IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER,
+            rs,
+            clusterService,
+            Settings.EMPTY
+        );
+        assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 0);
+        assertEquals(admissionController.settings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.DISABLED);
+        action = "indices:data/write/bulk[s][p]";
+        admissionController.apply(action, AdmissionControlActionType.INDEXING);
+        assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 0);
+    }
+
+    public void testApplyControllerWhenSettingsEnabled() throws Exception {
+        Settings settings = Settings.builder()
+            .put(
+                IoBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(),
+                AdmissionControlMode.ENFORCED.getMode()
+            )
+            .build();
+        ResourceUsageCollectorService rs = Mockito.mock(ResourceUsageCollectorService.class);
+        admissionController = new IoBasedAdmissionController(
+            IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER,
+            rs,
+            clusterService,
+            settings
+        );
+        assertTrue(admissionController.isEnabledForTransportLayer(admissionController.settings.getTransportLayerAdmissionControllerMode()));
+        assertTrue(
+            admissionController.isAdmissionControllerEnforced(admissionController.settings.getTransportLayerAdmissionControllerMode())
+        );
+        assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 0);
+    }
+
+    public void testRejectionCount() {
+        Settings settings = Settings.builder()
+            .put(
+                IoBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(),
+                AdmissionControlMode.ENFORCED.getMode()
+            )
+            .build();
+        ResourceUsageCollectorService rs = Mockito.mock(ResourceUsageCollectorService.class);
+        admissionController = new IoBasedAdmissionController(
+            IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER,
+            rs,
+            clusterService,
+            settings
+        );
+        admissionController.addRejectionCount(AdmissionControlActionType.SEARCH.getType(), 1);
+        admissionController.addRejectionCount(AdmissionControlActionType.INDEXING.getType(), 3);
+        assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.SEARCH.getType()), 1);
+        assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 3);
+        admissionController.addRejectionCount(AdmissionControlActionType.SEARCH.getType(), 1);
+        admissionController.addRejectionCount(AdmissionControlActionType.INDEXING.getType(), 2);
+        assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.SEARCH.getType()), 2);
+        assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 5);
+    }
+}
diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControlSettingsTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControllerSettingsTests.java
similarity index 98%
rename from server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControlSettingsTests.java
rename to server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControllerSettingsTests.java
index 11688e2f30d4b..9ce28bc7fdb40 100644
--- a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControlSettingsTests.java
+++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControllerSettingsTests.java
@@ -20,7 +20,7 @@
 import java.util.Arrays;
 import java.util.Set;
 
-public class CPUBasedAdmissionControlSettingsTests extends OpenSearchTestCase {
+public class CPUBasedAdmissionControllerSettingsTests extends OpenSearchTestCase {
     private ClusterService clusterService;
     private ThreadPool threadPool;
 
diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/IoBasedAdmissionControllerSettingsTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/IoBasedAdmissionControllerSettingsTests.java
new file mode 100644
index 0000000000000..ff777c175ec0e
--- /dev/null
+++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/IoBasedAdmissionControllerSettingsTests.java
@@ -0,0 +1,160 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.ratelimitting.admissioncontrol.settings;
+
+import org.opensearch.cluster.service.ClusterService;
+import org.opensearch.common.settings.ClusterSettings;
+import org.opensearch.common.settings.Setting;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode;
+import org.opensearch.test.OpenSearchTestCase;
+import org.opensearch.threadpool.TestThreadPool;
+import org.opensearch.threadpool.ThreadPool;
+
+import java.util.Arrays;
+import java.util.Set;
+
+public class IoBasedAdmissionControllerSettingsTests extends OpenSearchTestCase {
+    private ClusterService clusterService;
+    private ThreadPool threadPool;
+
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        threadPool = new TestThreadPool("io_based_admission_controller_settings_test");
+        clusterService = new ClusterService(
+            Settings.EMPTY,
+            new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
+            threadPool
+        );
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        super.tearDown();
+        threadPool.shutdownNow();
+    }
+
+    public void testSettingsExists() {
+        Set<Setting<?>> settings = ClusterSettings.BUILT_IN_CLUSTER_SETTINGS;
+        assertTrue(
+            "All the IO based admission controller settings should be supported built in settings",
+            settings.containsAll(
+                Arrays.asList(
+                    IoBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE,
+                    IoBasedAdmissionControllerSettings.SEARCH_IO_USAGE_LIMIT,
+                    IoBasedAdmissionControllerSettings.INDEXING_IO_USAGE_LIMIT
+                )
+            )
+        );
+    }
+
+    public void testDefaultSettings() {
+        IoBasedAdmissionControllerSettings ioBasedAdmissionControllerSettings = new IoBasedAdmissionControllerSettings(
+            clusterService.getClusterSettings(),
+            Settings.EMPTY
+        );
+        long percent = 95;
+        assertEquals(ioBasedAdmissionControllerSettings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.DISABLED);
+        assertEquals(ioBasedAdmissionControllerSettings.getIndexingIOUsageLimit().longValue(), percent);
+        assertEquals(ioBasedAdmissionControllerSettings.getSearchIOUsageLimit().longValue(), percent);
+    }
+
+    public void testGetConfiguredSettings() {
+        long percent = 95;
+        long indexingPercent = 85;
+        Settings settings = Settings.builder()
+            .put(
+                IoBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(),
+                AdmissionControlMode.ENFORCED.getMode()
+            )
+            .put(IoBasedAdmissionControllerSettings.INDEXING_IO_USAGE_LIMIT.getKey(), indexingPercent)
+            .build();
+
+        IoBasedAdmissionControllerSettings ioBasedAdmissionControllerSettings = new IoBasedAdmissionControllerSettings(
+            clusterService.getClusterSettings(),
+            settings
+        );
+        assertEquals(ioBasedAdmissionControllerSettings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.ENFORCED);
+        assertEquals(ioBasedAdmissionControllerSettings.getSearchIOUsageLimit().longValue(), percent);
+        assertEquals(ioBasedAdmissionControllerSettings.getIndexingIOUsageLimit().longValue(), indexingPercent);
+    }
+
+    public void testUpdateAfterGetDefaultSettings() {
+        long percent = 95;
+        long searchPercent = 80;
+        IoBasedAdmissionControllerSettings ioBasedAdmissionControllerSettings = new IoBasedAdmissionControllerSettings(
+            clusterService.getClusterSettings(),
+            Settings.EMPTY
+        );
+        Settings settings = Settings.builder()
+            .put(
+                IoBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(),
+                AdmissionControlMode.ENFORCED.getMode()
+            )
+            .put(IoBasedAdmissionControllerSettings.SEARCH_IO_USAGE_LIMIT.getKey(), searchPercent)
+            .build();
+
+        clusterService.getClusterSettings().applySettings(settings);
+        assertEquals(ioBasedAdmissionControllerSettings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.ENFORCED);
+        assertEquals(ioBasedAdmissionControllerSettings.getSearchIOUsageLimit().longValue(), searchPercent);
+        assertEquals(ioBasedAdmissionControllerSettings.getIndexingIOUsageLimit().longValue(), percent);
+    }
+
+    public void testUpdateAfterGetConfiguredSettings() {
+        long percent = 95;
+        long indexingPercent = 85;
+        long searchPercent = 80;
+        Settings settings = Settings.builder()
+            .put(
+                IoBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(),
+                AdmissionControlMode.ENFORCED.getMode()
+            )
+            .put(IoBasedAdmissionControllerSettings.SEARCH_IO_USAGE_LIMIT.getKey(), searchPercent)
+            .build();
+
+        IoBasedAdmissionControllerSettings ioBasedAdmissionControllerSettings = new IoBasedAdmissionControllerSettings(
+            clusterService.getClusterSettings(),
+            settings
+        );
+        assertEquals(ioBasedAdmissionControllerSettings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.ENFORCED);
+        assertEquals(ioBasedAdmissionControllerSettings.getSearchIOUsageLimit().longValue(), searchPercent);
+        assertEquals(ioBasedAdmissionControllerSettings.getIndexingIOUsageLimit().longValue(), percent);
+
+        Settings updatedSettings = Settings.builder()
+            .put(
+                IoBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(),
+                AdmissionControlMode.MONITOR.getMode()
+            )
+            .put(IoBasedAdmissionControllerSettings.INDEXING_IO_USAGE_LIMIT.getKey(), indexingPercent)
+            .build();
+        clusterService.getClusterSettings().applySettings(updatedSettings);
+        assertEquals(ioBasedAdmissionControllerSettings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.MONITOR);
+        assertEquals(ioBasedAdmissionControllerSettings.getSearchIOUsageLimit().longValue(), searchPercent);
+        assertEquals(ioBasedAdmissionControllerSettings.getIndexingIOUsageLimit().longValue(), indexingPercent);
+
+        searchPercent = 70;
+        updatedSettings = Settings.builder()
+            .put(updatedSettings)
+            .put(IoBasedAdmissionControllerSettings.SEARCH_IO_USAGE_LIMIT.getKey(), searchPercent)
+            .build();
+
+        clusterService.getClusterSettings().applySettings(updatedSettings);
+        assertEquals(ioBasedAdmissionControllerSettings.getSearchIOUsageLimit().longValue(), searchPercent);
+        assertEquals(ioBasedAdmissionControllerSettings.getIndexingIOUsageLimit().longValue(), indexingPercent);
+    }
+}