Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Th2 3717 #139

Open
wants to merge 21 commits into
base: dev-version-3
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
afeb2eb
[TH2-3717] Implement ability to specify the group option
Sergey-Sitnikov-Exactpro May 30, 2022
25b3779
[TH2-3717] Add timestamp for multithreading
Sergey-Sitnikov-Exactpro Jun 10, 2022
4d9530d
[TH2-3717] Update timestamp to preserve order
Sergey-Sitnikov-Exactpro Jun 14, 2022
e423cd5
[TH2-3717] Update pipeline creation
Sergey-Sitnikov-Exactpro Jun 14, 2022
d011433
[TH2-3717] Update batch creation
Sergey-Sitnikov-Exactpro Jun 14, 2022
cfb669a
[TH2-3717] Update message checking inside batch
Sergey-Sitnikov-Exactpro Jun 15, 2022
4613de9
[TH2-3717] Update message logging
Sergey-Sitnikov-Exactpro Jun 16, 2022
3cba4b9
moved error message from the name to the body of the event
Dmitriy-Yugay Mar 18, 2022
0bc7b3f
minor fixes
Dmitriy-Yugay Mar 25, 2022
5e32660
Th2 1881 2 (#123)
Dmitriy-Yugay Apr 20, 2022
0b57891
[TH2-3717] Update project version and README file
Sergey-Sitnikov-Exactpro Jun 16, 2022
b2c62c9
[TH2-3717] Update imports
Sergey-Sitnikov-Exactpro Jun 16, 2022
cd8b088
[TH2-3717] Add group to TestServiceListener
Sergey-Sitnikov-Exactpro Jun 16, 2022
ea0b7a7
[TH2-3717] Fix formatting
Sergey-Sitnikov-Exactpro Jun 16, 2022
f5be158
[TH2-3717] Update README file
Sergey-Sitnikov-Exactpro Jun 17, 2022
5f03e02
[TH2-3717] Update sequences check in batch
Sergey-Sitnikov-Exactpro Jun 17, 2022
4b5cb19
[TH2-3717] Update subscribe method
Sergey-Sitnikov-Exactpro Jun 17, 2022
e029758
Merge branch 'dev' into th2-3717
Sergey-Sitnikov-Exactpro Jun 17, 2022
3272189
[TH2-3717] Update publishPipeline method
Sergey-Sitnikov-Exactpro Jun 21, 2022
77236e2
[TH2-3717] Add aditional logging for listener subscriptions
OptimumCode Jun 22, 2022
f08b89b
[TH2-3717] Update common version
OptimumCode Jun 22, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 21 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Connect (3.10.1)
# Connect (3.11.0)

The "Connect" component is responsible for the communication with a target system.
This component implements the logic of the interaction protocol, receiving and sending messages from and to the system, respectively.
Expand All @@ -17,6 +17,7 @@ session-alias: "connectivity-alias"
workspace: "/home/sailfish/workspace"
type: "th2_service:Your_Service_Type"
name: "your_service"
sessionGroup: "group"
settings:
param1: "value1"
```
Expand All @@ -29,6 +30,7 @@ Parameters:
+ settings - the parameters that will be transformed to the actual service's settings specified in the **services.xml** file.
+ maxMessageBatchSize - the limitation for message batch size which connect sends to the first and to the second publish pins with. The default value is set to 100.
+ enableMessageSendingEvent - if this option is set to `true`, connect sends a separate event for every message sent which incomes from the pin with the send attribute. The default value is set to true
+ sessionGroup - parameter will be set for all messsages received or sent by this component

## Metrics

Expand Down Expand Up @@ -76,13 +78,13 @@ You need to perform the following steps:

## Pins

Connect has 2 types of pins for interacting with th2 components.
Messages that were received from / sent to the target system will be sent to the following queues:
Connect has only 1 type of pins for interacting with th2 components.
Messages that were received from / sent to the target system will be sent to the following queue:

- incoming raw messages
- outgoing raw messages
- sended raw messages

The "Connect" component uses a separate queue to send messages. The component subscribes to that pin at the start and waits for the messages.

The "Connect" component uses a queue to send messages. The component subscribes to that pin at the start and waits for the messages.
The messages received from that pin will be sent to the target system.
Also, this component is responsible for maintaining connections and sessions in the cases where this is provided by the communication protocol.
Here you can automatically send heartbeat messages, send a logon/logout, requests to retransmit messages in the event of a gap, etc.
Expand All @@ -101,24 +103,32 @@ spec:
workspace: "/home/sailfish/workspace"
type: "th2_service:Your_Service_Type"
name: "your_service"
sessionGroup: "group"
maxMessageBatchSize: 100
enableMessageSendingEvent: true
settings:
param1: "value1"
pins:
- name: in_raw
connection-type: mq
attributes: ["first", "raw", "publish", "store"]
- name: out_raw
- name: send_raw
connection-type: mq
attributes: ["second", "raw", "publish", "store"]
attributes: ["raw", "publish", "store"]
- name: to_send
connection-type: mq
attributes: ["send", "raw", "subscribe"]
```

## Release notes

### 3.11.0

+ Add session group support from th2:common version 3.38.0-TH2-3578-2300290805-SNAPSHOT.
+ Replace 2 queues with in/out pins to one queue.
+ Messages are not grouped by direction, both direction publish together.

### 3.10.2

+ Events are made more convenient. Added event names and error logs. Error message moved from the name to the body of the event.

### 3.10.1

+ Update `sailfish-core` version from `3.2.1674` to `3.2.1741`
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ compileTestKotlin {
dependencies {
api platform('com.exactpro.th2:bom:3.0.0')

implementation 'com.exactpro.th2:common:3.33.0'
implementation 'com.exactpro.th2:common:3.38.0-TH2-3578-2300290805-SNAPSHOT'
implementation "com.exactpro.th2:sailfish-utils:3.8.0"

implementation "org.slf4j:slf4j-log4j12"
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
release_version = 3.10.1
release_version = 3.11.0
75 changes: 36 additions & 39 deletions src/main/java/com/exactpro/th2/conn/ConnectivityBatch.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
Expand All @@ -32,7 +33,6 @@ public class ConnectivityBatch {

private final String sessionAlias;
private final long sequence;
private final Direction direction;
private final List<ConnectivityMessage> connectivityMessages;

public ConnectivityBatch(List<ConnectivityMessage> connectivityMessages) {
Expand All @@ -42,8 +42,7 @@ public ConnectivityBatch(List<ConnectivityMessage> connectivityMessages) {

ConnectivityMessage firstMessage = connectivityMessages.get(0);
this.sessionAlias = firstMessage.getSessionAlias();
this.direction = firstMessage.getDirection();
checkMessages(connectivityMessages, sessionAlias, direction);
checkMessages(connectivityMessages, sessionAlias);

this.connectivityMessages = List.copyOf(connectivityMessages);
this.sequence = firstMessage.getSequence();
Expand All @@ -65,54 +64,52 @@ public long getSequence() {
return sequence;
}

public Direction getDirection() {
return direction;
}

public List<ConnectivityMessage> getMessages() {
return connectivityMessages;
}

private static void checkMessages(List<ConnectivityMessage> iMessages, String sessionAlias, Direction direction) {
private static void checkMessages(List<ConnectivityMessage> iMessages, String sessionAlias) {
if (iMessages.isEmpty()) {
throw new IllegalArgumentException("List can't be empty");
}

if (!iMessages.stream()
.allMatch(iMessage -> Objects.equals(sessionAlias, iMessage.getSessionAlias())
&& direction == iMessage.getDirection())) {
throw new IllegalArgumentException("List " + iMessages + " has elements with incorrect metadata, expected session alias '"+ sessionAlias +"' direction '" + direction + '\'');
}

if (LOGGER.isErrorEnabled()) {
boolean sequencesUnordered = false;
List<Long> missedSequences = new ArrayList<>();
for (int index = 0; index < iMessages.size() - 1; index++) {
long nextExpectedSequence = iMessages.get(index).getSequence() + 1;
long nextSequence = iMessages.get(index + 1).getSequence();
if (nextExpectedSequence != nextSequence) {
sequencesUnordered = true;
}
LongStream.range(nextExpectedSequence, nextSequence).forEach(missedSequences::add);
}
if (sequencesUnordered) {
LOGGER.error(
"List {} hasn't elements with incremental sequence with one step for session alias '{}' and direction '{}'{}",
iMessages.stream()
.map(ConnectivityMessage::getSequence)
.collect(Collectors.toList()),
sessionAlias,
direction,
missedSequences.isEmpty()
? ""
: String.format(". Missed sequences %s", missedSequences)
);
}
.allMatch(iMessage -> Objects.equals(sessionAlias, iMessage.getSessionAlias()))) {
throw new IllegalArgumentException("List " + iMessages + " has elements with incorrect metadata, expected session alias '"+ sessionAlias);
}

// FIXME: Replace logging to thowing exception after solving message reordering problem
if (LOGGER.isErrorEnabled()) {
Map<Direction, List<ConnectivityMessage>> directionToMessages = iMessages.stream().collect(Collectors.groupingBy(ConnectivityMessage::getDirection));
directionToMessages.forEach((direction, messages) -> checkDirectionMessages(messages, direction, sessionAlias));
}
// FIXME: Replace logging to thowing exception after solving message reordering problem
// throw new IllegalArgumentException("List " + iMessages.stream()
// .map(ConnectivityMessage::getSequence)
// .collect(Collectors.toList())+ " hasn't elements with incremental sequence with one step");
}

private static void checkDirectionMessages(List<ConnectivityMessage> iMessages, Direction direction, String sessionAlias) {
boolean sequencesUnordered = false;
List<Long> missedSequences = new ArrayList<>();
for (int index = 0; index < iMessages.size() - 1; index++) {
long nextExpectedSequence = iMessages.get(index).getSequence() + 1;
long nextSequence = iMessages.get(index + 1).getSequence();
if (nextExpectedSequence != nextSequence) {
sequencesUnordered = true;
}
LongStream.range(nextExpectedSequence, nextSequence).forEach(missedSequences::add);
}
if (sequencesUnordered) {
LOGGER.error(
"List {} hasn't elements with incremental sequence with one step for session alias '{}' and direction '{}'{}",
iMessages.stream()
.map(ConnectivityMessage::getSequence)
.collect(Collectors.toList()),
sessionAlias,
direction,
missedSequences.isEmpty()
? ""
: String.format(". Missed sequences %s", missedSequences)
);
}
}
}
20 changes: 12 additions & 8 deletions src/main/java/com/exactpro/th2/conn/ConnectivityMessage.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2021 Exactpro (Exactpro Systems Limited)
* Copyright 2020-2022 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -55,14 +55,15 @@ public class ConnectivityMessage {
private final MessageID messageID;
private final Timestamp timestamp;

public ConnectivityMessage(List<IMessage> sailfishMessages, String sessionAlias, Direction direction, long sequence) {
public ConnectivityMessage(List<IMessage> sailfishMessages, String sessionAlias, Direction direction, long sequence, String sessionGroup) {
this.sailfishMessages = Collections.unmodifiableList(requireNonNull(sailfishMessages, "Message can't be null"));
if (sailfishMessages.isEmpty()) {
throw new IllegalArgumentException("At least one sailfish messages must be passed. Session alias: " + sessionAlias + "; Direction: " + direction);
}
messageID = createMessageID(createConnectionID(requireNonNull(sessionAlias, "Session alias can't be null")),
messageID = createMessageID(createConnectionID(requireNonNull(sessionAlias, "Session alias can't be null"), sessionGroup),
requireNonNull(direction, "Direction can't be null"), sequence);
timestamp = createTimestamp(sailfishMessages.get(0).getMetaData().getMsgTimestamp().getTime());
LOGGER.warn("Sailfish transforms th2 message to real send message too slow, delay is about 10 milliseconds. Please add more hardware resources");
timestamp = createTimestamp(System.currentTimeMillis());
}

public String getSessionAlias() {
Expand Down Expand Up @@ -139,10 +140,13 @@ private static int calculateTotalBodySize(Collection<IMessage> messages) {
}).sum();
}

private static ConnectionID createConnectionID(String sessionAlias) {
return ConnectionID.newBuilder()
.setSessionAlias(sessionAlias)
.build();
private static ConnectionID createConnectionID(String sessionAlias, String sessionGroup) {
ConnectionID.Builder builder = ConnectionID.newBuilder()
.setSessionAlias(sessionAlias);
if (sessionGroup != null) {
builder.setSessionGroup(sessionGroup);
}
return builder.build();
}

private static MessageID createMessageID(ConnectionID connectionId, Direction direction, long sequence) {
Expand Down
8 changes: 5 additions & 3 deletions src/main/java/com/exactpro/th2/conn/MessageSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ private void sendMessage(RawMessage protoMsg) throws InterruptedException {
logger.debug("Message sent. Base64 view: {}", Base64.getEncoder().encodeToString(data));
}
} catch (Exception ex) {
Event errorEvent = createErrorEvent("SendError")
Event errorEvent = createErrorEvent("SendError", ex)
.bodyData(EventUtils.createMessageBean("Cannot send message. Message body in base64:"))
.bodyData(EventUtils.createMessageBean(Base64.getEncoder().encodeToString(data)));
EventStoreExtensions.addException(errorEvent, ex);
Expand All @@ -124,10 +124,12 @@ private void storeErrorEvent(Event errorEvent, @Nullable EventID parentId) {
}
}

private Event createErrorEvent(String eventType) {
private Event createErrorEvent(String eventType, Exception e) {
return Event.start().endTimestamp()
.status(Status.FAILED)
.type(eventType);
.type(eventType)
.name("Failed to send raw message")
.exception(e, true);
}

private IMetadata toSailfishMetadata(RawMessage protoMsg) {
Expand Down
Loading