Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TH2-5226] Removed singletons #124

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ dependencies {
implementation "io.prometheus:simpleclient_httpserver"
implementation "io.prometheus:simpleclient_hotspot"

implementation "com.google.guava:guava"

testImplementation(platform(libs.testcontainers.bom))
testImplementation 'org.testcontainers:rabbitmq'
testImplementation 'org.testcontainers:k3s'
Expand Down
116 changes: 91 additions & 25 deletions src/main/java/com/exactpro/th2/infraoperator/Th2CrdController.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.exactpro.th2.infraoperator;

import com.exactpro.th2.infraoperator.configuration.ConfigLoader;
import com.exactpro.th2.infraoperator.configuration.OperatorConfig;
import com.exactpro.th2.infraoperator.metrics.OperatorMetrics;
import com.exactpro.th2.infraoperator.metrics.PrometheusServer;
import com.exactpro.th2.infraoperator.operator.impl.BoxHelmTh2Op;
Expand All @@ -28,49 +30,84 @@
import com.exactpro.th2.infraoperator.spec.strategy.redeploy.ContinuousTaskWorker;
import com.exactpro.th2.infraoperator.spec.strategy.redeploy.tasks.CheckResourceCacheTask;
import com.exactpro.th2.infraoperator.util.RabbitMQUtils;
import com.exactpro.th2.infraoperator.util.Utils;
import io.fabric8.kubernetes.client.KubernetesClient;
import org.apache.logging.log4j.core.LoggerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import static com.exactpro.th2.infraoperator.util.KubernetesUtils.createKubernetesClient;

public class Th2CrdController implements AutoCloseable {

private static final Logger LOGGER = LoggerFactory.getLogger(Th2CrdController.class);

public void start() {
var watchManager = DefaultWatchManager.getInstance();
PrometheusServer.start();
private final PrometheusServer prometheusServer;
private final KubernetesClient kubClient;
private final DefaultWatchManager watchManager;
private final RabbitMQContext rabbitMQContext;
private final ContinuousTaskWorker continuousTaskWorker;

public Th2CrdController() throws IOException, URISyntaxException {
OperatorConfig config = ConfigLoader.loadConfiguration();
prometheusServer = new PrometheusServer(config.getPrometheusConfiguration());
kubClient = createKubernetesClient();
rabbitMQContext = new RabbitMQContext(config.getRabbitMQManagement());
watchManager = new DefaultWatchManager(kubClient, rabbitMQContext);
continuousTaskWorker = new ContinuousTaskWorker();

OperatorMetrics.resetCacheErrors();
try {
RabbitMQUtils.deleteRabbitMQRubbish();
RabbitMQContext.declareTopicExchange(); // FIXME: topic exchange should be removed when all namespaces are removed / disabled
RabbitMQUtils.deleteRabbitMQRubbish(kubClient, rabbitMQContext);
rabbitMQContext.declareTopicExchange(); // FIXME: topic exchange should be removed when all namespaces are removed / disabled

watchManager.addTarget(MstoreHelmTh2Op::new);
watchManager.addTarget(EstoreHelmTh2Op::new);
watchManager.addTarget(BoxHelmTh2Op::new);
watchManager.addTarget(CoreBoxHelmTh2Op::new);
watchManager.addTarget(JobHelmTh2Op::new);
watchManager.addTarget(MstoreHelmTh2Op::new);
watchManager.addTarget(EstoreHelmTh2Op::new);
watchManager.addTarget(BoxHelmTh2Op::new);
watchManager.addTarget(CoreBoxHelmTh2Op::new);
watchManager.addTarget(JobHelmTh2Op::new);

watchManager.startInformers();
watchManager.startInformers();
continuousTaskWorker.add(new CheckResourceCacheTask(300));
}

ContinuousTaskWorker continuousTaskWorker = new ContinuousTaskWorker();
continuousTaskWorker.add(new CheckResourceCacheTask(300));
public static void main(String[] args) {
Deque<AutoCloseable> resources = new ConcurrentLinkedDeque<>();
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();

configureShutdownHook(resources, lock, condition);

try {
if (args.length > 0) {
configureLogger(args[0]);
}
Th2CrdController controller = new Th2CrdController();
resources.add(controller);

awaitShutdown(lock, condition);
} catch (Exception e) {
LOGGER.error("Exception in main thread", e);
watchManager.stopInformers();
watchManager.close();
throw e;
System.exit(1);
}
}

public static void main(String[] args) {
if (args.length > 0) {
configureLogger(args[0]);
}
Th2CrdController controller = new Th2CrdController();
controller.start();
@Override
public void close() {
Utils.close(prometheusServer, "Prometheus server");
Utils.close(kubClient, "Kubernetes client");
Utils.close(watchManager, "Watch manager");
Utils.close(rabbitMQContext, "RabbitMQ context");
Utils.close(continuousTaskWorker, "Continuous task worker");
}

private static void configureLogger(String filePath) {
Expand All @@ -83,8 +120,37 @@ private static void configureLogger(String filePath) {
}
}

@Override
public void close() throws Exception {
private static void configureShutdownHook(Deque<AutoCloseable> resources, Lock lock, Condition condition) {
Runtime.getRuntime().addShutdownHook(new Thread(
() -> {
LOGGER.info("Shutdown start");
lock.lock();
try {
condition.signalAll();
} finally {
lock.unlock();
}
resources.descendingIterator().forEachRemaining((resource) -> {
try {
resource.close();
} catch (Exception e) {
LOGGER.error("Cannot close resource {}", resource.getClass(), e);
}
});
LOGGER.info("Shutdown end");
},
"Shutdown hook"
));
}

private static void awaitShutdown(Lock lock, Condition condition) throws InterruptedException {
lock.lock();
try {
LOGGER.info("Wait shutdown");
condition.await();
LOGGER.info("App shutdown");
} finally {
lock.unlock();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2021 Exactpro (Exactpro Systems Limited)
* Copyright 2020-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,40 +16,47 @@

package com.exactpro.th2.infraoperator.metrics;

import com.exactpro.th2.infraoperator.configuration.ConfigLoader;
import com.exactpro.th2.infraoperator.spec.shared.PrometheusConfiguration;
import io.prometheus.client.exporter.HTTPServer;
import io.prometheus.client.hotspot.DefaultExports;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class PrometheusServer {
private static final Logger logger = LoggerFactory.getLogger(PrometheusServer.class);
public class PrometheusServer implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(PrometheusServer.class);
private final Lock lock = new ReentrantLock();
@Nullable
private final HTTPServer server;

private static final AtomicReference<HTTPServer> prometheusExporter = new AtomicReference<>();

public static void start() {
static {
DefaultExports.initialize();
PrometheusConfiguration<String> prometheusConfiguration = ConfigLoader.getConfig().getPrometheusConfiguration();
}

String host = prometheusConfiguration.getHost();
int port = Integer.parseInt(prometheusConfiguration.getPort());
boolean enabled = Boolean.parseBoolean(prometheusConfiguration.getEnabled());
public PrometheusServer(PrometheusConfiguration<String> configuration) throws IOException {
if (Boolean.parseBoolean(configuration.getEnabled())) {
String host = configuration.getHost();
int port = Integer.parseInt(configuration.getPort());
server = new HTTPServer(host, port);
LOGGER.info("Started prometheus server on: \"{}:{}\"", host, port);
} else {
server = null;
}
}

prometheusExporter.updateAndGet(server -> {
if (server == null && enabled) {
try {
server = new HTTPServer(host, port);
logger.info("Started prometheus server on: \"{}:{}\"", host, port);
return server;
} catch (IOException e) {
throw new RuntimeException("Failed to create Prometheus exporter", e);
}
@Override
public void close() {
Nikita-Smirnov-Exactpro marked this conversation as resolved.
Show resolved Hide resolved
lock.lock();
try {
if (server != null) {
server.close();
}
return server;
});
} finally {
lock.unlock();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2021 Exactpro (Exactpro Systems Limited)
* Copyright 2020-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,14 +21,10 @@
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.MixedOperation;
import io.fabric8.kubernetes.client.dsl.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class DefaultResourceClient<CR extends CustomResource> implements ResourceClient<CR> {

private static final Logger logger = LoggerFactory.getLogger(DefaultResourceClient.class);

private final KubernetesClient client;
private final KubernetesClient kubClient;

private final Class<CR> resourceType;

Expand All @@ -37,15 +33,15 @@ public abstract class DefaultResourceClient<CR extends CustomResource> implement
private final String crdName;

public DefaultResourceClient(
KubernetesClient client,
KubernetesClient kubClient,
Class<CR> resourceType,
String crdName
) {
this.client = client;
this.kubClient = kubClient;
this.resourceType = resourceType;
this.crdName = crdName;

instance = client.resources(resourceType);
instance = kubClient.resources(resourceType);
}

@Override
Expand All @@ -54,7 +50,7 @@ public Class<CR> getResourceType() {
}

public KubernetesClient getClient() {
return this.client;
return this.kubClient;
}

public MixedOperation<CR, ? extends KubernetesResourceList<CR>, ? extends Resource<CR>> getInstance() {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2021 Exactpro (Exactpro Systems Limited)
* Copyright 2020-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,12 +17,13 @@
package com.exactpro.th2.infraoperator.operator;

import com.exactpro.th2.infraoperator.spec.Th2CustomResource;
import com.exactpro.th2.infraoperator.spec.strategy.linkresolver.mq.RabbitMQContext;
import io.fabric8.kubernetes.client.KubernetesClient;

public abstract class GenericHelmTh2Op<CR extends Th2CustomResource> extends HelmReleaseTh2Op<CR> {

public GenericHelmTh2Op(KubernetesClient client) {
super(client);
public GenericHelmTh2Op(KubernetesClient kubClient, RabbitMQContext rabbitMQContext) {
super(kubClient, rabbitMQContext);
}

}
Loading
Loading