Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TH2-5226] infra-operator periodically deletes redundant resources from RabbitMQ instead of remove on start #119

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
03123a6
[TH2-5226] moved kotlin files to the src/kotlin dir
Nikita-Smirnov-Exactpro Aug 7, 2024
a19d4b5
[TH2-5226] Added RabbitMQGcTask
Nikita-Smirnov-Exactpro Aug 8, 2024
04e15b9
[TH2-5226] Updated libraries
Nikita-Smirnov-Exactpro Aug 8, 2024
a565d2d
[TH2-5226] Implemented RabbitMQGcTask.deleteRedundantExchanges
Nikita-Smirnov-Exactpro Aug 8, 2024
17879e0
[TH2-5226] added configure logging method
Nikita-Smirnov-Exactpro Aug 8, 2024
1add5b3
[TH2-5226] Refactored
Nikita-Smirnov-Exactpro Aug 8, 2024
98b1655
[TH2-5226] added wrapper closure into build script
Nikita-Smirnov-Exactpro Aug 9, 2024
d34bbb3
[TH2-5226] moved kotlin files to the src/kotlin dir
Nikita-Smirnov-Exactpro Aug 7, 2024
9391349
[TH2-5226] Added `rabbitMQManagement.cleanUpOnStart` option
Nikita-Smirnov-Exactpro Aug 13, 2024
bd6c67b
[TH2-5226] Corrected GitHub workflow
Nikita-Smirnov-Exactpro Aug 13, 2024
3e4130c
Merge remote-tracking branch 'origin/TH2-5226-hotfix-2' into TH2-5226
Nikita-Smirnov-Exactpro Aug 14, 2024
0b04c17
[TH2-5226] registered exception handler for informers
Nikita-Smirnov-Exactpro Aug 15, 2024
c4041b9
[TH2-5226] added deleteRabbitMQRubbish method
Nikita-Smirnov-Exactpro Aug 16, 2024
a6d9014
[TH2-5226] set true for clearOnStart by default
Nikita-Smirnov-Exactpro Aug 16, 2024
ada9bf0
[TH2-5226] refactored
Nikita-Smirnov-Exactpro Aug 16, 2024
34816d5
Merge remote-tracking branch 'origin/master' into TH2-5226
Nikita-Smirnov-Exactpro Aug 16, 2024
1bc6904
[TH2-5226] corrected after review
Nikita-Smirnov-Exactpro Aug 19, 2024
2e47d42
[TH2-5226] refactored after review
Nikita-Smirnov-Exactpro Aug 19, 2024
123585d
Update src/main/kotlin/com/exactpro/th2/infraoperator/util/RabbitMQUt…
Nikita-Smirnov-Exactpro Aug 19, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/build-sanpshot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ jobs:
with:
build-target: 'Docker'
docker-username: ${{ github.actor }}
# FIXME: strict scanner was disabled for 4.6.4 hotfix publishing and must be removed after that
strict-scanner: false
secrets:
docker-password: ${{ secrets.GITHUB_TOKEN }}
nvd-api-key: ${{ secrets.NVD_APIKEY }}
2 changes: 1 addition & 1 deletion .github/workflows/ci-unwelcome-words.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ on:

jobs:
test:
runs-on: ubuntu-20.04
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
with:
Expand Down
Empty file.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ RUN gradle --no-daemon clean build dockerPrepare -Prelease_version=${release_ver
FROM adoptopenjdk/openjdk11:alpine
WORKDIR /home
COPY --from=build /home/gradle/build/docker .
ENTRYPOINT ["/home/service/bin/service"]
ENTRYPOINT ["/home/service/bin/service", "/var/th2/config/log4j2.properties"]
45 changes: 23 additions & 22 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
# infra-operator

The infra-operator is a java implementation of Kubernetes
[custom resource controller](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/#custom-controllers).
It is part of _th2 infrastructure_. Together with [infra-mgr](https://github.com/th2-net/th2-infra-mgr) and
[helm-operator](https://github.com/fluxcd/helm-operator)
it ensures the synchronization of custom resource files from GitHub and the actual resources
in Kubernetes. The infra-operator uses [fabric8](https://fabric8.io/guide/) library for communication with Kubernetes.

[Custom resources](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/)
allow us to extend Kubernetes API with custom components specifically designed for our needs.
However, since such custom components are not part of the default Kubernetes installation, it is infra-operator's
task to look over them. The infra-operator monitors 5 kind of custom resources, which are defined in
The infra-operator is a java implementation of Kubernetes
[custom resource controller](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/#custom-controllers).
It is part of _th2 infrastructure_. Together with [infra-mgr](https://github.com/th2-net/th2-infra-mgr) and
[helm-operator](https://github.com/fluxcd/helm-operator)
it ensures the synchronization of custom resource files from GitHub and the actual resources
in Kubernetes. The infra-operator uses [fabric8](https://fabric8.io/guide/) library for communication with Kubernetes.

[Custom resources](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/)
allow us to extend Kubernetes API with custom components specifically designed for our needs.
However, since such custom components are not part of the default Kubernetes installation, it is infra-operator's
task to look over them. The infra-operator monitors 5 kind of custom resources, which are defined in
[th2-infra](https://github.com/th2-net/th2-infra/blob/master/values/CRD) repository.

For more information about custom resources used in th2 and their configuration,
For more information about custom resources used in th2 and their configuration,
please refer to [th2-documentation](https://github.com/th2-net/th2-documentation)

The infra-operator is also responsible for queues and users permission management on [RabbitMQ](https://www.rabbitmq.com/documentation.html).
Expand All @@ -23,9 +23,9 @@ The list below covers the main duties and objectives of this component.
#### Main objectives
* It monitors Kubernetes events related to the _Th2CustomResources_ and generates or modifies the corresponding Helm Releases.
* Based on the config map `rabbit-mq-app-config` which is deployed by infra-mgr, it creates Vhost in RabbitMQ for every schema namespace.
* For each Vhost it creates a user in RabbitMQ and configures its permissions.
* Based on the pins described in CRs, and the pins described in _Th2Link_ resources it declares queues in RabbitMQ.
* It binds queues in RabbitMQ according to _Th2Link_ resources.
* For each Vhost it creates a user in RabbitMQ and configures its permissions.
* Based on the pins described in CRs, and the pins described in _Th2Link_ resources it declares queues in RabbitMQ.
* It binds queues in RabbitMQ according to _Th2Link_ resources.
* Generate RabbitMQ configs for each resource that needs it.
* Generate [gRPC](https://grpc.io/docs/) configs for each resource that needs it.

Expand Down Expand Up @@ -87,11 +87,8 @@ rabbitMQManagement:
persistence: true
# determines if the RabbitMQ resources are persistent or not

rubbishCollectionInterval: 900
# Interval in seconds of rubbish collection (RC) in RabbitMQ. Each RC task integration consist of:
# 1) collect redundant resources according current RabbitMQ and Kubernetes resources state.
# 2) delete only resources included into both current and previous integrations (previous intersect current).
# 3) save collected resources but not included in the previous integration (current minus previous) for the next iteration
cleanUpOnStart: true
# if option is true, operator removes redundant queues and exchanges from RabbitMQ on start

schemaPermissions:
# this section describes what permissions schema RabbitMQ user will have on its own resources
Expand Down Expand Up @@ -151,7 +148,8 @@ openshift:
## Release notes

### 4.7.0
+ Periodically deletes redundant resources from RabbitMQ instead of remove on start
+ Improved clean rubbish from RabbitMQ on start to delete only redundant resources.
`cleanUpOnStart` is enabled by default
+ Migrated to th2 plugin `0.1.1`

+ Updated:
Expand All @@ -161,4 +159,7 @@ openshift:
+ force logging-interceptor: `4.12.0`
+ http-client: `5.2.0`
+ java-uuid-generator: `5.1.0`
+ kotlin-logging: `3.0.5`
+ kotlin-logging: `3.0.5`

### 4.6.4
+ Added `rabbitMQManagement.cleanUpOnStart` option
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ dependencies {
implementation "io.prometheus:simpleclient_hotspot"

testImplementation libs.mockito.core
testImplementation libs.mockito.kotlin
testImplementation libs.junit.jupiter.api
testRuntimeOnly libs.junit.jupiter.engine

Expand Down
1 change: 1 addition & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ http-client = { group = "com.rabbitmq", name = "http-client", version = "5.2.0"
java-uuid-generator = { group = "com.fasterxml.uuid", name = "java-uuid-generator", version = "5.1.0" }

mockito-core = { group = "org.mockito", name = "mockito-core", version = "5.12.0" }
mockito-kotlin = { group = "org.mockito.kotlin", name = "mockito-kotlin", version = "5.4.0" }
junit-jupiter-api = { group = "org.junit.jupiter", name = "junit-jupiter-api", version.ref = "jupiter" }
junit-jupiter-engine = { group = "org.junit.jupiter", name = "junit-jupiter-engine", version.ref = "jupiter" }

Expand Down
18 changes: 10 additions & 8 deletions src/main/java/com/exactpro/th2/infraoperator/Th2CrdController.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.exactpro.th2.infraoperator.spec.strategy.linkresolver.mq.RabbitMQContext;
import com.exactpro.th2.infraoperator.spec.strategy.redeploy.ContinuousTaskWorker;
import com.exactpro.th2.infraoperator.spec.strategy.redeploy.tasks.CheckResourceCacheTask;
import com.exactpro.th2.infraoperator.util.RabbitMQUtils;
import org.apache.logging.log4j.core.LoggerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -38,15 +39,16 @@ public class Th2CrdController {

private static final Logger LOGGER = LoggerFactory.getLogger(Th2CrdController.class);

private static final Path LOG4J2_PROPERTIES_DEFAULT_PATH = Path.of("var", "th2", "config", "log4j2.properties");

public static void main(String[] args) {
configureLogger();
if (args.length > 0) {
configureLogger(args[0]);
}
var watchManager = DefaultWatchManager.getInstance();
PrometheusServer.start();
OperatorMetrics.resetCacheErrors();
try {
RabbitMQContext.declareTopicExchange();
RabbitMQUtils.deleteRabbitMQRubbish();

watchManager.addTarget(MstoreHelmTh2Op::new);
watchManager.addTarget(EstoreHelmTh2Op::new);
Expand All @@ -58,7 +60,6 @@ public static void main(String[] args) {

ContinuousTaskWorker continuousTaskWorker = new ContinuousTaskWorker();
continuousTaskWorker.add(new CheckResourceCacheTask(300));
continuousTaskWorker.add(RabbitMQContext.createGarbageCollectTask());
} catch (Exception e) {
LOGGER.error("Exception in main thread", e);
watchManager.stopInformers();
Expand All @@ -67,12 +68,13 @@ public static void main(String[] args) {
}
}

private static void configureLogger() {
if (Files.exists(LOG4J2_PROPERTIES_DEFAULT_PATH)) {
private static void configureLogger(String filePath) {
Path path = Path.of(filePath);
if (Files.exists(path)) {
LoggerContext loggerContext = LoggerContext.getContext(false);
loggerContext.setConfigLocation(LOG4J2_PROPERTIES_DEFAULT_PATH.toUri());
loggerContext.setConfigLocation(path.toUri());
loggerContext.reconfigure();
LOGGER.info("Logger configuration from {} file is applied", LOG4J2_PROPERTIES_DEFAULT_PATH);
LOGGER.info("Logger configuration from {} file is applied", path);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2021 Exactpro (Exactpro Systems Limited)
* Copyright 2020-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,9 +29,7 @@
import com.exactpro.th2.infraoperator.spec.strategy.redeploy.tasks.TriggerRedeployTask;
import com.exactpro.th2.infraoperator.util.CustomResourceUtils;
import com.exactpro.th2.infraoperator.util.ExtractUtils;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.Namespace;
import io.fabric8.kubernetes.api.model.OwnerReference;
import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
Expand All @@ -52,6 +50,7 @@
import static com.exactpro.th2.infraoperator.operator.HelmReleaseTh2Op.ANTECEDENT_LABEL_KEY_ALIAS;
import static com.exactpro.th2.infraoperator.util.ExtractUtils.extractName;
import static com.exactpro.th2.infraoperator.util.ExtractUtils.extractNamespace;
import static com.exactpro.th2.infraoperator.util.KubernetesUtils.isNotActive;
import static io.fabric8.kubernetes.client.Watcher.Action.MODIFIED;

public abstract class AbstractTh2Operator<CR extends Th2CustomResource> implements Watcher<CR> {
Expand Down Expand Up @@ -97,9 +96,7 @@ public void eventReceived(Action action, CR resource) {
action, resourceLabel, e);

String namespace = resource.getMetadata().getNamespace();
Namespace namespaceObj = kubClient.namespaces().withName(namespace).get();
if (namespaceObj == null || !namespaceObj.getStatus().getPhase().equals("Active")) {
logger.info("Namespace \"{}\" deleted or not active, cancelling", namespace);
if (isNotActive(kubClient, namespace)) {
return;
}

Expand Down Expand Up @@ -129,9 +126,7 @@ public void eventReceived(Action action, CR resource) {

} catch (Exception e) {
String namespace = resource.getMetadata().getNamespace();
Namespace namespaceObj = kubClient.namespaces().withName(namespace).get();
if (namespaceObj == null || !namespaceObj.getStatus().getPhase().equals("Active")) {
logger.info("Namespace \"{}\" deleted or not active, cancelling", namespace);
if (isNotActive(kubClient, namespace)) {
return;
}
resource.getStatus().failed(e.getMessage());
Expand Down Expand Up @@ -260,7 +255,7 @@ protected CR updateStatus(CR resource) {
}
}

protected void setupAndCreateKubObj(CR resource) throws IOException {
protected void setupAndCreateKubObj(CR resource) {

var kubObj = loadKubObj();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2021 Exactpro (Exactpro Systems Limited)
* Copyright 2020-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,7 +29,6 @@
import com.exactpro.th2.infraoperator.util.CustomResourceUtils;
import com.exactpro.th2.infraoperator.util.ExtractUtils;
import com.exactpro.th2.infraoperator.util.HelmReleaseUtils;
import com.exactpro.th2.infraoperator.util.Strings;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectReader;
Expand All @@ -45,15 +44,25 @@
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
import io.prometheus.client.Histogram;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.Base64;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;

import static com.exactpro.th2.infraoperator.configuration.OperatorConfig.RABBITMQ_SECRET_PASSWORD_KEY;
import static com.exactpro.th2.infraoperator.operator.HelmReleaseTh2Op.*;
import static com.exactpro.th2.infraoperator.operator.HelmReleaseTh2Op.CHECKSUM_ALIAS;
import static com.exactpro.th2.infraoperator.operator.HelmReleaseTh2Op.CONFIG_ALIAS;
import static com.exactpro.th2.infraoperator.operator.HelmReleaseTh2Op.CRADLE_MGR_ALIAS;
import static com.exactpro.th2.infraoperator.operator.HelmReleaseTh2Op.GRPC_ROUTER_ALIAS;
import static com.exactpro.th2.infraoperator.operator.HelmReleaseTh2Op.LOGGING_ALIAS;
import static com.exactpro.th2.infraoperator.operator.HelmReleaseTh2Op.MQ_ROUTER_ALIAS;
import static com.exactpro.th2.infraoperator.util.CustomResourceUtils.annotationFor;
import static com.exactpro.th2.infraoperator.util.JsonUtils.JSON_MAPPER;
import static com.exactpro.th2.infraoperator.util.WatcherUtils.createExceptionHandler;

public class ConfigMapEventHandler implements Watcher<ConfigMap> {
public static final String SECRET_TYPE_OPAQUE = "Opaque";
Expand Down Expand Up @@ -115,6 +124,7 @@ public static ConfigMapEventHandler newInstance(SharedInformerFactory sharedInfo
ConfigMap.class,
CustomResourceUtils.RESYNC_TIME);

configMapInformer.exceptionHandler(createExceptionHandler(ConfigMap.class));
configMapInformer.addEventHandler(new GenericResourceEventHandler<>(res, eventQueue));
return res;
}
Expand Down Expand Up @@ -142,7 +152,7 @@ public void eventReceived(Action action, ConfigMap resource) {
RabbitMQConfig rabbitMQConfig = configMaps.getRabbitMQConfig4Namespace(namespace);

String configContent = resource.getData().get(RabbitMQConfig.CONFIG_MAP_RABBITMQ_PROP_NAME);
if (Strings.isNullOrEmpty(configContent)) {
if (StringUtils.isBlank(configContent)) {
logger.error("Key \"{}\" not found in \"{}\"", RabbitMQConfig.CONFIG_MAP_RABBITMQ_PROP_NAME,
resourceLabel);
return;
Expand Down Expand Up @@ -288,7 +298,7 @@ protected void createKubObj(String namespace, HelmRelease helmRelease) {
OperatorState.INSTANCE.putHelmReleaseInCache(helmRelease, namespace);
}

private Map<String, Object> getConfigFromCR(CustomResource customResource, String key) {
private Map<String, Object> getConfigFromCR(CustomResource<?, ?> customResource, String key) {
Th2Spec spec = (Th2Spec) customResource.getSpec();
switch (key) {
case MQ_ROUTER_ALIAS:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2021 Exactpro (Exactpro Systems Limited)
* Copyright 2020-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,21 +26,26 @@
import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static com.exactpro.th2.infraoperator.operator.AbstractTh2Operator.REFRESH_TOKEN_ALIAS;
import static com.exactpro.th2.infraoperator.util.CustomResourceUtils.annotationFor;
import static com.exactpro.th2.infraoperator.util.ExtractUtils.extractName;
import static com.exactpro.th2.infraoperator.util.KubernetesUtils.createKubernetesClient;
import static com.exactpro.th2.infraoperator.util.WatcherUtils.createExceptionHandler;

public class DefaultWatchManager {

Expand Down Expand Up @@ -157,7 +162,11 @@ private EventHandlerContext registerInformers(SharedInformerFactory sharedInform
var handler = new BoxResourceEventHandler<>(
helmReleaseTh2Op,
eventDispatcher.getEventQueue());
helmReleaseTh2Op.generateInformerFromFactory(getInformerFactory()).addEventHandler(handler);

SharedIndexInformer<Th2CustomResource> customResourceInformer =
helmReleaseTh2Op.generateInformerFromFactory(getInformerFactory());
customResourceInformer.exceptionHandler(createExceptionHandler(Th2CustomResource.class));
customResourceInformer.addEventHandler(handler);
context.addHandler(handler);
}

Expand All @@ -181,20 +190,6 @@ public <T extends Th2CustomResource> void addTarget(
}

void refreshBoxes(String namespace) {
refreshBoxes(namespace, null, true);
}

void refreshBoxes(String namespace, Set<String> boxes) {
refreshBoxes(namespace, boxes, false);
}

private void refreshBoxes(String namespace, Set<String> boxes, boolean refreshAllBoxes) {

if (!refreshAllBoxes && (boxes == null || boxes.size() == 0)) {
logger.warn("Empty set of boxes was given to refresh");
return;
}

if (!isWatching()) {
logger.warn("Not watching for resources yet");
return;
Expand All @@ -204,10 +199,8 @@ private void refreshBoxes(String namespace, Set<String> boxes, boolean refreshAl
for (var resourceClient : resourceClients) {
var mixedOperation = resourceClient.getInstance();
for (var res : mixedOperation.inNamespace(namespace).list().getItems()) {
if (refreshAllBoxes || boxes.contains(extractName(res))) {
createResource(namespace, res, resourceClient);
refreshedBoxes++;
}
createResource(namespace, res, resourceClient);
refreshedBoxes++;
}
}

Expand All @@ -228,7 +221,7 @@ private void createResource(String linkNamespace, Th2CustomResource resource,

public static synchronized DefaultWatchManager getInstance() {
if (instance == null) {
instance = new DefaultWatchManager(new KubernetesClientBuilder().build());
instance = new DefaultWatchManager(createKubernetesClient());
}

return instance;
Expand Down
Loading