diff --git a/Dockerfile b/Dockerfile index 84d6f55..5e8af41 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM gradle:6.6-jdk11 AS build +FROM gradle:7.6-jdk11 AS build ARG release_version COPY ./ . RUN gradle --no-daemon clean build dockerPrepare -Prelease_version=${release_version} diff --git a/README.md b/README.md index 94a3cc5..fb2b710 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# Connect (3.11.1) +# Connect (3.11.2) The "Connect" component is responsible for the communication with a target system. This component implements the logic of the interaction protocol, receiving and sending messages from and to the system, respectively. @@ -28,6 +28,7 @@ Parameters: + name - the service name that will be displayed in the events inside the report; + settings - the parameters that will be transformed to the actual service's settings specified in the **services.xml** file. + maxMessageBatchSize - the limitation for message batch size which connect sends to the first and to the second publish pins with. The default value is set to 100. ++ maxMessageFlushTime - defines maximum time between outgoing message batches in milliseconds. The default value is set to 1000. + enableMessageSendingEvent - if this option is set to `true`, connect sends a separate event for every message sent which incomes from the pin with the send attribute. The default value is set to true ## Metrics @@ -102,6 +103,7 @@ spec: type: "th2_service:Your_Service_Type" name: "your_service" maxMessageBatchSize: 100 + maxMessageFlushTime: 1000 enableMessageSendingEvent: true settings: param1: "value1" @@ -119,6 +121,12 @@ spec: ## Release notes +### 3.11.2 + ++ Sailfish updated from `3.3.132` to `3.3.144` ++ Added `maxMessageFlushTime` option ++ Use temporal directory for last layer in sailfish's workspace + ### 3.11.1 + Updated `sailfish-core` version from `3.3.54` to `3.3.132` @@ -129,9 +137,13 @@ spec: + Updated `kotlin` form `1.5.30` to `1.6.21` + Renamed project to `conn-sailfish` +### 3.10.2 + ++ Events are made more convenient. Added event names and error logs. Error message moved from the name to the body of the event. + ### 3.10.1 -+ Update `sailfish-core` version from `3.2.1674` to `3.2.1741` ++ Updated `sailfish-core` version from `3.2.1674` to `3.2.1741` + Add exception for checking the property in `IMetadata` + Added synchronization by processor to `ServiceListener.onMessage()` otherwise processor sometimes misses some sequences + Added log about missed sequences @@ -139,15 +151,15 @@ spec: ### 3.10.0 -+ Update `th2-common` version from `3.25.1` to `3.33.0` -+ Update `org.jetbrains.kotlin.jvm` version from `1.3.72` to `1.5.30` ++ Updated `th2-common` version from `3.25.1` to `3.33.0` ++ Updated `org.jetbrains.kotlin.jvm` version from `1.3.72` to `1.5.30` ### 3.9.0 -+ Update `sailfish-core` version from `3.2.1650` to `3.2.1674` ++ Updated `sailfish-core` version from `3.2.1650` to `3.2.1674` + Embedded Sailfish service based on MINA decodes the message as sender during sending. This approach is important for protocols in which a pair of messages have the same protocol message type and different structures depending on the direction. -+ Update `th2-common` version from `3.16.5` to `3.25.1` -+ Update `th2-sailfish-utils` version from `3.4.0` to `3.8.0` ++ Updated `th2-common` version from `3.16.5` to `3.25.1` ++ Updated `th2-sailfish-utils` version from `3.4.0` to `3.8.0` ### 3.8.1 @@ -159,11 +171,11 @@ spec: ### 3.7.2 -+ Update Sailfish version to 3.2.1603 ++ Updated Sailfish version to 3.2.1603 ### 3.7.1 -+ Update Sailfish version to 3.2.1572 (unwraps the EvolutionBatch when sending raw message) ++ Updated Sailfish version to 3.2.1572 (unwraps the EvolutionBatch when sending raw message) ### 3.7.0 @@ -191,7 +203,7 @@ spec: + reads dictionaries from the /var/th2/config/dictionary folder. + uses mq_router, grpc_router, cradle_manager optional JSON configs from the /var/th2/config folder + tries to load log4j.properties files from sources in order: '/var/th2/config', '/home/etc', configured path via cmd, default configuration -+ update Cradle version. Introduce async API for storing events ++ updated Cradle version. Introduce async API for storing events ### 3.4.1 diff --git a/build.gradle b/build.gradle index bac1a9b..5a358f4 100644 --- a/build.gradle +++ b/build.gradle @@ -10,7 +10,7 @@ dependencyCheck { } ext { - sailfishVersion = '3.3.132' + sailfishVersion = '3.3.144' } ext.excludeSailfish = { rcd -> @@ -63,7 +63,7 @@ repositories { mavenCentral() mavenLocal() - configurations.all { + configurations.configureEach { resolutionStrategy.cacheChangingModulesFor 0, 'seconds' resolutionStrategy.cacheDynamicVersionsFor 0, 'seconds' } @@ -106,7 +106,7 @@ dependencies { implementation("com.exactpro.sf:sailfish-core:${sailfishVersion}") - testImplementation 'org.junit.jupiter:junit-jupiter:5.7.1' + testImplementation 'org.junit.jupiter:junit-jupiter:5.9.0' } test { @@ -114,13 +114,13 @@ test { } application { - mainClassName 'com.exactpro.th2.conn.MicroserviceMain' + mainClass.set('com.exactpro.th2.conn.MicroserviceMain') } applicationName = 'service' distTar { - archiveName "${applicationName}.tar" + archiveFileName.set("${applicationName}.tar") } dockerPrepare { diff --git a/gradle.properties b/gradle.properties index 9e655ad..f8a1209 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1 +1 @@ -release_version = 3.11.1 \ No newline at end of file +release_version = 3.11.2 \ No newline at end of file diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index d97373f..bdf0ae0 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -15,7 +15,7 @@ # #Tue Jun 09 10:46:12 MSK 2020 -distributionUrl=https\://services.gradle.org/distributions/gradle-6.6-all.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-7.6-all.zip distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStorePath=wrapper/dists diff --git a/src/main/java/com/exactpro/th2/conn/MessageSender.java b/src/main/java/com/exactpro/th2/conn/MessageSender.java index 15c7e19..044bf70 100644 --- a/src/main/java/com/exactpro/th2/conn/MessageSender.java +++ b/src/main/java/com/exactpro/th2/conn/MessageSender.java @@ -103,7 +103,7 @@ private void sendMessage(RawMessage protoMsg) throws InterruptedException { logger.debug("Message sent. Base64 view: {}", Base64.getEncoder().encodeToString(data)); } } catch (Exception ex) { - Event errorEvent = createErrorEvent("SendError") + Event errorEvent = createErrorEvent("SendError", ex) .bodyData(EventUtils.createMessageBean("Cannot send message. Message body in base64:")) .bodyData(EventUtils.createMessageBean(Base64.getEncoder().encodeToString(data))); EventStoreExtensions.addException(errorEvent, ex); @@ -124,10 +124,12 @@ private void storeErrorEvent(Event errorEvent, @Nullable EventID parentId) { } } - private Event createErrorEvent(String eventType) { + private Event createErrorEvent(String eventType, Exception e) { return Event.start().endTimestamp() .status(Status.FAILED) - .type(eventType); + .type(eventType) + .name("Failed to send raw message") + .exception(e, true); } private IMetadata toSailfishMetadata(RawMessage protoMsg) { diff --git a/src/main/java/com/exactpro/th2/conn/MicroserviceMain.java b/src/main/java/com/exactpro/th2/conn/MicroserviceMain.java index d6bbd34..5f5e87a 100644 --- a/src/main/java/com/exactpro/th2/conn/MicroserviceMain.java +++ b/src/main/java/com/exactpro/th2/conn/MicroserviceMain.java @@ -29,6 +29,7 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; +import java.nio.file.Files; import java.time.Instant; import java.util.Deque; import java.util.Map; @@ -83,7 +84,6 @@ public class MicroserviceMain { private static final Logger LOGGER = LoggerFactory.getLogger(MicroserviceMain.class); - public static final int MAX_MESSAGES_COUNT = 100; public static final long NANOSECONDS_IN_SECOND = 1_000_000_000L; public static final String PASSWORD_PARAMETER = "password"; public static final String DEFAULT_PASSWORD_PARAMETER = "default"; @@ -128,7 +128,8 @@ public static void main(String[] args) { processor.onComplete(); }); - IServiceFactory serviceFactory = new ServiceFactory(workspaceFolder); + IServiceFactory serviceFactory = new ServiceFactory(workspaceFolder, + Files.createTempDirectory("sailfish-workspace").toFile()); disposer.register(() -> { LOGGER.info("Close service factory"); serviceFactory.close(); @@ -181,7 +182,7 @@ public static void main(String[] args) { }); createPipeline(processor, processor::onComplete, eventBatchRouter, rawMessageRouter, - configuration.getMaxMessageBatchSize(), configuration.isEnableMessageSendingEvent()) + configuration.getMaxMessageBatchSize(), configuration.getMaxMessageFlushTime(), configuration.isEnableMessageSendingEvent()) .blockingSubscribe(new TermibnationSubscriber<>(serviceProxy, messageSender)); } catch (SailfishURIException | WorkspaceSecurityException e) { LOGGER.error(e.getMessage(), e); exitCode = 2; } catch (IOException e) { LOGGER.error(e.getMessage(), e); exitCode = 3; @@ -211,7 +212,9 @@ private static Object getParamValue(ISettingsProxy settings, String parameterNam Flowable flowable, Action terminateFlowable, MessageRouter eventBatchRouter, MessageRouter rawMessageRouter, - int maxMessageBatchSize, boolean enableMessageSendingEvent) { + int maxMessageBatchSize, + long maxMessageFlushTime, + boolean enableMessageSendingEvent) { LOGGER.info("AvailableProcessors '{}'", Runtime.getRuntime().availableProcessors()); return flowable @@ -238,7 +241,7 @@ private static Object getParamValue(ISettingsProxy settings, String parameterNam if (enableMessageSendingEvent && direction == Direction.SECOND) { subscribeToSendMessage(eventBatchRouter, messageConnectable); } - createPackAndPublishPipeline(direction, messageConnectable, rawMessageRouter, maxMessageBatchSize); + createPackAndPublishPipeline(direction, messageConnectable, rawMessageRouter, maxMessageBatchSize, maxMessageFlushTime); return messageConnectable; }); @@ -273,7 +276,7 @@ private static void subscribeToSendMessage(MessageRouter eventBatchR } private static void createPackAndPublishPipeline(Direction direction, Flowable messageConnectable, - MessageRouter rawMessageRouter, int maxMessageBatchSize) { + MessageRouter rawMessageRouter, int maxMessageBatchSize, long maxMessageFlushTime) { LOGGER.info("Map group {}", direction); Flowable batchConnectable = messageConnectable @@ -282,7 +285,7 @@ private static void createPackAndPublishPipeline(Direction direction, Flowable !list.isEmpty()) .map(ConnectivityBatch::new) diff --git a/src/main/java/com/exactpro/th2/conn/ServiceListener.java b/src/main/java/com/exactpro/th2/conn/ServiceListener.java index aa246d9..189c426 100644 --- a/src/main/java/com/exactpro/th2/conn/ServiceListener.java +++ b/src/main/java/com/exactpro/th2/conn/ServiceListener.java @@ -25,6 +25,7 @@ import com.exactpro.sf.services.ServiceHandlerRoute; import com.exactpro.th2.common.event.Event; import com.exactpro.th2.common.event.Event.Status; +import com.exactpro.th2.common.event.EventUtils; import com.exactpro.th2.conn.events.EventDispatcher; import com.exactpro.th2.conn.events.EventHolder; import com.exactpro.th2.conn.utility.EventStoreExtensions; @@ -144,11 +145,13 @@ public void onMessage(IServiceProxy service, IMessage message, boolean rejected, @Override public void onEvent(IServiceProxy service, ServiceEvent serviceEvent) { LOGGER.info("Session '{}' emitted service event '{}'", sessionAlias, serviceEvent); + String eventName = "Service [" + serviceEvent.getServiceName().getServiceName() + "] emitted event with status " + serviceEvent.getLevel(); try { Event event = Event.start().endTimestamp() - .name(serviceEvent.getMessage()) + .name(eventName) .status(serviceEvent.getLevel() == Level.ERROR ? Status.FAILED : Status.PASSED) .type("Service event") + .bodyData(EventUtils.createMessageBean(serviceEvent.getMessage())) .description(serviceEvent.getDetails()); eventDispatcher.store(EventHolder.createServiceEvent(event)); diff --git a/src/main/java/com/exactpro/th2/conn/configuration/ConnectivityConfiguration.java b/src/main/java/com/exactpro/th2/conn/configuration/ConnectivityConfiguration.java index 691017b..a002491 100644 --- a/src/main/java/com/exactpro/th2/conn/configuration/ConnectivityConfiguration.java +++ b/src/main/java/com/exactpro/th2/conn/configuration/ConnectivityConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2022 Exactpro (Exactpro Systems Limited) * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -22,10 +22,12 @@ public class ConnectivityConfiguration { private int maxMessageBatchSize = 100; + private long maxMessageFlushTime = 1000; + @JsonProperty(value = "session-alias", required = true) private String sessionAlias; - @JsonProperty(value = "workspace",required = true) + @JsonProperty(value = "workspace", required = true) private String workspaceFolder; @JsonProperty(required = true) @@ -45,6 +47,10 @@ public int getMaxMessageBatchSize() { return maxMessageBatchSize; } + public long getMaxMessageFlushTime() { + return maxMessageFlushTime; + } + public String getSessionAlias() { return sessionAlias; } diff --git a/src/main/resources/log4j.properties b/src/main/resources/log4j.properties index 5429812..d6d73c8 100644 --- a/src/main/resources/log4j.properties +++ b/src/main/resources/log4j.properties @@ -13,12 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. ################################################################################ -log4j.rootLogger=INFO, FILE, CON -log4j.appender.FILE=org.apache.log4j.FileAppender -log4j.appender.FILE.File=evolution_all.log -log4j.appender.FILE.Append=false -log4j.appender.FILE.layout=org.apache.log4j.PatternLayout -log4j.appender.FILE.layout.ConversionPattern=%d{dd MMM yyyy HH:mm:ss,SSS} %-6p [%-15t] %c - %m%n +log4j.rootLogger=INFO, CON log4j.appender.CON=org.apache.log4j.ConsoleAppender log4j.appender.CON.layout=org.apache.log4j.PatternLayout diff --git a/src/test/java/com/exactpro/th2/conn/TestEvent.java b/src/test/java/com/exactpro/th2/conn/TestEvent.java new file mode 100644 index 0000000..3518a86 --- /dev/null +++ b/src/test/java/com/exactpro/th2/conn/TestEvent.java @@ -0,0 +1,154 @@ +/* + * Copyright 2022 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.exactpro.th2.conn; + +import com.exactpro.sf.externalapi.IServiceProxy; +import com.exactpro.th2.common.event.Event; +import com.exactpro.th2.common.event.EventUtils; +import com.exactpro.th2.common.grpc.EventBatch; +import com.exactpro.th2.common.grpc.EventID; +import com.exactpro.th2.common.grpc.RawMessage; +import com.exactpro.th2.common.grpc.RawMessageBatch; +import com.exactpro.th2.common.schema.message.MessageListener; +import com.exactpro.th2.common.schema.message.MessageRouter; +import com.exactpro.th2.common.schema.message.SubscriberMonitor; +import com.exactpro.th2.conn.events.EventDispatcher; +import com.exactpro.th2.conn.events.EventHolder; +import com.exactpro.th2.conn.events.EventType; +import com.google.protobuf.ByteString; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Map; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; + +public class TestEvent { + + private static final String rootID = "rootID"; + private static final Map parentIds = Map.of(EventType.ERROR, "errorEventID"); + + private static IServiceProxy serviceProxy; + private static EventDispatcher eventDispatcher; + private static MessageSender messageSender; + private static MessageListener messageListener; + private static Event event; + private static String parentId; + + @BeforeAll + static void initMessages() throws IOException { + serviceProxy = mock(IServiceProxy.class); + @SuppressWarnings("unchecked") + MessageRouter router = mock(MessageRouter.class); + + doAnswer(invocation -> { + messageListener = invocation.getArgument(0); + return (SubscriberMonitor) () -> { + }; + }).when(router).subscribeAll(any(), any()); + + eventDispatcher = mock(EventDispatcher.class); + doAnswer(invocation -> { + EventHolder eventHolder = invocation.getArgument(0); + EventType eventType = eventHolder.getType(); + + eventDispatcher.store(eventHolder.getEvent(), + parentIds.get(eventType) == null ? rootID : parentIds.get(eventType)); + return null; + }).when(eventDispatcher).store(any()); + + doAnswer(invocation -> { + event = invocation.getArgument(0); + parentId = invocation.getArgument(1); + return null; + }).when(eventDispatcher).store(any(), any()); + + messageSender = new MessageSender(serviceProxy, router, eventDispatcher, + EventID.newBuilder().setId("stubID").build()); + messageSender.start(); + } + + @AfterEach + void clear() { + event = null; + parentId = null; + } + + public void sendIncorrectMessage() throws Exception { + RawMessageBatch rawMessageBatch = RawMessageBatch.newBuilder() + .addMessages(RawMessage.newBuilder().build()) + .build(); + + doThrow(new IllegalStateException("error")).when(serviceProxy).sendRaw(any(), any()); + messageListener.handler("stubValue", rawMessageBatch); + } + + @Test + public void eventHasBodyTest() throws Exception { + sendIncorrectMessage(); + + ByteString body = event.toProto(EventUtils.toEventID(parentId)).getBody(); + Assertions.assertEquals("[{\"data\":\"java.lang.IllegalStateException: error\",\"type\":\"message\"}," + + "{\"data\":\"Cannot send message. Message body in base64:\",\"type\":\"message\"},{\"data\":\"\"," + + "\"type\":\"message\"},{\"data\":\"java.lang.IllegalStateException: error\",\"type\":\"message\"}]", body.toStringUtf8()); + } + + @Test + public void eventHasNameTest() throws Exception { + sendIncorrectMessage(); + + String name = event.toProto(EventUtils.toEventID(parentId)).getName(); + Assertions.assertEquals("Failed to send raw message", name); + } + + @Test + public void sentMessageWithParentEventIDTest() throws Exception { + RawMessageBatch rawMessageBatch = RawMessageBatch.newBuilder() + .addMessages(RawMessage.newBuilder() + .setParentEventId(EventID.newBuilder() + .setId("RawMessageParentEventID")).build()) + .build(); + + doThrow(new IllegalStateException("error")).when(serviceProxy).sendRaw(any(), any()); + messageListener.handler("stubValue", rawMessageBatch); + + event.addSubEvent(Event.start()); + + EventBatch eventBatch = event.toBatchProto(EventUtils.toEventID(parentId)); + Assertions.assertEquals("RawMessageParentEventID", eventBatch.getParentEventId().getId()); + } + + @Test + public void sentMessageWithoutParentEventIDTest() throws Exception { + sendIncorrectMessage(); + event.addSubEvent(Event.start()); + + EventBatch eventBatch = event.toBatchProto(EventUtils.toEventID(parentId)); + Assertions.assertEquals("errorEventID", eventBatch.getParentEventId().getId()); + } + + @AfterAll + static void close() throws IOException { + messageSender.stop(); + } +} \ No newline at end of file diff --git a/src/test/java/com/exactpro/th2/conn/TestServiceListener.java b/src/test/java/com/exactpro/th2/conn/TestServiceListener.java new file mode 100644 index 0000000..18b4bec --- /dev/null +++ b/src/test/java/com/exactpro/th2/conn/TestServiceListener.java @@ -0,0 +1,85 @@ +/* + * Copyright 2022 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.exactpro.th2.conn; + +import com.exactpro.sf.common.services.ServiceName; +import com.exactpro.sf.externalapi.IServiceProxy; +import com.exactpro.sf.services.ServiceEvent; +import com.exactpro.sf.services.ServiceEventFactory; +import com.exactpro.th2.common.event.Event; +import com.exactpro.th2.common.grpc.Direction; +import com.exactpro.th2.conn.events.EventDispatcher; +import com.exactpro.th2.conn.events.EventHolder; +import com.fasterxml.jackson.core.JsonProcessingException; +import io.reactivex.rxjava3.processors.FlowableProcessor; +import io.reactivex.rxjava3.processors.UnicastProcessor; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import static org.mockito.Mockito.mock; + +public class TestServiceListener { + + + @Test + public void onEventTest() throws JsonProcessingException { + + FlowableProcessor processor = UnicastProcessor.create(); + MyEventDispatcher eventDispatcher = new MyEventDispatcher(); + + ServiceListener serviceListener = new ServiceListener(Map.of(Direction.FIRST, new AtomicLong(1)), + "SessionAlias", processor, eventDispatcher); + + ServiceEvent serviceEvent = ServiceEventFactory.createEventInfo(ServiceName.parse("serviceName"), ServiceEvent.Type.INFO, + "Warn: incoming message with missing field: 45", null); + + IServiceProxy serviceProxy = mock(IServiceProxy.class); + serviceListener.onEvent(serviceProxy, serviceEvent); + + Event event = eventDispatcher.getEvent(); + var grpcEvent = event.toProto(null); + + String name = grpcEvent.getName(); + Assertions.assertEquals("Service [serviceName] emitted event with status INFO", name); + + String body = grpcEvent.getBody().toStringUtf8(); + Assertions.assertEquals("[{\"data\":\"Warn: incoming message with missing field: 45\",\"type\":\"message\"}]", body); + } + + + public static class MyEventDispatcher implements EventDispatcher { + + Event event; + + @Override + public void store(@NotNull EventHolder eventHolder) { + this.event = eventHolder.getEvent(); + } + + @Override + public void store(@NotNull Event event, @NotNull String parentId) { + this.event = event; + } + + public Event getEvent() { + return event; + } + } +}