Skip to content

Commit

Permalink
ConsumeConnectionManager & PublishConnectionManager
Browse files Browse the repository at this point in the history
  • Loading branch information
Oleg Smelov committed Jul 10, 2024
1 parent f80b343 commit 0be84cf
Show file tree
Hide file tree
Showing 30 changed files with 990 additions and 808 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# th2 common library (Java) (5.13.1)
# th2 common library (Java) (5.14.0)

## Usage

Expand Down Expand Up @@ -513,7 +513,7 @@ dependencies {

### 5.14.0-dev

+ Separate connections for publisher and consumer
+ Separate connections for publisher and consumer (allows to consume while publishing is blocked by RabbitMQ)
+ Updated cradle `5.4.1-dev`
+ Updated kubernetes-client: `6.13.1`

Expand Down Expand Up @@ -611,7 +611,7 @@ dependencies {

### 5.4.1-dev
#### Fix
+ `SubscriberMonitor` is returned from `MessageRouter.subscribe` methods is proxy object to manage RabbitMQ subscribtion without internal listener
+ `SubscriberMonitor` is returned from `MessageRouter.subscribe` methods is proxy object to manage RabbitMQ subscription without internal listener

### 5.4.0-dev
#### Updated
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ release_version=5.14.0
kotlin_version=1.8.22
description='th2 common library (Java)'
vcs_url=https://github.com/th2-net/th2-common-j
kapt.include.compile.classpath=false
kapt.include.compile.classpath=true
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2021 Exactpro (Exactpro Systems Limited)
* Copyright 2020-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,7 +24,7 @@
import com.exactpro.th2.common.grpc.EventBatch;
import com.exactpro.th2.common.message.MessageUtils;
import com.exactpro.th2.common.schema.message.impl.rabbitmq.AbstractRabbitSender;
import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager;
import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.PublishConnectionManager;

import static com.exactpro.th2.common.metrics.CommonMetrics.TH2_PIN_LABEL;
import static com.exactpro.th2.common.schema.event.EventBatchRouter.EVENT_TYPE;
Expand All @@ -39,13 +39,13 @@ public class EventBatchSender extends AbstractRabbitSender<EventBatch> {
.register();

public EventBatchSender(
@NotNull ConnectionManager connectionManager,
@NotNull PublishConnectionManager publishConnectionManager,
@NotNull String exchangeName,
@NotNull String routingKey,
@NotNull String th2Pin,
@NotNull String bookName
) {
super(connectionManager, exchangeName, routingKey, th2Pin, EVENT_TYPE, bookName);
super(publishConnectionManager, exchangeName, routingKey, th2Pin, EVENT_TYPE, bookName);
}

@Override
Expand Down Expand Up @@ -82,4 +82,4 @@ protected String toShortTraceString(EventBatch value) {
protected String toShortDebugString(EventBatch value) {
return "EventBatch: parent_event_id = " + value.getParentEventId().getId();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2023 Exactpro (Exactpro Systems Limited)
* Copyright 2020-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,7 +21,7 @@
import com.exactpro.th2.common.schema.message.DeliveryMetadata;
import com.exactpro.th2.common.schema.message.ManualAckDeliveryCallback.Confirmation;
import com.exactpro.th2.common.schema.message.impl.rabbitmq.AbstractRabbitSubscriber;
import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager;
import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConsumeConnectionManager;
import com.rabbitmq.client.Delivery;
import io.prometheus.client.Counter;
import org.jetbrains.annotations.NotNull;
Expand All @@ -41,12 +41,12 @@ public class EventBatchSubscriber extends AbstractRabbitSubscriber<EventBatch> {
.register();

public EventBatchSubscriber(
@NotNull ConnectionManager connectionManager,
@NotNull ConsumeConnectionManager consumeConnectionManager,
@NotNull String queue,
@NotNull String th2Pin,
@NotNull ConfirmationListener<EventBatch> listener
) {
super(connectionManager, queue, th2Pin, EVENT_TYPE, listener);
super(consumeConnectionManager, queue, th2Pin, EVENT_TYPE, listener);
}

@Override
Expand Down Expand Up @@ -78,4 +78,4 @@ protected void handle(DeliveryMetadata deliveryMetadata, Delivery delivery, Even
.inc(value.getEventsCount());
super.handle(deliveryMetadata, delivery, value, confirmation);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/*
* Copyright 2020-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand Down Expand Up @@ -52,7 +53,8 @@
import com.exactpro.th2.common.schema.message.impl.monitor.LogMessageRouterMonitor;
import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.ConnectionManagerConfiguration;
import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.RabbitMQConfiguration;
import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager;
import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.PublishConnectionManager;
import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConsumeConnectionManager;
import com.exactpro.th2.common.schema.message.impl.rabbitmq.custom.MessageConverter;
import com.exactpro.th2.common.schema.message.impl.rabbitmq.custom.RabbitCustomRouter;
import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.GroupBatch;
Expand Down Expand Up @@ -137,10 +139,10 @@ public abstract class AbstractCommonFactory implements AutoCloseable {
private final Class<? extends MessageRouter<EventBatch>> eventBatchRouterClass;
private final Class<? extends GrpcRouter> grpcRouterClass;
private final Class<? extends NotificationRouter<EventBatch>> notificationEventBatchRouterClass;
private final LazyProvider<ConnectionManager> rabbitMqPublishConnectionManager =
lazyAutocloseable("publish-connection-manager", this::createRabbitMQConnectionManager);
private final LazyProvider<ConnectionManager> rabbitMqConsumeConnectionManager =
lazyAutocloseable("consume-connection-manager", this::createRabbitMQConnectionManager);
private final LazyProvider<PublishConnectionManager> rabbitMqPublishConnectionManager =
lazyAutocloseable("publish-connection-manager", this::createRabbitMQPublishConnectionManager);
private final LazyProvider<ConsumeConnectionManager> rabbitMqConsumeConnectionManager =
lazyAutocloseable("consume-connection-manager", this::createRabbitMQConsumeConnectionManager);
private final LazyProvider<MessageRouterContext> routerContext =
lazy("router-context", this::createMessageRouterContext);
private final LazyProvider<MessageRouter<MessageBatch>> messageRouterParsedBatch =
Expand Down Expand Up @@ -662,15 +664,19 @@ protected PrometheusConfiguration loadPrometheusConfiguration() {
return getConfigurationOrLoad(PrometheusConfiguration.class, true);
}

protected ConnectionManager createRabbitMQConnectionManager() {
return new ConnectionManager(getBoxConfiguration().getBoxName(), getRabbitMqConfiguration(), getConnectionManagerConfiguration());
protected PublishConnectionManager createRabbitMQPublishConnectionManager() {
return new PublishConnectionManager(getBoxConfiguration().getBoxName(), getRabbitMqConfiguration(), getConnectionManagerConfiguration());
}

protected ConsumeConnectionManager createRabbitMQConsumeConnectionManager() {
return new ConsumeConnectionManager(getBoxConfiguration().getBoxName(), getRabbitMqConfiguration(), getConnectionManagerConfiguration());
}

protected ConnectionManager getRabbitMqPublishConnectionManager() {
protected PublishConnectionManager getRabbitMqPublishConnectionManager() {
return rabbitMqPublishConnectionManager.get();
}

protected ConnectionManager getRabbitMqConsumeConnectionManager() {
protected ConsumeConnectionManager getRabbitMqConsumeConnectionManager() {
return rabbitMqConsumeConnectionManager.get();
}

Expand Down Expand Up @@ -756,4 +762,4 @@ protected static void configureLogger(Path... paths) {
log4jConfigUtils.configure(listPath, LOG4J2_PROPERTIES_NAME);
ExactproMetaInf.logging();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2022 Exactpro (Exactpro Systems Limited)
* Copyright 2020-2024 Exactpro (Exactpro Systems Limited)
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand All @@ -16,32 +16,14 @@
package com.exactpro.th2.common.schema.message;

import com.exactpro.th2.common.grpc.MessageGroupBatch;
import com.exactpro.th2.common.schema.box.configuration.BoxConfiguration;
import com.exactpro.th2.common.schema.message.configuration.MessageRouterConfiguration;
import com.exactpro.th2.common.schema.message.impl.context.DefaultMessageRouterContext;
import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager;
import org.jetbrains.annotations.NotNull;

import java.io.IOException;
import java.util.Objects;

/**
* Interface for send and receive RabbitMQ messages
* @param <T> messages for send and receive
*/
public interface MessageRouter<T> extends AutoCloseable {

/**
* Initialization message router
* @param configuration message router configuration
*/
@Deprecated(since = "3.2.2", forRemoval = true)
default void init(@NotNull ConnectionManager connectionManager, @NotNull MessageRouterConfiguration configuration) {
Objects.requireNonNull(connectionManager, "Connection owner can not be null");
Objects.requireNonNull(configuration, "Configuration cannot be null");
init(new DefaultMessageRouterContext(connectionManager, connectionManager, MessageRouterMonitor.DEFAULT_MONITOR, configuration, new BoxConfiguration()));
}

default void init(@NotNull MessageRouterContext context, @NotNull MessageRouter<MessageGroupBatch> groupBatchRouter) {
init(context);
}
Expand Down Expand Up @@ -137,5 +119,4 @@ default void send(T message) throws IOException {
* @throws IOException if can not send message
*/
void sendAll(T message, String... queueAttr) throws IOException;

}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2021 Exactpro (Exactpro Systems Limited)
* Copyright 2020-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,6 +25,7 @@

import com.exactpro.th2.common.schema.message.MessageSender;
import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager;
import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.PublishConnectionManager;

import static com.exactpro.th2.common.metrics.CommonMetrics.EXCHANGE_LABEL;
import static com.exactpro.th2.common.metrics.CommonMetrics.ROUTING_KEY_LABEL;
Expand Down Expand Up @@ -54,18 +55,18 @@ public abstract class AbstractRabbitSender<T> implements MessageSender<T> {
protected final String bookName;
private final AtomicReference<String> routingKey = new AtomicReference<>();
private final AtomicReference<String> exchangeName = new AtomicReference<>();
private final AtomicReference<ConnectionManager> connectionManager = new AtomicReference<>();
private final AtomicReference<PublishConnectionManager> publishConnectionManager = new AtomicReference<>();
private final String th2Type;

public AbstractRabbitSender(
@NotNull ConnectionManager connectionManager,
@NotNull PublishConnectionManager publishConnectionManager,
@NotNull String exchangeName,
@NotNull String routingKey,
@NotNull String th2Pin,
@NotNull String th2Type,
@NotNull String bookName
) {
this.connectionManager.set(requireNonNull(connectionManager, "Connection can not be null"));
this.publishConnectionManager.set(requireNonNull(publishConnectionManager, "Connection manager can not be null"));
this.exchangeName.set(requireNonNull(exchangeName, "Exchange name can not be null"));
this.routingKey.set(requireNonNull(routingKey, "Routing key can not be null"));
this.th2Pin = requireNonNull(th2Pin, "TH2 pin can not be null");
Expand All @@ -84,15 +85,15 @@ public void send(T value) throws IOException {
requireNonNull(value, "Value for send can not be null");

try {
ConnectionManager connection = this.connectionManager.get();
PublishConnectionManager connectionManager = this.publishConnectionManager.get();
byte[] bytes = valueToBytes(value);
MESSAGE_SIZE_PUBLISH_BYTES
.labels(th2Pin, th2Type, exchangeName.get(), routingKey.get())
.inc(bytes.length);
MESSAGE_PUBLISH_TOTAL
.labels(th2Pin, th2Type, exchangeName.get(), routingKey.get())
.inc();
connection.basicPublish(exchangeName.get(), routingKey.get(), null, bytes);
connectionManager.basicPublish(exchangeName.get(), routingKey.get(), null, bytes);

if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Message sent to exchangeName='{}', routing key='{}': '{}'",
Expand All @@ -111,4 +112,4 @@ public void send(T value) throws IOException {
protected abstract String toShortDebugString(T value);

protected abstract byte[] valueToBytes(T value);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/*
* Copyright 2020-2023 Exactpro (Exactpro Systems Limited)
* Copyright 2020-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand All @@ -21,7 +22,7 @@
import com.exactpro.th2.common.schema.message.ManualAckDeliveryCallback.Confirmation;
import com.exactpro.th2.common.schema.message.MessageSubscriber;
import com.exactpro.th2.common.schema.message.SubscriberMonitor;
import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager;
import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConsumeConnectionManager;
import com.google.common.base.Suppliers;
import com.google.common.io.BaseEncoding;
import com.rabbitmq.client.Delivery;
Expand Down Expand Up @@ -69,7 +70,7 @@ public abstract class AbstractRabbitSubscriber<T> implements MessageSubscriber {
private final boolean manualConfirmation;
private final ConfirmationListener<T> listener;
private final String queue;
private final ConnectionManager connectionManager;
private final ConsumeConnectionManager consumeConnectionManager;
private final AtomicReference<Supplier<SubscriberMonitor>> consumerMonitor = new AtomicReference<>(emptySupplier());
private final AtomicBoolean isAlive = new AtomicBoolean(true);
private final String th2Type;
Expand All @@ -78,13 +79,13 @@ public abstract class AbstractRabbitSubscriber<T> implements MessageSubscriber {
protected final String th2Pin;

public AbstractRabbitSubscriber(
@NotNull ConnectionManager connectionManager,
@NotNull ConsumeConnectionManager consumeConnectionManager,
@NotNull String queue,
@NotNull String th2Pin,
@NotNull String th2Type,
@NotNull ConfirmationListener<T> listener
) {
this.connectionManager = requireNonNull(connectionManager, "Connection can not be null");
this.consumeConnectionManager = requireNonNull(consumeConnectionManager, "Connection can not be null");
this.queue = requireNonNull(queue, "Queue can not be null");
this.th2Pin = requireNonNull(th2Pin, "th2 pin can not be null");
this.th2Type = requireNonNull(th2Type, "th2 type can not be null");
Expand Down Expand Up @@ -186,7 +187,7 @@ private void resubscribe() {
private SubscriberMonitor basicConsume() {
try {
LOGGER.info("Start listening queue name='{}', th2 pin='{}'", queue, th2Pin);
return connectionManager.basicConsume(queue, this::handle, this::canceled);
return consumeConnectionManager.basicConsume(queue, this::handle, this::canceled);
} catch (IOException e) {
throw new IllegalStateException("Can not subscribe to queue = " + queue, e);
} catch (InterruptedException e) {
Expand Down
Loading

0 comments on commit 0be84cf

Please sign in to comment.