Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Th2 3717 #139

Open
wants to merge 21 commits into
base: dev-version-3
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
afeb2eb
[TH2-3717] Implement ability to specify the group option
Sergey-Sitnikov-Exactpro May 30, 2022
25b3779
[TH2-3717] Add timestamp for multithreading
Sergey-Sitnikov-Exactpro Jun 10, 2022
4d9530d
[TH2-3717] Update timestamp to preserve order
Sergey-Sitnikov-Exactpro Jun 14, 2022
e423cd5
[TH2-3717] Update pipeline creation
Sergey-Sitnikov-Exactpro Jun 14, 2022
d011433
[TH2-3717] Update batch creation
Sergey-Sitnikov-Exactpro Jun 14, 2022
cfb669a
[TH2-3717] Update message checking inside batch
Sergey-Sitnikov-Exactpro Jun 15, 2022
4613de9
[TH2-3717] Update message logging
Sergey-Sitnikov-Exactpro Jun 16, 2022
3cba4b9
moved error message from the name to the body of the event
Dmitriy-Yugay Mar 18, 2022
0bc7b3f
minor fixes
Dmitriy-Yugay Mar 25, 2022
5e32660
Th2 1881 2 (#123)
Dmitriy-Yugay Apr 20, 2022
0b57891
[TH2-3717] Update project version and README file
Sergey-Sitnikov-Exactpro Jun 16, 2022
b2c62c9
[TH2-3717] Update imports
Sergey-Sitnikov-Exactpro Jun 16, 2022
cd8b088
[TH2-3717] Add group to TestServiceListener
Sergey-Sitnikov-Exactpro Jun 16, 2022
ea0b7a7
[TH2-3717] Fix formatting
Sergey-Sitnikov-Exactpro Jun 16, 2022
f5be158
[TH2-3717] Update README file
Sergey-Sitnikov-Exactpro Jun 17, 2022
5f03e02
[TH2-3717] Update sequences check in batch
Sergey-Sitnikov-Exactpro Jun 17, 2022
4b5cb19
[TH2-3717] Update subscribe method
Sergey-Sitnikov-Exactpro Jun 17, 2022
e029758
Merge branch 'dev' into th2-3717
Sergey-Sitnikov-Exactpro Jun 17, 2022
3272189
[TH2-3717] Update publishPipeline method
Sergey-Sitnikov-Exactpro Jun 21, 2022
77236e2
[TH2-3717] Add aditional logging for listener subscriptions
OptimumCode Jun 22, 2022
f08b89b
[TH2-3717] Update common version
OptimumCode Jun 22, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 20 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Connect (3.10.1)
# Connect (3.11.0)

The "Connect" component is responsible for the communication with a target system.
This component implements the logic of the interaction protocol, receiving and sending messages from and to the system, respectively.
Expand All @@ -17,6 +17,7 @@ session-alias: "connectivity-alias"
workspace: "/home/sailfish/workspace"
type: "th2_service:Your_Service_Type"
name: "your_service"
sessionGroup "group"
settings:
param1: "value1"
```
Expand Down Expand Up @@ -76,13 +77,13 @@ You need to perform the following steps:

## Pins

Connect has 2 types of pins for interacting with th2 components.
Messages that were received from / sent to the target system will be sent to the following queues:
Connect has only 1 type of pins for interacting with th2 components.
Messages that were received from / sent to the target system will be sent to the following queue:

- incoming raw messages
- outgoing raw messages
- sended raw messages

The "Connect" component uses a separate queue to send messages. The component subscribes to that pin at the start and waits for the messages.

The "Connect" component uses a queue to send messages. The component subscribes to that pin at the start and waits for the messages.
The messages received from that pin will be sent to the target system.
Also, this component is responsible for maintaining connections and sessions in the cases where this is provided by the communication protocol.
Here you can automatically send heartbeat messages, send a logon/logout, requests to retransmit messages in the event of a gap, etc.
Expand All @@ -101,24 +102,32 @@ spec:
workspace: "/home/sailfish/workspace"
type: "th2_service:Your_Service_Type"
name: "your_service"
sessionGroup: "group"
maxMessageBatchSize: 100
enableMessageSendingEvent: true
settings:
param1: "value1"
pins:
- name: in_raw
connection-type: mq
attributes: ["first", "raw", "publish", "store"]
- name: out_raw
- name: send_raw
connection-type: mq
attributes: ["second", "raw", "publish", "store"]
attributes: ["raw", "publish", "store"]
- name: to_send
connection-type: mq
attributes: ["send", "raw", "subscribe"]
```

## Release notes

### 3.11.0

+ Add session group support from th2:common version 3.38.0-TH2-3578-2300290805-SNAPSHOT.
+ Replace 2 queues with in/out pins to one queue.
+ Messages are not grouped by direction, both direction publish together.

### 3.10.2

+ Events are made more convenient. Added event names and error logs. Error message moved from the name to the body of the event.

### 3.10.1

+ Update `sailfish-core` version from `3.2.1674` to `3.2.1741`
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ compileTestKotlin {
dependencies {
api platform('com.exactpro.th2:bom:3.0.0')

implementation 'com.exactpro.th2:common:3.33.0'
implementation 'com.exactpro.th2:common:3.38.0-TH2-3578-2300290805-SNAPSHOT'
implementation "com.exactpro.th2:sailfish-utils:3.8.0"

implementation "org.slf4j:slf4j-log4j12"
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
release_version = 3.10.1
release_version = 3.11.0
32 changes: 2 additions & 30 deletions src/main/java/com/exactpro/th2/conn/ConnectivityBatch.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,38 +79,10 @@ private static void checkMessages(List<ConnectivityMessage> iMessages, String se
}

if (!iMessages.stream()
.allMatch(iMessage -> Objects.equals(sessionAlias, iMessage.getSessionAlias())
&& direction == iMessage.getDirection())) {
.allMatch(iMessage -> Objects.equals(sessionAlias, iMessage.getSessionAlias()))) {
throw new IllegalArgumentException("List " + iMessages + " has elements with incorrect metadata, expected session alias '"+ sessionAlias +"' direction '" + direction + '\'');
}

if (LOGGER.isErrorEnabled()) {
boolean sequencesUnordered = false;
List<Long> missedSequences = new ArrayList<>();
for (int index = 0; index < iMessages.size() - 1; index++) {
long nextExpectedSequence = iMessages.get(index).getSequence() + 1;
long nextSequence = iMessages.get(index + 1).getSequence();
if (nextExpectedSequence != nextSequence) {
sequencesUnordered = true;
}
LongStream.range(nextExpectedSequence, nextSequence).forEach(missedSequences::add);
}
if (sequencesUnordered) {
LOGGER.error(
"List {} hasn't elements with incremental sequence with one step for session alias '{}' and direction '{}'{}",
iMessages.stream()
.map(ConnectivityMessage::getSequence)
.collect(Collectors.toList()),
sessionAlias,
direction,
missedSequences.isEmpty()
? ""
: String.format(". Missed sequences %s", missedSequences)
);
}
}
Sergey-Sitnikov-Exactpro marked this conversation as resolved.
Show resolved Hide resolved

// FIXME: Replace logging to thowing exception after solving message reordering problem
// FIXME: Replace logging to thowing exception after solving message reordering problem
// throw new IllegalArgumentException("List " + iMessages.stream()
// .map(ConnectivityMessage::getSequence)
// .collect(Collectors.toList())+ " hasn't elements with incremental sequence with one step");
Expand Down
22 changes: 13 additions & 9 deletions src/main/java/com/exactpro/th2/conn/ConnectivityMessage.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2021 Exactpro (Exactpro Systems Limited)
* Copyright 2020-2022 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -55,14 +55,15 @@ public class ConnectivityMessage {
private final MessageID messageID;
private final Timestamp timestamp;

public ConnectivityMessage(List<IMessage> sailfishMessages, String sessionAlias, Direction direction, long sequence) {
public ConnectivityMessage(List<IMessage> sailfishMessages, String sessionAlias, Direction direction, long sequence, String sessionGroup) {
this.sailfishMessages = Collections.unmodifiableList(requireNonNull(sailfishMessages, "Message can't be null"));
if (sailfishMessages.isEmpty()) {
throw new IllegalArgumentException("At least one sailfish messages must be passed. Session alias: " + sessionAlias + "; Direction: " + direction);
}
messageID = createMessageID(createConnectionID(requireNonNull(sessionAlias, "Session alias can't be null")),
messageID = createMessageID(createConnectionID(requireNonNull(sessionAlias, "Session alias can't be null"), sessionGroup),
requireNonNull(direction, "Direction can't be null"), sequence);
timestamp = createTimestamp(sailfishMessages.get(0).getMetaData().getMsgTimestamp().getTime());
LOGGER.warn("Sailfish transforms th2 message to real send message too slow, delay is about 10 milliseconds. Please add more hardware resources");
timestamp = createTimestamp(System.currentTimeMillis());
}

public String getSessionAlias() {
Expand Down Expand Up @@ -122,7 +123,7 @@ public List<IMessage> getSailfishMessages() {
return sailfishMessages;
}

@Override
@Override
Sergey-Sitnikov-Exactpro marked this conversation as resolved.
Show resolved Hide resolved
public String toString() {
return new ToStringBuilder(this)
.append("messageID", shortDebugString(messageID))
Expand All @@ -139,10 +140,13 @@ private static int calculateTotalBodySize(Collection<IMessage> messages) {
}).sum();
}

private static ConnectionID createConnectionID(String sessionAlias) {
return ConnectionID.newBuilder()
.setSessionAlias(sessionAlias)
.build();
private static ConnectionID createConnectionID(String sessionAlias, String sessionGroup) {
ConnectionID.Builder builder = ConnectionID.newBuilder()
.setSessionAlias(sessionAlias);
if (sessionGroup != null) {
builder.setSessionGroup(sessionGroup);
}
return builder.build();
}

private static MessageID createMessageID(ConnectionID connectionId, Direction direction, long sequence) {
Expand Down
8 changes: 5 additions & 3 deletions src/main/java/com/exactpro/th2/conn/MessageSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ private void sendMessage(RawMessage protoMsg) throws InterruptedException {
logger.debug("Message sent. Base64 view: {}", Base64.getEncoder().encodeToString(data));
}
} catch (Exception ex) {
Event errorEvent = createErrorEvent("SendError")
Event errorEvent = createErrorEvent("SendError", ex)
.bodyData(EventUtils.createMessageBean("Cannot send message. Message body in base64:"))
.bodyData(EventUtils.createMessageBean(Base64.getEncoder().encodeToString(data)));
EventStoreExtensions.addException(errorEvent, ex);
Expand All @@ -124,10 +124,12 @@ private void storeErrorEvent(Event errorEvent, @Nullable EventID parentId) {
}
}

private Event createErrorEvent(String eventType) {
private Event createErrorEvent(String eventType, Exception e) {
return Event.start().endTimestamp()
.status(Status.FAILED)
.type(eventType);
.type(eventType)
.name("Failed to send raw message")
.exception(e, true);
}

private IMetadata toSailfishMetadata(RawMessage protoMsg) {
Expand Down
49 changes: 26 additions & 23 deletions src/main/java/com/exactpro/th2/conn/MicroserviceMain.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import static com.exactpro.th2.conn.utility.SailfishMetadataExtensions.contains;
import static com.exactpro.th2.conn.utility.SailfishMetadataExtensions.getParentEventID;
import static io.reactivex.rxjava3.plugins.RxJavaPlugins.createSingleScheduler;
import static java.util.Objects.requireNonNull;
import static org.apache.commons.lang.StringUtils.containsIgnoreCase;
import static org.apache.commons.lang.StringUtils.repeat;
import static org.apache.commons.lang3.ClassUtils.primitiveToWrapper;
Expand All @@ -36,6 +35,7 @@
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -65,7 +65,6 @@
import com.exactpro.th2.common.grpc.RawMessageBatch;
import com.exactpro.th2.common.schema.factory.CommonFactory;
import com.exactpro.th2.common.schema.message.MessageRouter;
import com.exactpro.th2.common.schema.message.QueueAttribute;
import com.exactpro.th2.conn.configuration.ConnectivityConfiguration;
import com.exactpro.th2.conn.events.EventDispatcher;
import com.exactpro.th2.conn.events.EventType;
Expand Down Expand Up @@ -162,7 +161,7 @@ public static void main(String[] args) {
EventType.SERVICE_EVENT, serviceEventsRoot.getId()
));

IServiceListener serviceListener = new ServiceListener(directionToSequence, configuration.getSessionAlias(), processor, eventDispatcher);
IServiceListener serviceListener = new ServiceListener(directionToSequence, configuration.getSessionAlias(), processor, eventDispatcher, configuration.getSessionGroup());
IServiceProxy serviceProxy = loadService(serviceFactory, factory, configuration, serviceListener);
disposer.register(() -> {
LOGGER.info("Stop service proxy");
Expand Down Expand Up @@ -215,30 +214,35 @@ private static Object getParamValue(ISettingsProxy settings, String parameterNam
LOGGER.info("AvailableProcessors '{}'", Runtime.getRuntime().availableProcessors());

return flowable
.doOnNext(message -> LOGGER.trace(
"Message before observeOn with sequence {} and direction {}",
message.getSequence(),
message.getDirection()
))
.doOnNext(message -> {
LOGGER.trace(
"Message before observeOn with sequence {} and direction {}",
message.getSequence(),
message.getDirection()
);
})
.observeOn(PIPELINE_SCHEDULER)
.doOnNext(connectivityMessage -> LOGGER.debug("Start handling connectivity message {}", connectivityMessage))
.groupBy(ConnectivityMessage::getDirection)
.groupBy(ConnectivityMessage::getSessionAlias)
Sergey-Sitnikov-Exactpro marked this conversation as resolved.
Show resolved Hide resolved
.map(group -> {
@NonNull Direction direction = requireNonNull(group.getKey(), "Direction can't be null");
AtomicReference<@NonNull Direction> direction = new AtomicReference<>();
Sergey-Sitnikov-Exactpro marked this conversation as resolved.
Show resolved Hide resolved
Flowable<ConnectivityMessage> messageConnectable = group
.doOnNext(message -> LOGGER.trace(
"Message inside map with sequence {} and direction {}",
message.getSequence(),
message.getDirection()
))
.doOnNext(message -> {
LOGGER.trace(
"Message inside map with sequence {} and direction {}",
message.getSequence(),
message.getDirection());
direction.set(message.getDirection());
}
)
.doOnCancel(terminateFlowable) // This call is required for terminate the publisher and prevent creation another group
.publish()
.refCount(enableMessageSendingEvent && direction == Direction.SECOND ? 2 : 1);
.refCount(enableMessageSendingEvent && direction.get() == Direction.SECOND ? 2 : 1);
Sergey-Sitnikov-Exactpro marked this conversation as resolved.
Show resolved Hide resolved

if (enableMessageSendingEvent && direction == Direction.SECOND) {
if (enableMessageSendingEvent && direction.get() == Direction.SECOND) {
subscribeToSendMessage(eventBatchRouter, messageConnectable);
}
createPackAndPublishPipeline(direction, messageConnectable, rawMessageRouter, maxMessageBatchSize);
createPackAndPublishPipeline(messageConnectable, rawMessageRouter, maxMessageBatchSize);

return messageConnectable;
});
Expand Down Expand Up @@ -272,10 +276,9 @@ private static void subscribeToSendMessage(MessageRouter<EventBatch> eventBatchR
});
}

private static void createPackAndPublishPipeline(Direction direction, Flowable<ConnectivityMessage> messageConnectable,
private static void createPackAndPublishPipeline(Flowable<ConnectivityMessage> messageConnectable,
MessageRouter<RawMessageBatch> rawMessageRouter, int maxMessageBatchSize) {

LOGGER.info("Map group {}", direction);
Flowable<ConnectivityBatch> batchConnectable = messageConnectable
.doOnNext(message -> LOGGER.trace(
"Message before window with sequence {} and direction {}",
Expand All @@ -300,7 +303,7 @@ private static void createPackAndPublishPipeline(Direction direction, Flowable<C
.subscribe(batch -> {
Sergey-Sitnikov-Exactpro marked this conversation as resolved.
Show resolved Hide resolved
try {
RawMessageBatch rawBatch = batch.convertToProtoRawBatch();
rawMessageRouter.sendAll(rawBatch, (direction == Direction.FIRST ? QueueAttribute.FIRST : QueueAttribute.SECOND).toString());
rawMessageRouter.send(rawBatch); //FIXME: Only one pin can be used
} catch (Exception e) {
if (LOGGER.isErrorEnabled()) {
LOGGER.error("Cannot send batch with sequences: {}",
Expand All @@ -309,9 +312,9 @@ private static void createPackAndPublishPipeline(Direction direction, Flowable<C
}
}
});
LOGGER.info("Subscribed to transfer raw batch group {}", direction);
LOGGER.info("Subscribed to transfer raw batch group");

LOGGER.info("Connected to publish batches group {}", direction);
LOGGER.info("Connected to publish batches group");
}

private interface Disposable {
Expand Down
12 changes: 9 additions & 3 deletions src/main/java/com/exactpro/th2/conn/ServiceListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.exactpro.sf.services.ServiceHandlerRoute;
import com.exactpro.th2.common.event.Event;
import com.exactpro.th2.common.event.Event.Status;
import com.exactpro.th2.common.event.EventUtils;
import com.exactpro.th2.conn.events.EventDispatcher;
import com.exactpro.th2.conn.events.EventHolder;
import com.exactpro.th2.conn.utility.EventStoreExtensions;
Expand Down Expand Up @@ -74,19 +75,22 @@ public class ServiceListener implements IServiceListener {

private final Map<Direction, AtomicLong> directionToSequence;
private final String sessionAlias;
private final String sessionGroup;
private final Subscriber<ConnectivityMessage> subscriber;
private final EventDispatcher eventDispatcher;

public ServiceListener(
Map<Direction, AtomicLong> directionToSequence,
String sessionAlias,
Subscriber<ConnectivityMessage> subscriber,
EventDispatcher eventDispatcher
EventDispatcher eventDispatcher,
String sessionGroup
Sergey-Sitnikov-Exactpro marked this conversation as resolved.
Show resolved Hide resolved
) {
this.directionToSequence = requireNonNull(directionToSequence, "Map direction to sequence counter can't be null");
this.sessionAlias = requireNonNull(sessionAlias, "Session alias can't be null");
this.subscriber = requireNonNull(subscriber, "Subscriber can't be null");
this.eventDispatcher = requireNonNull(eventDispatcher, "Event dispatcher can't be null");
this.sessionGroup = sessionGroup;
}

@Override
Expand Down Expand Up @@ -144,11 +148,13 @@ public void onMessage(IServiceProxy service, IMessage message, boolean rejected,
@Override
public void onEvent(IServiceProxy service, ServiceEvent serviceEvent) {
LOGGER.info("Session '{}' emitted service event '{}'", sessionAlias, serviceEvent);
String eventName = "Service [" + serviceEvent.getServiceName().getServiceName() + "] emitted event with status " + serviceEvent.getLevel();
try {
Event event = Event.start().endTimestamp()
.name(serviceEvent.getMessage())
.name(eventName)
.status(serviceEvent.getLevel() == Level.ERROR ? Status.FAILED : Status.PASSED)
.type("Service event")
.bodyData(EventUtils.createMessageBean(serviceEvent.getMessage()))
.description(serviceEvent.getDetails());

eventDispatcher.store(EventHolder.createServiceEvent(event));
Expand All @@ -161,6 +167,6 @@ public void onEvent(IServiceProxy service, ServiceEvent serviceEvent) {
private ConnectivityMessage createConnectivityMessage(List<IMessage> messages, Direction direction, AtomicLong directionSeq) {
long sequence = directionSeq.incrementAndGet();
LOGGER.debug("On message: direction '{}'; sequence '{}'; messages '{}'", direction, sequence, messages);
return new ConnectivityMessage(messages, sessionAlias, direction, sequence);
return new ConnectivityMessage(messages, sessionAlias, direction, sequence, sessionGroup);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public class ConnectivityConfiguration {
@JsonProperty(required = true)
private Map<String, Object> settings;

private String sessionGroup;

public boolean isEnableMessageSendingEvent() {
return enableMessageSendingEvent;
}
Expand Down Expand Up @@ -64,4 +66,8 @@ public String getName() {
public Map<String, Object> getSettings() {
return settings;
}

public String getSessionGroup() {
return sessionGroup;
}
}
Loading