diff --git a/src/test/kotlin/com/exactpro/th2/infraoperator/integration/IntegrationTest.kt b/src/test/kotlin/com/exactpro/th2/infraoperator/integration/IntegrationTest.kt index c307026e..da43588d 100644 --- a/src/test/kotlin/com/exactpro/th2/infraoperator/integration/IntegrationTest.kt +++ b/src/test/kotlin/com/exactpro/th2/infraoperator/integration/IntegrationTest.kt @@ -70,7 +70,6 @@ import com.exactpro.th2.infraoperator.spec.shared.status.RolloutPhase.DISABLED import com.exactpro.th2.infraoperator.spec.shared.status.RolloutPhase.SUCCEEDED import com.exactpro.th2.infraoperator.spec.strategy.linkresolver.createEstoreQueue import com.exactpro.th2.infraoperator.spec.strategy.linkresolver.createMstoreQueue -import com.exactpro.th2.infraoperator.spec.strategy.linkresolver.mq.RabbitMQContext import com.exactpro.th2.infraoperator.spec.strategy.linkresolver.mq.RabbitMQContext.DIRECT import com.exactpro.th2.infraoperator.spec.strategy.linkresolver.mq.RabbitMQContext.TOPIC import com.exactpro.th2.infraoperator.spec.strategy.linkresolver.mq.RabbitMQContext.toExchangeName @@ -95,13 +94,11 @@ import org.junit.jupiter.api.TestInstance import org.junit.jupiter.api.Timeout import org.junit.jupiter.api.io.TempDir import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.Arguments +import org.junit.jupiter.params.provider.MethodSource import org.junit.jupiter.params.provider.ValueSource -import org.slf4j.LoggerFactory.getLogger import org.testcontainers.containers.RabbitMQContainer -import org.testcontainers.containers.output.Slf4jLogConsumer import org.testcontainers.k3s.K3sContainer -import org.testcontainers.lifecycle.Startable -import org.testcontainers.utility.DockerImageName import strikt.api.expectThat import strikt.assertions.getValue import strikt.assertions.hasSize @@ -149,17 +146,8 @@ class IntegrationTest { operatorConfig = configDir.resolve("infra-operator.yml") configDir.createDirectories() - k3sContainer = - K3sContainer(K3S_DOCKER_IMAGE) - .withLogConsumer(Slf4jLogConsumer(getLogger("K3S")).withSeparateOutputStreams()) - .also(Startable::start) - - rabbitMQContainer = - RabbitMQContainer(RABBITMQ_DOCKER_IMAGE) - .withLogConsumer(Slf4jLogConsumer(getLogger("RABBIT_MQ")).withSeparateOutputStreams()) - .also(Startable::start) - - K_LOGGER.info { "RabbitMQ URL: ${rabbitMQContainer.httpUrl}" } + k3sContainer = createK3sContainer() + rabbitMQContainer = createRabbitMQContainer() Files.writeString(kubeConfig, k3sContainer.kubeConfigYaml) YAML_MAPPER.writeValue(operatorConfig.toFile(), createOperatorConfig(rabbitMQContainer)) @@ -218,8 +206,7 @@ class IntegrationTest { kubeClient.deleteNamespace(TH2_NAMESPACE, 1, MINUTES) // FIXME: Secret not found "th2-test:Secret/rabbitMQ" - rabbitMQClient.assertNoQueue(createEstoreQueue(TH2_NAMESPACE)) - rabbitMQClient.assertNoQueue(createMstoreQueue(TH2_NAMESPACE)) + rabbitMQClient.assertNoQueues("link\\[.*\\]", RABBIT_MQ_V_HOST) rabbitMQClient.assertNoExchange(toExchangeName(TH2_NAMESPACE)) rabbitMQClient.assertNoUser(TH2_NAMESPACE) @@ -244,6 +231,13 @@ class IntegrationTest { abstract fun add(name: String) abstract fun disable(name: String) abstract fun enable(name: String) + abstract fun `mq link`( + subClass: Class, + subConstructor: () -> Th2CustomResource, + subSpecType: String, + subRunAsJob: Boolean + ) + abstract fun `grpc link`(name: String) protected fun addTest(name: String) { val gitHash = RESOURCE_GIT_HASH_COUNTER.incrementAndGet().toString() @@ -346,6 +340,118 @@ class IntegrationTest { ) ) } + + protected fun mkLinkTest( + pubClass: Class, + pubConstructor: () -> Th2CustomResource, + pubSpecType: String, + pubRunAsJob: Boolean, + subClass: Class, + subConstructor: () -> Th2CustomResource, + subSpecType: String, + subRunAsJob: Boolean, + ) { + val gitHash = RESOURCE_GIT_HASH_COUNTER.incrementAndGet().toString() + val pubName = "test-publisher" + val subName = "test-subscriber" + + val publisherSpec = """ + imageName: $IMAGE + imageVersion: $VERSION + type: $pubSpecType + pins: + mq: + publishers: + - name: $PUBLISH_PIN + attributes: [publish] + """.trimIndent() + + val spec = """ + imageName: $IMAGE + imageVersion: $VERSION + type: $subSpecType + pins: + mq: + subscribers: + - name: $SUBSCRIBE_PIN + attributes: [subscribe] + linkTo: + - box: $pubName + pin: $PUBLISH_PIN + """.trimIndent() + + kubeClient.createTh2CustomResource(TH2_NAMESPACE, pubName, gitHash, publisherSpec, pubConstructor) + kubeClient.createTh2CustomResource(TH2_NAMESPACE, subName, gitHash, spec, subConstructor) + + kubeClient.awaitPhase(TH2_NAMESPACE, pubName, SUCCEEDED, pubClass) + kubeClient.awaitPhase(TH2_NAMESPACE, subName, SUCCEEDED, subClass) + + val queueName = "link[$TH2_NAMESPACE:$subName:$SUBSCRIBE_PIN]" + val routingKey = "key[$TH2_NAMESPACE:$pubName:$PUBLISH_PIN]" + + kubeClient.awaitResource(TH2_NAMESPACE, pubName).assertMinCfg( + pubName, + pubRunAsJob, + queues = mapOf( + PUBLISH_PIN to mapOf( + "attributes" to listOf("publish"), + "exchange" to toExchangeName(TH2_NAMESPACE), + "filters" to emptyList(), + "name" to routingKey, + "queue" to "", + ) + ) + ) + kubeClient.awaitResource(TH2_NAMESPACE, hashNameIfNeeded(subName)).assertMinCfg( + subName, + subRunAsJob, + queues = mapOf( + SUBSCRIBE_PIN to mapOf( + "attributes" to listOf("subscribe"), + "exchange" to toExchangeName(TH2_NAMESPACE), + "filters" to emptyList(), + "name" to "", + "queue" to queueName, + ) + ) + ) + + rabbitMQClient.assertBindings( + queueName, + RABBIT_MQ_V_HOST, + setOf("link[$TH2_NAMESPACE:$subName:$SUBSCRIBE_PIN]", routingKey) + ) + rabbitMQClient.assertBindings( + createEstoreQueue(TH2_NAMESPACE), + RABBIT_MQ_V_HOST, + setOf( + "link[$TH2_NAMESPACE:$EVENT_STORAGE_BOX_ALIAS:$EVENT_STORAGE_PIN_ALIAS]", + "key[$TH2_NAMESPACE:$pubName:$EVENT_STORAGE_PIN_ALIAS]", + "key[$TH2_NAMESPACE:$subName:$EVENT_STORAGE_PIN_ALIAS]", + ) + ) + } + + protected fun grpcLinkTest(name: String) { + val gitHash = RESOURCE_GIT_HASH_COUNTER.incrementAndGet().toString() + val spec = """ + imageName: $IMAGE + imageVersion: $VERSION + type: $specType + """.trimIndent() + + kubeClient.createTh2CustomResource(TH2_NAMESPACE, name, gitHash, spec, this::createResources) + kubeClient.awaitPhase(TH2_NAMESPACE, name, SUCCEEDED, resourceClass) + kubeClient.awaitResource(TH2_NAMESPACE, hashNameIfNeeded(name)).assertMinCfg(name, runAsJob) + rabbitMQClient.assertBindings( + createEstoreQueue(TH2_NAMESPACE), + RABBIT_MQ_V_HOST, + setOf( + "link[$TH2_NAMESPACE:$EVENT_STORAGE_BOX_ALIAS:$EVENT_STORAGE_PIN_ALIAS]", + "key[$TH2_NAMESPACE:$name:$EVENT_STORAGE_PIN_ALIAS]" + ) + ) + } } @Nested @@ -415,6 +521,21 @@ class IntegrationTest { @ParameterizedTest @ValueSource(strings = ["th2-core-component", "th2-core-component-more-than-$NAME_LENGTH_LIMIT-characters"]) override fun enable(name: String) = enableTest(name) + + @Timeout(30_000) + @ParameterizedTest + @MethodSource("com.exactpro.th2.infraoperator.integration.IntegrationTest#mqLinkArguments") + override fun `mq link`( + subClass: Class, + subConstructor: () -> Th2CustomResource, + subSpecType: String, + subRunAsJob: Boolean + ) = mkLinkTest(resourceClass, ::createResources, specType, runAsJob, subClass, subConstructor, subSpecType, subRunAsJob) + + @Timeout(30_000) + @ParameterizedTest + @ValueSource(strings = ["th2-core-component", "th2-core-component-more-than-$NAME_LENGTH_LIMIT-characters"]) + override fun `grpc link`(name: String) = grpcLinkTest(name) } @Nested @@ -442,6 +563,21 @@ class IntegrationTest { @ParameterizedTest @ValueSource(strings = ["th2-component", "th2-component-more-than-$NAME_LENGTH_LIMIT-characters"]) override fun enable(name: String) = enableTest(name) + + @Timeout(30_000) + @ParameterizedTest + @MethodSource("com.exactpro.th2.infraoperator.integration.IntegrationTest#mqLinkArguments") + override fun `mq link`( + subClass: Class, + subConstructor: () -> Th2CustomResource, + subSpecType: String, + subRunAsJob: Boolean + ) = mkLinkTest(resourceClass, ::createResources, specType, runAsJob, subClass, subConstructor, subSpecType, subRunAsJob) + + @Timeout(30_000) + @ParameterizedTest + @ValueSource(strings = ["th2-component", "th2-component-more-than-$NAME_LENGTH_LIMIT-characters"]) + override fun `grpc link`(name: String) = grpcLinkTest(name) } @Nested @@ -469,6 +605,21 @@ class IntegrationTest { @ParameterizedTest @ValueSource(strings = ["th2-job", "th2-job-more-than-$NAME_LENGTH_LIMIT-characters"]) override fun enable(name: String) = enableTest(name) + + @Timeout(30_000) + @ParameterizedTest + @MethodSource("com.exactpro.th2.infraoperator.integration.IntegrationTest#mqLinkArguments") + override fun `mq link`( + subClass: Class, + subConstructor: () -> Th2CustomResource, + subSpecType: String, + subRunAsJob: Boolean + ) = mkLinkTest(resourceClass, ::createResources, specType, runAsJob, subClass, subConstructor, subSpecType, subRunAsJob) + + @Timeout(30_000) + @ParameterizedTest + @ValueSource(strings = ["th2-job", "th2-job-more-than-$NAME_LENGTH_LIMIT-characters"]) + override fun `grpc link`(name: String) = grpcLinkTest(name) } @Nested @@ -509,9 +660,6 @@ class IntegrationTest { private val RESOURCE_GIT_HASH_COUNTER = AtomicLong(10_000_000) - private val RABBITMQ_DOCKER_IMAGE = DockerImageName.parse("rabbitmq:3.12.6-management") - private val K3S_DOCKER_IMAGE = DockerImageName.parse("rancher/k3s:v1.21.3-k3s1") - private val CRD_RESOURCE_NAMES = setOf( "helmreleases-crd.yaml", @@ -531,6 +679,8 @@ class IntegrationTest { private const val TH2_PREFIX = "th2-" private const val TH2_NAMESPACE = "${TH2_PREFIX}test" private const val TH2_BOOK = "test_book" + private const val PUBLISH_PIN = "test-publish-pin" + private const val SUBSCRIBE_PIN = "test-subscribe-pin" private const val CFG_FIELD = "test-cfg-field" private const val CFG_VALUE = "test-cfg-value" private const val IMAGE = "ghcr.io/th2-net/th2-estore" @@ -539,6 +689,13 @@ class IntegrationTest { private const val TEST_CONTENT = "test-content" + @JvmStatic + fun mqLinkArguments() = listOf( + Arguments.of(Th2Job::class.java, ::Th2Job, "th2-job", true), + Arguments.of(Th2Box::class.java, ::Th2Box, "th2-codec", false), + Arguments.of(Th2CoreBox::class.java, ::Th2CoreBox, "th2-rpt-data-provider", false), + ) + private fun createOperatorConfig(rabbitMQ: RabbitMQContainer) = OperatorConfig( chart = ChartSpec(), @@ -575,14 +732,6 @@ class IntegrationTest { "${'$'}{RABBITMQ_PASS}", ) - private fun createRabbitMQClient(rabbitMQ: RabbitMQContainer) = - RabbitMQContext.createClient( - rabbitMQ.host, - rabbitMQ.httpPort, - rabbitMQ.adminUsername, - rabbitMQ.adminPassword, - ) - private fun KubernetesClient.configureK3s() { CRD_RESOURCE_NAMES .asSequence() @@ -714,7 +863,11 @@ class IntegrationTest { deleteConfigMap(namespace, CRADLE_MANAGER_CM_NAME) } - private fun HelmRelease.assertMinCfg(name: String, runAsJob: Boolean) { + private fun HelmRelease.assertMinCfg( + name: String, + runAsJob: Boolean, + queues: Map> = emptyMap(), + ) { expectThat(componentValuesSection) { getValue(BOOK_CONFIG_ALIAS).isA>().and { hasSize(1) @@ -780,7 +933,7 @@ class IntegrationTest { getValue("exchange") isEqualTo RABBIT_MQ_TOPIC_EXCHANGE } getValue("queues").isA>().and { - hasSize(1) + hasSize(1 + queues.size) getValue(EVENT_STORAGE_PIN_ALIAS).isA>().and { hasSize(5) getValue("attributes").isA>() isEqualTo listOf("publish", "event") @@ -789,6 +942,9 @@ class IntegrationTest { getValue("name") isEqualTo "key[$TH2_NAMESPACE:$name:$EVENT_STORAGE_PIN_ALIAS]" getValue("queue").isA().isEmpty() } + queues.forEach { (key, value) -> + getValue(key).isA>() isEqualTo value + } } } } diff --git a/src/test/kotlin/com/exactpro/th2/infraoperator/integration/TestIntegrationUtils.kt b/src/test/kotlin/com/exactpro/th2/infraoperator/integration/TestIntegrationUtils.kt new file mode 100644 index 00000000..818f8769 --- /dev/null +++ b/src/test/kotlin/com/exactpro/th2/infraoperator/integration/TestIntegrationUtils.kt @@ -0,0 +1,45 @@ +/* + * Copyright 2024-2024 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.infraoperator.integration + +import com.exactpro.th2.infraoperator.spec.strategy.linkresolver.mq.RabbitMQContext +import com.rabbitmq.http.client.Client +import org.slf4j.LoggerFactory +import org.testcontainers.containers.RabbitMQContainer +import org.testcontainers.containers.output.Slf4jLogConsumer +import org.testcontainers.k3s.K3sContainer +import org.testcontainers.lifecycle.Startable +import org.testcontainers.utility.DockerImageName + +private val K3S_DOCKER_IMAGE = DockerImageName.parse("rancher/k3s:v1.21.3-k3s1") +private val RABBITMQ_DOCKER_IMAGE = DockerImageName.parse("rabbitmq:3.12.6-management") + +fun createK3sContainer(): K3sContainer = K3sContainer(K3S_DOCKER_IMAGE) + .withLogConsumer(Slf4jLogConsumer(LoggerFactory.getLogger("K3S")).withSeparateOutputStreams()) + .also(Startable::start) + +fun createRabbitMQContainer(): RabbitMQContainer = RabbitMQContainer(RABBITMQ_DOCKER_IMAGE) + .withLogConsumer(Slf4jLogConsumer(LoggerFactory.getLogger("RABBIT_MQ")).withSeparateOutputStreams()) + .also(Startable::start) + +fun createRabbitMQClient(rabbitMQ: RabbitMQContainer): Client = + RabbitMQContext.createClient( + rabbitMQ.host, + rabbitMQ.httpPort, + rabbitMQ.adminUsername, + rabbitMQ.adminPassword, + ) \ No newline at end of file diff --git a/src/test/kotlin/com/exactpro/th2/infraoperator/integration/TestRabbitMQUtils.kt b/src/test/kotlin/com/exactpro/th2/infraoperator/integration/TestRabbitMQUtils.kt index 7de1ed76..d10c4919 100644 --- a/src/test/kotlin/com/exactpro/th2/infraoperator/integration/TestRabbitMQUtils.kt +++ b/src/test/kotlin/com/exactpro/th2/infraoperator/integration/TestRabbitMQUtils.kt @@ -177,12 +177,13 @@ fun Client.assertBindings( ) } -fun Client.assertNoQueue( - queue: String, +fun Client.assertNoQueues( + queuePattern: String, + vHost: String, timeout: Long = 5_000, unit: TimeUnit = TimeUnit.MILLISECONDS, ) { - await("assertQueue('$queue')") + await("assertNoQueues('$queuePattern')") .timeout(timeout, unit) - .until { queues.firstOrNull { it.name == queue } == null } -} + .until { queues.map { it.name.matches(Regex(queuePattern)) && it.vhost == vHost }.isEmpty() } +} \ No newline at end of file