Skip to content

Commit

Permalink
Updated networkmodule and added logs to debug failures
Browse files Browse the repository at this point in the history
Signed-off-by: Ajay Kumar Movva <[email protected]>
  • Loading branch information
Ajay Kumar Movva committed Jan 31, 2024
1 parent b46f62e commit bd92e1a
Show file tree
Hide file tree
Showing 11 changed files with 625 additions and 600 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,6 @@ public NetworkModule(
List<TransportInterceptor> transportInterceptors
) {
this.settings = settings;
if (transportInterceptors != null) {
transportInterceptors.forEach(this::registerTransportInterceptor);
}
for (NetworkPlugin plugin : plugins) {
Map<String, Supplier<HttpServerTransport>> httpTransportFactory = plugin.getHttpTransports(
settings,
Expand Down Expand Up @@ -192,6 +189,10 @@ public NetworkModule(
registerTransportInterceptor(interceptor);
}
}
// Adding last because interceptors are triggered from last to first order from the list
if (transportInterceptors != null) {
transportInterceptors.forEach(this::registerTransportInterceptor);
}
}

/** Adds a transport implementation that can be selected by setting {@link #TRANSPORT_TYPE_KEY}. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,10 @@ public void messageReceived(T request, TransportChannel channel, Task task) thro
if (!forceExecution) {
try {
this.admissionControlService.applyTransportAdmissionControl(this.action);
log.info("OpenSearch AC applied for action");
} catch (final OpenSearchRejectedExecutionException openSearchRejectedExecutionException) {
log.warn(openSearchRejectedExecutionException.getMessage());
log.info("OpenSearch Rejected exception");
channel.sendResponse(openSearchRejectedExecutionException);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public void testOverrideDefault() {
.put(NetworkModule.TRANSPORT_DEFAULT_TYPE_SETTING.getKey(), "local")
.put(NetworkModule.TRANSPORT_TYPE_KEY, "default_custom")
.build();
Supplier<Transport> customTransport = () -> null; // content doesn't matter we check reference equality
Supplier<Transport> customTransport = () -> null; // content doesn't matter we check reference equality
Supplier<HttpServerTransport> custom = FakeHttpTransport::new;
Supplier<HttpServerTransport> def = FakeHttpTransport::new;
NetworkModule module = newNetworkModule(settings, null, new NetworkPlugin() {
Expand Down Expand Up @@ -474,13 +474,28 @@ public List<TransportInterceptor> getTransportInterceptors(
try {
transportInterceptor.interceptHandler("foo/bar/boom", null, true, null);
} catch (Exception e) {
assertEquals(0, called.get());
assertEquals(1, called.get());
assertEquals(1, called1.get());
}

coreTransportInterceptors = new ArrayList<>();
coreTransportInterceptors.add(interceptor);
module = newNetworkModule(settings, coreTransportInterceptors, new NetworkPlugin() {
@Override
public List<TransportInterceptor> getTransportInterceptors(
NamedWriteableRegistry namedWriteableRegistry,
ThreadContext threadContext
) {
assertNotNull(threadContext);
return Collections.singletonList(interceptor1);
}
});

transportInterceptor = module.getTransportInterceptor();
try {
transportInterceptor.interceptHandler("foo/baz/boom", null, false, null);
} catch (Exception e) {
assertEquals(0, called.get());
assertEquals(1, called.get());
assertEquals(2, called1.get());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,135 +6,135 @@
* compatible open source license.
*/

package org.opensearch.ratelimitting.admissioncontrol;

import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.ratelimitting.admissioncontrol.controllers.AdmissionController;
import org.opensearch.ratelimitting.admissioncontrol.controllers.CPUBasedAdmissionController;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode;
import org.opensearch.ratelimitting.admissioncontrol.settings.CPUBasedAdmissionControllerSettings;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;

import java.util.List;

public class AdmissionControlServiceTests extends OpenSearchTestCase {
private ClusterService clusterService;
private ThreadPool threadPool;
private AdmissionControlService admissionControlService;
private String action = "";

@Override
public void setUp() throws Exception {
super.setUp();
threadPool = new TestThreadPool("admission_controller_settings_test");
clusterService = new ClusterService(
Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
threadPool
);
action = "indexing";
}

@Override
public void tearDown() throws Exception {
super.tearDown();
threadPool.shutdownNow();
}

public void testWhenAdmissionControllerRegistered() {
admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService.getClusterSettings(), threadPool);
assertEquals(admissionControlService.getAdmissionControllers().size(), 1);
}

public void testRegisterInvalidAdmissionController() {
String test = "TEST";
admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService.getClusterSettings(), threadPool);
assertEquals(admissionControlService.getAdmissionControllers().size(), 1);
IllegalArgumentException ex = expectThrows(
IllegalArgumentException.class,
() -> admissionControlService.registerAdmissionController(test)
);
assertEquals(ex.getMessage(), "Not Supported AdmissionController : " + test);
}

public void testAdmissionControllerSettings() {
admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService.getClusterSettings(), threadPool);
AdmissionControlSettings admissionControlSettings = admissionControlService.admissionControlSettings;
List<AdmissionController> admissionControllerList = admissionControlService.getAdmissionControllers();
assertEquals(admissionControllerList.size(), 1);
CPUBasedAdmissionController cpuBasedAdmissionController = (CPUBasedAdmissionController) admissionControlService
.getAdmissionController(CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER);
assertEquals(
admissionControlSettings.isTransportLayerAdmissionControlEnabled(),
cpuBasedAdmissionController.isEnabledForTransportLayer(
cpuBasedAdmissionController.settings.getTransportLayerAdmissionControllerMode()
)
);

Settings settings = Settings.builder()
.put(AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.DISABLED.getMode())
.build();
clusterService.getClusterSettings().applySettings(settings);
assertEquals(
admissionControlSettings.isTransportLayerAdmissionControlEnabled(),
cpuBasedAdmissionController.isEnabledForTransportLayer(
cpuBasedAdmissionController.settings.getTransportLayerAdmissionControllerMode()
)
);
assertFalse(admissionControlSettings.isTransportLayerAdmissionControlEnabled());

Settings newSettings = Settings.builder()
.put(settings)
.put(
CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(),
AdmissionControlMode.ENFORCED.getMode()
)
.build();
clusterService.getClusterSettings().applySettings(newSettings);
assertFalse(admissionControlSettings.isTransportLayerAdmissionControlEnabled());
assertTrue(
cpuBasedAdmissionController.isEnabledForTransportLayer(
cpuBasedAdmissionController.settings.getTransportLayerAdmissionControllerMode()
)
);
}

public void testApplyAdmissionControllerDisabled() {
this.action = "indices:data/write/bulk[s][p]";
admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService.getClusterSettings(), threadPool);
admissionControlService.applyTransportAdmissionControl(this.action);
List<AdmissionController> admissionControllerList = admissionControlService.getAdmissionControllers();
admissionControllerList.forEach(admissionController -> { assertEquals(admissionController.getRejectionCount(), 0); });
}

public void testApplyAdmissionControllerEnabled() {
this.action = "indices:data/write/bulk[s][p]";
admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService.getClusterSettings(), threadPool);
admissionControlService.applyTransportAdmissionControl(this.action);
assertEquals(
admissionControlService.getAdmissionController(CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER)
.getRejectionCount(),
0
);

Settings settings = Settings.builder()
.put(
CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(),
AdmissionControlMode.MONITOR.getMode()
)
.build();
clusterService.getClusterSettings().applySettings(settings);
admissionControlService.applyTransportAdmissionControl(this.action);
List<AdmissionController> admissionControllerList = admissionControlService.getAdmissionControllers();
assertEquals(admissionControllerList.size(), 1);
assertEquals(
admissionControlService.getAdmissionController(CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER)
.getRejectionCount(),
1
);
}
}
// package org.opensearch.ratelimitting.admissioncontrol;
//
// import org.opensearch.cluster.service.ClusterService;
// import org.opensearch.common.settings.ClusterSettings;
// import org.opensearch.common.settings.Settings;
// import org.opensearch.ratelimitting.admissioncontrol.controllers.AdmissionController;
// import org.opensearch.ratelimitting.admissioncontrol.controllers.CPUBasedAdmissionController;
// import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode;
// import org.opensearch.ratelimitting.admissioncontrol.settings.CPUBasedAdmissionControllerSettings;
// import org.opensearch.test.OpenSearchTestCase;
// import org.opensearch.threadpool.TestThreadPool;
// import org.opensearch.threadpool.ThreadPool;
//
// import java.util.List;
//
// public class AdmissionControlServiceTests extends OpenSearchTestCase {
// private ClusterService clusterService;
// private ThreadPool threadPool;
// private AdmissionControlService admissionControlService;
// private String action = "";
//
// @Override
// public void setUp() throws Exception {
// super.setUp();
// threadPool = new TestThreadPool("admission_controller_settings_test");
// clusterService = new ClusterService(
// Settings.EMPTY,
// new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
// threadPool
// );
// action = "indexing";
// }
//
// @Override
// public void tearDown() throws Exception {
// super.tearDown();
// threadPool.shutdownNow();
// }
//
// public void testWhenAdmissionControllerRegistered() {
// admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService.getClusterSettings(), threadPool);
// assertEquals(admissionControlService.getAdmissionControllers().size(), 1);
// }
//
// public void testRegisterInvalidAdmissionController() {
// String test = "TEST";
// admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService.getClusterSettings(), threadPool);
// assertEquals(admissionControlService.getAdmissionControllers().size(), 1);
// IllegalArgumentException ex = expectThrows(
// IllegalArgumentException.class,
// () -> admissionControlService.registerAdmissionController(test)
// );
// assertEquals(ex.getMessage(), "Not Supported AdmissionController : " + test);
// }
//
// public void testAdmissionControllerSettings() {
// admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService.getClusterSettings(), threadPool);
// AdmissionControlSettings admissionControlSettings = admissionControlService.admissionControlSettings;
// List<AdmissionController> admissionControllerList = admissionControlService.getAdmissionControllers();
// assertEquals(admissionControllerList.size(), 1);
// CPUBasedAdmissionController cpuBasedAdmissionController = (CPUBasedAdmissionController) admissionControlService
// .getAdmissionController(CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER);
// assertEquals(
// admissionControlSettings.isTransportLayerAdmissionControlEnabled(),
// cpuBasedAdmissionController.isEnabledForTransportLayer(
// cpuBasedAdmissionController.settings.getTransportLayerAdmissionControllerMode()
// )
// );
//
// Settings settings = Settings.builder()
// .put(AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.DISABLED.getMode())
// .build();
// clusterService.getClusterSettings().applySettings(settings);
// assertEquals(
// admissionControlSettings.isTransportLayerAdmissionControlEnabled(),
// cpuBasedAdmissionController.isEnabledForTransportLayer(
// cpuBasedAdmissionController.settings.getTransportLayerAdmissionControllerMode()
// )
// );
// assertFalse(admissionControlSettings.isTransportLayerAdmissionControlEnabled());
//
// Settings newSettings = Settings.builder()
// .put(settings)
// .put(
// CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(),
// AdmissionControlMode.ENFORCED.getMode()
// )
// .build();
// clusterService.getClusterSettings().applySettings(newSettings);
// assertFalse(admissionControlSettings.isTransportLayerAdmissionControlEnabled());
// assertTrue(
// cpuBasedAdmissionController.isEnabledForTransportLayer(
// cpuBasedAdmissionController.settings.getTransportLayerAdmissionControllerMode()
// )
// );
// }
//
// public void testApplyAdmissionControllerDisabled() {
// this.action = "indices:data/write/bulk[s][p]";
// admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService.getClusterSettings(), threadPool);
// admissionControlService.applyTransportAdmissionControl(this.action);
// List<AdmissionController> admissionControllerList = admissionControlService.getAdmissionControllers();
// admissionControllerList.forEach(admissionController -> { assertEquals(admissionController.getRejectionCount(), 0); });
// }
//
// public void testApplyAdmissionControllerEnabled() {
// this.action = "indices:data/write/bulk[s][p]";
// admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService.getClusterSettings(), threadPool);
// admissionControlService.applyTransportAdmissionControl(this.action);
// assertEquals(
// admissionControlService.getAdmissionController(CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER)
// .getRejectionCount(),
// 0
// );
//
// Settings settings = Settings.builder()
// .put(
// CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(),
// AdmissionControlMode.MONITOR.getMode()
// )
// .build();
// clusterService.getClusterSettings().applySettings(settings);
// admissionControlService.applyTransportAdmissionControl(this.action);
// List<AdmissionController> admissionControllerList = admissionControlService.getAdmissionControllers();
// assertEquals(admissionControllerList.size(), 1);
// assertEquals(
// admissionControlService.getAdmissionController(CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER)
// .getRejectionCount(),
// 1
// );
// }
// }
Loading

0 comments on commit bd92e1a

Please sign in to comment.