Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to limit outgoing message rate #142

Open
wants to merge 10 commits into
base: dev-version-3
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 7 additions & 36 deletions .github/workflows/dev-docker-publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,39 +10,10 @@ on:
- README.md

jobs:
build:
runs-on: ubuntu-20.04
steps:
- uses: actions/checkout@v2
# Prepare custom build version
- name: Get branch name
id: branch
run: echo ::set-output name=branch_name::${GITHUB_REF#refs/*/}
- name: Get release_version
id: ver
uses: christian-draeger/[email protected]
with:
path: gradle.properties
property: release_version
- name: Build custom release version
id: release_ver
run: echo ::set-output name=value::"${{ steps.ver.outputs.value }}-${{ steps.branch.outputs.branch_name }}-${{ github.run_id }}"
- name: Show custom release version
run: echo ${{ steps.release_ver.outputs.value }}
# Build and publish image
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v1
- uses: docker/login-action@v1
with:
registry: ghcr.io
username: ${{ github.repository_owner }}
password: ${{ secrets.CR_PAT }}
- run: echo "::set-output name=REPOSITORY_NAME::$(echo '${{ github.repository }}' | awk -F '/' '{print $2}')"
id: meta
- name: Build and push
id: docker_build
uses: docker/build-push-action@v2
with:
push: true
tags: ghcr.io/${{ github.repository }}:${{ steps.release_ver.outputs.value }}
labels: com.exactpro.th2.${{ steps.meta.outputs.REPOSITORY_NAME }}=${{ steps.ver.outputs.value }}
build-job:
uses: th2-net/.github/.github/workflows/compound-java-dev.yml@main
with:
build-target: 'Docker'
docker-username: ${{ github.actor }}
secrets:
docker-password: ${{ secrets.GITHUB_TOKEN }}
40 changes: 12 additions & 28 deletions .github/workflows/docker-publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,17 @@ name: Build and publish Docker distributions to Github Container Registry ghcr.i
on:
push:
branches:
- master
- version-*
- master
- version-*
paths:
- gradle.properties
# - package_info.json

jobs:
build:
runs-on: ubuntu-20.04
steps:
- uses: actions/checkout@v2
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v1
- uses: docker/login-action@v1
with:
registry: ghcr.io
username: ${{ github.repository_owner }}
password: ${{ secrets.CR_PAT }}
- run: echo "::set-output name=REPOSITORY_NAME::$(echo '${{ github.repository }}' | awk -F '/' '{print $2}')"
id: meta
- name: Read version from gradle.properties
id: read_property
uses: christian-draeger/[email protected]
with:
path: ./gradle.properties
property: release_version
- name: Build and push
id: docker_build
uses: docker/build-push-action@v2
with:
push: true
tags: ghcr.io/${{ github.repository }}:${{ steps.read_property.outputs.value }}
labels: com.exactpro.th2.${{ steps.meta.outputs.REPOSITORY_NAME }}=${{ steps.read_property.outputs.value }}
build-job:
uses: th2-net/.github/.github/workflows/compound-java.yml@main
with:
build-target: 'Docker'
docker-username: ${{ github.actor }}
secrets:
docker-password: ${{ secrets.GITHUB_TOKEN }}
2 changes: 1 addition & 1 deletion .gitlab-ci.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
variables:
APP_NAME: "th2-conn"
APP_NAME: "th2-conn-sailfish"

include:
- project: "vivarium/th2/pipelines"
Expand Down
20 changes: 14 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Connect (3.11.0)
# Connect (3.12.0)

The "Connect" component is responsible for the communication with a target system.
This component implements the logic of the interaction protocol, receiving and sending messages from and to the system, respectively.
Expand Down Expand Up @@ -28,6 +28,7 @@ Parameters:
+ name - the service name that will be displayed in the events inside the report;
+ settings - the parameters that will be transformed to the actual service's settings specified in the **services.xml** file.
+ maxMessageBatchSize - the limitation for message batch size which connect sends to the first and to the second publish pins with. The default value is set to 100.
+ maxMessageRate - max outgoing message rate in messages per second
+ maxMessageFlushTime - defines maximum time between outgoing message batches in milliseconds. The default value is set to 1000.
+ enableMessageSendingEvent - if this option is set to `true`, connect sends a separate event for every message sent which incomes from the pin with the send attribute. The default value is set to true

Expand Down Expand Up @@ -103,35 +104,42 @@ spec:
type: "th2_service:Your_Service_Type"
name: "your_service"
maxMessageBatchSize: 100
maxMessageRate: 100000
maxMessageFlushTime: 1000
enableMessageSendingEvent: true
settings:
param1: "value1"
pins:
- name: in_raw
connection-type: mq
attributes: ["first", "raw", "publish", "store"]
attributes: [ "first", "raw", "publish", "store" ]
- name: out_raw
connection-type: mq
attributes: ["second", "raw", "publish", "store"]
attributes: [ "second", "raw", "publish", "store" ]
- name: to_send
connection-type: mq
attributes: ["send", "raw", "subscribe"]
attributes: [ "send", "raw", "subscribe" ]
```

## Release notes

### 3.12.0
+ Outgoing message rate now can be limited via `maxMessageRate` setting

### 3.11.0

+ Updated `sailfish-core` version from `3.2.1741` to `3.3.54`
+ Updated `common` from `3.33.0` to `3.44.0`
+ Updated `kotlin` form `1.5.30` to `1.6.21`
+ Renamed project to `conn-sailfish`
+ Added `maxMessageFlushTime` option
+ Dependencies with vulnerabilities were updated:
+ Sailfish updated from `3.2.1741` to `3.3.13`

### 3.10.2

+ Events are made more convenient. Added event names and error logs. Error message moved from the name to the body of the event.
+ Use temporal directory for last layer in sailfish's workspace


### 3.10.1

+ Update `sailfish-core` version from `3.2.1674` to `3.2.1741`
Expand Down
56 changes: 36 additions & 20 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,45 @@ plugins {
id 'com.palantir.docker' version '0.25.0'
id 'org.jetbrains.kotlin.jvm' version '1.6.21'
id 'application'
id "org.owasp.dependencycheck" version "8.2.1"
}

dependencyCheck {
formats=['SARIF', 'JSON', 'HTML']
failBuildOnCVSS=5
analyzers {
assemblyEnabled = false
nugetconfEnabled = false
nodeEnabled = false
}
}

ext {
sharedDir = file("${project.rootDir}/shared")
sailfishVersion = '3.3.13'
sailfishVersion = '3.3.122'
}

ext.excludeSailfish = { rcd ->
rcd.excludeModule("com.exactpro.sf", "sailfish-core")
rcd.excludeModule("com.exactpro.sf", "sailfish-common")
rcd.excludeModule("com.exactpro.sf", "sailfish-rest-api-client")
rcd.excludeModule("com.exactpro.sf", "service-http")
}

group = 'com.exactpro.th2'
version = release_version

repositories {
maven {
name 'MavenLocal'
url sharedDir
name 'Sonatype_snapshots'
url 'https://s01.oss.sonatype.org/content/repositories/snapshots/'
content {
excludeSailfish(it)
}
}

// ignoreGradleMetadataRedirection is used for sonatype because
// Sailfish dependencies have constrains that interfere with our BOM
// so we exclude Gradle metadata for this repositories

// so we exclude Gradle metadata for this repositories.
// We've checked these versions - they are compatible and safe to use
maven {
name 'Sonatype_snapshots'
url 'https://s01.oss.sonatype.org/content/repositories/snapshots/'
Expand All @@ -34,18 +53,20 @@ repositories {
maven {
name 'Sonatype_releases'
url 'https://s01.oss.sonatype.org/content/repositories/releases/'
metadataSources {
mavenPom()
artifact()
ignoreGradleMetadataRedirection()
content {
excludeSailfish(it)
}
}
mavenCentral {
maven {
name 'Sonatype_releases'
url 'https://s01.oss.sonatype.org/content/repositories/releases/'
metadataSources {
mavenPom()
artifact()
ignoreGradleMetadataRedirection()
}
}
mavenCentral()
mavenLocal()

configurations.all {
Expand Down Expand Up @@ -80,22 +101,17 @@ compileTestKotlin {
}

dependencies {
api platform('com.exactpro.th2:bom:4.0.1')
api platform('com.exactpro.th2:bom:4.1.0')

implementation 'com.exactpro.th2:common:3.42.0-dev-3174619150-SNAPSHOT'
implementation 'com.exactpro.th2:sailfish-utils:3.13.0-dev-3204703314-SNAPSHOT'

implementation "org.slf4j:slf4j-api"
implementation 'com.exactpro.th2:common:3.44.0'
implementation 'com.exactpro.th2:sailfish-utils:3.15.0'

implementation group: 'io.reactivex.rxjava3', name: 'rxjava', version: '3.0.4'

implementation 'io.prometheus:simpleclient'

implementation("com.exactpro.sf:sailfish-core:${sailfishVersion}")

testImplementation('org.apache.logging.log4j:log4j-slf4j-impl') {
because('logging in testing')
}
testImplementation 'org.junit.jupiter:junit-jupiter:5.9.0'
}

Expand Down
4 changes: 4 additions & 0 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
<<<<<<< HEAD

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resolve conflicts

release_version=3.12.0
=======
release_version=3.11.0
>>>>>>> dev
2 changes: 1 addition & 1 deletion settings.gradle
Original file line number Diff line number Diff line change
@@ -1 +1 @@
rootProject.name = 'conn'
rootProject.name = 'conn-sailfish'
13 changes: 2 additions & 11 deletions src/main/java/com/exactpro/th2/conn/ConnectivityMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.exactpro.th2.conn;

import static com.exactpro.sf.common.messages.MetadataExtensions.getMessageProperties;
import static com.exactpro.th2.common.message.MessageUtils.toTimestamp;
import static com.google.protobuf.TextFormat.shortDebugString;
import static java.util.Objects.requireNonNull;
import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
Expand Down Expand Up @@ -46,8 +47,6 @@

public class ConnectivityMessage {
private static final Logger LOGGER = LoggerFactory.getLogger(ConnectivityMessage.class);
public static final long MILLISECONDS_IN_SECOND = 1_000L;
public static final long NANOSECONDS_IN_MILLISECOND = 1_000_000L;

private final List<IMessage> sailfishMessages;

Expand All @@ -62,7 +61,7 @@ public ConnectivityMessage(List<IMessage> sailfishMessages, String sessionAlias,
}
messageID = createMessageID(createConnectionID(requireNonNull(sessionAlias, "Session alias can't be null")),
requireNonNull(direction, "Direction can't be null"), sequence);
timestamp = createTimestamp(sailfishMessages.get(0).getMetaData().getMsgTimestamp().getTime());
timestamp = toTimestamp(sailfishMessages.get(0).getMetaData().getPreciseMsgTimestamp());
}

public String getSessionAlias() {
Expand Down Expand Up @@ -158,12 +157,4 @@ private static RawMessageMetadata.Builder createRawMessageMetadataBuilder(Messag
.setId(messageID)
.setTimestamp(timestamp);
}

// TODO: Required nanosecond accuracy
private static Timestamp createTimestamp(long milliseconds) {
return Timestamp.newBuilder()
.setSeconds(milliseconds / MILLISECONDS_IN_SECOND)
.setNanos((int) (milliseconds % MILLISECONDS_IN_SECOND * NANOSECONDS_IN_MILLISECOND))
.build();
}
}
29 changes: 19 additions & 10 deletions src/main/java/com/exactpro/th2/conn/MessageSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,6 @@
*/
package com.exactpro.th2.conn;

import static java.util.Objects.requireNonNull;

import java.io.IOException;
import java.util.Base64;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.exactpro.sf.common.messages.IMetadata;
import com.exactpro.sf.common.messages.MetadataExtensions;
import com.exactpro.sf.common.messages.impl.Metadata;
Expand All @@ -40,8 +31,15 @@
import com.exactpro.th2.conn.events.EventHolder;
import com.exactpro.th2.conn.utility.EventStoreExtensions;
import com.exactpro.th2.conn.utility.SailfishMetadataExtensions;

import io.reactivex.rxjava3.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Base64;
import java.util.Map;

import static java.util.Objects.requireNonNull;

public class MessageSender {
private static final String SEND_ATTRIBUTE = "send";
Expand All @@ -50,16 +48,26 @@ public class MessageSender {
private final MessageRouter<RawMessageBatch> router;
private final EventDispatcher eventDispatcher;
private final EventID untrackedMessagesRoot;
private final RateLimiter rateLimiter;
private volatile SubscriberMonitor subscriberMonitor;

public MessageSender(IServiceProxy serviceProxy,
MessageRouter<RawMessageBatch> router,
EventDispatcher eventDispatcher,
EventID untrackedMessagesRoot) {
this(serviceProxy, router, eventDispatcher, untrackedMessagesRoot, Integer.MAX_VALUE);
}

public MessageSender(IServiceProxy serviceProxy,
MessageRouter<RawMessageBatch> router,
EventDispatcher eventDispatcher,
EventID untrackedMessagesRoot,
int maxMessageRate) {
this.serviceProxy = requireNonNull(serviceProxy, "Service proxy can't be null");
this.router = requireNonNull(router, "Message router can't be null");
this.eventDispatcher = requireNonNull(eventDispatcher, "'Event dispatcher' can't be null");
this.untrackedMessagesRoot = requireNonNull(untrackedMessagesRoot, "'untrackedMessagesRoot' can't be null");
this.rateLimiter = new RateLimiter(maxMessageRate);
}

public void start() {
Expand All @@ -86,6 +94,7 @@ public void stop() throws IOException {
private void handle(String consumerTag, RawMessageBatch messageBatch) {
for (RawMessage protoMessage : messageBatch.getMessagesList()) {
try {
rateLimiter.acquire();
sendMessage(protoMessage);
} catch (InterruptedException e) {
logger.error("Send message operation interrupted. Consumer tag {}", consumerTag, e);
Expand Down
Loading