From a43fc45e37b56f7bb73b7aae032e49404f296b3b Mon Sep 17 00:00:00 2001 From: Oleg Date: Mon, 23 Oct 2023 19:24:13 +0400 Subject: [PATCH] Use thread-safe initialization for resource in common factory --- .../schema/factory/AbstractCommonFactory.java | 535 ++++++++---------- .../th2/common/schema/factory/LazyProvider.kt | 106 ++++ .../common/schema/factory/LazyProviderTest.kt | 135 +++++ 3 files changed, 479 insertions(+), 297 deletions(-) create mode 100644 src/main/kotlin/com/exactpro/th2/common/schema/factory/LazyProvider.kt create mode 100644 src/test/kotlin/com/exactpro/th2/common/schema/factory/LazyProviderTest.kt diff --git a/src/main/java/com/exactpro/th2/common/schema/factory/AbstractCommonFactory.java b/src/main/java/com/exactpro/th2/common/schema/factory/AbstractCommonFactory.java index 4cc7e554..494e2ff1 100644 --- a/src/main/java/com/exactpro/th2/common/schema/factory/AbstractCommonFactory.java +++ b/src/main/java/com/exactpro/th2/common/schema/factory/AbstractCommonFactory.java @@ -67,6 +67,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.text.StringSubstitutor; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -90,6 +91,8 @@ import static com.exactpro.cradle.cassandra.CassandraStorageSettings.DEFAULT_COUNTER_PERSISTENCE_INTERVAL_MS; import static com.exactpro.cradle.cassandra.CassandraStorageSettings.DEFAULT_MAX_UNCOMPRESSED_TEST_EVENT_SIZE; import static com.exactpro.cradle.cassandra.CassandraStorageSettings.DEFAULT_RESULT_PAGE_SIZE; +import static com.exactpro.th2.common.schema.factory.LazyProvider.lazy; +import static com.exactpro.th2.common.schema.factory.LazyProvider.lazyAutocloseable; import static java.util.Objects.requireNonNull; import static org.apache.commons.lang3.StringUtils.defaultIfBlank; @@ -134,21 +137,33 @@ public abstract class AbstractCommonFactory implements AutoCloseable { private final Class> eventBatchRouterClass; private final Class grpcRouterClass; private final Class> notificationEventBatchRouterClass; - private final AtomicReference rabbitMqConnectionManager = new AtomicReference<>(); - private final AtomicReference routerContext = new AtomicReference<>(); - private final AtomicReference> messageRouterParsedBatch = new AtomicReference<>(); - private final AtomicReference> messageRouterRawBatch = new AtomicReference<>(); - private final AtomicReference> messageRouterMessageGroupBatch = new AtomicReference<>(); - private final AtomicReference> transportGroupBatchRouter = new AtomicReference<>(); - private final AtomicReference> eventBatchRouter = new AtomicReference<>(); - private final AtomicReference> notificationEventBatchRouter = new AtomicReference<>(); - private final AtomicReference rootEventId = new AtomicReference<>(); - private final AtomicReference grpcRouter = new AtomicReference<>(); - private final AtomicReference prometheusExporter = new AtomicReference<>(); - private final AtomicReference cradleManager = new AtomicReference<>(); + private final LazyProvider rabbitMqConnectionManager = + lazyAutocloseable("connection-manager", this::createRabbitMQConnectionManager); + private final LazyProvider routerContext = + lazy("router-context", this::createMessageRouterContext); + private final LazyProvider> messageRouterParsedBatch = + lazyAutocloseable("parsed-message-router", this::createMessageRouterParsedBatch); + private final LazyProvider> messageRouterRawBatch = + lazyAutocloseable("raw-message-router", this::createMessageRouterRawBatch); + private final LazyProvider> messageRouterMessageGroupBatch = + lazyAutocloseable("group-message-router", this::createMessageRouterGroupBatch); + private final LazyProvider> transportGroupBatchRouter = + lazyAutocloseable("transport-router", this::createTransportGroupBatchMessageRouter); + private final LazyProvider> eventBatchRouter = + lazyAutocloseable("event-router", this::createEventBatchRouter); + private final LazyProvider> notificationEventBatchRouter = + lazyAutocloseable("notification-router", this::createNotificationEventBatchRouter); + private final LazyProvider rootEventId = lazy("root-event-id", this::createRootEventID); + private final LazyProvider grpcRouter = + lazyAutocloseable("grpc-router", this::createGrpcRouter); + private final LazyProvider prometheusExporter = + lazyAutocloseable("prometheus-exporter", this::createPrometheusHTTPServer); + + private final LazyProvider cradleManager = + lazyAutocloseable("cradle-manager", this::createCradleManager); + private final Map, MessageRouter> customMessageRouters = new ConcurrentHashMap<>(); private final MetricMonitor livenessMonitor = CommonMetrics.registerLiveness("common_factory_liveness"); - static { configureLogger(); } @@ -170,20 +185,8 @@ public AbstractCommonFactory(FactorySettings settings) { public void start() { DefaultExports.initialize(); - PrometheusConfiguration prometheusConfiguration = loadPrometheusConfiguration(); livenessMonitor.enable(); - - this.prometheusExporter.updateAndGet(server -> { - if (server == null && prometheusConfiguration.getEnabled()) { - try { - return new HTTPServer(prometheusConfiguration.getHost(), prometheusConfiguration.getPort()); - } catch (IOException e) { - throw new CommonFactoryException("Failed to create Prometheus exporter", e); - } - } - return server; - }); } /** @@ -192,19 +195,7 @@ public void start() { * @throws IllegalStateException if can not read configuration */ public MessageRouter getMessageRouterParsedBatch() { - return messageRouterParsedBatch.updateAndGet(router -> { - if (router == null) { - try { - router = messageRouterParsedBatchClass.getConstructor().newInstance(); - router.init(getMessageRouterContext(), getMessageRouterMessageGroupBatch()); - } catch (InstantiationException | IllegalAccessException | InvocationTargetException | - NoSuchMethodException e) { - throw new CommonFactoryException("Can not create parsed message router", e); - } - } - - return router; - }); + return messageRouterParsedBatch.get(); } /** @@ -213,19 +204,7 @@ public MessageRouter getMessageRouterParsedBatch() { * @throws IllegalStateException if can not read configuration */ public MessageRouter getMessageRouterRawBatch() { - return messageRouterRawBatch.updateAndGet(router -> { - if (router == null) { - try { - router = messageRouterRawBatchClass.getConstructor().newInstance(); - router.init(getMessageRouterContext(), getMessageRouterMessageGroupBatch()); - } catch (InstantiationException | IllegalAccessException | InvocationTargetException | - NoSuchMethodException e) { - throw new CommonFactoryException("Can not create raw message router", e); - } - } - - return router; - }); + return messageRouterRawBatch.get(); } /** @@ -233,14 +212,7 @@ public MessageRouter getMessageRouterRawBatch() { * @throws IllegalStateException if can not read configuration */ public MessageRouter getTransportGroupBatchRouter() { - return transportGroupBatchRouter.updateAndGet(router -> { - if (router == null) { - router = new TransportGroupBatchRouter(); - router.init(getMessageRouterContext()); - } - - return router; - }); + return transportGroupBatchRouter.get(); } /** @@ -249,19 +221,7 @@ public MessageRouter getTransportGroupBatchRouter() { * @throws IllegalStateException if can not read configuration */ public MessageRouter getMessageRouterMessageGroupBatch() { - return messageRouterMessageGroupBatch.updateAndGet(router -> { - if (router == null) { - try { - router = messageRouterMessageGroupBatchClass.getConstructor().newInstance(); - router.init(getMessageRouterContext()); - } catch (InstantiationException | IllegalAccessException | InvocationTargetException | - NoSuchMethodException e) { - throw new CommonFactoryException("Can not create group message router", e); - } - } - - return router; - }); + return messageRouterMessageGroupBatch.get(); } /** @@ -270,23 +230,7 @@ public MessageRouter getMessageRouterMessageGroupBatch() { * @throws IllegalStateException if can not read configuration */ public MessageRouter getEventBatchRouter() { - return eventBatchRouter.updateAndGet(router -> { - if (router == null) { - try { - router = eventBatchRouterClass.getConstructor().newInstance(); - router.init(new DefaultMessageRouterContext( - getRabbitMqConnectionManager(), - MessageRouterMonitor.DEFAULT_MONITOR, - getMessageRouterConfiguration(), - getBoxConfiguration() - )); - } catch (InstantiationException | IllegalAccessException | InvocationTargetException | - NoSuchMethodException e) { - throw new CommonFactoryException("Can not create event batch router", e); - } - } - return router; - }); + return eventBatchRouter.get(); } /** @@ -295,19 +239,7 @@ public MessageRouter getEventBatchRouter() { * @throws IllegalStateException if can not read configuration */ public GrpcRouter getGrpcRouter() { - return grpcRouter.updateAndGet(router -> { - if (router == null) { - try { - router = grpcRouterClass.getConstructor().newInstance(); - router.init(getGrpcConfiguration(), getGrpcRouterConfiguration()); - } catch (InstantiationException | IllegalAccessException | InvocationTargetException | - NoSuchMethodException e) { - throw new CommonFactoryException("Can not create GRPC router", e); - } - } - - return router; - }); + return grpcRouter.get(); } /** @@ -316,18 +248,7 @@ public GrpcRouter getGrpcRouter() { * @throws IllegalStateException if cannot read configuration */ public NotificationRouter getNotificationEventBatchRouter() { - return notificationEventBatchRouter.updateAndGet(router -> { - if (router == null) { - try { - router = notificationEventBatchRouterClass.getConstructor().newInstance(); - router.init(getMessageRouterContext()); - } catch (InstantiationException | IllegalAccessException | InvocationTargetException | - NoSuchMethodException e) { - throw new CommonFactoryException("Can not create notification router", e); - } - } - return router; - }); + return notificationEventBatchRouter.get(); } /** @@ -368,9 +289,9 @@ public void registerCustomMessageRouter( throw new IllegalStateException("Message router for type " + msgClass.getCanonicalName() + " is already registered"); } requireNonNull(labels, "Labels can't be null for custom message router"); - var router = new RabbitCustomRouter<>(msgClass.getSimpleName(), labels, messageConverter, defaultSendAttributes, + MessageRouter router = new RabbitCustomRouter<>(msgClass.getSimpleName(), labels, messageConverter, defaultSendAttributes, defaultSubscribeAttributes); - router.init(getRabbitMqConnectionManager(), getMessageRouterConfiguration()); + router.init(getMessageRouterContext()); return router; } ); @@ -439,6 +360,85 @@ public BoxConfiguration getBoxConfiguration() { return getConfigurationOrLoad(BoxConfiguration.class, true); } + /** + * @return Cradle manager + * @throws CommonFactoryException if cradle manager was not initialized + */ + public CradleManager getCradleManager() { + return cradleManager.get(); + + } + + @Nullable + private HTTPServer createPrometheusHTTPServer() { + PrometheusConfiguration configuration = loadPrometheusConfiguration(); + if (configuration.getEnabled()) { + try { + return new HTTPServer(configuration.getHost(), configuration.getPort()); + } catch (IOException e) { + throw new CommonFactoryException("Failed to create Prometheus exporter", e); + } + } + return null; + } + + private MessageRouter createMessageRouterParsedBatch() { + try { + MessageRouter router = messageRouterParsedBatchClass.getConstructor().newInstance(); + router.init(getMessageRouterContext(), getMessageRouterMessageGroupBatch()); + return router; + } catch (InstantiationException | IllegalAccessException | InvocationTargetException | + NoSuchMethodException e) { + throw new CommonFactoryException("Can not create parsed message router", e); + } + } + + private MessageRouter createMessageRouterRawBatch() throws NoSuchMethodException, + InvocationTargetException, InstantiationException, IllegalAccessException { + MessageRouter router = messageRouterRawBatchClass.getConstructor().newInstance(); + router.init(getMessageRouterContext(), getMessageRouterMessageGroupBatch()); + return router; + } + + private MessageRouter createTransportGroupBatchMessageRouter() { + var router = new TransportGroupBatchRouter(); + router.init(getMessageRouterContext()); + return router; + } + + private MessageRouter createMessageRouterGroupBatch() throws NoSuchMethodException, + InvocationTargetException, InstantiationException, IllegalAccessException { + var router = messageRouterMessageGroupBatchClass.getConstructor().newInstance(); + router.init(getMessageRouterContext()); + return router; + } + + private MessageRouter createEventBatchRouter() throws NoSuchMethodException, InvocationTargetException, + InstantiationException, IllegalAccessException { + var router = eventBatchRouterClass.getConstructor().newInstance(); + router.init(createEventRouterContext()); + return router; + } + + @NotNull + private MessageRouterContext createEventRouterContext() { + return createRouterContext(MessageRouterMonitor.DEFAULT_MONITOR); + } + + private GrpcRouter createGrpcRouter() throws NoSuchMethodException, InvocationTargetException, + InstantiationException, IllegalAccessException { + GrpcRouter router = grpcRouterClass.getConstructor().newInstance(); + router.init(getGrpcConfiguration(), getGrpcRouterConfiguration()); + return router; + } + + private NotificationRouter createNotificationEventBatchRouter() throws NoSuchMethodException, + InvocationTargetException, InstantiationException, IllegalAccessException { + var router = notificationEventBatchRouterClass.getConstructor().newInstance(); + router.init(getMessageRouterContext()); + return router; + } + private CradleConfidentialConfiguration getCradleConfidentialConfiguration() { return getConfigurationOrLoad(CradleConfidentialConfiguration.class, false); } @@ -451,62 +451,51 @@ private CassandraStorageSettings getCassandraStorageSettings() { return getConfigurationOrLoad(CassandraStorageSettings.class, true); } - /** - * @return Cradle manager - * @throws IllegalStateException if cannot read configuration or initialization failure - */ - public CradleManager getCradleManager() { - return cradleManager.updateAndGet(manager -> { - if (manager == null) { - try { - CradleConfidentialConfiguration confidentialConfiguration = getCradleConfidentialConfiguration(); - CassandraConnectionSettings cassandraConnectionSettings = new CassandraConnectionSettings( - confidentialConfiguration.getHost(), - confidentialConfiguration.getPort(), - confidentialConfiguration.getDataCenter() - ); - if (StringUtils.isNotEmpty(confidentialConfiguration.getUsername())) { - cassandraConnectionSettings.setUsername(confidentialConfiguration.getUsername()); - } - if (StringUtils.isNotEmpty(confidentialConfiguration.getPassword())) { - cassandraConnectionSettings.setPassword(confidentialConfiguration.getPassword()); - } - - // Deserialize on config by two different beans for backward compatibility - CradleNonConfidentialConfiguration nonConfidentialConfiguration = getCradleNonConfidentialConfiguration(); - // FIXME: this approach should be replaced to module structure in future - CassandraStorageSettings cassandraStorageSettings = getCassandraStorageSettings(); - cassandraStorageSettings.setKeyspace(confidentialConfiguration.getKeyspace()); + private CradleManager createCradleManager() { + try { + CradleConfidentialConfiguration confidentialConfiguration = getCradleConfidentialConfiguration(); + CassandraConnectionSettings cassandraConnectionSettings = new CassandraConnectionSettings( + confidentialConfiguration.getHost(), + confidentialConfiguration.getPort(), + confidentialConfiguration.getDataCenter() + ); + if (StringUtils.isNotEmpty(confidentialConfiguration.getUsername())) { + cassandraConnectionSettings.setUsername(confidentialConfiguration.getUsername()); + } + if (StringUtils.isNotEmpty(confidentialConfiguration.getPassword())) { + cassandraConnectionSettings.setPassword(confidentialConfiguration.getPassword()); + } - if (cassandraStorageSettings.getResultPageSize() == DEFAULT_RESULT_PAGE_SIZE && nonConfidentialConfiguration.getPageSize() > 0) { - cassandraStorageSettings.setResultPageSize(nonConfidentialConfiguration.getPageSize()); - } - if (cassandraStorageSettings.getMaxMessageBatchSize() == DEFAULT_MAX_MESSAGE_BATCH_SIZE && nonConfidentialConfiguration.getCradleMaxMessageBatchSize() > 0) { - cassandraStorageSettings.setMaxMessageBatchSize((int) nonConfidentialConfiguration.getCradleMaxMessageBatchSize()); - } - if (cassandraStorageSettings.getMaxTestEventBatchSize() == DEFAULT_MAX_TEST_EVENT_BATCH_SIZE && nonConfidentialConfiguration.getCradleMaxEventBatchSize() > 0) { - cassandraStorageSettings.setMaxTestEventBatchSize((int) nonConfidentialConfiguration.getCradleMaxEventBatchSize()); - } - if (cassandraStorageSettings.getCounterPersistenceInterval() == DEFAULT_COUNTER_PERSISTENCE_INTERVAL_MS && nonConfidentialConfiguration.getStatisticsPersistenceIntervalMillis() >= 0) { - cassandraStorageSettings.setCounterPersistenceInterval((int) nonConfidentialConfiguration.getStatisticsPersistenceIntervalMillis()); - } - if (cassandraStorageSettings.getMaxUncompressedTestEventSize() == DEFAULT_MAX_UNCOMPRESSED_TEST_EVENT_SIZE && nonConfidentialConfiguration.getMaxUncompressedEventBatchSize() > 0) { - cassandraStorageSettings.setMaxUncompressedTestEventSize((int) nonConfidentialConfiguration.getMaxUncompressedEventBatchSize()); - } + // Deserialize on config by two different beans for backward compatibility + CradleNonConfidentialConfiguration nonConfidentialConfiguration = getCradleNonConfidentialConfiguration(); + // FIXME: this approach should be replaced to module structure in future + CassandraStorageSettings cassandraStorageSettings = getCassandraStorageSettings(); + cassandraStorageSettings.setKeyspace(confidentialConfiguration.getKeyspace()); - manager = new CassandraCradleManager( - cassandraConnectionSettings, - cassandraStorageSettings, - nonConfidentialConfiguration.getPrepareStorage() - ); - } catch (CradleStorageException | RuntimeException | IOException e) { - throw new CommonFactoryException("Cannot create Cradle manager", e); - } + if (cassandraStorageSettings.getResultPageSize() == DEFAULT_RESULT_PAGE_SIZE && nonConfidentialConfiguration.getPageSize() > 0) { + cassandraStorageSettings.setResultPageSize(nonConfidentialConfiguration.getPageSize()); + } + if (cassandraStorageSettings.getMaxMessageBatchSize() == DEFAULT_MAX_MESSAGE_BATCH_SIZE && nonConfidentialConfiguration.getCradleMaxMessageBatchSize() > 0) { + cassandraStorageSettings.setMaxMessageBatchSize((int) nonConfidentialConfiguration.getCradleMaxMessageBatchSize()); + } + if (cassandraStorageSettings.getMaxTestEventBatchSize() == DEFAULT_MAX_TEST_EVENT_BATCH_SIZE && nonConfidentialConfiguration.getCradleMaxEventBatchSize() > 0) { + cassandraStorageSettings.setMaxTestEventBatchSize((int) nonConfidentialConfiguration.getCradleMaxEventBatchSize()); + } + if (cassandraStorageSettings.getCounterPersistenceInterval() == DEFAULT_COUNTER_PERSISTENCE_INTERVAL_MS && nonConfidentialConfiguration.getStatisticsPersistenceIntervalMillis() >= 0) { + cassandraStorageSettings.setCounterPersistenceInterval((int) nonConfidentialConfiguration.getStatisticsPersistenceIntervalMillis()); + } + if (cassandraStorageSettings.getMaxUncompressedTestEventSize() == DEFAULT_MAX_UNCOMPRESSED_TEST_EVENT_SIZE && nonConfidentialConfiguration.getMaxUncompressedEventBatchSize() > 0) { + cassandraStorageSettings.setMaxUncompressedTestEventSize((int) nonConfidentialConfiguration.getMaxUncompressedEventBatchSize()); } - return manager; - }); - + return new CassandraCradleManager( + cassandraConnectionSettings, + cassandraStorageSettings, + nonConfidentialConfiguration.getPrepareStorage() + ); + } catch (CradleStorageException | RuntimeException | IOException e) { + throw new CommonFactoryException("Cannot create Cradle manager", e); + } } /** @@ -589,31 +578,27 @@ public T getCustomConfiguration(Class confClass) { */ @NotNull public EventID getRootEventId() { - return rootEventId.updateAndGet(id -> { - if (id == null) { - try { - BoxConfiguration boxConfiguration = getBoxConfiguration(); - com.exactpro.th2.common.grpc.Event rootEvent = Event - .start() - .endTimestamp() - .name(boxConfiguration.getBoxName() + " " + Instant.now()) - .description("Root event") - .status(Event.Status.PASSED) - .type("Microservice") - .toProto(boxConfiguration.getBookName(), boxConfiguration.getBoxName()); - - try { - getEventBatchRouter().sendAll(EventBatch.newBuilder().addEvents(rootEvent).build()); - return rootEvent.getId(); - } catch (IOException e) { - throw new CommonFactoryException("Can not send root event", e); - } - } catch (IOException e) { - throw new CommonFactoryException("Can not create root event", e); - } - } - return id; - }); + return rootEventId.get(); + } + + @NotNull + private EventID createRootEventID() throws IOException { + BoxConfiguration boxConfiguration = getBoxConfiguration(); + com.exactpro.th2.common.grpc.Event rootEvent = Event + .start() + .endTimestamp() + .name(boxConfiguration.getBoxName() + " " + Instant.now()) + .description("Root event") + .status(Event.Status.PASSED) + .type("Microservice") + .toProto(boxConfiguration.getBookName(), boxConfiguration.getBoxName()); + + try { + getEventBatchRouter().sendAll(EventBatch.newBuilder().addEvents(rootEvent).build()); + return rootEvent.getId(); + } catch (IOException e) { + throw new CommonFactoryException("Can not send root event", e); + } } protected abstract ConfigurationManager getConfigurationManager(); @@ -641,29 +626,30 @@ public EventID getRootEventId() { * @return Context for all routers except event router */ protected MessageRouterContext getMessageRouterContext() { - return routerContext.updateAndGet(ctx -> { - if (ctx == null) { - try { - MessageRouterMonitor contextMonitor = new BroadcastMessageRouterMonitor( - new LogMessageRouterMonitor(), - new EventMessageRouterMonitor( - getEventBatchRouter(), - getRootEventId() - ) - ); - - return new DefaultMessageRouterContext( - getRabbitMqConnectionManager(), - contextMonitor, - getMessageRouterConfiguration(), - getBoxConfiguration() - ); - } catch (Exception e) { - throw new CommonFactoryException("Can not create message router context", e); - } - } - return ctx; - }); + return routerContext.get(); + } + + @NotNull + private MessageRouterContext createMessageRouterContext() { + MessageRouterMonitor contextMonitor = new BroadcastMessageRouterMonitor( + new LogMessageRouterMonitor(), + new EventMessageRouterMonitor( + getEventBatchRouter(), + getRootEventId() + ) + ); + + return createRouterContext(contextMonitor); + } + + @NotNull + private MessageRouterContext createRouterContext(MessageRouterMonitor contextMonitor) { + return new DefaultMessageRouterContext( + getRabbitMqConnectionManager(), + contextMonitor, + getMessageRouterConfiguration(), + getBoxConfiguration() + ); } protected PrometheusConfiguration loadPrometheusConfiguration() { @@ -675,12 +661,7 @@ protected ConnectionManager createRabbitMQConnectionManager() { } protected ConnectionManager getRabbitMqConnectionManager() { - return rabbitMqConnectionManager.updateAndGet(connectionManager -> { - if (connectionManager == null) { - return createRabbitMQConnectionManager(); - } - return connectionManager; - }); + return rabbitMqConnectionManager.get(); } public MessageID.Builder newMessageIDBuilder() { @@ -697,64 +678,35 @@ public EventID.Builder newEventIDBuilder() { public void close() { LOGGER.info("Closing common factory"); - messageRouterParsedBatch.getAndUpdate(router -> { - if (router != null) { - try { - router.close(); - } catch (Exception e) { - LOGGER.error("Failed to close message router for parsed message batches", e); - } - } - - return router; - }); - - messageRouterRawBatch.getAndUpdate(router -> { - if (router != null) { - try { - router.close(); - } catch (Exception e) { - LOGGER.error("Failed to close message router for raw message batches", e); - } - } - - return router; - }); - - messageRouterMessageGroupBatch.getAndUpdate(router -> { - if (router != null) { - try { - router.close(); - } catch (Exception e) { - LOGGER.error("Failed to close message router for message group batches", e); - } - } + try { + messageRouterParsedBatch.close(); + } catch (Exception e) { + LOGGER.error("Failed to close message router for parsed message batches", e); + } - return router; - }); + try { + messageRouterRawBatch.close(); + } catch (Exception e) { + LOGGER.error("Failed to close message router for raw message batches", e); + } - rabbitMqConnectionManager.updateAndGet(connection -> { - if (connection != null) { - try { - connection.close(); - } catch (Exception e) { - LOGGER.error("Failed to close RabbitMQ connection", e); - } - } - return connection; - }); + try { + messageRouterMessageGroupBatch.close(); + } catch (Exception e) { + LOGGER.error("Failed to close message router for message group batches", e); + } - grpcRouter.getAndUpdate(router -> { - if (router != null) { - try { - router.close(); - } catch (Exception e) { - LOGGER.error("Failed to close gRPC router", e); - } - } + try { + rabbitMqConnectionManager.close(); + } catch (Exception e) { + LOGGER.error("Failed to close RabbitMQ connection", e); + } - return router; - }); + try { + grpcRouter.close(); + } catch (Exception e) { + LOGGER.error("Failed to close gRPC router", e); + } customMessageRouters.forEach((messageType, router) -> { try { @@ -764,28 +716,17 @@ public void close() { } }); - cradleManager.getAndUpdate(manager -> { - if (manager != null) { - try { - manager.close(); - } catch (Exception e) { - LOGGER.error("Failed to close Cradle manager", e); - } - } - - return manager; - }); + try { + cradleManager.close(); + } catch (Exception e) { + LOGGER.error("Failed to close Cradle manager", e); + } - prometheusExporter.updateAndGet(server -> { - if (server != null) { - try { - server.close(); - } catch (Exception e) { - LOGGER.error("Failed to close Prometheus exporter", e); - } - } - return null; - }); + try { + prometheusExporter.close(); + } catch (Exception e) { + LOGGER.error("Failed to close Prometheus exporter", e); + } LOGGER.info("Common factory has been closed"); } diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/factory/LazyProvider.kt b/src/main/kotlin/com/exactpro/th2/common/schema/factory/LazyProvider.kt new file mode 100644 index 00000000..8aac6b8c --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/common/schema/factory/LazyProvider.kt @@ -0,0 +1,106 @@ +/* + * Copyright 2023 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.exactpro.th2.common.schema.factory + +import com.exactpro.th2.common.schema.exception.CommonFactoryException +import com.exactpro.th2.common.schema.factory.LazyProvider.ThrowableConsumer +import java.util.concurrent.Callable +import java.util.concurrent.atomic.AtomicReference + +internal class LazyProvider private constructor( + private val name: String, + private val supplier: Callable, + private val onClose: ThrowableConsumer +) : AutoCloseable { + private val reference = AtomicReference?>() + + init { + require(name.isNotBlank()) { "blank name provided" } + } + + fun get(): T { + return if (reference.compareAndSet(null, State.Init)) { + val holder = State.Hold(initialize()) + if (!reference.compareAndSet(State.Init, holder)) { + onClose.consume(holder.value) + error("provider '$name' already closed") + } + holder.value + } else { + var currentState: State? + do { + currentState = reference.get() + } while (currentState == State.Init || currentState == null) + currentState.value + } + } + + @Throws(Exception::class) + override fun close() { + if (reference.compareAndSet(null, State.Closed)) { + // factory was not yet initialized. Just return + return + } + + val prevState = reference.getAndSet(State.Closed) + if (prevState is State.Hold) { + onClose.consume(prevState.value) + } + } + + private fun initialize(): T { + return try { + supplier.call() + } catch (e: Exception) { + reference.set(State.Error) + throw CommonFactoryException("cannot initialize '$name'", e) + } + } + + private fun interface ThrowableConsumer { + @Throws(Exception::class) + fun consume(value: T) + } + + private sealed class State { + abstract val value: T + object Init : State() { + override val value: Nothing + get() = error("no value on initialization") + } + class Hold(override val value: T): State() + object Closed : State() { + override val value: Nothing + get() = error("no value after close call") + } + object Error : State() { + override val value: Nothing + get() = error("no value on initialization error") + } + } + + companion object { + private val NONE: ThrowableConsumer = ThrowableConsumer { } + + @JvmStatic + fun lazy(name: String, supplier: Callable): LazyProvider = + LazyProvider(name, supplier, NONE) + + @JvmStatic + fun lazyAutocloseable(name: String, supplier: Callable): LazyProvider = + LazyProvider(name, supplier, AutoCloseable::close) + } +} diff --git a/src/test/kotlin/com/exactpro/th2/common/schema/factory/LazyProviderTest.kt b/src/test/kotlin/com/exactpro/th2/common/schema/factory/LazyProviderTest.kt new file mode 100644 index 00000000..465e866b --- /dev/null +++ b/src/test/kotlin/com/exactpro/th2/common/schema/factory/LazyProviderTest.kt @@ -0,0 +1,135 @@ +/* + * Copyright 2023 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.common.schema.factory + +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.RepeatedTest +import org.mockito.kotlin.doReturn +import org.mockito.kotlin.inOrder +import org.mockito.kotlin.mock +import org.mockito.kotlin.mockingDetails +import org.mockito.kotlin.never +import org.mockito.kotlin.times +import org.mockito.kotlin.verify +import java.util.concurrent.Callable +import java.util.concurrent.ExecutionException +import java.util.concurrent.ForkJoinPool +import java.util.concurrent.ThreadLocalRandom + +internal class LazyProviderTest { + @RepeatedTest(50) + fun `concurrent get invocation`() { + val supplier: Callable = mock { } + val provider: LazyProvider = LazyProvider.lazy("test", supplier) + + ForkJoinPool.commonPool().invokeAll( + listOf( + Callable { provider.get() }, + Callable { provider.get() }, + Callable { provider.get() }, + Callable { provider.get() }, + Callable { provider.get() }, + ) + ) + + verify(supplier, times(1)).call() + } + + @RepeatedTest(50) + fun `concurrent close invocation`() { + val resource: AutoCloseable = mock { } + val supplier: Callable = mock { + on { call() } doReturn resource + } + val provider: LazyProvider = LazyProvider.lazyAutocloseable("test", supplier) + // init value + provider.get() + ForkJoinPool.commonPool().invokeAll( + listOf( + Callable { provider.close() }, + Callable { provider.close() }, + Callable { provider.close() }, + Callable { provider.close() }, + Callable { provider.close() }, + Callable { provider.close() }, + ) + ) + + inOrder(resource, supplier) { + verify(supplier, times(1)).call() + verify(resource, times(1)).close() + } + } + + @RepeatedTest(100) + fun `concurrent get and close invocation`() { + val resource: AutoCloseable = mock { } + val supplier: Callable = mock { + on { call() } doReturn resource + } + val provider: LazyProvider = LazyProvider.lazyAutocloseable("test", supplier) + + val task = Callable { + val i = ThreadLocalRandom.current().nextInt(100) + if (i % 2 == 0) { + provider.get() + } else { + provider.close() + } + } + + val results = ForkJoinPool.commonPool().invokeAll( + listOf( + task, + task, + task, + task, + // make sure at least one call to close method is done + Callable { provider.close() }, + ) + ) + + val details = mockingDetails(supplier) + if (details.invocations.isEmpty()) { + verify(supplier, never()).call() + verify(resource, never()).close() + } else { + inOrder(supplier, resource) { + verify(supplier, times(1)).call() + verify(resource, times(1)).close() + } + } + val errors = results.asSequence().mapNotNull { + try { + it.get() + null + } catch (ex: ExecutionException) { + ex + } + }.toList() + val possibleErrors = listOf( + "no value after close call", + "provider 'test' already closed", + ) + Assertions.assertTrue(errors.all { ex -> + val errorMessage: String = ex.cause!!.message!! + possibleErrors.any { errorMessage.endsWith(it) } + }) { + "unexpected errors thrown: $errors" + } + } +} \ No newline at end of file