Skip to content

Commit

Permalink
[TH2-5226] Removed singletons (#124)
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita-Smirnov-Exactpro authored Sep 19, 2024
1 parent 1555413 commit b409e5b
Show file tree
Hide file tree
Showing 29 changed files with 574 additions and 393 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/integration-tests.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: "Run integration tests for LDP"
name: "Run integration tests for infra-operator"

on:
push:
Expand Down
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ dependencies {
implementation "io.prometheus:simpleclient_httpserver"
implementation "io.prometheus:simpleclient_hotspot"

implementation "com.google.guava:guava"

testImplementation(platform(libs.testcontainers.bom))
testImplementation 'org.testcontainers:rabbitmq'
testImplementation 'org.testcontainers:k3s'
Expand Down
116 changes: 91 additions & 25 deletions src/main/java/com/exactpro/th2/infraoperator/Th2CrdController.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.exactpro.th2.infraoperator;

import com.exactpro.th2.infraoperator.configuration.ConfigLoader;
import com.exactpro.th2.infraoperator.configuration.OperatorConfig;
import com.exactpro.th2.infraoperator.metrics.OperatorMetrics;
import com.exactpro.th2.infraoperator.metrics.PrometheusServer;
import com.exactpro.th2.infraoperator.operator.impl.BoxHelmTh2Op;
Expand All @@ -28,49 +30,84 @@
import com.exactpro.th2.infraoperator.spec.strategy.redeploy.ContinuousTaskWorker;
import com.exactpro.th2.infraoperator.spec.strategy.redeploy.tasks.CheckResourceCacheTask;
import com.exactpro.th2.infraoperator.util.RabbitMQUtils;
import com.exactpro.th2.infraoperator.util.Utils;
import io.fabric8.kubernetes.client.KubernetesClient;
import org.apache.logging.log4j.core.LoggerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import static com.exactpro.th2.infraoperator.util.KubernetesUtils.createKubernetesClient;

public class Th2CrdController implements AutoCloseable {

private static final Logger LOGGER = LoggerFactory.getLogger(Th2CrdController.class);

public void start() {
var watchManager = DefaultWatchManager.getInstance();
PrometheusServer.start();
private final PrometheusServer prometheusServer;
private final KubernetesClient kubClient;
private final DefaultWatchManager watchManager;
private final RabbitMQContext rabbitMQContext;
private final ContinuousTaskWorker continuousTaskWorker;

public Th2CrdController() throws IOException, URISyntaxException {
OperatorConfig config = ConfigLoader.loadConfiguration();
prometheusServer = new PrometheusServer(config.getPrometheusConfiguration());
kubClient = createKubernetesClient();
rabbitMQContext = new RabbitMQContext(config.getRabbitMQManagement());
watchManager = new DefaultWatchManager(kubClient, rabbitMQContext);
continuousTaskWorker = new ContinuousTaskWorker();

OperatorMetrics.resetCacheErrors();
try {
RabbitMQUtils.deleteRabbitMQRubbish();
RabbitMQContext.declareTopicExchange(); // FIXME: topic exchange should be removed when all namespaces are removed / disabled
RabbitMQUtils.deleteRabbitMQRubbish(kubClient, rabbitMQContext);
rabbitMQContext.declareTopicExchange(); // FIXME: topic exchange should be removed when all namespaces are removed / disabled

watchManager.addTarget(MstoreHelmTh2Op::new);
watchManager.addTarget(EstoreHelmTh2Op::new);
watchManager.addTarget(BoxHelmTh2Op::new);
watchManager.addTarget(CoreBoxHelmTh2Op::new);
watchManager.addTarget(JobHelmTh2Op::new);
watchManager.addTarget(MstoreHelmTh2Op::new);
watchManager.addTarget(EstoreHelmTh2Op::new);
watchManager.addTarget(BoxHelmTh2Op::new);
watchManager.addTarget(CoreBoxHelmTh2Op::new);
watchManager.addTarget(JobHelmTh2Op::new);

watchManager.startInformers();
watchManager.startInformers();
continuousTaskWorker.add(new CheckResourceCacheTask(300));
}

ContinuousTaskWorker continuousTaskWorker = new ContinuousTaskWorker();
continuousTaskWorker.add(new CheckResourceCacheTask(300));
public static void main(String[] args) {
Deque<AutoCloseable> resources = new ConcurrentLinkedDeque<>();
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();

configureShutdownHook(resources, lock, condition);

try {
if (args.length > 0) {
configureLogger(args[0]);
}
Th2CrdController controller = new Th2CrdController();
resources.add(controller);

awaitShutdown(lock, condition);
} catch (Exception e) {
LOGGER.error("Exception in main thread", e);
watchManager.stopInformers();
watchManager.close();
throw e;
System.exit(1);
}
}

public static void main(String[] args) {
if (args.length > 0) {
configureLogger(args[0]);
}
Th2CrdController controller = new Th2CrdController();
controller.start();
@Override
public void close() {
Utils.close(prometheusServer, "Prometheus server");
Utils.close(kubClient, "Kubernetes client");
Utils.close(watchManager, "Watch manager");
Utils.close(rabbitMQContext, "RabbitMQ context");
Utils.close(continuousTaskWorker, "Continuous task worker");
}

private static void configureLogger(String filePath) {
Expand All @@ -83,8 +120,37 @@ private static void configureLogger(String filePath) {
}
}

@Override
public void close() throws Exception {
private static void configureShutdownHook(Deque<AutoCloseable> resources, Lock lock, Condition condition) {
Runtime.getRuntime().addShutdownHook(new Thread(
() -> {
LOGGER.info("Shutdown start");
lock.lock();
try {
condition.signalAll();
} finally {
lock.unlock();
}
resources.descendingIterator().forEachRemaining((resource) -> {
try {
resource.close();
} catch (Exception e) {
LOGGER.error("Cannot close resource {}", resource.getClass(), e);
}
});
LOGGER.info("Shutdown end");
},
"Shutdown hook"
));
}

private static void awaitShutdown(Lock lock, Condition condition) throws InterruptedException {
lock.lock();
try {
LOGGER.info("Wait shutdown");
condition.await();
LOGGER.info("App shutdown");
} finally {
lock.unlock();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2021 Exactpro (Exactpro Systems Limited)
* Copyright 2020-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,40 +16,39 @@

package com.exactpro.th2.infraoperator.metrics;

import com.exactpro.th2.infraoperator.configuration.ConfigLoader;
import com.exactpro.th2.infraoperator.spec.shared.PrometheusConfiguration;
import io.prometheus.client.exporter.HTTPServer;
import io.prometheus.client.hotspot.DefaultExports;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

public class PrometheusServer {
private static final Logger logger = LoggerFactory.getLogger(PrometheusServer.class);
public class PrometheusServer implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(PrometheusServer.class);
@Nullable
private final HTTPServer server;

private static final AtomicReference<HTTPServer> prometheusExporter = new AtomicReference<>();

public static void start() {
static {
DefaultExports.initialize();
PrometheusConfiguration<String> prometheusConfiguration = ConfigLoader.getConfig().getPrometheusConfiguration();
}

String host = prometheusConfiguration.getHost();
int port = Integer.parseInt(prometheusConfiguration.getPort());
boolean enabled = Boolean.parseBoolean(prometheusConfiguration.getEnabled());
public PrometheusServer(PrometheusConfiguration<String> configuration) throws IOException {
if (Boolean.parseBoolean(configuration.getEnabled())) {
String host = configuration.getHost();
int port = Integer.parseInt(configuration.getPort());
server = new HTTPServer(host, port);
LOGGER.info("Started prometheus server on: \"{}:{}\"", host, port);
} else {
server = null;
}
}

prometheusExporter.updateAndGet(server -> {
if (server == null && enabled) {
try {
server = new HTTPServer(host, port);
logger.info("Started prometheus server on: \"{}:{}\"", host, port);
return server;
} catch (IOException e) {
throw new RuntimeException("Failed to create Prometheus exporter", e);
}
}
return server;
});
@Override
public void close() {
if (server != null) {
server.close();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2021 Exactpro (Exactpro Systems Limited)
* Copyright 2020-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,14 +21,10 @@
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.MixedOperation;
import io.fabric8.kubernetes.client.dsl.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class DefaultResourceClient<CR extends CustomResource> implements ResourceClient<CR> {

private static final Logger logger = LoggerFactory.getLogger(DefaultResourceClient.class);

private final KubernetesClient client;
private final KubernetesClient kubClient;

private final Class<CR> resourceType;

Expand All @@ -37,15 +33,15 @@ public abstract class DefaultResourceClient<CR extends CustomResource> implement
private final String crdName;

public DefaultResourceClient(
KubernetesClient client,
KubernetesClient kubClient,
Class<CR> resourceType,
String crdName
) {
this.client = client;
this.kubClient = kubClient;
this.resourceType = resourceType;
this.crdName = crdName;

instance = client.resources(resourceType);
instance = kubClient.resources(resourceType);
}

@Override
Expand All @@ -54,7 +50,7 @@ public Class<CR> getResourceType() {
}

public KubernetesClient getClient() {
return this.client;
return this.kubClient;
}

public MixedOperation<CR, ? extends KubernetesResourceList<CR>, ? extends Resource<CR>> getInstance() {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2021 Exactpro (Exactpro Systems Limited)
* Copyright 2020-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,12 +17,13 @@
package com.exactpro.th2.infraoperator.operator;

import com.exactpro.th2.infraoperator.spec.Th2CustomResource;
import com.exactpro.th2.infraoperator.spec.strategy.linkresolver.mq.RabbitMQContext;
import io.fabric8.kubernetes.client.KubernetesClient;

public abstract class GenericHelmTh2Op<CR extends Th2CustomResource> extends HelmReleaseTh2Op<CR> {

public GenericHelmTh2Op(KubernetesClient client) {
super(client);
public GenericHelmTh2Op(KubernetesClient kubClient, RabbitMQContext rabbitMQContext) {
super(kubClient, rabbitMQContext);
}

}
Loading

0 comments on commit b409e5b

Please sign in to comment.