diff --git a/build.gradle b/build.gradle index 4111371b..1880b6e2 100644 --- a/build.gradle +++ b/build.gradle @@ -55,6 +55,8 @@ dependencies { implementation "io.prometheus:simpleclient_httpserver" implementation "io.prometheus:simpleclient_hotspot" + implementation "com.google.guava:guava" + testImplementation(platform(libs.testcontainers.bom)) testImplementation 'org.testcontainers:rabbitmq' testImplementation 'org.testcontainers:k3s' diff --git a/src/main/java/com/exactpro/th2/infraoperator/Th2CrdController.java b/src/main/java/com/exactpro/th2/infraoperator/Th2CrdController.java index 0ebb95fe..b8a2e28d 100644 --- a/src/main/java/com/exactpro/th2/infraoperator/Th2CrdController.java +++ b/src/main/java/com/exactpro/th2/infraoperator/Th2CrdController.java @@ -16,6 +16,8 @@ package com.exactpro.th2.infraoperator; +import com.exactpro.th2.infraoperator.configuration.ConfigLoader; +import com.exactpro.th2.infraoperator.configuration.OperatorConfig; import com.exactpro.th2.infraoperator.metrics.OperatorMetrics; import com.exactpro.th2.infraoperator.metrics.PrometheusServer; import com.exactpro.th2.infraoperator.operator.impl.BoxHelmTh2Op; @@ -28,49 +30,83 @@ import com.exactpro.th2.infraoperator.spec.strategy.redeploy.ContinuousTaskWorker; import com.exactpro.th2.infraoperator.spec.strategy.redeploy.tasks.CheckResourceCacheTask; import com.exactpro.th2.infraoperator.util.RabbitMQUtils; +import com.exactpro.th2.infraoperator.util.Utils; +import io.fabric8.kubernetes.client.KubernetesClient; import org.apache.logging.log4j.core.LoggerContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.net.URISyntaxException; import java.nio.file.Files; import java.nio.file.Path; +import java.util.Deque; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import static com.exactpro.th2.infraoperator.util.KubernetesUtils.createKubernetesClient; public class Th2CrdController implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(Th2CrdController.class); - public void start() { - var watchManager = DefaultWatchManager.getInstance(); - PrometheusServer.start(); + private final PrometheusServer prometheusServer; + private final KubernetesClient kubClient; + private final DefaultWatchManager watchManager; + private final RabbitMQContext rabbitMQContext; + private final ContinuousTaskWorker continuousTaskWorker; + + public Th2CrdController() throws IOException, URISyntaxException { + OperatorConfig config = ConfigLoader.loadConfiguration(); + prometheusServer = new PrometheusServer(config.getPrometheusConfiguration()); + kubClient = createKubernetesClient(); + rabbitMQContext = new RabbitMQContext(config.getRabbitMQManagement()); + watchManager = new DefaultWatchManager(kubClient, rabbitMQContext); + continuousTaskWorker = new ContinuousTaskWorker(); + OperatorMetrics.resetCacheErrors(); - try { - RabbitMQUtils.deleteRabbitMQRubbish(); - RabbitMQContext.declareTopicExchange(); // FIXME: topic exchange should be removed when all namespaces are removed / disabled + RabbitMQUtils.deleteRabbitMQRubbish(kubClient, rabbitMQContext); + rabbitMQContext.declareTopicExchange(); // FIXME: topic exchange should be removed when all namespaces are removed / disabled - watchManager.addTarget(MstoreHelmTh2Op::new); - watchManager.addTarget(EstoreHelmTh2Op::new); - watchManager.addTarget(BoxHelmTh2Op::new); - watchManager.addTarget(CoreBoxHelmTh2Op::new); - watchManager.addTarget(JobHelmTh2Op::new); + watchManager.addTarget(MstoreHelmTh2Op::new); + watchManager.addTarget(EstoreHelmTh2Op::new); + watchManager.addTarget(BoxHelmTh2Op::new); + watchManager.addTarget(CoreBoxHelmTh2Op::new); + watchManager.addTarget(JobHelmTh2Op::new); - watchManager.startInformers(); + watchManager.startInformers(); + continuousTaskWorker.add(new CheckResourceCacheTask(300)); + } - ContinuousTaskWorker continuousTaskWorker = new ContinuousTaskWorker(); - continuousTaskWorker.add(new CheckResourceCacheTask(300)); + public static void main(String[] args) { + Deque resources = new ConcurrentLinkedDeque<>(); + Lock lock = new ReentrantLock(); + Condition condition = lock.newCondition(); + + configureShutdownHook(resources, lock, condition); + + try { + if (args.length > 0) { + configureLogger(args[0]); + } + Th2CrdController controller = new Th2CrdController(); + resources.add(controller); + + awaitShutdown(lock, condition); } catch (Exception e) { LOGGER.error("Exception in main thread", e); - watchManager.stopInformers(); - watchManager.close(); - throw e; } } - public static void main(String[] args) { - if (args.length > 0) { - configureLogger(args[0]); - } - Th2CrdController controller = new Th2CrdController(); - controller.start(); + @Override + public void close() { + Utils.close(prometheusServer, "Prometheus server"); + Utils.close(kubClient, "Kubernetes client"); + Utils.close(watchManager, "Watch manager"); + Utils.close(rabbitMQContext, "RabbitMQ context"); + Utils.close(continuousTaskWorker, "Continuous task worker"); } private static void configureLogger(String filePath) { @@ -83,8 +119,37 @@ private static void configureLogger(String filePath) { } } - @Override - public void close() throws Exception { + private static void configureShutdownHook(Deque resources, Lock lock, Condition condition) { + Runtime.getRuntime().addShutdownHook(new Thread( + () -> { + LOGGER.info("Shutdown start"); + lock.lock(); + try { + condition.signalAll(); + } finally { + lock.unlock(); + } + resources.descendingIterator().forEachRemaining((resource) -> { + try { + resource.close(); + } catch (Exception e) { + LOGGER.error("Cannot close resource {}", resource.getClass(), e); + } + }); + LOGGER.info("Shutdown end"); + }, + "Shutdown hook" + )); + } + private static void awaitShutdown(Lock lock, Condition condition) throws InterruptedException { + lock.lock(); + try { + LOGGER.info("Wait shutdown"); + condition.await(); + LOGGER.info("App shutdown"); + } finally { + lock.unlock(); + } } } diff --git a/src/main/java/com/exactpro/th2/infraoperator/metrics/PrometheusServer.java b/src/main/java/com/exactpro/th2/infraoperator/metrics/PrometheusServer.java index ab8e91e5..eb90705d 100644 --- a/src/main/java/com/exactpro/th2/infraoperator/metrics/PrometheusServer.java +++ b/src/main/java/com/exactpro/th2/infraoperator/metrics/PrometheusServer.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2024 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,40 +16,47 @@ package com.exactpro.th2.infraoperator.metrics; -import com.exactpro.th2.infraoperator.configuration.ConfigLoader; import com.exactpro.th2.infraoperator.spec.shared.PrometheusConfiguration; import io.prometheus.client.exporter.HTTPServer; import io.prometheus.client.hotspot.DefaultExports; +import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; -public class PrometheusServer { - private static final Logger logger = LoggerFactory.getLogger(PrometheusServer.class); +public class PrometheusServer implements AutoCloseable { + private static final Logger LOGGER = LoggerFactory.getLogger(PrometheusServer.class); + private final Lock lock = new ReentrantLock(); + @Nullable + private final HTTPServer server; - private static final AtomicReference prometheusExporter = new AtomicReference<>(); - - public static void start() { + static { DefaultExports.initialize(); - PrometheusConfiguration prometheusConfiguration = ConfigLoader.getConfig().getPrometheusConfiguration(); + } - String host = prometheusConfiguration.getHost(); - int port = Integer.parseInt(prometheusConfiguration.getPort()); - boolean enabled = Boolean.parseBoolean(prometheusConfiguration.getEnabled()); + public PrometheusServer(PrometheusConfiguration configuration) throws IOException { + if (Boolean.parseBoolean(configuration.getEnabled())) { + String host = configuration.getHost(); + int port = Integer.parseInt(configuration.getPort()); + server = new HTTPServer(host, port); + LOGGER.info("Started prometheus server on: \"{}:{}\"", host, port); + } else { + server = null; + } + } - prometheusExporter.updateAndGet(server -> { - if (server == null && enabled) { - try { - server = new HTTPServer(host, port); - logger.info("Started prometheus server on: \"{}:{}\"", host, port); - return server; - } catch (IOException e) { - throw new RuntimeException("Failed to create Prometheus exporter", e); - } + @Override + public void close() { + lock.lock(); + try { + if (server != null) { + server.close(); } - return server; - }); + } finally { + lock.unlock(); + } } } diff --git a/src/main/java/com/exactpro/th2/infraoperator/model/kubernetes/client/DefaultResourceClient.java b/src/main/java/com/exactpro/th2/infraoperator/model/kubernetes/client/DefaultResourceClient.java index b3d8d979..4b961217 100644 --- a/src/main/java/com/exactpro/th2/infraoperator/model/kubernetes/client/DefaultResourceClient.java +++ b/src/main/java/com/exactpro/th2/infraoperator/model/kubernetes/client/DefaultResourceClient.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2024 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,14 +21,10 @@ import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.dsl.MixedOperation; import io.fabric8.kubernetes.client.dsl.Resource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public abstract class DefaultResourceClient implements ResourceClient { - private static final Logger logger = LoggerFactory.getLogger(DefaultResourceClient.class); - - private final KubernetesClient client; + private final KubernetesClient kubClient; private final Class resourceType; @@ -37,15 +33,15 @@ public abstract class DefaultResourceClient implement private final String crdName; public DefaultResourceClient( - KubernetesClient client, + KubernetesClient kubClient, Class resourceType, String crdName ) { - this.client = client; + this.kubClient = kubClient; this.resourceType = resourceType; this.crdName = crdName; - instance = client.resources(resourceType); + instance = kubClient.resources(resourceType); } @Override @@ -54,7 +50,7 @@ public Class getResourceType() { } public KubernetesClient getClient() { - return this.client; + return this.kubClient; } public MixedOperation, ? extends Resource> getInstance() { diff --git a/src/main/java/com/exactpro/th2/infraoperator/model/kubernetes/client/impl/DictionaryClient.java b/src/main/java/com/exactpro/th2/infraoperator/model/kubernetes/client/impl/DictionaryClient.java deleted file mode 100644 index 399e13e2..00000000 --- a/src/main/java/com/exactpro/th2/infraoperator/model/kubernetes/client/impl/DictionaryClient.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.exactpro.th2.infraoperator.model.kubernetes.client.impl; - -import com.exactpro.th2.infraoperator.model.kubernetes.client.DefaultResourceClient; -import com.exactpro.th2.infraoperator.spec.dictionary.Th2Dictionary; -import io.fabric8.kubernetes.client.KubernetesClient; - -public class DictionaryClient extends DefaultResourceClient { - - public DictionaryClient(KubernetesClient client) { - super( - client, - Th2Dictionary.class, - "th2dictionaries.th2.exactpro.com" - ); - } - -} diff --git a/src/main/java/com/exactpro/th2/infraoperator/operator/GenericHelmTh2Op.java b/src/main/java/com/exactpro/th2/infraoperator/operator/GenericHelmTh2Op.java index 88e6b354..6be079d6 100644 --- a/src/main/java/com/exactpro/th2/infraoperator/operator/GenericHelmTh2Op.java +++ b/src/main/java/com/exactpro/th2/infraoperator/operator/GenericHelmTh2Op.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2024 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,12 +17,13 @@ package com.exactpro.th2.infraoperator.operator; import com.exactpro.th2.infraoperator.spec.Th2CustomResource; +import com.exactpro.th2.infraoperator.spec.strategy.linkresolver.mq.RabbitMQContext; import io.fabric8.kubernetes.client.KubernetesClient; public abstract class GenericHelmTh2Op extends HelmReleaseTh2Op { - public GenericHelmTh2Op(KubernetesClient client) { - super(client); + public GenericHelmTh2Op(KubernetesClient kubClient, RabbitMQContext rabbitMQContext) { + super(kubClient, rabbitMQContext); } } diff --git a/src/main/java/com/exactpro/th2/infraoperator/operator/HelmReleaseTh2Op.java b/src/main/java/com/exactpro/th2/infraoperator/operator/HelmReleaseTh2Op.java index 3af2a828..ecfacc97 100644 --- a/src/main/java/com/exactpro/th2/infraoperator/operator/HelmReleaseTh2Op.java +++ b/src/main/java/com/exactpro/th2/infraoperator/operator/HelmReleaseTh2Op.java @@ -30,6 +30,7 @@ import com.exactpro.th2.infraoperator.spec.shared.PrometheusConfiguration; import com.exactpro.th2.infraoperator.spec.strategy.linkresolver.mq.BindQueueLinkResolver; import com.exactpro.th2.infraoperator.spec.strategy.linkresolver.mq.DeclareQueueResolver; +import com.exactpro.th2.infraoperator.spec.strategy.linkresolver.mq.RabbitMQContext; import com.exactpro.th2.infraoperator.util.CustomResourceUtils; import com.exactpro.th2.infraoperator.util.JsonUtils; @@ -126,13 +127,17 @@ public abstract class HelmReleaseTh2Op extends Abs protected final MixedOperation, Resource> helmReleaseClient; - public HelmReleaseTh2Op(KubernetesClient client) { + protected final DeclareQueueResolver declareQueueResolver; + protected final BindQueueLinkResolver bindQueueLinkResolver; - super(client); + public HelmReleaseTh2Op(KubernetesClient kubClient, RabbitMQContext rabbitMQContext) { - this.grpcConfigFactory = new GrpcRouterConfigFactory(); + super(kubClient); - helmReleaseClient = kubClient.resources(HelmRelease.class); + this.grpcConfigFactory = new GrpcRouterConfigFactory(); + this.helmReleaseClient = this.kubClient.resources(HelmRelease.class); + this.declareQueueResolver = new DeclareQueueResolver(rabbitMQContext); + this.bindQueueLinkResolver = new BindQueueLinkResolver(rabbitMQContext); } public abstract SharedIndexInformer generateInformerFromFactory(SharedInformerFactory factory); @@ -326,9 +331,9 @@ protected void addedEvent(CR resource) throws IOException { var lock = OperatorState.INSTANCE.getLock(namespace); lock.lock(); try { - DeclareQueueResolver.resolveAdd(resource); - BindQueueLinkResolver.resolveDeclaredLinks(resource); - BindQueueLinkResolver.resolveHiddenLinks(resource); + declareQueueResolver.resolveAdd(resource); + bindQueueLinkResolver.resolveDeclaredLinks(resource); + bindQueueLinkResolver.resolveHiddenLinks(resource); updateGrpcLinkedResourcesIfNeeded(resource); super.addedEvent(resource); } finally { @@ -344,9 +349,9 @@ protected void modifiedEvent(CR resource) throws IOException { var lock = OperatorState.INSTANCE.getLock(namespace); lock.lock(); try { - DeclareQueueResolver.resolveAdd(resource); - BindQueueLinkResolver.resolveDeclaredLinks(resource); - BindQueueLinkResolver.resolveHiddenLinks(resource); + declareQueueResolver.resolveAdd(resource); + bindQueueLinkResolver.resolveDeclaredLinks(resource); + bindQueueLinkResolver.resolveHiddenLinks(resource); updateGrpcLinkedResourcesIfNeeded(resource); super.modifiedEvent(resource); } finally { @@ -361,7 +366,7 @@ protected void deletedEvent(CR resource) { lock.lock(); try { super.deletedEvent(resource); - DeclareQueueResolver.resolveDelete(resource); + declareQueueResolver.resolveDelete(resource); } finally { lock.unlock(); } diff --git a/src/main/java/com/exactpro/th2/infraoperator/operator/StoreHelmTh2Op.java b/src/main/java/com/exactpro/th2/infraoperator/operator/StoreHelmTh2Op.java index b29432c8..abc1b916 100644 --- a/src/main/java/com/exactpro/th2/infraoperator/operator/StoreHelmTh2Op.java +++ b/src/main/java/com/exactpro/th2/infraoperator/operator/StoreHelmTh2Op.java @@ -18,6 +18,7 @@ import com.exactpro.th2.infraoperator.OperatorState; import com.exactpro.th2.infraoperator.spec.Th2CustomResource; +import com.exactpro.th2.infraoperator.spec.strategy.linkresolver.mq.RabbitMQContext; import io.fabric8.kubernetes.client.KubernetesClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,8 +39,8 @@ public abstract class StoreHelmTh2Op extends HelmR public static final String MESSAGE_STORAGE_BOX_ALIAS = "mstore"; - public StoreHelmTh2Op(KubernetesClient client) { - super(client); + public StoreHelmTh2Op(KubernetesClient kubClient, RabbitMQContext rabbitMQContext) { + super(kubClient, rabbitMQContext); } private void nameCheck(CR resource) throws IOException { diff --git a/src/main/java/com/exactpro/th2/infraoperator/operator/impl/BoxHelmTh2Op.java b/src/main/java/com/exactpro/th2/infraoperator/operator/impl/BoxHelmTh2Op.java index c1e188bb..9a35114d 100644 --- a/src/main/java/com/exactpro/th2/infraoperator/operator/impl/BoxHelmTh2Op.java +++ b/src/main/java/com/exactpro/th2/infraoperator/operator/impl/BoxHelmTh2Op.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2024 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +22,7 @@ import com.exactpro.th2.infraoperator.model.kubernetes.client.impl.BoxClient; import com.exactpro.th2.infraoperator.operator.GenericHelmTh2Op; import com.exactpro.th2.infraoperator.spec.box.Th2Box; +import com.exactpro.th2.infraoperator.spec.strategy.linkresolver.mq.RabbitMQContext; import com.exactpro.th2.infraoperator.util.CustomResourceUtils; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.informers.SharedIndexInformer; @@ -31,9 +32,9 @@ public class BoxHelmTh2Op extends GenericHelmTh2Op { private final BoxClient boxClient; - public BoxHelmTh2Op(KubernetesClient client) { - super(client); - this.boxClient = new BoxClient(client); + public BoxHelmTh2Op(KubernetesClient kubClient, RabbitMQContext rabbitMQContext) { + super(kubClient, rabbitMQContext); + this.boxClient = new BoxClient(kubClient); } @Override diff --git a/src/main/java/com/exactpro/th2/infraoperator/operator/impl/CoreBoxHelmTh2Op.java b/src/main/java/com/exactpro/th2/infraoperator/operator/impl/CoreBoxHelmTh2Op.java index 4e01f53f..19d47f8a 100644 --- a/src/main/java/com/exactpro/th2/infraoperator/operator/impl/CoreBoxHelmTh2Op.java +++ b/src/main/java/com/exactpro/th2/infraoperator/operator/impl/CoreBoxHelmTh2Op.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2024 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +22,7 @@ import com.exactpro.th2.infraoperator.model.kubernetes.client.impl.CoreBoxClient; import com.exactpro.th2.infraoperator.operator.GenericHelmTh2Op; import com.exactpro.th2.infraoperator.spec.corebox.Th2CoreBox; +import com.exactpro.th2.infraoperator.spec.strategy.linkresolver.mq.RabbitMQContext; import com.exactpro.th2.infraoperator.util.CustomResourceUtils; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.informers.SharedIndexInformer; @@ -31,9 +32,9 @@ public class CoreBoxHelmTh2Op extends GenericHelmTh2Op { private final CoreBoxClient coreBoxClient; - public CoreBoxHelmTh2Op(KubernetesClient client) { - super(client); - this.coreBoxClient = new CoreBoxClient(client); + public CoreBoxHelmTh2Op(KubernetesClient kubClient, RabbitMQContext rabbitMQContext) { + super(kubClient, rabbitMQContext); + this.coreBoxClient = new CoreBoxClient(kubClient); } @Override diff --git a/src/main/java/com/exactpro/th2/infraoperator/operator/impl/EstoreHelmTh2Op.java b/src/main/java/com/exactpro/th2/infraoperator/operator/impl/EstoreHelmTh2Op.java index fa863d54..99018cce 100644 --- a/src/main/java/com/exactpro/th2/infraoperator/operator/impl/EstoreHelmTh2Op.java +++ b/src/main/java/com/exactpro/th2/infraoperator/operator/impl/EstoreHelmTh2Op.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2024 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +22,7 @@ import com.exactpro.th2.infraoperator.model.kubernetes.client.impl.EstoreClient; import com.exactpro.th2.infraoperator.operator.StoreHelmTh2Op; import com.exactpro.th2.infraoperator.spec.estore.Th2Estore; +import com.exactpro.th2.infraoperator.spec.strategy.linkresolver.mq.RabbitMQContext; import com.exactpro.th2.infraoperator.util.CustomResourceUtils; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.informers.SharedIndexInformer; @@ -31,9 +32,9 @@ public class EstoreHelmTh2Op extends StoreHelmTh2Op { private final EstoreClient estoreClient; - public EstoreHelmTh2Op(KubernetesClient client) { - super(client); - this.estoreClient = new EstoreClient(client); + public EstoreHelmTh2Op(KubernetesClient kubClient, RabbitMQContext rabbitMQContext) { + super(kubClient, rabbitMQContext); + this.estoreClient = new EstoreClient(kubClient); } @Override diff --git a/src/main/java/com/exactpro/th2/infraoperator/operator/impl/JobHelmTh2Op.java b/src/main/java/com/exactpro/th2/infraoperator/operator/impl/JobHelmTh2Op.java index a46e1603..41c29d62 100644 --- a/src/main/java/com/exactpro/th2/infraoperator/operator/impl/JobHelmTh2Op.java +++ b/src/main/java/com/exactpro/th2/infraoperator/operator/impl/JobHelmTh2Op.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2024 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +22,7 @@ import com.exactpro.th2.infraoperator.model.kubernetes.client.impl.JobClient; import com.exactpro.th2.infraoperator.operator.GenericHelmTh2Op; import com.exactpro.th2.infraoperator.spec.job.Th2Job; +import com.exactpro.th2.infraoperator.spec.strategy.linkresolver.mq.RabbitMQContext; import com.exactpro.th2.infraoperator.util.CustomResourceUtils; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.informers.SharedIndexInformer; @@ -31,9 +32,9 @@ public class JobHelmTh2Op extends GenericHelmTh2Op { private final JobClient jobClient; - public JobHelmTh2Op(KubernetesClient client) { - super(client); - this.jobClient = new JobClient(client); + public JobHelmTh2Op(KubernetesClient kubClient, RabbitMQContext rabbitMQContext) { + super(kubClient, rabbitMQContext); + this.jobClient = new JobClient(kubClient); } @Override diff --git a/src/main/java/com/exactpro/th2/infraoperator/operator/impl/MstoreHelmTh2Op.java b/src/main/java/com/exactpro/th2/infraoperator/operator/impl/MstoreHelmTh2Op.java index e0763f9f..19a80201 100644 --- a/src/main/java/com/exactpro/th2/infraoperator/operator/impl/MstoreHelmTh2Op.java +++ b/src/main/java/com/exactpro/th2/infraoperator/operator/impl/MstoreHelmTh2Op.java @@ -22,6 +22,7 @@ import com.exactpro.th2.infraoperator.model.kubernetes.client.impl.MstoreClient; import com.exactpro.th2.infraoperator.operator.StoreHelmTh2Op; import com.exactpro.th2.infraoperator.spec.mstore.Th2Mstore; +import com.exactpro.th2.infraoperator.spec.strategy.linkresolver.mq.RabbitMQContext; import com.exactpro.th2.infraoperator.util.CustomResourceUtils; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.informers.SharedIndexInformer; @@ -31,9 +32,9 @@ public class MstoreHelmTh2Op extends StoreHelmTh2Op { private final MstoreClient mstoreClient; - public MstoreHelmTh2Op(KubernetesClient client) { - super(client); - this.mstoreClient = new MstoreClient(client); + public MstoreHelmTh2Op(KubernetesClient kubClient, RabbitMQContext rabbitMQContext) { + super(kubClient, rabbitMQContext); + this.mstoreClient = new MstoreClient(kubClient); } @Override diff --git a/src/main/java/com/exactpro/th2/infraoperator/operator/manager/impl/ConfigMapEventHandler.java b/src/main/java/com/exactpro/th2/infraoperator/operator/manager/impl/ConfigMapEventHandler.java index f5d14fca..005a0a26 100644 --- a/src/main/java/com/exactpro/th2/infraoperator/operator/manager/impl/ConfigMapEventHandler.java +++ b/src/main/java/com/exactpro/th2/infraoperator/operator/manager/impl/ConfigMapEventHandler.java @@ -105,19 +105,24 @@ private ConfigMapMeta(String alias, String dataFileName) { private static final Logger LOGGER = LoggerFactory.getLogger(ConfigMapEventHandler.class); - private KubernetesClient client; + private final KubernetesClient kubClient; + + private final RabbitMQContext rabbitMQContext; + + private final DefaultWatchManager watchManager; private MixedOperation, Resource> helmReleaseClient; public KubernetesClient getClient() { - return client; + return kubClient; } public static ConfigMapEventHandler newInstance(SharedInformerFactory sharedInformerFactory, KubernetesClient client, + RabbitMQContext rabbitMQContext, + DefaultWatchManager watchManager, EventQueue eventQueue) { - var res = new ConfigMapEventHandler(client); - res.client = client; + var res = new ConfigMapEventHandler(client, rabbitMQContext, watchManager); res.helmReleaseClient = client.resources(HelmRelease.class); SharedIndexInformer configMapInformer = sharedInformerFactory.sharedIndexInformerFor( @@ -129,8 +134,10 @@ public static ConfigMapEventHandler newInstance(SharedInformerFactory sharedInfo return res; } - private ConfigMapEventHandler(KubernetesClient client) { - this.client = client; + private ConfigMapEventHandler(KubernetesClient kubClient, RabbitMQContext rabbitMQContext, DefaultWatchManager watchManager) { + this.kubClient = kubClient; + this.rabbitMQContext = rabbitMQContext; + this.watchManager = watchManager; } @Override @@ -165,10 +172,10 @@ public void eventReceived(Action action, ConfigMap resource) { if (!Objects.equals(rabbitMQConfig, newRabbitMQConfig)) { Histogram.Timer processTimer = OperatorMetrics.getConfigMapEventTimer(resource); configMaps.setRabbitMQConfig4Namespace(namespace, newRabbitMQConfig); - RabbitMQContext.setUpRabbitMqForNamespace(namespace); + rabbitMQContext.setUpRabbitMqForNamespace(namespace); LOGGER.info("RabbitMQ ConfigMap has been updated in namespace \"{}\". Updating all boxes", namespace); - DefaultWatchManager.getInstance().refreshBoxes(namespace); + watchManager.refreshBoxes(namespace); LOGGER.info("box-definition(s) have been updated"); processTimer.observeDuration(); } else { @@ -256,7 +263,7 @@ private void updateDefaultBookName(Action action, String namespace, ConfigMap re Histogram.Timer processTimer = OperatorMetrics.getConfigMapEventTimer(resource); OperatorState.INSTANCE.setBookName(namespace, newBookName); LOGGER.info("\"{}\" has been updated. Updating all boxes", resourceLabel); - DefaultWatchManager.getInstance().refreshBoxes(namespace); + watchManager.refreshBoxes(namespace); LOGGER.info("box-definition(s) have been updated"); processTimer.observeDuration(); } @@ -318,7 +325,7 @@ public void onClose(WatcherException cause) { private String readRabbitMQPasswordForSchema(String namespace, String secretName) throws Exception { - Secret secret = client.secrets().inNamespace(namespace).withName(secretName).get(); + Secret secret = kubClient.secrets().inNamespace(namespace).withName(secretName).get(); if (secret == null) { throw new Exception(String.format("Secret not found \"%s\"", annotationFor(namespace, "Secret", secretName))); diff --git a/src/main/java/com/exactpro/th2/infraoperator/operator/manager/impl/DefaultWatchManager.java b/src/main/java/com/exactpro/th2/infraoperator/operator/manager/impl/DefaultWatchManager.java index af18438a..72d61942 100644 --- a/src/main/java/com/exactpro/th2/infraoperator/operator/manager/impl/DefaultWatchManager.java +++ b/src/main/java/com/exactpro/th2/infraoperator/operator/manager/impl/DefaultWatchManager.java @@ -20,6 +20,7 @@ import com.exactpro.th2.infraoperator.model.kubernetes.client.ResourceClient; import com.exactpro.th2.infraoperator.operator.HelmReleaseTh2Op; import com.exactpro.th2.infraoperator.spec.Th2CustomResource; +import com.exactpro.th2.infraoperator.spec.strategy.linkresolver.mq.RabbitMQContext; import com.exactpro.th2.infraoperator.util.CustomResourceUtils; import com.exactpro.th2.infraoperator.util.Strings; import com.fasterxml.uuid.Generators; @@ -38,18 +39,17 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; -import java.util.function.Function; +import java.util.function.BiFunction; import java.util.function.Supplier; import java.util.stream.Collectors; import static com.exactpro.th2.infraoperator.operator.AbstractTh2Operator.REFRESH_TOKEN_ALIAS; import static com.exactpro.th2.infraoperator.util.CustomResourceUtils.annotationFor; -import static com.exactpro.th2.infraoperator.util.KubernetesUtils.createKubernetesClient; import static com.exactpro.th2.infraoperator.util.WatcherUtils.createExceptionHandler; -public class DefaultWatchManager { +public class DefaultWatchManager implements AutoCloseable { - private static final Logger logger = LoggerFactory.getLogger(DefaultWatchManager.class); + private static final Logger LOGGER = LoggerFactory.getLogger(DefaultWatchManager.class); private boolean isWatching = false; @@ -59,29 +59,29 @@ public class DefaultWatchManager { private final SharedInformerFactory sharedInformerFactory; - private static DefaultWatchManager instance; - private final EventDispatcher eventDispatcher; - private final KubernetesClient client; + private final KubernetesClient kubClient; + + private final RabbitMQContext rabbitMQContext; - private synchronized SharedInformerFactory getInformerFactory() { + private SharedInformerFactory getInformerFactory() { return sharedInformerFactory; } - private DefaultWatchManager(KubernetesClient client) { - this.sharedInformerFactory = client.informers(); + public DefaultWatchManager(KubernetesClient kubClient, RabbitMQContext rabbitMQContext) { + this.sharedInformerFactory = kubClient.informers(); this.eventDispatcher = new EventDispatcher(); - this.client = client; + this.kubClient = kubClient; + this.rabbitMQContext = rabbitMQContext; sharedInformerFactory.addSharedInformerEventListener(exception -> { - logger.error("Exception in InformerFactory : {}", exception.getMessage()); + LOGGER.error("Exception in InformerFactory : {}", exception.getMessage()); }); - instance = this; } - public void startInformers() { - logger.info("Starting all informers..."); + public synchronized void startInformers() { + LOGGER.info("Starting all informers..."); SharedInformerFactory sharedInformerFactory = getInformerFactory(); @@ -92,11 +92,11 @@ public void startInformers() { isWatching = true; sharedInformerFactory.startAllRegisteredInformers(); - logger.info("All informers has been started"); + LOGGER.info("All informers has been started"); } - public void stopInformers() { - logger.info("Shutting down informers"); + private void stopInformers() { + LOGGER.info("Shutting down informers"); getInformerFactory().stopAllRegisteredInformers(); } @@ -107,10 +107,10 @@ private void loadResources(EventHandlerContext context) { private void loadConfigMaps(EventHandlerContext context) { var configMapEventHandler = (ConfigMapEventHandler) context.getHandler(ConfigMapEventHandler.class); - List configMaps = client.configMaps().inAnyNamespace().list().getItems(); + List configMaps = kubClient.configMaps().inAnyNamespace().list().getItems(); configMaps = filterByNamespace(configMaps); for (var configMap : configMaps) { - logger.info("Loading \"{}\"", annotationFor(configMap)); + LOGGER.info("Loading \"{}\"", annotationFor(configMap)); configMapEventHandler.eventReceived(Watcher.Action.ADDED, configMap); } } @@ -138,11 +138,11 @@ private EventHandlerContext registerInformers(SharedInformerFactory sharedInform EventHandlerContext context = new EventHandlerContext(); - context.addHandler(NamespaceEventHandler.newInstance(sharedInformerFactory, eventDispatcher.getEventQueue())); - context.addHandler(Th2DictionaryEventHandler.newInstance(sharedInformerFactory, client, - eventDispatcher.getEventQueue())); - context.addHandler(ConfigMapEventHandler.newInstance(sharedInformerFactory, client, + context.addHandler(NamespaceEventHandler.newInstance(sharedInformerFactory, rabbitMQContext, eventDispatcher.getEventQueue())); + context.addHandler(Th2DictionaryEventHandler.newInstance(sharedInformerFactory, kubClient, eventDispatcher.getEventQueue())); + context.addHandler(ConfigMapEventHandler.newInstance(sharedInformerFactory, kubClient, rabbitMQContext, + this, eventDispatcher.getEventQueue())); /* resourceClients initialization should be done first @@ -173,25 +173,25 @@ private EventHandlerContext registerInformers(SharedInformerFactory sharedInform return context; } - public boolean isWatching() { + public synchronized boolean isWatching() { return isWatching; } - public void addTarget( - Function> operator) { + public synchronized void addTarget( + BiFunction> operator) { helmWatchersCommands.add(() -> { // T extends Th2CustomResource -> T is a Th2CustomResource @SuppressWarnings("unchecked") - var th2ResOp = (HelmReleaseTh2Op) operator.apply(client); + var th2ResOp = (HelmReleaseTh2Op) operator.apply(kubClient, rabbitMQContext); return th2ResOp; }); } - void refreshBoxes(String namespace) { + synchronized void refreshBoxes(String namespace) { if (!isWatching()) { - logger.warn("Not watching for resources yet"); + LOGGER.warn("Not watching for resources yet"); return; } @@ -204,7 +204,7 @@ void refreshBoxes(String namespace) { } } - logger.info("{} boxes updated", refreshedBoxes); + LOGGER.info("{} boxes updated", refreshedBoxes); } private void createResource(String linkNamespace, Th2CustomResource resource, @@ -215,20 +215,16 @@ private void createResource(String linkNamespace, Th2CustomResource resource, resMeta.setAnnotations(Objects.nonNull(resMeta.getAnnotations()) ? resMeta.getAnnotations() : new HashMap<>()); resMeta.getAnnotations().put(REFRESH_TOKEN_ALIAS, refreshToken); resClient.getInstance().inNamespace(linkNamespace).resource(resource).createOrReplace(); - logger.debug("refreshed \"{}\" with refresh-token={}", + LOGGER.debug("refreshed \"{}\" with refresh-token={}", CustomResourceUtils.annotationFor(resource), refreshToken); } - public static synchronized DefaultWatchManager getInstance() { - if (instance == null) { - instance = new DefaultWatchManager(createKubernetesClient()); - } - - return instance; - } - - public void close() { + @Override + public synchronized void close() throws InterruptedException { + stopInformers(); eventDispatcher.interrupt(); - client.close(); + eventDispatcher.join(5_000); + resourceClients.clear(); + helmWatchersCommands.clear(); } } diff --git a/src/main/java/com/exactpro/th2/infraoperator/operator/manager/impl/NamespaceEventHandler.java b/src/main/java/com/exactpro/th2/infraoperator/operator/manager/impl/NamespaceEventHandler.java index 886328d1..53da675b 100644 --- a/src/main/java/com/exactpro/th2/infraoperator/operator/manager/impl/NamespaceEventHandler.java +++ b/src/main/java/com/exactpro/th2/infraoperator/operator/manager/impl/NamespaceEventHandler.java @@ -37,23 +37,27 @@ public class NamespaceEventHandler implements ResourceEventHandler, Watcher { private static final Logger LOGGER = LoggerFactory.getLogger(NamespaceEventHandler.class); + private final RabbitMQContext rabbitMQContext; + private final EventQueue eventQueue; private final OperatorConfig config = ConfigLoader.getConfig(); public static NamespaceEventHandler newInstance(SharedInformerFactory sharedInformerFactory, + RabbitMQContext rabbitMQContext, EventQueue eventQueue) { SharedIndexInformer namespaceInformer = sharedInformerFactory.sharedIndexInformerFor( Namespace.class, RESYNC_TIME); - var res = new NamespaceEventHandler(eventQueue); + var res = new NamespaceEventHandler(rabbitMQContext, eventQueue); namespaceInformer.exceptionHandler(createExceptionHandler(Namespace.class)); namespaceInformer.addEventHandler(res); return res; } - public NamespaceEventHandler(EventQueue eventQueue) { + public NamespaceEventHandler(RabbitMQContext rabbitMQContext, EventQueue eventQueue) { + this.rabbitMQContext = rabbitMQContext; this.eventQueue = eventQueue; } @@ -114,7 +118,7 @@ public void eventReceived(Action action, Namespace resource) { lock.lock(); try { LOGGER.info("Processing {} event for namespace: \"{}\"", action, namespaceName); - RabbitMQContext.cleanupRabbit(namespaceName); + rabbitMQContext.cleanupRabbit(namespaceName); LOGGER.info("Deleted namespace {}", namespaceName); } catch (Exception e) { LOGGER.error("Exception processing event for \"{}\"", resourceLabel, e); diff --git a/src/main/java/com/exactpro/th2/infraoperator/spec/strategy/linkresolver/mq/DeclareQueueResolver.java b/src/main/java/com/exactpro/th2/infraoperator/spec/strategy/linkresolver/mq/DeclareQueueResolver.java index 578326ec..cfc566d7 100644 --- a/src/main/java/com/exactpro/th2/infraoperator/spec/strategy/linkresolver/mq/DeclareQueueResolver.java +++ b/src/main/java/com/exactpro/th2/infraoperator/spec/strategy/linkresolver/mq/DeclareQueueResolver.java @@ -39,41 +39,46 @@ import static com.exactpro.th2.infraoperator.spec.strategy.linkresolver.Util.createEstoreQueue; import static com.exactpro.th2.infraoperator.spec.strategy.linkresolver.Util.createMstoreQueue; -import static com.exactpro.th2.infraoperator.spec.strategy.linkresolver.mq.RabbitMQContext.getChannel; import static com.exactpro.th2.infraoperator.util.ExtractUtils.extractName; public class DeclareQueueResolver { - private static final Logger logger = LoggerFactory.getLogger(DeclareQueueResolver.class); + private static final Logger LOGGER = LoggerFactory.getLogger(DeclareQueueResolver.class); - public static void resolveAdd(Th2CustomResource resource) { + private final RabbitMQContext rabbitMQContext; + + public DeclareQueueResolver(RabbitMQContext rabbitMQContext) { + this.rabbitMQContext = rabbitMQContext; + } + + public void resolveAdd(Th2CustomResource resource) { String namespace = ExtractUtils.extractNamespace(resource); try { declareQueueBunch(namespace, resource); } catch (Exception e) { String message = "Exception while working with rabbitMq"; - logger.error(message, e); + LOGGER.error(message, e); throw new NonTerminalException(message, e); } } - public static void resolveDelete(Th2CustomResource resource) { + public void resolveDelete(Th2CustomResource resource) { String namespace = ExtractUtils.extractNamespace(resource); try { - Channel channel = getChannel(); + Channel channel = rabbitMQContext.getChannel(); //get queues that are associated with current box. Set boxQueueNames = generateBoxQueues(namespace, resource); removeExtinctQueues(channel, boxQueueNames, CustomResourceUtils.annotationFor(resource), namespace); } catch (Exception e) { String message = "Exception while working with rabbitMq"; - logger.error(message, e); + LOGGER.error(message, e); throw new NonTerminalException(message, e); } } - private static void declareQueueBunch(String namespace, Th2CustomResource resource) throws IOException { + private void declareQueueBunch(String namespace, Th2CustomResource resource) throws IOException { - Channel channel = getChannel(); + Channel channel = rabbitMQContext.getChannel(); boolean persistence = ConfigLoader.getConfig().getRabbitMQManagement().getPersistence(); //get queues that are associated with current box and are not linked through Th2Link resources @@ -86,9 +91,9 @@ private static void declareQueueBunch(String namespace, Th2CustomResource resour //remove from set if pin for queue still exists. boxQueues.remove(queueName); var newQueueArguments = RabbitMQContext.generateQueueArguments(pin.getSettings()); - var currentQueue = RabbitMQContext.getQueue(queueName); + var currentQueue = rabbitMQContext.getQueue(queueName); if (currentQueue != null && !currentQueue.getArguments().equals(newQueueArguments)) { - logger.warn("Arguments for queue '{}' were modified. Recreating with new arguments", queueName); + LOGGER.warn("Arguments for queue '{}' were modified. Recreating with new arguments", queueName); channel.queueDelete(queueName); } var declareResult = channel.queueDeclare(queueName @@ -96,14 +101,14 @@ private static void declareQueueBunch(String namespace, Th2CustomResource resour , false , false , newQueueArguments); - logger.info("Queue '{}' of resource {} was successfully declared", + LOGGER.info("Queue '{}' of resource {} was successfully declared", declareResult.getQueue(), extractName(resource)); } //remove from rabbit queues that are left i.e. inactive removeExtinctQueues(channel, boxQueues, CustomResourceUtils.annotationFor(resource), namespace); } - private static Set getBoxPreviousQueues(String namespace, String boxName) { + private Set getBoxPreviousQueues(String namespace, String boxName) { HelmRelease hr = OperatorState.INSTANCE.getHelmReleaseFromCache(boxName, namespace); if (hr == null) { return getBoxQueuesFromRabbit(namespace, boxName); @@ -115,9 +120,9 @@ private static Set getBoxPreviousQueues(String namespace, String boxName * Collect all queues related to the {@code namespace} {@code boxName} component in RabbitMQ * @return mutable set of queues */ - private static @NotNull Set getBoxQueuesFromRabbit(String namespace, String boxName) { + private @NotNull Set getBoxQueuesFromRabbit(String namespace, String boxName) { - List queueInfoList = RabbitMQContext.getQueues(); + List queueInfoList = rabbitMQContext.getQueues(); Set queueNames = new HashSet<>(); queueInfoList.forEach(q -> { @@ -148,16 +153,16 @@ private static void removeExtinctQueues( String mstoreQueue = createMstoreQueue(namespace); if (!extinctQueueNames.isEmpty()) { - logger.info("Trying to delete queues associated with \"{}\"", resourceLabel); + LOGGER.info("Trying to delete queues associated with \"{}\"", resourceLabel); extinctQueueNames .stream() .filter(name -> !name.equals(estoreQueue) && !name.equals(mstoreQueue)) .forEach(queueName -> { try { channel.queueDelete(queueName); - logger.info("Deleted queue: [{}]", queueName); + LOGGER.info("Deleted queue: [{}]", queueName); } catch (IOException e) { - logger.error("Exception deleting queue: [{}]", queueName, e); + LOGGER.error("Exception deleting queue: [{}]", queueName, e); } }); } diff --git a/src/main/java/com/exactpro/th2/infraoperator/spec/strategy/linkresolver/mq/RabbitMQContext.java b/src/main/java/com/exactpro/th2/infraoperator/spec/strategy/linkresolver/mq/RabbitMQContext.java index ca09307d..d4443b2e 100644 --- a/src/main/java/com/exactpro/th2/infraoperator/spec/strategy/linkresolver/mq/RabbitMQContext.java +++ b/src/main/java/com/exactpro/th2/infraoperator/spec/strategy/linkresolver/mq/RabbitMQContext.java @@ -29,6 +29,7 @@ import com.exactpro.th2.infraoperator.spec.strategy.redeploy.tasks.RecreateQueuesAndBindings; import com.exactpro.th2.infraoperator.spec.strategy.redeploy.tasks.RetryRabbitSetup; import com.exactpro.th2.infraoperator.spec.strategy.redeploy.tasks.RetryTopicExchangeTask; +import com.exactpro.th2.infraoperator.util.Utils; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; @@ -55,6 +56,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import static com.exactpro.th2.infraoperator.spec.strategy.linkresolver.Util.createEstoreQueue; @@ -66,161 +70,151 @@ import static java.util.Objects.requireNonNullElse; import static org.apache.commons.lang3.StringUtils.isNoneBlank; -public final class RabbitMQContext { +public final class RabbitMQContext implements AutoCloseable { - private static final Logger logger = LoggerFactory.getLogger(RabbitMQContext.class); + private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQContext.class); private static final int RETRY_DELAY = 120; public static final String TOPIC = BuiltinExchangeType.TOPIC.getType(); public static final String DIRECT = BuiltinExchangeType.DIRECT.getType(); + private final RabbitMQManagementConfig managementConfig; + private final ChannelContext channelContext; - private static volatile RabbitMQManagementConfig managementConfig; - - private static volatile ChannelContext channelContext; - - private static volatile Client rmqClient; + private final Client rmqClient; private static final RetryableTaskQueue retryableTaskQueue = new RetryableTaskQueue(); - private RabbitMQContext() { + public RabbitMQContext(RabbitMQManagementConfig managementConfig) throws MalformedURLException, URISyntaxException { + this.managementConfig = managementConfig; + this.rmqClient = createClient(managementConfig); + this.channelContext = new ChannelContext(this, managementConfig); } - public static String getTopicExchangeName() { - return getManagementConfig().getExchangeName(); + public String getTopicExchangeName() { + return managementConfig.getExchangeName(); } - public static void declareTopicExchange() { + public void declareTopicExchange() { String exchangeName = getTopicExchangeName(); - RabbitMQManagementConfig rabbitMQManagementConfig = getManagementConfig(); try { - getChannel().exchangeDeclare(exchangeName, TOPIC, rabbitMQManagementConfig.getPersistence()); + getChannel().exchangeDeclare(exchangeName, TOPIC, managementConfig.getPersistence()); } catch (Exception e) { - logger.error("Exception setting up exchange: \"{}\"", exchangeName, e); - RetryTopicExchangeTask retryTopicExchangeTask = new RetryTopicExchangeTask(exchangeName, RETRY_DELAY); + LOGGER.error("Exception setting up exchange: \"{}\"", exchangeName, e); + RetryTopicExchangeTask retryTopicExchangeTask = new RetryTopicExchangeTask(this, exchangeName, RETRY_DELAY); retryableTaskQueue.add(retryTopicExchangeTask, true); - logger.info("Task \"{}\" added to scheduler, with delay \"{}\" seconds", + LOGGER.info("Task \"{}\" added to scheduler, with delay \"{}\" seconds", retryTopicExchangeTask.getName(), RETRY_DELAY); } } - public static void setUpRabbitMqForNamespace(String namespace) { + public void setUpRabbitMqForNamespace(String namespace) { try { createUser(namespace); declareTopicExchange(); declareExchange(toExchangeName(namespace)); createStoreQueues(namespace); } catch (Exception e) { - logger.error("Exception setting up rabbitMq for namespace: \"{}\"", namespace, e); - RetryRabbitSetup retryRabbitSetup = new RetryRabbitSetup(namespace, RETRY_DELAY); + LOGGER.error("Exception setting up rabbitMq for namespace: \"{}\"", namespace, e); + RetryRabbitSetup retryRabbitSetup = new RetryRabbitSetup(this, namespace, RETRY_DELAY); retryableTaskQueue.add(retryRabbitSetup, true); - logger.info("Task \"{}\" added to scheduler, with delay \"{}\" seconds", + LOGGER.info("Task \"{}\" added to scheduler, with delay \"{}\" seconds", retryRabbitSetup.getName(), RETRY_DELAY); } } - private static void createUser(String namespace) throws Exception { + private void createUser(String namespace) { - RabbitMQManagementConfig rabbitMQManagementConfig = getManagementConfig(); RabbitMQConfig rabbitMQConfig = getRabbitMQConfig(namespace); String password = rabbitMQConfig.getPassword(); - String vHostName = rabbitMQManagementConfig.getVhostName(); + String vHostName = managementConfig.getVhostName(); if (StringUtils.isBlank(namespace)) { return; } try { - Client rmqClient = getClient(); - if (rmqClient.getVhost(vHostName) == null) { - logger.error("vHost: \"{}\" is not present", vHostName); + LOGGER.error("vHost: \"{}\" is not present", vHostName); return; } rmqClient.createUser(namespace, password.toCharArray(), new ArrayList<>()); - logger.info("Created user \"{}\" on vHost \"{}\"", namespace, vHostName); + LOGGER.info("Created user \"{}\" on vHost \"{}\"", namespace, vHostName); // set permissions - RabbitMQNamespacePermissions rabbitMQNamespacePermissions = - rabbitMQManagementConfig.getSchemaPermissions(); + RabbitMQNamespacePermissions rabbitMQNamespacePermissions = managementConfig.getSchemaPermissions(); UserPermissions permissions = new UserPermissions(); permissions.setConfigure(rabbitMQNamespacePermissions.getConfigure()); permissions.setRead(rabbitMQNamespacePermissions.getRead()); permissions.setWrite(rabbitMQNamespacePermissions.getWrite()); rmqClient.updatePermissions(vHostName, namespace, permissions); - logger.info("User \"{}\" permissions set in RabbitMQ", namespace); + LOGGER.info("User \"{}\" permissions set in RabbitMQ", namespace); } catch (Exception e) { - logger.error("Exception setting up user: \"{}\" for vHost: \"{}\"", namespace, vHostName, e); + LOGGER.error("Exception setting up user: \"{}\" for vHost: \"{}\"", namespace, vHostName, e); throw e; } } - private static void declareExchange(String exchangeName) throws Exception { - RabbitMQManagementConfig rabbitMQManagementConfig = getManagementConfig(); + private void declareExchange(String exchangeName) throws Exception { try { - getChannel().exchangeDeclare(exchangeName, DIRECT, rabbitMQManagementConfig.getPersistence()); + getChannel().exchangeDeclare(exchangeName, DIRECT, managementConfig.getPersistence()); } catch (Exception e) { - logger.error("Exception setting up exchange: \"{}\"", exchangeName, e); + LOGGER.error("Exception setting up exchange: \"{}\"", exchangeName, e); throw e; } } - private static void createStoreQueues(String namespace) throws Exception { + private void createStoreQueues(String namespace) throws Exception { var channel = getChannel(); - RabbitMQManagementConfig rabbitMQManagementConfig = getManagementConfig(); var declareResult = channel.queueDeclare( createEstoreQueue(namespace), - rabbitMQManagementConfig.getPersistence(), + managementConfig.getPersistence(), false, false, null ); - logger.info("Queue \"{}\" was successfully declared", declareResult.getQueue()); + LOGGER.info("Queue \"{}\" was successfully declared", declareResult.getQueue()); declareResult = channel.queueDeclare( createMstoreQueue(namespace), - rabbitMQManagementConfig.getPersistence(), + managementConfig.getPersistence(), false, false, null ); - logger.info("Queue \"{}\" was successfully declared", declareResult.getQueue()); + LOGGER.info("Queue \"{}\" was successfully declared", declareResult.getQueue()); } - public static void cleanupRabbit(String namespace) throws Exception { + public void cleanupRabbit(String namespace) { removeSchemaExchange(toExchangeName(namespace)); removeSchemaQueues(namespace); removeSchemaUser(namespace); } - private static void removeSchemaUser(String namespace) throws Exception { - RabbitMQManagementConfig rabbitMQManagementConfig = getManagementConfig(); - - String vHostName = rabbitMQManagementConfig.getVhostName(); - - Client rmqClient = getClient(); + private void removeSchemaUser(String namespace) { + String vHostName = managementConfig.getVhostName(); if (rmqClient.getVhost(vHostName) == null) { - logger.error("vHost: \"{}\" is not present", vHostName); + LOGGER.error("vHost: \"{}\" is not present", vHostName); return; } rmqClient.deleteUser(namespace); - logger.info("Deleted user \"{}\" from vHost \"{}\"", namespace, vHostName); + LOGGER.info("Deleted user \"{}\" from vHost \"{}\"", namespace, vHostName); } - private static void removeSchemaExchange(String exchangeName) { + private void removeSchemaExchange(String exchangeName) { try { getChannel().exchangeDelete(exchangeName); } catch (Exception e) { - logger.error("Exception deleting exchange: \"{}\"", exchangeName, e); + LOGGER.error("Exception deleting exchange: \"{}\"", exchangeName, e); } } - private static void removeSchemaQueues(String namespace) { + private void removeSchemaQueues(String namespace) { try { Channel channel = getChannel(); @@ -231,14 +225,14 @@ private static void removeSchemaQueues(String namespace) { if (queue != null && queue.getNamespace().equals(namespace)) { try { channel.queueDelete(queueName); - logger.info("Deleted queue: [{}]", queueName); + LOGGER.info("Deleted queue: [{}]", queueName); } catch (IOException e) { - logger.error("Exception deleting queue: [{}]", queueName, e); + LOGGER.error("Exception deleting queue: [{}]", queueName, e); } } }); } catch (Exception e) { - logger.error("Exception cleaning up queues for: \"{}\"", namespace, e); + LOGGER.error("Exception cleaning up queues for: \"{}\"", namespace, e); } } @@ -246,15 +240,8 @@ public static String toExchangeName(String namespace) { return namespace; } - public static Channel getChannel() { - Channel channel = getChannelContext().channel; - if (!channel.isOpen()) { - logger.warn("RabbitMQ connection is broken, trying to reconnect..."); - getChannelContext().close(); - channel = getChannelContext().channel; - logger.info("RabbitMQ connection has been restored"); - } - return channel; + public Channel getChannel() { + return channelContext.getChannel(); } public static Map generateQueueArguments(PinSettings pinSettings) throws NumberFormatException { @@ -272,49 +259,46 @@ public static Map generateQueueArguments(PinSettings pinSettings } } - public static @NotNull List getQueues() { - String vHostName = getManagementConfig().getVhostName(); + public @NotNull List getQueues() { + String vHostName = managementConfig.getVhostName(); try { - Client rmqClient = getClient(); return requireNonNullElse(rmqClient.getQueues(vHostName), emptyList()); } catch (Exception e) { String message = "Exception while fetching queues"; - logger.error(message, e); + LOGGER.error(message, e); throw new NonTerminalException(message, e); } } - public static @NotNull List getTh2Queues() { + public @NotNull List getTh2Queues() { return getQueues().stream() .filter(queueInfo -> queueInfo.getName() != null && queueInfo.getName().matches(QUEUE_NAME_REGEXP)) .collect(Collectors.toList()); } - public static List getQueueBindings(String queue) { - String vHostName = getManagementConfig().getVhostName(); + public List getQueueBindings(String queue) { + String vHostName = managementConfig.getVhostName(); try { - Client rmqClient = getClient(); return rmqClient.getQueueBindings(vHostName, queue); } catch (Exception e) { String message = "Exception while fetching bindings"; - logger.error(message, e); + LOGGER.error(message, e); throw new NonTerminalException(message, e); } } - public static @NotNull List getExchanges() { + public @NotNull List getExchanges() { try { - Client rmqClient = getClient(); return rmqClient.getExchanges(); } catch (Exception e) { String message = "Exception while fetching exchanges"; - logger.error(message, e); + LOGGER.error(message, e); throw new NonTerminalException(message, e); } } - public static @NotNull List getTh2Exchanges() { + public @NotNull List getTh2Exchanges() { Collection namespacePrefixes = ConfigLoader.getConfig().getNamespacePrefixes(); String topicExchange = getTopicExchangeName(); return getExchanges().stream() @@ -326,15 +310,14 @@ public static List getQueueBindings(String queue) { }).collect(Collectors.toList()); } - public static QueueInfo getQueue(String queueName) { + public QueueInfo getQueue(String queueName) { - String vHostName = getManagementConfig().getVhostName(); + String vHostName = managementConfig.getVhostName(); try { - Client rmqClient = getClient(); return rmqClient.getQueue(vHostName, queueName); } catch (Exception e) { String message = "Exception while fetching queue"; - logger.error(message, e); + LOGGER.error(message, e); throw new NonTerminalException(message, e); } } @@ -347,31 +330,11 @@ public static Client createClient(String host, int port, String username, String ); } - private static RabbitMQManagementConfig getManagementConfig() { - // we do not need to synchronize as we are assigning immutable object from singleton - if (managementConfig == null) { - managementConfig = ConfigLoader.getConfig().getRabbitMQManagement(); - } - return managementConfig; - } - - private static Client getClient() throws MalformedURLException, URISyntaxException { - if (rmqClient == null) { - RabbitMQManagementConfig rabbitMQMngConfig = getManagementConfig(); - rmqClient = createClient(rabbitMQMngConfig.getHost(), - rabbitMQMngConfig.getManagementPort(), - rabbitMQMngConfig.getUsername(), - rabbitMQMngConfig.getPassword()); - } - return rmqClient; - } - - private static ChannelContext getChannelContext() { - // we do not need to synchronize as we are assigning immutable object from singleton - if (channelContext == null) { - channelContext = new ChannelContext(); - } - return channelContext; + private static Client createClient(RabbitMQManagementConfig rabbitMQMngConfig) throws MalformedURLException, URISyntaxException { + return createClient(rabbitMQMngConfig.getHost(), + rabbitMQMngConfig.getManagementPort(), + rabbitMQMngConfig.getUsername(), + rabbitMQMngConfig.getPassword()); } private static RabbitMQConfig getRabbitMQConfig(String namespace) { @@ -384,49 +347,77 @@ private static RabbitMQConfig getRabbitMQConfig(String namespace) { return rabbitMQConfig; } - static class ChannelContext { + @Override + public void close() { + Utils.close(channelContext, "AMQP channel context"); + } + + static class ChannelContext implements AutoCloseable { + + private final Lock lock = new ReentrantLock(); + + private final RabbitMQContext rabbitMQContext; + + private final RabbitMQManagementConfig rabbitMQManagementConfig; private Connection connection; private Channel channel; - ChannelContext() { - ConnectionFactory connectionFactory = createConnectionFactory(); + ChannelContext(RabbitMQContext rabbitMQContext, RabbitMQManagementConfig rabbitMQManagementConfig) { + this.rabbitMQContext = rabbitMQContext; + this.rabbitMQManagementConfig = rabbitMQManagementConfig; + getChannel(); + } + + public Channel getChannel() { + lock.lock(); try { - this.connection = connectionFactory.newConnection(); - this.connection.addShutdownListener(new RmqClientShutdownEventListener()); - this.channel = connection.createChannel(); + if (connection == null || !connection.isOpen()) { + close(); + LOGGER.warn("RabbitMQ connection is broken, trying to reconnect..."); + connection = createConnection(); + } + if (channel == null || !channel.isOpen()) { + channel = connection.createChannel(); + } + return channel; } catch (Exception e) { close(); String message = "Exception while creating rabbitMq channel"; - logger.error(message, e); + LOGGER.error(message, e); throw new NonTerminalException(message, e); + } finally { + lock.unlock(); } } - synchronized void close() { + @Override + public void close() { + lock.lock(); try { if (channel != null && channel.isOpen()) { - channel.close(); + Utils.close(channel, "AMQP channel"); } - } catch (Exception e) { - logger.error("Exception closing RabbitMQ channel", e); - } - try { if (connection != null && connection.isOpen()) { - connection.close(); + Utils.close(connection, "AMQP connection"); } - } catch (Exception e) { - logger.error("Exception closing RabbitMQ connection for", e); + connection = null; + channel = null; + } finally { + lock.unlock(); } - channel = null; - connection = null; - channelContext = null; + } + + private Connection createConnection() throws IOException, TimeoutException { + ConnectionFactory connectionFactory = createConnectionFactory(); + Connection connection = connectionFactory.newConnection(); + connection.addShutdownListener(new RmqClientShutdownEventListener(rabbitMQContext)); + return connection; } @NotNull - private static ConnectionFactory createConnectionFactory() { - RabbitMQManagementConfig rabbitMQManagementConfig = getManagementConfig(); + private ConnectionFactory createConnectionFactory() { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(rabbitMQManagementConfig.getHost()); connectionFactory.setPort(rabbitMQManagementConfig.getApplicationPort()); @@ -438,16 +429,22 @@ private static ConnectionFactory createConnectionFactory() { } private static class RmqClientShutdownEventListener implements ShutdownListener { + private final RabbitMQContext rabbitMQContext; + + private RmqClientShutdownEventListener(RabbitMQContext rabbitMQContext) { + this.rabbitMQContext = rabbitMQContext; + } @Override public void shutdownCompleted(ShutdownSignalException cause) { - logger.error("Detected Rabbit mq connection lose", cause); + LOGGER.error("Detected Rabbit mq connection lose", cause); RecreateQueuesAndBindings recreateQueuesAndBindingsTask = new RecreateQueuesAndBindings( + rabbitMQContext, OperatorState.INSTANCE.getAllBoxResources(), RETRY_DELAY ); retryableTaskQueue.add(recreateQueuesAndBindingsTask, true); - logger.info("Task \"{}\" added to scheduler, with delay \"{}\" seconds", + LOGGER.info("Task \"{}\" added to scheduler, with delay \"{}\" seconds", recreateQueuesAndBindingsTask.getName(), RETRY_DELAY); } } diff --git a/src/main/java/com/exactpro/th2/infraoperator/spec/strategy/redeploy/ContinuousTaskWorker.java b/src/main/java/com/exactpro/th2/infraoperator/spec/strategy/redeploy/ContinuousTaskWorker.java index bda96929..37f4b137 100644 --- a/src/main/java/com/exactpro/th2/infraoperator/spec/strategy/redeploy/ContinuousTaskWorker.java +++ b/src/main/java/com/exactpro/th2/infraoperator/spec/strategy/redeploy/ContinuousTaskWorker.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2024 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,35 +17,43 @@ package com.exactpro.th2.infraoperator.spec.strategy.redeploy; import com.exactpro.th2.infraoperator.spec.strategy.redeploy.tasks.Task; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -public class ContinuousTaskWorker { - private static final Logger logger = LoggerFactory.getLogger(ContinuousTaskWorker.class); +public class ContinuousTaskWorker implements AutoCloseable { + private static final Logger LOGGER = LoggerFactory.getLogger(ContinuousTaskWorker.class); private static final int THREAD_POOL_SIZE = 2; private final Map taskMap = new HashMap<>(); - private final ScheduledExecutorService taskScheduler = new ScheduledThreadPoolExecutor(THREAD_POOL_SIZE); + private final ScheduledExecutorService taskScheduler = new ScheduledThreadPoolExecutor(THREAD_POOL_SIZE, + new ThreadFactoryBuilder().setNameFormat("worker-%d").build()); public synchronized void add(Task task) { if (!taskMap.containsKey(task.getName())) { taskMap.put(task.getName(), task); taskScheduler.scheduleWithFixedDelay(task, task.getRetryDelay(), task.getRetryDelay(), TimeUnit.SECONDS); - logger.info("Added task '{}' to scheduler", task.getName()); + LOGGER.info("Added task '{}' to scheduler", task.getName()); } else { - logger.info("Task '{}' is already present in scheduler. Will not be added again", task.getName()); + LOGGER.info("Task '{}' is already present in scheduler. Will not be added again", task.getName()); } } - public void shutdown() { + @Override + public void close() throws Exception { taskScheduler.shutdown(); + if (!taskScheduler.awaitTermination(5, TimeUnit.SECONDS)) { + List tasks = taskScheduler.shutdownNow(); + LOGGER.error("The {} tasks in {} are not completed", tasks.size(), ContinuousTaskWorker.class); + } } } diff --git a/src/main/java/com/exactpro/th2/infraoperator/spec/strategy/redeploy/tasks/RecreateQueuesAndBindings.java b/src/main/java/com/exactpro/th2/infraoperator/spec/strategy/redeploy/tasks/RecreateQueuesAndBindings.java index 1206a9db..fc7b9dfa 100644 --- a/src/main/java/com/exactpro/th2/infraoperator/spec/strategy/redeploy/tasks/RecreateQueuesAndBindings.java +++ b/src/main/java/com/exactpro/th2/infraoperator/spec/strategy/redeploy/tasks/RecreateQueuesAndBindings.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2022 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2024 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,11 +24,22 @@ import java.util.Collection; public class RecreateQueuesAndBindings implements Task { + private final RabbitMQContext rabbitMQContext; + + private final DeclareQueueResolver declareQueueResolver; + + private final BindQueueLinkResolver bindQueueLinkResolver; + private final long retryDelay; private final Collection resources; - public RecreateQueuesAndBindings(Collection resources, long retryDelay) { + public RecreateQueuesAndBindings(RabbitMQContext rabbitMQContext, + Collection resources, + long retryDelay) { + this.rabbitMQContext = rabbitMQContext; + this.declareQueueResolver = new DeclareQueueResolver(rabbitMQContext); + this.bindQueueLinkResolver = new BindQueueLinkResolver(rabbitMQContext); this.resources = resources; this.retryDelay = retryDelay; } @@ -45,11 +56,11 @@ public long getRetryDelay() { @Override public void run() { - RabbitMQContext.getChannel(); + rabbitMQContext.getChannel(); resources.forEach(resource -> { - DeclareQueueResolver.resolveAdd(resource); - BindQueueLinkResolver.resolveDeclaredLinks(resource); - BindQueueLinkResolver.resolveHiddenLinks(resource); + declareQueueResolver.resolveAdd(resource); + bindQueueLinkResolver.resolveDeclaredLinks(resource); + bindQueueLinkResolver.resolveHiddenLinks(resource); }); } } diff --git a/src/main/java/com/exactpro/th2/infraoperator/spec/strategy/redeploy/tasks/RetryRabbitSetup.java b/src/main/java/com/exactpro/th2/infraoperator/spec/strategy/redeploy/tasks/RetryRabbitSetup.java index 3f5e558e..610cfd77 100644 --- a/src/main/java/com/exactpro/th2/infraoperator/spec/strategy/redeploy/tasks/RetryRabbitSetup.java +++ b/src/main/java/com/exactpro/th2/infraoperator/spec/strategy/redeploy/tasks/RetryRabbitSetup.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2022 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2024 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,11 +20,14 @@ public class RetryRabbitSetup implements Task { + private final RabbitMQContext rabbitMQContext; + private final long retryDelay; private final String namespace; - public RetryRabbitSetup(String namespace, long retryDelay) { + public RetryRabbitSetup(RabbitMQContext rabbitMQContext, String namespace, long retryDelay) { + this.rabbitMQContext = rabbitMQContext; this.retryDelay = retryDelay; this.namespace = namespace; } @@ -41,6 +44,6 @@ public long getRetryDelay() { @Override public void run() { - RabbitMQContext.setUpRabbitMqForNamespace(namespace); + rabbitMQContext.setUpRabbitMqForNamespace(namespace); } } diff --git a/src/main/java/com/exactpro/th2/infraoperator/spec/strategy/redeploy/tasks/RetryTopicExchangeTask.java b/src/main/java/com/exactpro/th2/infraoperator/spec/strategy/redeploy/tasks/RetryTopicExchangeTask.java index f67cd9ab..b47d2583 100644 --- a/src/main/java/com/exactpro/th2/infraoperator/spec/strategy/redeploy/tasks/RetryTopicExchangeTask.java +++ b/src/main/java/com/exactpro/th2/infraoperator/spec/strategy/redeploy/tasks/RetryTopicExchangeTask.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2022 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2024 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,11 +20,14 @@ public class RetryTopicExchangeTask implements Task { + private final RabbitMQContext rabbitMQContext; + private final long retryDelay; private final String exchangeName; - public RetryTopicExchangeTask(String exchangeName, long retryDelay) { + public RetryTopicExchangeTask(RabbitMQContext rabbitMQContext, String exchangeName, long retryDelay) { + this.rabbitMQContext = rabbitMQContext; this.retryDelay = retryDelay; this.exchangeName = exchangeName; } @@ -41,6 +44,6 @@ public long getRetryDelay() { @Override public void run() { - RabbitMQContext.declareTopicExchange(); + rabbitMQContext.declareTopicExchange(); } } diff --git a/src/main/kotlin/com/exactpro/th2/infraoperator/spec/strategy/linkresolver/mq/BindQueueLinkResolver.kt b/src/main/kotlin/com/exactpro/th2/infraoperator/spec/strategy/linkresolver/mq/BindQueueLinkResolver.kt index 14868e5c..ecc02f40 100644 --- a/src/main/kotlin/com/exactpro/th2/infraoperator/spec/strategy/linkresolver/mq/BindQueueLinkResolver.kt +++ b/src/main/kotlin/com/exactpro/th2/infraoperator/spec/strategy/linkresolver/mq/BindQueueLinkResolver.kt @@ -33,10 +33,9 @@ import com.exactpro.th2.infraoperator.util.CustomResourceUtils import com.exactpro.th2.infraoperator.util.CustomResourceUtils.annotationFor import io.github.oshai.kotlinlogging.KotlinLogging -object BindQueueLinkResolver { - private val K_LOGGER = KotlinLogging.logger { } - - @JvmStatic +class BindQueueLinkResolver( + private val rabbitMQContext: RabbitMQContext, +) { fun resolveDeclaredLinks(resource: Th2CustomResource) { val namespace = resource.metadata.namespace val resourceName = resource.metadata.name @@ -55,7 +54,6 @@ object BindQueueLinkResolver { } } - @JvmStatic fun resolveHiddenLinks(resource: Th2CustomResource) { val namespace = resource.metadata.namespace val resourceName = resource.metadata.name @@ -110,9 +108,9 @@ object BindQueueLinkResolver { private fun bindQueues(queue: LinkDescription, commitHash: String) { try { - val channel = RabbitMQContext.getChannel() + val channel = rabbitMQContext.channel val queueName = queue.queueName.toString() - val currentQueue = RabbitMQContext.getQueue(queueName) + val currentQueue = rabbitMQContext.getQueue(queueName) if (currentQueue == null) { K_LOGGER.info {"Queue '$queueName' does not yet exist. skipping binding" } return @@ -135,7 +133,7 @@ object BindQueueLinkResolver { resName: String? = null ) { val queueName = queue.toString() - val bindingOnRabbit = RabbitMQContext.getQueueBindings(queueName) + val bindingOnRabbit = rabbitMQContext.getQueueBindings(queueName) ?.map { it.routingKey } ?.filter { it.matches(ROUTING_KEY_REGEXP.toRegex()) && @@ -145,10 +143,10 @@ object BindQueueLinkResolver { RoutingKeyName(queue.namespace, it.box, it.pin).toString() } try { - val channel = RabbitMQContext.getChannel() + val channel = rabbitMQContext.channel bindingOnRabbit?.forEach { if (!currentBindings.contains(it)) { - val currentQueue = RabbitMQContext.getQueue(queueName) + val currentQueue = rabbitMQContext.getQueue(queueName) if (currentQueue == null) { K_LOGGER.info { "Queue '$queueName' already removed. skipping unbinding" } return @@ -163,4 +161,8 @@ object BindQueueLinkResolver { throw NonTerminalException(message, e) } } + + companion object { + private val K_LOGGER = KotlinLogging.logger { } + } } diff --git a/src/main/kotlin/com/exactpro/th2/infraoperator/util/RabbitMQUtils.kt b/src/main/kotlin/com/exactpro/th2/infraoperator/util/RabbitMQUtils.kt index b2ab1b07..64f3c6f5 100644 --- a/src/main/kotlin/com/exactpro/th2/infraoperator/util/RabbitMQUtils.kt +++ b/src/main/kotlin/com/exactpro/th2/infraoperator/util/RabbitMQUtils.kt @@ -39,11 +39,14 @@ import java.io.IOException private val K_LOGGER = KotlinLogging.logger { } -fun deleteRabbitMQRubbish() { +fun deleteRabbitMQRubbish( + kubernetesClient: KubernetesClient, + rabbitMQContext: RabbitMQContext, +) { try { val resourceHolder = collectRabbitMQResources( - RabbitMQContext.getTh2Queues(), - RabbitMQContext.getTh2Exchanges(), + rabbitMQContext.th2Queues, + rabbitMQContext.th2Exchanges, ) if (resourceHolder.isHolderEmpty()) { @@ -51,17 +54,15 @@ fun deleteRabbitMQRubbish() { } val namespacePrefixes = ConfigLoader.loadConfiguration().namespacePrefixes - val topicExchange = RabbitMQContext.getTopicExchangeName() - - createKubernetesClient().use { kuClient -> - resourceHolder.filterRubbishResources( - kuClient, - namespacePrefixes, - topicExchange, - ) - } + val topicExchange = rabbitMQContext.topicExchangeName + + resourceHolder.filterRubbishResources( + kubernetesClient, + namespacePrefixes, + topicExchange, + ) K_LOGGER.info { "RabbitMQ rubbish: $resourceHolder" } - deleteRabbitMQRubbish(resourceHolder, RabbitMQContext::getChannel) + deleteRabbitMQRubbish(resourceHolder, rabbitMQContext::getChannel) } catch (e: Exception) { K_LOGGER.error(e) { "Delete RabbitMQ rubbish failure" } } diff --git a/src/main/kotlin/com/exactpro/th2/infraoperator/util/Utils.kt b/src/main/kotlin/com/exactpro/th2/infraoperator/util/Utils.kt new file mode 100644 index 00000000..ad16c5a6 --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/infraoperator/util/Utils.kt @@ -0,0 +1,32 @@ +/* + * Copyright 2024-2024 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +@file:JvmName("Utils") + +package com.exactpro.th2.infraoperator.util + +import io.github.oshai.kotlinlogging.KotlinLogging + +private val K_LOGGER = KotlinLogging.logger {} + +fun close(closeable: AutoCloseable, name: String) { + try { + closeable.close() + K_LOGGER.info { "$name closed" } + } catch (e: Exception) { + K_LOGGER.error(e) { "$name close failure" } + } +} \ No newline at end of file diff --git a/src/test/kotlin/com/exactpro/th2/infraoperator/integration/DeleteRubbishOnStartTest.kt b/src/test/kotlin/com/exactpro/th2/infraoperator/integration/DeleteRubbishOnStartTest.kt index b782fae2..5c840987 100644 --- a/src/test/kotlin/com/exactpro/th2/infraoperator/integration/DeleteRubbishOnStartTest.kt +++ b/src/test/kotlin/com/exactpro/th2/infraoperator/integration/DeleteRubbishOnStartTest.kt @@ -18,6 +18,8 @@ package com.exactpro.th2.infraoperator.integration import com.exactpro.th2.infraoperator.Th2CrdController import com.exactpro.th2.infraoperator.configuration.fields.RabbitMQNamespacePermissions +import com.exactpro.th2.infraoperator.operator.StoreHelmTh2Op.EVENT_STORAGE_BOX_ALIAS +import com.exactpro.th2.infraoperator.operator.StoreHelmTh2Op.EVENT_STORAGE_PIN_ALIAS import com.exactpro.th2.infraoperator.operator.StoreHelmTh2Op.MESSAGE_STORAGE_BOX_ALIAS import com.exactpro.th2.infraoperator.operator.StoreHelmTh2Op.MESSAGE_STORAGE_PIN_ALIAS import com.exactpro.th2.infraoperator.spec.box.Th2Box @@ -26,23 +28,20 @@ import com.exactpro.th2.infraoperator.spec.strategy.linkresolver.mq.RabbitMQCont import com.exactpro.th2.infraoperator.spec.strategy.linkresolver.mq.RabbitMQContext.toExchangeName import com.exactpro.th2.infraoperator.util.createKubernetesClient import com.rabbitmq.client.AMQP -import com.rabbitmq.client.Channel import com.rabbitmq.client.Connection import com.rabbitmq.http.client.Client -import com.rabbitmq.http.client.domain.QueueInfo import io.fabric8.kubernetes.client.KubernetesClient import org.junit.jupiter.api.AfterAll import org.junit.jupiter.api.BeforeAll -import org.junit.jupiter.api.Disabled import org.junit.jupiter.api.Tag import org.junit.jupiter.api.TestInstance import org.junit.jupiter.api.Timeout +import org.junit.jupiter.api.assertAll import org.junit.jupiter.api.io.TempDir import org.testcontainers.containers.RabbitMQContainer import org.testcontainers.k3s.K3sContainer import java.nio.file.Path import kotlin.test.Test -import kotlin.test.assertEquals @Tag("integration-test") @TestInstance(TestInstance.Lifecycle.PER_CLASS) @@ -99,9 +98,50 @@ class DeleteRubbishOnStartTest { } @Test - @Disabled("implement Th2CrdController.close") + @Timeout(30_000) fun deleteAllTest() { - TODO() + val namespaces = listOf( + "${TH2_PREFIX}test-b", + "${TH2_PREFIX}test-c" + ) + val component = "test-component" + + rabbitMQConnection.createChannel().use { channel -> + val queues = mutableListOf() + val exchanges = mutableListOf() + + namespaces.forEach { namespace -> + channel.createQueue(namespace, "rubbish-component", PIN_NAME) + .assertQueue().queue.also(queues::add) + channel.createQueue(namespace, component, "rubbish-pin") + .assertQueue().queue.also(queues::add) + channel.createQueue(namespace, component, PIN_NAME) + .assertQueue().queue.also(queues::add) + channel.createQueue(namespace, MESSAGE_STORAGE_BOX_ALIAS, MESSAGE_STORAGE_PIN_ALIAS) + .assertQueue().queue.also(queues::add) + channel.createQueue(namespace, EVENT_STORAGE_BOX_ALIAS, EVENT_STORAGE_PIN_ALIAS) + .assertQueue().queue.also(queues::add) + + toExchangeName(namespace).apply { + channel.createExchange(this, DIRECT) + rabbitMQClient.assertExchange(this, DIRECT, RABBIT_MQ_V_HOST) + exchanges.add(this) + } + } + + Th2CrdController().use { + assertAll( + queues.map { queue -> + { rabbitMQClient.assertNoQueue(queue, RABBIT_MQ_V_HOST) } + } + exchanges.map { exchange -> + { rabbitMQClient.assertNoExchange(exchange, RABBIT_MQ_V_HOST) } + } + listOf( + { rabbitMQClient.assertNoQueues("link\\[.*\\]", RABBIT_MQ_V_HOST) }, + { rabbitMQClient.assertNoExchanges("${TH2_PREFIX}.*", RABBIT_MQ_V_HOST) } + ) + ) + } + } } @Test @@ -160,19 +200,19 @@ class DeleteRubbishOnStartTest { """.trimIndent() kubeClient.createTh2CustomResource(exchangeB, component, gitHash, spec, ::Th2Box) - Th2CrdController().apply(Th2CrdController::start).use { + Th2CrdController().use { kubeClient.awaitPhase(exchangeB, component, RolloutPhase.SUCCEEDED, Th2Box::class.java) rabbitMQClient.assertNoQueue(queue01.queue, RABBIT_MQ_V_HOST) rabbitMQClient.assertNoQueue(queue02.queue, RABBIT_MQ_V_HOST) rabbitMQClient.assertNoQueue(queue03.queue, RABBIT_MQ_V_HOST) - rabbitMQClient.assertQueue(queue11.queue, RABBIT_MQ_QUEUE_CLASSIC_TYPE, RABBIT_MQ_V_HOST) - .assertQueueSize(channel, 1) + rabbitMQClient.assertQueue(queue11.queue, RABBIT_MQ_QUEUE_CLASSIC_TYPE, RABBIT_MQ_V_HOST) + rabbitMQClient.awaitQueueSize(queue11.queue, RABBIT_MQ_V_HOST, 1) rabbitMQClient.assertQueue(queue12.queue, RABBIT_MQ_QUEUE_CLASSIC_TYPE, RABBIT_MQ_V_HOST) - .assertQueueSize(channel, 1) + rabbitMQClient.awaitQueueSize(queue12.queue, RABBIT_MQ_V_HOST, 1) - rabbitMQClient.assertNoExchange(exchangeC) + rabbitMQClient.assertNoExchange(exchangeC, RABBIT_MQ_V_HOST) } } } @@ -198,11 +238,6 @@ class DeleteRubbishOnStartTest { return this } - private fun QueueInfo.assertQueueSize(channel: Channel, size: Int) { - val declareOk = channel.queueDeclare(name, isDurable, isExclusive, isAutoDelete, arguments) - assertEquals(size, declareOk.messageCount) - } - companion object { private const val TH2_PREFIX = "th2-" private const val TH2_BOOK = "test_book" diff --git a/src/test/kotlin/com/exactpro/th2/infraoperator/integration/IntegrationTest.kt b/src/test/kotlin/com/exactpro/th2/infraoperator/integration/IntegrationTest.kt index 1f20e6e0..39630f37 100644 --- a/src/test/kotlin/com/exactpro/th2/infraoperator/integration/IntegrationTest.kt +++ b/src/test/kotlin/com/exactpro/th2/infraoperator/integration/IntegrationTest.kt @@ -124,7 +124,7 @@ class IntegrationTest { kubeClient = createKubernetesClient().apply { configureK3s() } rabbitMQClient = createRabbitMQClient(rabbitMQContainer) - controller = Th2CrdController().apply(Th2CrdController::start) + controller = Th2CrdController() rabbitMQClient.assertExchange(RABBIT_MQ_TOPIC_EXCHANGE, TOPIC, RABBIT_MQ_V_HOST) } @@ -141,6 +141,9 @@ class IntegrationTest { if (this::rabbitMQContainer.isInitialized) { rabbitMQContainer.stop() } + if (this::controller.isInitialized) { + controller.close() + } } @BeforeEach @@ -179,7 +182,7 @@ class IntegrationTest { // FIXME: Secret not found "th2-test:Secret/rabbitMQ" rabbitMQClient.assertNoQueues("link\\[.*\\]", RABBIT_MQ_V_HOST) - rabbitMQClient.assertNoExchange(toExchangeName(TH2_NAMESPACE)) + rabbitMQClient.assertNoExchange(toExchangeName(TH2_NAMESPACE), RABBIT_MQ_V_HOST) rabbitMQClient.assertNoUser(TH2_NAMESPACE) kubeClient.awaitNoResources(TH2_NAMESPACE) diff --git a/src/test/kotlin/com/exactpro/th2/infraoperator/integration/TestRabbitMQUtils.kt b/src/test/kotlin/com/exactpro/th2/infraoperator/integration/TestRabbitMQUtils.kt index 7074340b..4b805ef9 100644 --- a/src/test/kotlin/com/exactpro/th2/infraoperator/integration/TestRabbitMQUtils.kt +++ b/src/test/kotlin/com/exactpro/th2/infraoperator/integration/TestRabbitMQUtils.kt @@ -19,7 +19,9 @@ package com.exactpro.th2.infraoperator.integration import com.exactpro.th2.infraoperator.configuration.fields.RabbitMQNamespacePermissions import com.rabbitmq.http.client.Client import com.rabbitmq.http.client.domain.DestinationType +import com.rabbitmq.http.client.domain.ExchangeInfo import com.rabbitmq.http.client.domain.QueueInfo +import io.github.oshai.kotlinlogging.KotlinLogging import org.junit.jupiter.api.assertAll import org.testcontainers.shaded.org.awaitility.Awaitility.await import java.util.concurrent.TimeUnit @@ -30,6 +32,8 @@ import kotlin.test.assertTrue const val RABBIT_MQ_QUEUE_CLASSIC_TYPE = "classic" +private val K_LOGGER = KotlinLogging.logger {} + fun Client.assertUser( user: String, vHost: String, @@ -87,14 +91,34 @@ fun Client.assertExchange( assertFalse(exchangeInfo.isAutoDelete, "Exchange '$exchange' is auto delete") } +fun Client.assertNoExchanges( + exchangePattern: String, + vHost: String, + timeout: Long = 5_000, + unit: TimeUnit = TimeUnit.MILLISECONDS, +) { + val filter: (ExchangeInfo) -> Boolean = { + exchangeInfo -> exchangeInfo.name.matches(Regex(exchangePattern)) && exchangeInfo.vhost == vHost + } + await("assertNoExchanges('$exchangePattern')") + .timeout(timeout, unit) + .conditionEvaluationListener { _ -> + K_LOGGER.debug { + "Remaining exchanges by '$exchangePattern': ${exchanges.filter(filter).map(ExchangeInfo::getName)}" + } + } + .until { exchanges.none(filter) } +} + fun Client.assertNoExchange( exchange: String, + vHost: String, timeout: Long = 5_000, unit: TimeUnit = TimeUnit.MILLISECONDS, ) { await("assertNoExchange('$exchange')") .timeout(timeout, unit) - .until { exchanges.firstOrNull { it.name == exchange } == null } + .until { exchanges.firstOrNull { it.name == exchange && it.vhost == vHost } == null } } fun Client.assertQueue( @@ -197,9 +221,17 @@ fun Client.assertNoQueues( timeout: Long = 5_000, unit: TimeUnit = TimeUnit.MILLISECONDS, ) { + val filter: (QueueInfo) -> Boolean = { + queueInfo -> queueInfo.name.matches(Regex(queuePattern)) && queueInfo.vhost == vHost + } await("assertNoQueues('$queuePattern')") .timeout(timeout, unit) - .until { queues.map { it.name.matches(Regex(queuePattern)) && it.vhost == vHost }.isEmpty() } + .conditionEvaluationListener { _ -> + K_LOGGER.debug { + "Remaining queues by '$queuePattern': ${queues.filter(filter).map(QueueInfo::getName)}" + } + } + .until { queues.none(filter) } } fun Client.assertNoQueue(