Skip to content

Commit

Permalink
shared executor services
Browse files Browse the repository at this point in the history
  • Loading branch information
Oleg Smelov committed Jul 11, 2024
1 parent 8e20f14 commit 721b79b
Show file tree
Hide file tree
Showing 10 changed files with 146 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.fasterxml.jackson.module.kotlin.KotlinFeature;
import com.fasterxml.jackson.module.kotlin.KotlinModule;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.prometheus.client.exporter.HTTPServer;
import io.prometheus.client.hotspot.DefaultExports;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -87,6 +88,10 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static com.exactpro.cradle.CradleStorage.DEFAULT_MAX_MESSAGE_BATCH_SIZE;
import static com.exactpro.cradle.CradleStorage.DEFAULT_MAX_TEST_EVENT_BATCH_SIZE;
Expand All @@ -95,6 +100,7 @@
import static com.exactpro.cradle.cassandra.CassandraStorageSettings.DEFAULT_RESULT_PAGE_SIZE;
import static com.exactpro.th2.common.schema.factory.LazyProvider.lazy;
import static com.exactpro.th2.common.schema.factory.LazyProvider.lazyAutocloseable;
import static com.exactpro.th2.common.schema.factory.LazyProvider.lazyCloseable;
import static java.util.Objects.requireNonNull;
import static org.apache.commons.lang3.StringUtils.defaultIfBlank;

Expand All @@ -112,6 +118,8 @@ public abstract class AbstractCommonFactory implements AutoCloseable {
protected static final Path LOG4J_PROPERTIES_DEFAULT_PATH_OLD = Path.of("/home/etc");
protected static final Path LOG4J_PROPERTIES_DEFAULT_PATH = Path.of("/var/th2/config");
protected static final String LOG4J2_PROPERTIES_NAME = "log4j2.properties";
private static final String SHARED_EXECUTOR_NAME = "rabbit-shared";
private static final String CHANNEL_CHECKER_EXECUTOR_NAME = "channel-checker-executor";

public static final ObjectMapper MAPPER = new ObjectMapper();

Expand Down Expand Up @@ -139,6 +147,27 @@ public abstract class AbstractCommonFactory implements AutoCloseable {
private final Class<? extends MessageRouter<EventBatch>> eventBatchRouterClass;
private final Class<? extends GrpcRouter> grpcRouterClass;
private final Class<? extends NotificationRouter<EventBatch>> notificationEventBatchRouterClass;
private final LazyProvider<ExecutorService> sharedExecutor = lazyCloseable(SHARED_EXECUTOR_NAME,
() -> Executors.newFixedThreadPool(
getConnectionManagerConfiguration().getWorkingThreads(),
new ThreadFactoryBuilder().setNameFormat("rabbitmq-shared-pool-%d").build()
),
executor -> shutdownExecutor(
executor,
getConnectionManagerConfiguration().getConnectionCloseTimeout(),
SHARED_EXECUTOR_NAME
)
);
private final LazyProvider<ScheduledExecutorService> channelChecker = lazyCloseable(CHANNEL_CHECKER_EXECUTOR_NAME,
() -> Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("channel-checker-%d").build()
),
executor -> shutdownExecutor(
executor,
getConnectionManagerConfiguration().getConnectionCloseTimeout(),
CHANNEL_CHECKER_EXECUTOR_NAME
)
);
private final LazyProvider<PublishConnectionManager> rabbitMqPublishConnectionManager =
lazyAutocloseable("publish-connection-manager", this::createRabbitMQPublishConnectionManager);
private final LazyProvider<ConsumeConnectionManager> rabbitMqConsumeConnectionManager =
Expand Down Expand Up @@ -665,11 +694,11 @@ protected PrometheusConfiguration loadPrometheusConfiguration() {
}

protected PublishConnectionManager createRabbitMQPublishConnectionManager() {
return new PublishConnectionManager(getBoxConfiguration().getBoxName(), getRabbitMqConfiguration(), getConnectionManagerConfiguration());
return new PublishConnectionManager(getBoxConfiguration().getBoxName(), getRabbitMqConfiguration(), getConnectionManagerConfiguration(), sharedExecutor.get(), channelChecker.get());
}

protected ConsumeConnectionManager createRabbitMQConsumeConnectionManager() {
return new ConsumeConnectionManager(getBoxConfiguration().getBoxName(), getRabbitMqConfiguration(), getConnectionManagerConfiguration());
return new ConsumeConnectionManager(getBoxConfiguration().getBoxName(), getRabbitMqConfiguration(), getConnectionManagerConfiguration(), sharedExecutor.get(), channelChecker.get());
}

protected PublishConnectionManager getRabbitMqPublishConnectionManager() {
Expand Down Expand Up @@ -724,6 +753,18 @@ public void close() {
LOGGER.error("Failed to close RabbitMQ consume connection", e);
}

try {
sharedExecutor.close();
} catch (Exception e) {
LOGGER.error("Failed to close shared executor service", e);
}

try {
channelChecker.close();
} catch (Exception e) {
LOGGER.error("Failed to close channel checker executor service", e);
}

try {
grpcRouter.close();
} catch (Exception e) {
Expand Down Expand Up @@ -753,6 +794,19 @@ public void close() {
LOGGER.info("Common factory has been closed");
}

private void shutdownExecutor(ExecutorService executor, int closeTimeout, String name) {
executor.shutdown();
try {
if (!executor.awaitTermination(closeTimeout, TimeUnit.MILLISECONDS)) {
LOGGER.error("Executor {} is not terminated during {} millis", name, closeTimeout);
List<Runnable> runnables = executor.shutdownNow();
LOGGER.error("{} task(s) was(were) not finished in executor {}", runnables.size(), name);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

protected static void configureLogger(Path... paths) {
List<Path> listPath = new ArrayList<>();
listPath.add(LOG4J_PROPERTIES_DEFAULT_PATH);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.ConnectionManagerConfiguration;
import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.RabbitMQConfiguration;
import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.RetryingDelay;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.rabbitmq.client.BlockedListener;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
Expand Down Expand Up @@ -60,7 +59,6 @@
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand All @@ -82,11 +80,7 @@ public abstract class ConnectionManager implements AutoCloseable {
protected final Map<PinId, ChannelHolder> channelsByPin = new ConcurrentHashMap<>();
private final AtomicReference<State> connectionState = new AtomicReference<>(State.OPEN);
private final ConnectionManagerConfiguration configuration;
private final ExecutorService sharedExecutor;
protected final ScheduledExecutorService channelChecker = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("channel-checker-%d").build()
);

protected final ScheduledExecutorService channelChecker;
protected final HealthMetrics metrics = new HealthMetrics(this);

private final RecoveryListener recoveryListener = new RecoveryListener() {
Expand All @@ -109,9 +103,17 @@ public ConnectionManagerConfiguration getConfiguration() {

private enum State { OPEN, CLOSING, CLOSED }

public ConnectionManager(@NotNull String connectionName, @NotNull RabbitMQConfiguration rabbitMQConfiguration, @NotNull ConnectionManagerConfiguration connectionManagerConfiguration) {
public ConnectionManager(
@NotNull String connectionName,
@NotNull RabbitMQConfiguration rabbitMQConfiguration,
@NotNull ConnectionManagerConfiguration connectionManagerConfiguration,
@NotNull ExecutorService sharedExecutor,
@NotNull ScheduledExecutorService channelChecker
) {
Objects.requireNonNull(rabbitMQConfiguration, "RabbitMQ configuration cannot be null");
this.configuration = Objects.requireNonNull(connectionManagerConfiguration, "Connection manager configuration can not be null");
Objects.requireNonNull(sharedExecutor, "Shared executor can not be null");
this.channelChecker = Objects.requireNonNull(channelChecker, "channelChecker executor can not be null");

var factory = new ConnectionFactory();
var virtualHost = rabbitMQConfiguration.getVHost();
Expand Down Expand Up @@ -207,9 +209,6 @@ private void turnOffReadiness(Throwable exception) {
return recoveryDelay;
});

sharedExecutor = Executors.newFixedThreadPool(configuration.getWorkingThreads(), new ThreadFactoryBuilder()
.setNameFormat("rabbitmq-shared-pool-%d")
.build());
factory.setSharedExecutor(sharedExecutor);

try {
Expand Down Expand Up @@ -359,9 +358,6 @@ public void close() {
LOGGER.error("Failed to close connection", e);
}
}

shutdownExecutor(sharedExecutor, closeTimeout, "rabbit-shared");
shutdownExecutor(channelChecker, closeTimeout, "channel-checker");
}

boolean isReady() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import com.exactpro.th2.common.schema.exception.CommonFactoryException
import com.exactpro.th2.common.schema.factory.LazyProvider.ThrowableConsumer
import java.util.concurrent.Callable
import java.util.concurrent.atomic.AtomicReference
import java.util.function.Consumer

/**
* This is class is for internal use only. Please, keep that in mind when using it in another module
Expand Down Expand Up @@ -119,8 +120,12 @@ class LazyProvider<T : Any?> private constructor(
fun <T> lazy(name: String, supplier: Callable<T>): LazyProvider<T> =
LazyProvider(name, supplier, NONE)

@JvmStatic
fun <T> lazyCloseable(name: String, supplier: Callable<T>, onClose: Consumer<T>): LazyProvider<T> =
LazyProvider(name, supplier, onClose::accept)

@JvmStatic
fun <T : AutoCloseable> lazyAutocloseable(name: String, supplier: Callable<T>): LazyProvider<T> =
LazyProvider(name, supplier, AutoCloseable::close)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,19 @@ import com.rabbitmq.client.ShutdownSignalException

import mu.KotlinLogging
import java.io.IOException
import java.util.concurrent.ExecutorService
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicInteger

class ConsumeConnectionManager(
connectionName: String,
rabbitMQConfiguration: RabbitMQConfiguration,
connectionManagerConfiguration: ConnectionManagerConfiguration
) : ConnectionManager(connectionName, rabbitMQConfiguration, connectionManagerConfiguration) {
connectionManagerConfiguration: ConnectionManagerConfiguration,
sharedExecutor: ExecutorService,
channelChecker: ScheduledExecutorService
) : ConnectionManager(connectionName, rabbitMQConfiguration, connectionManagerConfiguration, sharedExecutor, channelChecker) {
private val subscriberName = if (connectionManagerConfiguration.subscriberName.isNullOrBlank()) {
(DEFAULT_SUBSCRIBER_NAME_PREFIX + System.currentTimeMillis()).also {
LOGGER.info { "Subscribers will use the default name: $it" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,16 @@ import com.rabbitmq.client.BlockedListener
import com.rabbitmq.client.Channel
import mu.KotlinLogging
import java.io.IOException
import java.util.concurrent.ExecutorService
import java.util.concurrent.ScheduledExecutorService

class PublishConnectionManager(
connectionName: String,
rabbitMQConfiguration: RabbitMQConfiguration,
connectionManagerConfiguration: ConnectionManagerConfiguration
) : ConnectionManager(connectionName, rabbitMQConfiguration, connectionManagerConfiguration) {
connectionManagerConfiguration: ConnectionManagerConfiguration,
sharedExecutor: ExecutorService,
channelChecker: ScheduledExecutorService
) : ConnectionManager(connectionName, rabbitMQConfiguration, connectionManagerConfiguration, sharedExecutor, channelChecker) {
@Volatile var isPublishingBlocked = false

@Throws(IOException::class, InterruptedException::class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,24 @@ import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.PublishCo
import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConsumeConnectionManager
import com.exactpro.th2.common.schema.message.impl.rabbitmq.custom.RabbitCustomRouter
import com.exactpro.th2.common.util.getRabbitMQConfiguration
import com.google.common.util.concurrent.ThreadFactoryBuilder
import com.rabbitmq.client.BuiltinExchangeType
import mu.KotlinLogging
import org.junit.jupiter.api.Assertions.assertNull
import org.mockito.kotlin.mock
import org.testcontainers.containers.RabbitMQContainer
import java.time.Duration
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertNotNull

@IntegrationTest
class AbstractRabbitRouterIntegrationTest {
private val channelChecker = Executors.newSingleThreadScheduledExecutor(ThreadFactoryBuilder().setNameFormat("channel-checker-%d").build())
private val sharedExecutor = Executors.newFixedThreadPool(1, ThreadFactoryBuilder().setNameFormat("rabbitmq-shared-pool-%d").build())

@Test
fun `receive unconfirmed message after resubscribe`() {
Expand Down Expand Up @@ -174,7 +178,9 @@ class AbstractRabbitRouterIntegrationTest {
) = PublishConnectionManager(
"test-publish-connection",
getRabbitMQConfiguration(rabbitMQContainer),
getConnectionManagerConfiguration(prefetchCount, confirmationTimeout)
getConnectionManagerConfiguration(prefetchCount, confirmationTimeout),
sharedExecutor,
channelChecker
)

private fun createConsumeConnectionManager(
Expand All @@ -184,7 +190,9 @@ class AbstractRabbitRouterIntegrationTest {
) = ConsumeConnectionManager(
"test-consume-connection",
getRabbitMQConfiguration(rabbitMQContainer),
getConnectionManagerConfiguration(prefetchCount, confirmationTimeout)
getConnectionManagerConfiguration(prefetchCount, confirmationTimeout),
sharedExecutor,
channelChecker
)

private fun createRouter(publishConnectionManager: PublishConnectionManager, consumeConnectionManager: ConsumeConnectionManager) = RabbitCustomRouter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,18 @@ package com.exactpro.th2.common.schema.message.impl.rabbitmq.connection
import com.exactpro.th2.common.schema.message.ContainerConstants.RABBITMQ_IMAGE_NAME
import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.ConnectionManagerConfiguration
import com.exactpro.th2.common.util.getRabbitMQConfiguration
import com.google.common.util.concurrent.ThreadFactoryBuilder
import mu.KotlinLogging
import org.testcontainers.containers.RabbitMQContainer
import java.time.Duration
import java.util.concurrent.Executors
import java.util.concurrent.ThreadLocalRandom

object ConnectionManualBenchmark {
private val LOGGER = KotlinLogging.logger {}
private val channelChecker = Executors.newSingleThreadScheduledExecutor(ThreadFactoryBuilder().setNameFormat("channel-checker-%d").build())
private val sharedExecutor = Executors.newFixedThreadPool(1, ThreadFactoryBuilder().setNameFormat("rabbitmq-shared-pool-%d").build())

@JvmStatic
fun main(args: Array<String>) {
RabbitMQContainer(RABBITMQ_IMAGE_NAME).use { container ->
Expand Down Expand Up @@ -140,13 +145,17 @@ object ConnectionManualBenchmark {
PublishConnectionManager(
"test-publish-connection",
getRabbitMQConfiguration(container),
configuration
configuration,
sharedExecutor,
channelChecker
)

private fun createConsumeConnectionManager(container: RabbitMQContainer, configuration: ConnectionManagerConfiguration) =
ConsumeConnectionManager(
"test-consume-connection",
getRabbitMQConfiguration(container),
configuration
configuration,
sharedExecutor,
channelChecker
)
}
Loading

0 comments on commit 721b79b

Please sign in to comment.