diff --git a/README.md b/README.md index 2dd2fa7..4790f74 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,5 @@ -# Connect (3.10.2) +# Connect (3.11.0) + The "Connect" component is responsible for the communication with a target system. This component implements the logic of the interaction protocol, receiving and sending messages from and to the system, respectively. @@ -17,6 +18,7 @@ session-alias: "connectivity-alias" workspace: "/home/sailfish/workspace" type: "th2_service:Your_Service_Type" name: "your_service" +sessionGroup: "group" settings: param1: "value1" ``` @@ -29,6 +31,7 @@ Parameters: + settings - the parameters that will be transformed to the actual service's settings specified in the **services.xml** file. + maxMessageBatchSize - the limitation for message batch size which connect sends to the first and to the second publish pins with. The default value is set to 100. + enableMessageSendingEvent - if this option is set to `true`, connect sends a separate event for every message sent which incomes from the pin with the send attribute. The default value is set to true ++ sessionGroup - parameter will be set for all messsages received or sent by this component ## Metrics @@ -76,13 +79,13 @@ You need to perform the following steps: ## Pins -Connect has 2 types of pins for interacting with th2 components. -Messages that were received from / sent to the target system will be sent to the following queues: +Connect has only 1 type of pins for interacting with th2 components. +Messages that were received from / sent to the target system will be sent to the following queue: + +- sended raw messages -- incoming raw messages -- outgoing raw messages -The "Connect" component uses a separate queue to send messages. The component subscribes to that pin at the start and waits for the messages. +The "Connect" component uses a queue to send messages. The component subscribes to that pin at the start and waits for the messages. The messages received from that pin will be sent to the target system. Also, this component is responsible for maintaining connections and sessions in the cases where this is provided by the communication protocol. Here you can automatically send heartbeat messages, send a logon/logout, requests to retransmit messages in the event of a gap, etc. @@ -101,17 +104,15 @@ spec: workspace: "/home/sailfish/workspace" type: "th2_service:Your_Service_Type" name: "your_service" + sessionGroup: "group" maxMessageBatchSize: 100 enableMessageSendingEvent: true settings: param1: "value1" pins: - - name: in_raw - connection-type: mq - attributes: ["first", "raw", "publish", "store"] - - name: out_raw + - name: send_raw connection-type: mq - attributes: ["second", "raw", "publish", "store"] + attributes: ["raw", "publish", "store"] - name: to_send connection-type: mq attributes: ["send", "raw", "subscribe"] @@ -119,6 +120,12 @@ spec: ## Release notes +### 3.11.0 + ++ Add session group support from th2:common version 3.38.0-TH2-3578-2300290805-SNAPSHOT. ++ Replace 2 queues with in/out pins to one queue. ++ Messages are not grouped by direction, both direction publish together. + ### 3.10.2 + Events are made more convenient. Added event names and error logs. Error message moved from the name to the body of the event. diff --git a/build.gradle b/build.gradle index 7e90ed1..463b613 100644 --- a/build.gradle +++ b/build.gradle @@ -66,7 +66,7 @@ compileTestKotlin { dependencies { api platform('com.exactpro.th2:bom:3.0.0') - implementation 'com.exactpro.th2:common:3.33.0' + implementation 'com.exactpro.th2:common:3.38.0-TH2-3578-2542617414-SNAPSHOT' implementation "com.exactpro.th2:sailfish-utils:3.8.0" implementation "org.slf4j:slf4j-log4j12" diff --git a/gradle.properties b/gradle.properties index 36f4a18..f7e2ba2 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1 +1,2 @@ -release_version = 3.10.2 \ No newline at end of file +release_version = 3.11.0 + diff --git a/src/main/java/com/exactpro/th2/conn/ConnectivityBatch.java b/src/main/java/com/exactpro/th2/conn/ConnectivityBatch.java index c5ed40b..49933d6 100644 --- a/src/main/java/com/exactpro/th2/conn/ConnectivityBatch.java +++ b/src/main/java/com/exactpro/th2/conn/ConnectivityBatch.java @@ -17,6 +17,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.stream.Collectors; import java.util.stream.LongStream; @@ -32,7 +33,6 @@ public class ConnectivityBatch { private final String sessionAlias; private final long sequence; - private final Direction direction; private final List connectivityMessages; public ConnectivityBatch(List connectivityMessages) { @@ -42,8 +42,7 @@ public ConnectivityBatch(List connectivityMessages) { ConnectivityMessage firstMessage = connectivityMessages.get(0); this.sessionAlias = firstMessage.getSessionAlias(); - this.direction = firstMessage.getDirection(); - checkMessages(connectivityMessages, sessionAlias, direction); + checkMessages(connectivityMessages, sessionAlias); this.connectivityMessages = List.copyOf(connectivityMessages); this.sequence = firstMessage.getSequence(); @@ -65,54 +64,52 @@ public long getSequence() { return sequence; } - public Direction getDirection() { - return direction; - } - public List getMessages() { return connectivityMessages; } - private static void checkMessages(List iMessages, String sessionAlias, Direction direction) { + private static void checkMessages(List iMessages, String sessionAlias) { if (iMessages.isEmpty()) { throw new IllegalArgumentException("List can't be empty"); } if (!iMessages.stream() - .allMatch(iMessage -> Objects.equals(sessionAlias, iMessage.getSessionAlias()) - && direction == iMessage.getDirection())) { - throw new IllegalArgumentException("List " + iMessages + " has elements with incorrect metadata, expected session alias '"+ sessionAlias +"' direction '" + direction + '\''); - } - - if (LOGGER.isErrorEnabled()) { - boolean sequencesUnordered = false; - List missedSequences = new ArrayList<>(); - for (int index = 0; index < iMessages.size() - 1; index++) { - long nextExpectedSequence = iMessages.get(index).getSequence() + 1; - long nextSequence = iMessages.get(index + 1).getSequence(); - if (nextExpectedSequence != nextSequence) { - sequencesUnordered = true; - } - LongStream.range(nextExpectedSequence, nextSequence).forEach(missedSequences::add); - } - if (sequencesUnordered) { - LOGGER.error( - "List {} hasn't elements with incremental sequence with one step for session alias '{}' and direction '{}'{}", - iMessages.stream() - .map(ConnectivityMessage::getSequence) - .collect(Collectors.toList()), - sessionAlias, - direction, - missedSequences.isEmpty() - ? "" - : String.format(". Missed sequences %s", missedSequences) - ); - } + .allMatch(iMessage -> Objects.equals(sessionAlias, iMessage.getSessionAlias()))) { + throw new IllegalArgumentException("List " + iMessages + " has elements with incorrect metadata, expected session alias '"+ sessionAlias); } - - // FIXME: Replace logging to thowing exception after solving message reordering problem + if (LOGGER.isErrorEnabled()) { + Map> directionToMessages = iMessages.stream().collect(Collectors.groupingBy(ConnectivityMessage::getDirection)); + directionToMessages.forEach((direction, messages) -> checkDirectionMessages(messages, direction, sessionAlias)); + } + // FIXME: Replace logging to thowing exception after solving message reordering problem // throw new IllegalArgumentException("List " + iMessages.stream() // .map(ConnectivityMessage::getSequence) // .collect(Collectors.toList())+ " hasn't elements with incremental sequence with one step"); } + + private static void checkDirectionMessages(List iMessages, Direction direction, String sessionAlias) { + boolean sequencesUnordered = false; + List missedSequences = new ArrayList<>(); + for (int index = 0; index < iMessages.size() - 1; index++) { + long nextExpectedSequence = iMessages.get(index).getSequence() + 1; + long nextSequence = iMessages.get(index + 1).getSequence(); + if (nextExpectedSequence != nextSequence) { + sequencesUnordered = true; + } + LongStream.range(nextExpectedSequence, nextSequence).forEach(missedSequences::add); + } + if (sequencesUnordered) { + LOGGER.error( + "List {} hasn't elements with incremental sequence with one step for session alias '{}' and direction '{}'{}", + iMessages.stream() + .map(ConnectivityMessage::getSequence) + .collect(Collectors.toList()), + sessionAlias, + direction, + missedSequences.isEmpty() + ? "" + : String.format(". Missed sequences %s", missedSequences) + ); + } + } } diff --git a/src/main/java/com/exactpro/th2/conn/ConnectivityMessage.java b/src/main/java/com/exactpro/th2/conn/ConnectivityMessage.java index a45a31f..3a8f9cd 100644 --- a/src/main/java/com/exactpro/th2/conn/ConnectivityMessage.java +++ b/src/main/java/com/exactpro/th2/conn/ConnectivityMessage.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2022 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -55,14 +55,15 @@ public class ConnectivityMessage { private final MessageID messageID; private final Timestamp timestamp; - public ConnectivityMessage(List sailfishMessages, String sessionAlias, Direction direction, long sequence) { + public ConnectivityMessage(List sailfishMessages, String sessionAlias, Direction direction, long sequence, String sessionGroup) { this.sailfishMessages = Collections.unmodifiableList(requireNonNull(sailfishMessages, "Message can't be null")); if (sailfishMessages.isEmpty()) { throw new IllegalArgumentException("At least one sailfish messages must be passed. Session alias: " + sessionAlias + "; Direction: " + direction); } - messageID = createMessageID(createConnectionID(requireNonNull(sessionAlias, "Session alias can't be null")), + messageID = createMessageID(createConnectionID(requireNonNull(sessionAlias, "Session alias can't be null"), sessionGroup), requireNonNull(direction, "Direction can't be null"), sequence); - timestamp = createTimestamp(sailfishMessages.get(0).getMetaData().getMsgTimestamp().getTime()); + LOGGER.warn("Sailfish transforms th2 message to real send message too slow, delay is about 10 milliseconds. Please add more hardware resources"); + timestamp = createTimestamp(System.currentTimeMillis()); } public String getSessionAlias() { @@ -139,10 +140,13 @@ private static int calculateTotalBodySize(Collection messages) { }).sum(); } - private static ConnectionID createConnectionID(String sessionAlias) { - return ConnectionID.newBuilder() - .setSessionAlias(sessionAlias) - .build(); + private static ConnectionID createConnectionID(String sessionAlias, String sessionGroup) { + ConnectionID.Builder builder = ConnectionID.newBuilder() + .setSessionAlias(sessionAlias); + if (sessionGroup != null) { + builder.setSessionGroup(sessionGroup); + } + return builder.build(); } private static MessageID createMessageID(ConnectionID connectionId, Direction direction, long sequence) { diff --git a/src/main/java/com/exactpro/th2/conn/MessageSender.java b/src/main/java/com/exactpro/th2/conn/MessageSender.java index 044bf70..a3a64a8 100644 --- a/src/main/java/com/exactpro/th2/conn/MessageSender.java +++ b/src/main/java/com/exactpro/th2/conn/MessageSender.java @@ -67,6 +67,8 @@ public void start() { throw new IllegalStateException("Already subscribe"); } + logger.info("Subscribing to queue with messages to send"); + subscriberMonitor = router.subscribeAll(this::handle, SEND_ATTRIBUTE); } diff --git a/src/main/java/com/exactpro/th2/conn/MicroserviceMain.java b/src/main/java/com/exactpro/th2/conn/MicroserviceMain.java index 141c3f4..a7e239f 100644 --- a/src/main/java/com/exactpro/th2/conn/MicroserviceMain.java +++ b/src/main/java/com/exactpro/th2/conn/MicroserviceMain.java @@ -20,7 +20,6 @@ import static com.exactpro.th2.conn.utility.SailfishMetadataExtensions.contains; import static com.exactpro.th2.conn.utility.SailfishMetadataExtensions.getParentEventID; import static io.reactivex.rxjava3.plugins.RxJavaPlugins.createSingleScheduler; -import static java.util.Objects.requireNonNull; import static org.apache.commons.lang.StringUtils.containsIgnoreCase; import static org.apache.commons.lang.StringUtils.repeat; import static org.apache.commons.lang3.ClassUtils.primitiveToWrapper; @@ -66,7 +65,6 @@ import com.exactpro.th2.common.grpc.RawMessageBatch; import com.exactpro.th2.common.schema.factory.CommonFactory; import com.exactpro.th2.common.schema.message.MessageRouter; -import com.exactpro.th2.common.schema.message.QueueAttribute; import com.exactpro.th2.conn.configuration.ConnectivityConfiguration; import com.exactpro.th2.conn.events.EventDispatcher; import com.exactpro.th2.conn.events.EventType; @@ -76,6 +74,7 @@ import io.reactivex.rxjava3.core.Flowable; import io.reactivex.rxjava3.core.Scheduler; import io.reactivex.rxjava3.exceptions.Exceptions; +import io.reactivex.rxjava3.flowables.ConnectableFlowable; import io.reactivex.rxjava3.functions.Action; import io.reactivex.rxjava3.processors.FlowableProcessor; import io.reactivex.rxjava3.processors.UnicastProcessor; @@ -164,7 +163,7 @@ public static void main(String[] args) { EventType.SERVICE_EVENT, serviceEventsRoot.getId() )); - IServiceListener serviceListener = new ServiceListener(directionToSequence, configuration.getSessionAlias(), processor, eventDispatcher); + IServiceListener serviceListener = new ServiceListener(directionToSequence, configuration.getSessionAlias(), processor, eventDispatcher, configuration.getSessionGroup()); IServiceProxy serviceProxy = loadService(serviceFactory, factory, configuration, serviceListener); disposer.register(() -> { LOGGER.info("Stop service proxy"); @@ -209,41 +208,39 @@ private static Object getParamValue(ISettingsProxy settings, String parameterNam return parameterValue; } - private static @NonNull Flowable> createPipeline( + private static @NonNull Flowable createPipeline( Flowable flowable, Action terminateFlowable, MessageRouter eventBatchRouter, MessageRouter rawMessageRouter, int maxMessageBatchSize, boolean enableMessageSendingEvent) { LOGGER.info("AvailableProcessors '{}'", Runtime.getRuntime().availableProcessors()); - return flowable - .doOnNext(message -> LOGGER.trace( - "Message before observeOn with sequence {} and direction {}", - message.getSequence(), - message.getDirection() - )) - .observeOn(PIPELINE_SCHEDULER) - .doOnNext(connectivityMessage -> LOGGER.debug("Start handling connectivity message {}", connectivityMessage)) - .groupBy(ConnectivityMessage::getDirection) - .map(group -> { - @NonNull Direction direction = requireNonNull(group.getKey(), "Direction can't be null"); - Flowable messageConnectable = group - .doOnNext(message -> LOGGER.trace( - "Message inside map with sequence {} and direction {}", - message.getSequence(), - message.getDirection() - )) - .doOnCancel(terminateFlowable) // This call is required for terminate the publisher and prevent creation another group - .publish() - .refCount(enableMessageSendingEvent && direction == Direction.SECOND ? 2 : 1); - - if (enableMessageSendingEvent && direction == Direction.SECOND) { - subscribeToSendMessage(eventBatchRouter, messageConnectable); - } - createPackAndPublishPipeline(direction, messageConnectable, rawMessageRouter, maxMessageBatchSize); - - return messageConnectable; - }); + ConnectableFlowable messageConnectable = flowable + .doOnNext(message -> { + LOGGER.trace( + "Message before observeOn with sequence {} and direction {}", + message.getSequence(), + message.getDirection() + ); + }) + .observeOn(PIPELINE_SCHEDULER) + .doOnNext(connectivityMessage -> { + LOGGER.debug("Start handling connectivity message {}", connectivityMessage); + LOGGER.trace( + "Message inside map with sequence {} and direction {}", + connectivityMessage.getSequence(), + connectivityMessage.getDirection()); + }) + .doOnCancel(terminateFlowable) // This call is required for terminate the publisher and prevent creation another group + .publish(); + + subscribeToSendMessage(eventBatchRouter, messageConnectable); + + createPackAndPublishPipeline(messageConnectable, rawMessageRouter, maxMessageBatchSize); + + messageConnectable.connect(); + + return messageConnectable; } private static void subscribeToSendMessage(MessageRouter eventBatchRouter, Flowable messageConnectable) { @@ -255,6 +252,9 @@ private static void subscribeToSendMessage(MessageRouter eventBatchR // Sailfish does not support sending multiple messages at once. // So we should send only a single event here. // But just in case we are wrong, we add checking for sending multiple events + if (connectivityMessage.getDirection() != Direction.SECOND) { + return; + } boolean sent = false; for (IMessage message : connectivityMessage.getSailfishMessages()) { if (!contains(message.getMetaData(), PARENT_EVENT_ID)) { @@ -274,11 +274,10 @@ private static void subscribeToSendMessage(MessageRouter eventBatchR }); } - private static void createPackAndPublishPipeline(Direction direction, Flowable messageConnectable, + private static void createPackAndPublishPipeline(Flowable messageConnectable, MessageRouter rawMessageRouter, int maxMessageBatchSize) { - LOGGER.info("Map group {}", direction); - Flowable batchConnectable = messageConnectable + messageConnectable .doOnNext(message -> LOGGER.trace( "Message before window with sequence {} and direction {}", message.getSequence(), @@ -295,14 +294,10 @@ private static void createPackAndPublishPipeline(Direction direction, Flowable { try { RawMessageBatch rawBatch = batch.convertToProtoRawBatch(); - rawMessageRouter.sendAll(rawBatch, (direction == Direction.FIRST ? QueueAttribute.FIRST : QueueAttribute.SECOND).toString()); + rawMessageRouter.send(rawBatch); //FIXME: Only one pin can be used } catch (Exception e) { if (LOGGER.isErrorEnabled()) { LOGGER.error("Cannot send batch with sequences: {}", @@ -311,9 +306,9 @@ private static void createPackAndPublishPipeline(Direction direction, Flowable directionToSequence; private final String sessionAlias; + private final String sessionGroup; private final Subscriber subscriber; private final EventDispatcher eventDispatcher; @@ -82,12 +83,14 @@ public ServiceListener( Map directionToSequence, String sessionAlias, Subscriber subscriber, - EventDispatcher eventDispatcher + EventDispatcher eventDispatcher, + String sessionGroup ) { this.directionToSequence = requireNonNull(directionToSequence, "Map direction to sequence counter can't be null"); this.sessionAlias = requireNonNull(sessionAlias, "Session alias can't be null"); this.subscriber = requireNonNull(subscriber, "Subscriber can't be null"); this.eventDispatcher = requireNonNull(eventDispatcher, "Event dispatcher can't be null"); + this.sessionGroup = sessionGroup; } @Override @@ -164,6 +167,6 @@ public void onEvent(IServiceProxy service, ServiceEvent serviceEvent) { private ConnectivityMessage createConnectivityMessage(List messages, Direction direction, AtomicLong directionSeq) { long sequence = directionSeq.incrementAndGet(); LOGGER.debug("On message: direction '{}'; sequence '{}'; messages '{}'", direction, sequence, messages); - return new ConnectivityMessage(messages, sessionAlias, direction, sequence); + return new ConnectivityMessage(messages, sessionAlias, direction, sequence, sessionGroup); } } diff --git a/src/main/java/com/exactpro/th2/conn/configuration/ConnectivityConfiguration.java b/src/main/java/com/exactpro/th2/conn/configuration/ConnectivityConfiguration.java index 691017b..70befe8 100644 --- a/src/main/java/com/exactpro/th2/conn/configuration/ConnectivityConfiguration.java +++ b/src/main/java/com/exactpro/th2/conn/configuration/ConnectivityConfiguration.java @@ -37,6 +37,8 @@ public class ConnectivityConfiguration { @JsonProperty(required = true) private Map settings; + private String sessionGroup; + public boolean isEnableMessageSendingEvent() { return enableMessageSendingEvent; } @@ -64,4 +66,8 @@ public String getName() { public Map getSettings() { return settings; } + + public String getSessionGroup() { + return sessionGroup; + } } diff --git a/src/test/java/com/exactpro/th2/conn/TestServiceListener.java b/src/test/java/com/exactpro/th2/conn/TestServiceListener.java index 18b4bec..e4865d6 100644 --- a/src/test/java/com/exactpro/th2/conn/TestServiceListener.java +++ b/src/test/java/com/exactpro/th2/conn/TestServiceListener.java @@ -45,7 +45,7 @@ public void onEventTest() throws JsonProcessingException { MyEventDispatcher eventDispatcher = new MyEventDispatcher(); ServiceListener serviceListener = new ServiceListener(Map.of(Direction.FIRST, new AtomicLong(1)), - "SessionAlias", processor, eventDispatcher); + "SessionAlias", processor, eventDispatcher, null); ServiceEvent serviceEvent = ServiceEventFactory.createEventInfo(ServiceName.parse("serviceName"), ServiceEvent.Type.INFO, "Warn: incoming message with missing field: 45", null); diff --git a/src/test/kotlin/com/exactpro/th2/conn/TestConnectivityMessage.kt b/src/test/kotlin/com/exactpro/th2/conn/TestConnectivityMessage.kt index c2261ae..df5a9f5 100644 --- a/src/test/kotlin/com/exactpro/th2/conn/TestConnectivityMessage.kt +++ b/src/test/kotlin/com/exactpro/th2/conn/TestConnectivityMessage.kt @@ -41,7 +41,8 @@ class TestConnectivityMessage { ), "test", Direction.SECOND, - 1L + 1L, + null ) val rawMessage: RawMessage = connectivityMessage.convertToProtoRawMessage()