Skip to content

Commit

Permalink
[TH2-5226] Removed singletons
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita-Smirnov-Exactpro committed Sep 17, 2024
1 parent 1555413 commit e26ccc1
Show file tree
Hide file tree
Showing 25 changed files with 571 additions and 348 deletions.
115 changes: 90 additions & 25 deletions src/main/java/com/exactpro/th2/infraoperator/Th2CrdController.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.exactpro.th2.infraoperator;

import com.exactpro.th2.infraoperator.configuration.ConfigLoader;
import com.exactpro.th2.infraoperator.configuration.OperatorConfig;
import com.exactpro.th2.infraoperator.metrics.OperatorMetrics;
import com.exactpro.th2.infraoperator.metrics.PrometheusServer;
import com.exactpro.th2.infraoperator.operator.impl.BoxHelmTh2Op;
Expand All @@ -28,49 +30,83 @@
import com.exactpro.th2.infraoperator.spec.strategy.redeploy.ContinuousTaskWorker;
import com.exactpro.th2.infraoperator.spec.strategy.redeploy.tasks.CheckResourceCacheTask;
import com.exactpro.th2.infraoperator.util.RabbitMQUtils;
import com.exactpro.th2.infraoperator.util.Utils;
import io.fabric8.kubernetes.client.KubernetesClient;
import org.apache.logging.log4j.core.LoggerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import static com.exactpro.th2.infraoperator.util.KubernetesUtils.createKubernetesClient;

public class Th2CrdController implements AutoCloseable {

private static final Logger LOGGER = LoggerFactory.getLogger(Th2CrdController.class);

public void start() {
var watchManager = DefaultWatchManager.getInstance();
PrometheusServer.start();
private final PrometheusServer prometheusServer;
private final KubernetesClient kubClient;
private final DefaultWatchManager watchManager;
private final RabbitMQContext rabbitMQContext;
private final ContinuousTaskWorker continuousTaskWorker;

public Th2CrdController() throws IOException, URISyntaxException {
OperatorConfig config = ConfigLoader.loadConfiguration();
prometheusServer = new PrometheusServer(config.getPrometheusConfiguration());
kubClient = createKubernetesClient();
rabbitMQContext = new RabbitMQContext(config.getRabbitMQManagement());
watchManager = new DefaultWatchManager(kubClient, rabbitMQContext);
continuousTaskWorker = new ContinuousTaskWorker();

OperatorMetrics.resetCacheErrors();
try {
RabbitMQUtils.deleteRabbitMQRubbish();
RabbitMQContext.declareTopicExchange(); // FIXME: topic exchange should be removed when all namespaces are removed / disabled
RabbitMQUtils.deleteRabbitMQRubbish(kubClient, rabbitMQContext);
rabbitMQContext.declareTopicExchange(); // FIXME: topic exchange should be removed when all namespaces are removed / disabled

watchManager.addTarget(MstoreHelmTh2Op::new);
watchManager.addTarget(EstoreHelmTh2Op::new);
watchManager.addTarget(BoxHelmTh2Op::new);
watchManager.addTarget(CoreBoxHelmTh2Op::new);
watchManager.addTarget(JobHelmTh2Op::new);
watchManager.addTarget(MstoreHelmTh2Op::new);
watchManager.addTarget(EstoreHelmTh2Op::new);
watchManager.addTarget(BoxHelmTh2Op::new);
watchManager.addTarget(CoreBoxHelmTh2Op::new);
watchManager.addTarget(JobHelmTh2Op::new);

watchManager.startInformers();
watchManager.startInformers();
continuousTaskWorker.add(new CheckResourceCacheTask(300));
}

ContinuousTaskWorker continuousTaskWorker = new ContinuousTaskWorker();
continuousTaskWorker.add(new CheckResourceCacheTask(300));
public static void main(String[] args) {
Deque<AutoCloseable> resources = new ConcurrentLinkedDeque<>();
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();

configureShutdownHook(resources, lock, condition);

try {
if (args.length > 0) {
configureLogger(args[0]);
}
Th2CrdController controller = new Th2CrdController();
resources.add(controller);

awaitShutdown(lock, condition);
} catch (Exception e) {
LOGGER.error("Exception in main thread", e);
watchManager.stopInformers();
watchManager.close();
throw e;
}
}

public static void main(String[] args) {
if (args.length > 0) {
configureLogger(args[0]);
}
Th2CrdController controller = new Th2CrdController();
controller.start();
@Override
public void close() {
Utils.close(prometheusServer, "Prometheus server");
Utils.close(kubClient, "Kubernetes client");
Utils.close(watchManager, "Watch manager");
Utils.close(rabbitMQContext, "RabbitMQ context");
Utils.close(continuousTaskWorker, "Continuous task worker");
}

private static void configureLogger(String filePath) {
Expand All @@ -83,8 +119,37 @@ private static void configureLogger(String filePath) {
}
}

@Override
public void close() throws Exception {
private static void configureShutdownHook(Deque<AutoCloseable> resources, Lock lock, Condition condition) {
Runtime.getRuntime().addShutdownHook(new Thread(
() -> {
LOGGER.info("Shutdown start");
lock.lock();
try {
condition.signalAll();
} finally {
lock.unlock();
}
resources.descendingIterator().forEachRemaining((resource) -> {
try {
resource.close();
} catch (Exception e) {
LOGGER.error("Cannot close resource {}", resource.getClass(), e);
}
});
LOGGER.info("Shutdown end");
},
"Shutdown hook"
));
}

private static void awaitShutdown(Lock lock, Condition condition) throws InterruptedException {
lock.lock();
try {
LOGGER.info("Wait shutdown");
condition.await();
LOGGER.info("App shutdown");
} finally {
lock.unlock();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2021 Exactpro (Exactpro Systems Limited)
* Copyright 2020-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,40 +16,47 @@

package com.exactpro.th2.infraoperator.metrics;

import com.exactpro.th2.infraoperator.configuration.ConfigLoader;
import com.exactpro.th2.infraoperator.spec.shared.PrometheusConfiguration;
import io.prometheus.client.exporter.HTTPServer;
import io.prometheus.client.hotspot.DefaultExports;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class PrometheusServer {
private static final Logger logger = LoggerFactory.getLogger(PrometheusServer.class);
public class PrometheusServer implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(PrometheusServer.class);
private final Lock lock = new ReentrantLock();
@Nullable
private final HTTPServer server;

private static final AtomicReference<HTTPServer> prometheusExporter = new AtomicReference<>();

public static void start() {
static {
DefaultExports.initialize();
PrometheusConfiguration<String> prometheusConfiguration = ConfigLoader.getConfig().getPrometheusConfiguration();
}

String host = prometheusConfiguration.getHost();
int port = Integer.parseInt(prometheusConfiguration.getPort());
boolean enabled = Boolean.parseBoolean(prometheusConfiguration.getEnabled());
public PrometheusServer(PrometheusConfiguration<String> configuration) throws IOException {
if (Boolean.parseBoolean(configuration.getEnabled())) {
String host = configuration.getHost();
int port = Integer.parseInt(configuration.getPort());
server = new HTTPServer(host, port);
LOGGER.info("Started prometheus server on: \"{}:{}\"", host, port);
} else {
server = null;
}
}

prometheusExporter.updateAndGet(server -> {
if (server == null && enabled) {
try {
server = new HTTPServer(host, port);
logger.info("Started prometheus server on: \"{}:{}\"", host, port);
return server;
} catch (IOException e) {
throw new RuntimeException("Failed to create Prometheus exporter", e);
}
@Override
public void close() {
lock.lock();
try {
if (server != null) {
server.close();
}
return server;
});
} finally {
lock.unlock();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2021 Exactpro (Exactpro Systems Limited)
* Copyright 2020-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,12 +17,13 @@
package com.exactpro.th2.infraoperator.operator;

import com.exactpro.th2.infraoperator.spec.Th2CustomResource;
import com.exactpro.th2.infraoperator.spec.strategy.linkresolver.mq.RabbitMQContext;
import io.fabric8.kubernetes.client.KubernetesClient;

public abstract class GenericHelmTh2Op<CR extends Th2CustomResource> extends HelmReleaseTh2Op<CR> {

public GenericHelmTh2Op(KubernetesClient client) {
super(client);
public GenericHelmTh2Op(KubernetesClient kubClient, RabbitMQContext rabbitMQContext) {
super(kubClient, rabbitMQContext);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.exactpro.th2.infraoperator.spec.shared.PrometheusConfiguration;
import com.exactpro.th2.infraoperator.spec.strategy.linkresolver.mq.BindQueueLinkResolver;
import com.exactpro.th2.infraoperator.spec.strategy.linkresolver.mq.DeclareQueueResolver;
import com.exactpro.th2.infraoperator.spec.strategy.linkresolver.mq.RabbitMQContext;
import com.exactpro.th2.infraoperator.util.CustomResourceUtils;
import com.exactpro.th2.infraoperator.util.JsonUtils;

Expand Down Expand Up @@ -126,13 +127,17 @@ public abstract class HelmReleaseTh2Op<CR extends Th2CustomResource> extends Abs
protected final MixedOperation<HelmRelease, KubernetesResourceList<HelmRelease>, Resource<HelmRelease>>
helmReleaseClient;

public HelmReleaseTh2Op(KubernetesClient client) {
protected final DeclareQueueResolver declareQueueResolver;
protected final BindQueueLinkResolver bindQueueLinkResolver;

super(client);
public HelmReleaseTh2Op(KubernetesClient kubClient, RabbitMQContext rabbitMQContext) {

this.grpcConfigFactory = new GrpcRouterConfigFactory();
super(kubClient);

helmReleaseClient = kubClient.resources(HelmRelease.class);
this.grpcConfigFactory = new GrpcRouterConfigFactory();
this.helmReleaseClient = this.kubClient.resources(HelmRelease.class);
this.declareQueueResolver = new DeclareQueueResolver(rabbitMQContext);
this.bindQueueLinkResolver = new BindQueueLinkResolver(rabbitMQContext);
}

public abstract SharedIndexInformer<CR> generateInformerFromFactory(SharedInformerFactory factory);
Expand Down Expand Up @@ -326,9 +331,9 @@ protected void addedEvent(CR resource) throws IOException {
var lock = OperatorState.INSTANCE.getLock(namespace);
lock.lock();
try {
DeclareQueueResolver.resolveAdd(resource);
BindQueueLinkResolver.resolveDeclaredLinks(resource);
BindQueueLinkResolver.resolveHiddenLinks(resource);
declareQueueResolver.resolveAdd(resource);
bindQueueLinkResolver.resolveDeclaredLinks(resource);
bindQueueLinkResolver.resolveHiddenLinks(resource);
updateGrpcLinkedResourcesIfNeeded(resource);
super.addedEvent(resource);
} finally {
Expand All @@ -344,9 +349,9 @@ protected void modifiedEvent(CR resource) throws IOException {
var lock = OperatorState.INSTANCE.getLock(namespace);
lock.lock();
try {
DeclareQueueResolver.resolveAdd(resource);
BindQueueLinkResolver.resolveDeclaredLinks(resource);
BindQueueLinkResolver.resolveHiddenLinks(resource);
declareQueueResolver.resolveAdd(resource);
bindQueueLinkResolver.resolveDeclaredLinks(resource);
bindQueueLinkResolver.resolveHiddenLinks(resource);
updateGrpcLinkedResourcesIfNeeded(resource);
super.modifiedEvent(resource);
} finally {
Expand All @@ -361,7 +366,7 @@ protected void deletedEvent(CR resource) {
lock.lock();
try {
super.deletedEvent(resource);
DeclareQueueResolver.resolveDelete(resource);
declareQueueResolver.resolveDelete(resource);
} finally {
lock.unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.exactpro.th2.infraoperator.OperatorState;
import com.exactpro.th2.infraoperator.spec.Th2CustomResource;
import com.exactpro.th2.infraoperator.spec.strategy.linkresolver.mq.RabbitMQContext;
import io.fabric8.kubernetes.client.KubernetesClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -38,8 +39,8 @@ public abstract class StoreHelmTh2Op<CR extends Th2CustomResource> extends HelmR

public static final String MESSAGE_STORAGE_BOX_ALIAS = "mstore";

public StoreHelmTh2Op(KubernetesClient client) {
super(client);
public StoreHelmTh2Op(KubernetesClient kubClient, RabbitMQContext rabbitMQContext) {
super(kubClient, rabbitMQContext);
}

private void nameCheck(CR resource) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2021 Exactpro (Exactpro Systems Limited)
* Copyright 2020-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,6 +22,7 @@
import com.exactpro.th2.infraoperator.model.kubernetes.client.impl.BoxClient;
import com.exactpro.th2.infraoperator.operator.GenericHelmTh2Op;
import com.exactpro.th2.infraoperator.spec.box.Th2Box;
import com.exactpro.th2.infraoperator.spec.strategy.linkresolver.mq.RabbitMQContext;
import com.exactpro.th2.infraoperator.util.CustomResourceUtils;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
Expand All @@ -31,9 +32,9 @@ public class BoxHelmTh2Op extends GenericHelmTh2Op<Th2Box> {

private final BoxClient boxClient;

public BoxHelmTh2Op(KubernetesClient client) {
super(client);
this.boxClient = new BoxClient(client);
public BoxHelmTh2Op(KubernetesClient kubClient, RabbitMQContext rabbitMQContext) {
super(kubClient, rabbitMQContext);
this.boxClient = new BoxClient(kubClient);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2021 Exactpro (Exactpro Systems Limited)
* Copyright 2020-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,6 +22,7 @@
import com.exactpro.th2.infraoperator.model.kubernetes.client.impl.CoreBoxClient;
import com.exactpro.th2.infraoperator.operator.GenericHelmTh2Op;
import com.exactpro.th2.infraoperator.spec.corebox.Th2CoreBox;
import com.exactpro.th2.infraoperator.spec.strategy.linkresolver.mq.RabbitMQContext;
import com.exactpro.th2.infraoperator.util.CustomResourceUtils;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
Expand All @@ -31,9 +32,9 @@ public class CoreBoxHelmTh2Op extends GenericHelmTh2Op<Th2CoreBox> {

private final CoreBoxClient coreBoxClient;

public CoreBoxHelmTh2Op(KubernetesClient client) {
super(client);
this.coreBoxClient = new CoreBoxClient(client);
public CoreBoxHelmTh2Op(KubernetesClient kubClient, RabbitMQContext rabbitMQContext) {
super(kubClient, rabbitMQContext);
this.coreBoxClient = new CoreBoxClient(kubClient);
}

@Override
Expand Down
Loading

0 comments on commit e26ccc1

Please sign in to comment.