Skip to content

Commit

Permalink
[RM-84612] HandBaseService implements RhBatchService to call service …
Browse files Browse the repository at this point in the history
…from the same process without network
  • Loading branch information
Nikita-Smirnov-Exactpro committed Feb 12, 2024
1 parent 1c75c24 commit 4cfdb52
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 27 deletions.
15 changes: 3 additions & 12 deletions src/main/java/com/exactpro/th2/hand/Application.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2020 Exactpro (Exactpro Systems Limited)
* Copyright 2020-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,14 +16,10 @@

package com.exactpro.th2.hand;

import com.exactpro.th2.common.schema.factory.CommonFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.exactpro.th2.common.schema.factory.CommonFactory;

import java.time.Instant;
import java.util.concurrent.TimeUnit;

public class Application
{
private static final Logger LOGGER = LoggerFactory.getLogger(Application.class);
Expand All @@ -32,15 +28,10 @@ public static void main(String[] args) {
new Application().run(args);
}

public long getCurrentTime() {
Instant now = Instant.now();
return TimeUnit.SECONDS.toNanos(now.getEpochSecond()) + now.getNano();
}

public void run(String[] args) {
try (CommonFactory factory = CommonFactory.createFromArguments(args)) {
Config config = getConfig(factory);
try (HandServer handServer = new HandServer(config, getCurrentTime())) {
try (HandServer handServer = new HandServer(config)) {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
LOGGER.info("*** Closing hand server because JVM is shutting down");
try {
Expand Down
13 changes: 9 additions & 4 deletions src/main/java/com/exactpro/th2/hand/Config.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2023 Exactpro (Exactpro Systems Limited)
* Copyright 2020-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,16 +29,21 @@ public class Config {
protected final CommonFactory factory;
protected final CustomConfiguration customConfiguration;

public Config(CommonFactory factory) throws ConfigurationException {
this.factory = factory;
this.customConfiguration = factory.getCustomConfiguration(CustomConfiguration.class);
public Config(CommonFactory factory, CustomConfiguration customConfiguration) throws ConfigurationException {
if (customConfiguration == null) {
throw new ConfigurationException("Custom configuration is not found");
}

if (customConfiguration.getDriversMapping().isEmpty()) {
throw new ConfigurationException("Drivers mapping should be provided in custom config.");
}

this.factory = factory;
this.customConfiguration = customConfiguration;
}

public Config(CommonFactory factory) throws ConfigurationException {
this(factory, factory.getCustomConfiguration(CustomConfiguration.class));
}

public CommonFactory getFactory() {
Expand Down
5 changes: 2 additions & 3 deletions src/main/java/com/exactpro/th2/hand/HandServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.util.List;
import java.util.ServiceLoader;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

public class HandServer implements AutoCloseable {
Expand All @@ -39,9 +38,9 @@ public class HandServer implements AutoCloseable {
private final List<IHandService> services;
private final AtomicReference<Thread> watcher = new AtomicReference<>();

public HandServer(Config config, long startSequences) throws Exception {
public HandServer(Config config) throws Exception {
this.config = config;
this.messageHandler = new MessageHandler(config, new AtomicLong(startSequences));
this.messageHandler = new MessageHandler(config);
this.services = new ArrayList<>();
this.server = buildServer();
}
Expand Down
72 changes: 65 additions & 7 deletions src/main/java/com/exactpro/th2/hand/services/HandBaseService.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@

package com.exactpro.th2.hand.services;

import com.exactpro.remotehand.RhConfigurationException;
import com.exactpro.th2.act.grpc.hand.RhActionsBatch;
import com.exactpro.th2.act.grpc.hand.RhBatchGrpc.RhBatchImplBase;
import com.exactpro.th2.act.grpc.hand.RhBatchResponse;
import com.exactpro.th2.act.grpc.hand.RhBatchService;
import com.exactpro.th2.act.grpc.hand.RhSessionID;
import com.exactpro.th2.act.grpc.hand.RhTargetServer;
import com.exactpro.th2.hand.HandException;
Expand All @@ -28,9 +30,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Map;

import static com.exactpro.th2.common.message.MessageUtils.toJson;

public class HandBaseService extends RhBatchImplBase implements IHandService {
public class HandBaseService extends RhBatchImplBase implements IHandService, RhBatchService {
private final static Logger LOGGER = LoggerFactory.getLogger(HandBaseService.class);

public static final String RH_SESSION_PREFIX = "/Ses";
Expand All @@ -45,9 +50,7 @@ public void init(MessageHandler messageHandler) {
@Override
public void register(RhTargetServer targetServer, StreamObserver<RhSessionID> responseObserver) {
try {
String sessionId = messageHandler.getRhConnectionManager().createSessionHandler(targetServer.getTarget()).getId();
RhSessionID result = RhSessionID.newBuilder().setId(sessionId).setSessionAlias(messageHandler.getConfig().getSessionAlias()).build();
responseObserver.onNext(result);
responseObserver.onNext(register(targetServer));
responseObserver.onCompleted();
} catch (Exception e) {
LOGGER.error("Error while creating session", e);
Expand All @@ -59,8 +62,7 @@ public void register(RhTargetServer targetServer, StreamObserver<RhSessionID> re
@Override
public void unregister(RhSessionID request, StreamObserver<Empty> responseObserver) {
try {
messageHandler.getRhConnectionManager().closeSessionHandler(request.getId());
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onNext(unregister(request));
responseObserver.onCompleted();
} catch (Exception e) {
LOGGER.error("Action failure, request: '{}'", toJson(request), e);
Expand All @@ -73,7 +75,7 @@ public void unregister(RhSessionID request, StreamObserver<Empty> responseObserv
public void executeRhActionsBatch(RhActionsBatch request, StreamObserver<RhBatchResponse> responseObserver) {
LOGGER.trace("Action: 'executeRhActionsBatch', request: '{}'", toJson(request));
try {
responseObserver.onNext(messageHandler.handleActionsBatchRequest(request));
responseObserver.onNext(executeRhActionsBatch(request));
responseObserver.onCompleted();
} catch (Exception e) {
LOGGER.error("Action failure, request: '{}'", toJson(request), e);
Expand All @@ -89,4 +91,60 @@ public void dispose() {
LOGGER.error("Error while disposing message handler", e);
}
}

/**
* @throws RhConfigurationException
*/
@SuppressWarnings("JavadocDeclaration")
@Override
public RhSessionID register(RhTargetServer targetServer) {
try {
String sessionId = messageHandler.getRhConnectionManager().createSessionHandler(targetServer.getTarget()).getId();
return RhSessionID.newBuilder().setId(sessionId).setSessionAlias(messageHandler.getConfig().getSessionAlias()).build();
} catch (RhConfigurationException e) {
sneakyThrow(e);
return null;
}
}

@Override
public RhSessionID register(RhTargetServer input, Map<String, String> properties) {
return register(input);
}

@Override
public Empty unregister(RhSessionID request) {
messageHandler.getRhConnectionManager().closeSessionHandler(request.getId());
return Empty.getDefaultInstance();
}

@Override
public Empty unregister(RhSessionID input, Map<String, String> properties) {
return unregister(input);
}

/**
* @throws IOException
*/
@SuppressWarnings("JavadocDeclaration")
@Override
public RhBatchResponse executeRhActionsBatch(RhActionsBatch request) {
try {
return messageHandler.handleActionsBatchRequest(request);
} catch (IOException e) {
sneakyThrow(e);
return null;
}
}

@Override
public RhBatchResponse executeRhActionsBatch(RhActionsBatch input, Map<String, String> properties) {
return executeRhActionsBatch(input);
}

// This hack is used to avoid wrapping cause error
private static <E extends Throwable> void sneakyThrow(Throwable e) throws E {
//noinspection unchecked
throw (E) e;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import com.exactpro.th2.hand.services.mstore.TransportMessageStoreSender;

import java.io.IOException;
import java.time.Instant;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class MessageHandler implements AutoCloseable {
Expand All @@ -43,10 +45,11 @@ public class MessageHandler implements AutoCloseable {
private final RhConnectionManager rhConnectionManager;
private final ScriptBuilder scriptBuilder = new ScriptBuilder();

public MessageHandler(Config config, AtomicLong seqNum) {
public MessageHandler(Config config) {
this.config = config;
rhConnectionManager = new RhConnectionManager(config);
CommonFactory factory = config.getFactory();
AtomicLong seqNum = new AtomicLong(getCurrentTime());
if (config.isUseTransport()) {
this.messageStoreHandler = new MessageStoreHandler<>(
config.getSessionGroup(),
Expand Down Expand Up @@ -96,4 +99,9 @@ public RhBatchResponse handleActionsBatchRequest(RhActionsBatch request) throws
public void close() {
rhConnectionManager.dispose();
}

private static long getCurrentTime() {
Instant now = Instant.now();
return TimeUnit.SECONDS.toNanos(now.getEpochSecond()) + now.getNano();
}
}

0 comments on commit 4cfdb52

Please sign in to comment.