From 4cfdb52cab1b96b8a3cbc7897cbc6a8db56e3d83 Mon Sep 17 00:00:00 2001 From: "nikita.smirnov" Date: Mon, 12 Feb 2024 14:56:33 +0400 Subject: [PATCH] [RM-84612] HandBaseService implements RhBatchService to call service from the same process without network --- .../com/exactpro/th2/hand/Application.java | 15 +--- .../java/com/exactpro/th2/hand/Config.java | 13 ++-- .../com/exactpro/th2/hand/HandServer.java | 5 +- .../th2/hand/services/HandBaseService.java | 72 +++++++++++++++++-- .../th2/hand/services/MessageHandler.java | 10 ++- 5 files changed, 88 insertions(+), 27 deletions(-) diff --git a/src/main/java/com/exactpro/th2/hand/Application.java b/src/main/java/com/exactpro/th2/hand/Application.java index d110673..103ee61 100644 --- a/src/main/java/com/exactpro/th2/hand/Application.java +++ b/src/main/java/com/exactpro/th2/hand/Application.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2020 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2024 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,14 +16,10 @@ package com.exactpro.th2.hand; +import com.exactpro.th2.common.schema.factory.CommonFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.exactpro.th2.common.schema.factory.CommonFactory; - -import java.time.Instant; -import java.util.concurrent.TimeUnit; - public class Application { private static final Logger LOGGER = LoggerFactory.getLogger(Application.class); @@ -32,15 +28,10 @@ public static void main(String[] args) { new Application().run(args); } - public long getCurrentTime() { - Instant now = Instant.now(); - return TimeUnit.SECONDS.toNanos(now.getEpochSecond()) + now.getNano(); - } - public void run(String[] args) { try (CommonFactory factory = CommonFactory.createFromArguments(args)) { Config config = getConfig(factory); - try (HandServer handServer = new HandServer(config, getCurrentTime())) { + try (HandServer handServer = new HandServer(config)) { Runtime.getRuntime().addShutdownHook(new Thread(() -> { LOGGER.info("*** Closing hand server because JVM is shutting down"); try { diff --git a/src/main/java/com/exactpro/th2/hand/Config.java b/src/main/java/com/exactpro/th2/hand/Config.java index 6267df0..38c365b 100644 --- a/src/main/java/com/exactpro/th2/hand/Config.java +++ b/src/main/java/com/exactpro/th2/hand/Config.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2023 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2024 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,9 +29,7 @@ public class Config { protected final CommonFactory factory; protected final CustomConfiguration customConfiguration; - public Config(CommonFactory factory) throws ConfigurationException { - this.factory = factory; - this.customConfiguration = factory.getCustomConfiguration(CustomConfiguration.class); + public Config(CommonFactory factory, CustomConfiguration customConfiguration) throws ConfigurationException { if (customConfiguration == null) { throw new ConfigurationException("Custom configuration is not found"); } @@ -39,6 +37,13 @@ public Config(CommonFactory factory) throws ConfigurationException { if (customConfiguration.getDriversMapping().isEmpty()) { throw new ConfigurationException("Drivers mapping should be provided in custom config."); } + + this.factory = factory; + this.customConfiguration = customConfiguration; + } + + public Config(CommonFactory factory) throws ConfigurationException { + this(factory, factory.getCustomConfiguration(CustomConfiguration.class)); } public CommonFactory getFactory() { diff --git a/src/main/java/com/exactpro/th2/hand/HandServer.java b/src/main/java/com/exactpro/th2/hand/HandServer.java index 7001366..22a8098 100644 --- a/src/main/java/com/exactpro/th2/hand/HandServer.java +++ b/src/main/java/com/exactpro/th2/hand/HandServer.java @@ -27,7 +27,6 @@ import java.util.List; import java.util.ServiceLoader; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; public class HandServer implements AutoCloseable { @@ -39,9 +38,9 @@ public class HandServer implements AutoCloseable { private final List services; private final AtomicReference watcher = new AtomicReference<>(); - public HandServer(Config config, long startSequences) throws Exception { + public HandServer(Config config) throws Exception { this.config = config; - this.messageHandler = new MessageHandler(config, new AtomicLong(startSequences)); + this.messageHandler = new MessageHandler(config); this.services = new ArrayList<>(); this.server = buildServer(); } diff --git a/src/main/java/com/exactpro/th2/hand/services/HandBaseService.java b/src/main/java/com/exactpro/th2/hand/services/HandBaseService.java index f3c4796..993686d 100644 --- a/src/main/java/com/exactpro/th2/hand/services/HandBaseService.java +++ b/src/main/java/com/exactpro/th2/hand/services/HandBaseService.java @@ -16,9 +16,11 @@ package com.exactpro.th2.hand.services; +import com.exactpro.remotehand.RhConfigurationException; import com.exactpro.th2.act.grpc.hand.RhActionsBatch; import com.exactpro.th2.act.grpc.hand.RhBatchGrpc.RhBatchImplBase; import com.exactpro.th2.act.grpc.hand.RhBatchResponse; +import com.exactpro.th2.act.grpc.hand.RhBatchService; import com.exactpro.th2.act.grpc.hand.RhSessionID; import com.exactpro.th2.act.grpc.hand.RhTargetServer; import com.exactpro.th2.hand.HandException; @@ -28,9 +30,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.Map; + import static com.exactpro.th2.common.message.MessageUtils.toJson; -public class HandBaseService extends RhBatchImplBase implements IHandService { +public class HandBaseService extends RhBatchImplBase implements IHandService, RhBatchService { private final static Logger LOGGER = LoggerFactory.getLogger(HandBaseService.class); public static final String RH_SESSION_PREFIX = "/Ses"; @@ -45,9 +50,7 @@ public void init(MessageHandler messageHandler) { @Override public void register(RhTargetServer targetServer, StreamObserver responseObserver) { try { - String sessionId = messageHandler.getRhConnectionManager().createSessionHandler(targetServer.getTarget()).getId(); - RhSessionID result = RhSessionID.newBuilder().setId(sessionId).setSessionAlias(messageHandler.getConfig().getSessionAlias()).build(); - responseObserver.onNext(result); + responseObserver.onNext(register(targetServer)); responseObserver.onCompleted(); } catch (Exception e) { LOGGER.error("Error while creating session", e); @@ -59,8 +62,7 @@ public void register(RhTargetServer targetServer, StreamObserver re @Override public void unregister(RhSessionID request, StreamObserver responseObserver) { try { - messageHandler.getRhConnectionManager().closeSessionHandler(request.getId()); - responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onNext(unregister(request)); responseObserver.onCompleted(); } catch (Exception e) { LOGGER.error("Action failure, request: '{}'", toJson(request), e); @@ -73,7 +75,7 @@ public void unregister(RhSessionID request, StreamObserver responseObserv public void executeRhActionsBatch(RhActionsBatch request, StreamObserver responseObserver) { LOGGER.trace("Action: 'executeRhActionsBatch', request: '{}'", toJson(request)); try { - responseObserver.onNext(messageHandler.handleActionsBatchRequest(request)); + responseObserver.onNext(executeRhActionsBatch(request)); responseObserver.onCompleted(); } catch (Exception e) { LOGGER.error("Action failure, request: '{}'", toJson(request), e); @@ -89,4 +91,60 @@ public void dispose() { LOGGER.error("Error while disposing message handler", e); } } + + /** + * @throws RhConfigurationException + */ + @SuppressWarnings("JavadocDeclaration") + @Override + public RhSessionID register(RhTargetServer targetServer) { + try { + String sessionId = messageHandler.getRhConnectionManager().createSessionHandler(targetServer.getTarget()).getId(); + return RhSessionID.newBuilder().setId(sessionId).setSessionAlias(messageHandler.getConfig().getSessionAlias()).build(); + } catch (RhConfigurationException e) { + sneakyThrow(e); + return null; + } + } + + @Override + public RhSessionID register(RhTargetServer input, Map properties) { + return register(input); + } + + @Override + public Empty unregister(RhSessionID request) { + messageHandler.getRhConnectionManager().closeSessionHandler(request.getId()); + return Empty.getDefaultInstance(); + } + + @Override + public Empty unregister(RhSessionID input, Map properties) { + return unregister(input); + } + + /** + * @throws IOException + */ + @SuppressWarnings("JavadocDeclaration") + @Override + public RhBatchResponse executeRhActionsBatch(RhActionsBatch request) { + try { + return messageHandler.handleActionsBatchRequest(request); + } catch (IOException e) { + sneakyThrow(e); + return null; + } + } + + @Override + public RhBatchResponse executeRhActionsBatch(RhActionsBatch input, Map properties) { + return executeRhActionsBatch(input); + } + + // This hack is used to avoid wrapping cause error + private static void sneakyThrow(Throwable e) throws E { + //noinspection unchecked + throw (E) e; + } } \ No newline at end of file diff --git a/src/main/java/com/exactpro/th2/hand/services/MessageHandler.java b/src/main/java/com/exactpro/th2/hand/services/MessageHandler.java index 137fc9c..03dbc4d 100644 --- a/src/main/java/com/exactpro/th2/hand/services/MessageHandler.java +++ b/src/main/java/com/exactpro/th2/hand/services/MessageHandler.java @@ -34,6 +34,8 @@ import com.exactpro.th2.hand.services.mstore.TransportMessageStoreSender; import java.io.IOException; +import java.time.Instant; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; public class MessageHandler implements AutoCloseable { @@ -43,10 +45,11 @@ public class MessageHandler implements AutoCloseable { private final RhConnectionManager rhConnectionManager; private final ScriptBuilder scriptBuilder = new ScriptBuilder(); - public MessageHandler(Config config, AtomicLong seqNum) { + public MessageHandler(Config config) { this.config = config; rhConnectionManager = new RhConnectionManager(config); CommonFactory factory = config.getFactory(); + AtomicLong seqNum = new AtomicLong(getCurrentTime()); if (config.isUseTransport()) { this.messageStoreHandler = new MessageStoreHandler<>( config.getSessionGroup(), @@ -96,4 +99,9 @@ public RhBatchResponse handleActionsBatchRequest(RhActionsBatch request) throws public void close() { rhConnectionManager.dispose(); } + + private static long getCurrentTime() { + Instant now = Instant.now(); + return TimeUnit.SECONDS.toNanos(now.getEpochSecond()) + now.getNano(); + } } \ No newline at end of file