Skip to content

Commit

Permalink
[TH2-5226] added DeleteRubbishOnStartTest
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita-Smirnov-Exactpro committed Sep 16, 2024
1 parent 20ef110 commit 1555413
Show file tree
Hide file tree
Showing 10 changed files with 568 additions and 235 deletions.
13 changes: 7 additions & 6 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,6 @@ wrapper {
distributionType Wrapper.DistributionType.BIN
}

test {
useJUnitPlatform {
excludeTags("integration-test")
}
}

tasks.register("downloadCRDs", Download) {
group = "verification"
src([
Expand All @@ -96,6 +90,13 @@ tasks.register("downloadCRDs", Download) {
dest layout.buildDirectory.dir('resources/test/crds').get()
}

test {
dependsOn("downloadCRDs")
useJUnitPlatform {
excludeTags("integration-test")
}
}

tasks.register("integrationTest", Test) {
group = "verification"
dependsOn("downloadCRDs")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import java.nio.file.Files;
import java.nio.file.Path;

public class Th2CrdController {
public class Th2CrdController implements AutoCloseable {

private static final Logger LOGGER = LoggerFactory.getLogger(Th2CrdController.class);

Expand All @@ -45,7 +45,7 @@ public void start() {
OperatorMetrics.resetCacheErrors();
try {
RabbitMQUtils.deleteRabbitMQRubbish();
RabbitMQContext.declareTopicExchange();
RabbitMQContext.declareTopicExchange(); // FIXME: topic exchange should be removed when all namespaces are removed / disabled

watchManager.addTarget(MstoreHelmTh2Op::new);
watchManager.addTarget(EstoreHelmTh2Op::new);
Expand Down Expand Up @@ -82,4 +82,9 @@ private static void configureLogger(String filePath) {
LOGGER.info("Logger configuration from {} file is applied", path);
}
}

@Override
public void close() throws Exception {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.exactpro.th2.infraoperator.spec.strategy.redeploy.tasks.RecreateQueuesAndBindings;
import com.exactpro.th2.infraoperator.spec.strategy.redeploy.tasks.RetryRabbitSetup;
import com.exactpro.th2.infraoperator.spec.strategy.redeploy.tasks.RetryTopicExchangeTask;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
Expand Down Expand Up @@ -70,8 +71,8 @@ public final class RabbitMQContext {
private static final Logger logger = LoggerFactory.getLogger(RabbitMQContext.class);

private static final int RETRY_DELAY = 120;
public static final String TOPIC = "topic";
public static final String DIRECT = "direct";
public static final String TOPIC = BuiltinExchangeType.TOPIC.getType();
public static final String DIRECT = BuiltinExchangeType.DIRECT.getType();

private static volatile RabbitMQManagementConfig managementConfig;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import com.exactpro.th2.infraoperator.model.box.mq.factory.MessageRouterConfigFa
import com.exactpro.th2.infraoperator.spec.Th2CustomResource
import com.exactpro.th2.infraoperator.spec.estore.Th2Estore
import com.exactpro.th2.infraoperator.spec.mstore.Th2Mstore
import com.exactpro.th2.infraoperator.spec.strategy.linkresolver.createEstoreQueue
import com.exactpro.th2.infraoperator.spec.strategy.linkresolver.createMstoreQueue
import com.exactpro.th2.infraoperator.spec.strategy.linkresolver.mq.RabbitMQContext
import com.rabbitmq.client.Channel
import com.rabbitmq.http.client.domain.ExchangeInfo
Expand Down Expand Up @@ -92,10 +94,11 @@ internal fun ResourceHolder.filterRubbishResources(
exchanges.remove(topicExchange) // FIXME: topic exchange should be declare after each namespace creation

K_LOGGER.debug { "Search RabbitMQ resources in $namespaces namespaces" }
// exchanges.remove(topicExchange) FIXME: uncomment this line when topic exchange is declared after each namespace creation

val factories: Map<Class<out Th2CustomResource>, MessageRouterConfigFactory> = createFactories()
namespaces.forEach { namespace ->
queues.remove(createEstoreQueue(namespace))
queues.remove(createMstoreQueue(namespace))
exchanges.remove(namespace)

client.customResources(namespace).asSequence()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
/*
* Copyright 2024-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.exactpro.th2.infraoperator.integration

import com.exactpro.th2.infraoperator.Th2CrdController
import com.exactpro.th2.infraoperator.configuration.fields.RabbitMQNamespacePermissions
import com.exactpro.th2.infraoperator.operator.StoreHelmTh2Op.MESSAGE_STORAGE_BOX_ALIAS
import com.exactpro.th2.infraoperator.operator.StoreHelmTh2Op.MESSAGE_STORAGE_PIN_ALIAS
import com.exactpro.th2.infraoperator.spec.box.Th2Box
import com.exactpro.th2.infraoperator.spec.shared.status.RolloutPhase
import com.exactpro.th2.infraoperator.spec.strategy.linkresolver.mq.RabbitMQContext.DIRECT
import com.exactpro.th2.infraoperator.spec.strategy.linkresolver.mq.RabbitMQContext.toExchangeName
import com.exactpro.th2.infraoperator.util.createKubernetesClient
import com.rabbitmq.client.AMQP
import com.rabbitmq.client.Channel
import com.rabbitmq.client.Connection
import com.rabbitmq.http.client.Client
import com.rabbitmq.http.client.domain.QueueInfo
import io.fabric8.kubernetes.client.KubernetesClient
import org.junit.jupiter.api.AfterAll
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Tag
import org.junit.jupiter.api.TestInstance
import org.junit.jupiter.api.Timeout
import org.junit.jupiter.api.io.TempDir
import org.testcontainers.containers.RabbitMQContainer
import org.testcontainers.k3s.K3sContainer
import java.nio.file.Path
import kotlin.test.Test
import kotlin.test.assertEquals

@Tag("integration-test")
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class DeleteRubbishOnStartTest {

private lateinit var k3sContainer: K3sContainer

private lateinit var rabbitMQContainer: RabbitMQContainer

private lateinit var kubeClient: KubernetesClient

private lateinit var rabbitMQClient: Client

private lateinit var rabbitMQConnection: Connection

@BeforeAll
@Timeout(30_000)
fun beforeAll(@TempDir tempDir: Path) {
k3sContainer = createK3sContainer()
rabbitMQContainer = createRabbitMQContainer()

prepareTh2CfgDir(
k3sContainer.kubeConfigYaml,
createOperatorConfig(
rabbitMQContainer,
setOf(TH2_PREFIX),
RABBIT_MQ_V_HOST,
RABBIT_MQ_TOPIC_EXCHANGE,
RABBIT_MQ_NAMESPACE_PERMISSIONS,
),
tempDir,
)

kubeClient = createKubernetesClient().apply { configureK3s() }
rabbitMQClient = createRabbitMQClient(rabbitMQContainer)
rabbitMQConnection = createRabbitMQConnection(rabbitMQContainer, RABBIT_MQ_V_HOST)
}

@AfterAll
@Timeout(30_000)
fun afterAll() {
if (this::kubeClient.isInitialized) {
kubeClient.close()
}
if (this::k3sContainer.isInitialized) {
k3sContainer.stop()
}
if (this::rabbitMQContainer.isInitialized) {
rabbitMQContainer.stop()
}
if (this::rabbitMQConnection.isInitialized && rabbitMQConnection.isOpen) {
rabbitMQConnection.close()
}
}

@Test
@Disabled("implement Th2CrdController.close")
fun deleteAllTest() {
TODO()
}

@Test
fun deleteRubbishTest() {
var gitHash = RESOURCE_GIT_HASH_COUNTER.incrementAndGet().toString()

val namespaceB = "${TH2_PREFIX}test-b"
val namespaceC = "${TH2_PREFIX}test-c"

val exchangeB = toExchangeName(namespaceB)
val exchangeC = toExchangeName(namespaceC)

val component = "test-component"

prepareNamespace(gitHash, namespaceB)

rabbitMQConnection.createChannel().use { channel ->
channel.confirmSelect()
/** queue of not existed component */
val queue01 = channel.createQueue(namespaceB, "rubbish-component", PIN_NAME).assertQueue()
/** queue of not exited pin */
val queue02 = channel.createQueue(namespaceB, component, "rubbish-pin").assertQueue()
/** mstore queue of not existed namespace */
val queue03 = channel.createQueue(namespaceC, MESSAGE_STORAGE_BOX_ALIAS, MESSAGE_STORAGE_PIN_ALIAS).assertQueue()

/** mstore queue of existed namespace */
val queue11 = channel.createQueue(namespaceB, MESSAGE_STORAGE_BOX_ALIAS, MESSAGE_STORAGE_PIN_ALIAS).assertQueue()
.also {
channel.basicPublish("", it.queue, null, "test-content".toByteArray())
}
/** queue of exited component and pin */
val queue12 = channel.createQueue(namespaceB, component, PIN_NAME).assertQueue()
.also {
channel.basicPublish("", it.queue, null, "test-content".toByteArray())
}

/** exchange of not exited namespace */
channel.createExchange(exchangeC, DIRECT)
rabbitMQClient.assertExchange(exchangeC, DIRECT, RABBIT_MQ_V_HOST)

// TODO: add routing keys check:
// * existed key to exited queue
// * existed exchange A to existed queue B
// * not existed key to exited queue

gitHash = RESOURCE_GIT_HASH_COUNTER.incrementAndGet().toString()
val spec = """
imageName: "ghcr.io/th2-net/th2-component"
imageVersion: "0.0.0"
type: th2-codec
pins:
mq:
subscribers:
- name: $PIN_NAME
attributes: [subscribe]
""".trimIndent()
kubeClient.createTh2CustomResource(exchangeB, component, gitHash, spec, ::Th2Box)

Th2CrdController().apply(Th2CrdController::start).use {
kubeClient.awaitPhase(exchangeB, component, RolloutPhase.SUCCEEDED, Th2Box::class.java)

rabbitMQClient.assertNoQueue(queue01.queue, RABBIT_MQ_V_HOST)
rabbitMQClient.assertNoQueue(queue02.queue, RABBIT_MQ_V_HOST)
rabbitMQClient.assertNoQueue(queue03.queue, RABBIT_MQ_V_HOST)

rabbitMQClient.assertQueue(queue11.queue, RABBIT_MQ_QUEUE_CLASSIC_TYPE, RABBIT_MQ_V_HOST)
.assertQueueSize(channel, 1)
rabbitMQClient.assertQueue(queue12.queue, RABBIT_MQ_QUEUE_CLASSIC_TYPE, RABBIT_MQ_V_HOST)
.assertQueueSize(channel, 1)

rabbitMQClient.assertNoExchange(exchangeC)
}
}
}

private fun prepareNamespace(gitHash: String, namespace: String) {
kubeClient.createNamespace(namespace)
kubeClient.createRabbitMQSecret(namespace, gitHash)
kubeClient.createRabbitMQAppConfigCfgMap(
namespace,
gitHash,
createRabbitMQConfig(rabbitMQContainer, RABBIT_MQ_V_HOST, toExchangeName(namespace), namespace)
)

kubeClient.createBookConfigCfgMap(namespace, gitHash, TH2_BOOK)
kubeClient.createLoggingCfgMap(namespace, gitHash)
kubeClient.createMQRouterCfgMap(namespace, gitHash)
kubeClient.createGrpcRouterCfgMap(namespace, gitHash)
kubeClient.createCradleManagerCfgMap(namespace, gitHash)
}

private fun AMQP.Queue.DeclareOk.assertQueue(): AMQP.Queue.DeclareOk {
rabbitMQClient.assertQueue(queue, RABBIT_MQ_QUEUE_CLASSIC_TYPE, RABBIT_MQ_V_HOST)
return this
}

private fun QueueInfo.assertQueueSize(channel: Channel, size: Int) {
val declareOk = channel.queueDeclare(name, isDurable, isExclusive, isAutoDelete, arguments)
assertEquals(size, declareOk.messageCount)
}

companion object {
private const val TH2_PREFIX = "th2-"
private const val TH2_BOOK = "test_book"

private val RABBIT_MQ_NAMESPACE_PERMISSIONS = RabbitMQNamespacePermissions()
private const val RABBIT_MQ_V_HOST = "/"
private const val RABBIT_MQ_TOPIC_EXCHANGE = "test-global-exchange"

private const val PIN_NAME = "test-pin"
}
}
Loading

0 comments on commit 1555413

Please sign in to comment.