From afeb2eb7ac3a9401a680a27cea754feb21bdea6f Mon Sep 17 00:00:00 2001 From: "sergey.sitnikov" Date: Mon, 30 May 2022 14:41:03 +0000 Subject: [PATCH 01/20] [TH2-3717] Implement ability to specify the group option --- build.gradle | 2 +- .../exactpro/th2/conn/ConnectivityMessage.java | 15 +++++++++------ .../com/exactpro/th2/conn/MicroserviceMain.java | 2 +- .../com/exactpro/th2/conn/ServiceListener.java | 7 +++++-- .../configuration/ConnectivityConfiguration.java | 6 ++++++ .../exactpro/th2/conn/TestConnectivityMessage.kt | 3 ++- 6 files changed, 24 insertions(+), 11 deletions(-) diff --git a/build.gradle b/build.gradle index 7e90ed1..0ce9a3c 100644 --- a/build.gradle +++ b/build.gradle @@ -66,7 +66,7 @@ compileTestKotlin { dependencies { api platform('com.exactpro.th2:bom:3.0.0') - implementation 'com.exactpro.th2:common:3.33.0' + implementation 'com.exactpro.th2:common:3.38.0-TH2-3578-2300290805-SNAPSHOT' implementation "com.exactpro.th2:sailfish-utils:3.8.0" implementation "org.slf4j:slf4j-log4j12" diff --git a/src/main/java/com/exactpro/th2/conn/ConnectivityMessage.java b/src/main/java/com/exactpro/th2/conn/ConnectivityMessage.java index a45a31f..abb75b6 100644 --- a/src/main/java/com/exactpro/th2/conn/ConnectivityMessage.java +++ b/src/main/java/com/exactpro/th2/conn/ConnectivityMessage.java @@ -55,12 +55,12 @@ public class ConnectivityMessage { private final MessageID messageID; private final Timestamp timestamp; - public ConnectivityMessage(List sailfishMessages, String sessionAlias, Direction direction, long sequence) { + public ConnectivityMessage(List sailfishMessages, String sessionAlias, Direction direction, long sequence, String sessionGroup) { this.sailfishMessages = Collections.unmodifiableList(requireNonNull(sailfishMessages, "Message can't be null")); if (sailfishMessages.isEmpty()) { throw new IllegalArgumentException("At least one sailfish messages must be passed. Session alias: " + sessionAlias + "; Direction: " + direction); } - messageID = createMessageID(createConnectionID(requireNonNull(sessionAlias, "Session alias can't be null")), + messageID = createMessageID(createConnectionID(requireNonNull(sessionAlias, "Session alias can't be null"), sessionGroup), requireNonNull(direction, "Direction can't be null"), sequence); timestamp = createTimestamp(sailfishMessages.get(0).getMetaData().getMsgTimestamp().getTime()); } @@ -139,10 +139,13 @@ private static int calculateTotalBodySize(Collection messages) { }).sum(); } - private static ConnectionID createConnectionID(String sessionAlias) { - return ConnectionID.newBuilder() - .setSessionAlias(sessionAlias) - .build(); + private static ConnectionID createConnectionID(String sessionAlias, String sessionGroup) { + ConnectionID.Builder builder = ConnectionID.newBuilder() + .setSessionAlias(sessionAlias); + if (sessionGroup != null) { + builder.setSessionGroup(sessionGroup); + } + return builder.build(); } private static MessageID createMessageID(ConnectionID connectionId, Direction direction, long sequence) { diff --git a/src/main/java/com/exactpro/th2/conn/MicroserviceMain.java b/src/main/java/com/exactpro/th2/conn/MicroserviceMain.java index d6bbd34..f4f3043 100644 --- a/src/main/java/com/exactpro/th2/conn/MicroserviceMain.java +++ b/src/main/java/com/exactpro/th2/conn/MicroserviceMain.java @@ -162,7 +162,7 @@ public static void main(String[] args) { EventType.SERVICE_EVENT, serviceEventsRoot.getId() )); - IServiceListener serviceListener = new ServiceListener(directionToSequence, configuration.getSessionAlias(), processor, eventDispatcher); + IServiceListener serviceListener = new ServiceListener(directionToSequence, configuration.getSessionAlias(), processor, eventDispatcher, configuration.getSessionGroup()); IServiceProxy serviceProxy = loadService(serviceFactory, factory, configuration, serviceListener); disposer.register(() -> { LOGGER.info("Stop service proxy"); diff --git a/src/main/java/com/exactpro/th2/conn/ServiceListener.java b/src/main/java/com/exactpro/th2/conn/ServiceListener.java index aa246d9..dde098c 100644 --- a/src/main/java/com/exactpro/th2/conn/ServiceListener.java +++ b/src/main/java/com/exactpro/th2/conn/ServiceListener.java @@ -74,6 +74,7 @@ public class ServiceListener implements IServiceListener { private final Map directionToSequence; private final String sessionAlias; + private final String sessionGroup; private final Subscriber subscriber; private final EventDispatcher eventDispatcher; @@ -81,12 +82,14 @@ public ServiceListener( Map directionToSequence, String sessionAlias, Subscriber subscriber, - EventDispatcher eventDispatcher + EventDispatcher eventDispatcher, + String sessionGroup ) { this.directionToSequence = requireNonNull(directionToSequence, "Map direction to sequence counter can't be null"); this.sessionAlias = requireNonNull(sessionAlias, "Session alias can't be null"); this.subscriber = requireNonNull(subscriber, "Subscriber can't be null"); this.eventDispatcher = requireNonNull(eventDispatcher, "Event dispatcher can't be null"); + this.sessionGroup = sessionGroup; } @Override @@ -161,6 +164,6 @@ public void onEvent(IServiceProxy service, ServiceEvent serviceEvent) { private ConnectivityMessage createConnectivityMessage(List messages, Direction direction, AtomicLong directionSeq) { long sequence = directionSeq.incrementAndGet(); LOGGER.debug("On message: direction '{}'; sequence '{}'; messages '{}'", direction, sequence, messages); - return new ConnectivityMessage(messages, sessionAlias, direction, sequence); + return new ConnectivityMessage(messages, sessionAlias, direction, sequence, sessionGroup); } } diff --git a/src/main/java/com/exactpro/th2/conn/configuration/ConnectivityConfiguration.java b/src/main/java/com/exactpro/th2/conn/configuration/ConnectivityConfiguration.java index 691017b..70befe8 100644 --- a/src/main/java/com/exactpro/th2/conn/configuration/ConnectivityConfiguration.java +++ b/src/main/java/com/exactpro/th2/conn/configuration/ConnectivityConfiguration.java @@ -37,6 +37,8 @@ public class ConnectivityConfiguration { @JsonProperty(required = true) private Map settings; + private String sessionGroup; + public boolean isEnableMessageSendingEvent() { return enableMessageSendingEvent; } @@ -64,4 +66,8 @@ public String getName() { public Map getSettings() { return settings; } + + public String getSessionGroup() { + return sessionGroup; + } } diff --git a/src/test/kotlin/com/exactpro/th2/conn/TestConnectivityMessage.kt b/src/test/kotlin/com/exactpro/th2/conn/TestConnectivityMessage.kt index c2261ae..df5a9f5 100644 --- a/src/test/kotlin/com/exactpro/th2/conn/TestConnectivityMessage.kt +++ b/src/test/kotlin/com/exactpro/th2/conn/TestConnectivityMessage.kt @@ -41,7 +41,8 @@ class TestConnectivityMessage { ), "test", Direction.SECOND, - 1L + 1L, + null ) val rawMessage: RawMessage = connectivityMessage.convertToProtoRawMessage() From 25b37792fca1eef20ed260ff8ff10e3f680921dd Mon Sep 17 00:00:00 2001 From: "sergey.sitnikov" Date: Fri, 10 Jun 2022 08:10:51 +0000 Subject: [PATCH 02/20] [TH2-3717] Add timestamp for multithreading --- .../th2/conn/ConnectivityMessage.java | 6 ++++-- .../exactpro/th2/conn/MicroserviceMain.java | 20 +++++++++++-------- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/src/main/java/com/exactpro/th2/conn/ConnectivityMessage.java b/src/main/java/com/exactpro/th2/conn/ConnectivityMessage.java index abb75b6..9af8e23 100644 --- a/src/main/java/com/exactpro/th2/conn/ConnectivityMessage.java +++ b/src/main/java/com/exactpro/th2/conn/ConnectivityMessage.java @@ -62,7 +62,9 @@ public ConnectivityMessage(List sailfishMessages, String sessionAlias, } messageID = createMessageID(createConnectionID(requireNonNull(sessionAlias, "Session alias can't be null"), sessionGroup), requireNonNull(direction, "Direction can't be null"), sequence); - timestamp = createTimestamp(sailfishMessages.get(0).getMetaData().getMsgTimestamp().getTime()); + long time = System.currentTimeMillis(); + LOGGER.info("SF time: {}. Current time: {}", sailfishMessages.get(0).getMetaData().getMsgTimestamp().getTime(), time); + timestamp = createTimestamp(time); } public String getSessionAlias() { @@ -122,7 +124,7 @@ public List getSailfishMessages() { return sailfishMessages; } - @Override + @Override public String toString() { return new ToStringBuilder(this) .append("messageID", shortDebugString(messageID)) diff --git a/src/main/java/com/exactpro/th2/conn/MicroserviceMain.java b/src/main/java/com/exactpro/th2/conn/MicroserviceMain.java index f4f3043..f7a7221 100644 --- a/src/main/java/com/exactpro/th2/conn/MicroserviceMain.java +++ b/src/main/java/com/exactpro/th2/conn/MicroserviceMain.java @@ -30,12 +30,14 @@ import java.io.IOException; import java.io.InputStream; import java.time.Instant; +import java.util.Comparator; import java.util.Deque; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -222,23 +224,25 @@ private static Object getParamValue(ISettingsProxy settings, String parameterNam )) .observeOn(PIPELINE_SCHEDULER) .doOnNext(connectivityMessage -> LOGGER.debug("Start handling connectivity message {}", connectivityMessage)) - .groupBy(ConnectivityMessage::getDirection) + .groupBy(ConnectivityMessage::getSessionAlias) .map(group -> { - @NonNull Direction direction = requireNonNull(group.getKey(), "Direction can't be null"); + AtomicReference<@NonNull Direction> direction = new AtomicReference<>(); Flowable messageConnectable = group - .doOnNext(message -> LOGGER.trace( + .doOnNext(message -> { + LOGGER.trace( "Message inside map with sequence {} and direction {}", message.getSequence(), - message.getDirection() - )) + message.getDirection()); + direction.set(requireNonNull(message.getDirection(), "Direction can't be null")); + }) .doOnCancel(terminateFlowable) // This call is required for terminate the publisher and prevent creation another group .publish() - .refCount(enableMessageSendingEvent && direction == Direction.SECOND ? 2 : 1); + .refCount(enableMessageSendingEvent && direction.get() == Direction.SECOND ? 2 : 1); - if (enableMessageSendingEvent && direction == Direction.SECOND) { + if (enableMessageSendingEvent && direction.get() == Direction.SECOND) { subscribeToSendMessage(eventBatchRouter, messageConnectable); } - createPackAndPublishPipeline(direction, messageConnectable, rawMessageRouter, maxMessageBatchSize); + createPackAndPublishPipeline(direction.get(), messageConnectable, rawMessageRouter, maxMessageBatchSize); return messageConnectable; }); From 4d9530d5da36180b2a2111ad49fc76ab20b04f33 Mon Sep 17 00:00:00 2001 From: "sergey.sitnikov" Date: Tue, 14 Jun 2022 08:15:45 +0000 Subject: [PATCH 03/20] [TH2-3717] Update timestamp to preserve order --- .../java/com/exactpro/th2/conn/ConnectivityMessage.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/exactpro/th2/conn/ConnectivityMessage.java b/src/main/java/com/exactpro/th2/conn/ConnectivityMessage.java index 9af8e23..e1de4e9 100644 --- a/src/main/java/com/exactpro/th2/conn/ConnectivityMessage.java +++ b/src/main/java/com/exactpro/th2/conn/ConnectivityMessage.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2022 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -62,9 +62,8 @@ public ConnectivityMessage(List sailfishMessages, String sessionAlias, } messageID = createMessageID(createConnectionID(requireNonNull(sessionAlias, "Session alias can't be null"), sessionGroup), requireNonNull(direction, "Direction can't be null"), sequence); - long time = System.currentTimeMillis(); - LOGGER.info("SF time: {}. Current time: {}", sailfishMessages.get(0).getMetaData().getMsgTimestamp().getTime(), time); - timestamp = createTimestamp(time); + LOGGER.warn("Sailfish transforms th2 message to real send message too slow, delay is about 10 milliseconds. Please add more hardware resources"); + timestamp = createTimestamp(System.currentTimeMillis()); } public String getSessionAlias() { From e423cd5d352ec7f214c4920688ee7ad7b52d01b6 Mon Sep 17 00:00:00 2001 From: "sergey.sitnikov" Date: Tue, 14 Jun 2022 11:24:27 +0000 Subject: [PATCH 04/20] [TH2-3717] Update pipeline creation --- .../exactpro/th2/conn/MicroserviceMain.java | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/src/main/java/com/exactpro/th2/conn/MicroserviceMain.java b/src/main/java/com/exactpro/th2/conn/MicroserviceMain.java index f7a7221..aeeaf5b 100644 --- a/src/main/java/com/exactpro/th2/conn/MicroserviceMain.java +++ b/src/main/java/com/exactpro/th2/conn/MicroserviceMain.java @@ -216,25 +216,26 @@ private static Object getParamValue(ISettingsProxy settings, String parameterNam int maxMessageBatchSize, boolean enableMessageSendingEvent) { LOGGER.info("AvailableProcessors '{}'", Runtime.getRuntime().availableProcessors()); + AtomicReference<@NonNull Direction> direction = new AtomicReference<>(); return flowable - .doOnNext(message -> LOGGER.trace( - "Message before observeOn with sequence {} and direction {}", - message.getSequence(), - message.getDirection() - )) + .doOnNext(message -> { + LOGGER.trace( + "Message before observeOn with sequence {} and direction {}", + message.getSequence(), + message.getDirection() + ); + direction.set(requireNonNull(message.getDirection(), "Direction can't be null")); + }) .observeOn(PIPELINE_SCHEDULER) .doOnNext(connectivityMessage -> LOGGER.debug("Start handling connectivity message {}", connectivityMessage)) .groupBy(ConnectivityMessage::getSessionAlias) .map(group -> { - AtomicReference<@NonNull Direction> direction = new AtomicReference<>(); Flowable messageConnectable = group - .doOnNext(message -> { - LOGGER.trace( + .doOnNext(message -> LOGGER.trace( "Message inside map with sequence {} and direction {}", message.getSequence(), - message.getDirection()); - direction.set(requireNonNull(message.getDirection(), "Direction can't be null")); - }) + message.getDirection()) + ) .doOnCancel(terminateFlowable) // This call is required for terminate the publisher and prevent creation another group .publish() .refCount(enableMessageSendingEvent && direction.get() == Direction.SECOND ? 2 : 1); From d011433a35f4441f0dcdeb202420b5d24c9b0622 Mon Sep 17 00:00:00 2001 From: "sergey.sitnikov" Date: Tue, 14 Jun 2022 15:15:03 +0000 Subject: [PATCH 05/20] [TH2-3717] Update batch creation --- .../exactpro/th2/conn/ConnectivityBatch.java | 3 +-- .../exactpro/th2/conn/MicroserviceMain.java | 25 ++++++++++--------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/src/main/java/com/exactpro/th2/conn/ConnectivityBatch.java b/src/main/java/com/exactpro/th2/conn/ConnectivityBatch.java index c5ed40b..e9c5c75 100644 --- a/src/main/java/com/exactpro/th2/conn/ConnectivityBatch.java +++ b/src/main/java/com/exactpro/th2/conn/ConnectivityBatch.java @@ -79,8 +79,7 @@ private static void checkMessages(List iMessages, String se } if (!iMessages.stream() - .allMatch(iMessage -> Objects.equals(sessionAlias, iMessage.getSessionAlias()) - && direction == iMessage.getDirection())) { + .allMatch(iMessage -> Objects.equals(sessionAlias, iMessage.getSessionAlias()))) { throw new IllegalArgumentException("List " + iMessages + " has elements with incorrect metadata, expected session alias '"+ sessionAlias +"' direction '" + direction + '\''); } diff --git a/src/main/java/com/exactpro/th2/conn/MicroserviceMain.java b/src/main/java/com/exactpro/th2/conn/MicroserviceMain.java index aeeaf5b..2533e91 100644 --- a/src/main/java/com/exactpro/th2/conn/MicroserviceMain.java +++ b/src/main/java/com/exactpro/th2/conn/MicroserviceMain.java @@ -216,7 +216,6 @@ private static Object getParamValue(ISettingsProxy settings, String parameterNam int maxMessageBatchSize, boolean enableMessageSendingEvent) { LOGGER.info("AvailableProcessors '{}'", Runtime.getRuntime().availableProcessors()); - AtomicReference<@NonNull Direction> direction = new AtomicReference<>(); return flowable .doOnNext(message -> { LOGGER.trace( @@ -224,17 +223,20 @@ private static Object getParamValue(ISettingsProxy settings, String parameterNam message.getSequence(), message.getDirection() ); - direction.set(requireNonNull(message.getDirection(), "Direction can't be null")); }) .observeOn(PIPELINE_SCHEDULER) .doOnNext(connectivityMessage -> LOGGER.debug("Start handling connectivity message {}", connectivityMessage)) .groupBy(ConnectivityMessage::getSessionAlias) .map(group -> { + AtomicReference<@NonNull Direction> direction = new AtomicReference<>(); Flowable messageConnectable = group - .doOnNext(message -> LOGGER.trace( - "Message inside map with sequence {} and direction {}", - message.getSequence(), - message.getDirection()) + .doOnNext(message -> { + LOGGER.trace( + "Message inside map with sequence {} and direction {}", + message.getSequence(), + message.getDirection()); + direction.set(message.getDirection()); + } ) .doOnCancel(terminateFlowable) // This call is required for terminate the publisher and prevent creation another group .publish() @@ -243,7 +245,7 @@ private static Object getParamValue(ISettingsProxy settings, String parameterNam if (enableMessageSendingEvent && direction.get() == Direction.SECOND) { subscribeToSendMessage(eventBatchRouter, messageConnectable); } - createPackAndPublishPipeline(direction.get(), messageConnectable, rawMessageRouter, maxMessageBatchSize); + createPackAndPublishPipeline(messageConnectable, rawMessageRouter, maxMessageBatchSize); return messageConnectable; }); @@ -277,10 +279,9 @@ private static void subscribeToSendMessage(MessageRouter eventBatchR }); } - private static void createPackAndPublishPipeline(Direction direction, Flowable messageConnectable, + private static void createPackAndPublishPipeline(Flowable messageConnectable, MessageRouter rawMessageRouter, int maxMessageBatchSize) { - LOGGER.info("Map group {}", direction); Flowable batchConnectable = messageConnectable .doOnNext(message -> LOGGER.trace( "Message before window with sequence {} and direction {}", @@ -305,7 +306,7 @@ private static void createPackAndPublishPipeline(Direction direction, Flowable { try { RawMessageBatch rawBatch = batch.convertToProtoRawBatch(); - rawMessageRouter.sendAll(rawBatch, (direction == Direction.FIRST ? QueueAttribute.FIRST : QueueAttribute.SECOND).toString()); + rawMessageRouter.send(rawBatch); //FIXME: Only one pin can be used } catch (Exception e) { if (LOGGER.isErrorEnabled()) { LOGGER.error("Cannot send batch with sequences: {}", @@ -314,9 +315,9 @@ private static void createPackAndPublishPipeline(Direction direction, Flowable Date: Wed, 15 Jun 2022 14:45:27 +0000 Subject: [PATCH 06/20] [TH2-3717] Update message checking inside batch --- .../exactpro/th2/conn/ConnectivityBatch.java | 29 +------------------ .../th2/conn/ConnectivityMessage.java | 6 ++-- 2 files changed, 5 insertions(+), 30 deletions(-) diff --git a/src/main/java/com/exactpro/th2/conn/ConnectivityBatch.java b/src/main/java/com/exactpro/th2/conn/ConnectivityBatch.java index e9c5c75..653f7da 100644 --- a/src/main/java/com/exactpro/th2/conn/ConnectivityBatch.java +++ b/src/main/java/com/exactpro/th2/conn/ConnectivityBatch.java @@ -82,34 +82,7 @@ private static void checkMessages(List iMessages, String se .allMatch(iMessage -> Objects.equals(sessionAlias, iMessage.getSessionAlias()))) { throw new IllegalArgumentException("List " + iMessages + " has elements with incorrect metadata, expected session alias '"+ sessionAlias +"' direction '" + direction + '\''); } - - if (LOGGER.isErrorEnabled()) { - boolean sequencesUnordered = false; - List missedSequences = new ArrayList<>(); - for (int index = 0; index < iMessages.size() - 1; index++) { - long nextExpectedSequence = iMessages.get(index).getSequence() + 1; - long nextSequence = iMessages.get(index + 1).getSequence(); - if (nextExpectedSequence != nextSequence) { - sequencesUnordered = true; - } - LongStream.range(nextExpectedSequence, nextSequence).forEach(missedSequences::add); - } - if (sequencesUnordered) { - LOGGER.error( - "List {} hasn't elements with incremental sequence with one step for session alias '{}' and direction '{}'{}", - iMessages.stream() - .map(ConnectivityMessage::getSequence) - .collect(Collectors.toList()), - sessionAlias, - direction, - missedSequences.isEmpty() - ? "" - : String.format(". Missed sequences %s", missedSequences) - ); - } - } - - // FIXME: Replace logging to thowing exception after solving message reordering problem + // FIXME: Replace logging to thowing exception after solving message reordering problem // throw new IllegalArgumentException("List " + iMessages.stream() // .map(ConnectivityMessage::getSequence) // .collect(Collectors.toList())+ " hasn't elements with incremental sequence with one step"); diff --git a/src/main/java/com/exactpro/th2/conn/ConnectivityMessage.java b/src/main/java/com/exactpro/th2/conn/ConnectivityMessage.java index e1de4e9..89fecf6 100644 --- a/src/main/java/com/exactpro/th2/conn/ConnectivityMessage.java +++ b/src/main/java/com/exactpro/th2/conn/ConnectivityMessage.java @@ -62,8 +62,10 @@ public ConnectivityMessage(List sailfishMessages, String sessionAlias, } messageID = createMessageID(createConnectionID(requireNonNull(sessionAlias, "Session alias can't be null"), sessionGroup), requireNonNull(direction, "Direction can't be null"), sequence); - LOGGER.warn("Sailfish transforms th2 message to real send message too slow, delay is about 10 milliseconds. Please add more hardware resources"); - timestamp = createTimestamp(System.currentTimeMillis()); + long time = System.currentTimeMillis(); + LOGGER.warn("Sailfish transforms th2 message to real send message too slow, delay is about {} milliseconds. Please add more hardware resources", + time - sailfishMessages.get(0).getMetaData().getMsgTimestamp().getTime()); + timestamp = createTimestamp(time); } public String getSessionAlias() { From 4613de9c1524eeaf38e529b6db3e0721fcb90b49 Mon Sep 17 00:00:00 2001 From: "sergey.sitnikov" Date: Thu, 16 Jun 2022 12:20:59 +0000 Subject: [PATCH 07/20] [TH2-3717] Update message logging --- .../java/com/exactpro/th2/conn/ConnectivityMessage.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/exactpro/th2/conn/ConnectivityMessage.java b/src/main/java/com/exactpro/th2/conn/ConnectivityMessage.java index 89fecf6..e1de4e9 100644 --- a/src/main/java/com/exactpro/th2/conn/ConnectivityMessage.java +++ b/src/main/java/com/exactpro/th2/conn/ConnectivityMessage.java @@ -62,10 +62,8 @@ public ConnectivityMessage(List sailfishMessages, String sessionAlias, } messageID = createMessageID(createConnectionID(requireNonNull(sessionAlias, "Session alias can't be null"), sessionGroup), requireNonNull(direction, "Direction can't be null"), sequence); - long time = System.currentTimeMillis(); - LOGGER.warn("Sailfish transforms th2 message to real send message too slow, delay is about {} milliseconds. Please add more hardware resources", - time - sailfishMessages.get(0).getMetaData().getMsgTimestamp().getTime()); - timestamp = createTimestamp(time); + LOGGER.warn("Sailfish transforms th2 message to real send message too slow, delay is about 10 milliseconds. Please add more hardware resources"); + timestamp = createTimestamp(System.currentTimeMillis()); } public String getSessionAlias() { From 3cba4b9120f54e764fb2a15dcd33836b261ef94c Mon Sep 17 00:00:00 2001 From: Dmitriy-Yugay Date: Fri, 18 Mar 2022 10:37:56 +0400 Subject: [PATCH 08/20] moved error message from the name to the body of the event --- .../exactpro/th2/conn/ServiceListener.java | 5 +- .../th2/conn/TestServiceListener.java | 91 +++++++++++++++++++ 2 files changed, 95 insertions(+), 1 deletion(-) create mode 100644 src/test/java/com/exactpro/th2/conn/TestServiceListener.java diff --git a/src/main/java/com/exactpro/th2/conn/ServiceListener.java b/src/main/java/com/exactpro/th2/conn/ServiceListener.java index dde098c..1d82d0d 100644 --- a/src/main/java/com/exactpro/th2/conn/ServiceListener.java +++ b/src/main/java/com/exactpro/th2/conn/ServiceListener.java @@ -25,6 +25,7 @@ import com.exactpro.sf.services.ServiceHandlerRoute; import com.exactpro.th2.common.event.Event; import com.exactpro.th2.common.event.Event.Status; +import com.exactpro.th2.common.event.EventUtils; import com.exactpro.th2.conn.events.EventDispatcher; import com.exactpro.th2.conn.events.EventHolder; import com.exactpro.th2.conn.utility.EventStoreExtensions; @@ -147,11 +148,13 @@ public void onMessage(IServiceProxy service, IMessage message, boolean rejected, @Override public void onEvent(IServiceProxy service, ServiceEvent serviceEvent) { LOGGER.info("Session '{}' emitted service event '{}'", sessionAlias, serviceEvent); + String eventName = "Service [" + serviceEvent.getServiceName().getServiceName() + "] emitted event with status " + serviceEvent.getLevel(); try { Event event = Event.start().endTimestamp() - .name(serviceEvent.getMessage()) + .name(eventName) .status(serviceEvent.getLevel() == Level.ERROR ? Status.FAILED : Status.PASSED) .type("Service event") + .bodyData(EventUtils.createMessageBean(serviceEvent.getMessage())) .description(serviceEvent.getDetails()); eventDispatcher.store(EventHolder.createServiceEvent(event)); diff --git a/src/test/java/com/exactpro/th2/conn/TestServiceListener.java b/src/test/java/com/exactpro/th2/conn/TestServiceListener.java new file mode 100644 index 0000000..bb1fc1f --- /dev/null +++ b/src/test/java/com/exactpro/th2/conn/TestServiceListener.java @@ -0,0 +1,91 @@ +/* + * Copyright 2022 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.exactpro.th2.conn; + +import com.exactpro.sf.common.services.ServiceName; +import com.exactpro.sf.externalapi.IServiceProxy; +import com.exactpro.sf.services.ServiceEvent; +import com.exactpro.sf.services.ServiceEventFactory; +import com.exactpro.th2.common.event.Event; +import com.exactpro.th2.common.grpc.Direction; +import com.exactpro.th2.conn.events.EventDispatcher; +import com.exactpro.th2.conn.events.EventHolder; +import com.fasterxml.jackson.core.JsonProcessingException; +import io.reactivex.rxjava3.processors.FlowableProcessor; +import io.reactivex.rxjava3.processors.UnicastProcessor; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import static org.mockito.Mockito.mock; + +public class TestServiceListener { + + + @Test + public void onEventTest() throws JsonProcessingException { + + FlowableProcessor processor = UnicastProcessor.create(); + MyEventDispatcher eventDispatcher = new MyEventDispatcher(); + + ServiceListener serviceListener = new ServiceListener(Map.of(Direction.FIRST, new AtomicLong(1)), + "SessionAlias", processor, eventDispatcher); + + ServiceEvent serviceEvent = ServiceEventFactory.createEventInfo(ServiceName.parse("serviceName"), ServiceEvent.Type.INFO, + "Warn: incoming message with missing field: 45: Required " + + "tag missing, field=45: 8=FIXT.1.1\0019=112\00135=j\00134=3783\00149=FGW" + + "\00152=20210203-12:30:48.238\00156=DEMO-CONN1\00158=Unknown SecurityID" + + "\001371=48\001372=D\001379=9741113\001380=2\00110=019\001", null); + + IServiceProxy serviceProxy = mock(IServiceProxy.class); + serviceListener.onEvent(serviceProxy, serviceEvent); + + Event event = eventDispatcher.getEvent(); + com.exactpro.th2.common.grpc.Event grpcEvent = event.toProto(null); + + String name = grpcEvent.getName(); + Assertions.assertEquals("Service [serviceName] emitted event with status INFO", name); + + String body = grpcEvent.getBody().toStringUtf8(); + Assertions.assertEquals("[{\"data\":\"Warn: incoming message with missing field: 45: Required " + + "tag missing, field=45: 8=FIXT.1.1\\u00019=112\\u000135=j\\u000134=3783\\u000149=FGW" + + "\\u000152=20210203-12:30:48.238\\u000156=DEMO-CONN1\\u000158=Unknown SecurityID" + + "\\u0001371=48\\u0001372=D\\u0001379=9741113\\u0001380=2\\u000110=019\\u0001\",\"type\":\"message\"}]", body); + } + + + public static class MyEventDispatcher implements EventDispatcher { + + Event event; + + @Override + public void store(@NotNull EventHolder eventHolder) { + this.event = eventHolder.getEvent(); + } + + @Override + public void store(@NotNull Event event, @NotNull String parentId) { + this.event = event; + } + + public Event getEvent() { + return event; + } + } +} From 0bc7b3f83693eb5cd667d7ec6054da2b33f3b56e Mon Sep 17 00:00:00 2001 From: Dmitriy-Yugay Date: Fri, 25 Mar 2022 18:31:23 +0400 Subject: [PATCH 09/20] minor fixes --- README.md | 6 +++++- gradle.properties | 2 +- .../com/exactpro/th2/conn/TestServiceListener.java | 12 +++--------- 3 files changed, 9 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 3e2e403..8986d1f 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# Connect (3.10.1) +# Connect (3.10.2) The "Connect" component is responsible for the communication with a target system. This component implements the logic of the interaction protocol, receiving and sending messages from and to the system, respectively. @@ -119,6 +119,10 @@ spec: ## Release notes +### 3.10.2 + ++ Events are made more convenient. Error message moved from the name to the body of the event. + ### 3.10.1 + Update `sailfish-core` version from `3.2.1674` to `3.2.1741` diff --git a/gradle.properties b/gradle.properties index 45efc1f..36f4a18 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1 +1 @@ -release_version = 3.10.1 \ No newline at end of file +release_version = 3.10.2 \ No newline at end of file diff --git a/src/test/java/com/exactpro/th2/conn/TestServiceListener.java b/src/test/java/com/exactpro/th2/conn/TestServiceListener.java index bb1fc1f..18b4bec 100644 --- a/src/test/java/com/exactpro/th2/conn/TestServiceListener.java +++ b/src/test/java/com/exactpro/th2/conn/TestServiceListener.java @@ -48,25 +48,19 @@ public void onEventTest() throws JsonProcessingException { "SessionAlias", processor, eventDispatcher); ServiceEvent serviceEvent = ServiceEventFactory.createEventInfo(ServiceName.parse("serviceName"), ServiceEvent.Type.INFO, - "Warn: incoming message with missing field: 45: Required " + - "tag missing, field=45: 8=FIXT.1.1\0019=112\00135=j\00134=3783\00149=FGW" + - "\00152=20210203-12:30:48.238\00156=DEMO-CONN1\00158=Unknown SecurityID" + - "\001371=48\001372=D\001379=9741113\001380=2\00110=019\001", null); + "Warn: incoming message with missing field: 45", null); IServiceProxy serviceProxy = mock(IServiceProxy.class); serviceListener.onEvent(serviceProxy, serviceEvent); Event event = eventDispatcher.getEvent(); - com.exactpro.th2.common.grpc.Event grpcEvent = event.toProto(null); + var grpcEvent = event.toProto(null); String name = grpcEvent.getName(); Assertions.assertEquals("Service [serviceName] emitted event with status INFO", name); String body = grpcEvent.getBody().toStringUtf8(); - Assertions.assertEquals("[{\"data\":\"Warn: incoming message with missing field: 45: Required " + - "tag missing, field=45: 8=FIXT.1.1\\u00019=112\\u000135=j\\u000134=3783\\u000149=FGW" + - "\\u000152=20210203-12:30:48.238\\u000156=DEMO-CONN1\\u000158=Unknown SecurityID" + - "\\u0001371=48\\u0001372=D\\u0001379=9741113\\u0001380=2\\u000110=019\\u0001\",\"type\":\"message\"}]", body); + Assertions.assertEquals("[{\"data\":\"Warn: incoming message with missing field: 45\",\"type\":\"message\"}]", body); } From 5e32660a0ee4851a6b861426a7b7e367cfede0af Mon Sep 17 00:00:00 2001 From: Dmitriy-Yugay <86191842+Dmitriy-Yugay@users.noreply.github.com> Date: Wed, 20 Apr 2022 10:31:05 +0400 Subject: [PATCH 10/20] Th2 1881 2 (#123) * updated Readme * changed dummy implementation of MessageRouter on Mock * changed dummy eventDispatcher on Mock * changed dummy implementation of MessageRouter on Mock * changed dummy eventDispatcher on Mock * minor fixes Co-authored-by: Oleg Smirnov --- README.md | 2 +- .../com/exactpro/th2/conn/MessageSender.java | 8 +- .../java/com/exactpro/th2/conn/TestEvent.java | 153 ++++++++++++++++++ 3 files changed, 159 insertions(+), 4 deletions(-) create mode 100644 src/test/java/com/exactpro/th2/conn/TestEvent.java diff --git a/README.md b/README.md index 8986d1f..b6d7352 100644 --- a/README.md +++ b/README.md @@ -121,7 +121,7 @@ spec: ### 3.10.2 -+ Events are made more convenient. Error message moved from the name to the body of the event. ++ Events are made more convenient. Added event names and error logs. Error message moved from the name to the body of the event. ### 3.10.1 diff --git a/src/main/java/com/exactpro/th2/conn/MessageSender.java b/src/main/java/com/exactpro/th2/conn/MessageSender.java index 15c7e19..044bf70 100644 --- a/src/main/java/com/exactpro/th2/conn/MessageSender.java +++ b/src/main/java/com/exactpro/th2/conn/MessageSender.java @@ -103,7 +103,7 @@ private void sendMessage(RawMessage protoMsg) throws InterruptedException { logger.debug("Message sent. Base64 view: {}", Base64.getEncoder().encodeToString(data)); } } catch (Exception ex) { - Event errorEvent = createErrorEvent("SendError") + Event errorEvent = createErrorEvent("SendError", ex) .bodyData(EventUtils.createMessageBean("Cannot send message. Message body in base64:")) .bodyData(EventUtils.createMessageBean(Base64.getEncoder().encodeToString(data))); EventStoreExtensions.addException(errorEvent, ex); @@ -124,10 +124,12 @@ private void storeErrorEvent(Event errorEvent, @Nullable EventID parentId) { } } - private Event createErrorEvent(String eventType) { + private Event createErrorEvent(String eventType, Exception e) { return Event.start().endTimestamp() .status(Status.FAILED) - .type(eventType); + .type(eventType) + .name("Failed to send raw message") + .exception(e, true); } private IMetadata toSailfishMetadata(RawMessage protoMsg) { diff --git a/src/test/java/com/exactpro/th2/conn/TestEvent.java b/src/test/java/com/exactpro/th2/conn/TestEvent.java new file mode 100644 index 0000000..555da05 --- /dev/null +++ b/src/test/java/com/exactpro/th2/conn/TestEvent.java @@ -0,0 +1,153 @@ +/* + * Copyright 2022 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.exactpro.th2.conn; + +import com.exactpro.sf.externalapi.IServiceProxy; +import com.exactpro.th2.common.event.Event; +import com.exactpro.th2.common.event.EventUtils; +import com.exactpro.th2.common.grpc.EventBatch; +import com.exactpro.th2.common.grpc.EventID; +import com.exactpro.th2.common.grpc.RawMessage; +import com.exactpro.th2.common.grpc.RawMessageBatch; +import com.exactpro.th2.common.schema.message.MessageListener; +import com.exactpro.th2.common.schema.message.MessageRouter; +import com.exactpro.th2.common.schema.message.SubscriberMonitor; +import com.exactpro.th2.conn.events.EventDispatcher; +import com.exactpro.th2.conn.events.EventHolder; +import com.exactpro.th2.conn.events.EventType; +import com.google.protobuf.ByteString; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Map; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; + +public class TestEvent { + + private static final String rootID = "rootID"; + private static final Map parentIds = Map.of(EventType.ERROR, "errorEventID"); + + private static IServiceProxy serviceProxy; + private static EventDispatcher eventDispatcher; + private static MessageSender messageSender; + private static MessageListener messageListener; + private static Event event; + private static String parentId; + + @BeforeAll + public static void initMessages() throws IOException { + serviceProxy = mock(IServiceProxy.class); + MessageRouter router = mock(MessageRouter.class); + + doAnswer(invocation -> { + messageListener = invocation.getArgument(0); + return (SubscriberMonitor) () -> { + }; + }).when(router).subscribeAll(any(), any()); + + eventDispatcher = mock(EventDispatcher.class); + doAnswer(invocation -> { + EventHolder eventHolder = invocation.getArgument(0); + EventType eventType = eventHolder.getType(); + + eventDispatcher.store(eventHolder.getEvent(), + parentIds.get(eventType) != null ? parentIds.get(eventType) : rootID); + return null; + }).when(eventDispatcher).store(any()); + + doAnswer(invocation -> { + event = invocation.getArgument(0); + parentId = invocation.getArgument(1); + return null; + }).when(eventDispatcher).store(any(), any()); + + messageSender = new MessageSender(serviceProxy, router, eventDispatcher, + EventID.newBuilder().setId("stubID").build()); + messageSender.start(); + } + + @AfterEach + public void clear() { + event = null; + parentId = null; + } + + public void sendIncorrectMessage() throws Exception { + RawMessageBatch rawMessageBatch = RawMessageBatch.newBuilder() + .addMessages(RawMessage.newBuilder().build()) + .build(); + + doThrow(new IllegalStateException("error")).when(serviceProxy).sendRaw(any(), any()); + messageListener.handler("stubValue", rawMessageBatch); + } + + @Test + public void eventHasBodyTest() throws Exception { + sendIncorrectMessage(); + + ByteString body = event.toProto(EventUtils.toEventID(parentId)).getBody(); + Assertions.assertEquals("[{\"data\":\"java.lang.IllegalStateException: error\",\"type\":\"message\"}," + + "{\"data\":\"Cannot send message. Message body in base64:\",\"type\":\"message\"},{\"data\":\"\"," + + "\"type\":\"message\"},{\"data\":\"java.lang.IllegalStateException: error\",\"type\":\"message\"}]", body.toStringUtf8()); + } + + @Test + public void eventHasNameTest() throws Exception { + sendIncorrectMessage(); + + String name = event.toProto(EventUtils.toEventID(parentId)).getName(); + Assertions.assertEquals("Failed to send raw message", name); + } + + @Test + public void sentMessageWithParentEventIDTest() throws Exception { + RawMessageBatch rawMessageBatch = RawMessageBatch.newBuilder() + .addMessages(RawMessage.newBuilder() + .setParentEventId(EventID.newBuilder() + .setId("RawMessageParentEventID")).build()) + .build(); + + doThrow(new IllegalStateException("error")).when(serviceProxy).sendRaw(any(), any()); + messageListener.handler("stubValue", rawMessageBatch); + + event.addSubEvent(Event.start()); + + EventBatch eventBatch = event.toBatchProto(EventUtils.toEventID(parentId)); + Assertions.assertEquals("RawMessageParentEventID", eventBatch.getParentEventId().getId()); + } + + @Test + public void sentMessageWithoutParentEventIDTest() throws Exception { + sendIncorrectMessage(); + event.addSubEvent(Event.start()); + + EventBatch eventBatch = event.toBatchProto(EventUtils.toEventID(parentId)); + Assertions.assertEquals("errorEventID", eventBatch.getParentEventId().getId()); + } + + @AfterAll + private static void close() throws IOException { + messageSender.stop(); + } +} \ No newline at end of file From 0b5789199c3ce287faa094ae76f14e17bc13df96 Mon Sep 17 00:00:00 2001 From: "sergey.sitnikov" Date: Thu, 16 Jun 2022 12:38:59 +0000 Subject: [PATCH 11/20] [TH2-3717] Update project version and README file --- README.md | 27 ++++++++++++++++----------- gradle.properties | 2 +- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index b6d7352..0e689a1 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# Connect (3.10.2) +# Connect (3.11.0) The "Connect" component is responsible for the communication with a target system. This component implements the logic of the interaction protocol, receiving and sending messages from and to the system, respectively. @@ -17,6 +17,7 @@ session-alias: "connectivity-alias" workspace: "/home/sailfish/workspace" type: "th2_service:Your_Service_Type" name: "your_service" +sessionGroup "group" settings: param1: "value1" ``` @@ -76,13 +77,13 @@ You need to perform the following steps: ## Pins -Connect has 2 types of pins for interacting with th2 components. -Messages that were received from / sent to the target system will be sent to the following queues: +Connect has only 1 type of pins for interacting with th2 components. +Messages that were received from / sent to the target system will be sent to the following queue: -- incoming raw messages -- outgoing raw messages +- sended raw messages -The "Connect" component uses a separate queue to send messages. The component subscribes to that pin at the start and waits for the messages. + +The "Connect" component uses a queue to send messages. The component subscribes to that pin at the start and waits for the messages. The messages received from that pin will be sent to the target system. Also, this component is responsible for maintaining connections and sessions in the cases where this is provided by the communication protocol. Here you can automatically send heartbeat messages, send a logon/logout, requests to retransmit messages in the event of a gap, etc. @@ -101,17 +102,15 @@ spec: workspace: "/home/sailfish/workspace" type: "th2_service:Your_Service_Type" name: "your_service" + sessionGroup: "group" maxMessageBatchSize: 100 enableMessageSendingEvent: true settings: param1: "value1" pins: - - name: in_raw - connection-type: mq - attributes: ["first", "raw", "publish", "store"] - - name: out_raw + - name: send_raw connection-type: mq - attributes: ["second", "raw", "publish", "store"] + attributes: ["raw", "publish", "store"] - name: to_send connection-type: mq attributes: ["send", "raw", "subscribe"] @@ -119,6 +118,12 @@ spec: ## Release notes +### 3.11.0 + ++ Add session group support from th2:common version 3.38.0-TH2-3578-2300290805-SNAPSHOT. ++ Replace 2 queues with in/out pins to one queue. ++ Messages are not grouped by direction, both direction publish together. + ### 3.10.2 + Events are made more convenient. Added event names and error logs. Error message moved from the name to the body of the event. diff --git a/gradle.properties b/gradle.properties index 36f4a18..b39c5da 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1 +1 @@ -release_version = 3.10.2 \ No newline at end of file +release_version = 3.11.0 \ No newline at end of file From b2c62c974080274e31a432144831c6b8f90eec35 Mon Sep 17 00:00:00 2001 From: "sergey.sitnikov" Date: Thu, 16 Jun 2022 12:43:20 +0000 Subject: [PATCH 12/20] [TH2-3717] Update imports --- src/main/java/com/exactpro/th2/conn/MicroserviceMain.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/main/java/com/exactpro/th2/conn/MicroserviceMain.java b/src/main/java/com/exactpro/th2/conn/MicroserviceMain.java index 2533e91..f5727eb 100644 --- a/src/main/java/com/exactpro/th2/conn/MicroserviceMain.java +++ b/src/main/java/com/exactpro/th2/conn/MicroserviceMain.java @@ -20,7 +20,6 @@ import static com.exactpro.th2.conn.utility.SailfishMetadataExtensions.contains; import static com.exactpro.th2.conn.utility.SailfishMetadataExtensions.getParentEventID; import static io.reactivex.rxjava3.plugins.RxJavaPlugins.createSingleScheduler; -import static java.util.Objects.requireNonNull; import static org.apache.commons.lang.StringUtils.containsIgnoreCase; import static org.apache.commons.lang.StringUtils.repeat; import static org.apache.commons.lang3.ClassUtils.primitiveToWrapper; @@ -30,7 +29,6 @@ import java.io.IOException; import java.io.InputStream; import java.time.Instant; -import java.util.Comparator; import java.util.Deque; import java.util.Map; import java.util.Map.Entry; @@ -67,7 +65,6 @@ import com.exactpro.th2.common.grpc.RawMessageBatch; import com.exactpro.th2.common.schema.factory.CommonFactory; import com.exactpro.th2.common.schema.message.MessageRouter; -import com.exactpro.th2.common.schema.message.QueueAttribute; import com.exactpro.th2.conn.configuration.ConnectivityConfiguration; import com.exactpro.th2.conn.events.EventDispatcher; import com.exactpro.th2.conn.events.EventType; From cd8b0888d76d12402eb22faedc55b0bb022d3f4f Mon Sep 17 00:00:00 2001 From: "sergey.sitnikov" Date: Thu, 16 Jun 2022 12:47:47 +0000 Subject: [PATCH 13/20] [TH2-3717] Add group to TestServiceListener --- src/test/java/com/exactpro/th2/conn/TestServiceListener.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/com/exactpro/th2/conn/TestServiceListener.java b/src/test/java/com/exactpro/th2/conn/TestServiceListener.java index 18b4bec..e4865d6 100644 --- a/src/test/java/com/exactpro/th2/conn/TestServiceListener.java +++ b/src/test/java/com/exactpro/th2/conn/TestServiceListener.java @@ -45,7 +45,7 @@ public void onEventTest() throws JsonProcessingException { MyEventDispatcher eventDispatcher = new MyEventDispatcher(); ServiceListener serviceListener = new ServiceListener(Map.of(Direction.FIRST, new AtomicLong(1)), - "SessionAlias", processor, eventDispatcher); + "SessionAlias", processor, eventDispatcher, null); ServiceEvent serviceEvent = ServiceEventFactory.createEventInfo(ServiceName.parse("serviceName"), ServiceEvent.Type.INFO, "Warn: incoming message with missing field: 45", null); From ea0b7a7327517d8306fcb076e4ac43ab9ffc11fa Mon Sep 17 00:00:00 2001 From: "sergey.sitnikov" Date: Thu, 16 Jun 2022 18:11:59 +0000 Subject: [PATCH 14/20] [TH2-3717] Fix formatting --- src/main/java/com/exactpro/th2/conn/ConnectivityMessage.java | 2 +- src/main/java/com/exactpro/th2/conn/ServiceListener.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/exactpro/th2/conn/ConnectivityMessage.java b/src/main/java/com/exactpro/th2/conn/ConnectivityMessage.java index e1de4e9..3a8f9cd 100644 --- a/src/main/java/com/exactpro/th2/conn/ConnectivityMessage.java +++ b/src/main/java/com/exactpro/th2/conn/ConnectivityMessage.java @@ -123,7 +123,7 @@ public List getSailfishMessages() { return sailfishMessages; } - @Override + @Override public String toString() { return new ToStringBuilder(this) .append("messageID", shortDebugString(messageID)) diff --git a/src/main/java/com/exactpro/th2/conn/ServiceListener.java b/src/main/java/com/exactpro/th2/conn/ServiceListener.java index 1d82d0d..c379f06 100644 --- a/src/main/java/com/exactpro/th2/conn/ServiceListener.java +++ b/src/main/java/com/exactpro/th2/conn/ServiceListener.java @@ -84,7 +84,7 @@ public ServiceListener( String sessionAlias, Subscriber subscriber, EventDispatcher eventDispatcher, - String sessionGroup + String sessionGroup ) { this.directionToSequence = requireNonNull(directionToSequence, "Map direction to sequence counter can't be null"); this.sessionAlias = requireNonNull(sessionAlias, "Session alias can't be null"); From f5be15883559fda593d878201220f6d1a01badb2 Mon Sep 17 00:00:00 2001 From: "sergey.sitnikov" Date: Fri, 17 Jun 2022 05:12:11 +0000 Subject: [PATCH 15/20] [TH2-3717] Update README file --- README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 0e689a1..c5243a5 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ session-alias: "connectivity-alias" workspace: "/home/sailfish/workspace" type: "th2_service:Your_Service_Type" name: "your_service" -sessionGroup "group" +sessionGroup: "group" settings: param1: "value1" ``` @@ -30,6 +30,7 @@ Parameters: + settings - the parameters that will be transformed to the actual service's settings specified in the **services.xml** file. + maxMessageBatchSize - the limitation for message batch size which connect sends to the first and to the second publish pins with. The default value is set to 100. + enableMessageSendingEvent - if this option is set to `true`, connect sends a separate event for every message sent which incomes from the pin with the send attribute. The default value is set to true ++ sessionGroup - parameter will be set for all messsages received or sent by this component ## Metrics From 5f03e02474889f170ef7745310f32753461ad392 Mon Sep 17 00:00:00 2001 From: "sergey.sitnikov" Date: Fri, 17 Jun 2022 07:47:38 +0000 Subject: [PATCH 16/20] [TH2-3717] Update sequences check in batch --- .../exactpro/th2/conn/ConnectivityBatch.java | 43 +++++++--- .../exactpro/th2/conn/MicroserviceMain.java | 78 +++++++++---------- 2 files changed, 69 insertions(+), 52 deletions(-) diff --git a/src/main/java/com/exactpro/th2/conn/ConnectivityBatch.java b/src/main/java/com/exactpro/th2/conn/ConnectivityBatch.java index 653f7da..49933d6 100644 --- a/src/main/java/com/exactpro/th2/conn/ConnectivityBatch.java +++ b/src/main/java/com/exactpro/th2/conn/ConnectivityBatch.java @@ -17,6 +17,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.stream.Collectors; import java.util.stream.LongStream; @@ -32,7 +33,6 @@ public class ConnectivityBatch { private final String sessionAlias; private final long sequence; - private final Direction direction; private final List connectivityMessages; public ConnectivityBatch(List connectivityMessages) { @@ -42,8 +42,7 @@ public ConnectivityBatch(List connectivityMessages) { ConnectivityMessage firstMessage = connectivityMessages.get(0); this.sessionAlias = firstMessage.getSessionAlias(); - this.direction = firstMessage.getDirection(); - checkMessages(connectivityMessages, sessionAlias, direction); + checkMessages(connectivityMessages, sessionAlias); this.connectivityMessages = List.copyOf(connectivityMessages); this.sequence = firstMessage.getSequence(); @@ -65,26 +64,52 @@ public long getSequence() { return sequence; } - public Direction getDirection() { - return direction; - } - public List getMessages() { return connectivityMessages; } - private static void checkMessages(List iMessages, String sessionAlias, Direction direction) { + private static void checkMessages(List iMessages, String sessionAlias) { if (iMessages.isEmpty()) { throw new IllegalArgumentException("List can't be empty"); } if (!iMessages.stream() .allMatch(iMessage -> Objects.equals(sessionAlias, iMessage.getSessionAlias()))) { - throw new IllegalArgumentException("List " + iMessages + " has elements with incorrect metadata, expected session alias '"+ sessionAlias +"' direction '" + direction + '\''); + throw new IllegalArgumentException("List " + iMessages + " has elements with incorrect metadata, expected session alias '"+ sessionAlias); } + if (LOGGER.isErrorEnabled()) { + Map> directionToMessages = iMessages.stream().collect(Collectors.groupingBy(ConnectivityMessage::getDirection)); + directionToMessages.forEach((direction, messages) -> checkDirectionMessages(messages, direction, sessionAlias)); + } // FIXME: Replace logging to thowing exception after solving message reordering problem // throw new IllegalArgumentException("List " + iMessages.stream() // .map(ConnectivityMessage::getSequence) // .collect(Collectors.toList())+ " hasn't elements with incremental sequence with one step"); } + + private static void checkDirectionMessages(List iMessages, Direction direction, String sessionAlias) { + boolean sequencesUnordered = false; + List missedSequences = new ArrayList<>(); + for (int index = 0; index < iMessages.size() - 1; index++) { + long nextExpectedSequence = iMessages.get(index).getSequence() + 1; + long nextSequence = iMessages.get(index + 1).getSequence(); + if (nextExpectedSequence != nextSequence) { + sequencesUnordered = true; + } + LongStream.range(nextExpectedSequence, nextSequence).forEach(missedSequences::add); + } + if (sequencesUnordered) { + LOGGER.error( + "List {} hasn't elements with incremental sequence with one step for session alias '{}' and direction '{}'{}", + iMessages.stream() + .map(ConnectivityMessage::getSequence) + .collect(Collectors.toList()), + sessionAlias, + direction, + missedSequences.isEmpty() + ? "" + : String.format(". Missed sequences %s", missedSequences) + ); + } + } } diff --git a/src/main/java/com/exactpro/th2/conn/MicroserviceMain.java b/src/main/java/com/exactpro/th2/conn/MicroserviceMain.java index f5727eb..462d973 100644 --- a/src/main/java/com/exactpro/th2/conn/MicroserviceMain.java +++ b/src/main/java/com/exactpro/th2/conn/MicroserviceMain.java @@ -35,7 +35,6 @@ import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -206,46 +205,37 @@ private static Object getParamValue(ISettingsProxy settings, String parameterNam return parameterValue; } - private static @NonNull Flowable> createPipeline( + private static @NonNull Flowable createPipeline( Flowable flowable, Action terminateFlowable, MessageRouter eventBatchRouter, MessageRouter rawMessageRouter, int maxMessageBatchSize, boolean enableMessageSendingEvent) { LOGGER.info("AvailableProcessors '{}'", Runtime.getRuntime().availableProcessors()); - return flowable - .doOnNext(message -> { - LOGGER.trace( + Flowable messageConnectable = flowable + .doOnNext(message -> { + LOGGER.trace( "Message before observeOn with sequence {} and direction {}", message.getSequence(), message.getDirection() ); }) - .observeOn(PIPELINE_SCHEDULER) - .doOnNext(connectivityMessage -> LOGGER.debug("Start handling connectivity message {}", connectivityMessage)) - .groupBy(ConnectivityMessage::getSessionAlias) - .map(group -> { - AtomicReference<@NonNull Direction> direction = new AtomicReference<>(); - Flowable messageConnectable = group - .doOnNext(message -> { - LOGGER.trace( - "Message inside map with sequence {} and direction {}", - message.getSequence(), - message.getDirection()); - direction.set(message.getDirection()); - } - ) - .doOnCancel(terminateFlowable) // This call is required for terminate the publisher and prevent creation another group - .publish() - .refCount(enableMessageSendingEvent && direction.get() == Direction.SECOND ? 2 : 1); - - if (enableMessageSendingEvent && direction.get() == Direction.SECOND) { - subscribeToSendMessage(eventBatchRouter, messageConnectable); - } - createPackAndPublishPipeline(messageConnectable, rawMessageRouter, maxMessageBatchSize); + .observeOn(PIPELINE_SCHEDULER) + .doOnNext(connectivityMessage -> { + LOGGER.debug("Start handling connectivity message {}", connectivityMessage); + LOGGER.trace( + "Message inside map with sequence {} and direction {}", + connectivityMessage.getSequence(), + connectivityMessage.getDirection()); + }) + .doOnCancel(terminateFlowable) // This call is required for terminate the publisher and prevent creation another group + .publish() + .refCount(1); - return messageConnectable; - }); + subscribeToSendMessage(eventBatchRouter, messageConnectable); + + createPackAndPublishPipeline(messageConnectable, rawMessageRouter, maxMessageBatchSize); + return messageConnectable; } private static void subscribeToSendMessage(MessageRouter eventBatchRouter, Flowable messageConnectable) { @@ -257,21 +247,23 @@ private static void subscribeToSendMessage(MessageRouter eventBatchR // Sailfish does not support sending multiple messages at once. // So we should send only a single event here. // But just in case we are wrong, we add checking for sending multiple events - boolean sent = false; - for (IMessage message : connectivityMessage.getSailfishMessages()) { - if (!contains(message.getMetaData(), PARENT_EVENT_ID)) { - continue; - } - if (sent) { - LOGGER.warn("The connectivity message has more than one sailfish message with parent event ID: {}", connectivityMessage); + if (connectivityMessage.getDirection() == Direction.SECOND) { + boolean sent = false; + for (IMessage message : connectivityMessage.getSailfishMessages()) { + if (!contains(message.getMetaData(), PARENT_EVENT_ID)) { + continue; + } + if (sent) { + LOGGER.warn("The connectivity message has more than one sailfish message with parent event ID: {}", connectivityMessage); + } + Event event = Event.start().endTimestamp() + .name("Send '" + message.getName() + "' message") + .type("Send message") + .messageID(connectivityMessage.getMessageID()); + LOGGER.debug("Sending event {} related to message with sequence {}", event.getId(), connectivityMessage.getSequence()); + storeEvent(eventBatchRouter, event, getParentEventID(message.getMetaData()).getId()); + sent = true; } - Event event = Event.start().endTimestamp() - .name("Send '" + message.getName() + "' message") - .type("Send message") - .messageID(connectivityMessage.getMessageID()); - LOGGER.debug("Sending event {} related to message with sequence {}", event.getId(), connectivityMessage.getSequence()); - storeEvent(eventBatchRouter, event, getParentEventID(message.getMetaData()).getId()); - sent = true; } }); } From 4b5cb196a464939b851ba61181e36679d0e8cedb Mon Sep 17 00:00:00 2001 From: "sergey.sitnikov" Date: Fri, 17 Jun 2022 15:07:58 +0000 Subject: [PATCH 17/20] [TH2-3717] Update subscribe method --- .../exactpro/th2/conn/MicroserviceMain.java | 35 ++++++++++--------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/src/main/java/com/exactpro/th2/conn/MicroserviceMain.java b/src/main/java/com/exactpro/th2/conn/MicroserviceMain.java index 462d973..b896267 100644 --- a/src/main/java/com/exactpro/th2/conn/MicroserviceMain.java +++ b/src/main/java/com/exactpro/th2/conn/MicroserviceMain.java @@ -230,7 +230,7 @@ private static Object getParamValue(ISettingsProxy settings, String parameterNam }) .doOnCancel(terminateFlowable) // This call is required for terminate the publisher and prevent creation another group .publish() - .refCount(1); + .refCount(2); subscribeToSendMessage(eventBatchRouter, messageConnectable); @@ -247,23 +247,24 @@ private static void subscribeToSendMessage(MessageRouter eventBatchR // Sailfish does not support sending multiple messages at once. // So we should send only a single event here. // But just in case we are wrong, we add checking for sending multiple events - if (connectivityMessage.getDirection() == Direction.SECOND) { - boolean sent = false; - for (IMessage message : connectivityMessage.getSailfishMessages()) { - if (!contains(message.getMetaData(), PARENT_EVENT_ID)) { - continue; - } - if (sent) { - LOGGER.warn("The connectivity message has more than one sailfish message with parent event ID: {}", connectivityMessage); - } - Event event = Event.start().endTimestamp() - .name("Send '" + message.getName() + "' message") - .type("Send message") - .messageID(connectivityMessage.getMessageID()); - LOGGER.debug("Sending event {} related to message with sequence {}", event.getId(), connectivityMessage.getSequence()); - storeEvent(eventBatchRouter, event, getParentEventID(message.getMetaData()).getId()); - sent = true; + if (connectivityMessage.getDirection() != Direction.SECOND) { + return; + } + boolean sent = false; + for (IMessage message : connectivityMessage.getSailfishMessages()) { + if (!contains(message.getMetaData(), PARENT_EVENT_ID)) { + continue; + } + if (sent) { + LOGGER.warn("The connectivity message has more than one sailfish message with parent event ID: {}", connectivityMessage); } + Event event = Event.start().endTimestamp() + .name("Send '" + message.getName() + "' message") + .type("Send message") + .messageID(connectivityMessage.getMessageID()); + LOGGER.debug("Sending event {} related to message with sequence {}", event.getId(), connectivityMessage.getSequence()); + storeEvent(eventBatchRouter, event, getParentEventID(message.getMetaData()).getId()); + sent = true; } }); } From 32721892e09a362089e46e951342c2340c1f9215 Mon Sep 17 00:00:00 2001 From: "sergey.sitnikov" Date: Tue, 21 Jun 2022 08:42:12 +0000 Subject: [PATCH 18/20] [TH2-3717] Update publishPipeline method --- src/main/java/com/exactpro/th2/conn/MicroserviceMain.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/main/java/com/exactpro/th2/conn/MicroserviceMain.java b/src/main/java/com/exactpro/th2/conn/MicroserviceMain.java index 3c7c7a4..11f2235 100644 --- a/src/main/java/com/exactpro/th2/conn/MicroserviceMain.java +++ b/src/main/java/com/exactpro/th2/conn/MicroserviceMain.java @@ -290,9 +290,7 @@ private static void createPackAndPublishPipeline(Flowable m .map(ConnectivityMessage::getSequence) .collect(Collectors.toList())); } - }) - .publish() - .refCount(1); + }); batchConnectable .subscribe(batch -> { From 77236e2306525495232645aa714bb463338f5f3f Mon Sep 17 00:00:00 2001 From: "oleg.smirnov" Date: Wed, 22 Jun 2022 15:21:23 +0400 Subject: [PATCH 19/20] [TH2-3717] Add aditional logging for listener subscriptions --- .../com/exactpro/th2/conn/MessageSender.java | 2 ++ .../com/exactpro/th2/conn/MicroserviceMain.java | 17 ++++++++++------- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/src/main/java/com/exactpro/th2/conn/MessageSender.java b/src/main/java/com/exactpro/th2/conn/MessageSender.java index 044bf70..a3a64a8 100644 --- a/src/main/java/com/exactpro/th2/conn/MessageSender.java +++ b/src/main/java/com/exactpro/th2/conn/MessageSender.java @@ -67,6 +67,8 @@ public void start() { throw new IllegalStateException("Already subscribe"); } + logger.info("Subscribing to queue with messages to send"); + subscriberMonitor = router.subscribeAll(this::handle, SEND_ATTRIBUTE); } diff --git a/src/main/java/com/exactpro/th2/conn/MicroserviceMain.java b/src/main/java/com/exactpro/th2/conn/MicroserviceMain.java index 11f2235..a7e239f 100644 --- a/src/main/java/com/exactpro/th2/conn/MicroserviceMain.java +++ b/src/main/java/com/exactpro/th2/conn/MicroserviceMain.java @@ -74,6 +74,7 @@ import io.reactivex.rxjava3.core.Flowable; import io.reactivex.rxjava3.core.Scheduler; import io.reactivex.rxjava3.exceptions.Exceptions; +import io.reactivex.rxjava3.flowables.ConnectableFlowable; import io.reactivex.rxjava3.functions.Action; import io.reactivex.rxjava3.processors.FlowableProcessor; import io.reactivex.rxjava3.processors.UnicastProcessor; @@ -214,7 +215,7 @@ private static Object getParamValue(ISettingsProxy settings, String parameterNam int maxMessageBatchSize, boolean enableMessageSendingEvent) { LOGGER.info("AvailableProcessors '{}'", Runtime.getRuntime().availableProcessors()); - Flowable messageConnectable = flowable + ConnectableFlowable messageConnectable = flowable .doOnNext(message -> { LOGGER.trace( "Message before observeOn with sequence {} and direction {}", @@ -231,12 +232,14 @@ private static Object getParamValue(ISettingsProxy settings, String parameterNam connectivityMessage.getDirection()); }) .doOnCancel(terminateFlowable) // This call is required for terminate the publisher and prevent creation another group - .publish() - .refCount(2); + .publish(); subscribeToSendMessage(eventBatchRouter, messageConnectable); createPackAndPublishPipeline(messageConnectable, rawMessageRouter, maxMessageBatchSize); + + messageConnectable.connect(); + return messageConnectable; } @@ -274,7 +277,7 @@ private static void subscribeToSendMessage(MessageRouter eventBatchR private static void createPackAndPublishPipeline(Flowable messageConnectable, MessageRouter rawMessageRouter, int maxMessageBatchSize) { - Flowable batchConnectable = messageConnectable + messageConnectable .doOnNext(message -> LOGGER.trace( "Message before window with sequence {} and direction {}", message.getSequence(), @@ -290,9 +293,7 @@ private static void createPackAndPublishPipeline(Flowable m .map(ConnectivityMessage::getSequence) .collect(Collectors.toList())); } - }); - - batchConnectable + }) .subscribe(batch -> { try { RawMessageBatch rawBatch = batch.convertToProtoRawBatch(); @@ -416,7 +417,9 @@ protected void onStart() { try { LOGGER.info("Subscribed to pipeline"); serviceProxy.start(); + LOGGER.info("Service started. Starting message sender"); messageSender.start(); + LOGGER.info("Subscription finished"); } catch (Exception e) { LOGGER.error("Services starting failure", e); Exceptions.propagate(e); From f08b89b70ac6ca56ff008d286e444e3404f703f5 Mon Sep 17 00:00:00 2001 From: "oleg.smirnov" Date: Wed, 22 Jun 2022 17:37:23 +0400 Subject: [PATCH 20/20] [TH2-3717] Update common version --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index 0ce9a3c..463b613 100644 --- a/build.gradle +++ b/build.gradle @@ -66,7 +66,7 @@ compileTestKotlin { dependencies { api platform('com.exactpro.th2:bom:3.0.0') - implementation 'com.exactpro.th2:common:3.38.0-TH2-3578-2300290805-SNAPSHOT' + implementation 'com.exactpro.th2:common:3.38.0-TH2-3578-2542617414-SNAPSHOT' implementation "com.exactpro.th2:sailfish-utils:3.8.0" implementation "org.slf4j:slf4j-log4j12"