Skip to content

Commit

Permalink
Updated networkmodule and added logs to debug failures
Browse files Browse the repository at this point in the history
Signed-off-by: Ajay Kumar Movva <[email protected]>
  • Loading branch information
Ajay Kumar Movva committed Jan 31, 2024
1 parent b46f62e commit 7cab2da
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,6 @@ public NetworkModule(
List<TransportInterceptor> transportInterceptors
) {
this.settings = settings;
if (transportInterceptors != null) {
transportInterceptors.forEach(this::registerTransportInterceptor);
}
for (NetworkPlugin plugin : plugins) {
Map<String, Supplier<HttpServerTransport>> httpTransportFactory = plugin.getHttpTransports(
settings,
Expand Down Expand Up @@ -192,6 +189,10 @@ public NetworkModule(
registerTransportInterceptor(interceptor);
}
}
// // Adding last because interceptors are triggered from last to first order from the list
// if (transportInterceptors != null) {
// transportInterceptors.forEach(this::registerTransportInterceptor);
// }
}

/** Adds a transport implementation that can be selected by setting {@link #TRANSPORT_TYPE_KEY}. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,10 @@ public void messageReceived(T request, TransportChannel channel, Task task) thro
if (!forceExecution) {
try {
this.admissionControlService.applyTransportAdmissionControl(this.action);
log.info("OpenSearch AC applied for action");
} catch (final OpenSearchRejectedExecutionException openSearchRejectedExecutionException) {
log.warn(openSearchRejectedExecutionException.getMessage());
log.info("OpenSearch Rejected exception");
channel.sendResponse(openSearchRejectedExecutionException);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -474,13 +474,28 @@ public List<TransportInterceptor> getTransportInterceptors(
try {
transportInterceptor.interceptHandler("foo/bar/boom", null, true, null);
} catch (Exception e) {
assertEquals(0, called.get());
assertEquals(1, called.get());
assertEquals(1, called1.get());
}

coreTransportInterceptors = new ArrayList<>();
coreTransportInterceptors.add(interceptor);
module = newNetworkModule(settings, coreTransportInterceptors, new NetworkPlugin() {
@Override
public List<TransportInterceptor> getTransportInterceptors(
NamedWriteableRegistry namedWriteableRegistry,
ThreadContext threadContext
) {
assertNotNull(threadContext);
return Collections.singletonList(interceptor1);
}
});

transportInterceptor = module.getTransportInterceptor();
try {
transportInterceptor.interceptHandler("foo/baz/boom", null, false, null);
} catch (Exception e) {
assertEquals(0, called.get());
assertEquals(1, called.get());
assertEquals(2, called1.get());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,17 +94,24 @@ public void testConnectAndExecuteRequest() throws Exception {
service.acceptIncomingRequests();
logger.info("now accepting incoming requests on local transport");
RemoteClusterService remoteClusterService = service.getRemoteClusterService();
logger.info("remote cluster service is able to get");
assertBusy(() -> { assertTrue(remoteClusterService.isRemoteNodeConnected("test", remoteNode)); }, 10, TimeUnit.SECONDS);
logger.info("remote cluster service is remote node connected");
Client client = remoteClusterService.getRemoteClusterClient(threadPool, "test");
logger.info("remote cluster client is able to create");
ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().execute().get();
logger.info("able to get cluster state response");
assertNotNull(clusterStateResponse);
assertEquals("foo_bar_cluster", clusterStateResponse.getState().getClusterName().value());
logger.info("before triggering scroll");
// also test a failure, there is no handler for scroll registered
ActionNotFoundTransportException ex = expectThrows(
ActionNotFoundTransportException.class,
() -> client.prepareSearchScroll("").get()
);
logger.info("after triggering scroll");
assertEquals("No handler for action [indices:data/read/scroll]", ex.getMessage());
logger.info("Test execution successful");
}
}
}
Expand Down

0 comments on commit 7cab2da

Please sign in to comment.