Skip to content

Commit

Permalink
[RM-84612] fixes for embedded mode (#53)
Browse files Browse the repository at this point in the history
  • Loading branch information
lumber1000 authored Apr 10, 2024
1 parent ec60f10 commit 8740daf
Show file tree
Hide file tree
Showing 11 changed files with 62 additions and 63 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,9 @@ Example of `rabbitMQ.json`:

### 3.1.0

+ reads dictionaries from the /var/th2/config/dictionary folder
+ uses mq_router, grpc_router, cradle_manager optional JSON configs from the /var/th2/config folder
+ tries to load log4j.properties files from sources in order: '/var/th2/config', '/home/etc', configured path via cmd, default configuration
+ reads dictionaries from the /var/th2/context/dictionary folder
+ uses mq_router, grpc_router, cradle_manager optional JSON configs from the /var/th2/context folder
+ tries to load log4j.properties files from sources in order: '/var/th2/context', '/home/etc', configured path via cmd, default configuration
+ update Cradle version. Introduce async API for storing events
+ removed gRPC event loop handling
+ fixed dictionary reading
17 changes: 7 additions & 10 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ plugins {
id "org.owasp.dependencycheck" version "9.0.9"
id "com.gorylenko.gradle-git-properties" version "2.4.1"
id 'com.github.jk1.dependency-license-report' version '2.5'
id "de.undercouch.download" version "5.4.0"
id "de.undercouch.download" version "5.6.0"
id 'signing'
id "com.google.protobuf" version "0.9.3"
id "com.google.protobuf" version "0.9.4"
}

group 'com.exactpro.th2'
Expand All @@ -57,15 +57,13 @@ repositories {
}

dependencies {
api platform('com.exactpro.th2:bom:4.5.0')
api platform('com.exactpro.th2:bom:4.6.0')

implementation('com.exactpro.remotehand:remotehand:1.7.3-TH2-4662-4046816762-SNAPSHOT') {
exclude group: "org.slf4j", module: "slf4j-log4j12"
}
implementation("com.exactpro.th2:grpc-hand:3.0.0-RM-84612-+") {
implementation('com.exactpro.remotehand:remotehand:1.8.0-dev')
implementation("com.exactpro.th2:grpc-hand:3.0.0-dev") {
exclude group: "com.google.guava", module: "guava" // for compatibility with Selenium 3.141.59
}
implementation("com.exactpro.th2:common:5.8.0-dev") {
implementation("com.exactpro.th2:common:5.10.0-dev") {
exclude group: "com.google.guava", module: "guava" // for compatibility with Selenium 3.141.59
}
implementation("com.exactpro.th2:common-utils:2.2.2-dev") {
Expand All @@ -79,13 +77,12 @@ dependencies {
implementation "com.fasterxml.jackson.core:jackson-annotations"

implementation 'org.apache.commons:commons-lang3'
implementation "org.apache.commons:commons-csv:1.9.0"
implementation "org.apache.commons:commons-csv:1.10.0"

testImplementation 'org.junit.jupiter:junit-jupiter:5.10.2'
testImplementation 'org.mockito.kotlin:mockito-kotlin:5.2.1'
testImplementation 'org.jetbrains.kotlin:kotlin-test-junit5'
testImplementation 'io.strikt:strikt-core:0.34.1'

}

java {
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/com/exactpro/th2/hand/Application.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ public static void main(String[] args) {

public void run(String[] args) {
try (CommonFactory factory = CommonFactory.createFromArguments(args)) {
Config config = getConfig(factory);
try (HandServer handServer = new HandServer(config)) {
Context context = getConfig(factory);
try (HandServer handServer = new HandServer(context)) {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
LOGGER.info("*** Closing hand server because JVM is shutting down");
try {
Expand All @@ -50,8 +50,8 @@ public void run(String[] args) {
}
}

protected Config getConfig(CommonFactory factory) throws ConfigurationException {
return new Config(factory);
protected Context getConfig(CommonFactory factory) throws ConfigurationException {
return new Context(factory);
}

private static void closeApp() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

public class Config {
public class Context {

protected final CommonFactory factory;
protected final CustomConfiguration customConfiguration;

public Config(CommonFactory factory, CustomConfiguration customConfiguration) throws ConfigurationException {
public Context(CommonFactory factory, CustomConfiguration customConfiguration) throws ConfigurationException {
if (customConfiguration == null) {
throw new ConfigurationException("Custom configuration is not found");
}
Expand All @@ -42,14 +42,18 @@ public Config(CommonFactory factory, CustomConfiguration customConfiguration) th
this.customConfiguration = customConfiguration;
}

public Config(CommonFactory factory) throws ConfigurationException {
public Context(CommonFactory factory) throws ConfigurationException {
this(factory, factory.getCustomConfiguration(CustomConfiguration.class));
}

public CommonFactory getFactory() {
return factory;
}

public CustomConfiguration getCustomConfiguration() {
return customConfiguration;
}

public Map<String, DriverMapping> getDriversMapping() {
return customConfiguration.getDriversMapping();
}
Expand Down
10 changes: 5 additions & 5 deletions src/main/java/com/exactpro/th2/hand/HandServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@
public class HandServer implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(HandServer.class);

private final Config config;
private final Context context;
private final MessageHandler messageHandler;
private final Server server;
private final List<IHandService> services;
private final AtomicReference<Thread> watcher = new AtomicReference<>();

public HandServer(Config config) throws Exception {
this.config = config;
this.messageHandler = new MessageHandler(config);
public HandServer(Context context) throws Exception {
this.context = context;
this.messageHandler = new MessageHandler(context);
this.services = new ArrayList<>();
this.server = buildServer();
}
Expand All @@ -52,7 +52,7 @@ protected Server buildServer() throws Exception {
LOGGER.info("Service '{}' loaded", rhService.getClass().getName());
}

return config.getFactory().getGrpcRouter().startServer(services.toArray(new IHandService[0]));
return context.getFactory().getGrpcRouter().startServer(services.toArray(new IHandService[0]));
}

/**
Expand Down
10 changes: 5 additions & 5 deletions src/main/java/com/exactpro/th2/hand/RhConnectionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ public class RhConnectionManager {

private final Map<String, HandSessionHandler> sessions = new ConcurrentHashMap<>();
private final GridRemoteHandManager gridRemoteHandManager;
private final Config config;
private final Context context;

public RhConnectionManager(Config config) {
this.config = config;
public RhConnectionManager(Context context) {
this.context = context;
gridRemoteHandManager = new GridRemoteHandManager();
gridRemoteHandManager.createConfigurations(null, config.getRhOptions());
gridRemoteHandManager.createConfigurations(null, context.getRhOptions());
}

public HandSessionHandler getSessionHandler(String sessionId) throws IllegalArgumentException {
Expand All @@ -49,7 +49,7 @@ public HandSessionHandler getSessionHandler(String sessionId) throws IllegalArgu
}

public HandSessionHandler createSessionHandler(String targetServer) throws RhConfigurationException {
Config.DriverMapping driverSettings = config.getDriversMapping().get(targetServer);
Context.DriverMapping driverSettings = context.getDriversMapping().get(targetServer);
if (driverSettings == null) {
throw new RhConfigurationException("Unrecognized targetServer: " + targetServer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import java.util.Collections;
import java.util.Map;

import com.exactpro.th2.hand.Config;
import com.exactpro.th2.hand.Context;
import com.fasterxml.jackson.annotation.JsonAlias;
import com.fasterxml.jackson.annotation.JsonProperty;

Expand Down Expand Up @@ -48,7 +48,7 @@ public class CustomConfiguration {

@JsonProperty(value="drivers-mapping", required = true)
@JsonAlias("driversMapping")
private Map<String, Config.DriverMapping> driversMapping;
private Map<String, Context.DriverMapping> driversMapping;

@JsonProperty(value="rh-options")
@JsonAlias("rhOptions")
Expand All @@ -59,10 +59,10 @@ public class CustomConfiguration {
private int responseTimeout = DEFAULT_RESPONSE_TIMEOUT;

@JsonProperty(value="use-transport")
@JsonAlias("responseTimeoutSec")
@JsonAlias("useTransport")
private boolean useTransport = true;

public Map<String, Config.DriverMapping> getDriversMapping() {
public Map<String, Context.DriverMapping> getDriversMapping() {
return driversMapping;
}

Expand Down
30 changes: 15 additions & 15 deletions src/main/java/com/exactpro/th2/hand/services/MessageHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import com.exactpro.th2.act.grpc.hand.RhBatchResponse;
import com.exactpro.th2.common.schema.factory.CommonFactory;
import com.exactpro.th2.common.utils.message.transport.MessageUtilsKt;
import com.exactpro.th2.hand.Config;
import com.exactpro.th2.hand.Context;
import com.exactpro.th2.hand.RhConnectionManager;
import com.exactpro.th2.hand.builders.events.DefaultEventBuilder;
import com.exactpro.th2.hand.builders.mstore.ProtobufMessageStoreBuilder;
Expand All @@ -39,29 +39,29 @@
import java.util.concurrent.atomic.AtomicLong;

public class MessageHandler implements AutoCloseable {
private final Config config;
private final Context context;
private final MessageStoreHandler<?> messageStoreHandler;
private final EventStoreHandler eventStoreHandler;
private final RhConnectionManager rhConnectionManager;
private final ScriptBuilder scriptBuilder = new ScriptBuilder();

public MessageHandler(Config config) {
this.config = config;
rhConnectionManager = new RhConnectionManager(config);
CommonFactory factory = config.getFactory();
public MessageHandler(Context context) {
this.context = context;
rhConnectionManager = new RhConnectionManager(context);
CommonFactory factory = context.getFactory();
AtomicLong seqNum = new AtomicLong(getCurrentTime());
if (config.isUseTransport()) {
if (context.isUseTransport()) {
this.messageStoreHandler = new MessageStoreHandler<>(
config.getSessionGroup(),
new TransportMessageStoreSender(factory),
context.getSessionGroup(),
new TransportMessageStoreSender(context),
new TransportMessageStoreBuilder(seqNum),
message -> MessageUtilsKt.toProto(message.getId(), config.getBook(), config.getSessionGroup())
message -> MessageUtilsKt.toProto(message.getId(), context.getBook(), context.getSessionGroup())
);
} else {
this.messageStoreHandler = new MessageStoreHandler<>(
config.getSessionGroup(),
new ProtobufMessageStoreSender(factory),
new ProtobufMessageStoreBuilder(config.getFactory(), seqNum),
context.getSessionGroup(),
new ProtobufMessageStoreSender(context),
new ProtobufMessageStoreBuilder(context.getFactory(), seqNum),
message -> message.getMetadata().getId()
);
}
Expand All @@ -82,8 +82,8 @@ public ScriptBuilder getScriptBuilder() {
return scriptBuilder;
}

public Config getConfig() {
return config;
public Context getConfig() {
return context;
}

public RhConnectionManager getRhConnectionManager() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
import java.util.Collection;

public interface MessageStoreSender<T> {
String RAW_MESSAGE_ATTRIBUTE = "raw";

void sendMessages(T messages);

void sendMessages(Collection<T> messages);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import com.exactpro.th2.common.grpc.MessageGroup;
import com.exactpro.th2.common.grpc.MessageGroupBatch;
import com.exactpro.th2.common.grpc.RawMessage;
import com.exactpro.th2.common.schema.factory.CommonFactory;
import com.exactpro.th2.common.schema.message.MessageRouter;
import com.exactpro.th2.hand.Context;
import com.exactpro.th2.hand.schema.CustomConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -30,14 +30,14 @@
import java.util.Collections;

public class ProtobufMessageStoreSender implements MessageStoreSender<RawMessage> {
private static final String RAW_MESSAGE_ATTRIBUTE = "raw";
private static final Logger LOGGER = LoggerFactory.getLogger(ProtobufMessageStoreSender.class);
private final MessageRouter<MessageGroupBatch> messageRouterGroupBatch;
private final long batchLimit;


public ProtobufMessageStoreSender(CommonFactory factory) {
messageRouterGroupBatch = factory.getMessageRouterMessageGroupBatch();
CustomConfiguration customConfiguration = factory.getCustomConfiguration(CustomConfiguration.class);
public ProtobufMessageStoreSender(Context context) {
messageRouterGroupBatch = context.getFactory().getMessageRouterMessageGroupBatch();
CustomConfiguration customConfiguration = context.getCustomConfiguration();
this.batchLimit = customConfiguration.getMessageBatchLimit();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@

package com.exactpro.th2.hand.services.mstore;

import com.exactpro.th2.common.schema.factory.CommonFactory;
import com.exactpro.th2.common.schema.message.MessageRouter;
import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.GroupBatch;
import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.RawMessage;
import com.exactpro.th2.common.utils.message.transport.MessageUtilsKt;
import com.exactpro.th2.hand.Context;
import com.exactpro.th2.hand.schema.CustomConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -35,11 +35,11 @@ public class TransportMessageStoreSender implements MessageStoreSender<RawMessag
private final String book;
private final String sessionGroup;

public TransportMessageStoreSender(CommonFactory factory) {
messageRouter = factory.getTransportGroupBatchRouter();
CustomConfiguration customConfiguration = factory.getCustomConfiguration(CustomConfiguration.class);
public TransportMessageStoreSender(Context context) {
messageRouter = context.getFactory().getTransportGroupBatchRouter();
CustomConfiguration customConfiguration = context.getCustomConfiguration();
this.batchLimit = customConfiguration.getMessageBatchLimit();
this.book = factory.getBoxConfiguration().getBookName();
this.book = context.getFactory().getBoxConfiguration().getBookName();
this.sessionGroup = customConfiguration.getSessionGroup();
}

Expand Down Expand Up @@ -70,7 +70,7 @@ private void sendRawMessages(Collection<RawMessage> messages) throws Exception {
//if batch limit has incorrect value, sender should pack each message to batch
//if one message is bigger that batchLimit it is should send to mstore anyway and reject by it
if (currentBatchLength + size > batchLimit && currentBatchLength != 0) {
this.messageRouter.sendAll(currentBatchBuilder.build(), RAW_MESSAGE_ATTRIBUTE);
this.messageRouter.sendAll(currentBatchBuilder.build());
currentBatchBuilder = createBatchBuilder();
currentBatchLength = 0;
batchesCount++;
Expand All @@ -83,7 +83,7 @@ private void sendRawMessages(Collection<RawMessage> messages) throws Exception {
}

if (currentBatchLength != 0) {
this.messageRouter.sendAll(currentBatchBuilder.build(), RAW_MESSAGE_ATTRIBUTE);
this.messageRouter.sendAll(currentBatchBuilder.build());
batchesCount++;
}

Expand Down

0 comments on commit 8740daf

Please sign in to comment.