Skip to content

Commit

Permalink
[TH2-5226] added mq link test
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita-Smirnov-Exactpro committed Sep 16, 2024
1 parent f2f35d6 commit 5dc5513
Show file tree
Hide file tree
Showing 3 changed files with 238 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ import com.exactpro.th2.infraoperator.spec.shared.status.RolloutPhase.DISABLED
import com.exactpro.th2.infraoperator.spec.shared.status.RolloutPhase.SUCCEEDED
import com.exactpro.th2.infraoperator.spec.strategy.linkresolver.createEstoreQueue
import com.exactpro.th2.infraoperator.spec.strategy.linkresolver.createMstoreQueue
import com.exactpro.th2.infraoperator.spec.strategy.linkresolver.mq.RabbitMQContext
import com.exactpro.th2.infraoperator.spec.strategy.linkresolver.mq.RabbitMQContext.DIRECT
import com.exactpro.th2.infraoperator.spec.strategy.linkresolver.mq.RabbitMQContext.TOPIC
import com.exactpro.th2.infraoperator.spec.strategy.linkresolver.mq.RabbitMQContext.toExchangeName
Expand All @@ -95,13 +94,11 @@ import org.junit.jupiter.api.TestInstance
import org.junit.jupiter.api.Timeout
import org.junit.jupiter.api.io.TempDir
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.Arguments
import org.junit.jupiter.params.provider.MethodSource
import org.junit.jupiter.params.provider.ValueSource
import org.slf4j.LoggerFactory.getLogger
import org.testcontainers.containers.RabbitMQContainer
import org.testcontainers.containers.output.Slf4jLogConsumer
import org.testcontainers.k3s.K3sContainer
import org.testcontainers.lifecycle.Startable
import org.testcontainers.utility.DockerImageName
import strikt.api.expectThat
import strikt.assertions.getValue
import strikt.assertions.hasSize
Expand Down Expand Up @@ -149,17 +146,8 @@ class IntegrationTest {
operatorConfig = configDir.resolve("infra-operator.yml")
configDir.createDirectories()

k3sContainer =
K3sContainer(K3S_DOCKER_IMAGE)
.withLogConsumer(Slf4jLogConsumer(getLogger("K3S")).withSeparateOutputStreams())
.also(Startable::start)

rabbitMQContainer =
RabbitMQContainer(RABBITMQ_DOCKER_IMAGE)
.withLogConsumer(Slf4jLogConsumer(getLogger("RABBIT_MQ")).withSeparateOutputStreams())
.also(Startable::start)

K_LOGGER.info { "RabbitMQ URL: ${rabbitMQContainer.httpUrl}" }
k3sContainer = createK3sContainer()
rabbitMQContainer = createRabbitMQContainer()

Files.writeString(kubeConfig, k3sContainer.kubeConfigYaml)
YAML_MAPPER.writeValue(operatorConfig.toFile(), createOperatorConfig(rabbitMQContainer))
Expand Down Expand Up @@ -218,8 +206,7 @@ class IntegrationTest {
kubeClient.deleteNamespace(TH2_NAMESPACE, 1, MINUTES)
// FIXME: Secret not found "th2-test:Secret/rabbitMQ"

rabbitMQClient.assertNoQueue(createEstoreQueue(TH2_NAMESPACE))
rabbitMQClient.assertNoQueue(createMstoreQueue(TH2_NAMESPACE))
rabbitMQClient.assertNoQueues("link\\[.*\\]", RABBIT_MQ_V_HOST)
rabbitMQClient.assertNoExchange(toExchangeName(TH2_NAMESPACE))
rabbitMQClient.assertNoUser(TH2_NAMESPACE)

Expand All @@ -244,6 +231,13 @@ class IntegrationTest {
abstract fun add(name: String)
abstract fun disable(name: String)
abstract fun enable(name: String)
abstract fun `mq link`(
subClass: Class<out Th2CustomResource>,
subConstructor: () -> Th2CustomResource,
subSpecType: String,
subRunAsJob: Boolean
)
abstract fun `grpc link`(name: String)

protected fun addTest(name: String) {
val gitHash = RESOURCE_GIT_HASH_COUNTER.incrementAndGet().toString()
Expand Down Expand Up @@ -346,6 +340,118 @@ class IntegrationTest {
)
)
}

protected fun mkLinkTest(
pubClass: Class<out Th2CustomResource>,
pubConstructor: () -> Th2CustomResource,
pubSpecType: String,
pubRunAsJob: Boolean,
subClass: Class<out Th2CustomResource>,
subConstructor: () -> Th2CustomResource,
subSpecType: String,
subRunAsJob: Boolean,
) {
val gitHash = RESOURCE_GIT_HASH_COUNTER.incrementAndGet().toString()
val pubName = "test-publisher"
val subName = "test-subscriber"

val publisherSpec = """
imageName: $IMAGE
imageVersion: $VERSION
type: $pubSpecType
pins:
mq:
publishers:
- name: $PUBLISH_PIN
attributes: [publish]
""".trimIndent()

val spec = """
imageName: $IMAGE
imageVersion: $VERSION
type: $subSpecType
pins:
mq:
subscribers:
- name: $SUBSCRIBE_PIN
attributes: [subscribe]
linkTo:
- box: $pubName
pin: $PUBLISH_PIN
""".trimIndent()

kubeClient.createTh2CustomResource(TH2_NAMESPACE, pubName, gitHash, publisherSpec, pubConstructor)
kubeClient.createTh2CustomResource(TH2_NAMESPACE, subName, gitHash, spec, subConstructor)

kubeClient.awaitPhase(TH2_NAMESPACE, pubName, SUCCEEDED, pubClass)
kubeClient.awaitPhase(TH2_NAMESPACE, subName, SUCCEEDED, subClass)

val queueName = "link[$TH2_NAMESPACE:$subName:$SUBSCRIBE_PIN]"
val routingKey = "key[$TH2_NAMESPACE:$pubName:$PUBLISH_PIN]"

kubeClient.awaitResource<HelmRelease>(TH2_NAMESPACE, pubName).assertMinCfg(
pubName,
pubRunAsJob,
queues = mapOf(
PUBLISH_PIN to mapOf(
"attributes" to listOf("publish"),
"exchange" to toExchangeName(TH2_NAMESPACE),
"filters" to emptyList<String>(),
"name" to routingKey,
"queue" to "",
)
)
)
kubeClient.awaitResource<HelmRelease>(TH2_NAMESPACE, hashNameIfNeeded(subName)).assertMinCfg(
subName,
subRunAsJob,
queues = mapOf(
SUBSCRIBE_PIN to mapOf(
"attributes" to listOf("subscribe"),
"exchange" to toExchangeName(TH2_NAMESPACE),
"filters" to emptyList<String>(),
"name" to "",
"queue" to queueName,
)
)
)

rabbitMQClient.assertBindings(
queueName,
RABBIT_MQ_V_HOST,
setOf("link[$TH2_NAMESPACE:$subName:$SUBSCRIBE_PIN]", routingKey)
)
rabbitMQClient.assertBindings(
createEstoreQueue(TH2_NAMESPACE),
RABBIT_MQ_V_HOST,
setOf(
"link[$TH2_NAMESPACE:$EVENT_STORAGE_BOX_ALIAS:$EVENT_STORAGE_PIN_ALIAS]",
"key[$TH2_NAMESPACE:$pubName:$EVENT_STORAGE_PIN_ALIAS]",
"key[$TH2_NAMESPACE:$subName:$EVENT_STORAGE_PIN_ALIAS]",
)
)
}

protected fun grpcLinkTest(name: String) {
val gitHash = RESOURCE_GIT_HASH_COUNTER.incrementAndGet().toString()
val spec = """
imageName: $IMAGE
imageVersion: $VERSION
type: $specType
""".trimIndent()

kubeClient.createTh2CustomResource(TH2_NAMESPACE, name, gitHash, spec, this::createResources)
kubeClient.awaitPhase(TH2_NAMESPACE, name, SUCCEEDED, resourceClass)
kubeClient.awaitResource<HelmRelease>(TH2_NAMESPACE, hashNameIfNeeded(name)).assertMinCfg(name, runAsJob)
rabbitMQClient.assertBindings(
createEstoreQueue(TH2_NAMESPACE),
RABBIT_MQ_V_HOST,
setOf(
"link[$TH2_NAMESPACE:$EVENT_STORAGE_BOX_ALIAS:$EVENT_STORAGE_PIN_ALIAS]",
"key[$TH2_NAMESPACE:$name:$EVENT_STORAGE_PIN_ALIAS]"
)
)
}
}

@Nested
Expand Down Expand Up @@ -415,6 +521,21 @@ class IntegrationTest {
@ParameterizedTest
@ValueSource(strings = ["th2-core-component", "th2-core-component-more-than-$NAME_LENGTH_LIMIT-characters"])
override fun enable(name: String) = enableTest(name)

@Timeout(30_000)
@ParameterizedTest
@MethodSource("com.exactpro.th2.infraoperator.integration.IntegrationTest#mqLinkArguments")
override fun `mq link`(
subClass: Class<out Th2CustomResource>,
subConstructor: () -> Th2CustomResource,
subSpecType: String,
subRunAsJob: Boolean
) = mkLinkTest(resourceClass, ::createResources, specType, runAsJob, subClass, subConstructor, subSpecType, subRunAsJob)

@Timeout(30_000)
@ParameterizedTest
@ValueSource(strings = ["th2-core-component", "th2-core-component-more-than-$NAME_LENGTH_LIMIT-characters"])
override fun `grpc link`(name: String) = grpcLinkTest(name)
}

@Nested
Expand Down Expand Up @@ -442,6 +563,21 @@ class IntegrationTest {
@ParameterizedTest
@ValueSource(strings = ["th2-component", "th2-component-more-than-$NAME_LENGTH_LIMIT-characters"])
override fun enable(name: String) = enableTest(name)

@Timeout(30_000)
@ParameterizedTest
@MethodSource("com.exactpro.th2.infraoperator.integration.IntegrationTest#mqLinkArguments")
override fun `mq link`(
subClass: Class<out Th2CustomResource>,
subConstructor: () -> Th2CustomResource,
subSpecType: String,
subRunAsJob: Boolean
) = mkLinkTest(resourceClass, ::createResources, specType, runAsJob, subClass, subConstructor, subSpecType, subRunAsJob)

@Timeout(30_000)
@ParameterizedTest
@ValueSource(strings = ["th2-component", "th2-component-more-than-$NAME_LENGTH_LIMIT-characters"])
override fun `grpc link`(name: String) = grpcLinkTest(name)
}

@Nested
Expand Down Expand Up @@ -469,6 +605,21 @@ class IntegrationTest {
@ParameterizedTest
@ValueSource(strings = ["th2-job", "th2-job-more-than-$NAME_LENGTH_LIMIT-characters"])
override fun enable(name: String) = enableTest(name)

@Timeout(30_000)
@ParameterizedTest
@MethodSource("com.exactpro.th2.infraoperator.integration.IntegrationTest#mqLinkArguments")
override fun `mq link`(
subClass: Class<out Th2CustomResource>,
subConstructor: () -> Th2CustomResource,
subSpecType: String,
subRunAsJob: Boolean
) = mkLinkTest(resourceClass, ::createResources, specType, runAsJob, subClass, subConstructor, subSpecType, subRunAsJob)

@Timeout(30_000)
@ParameterizedTest
@ValueSource(strings = ["th2-job", "th2-job-more-than-$NAME_LENGTH_LIMIT-characters"])
override fun `grpc link`(name: String) = grpcLinkTest(name)
}

@Nested
Expand Down Expand Up @@ -509,9 +660,6 @@ class IntegrationTest {

private val RESOURCE_GIT_HASH_COUNTER = AtomicLong(10_000_000)

private val RABBITMQ_DOCKER_IMAGE = DockerImageName.parse("rabbitmq:3.12.6-management")
private val K3S_DOCKER_IMAGE = DockerImageName.parse("rancher/k3s:v1.21.3-k3s1")

private val CRD_RESOURCE_NAMES =
setOf(
"helmreleases-crd.yaml",
Expand All @@ -531,6 +679,8 @@ class IntegrationTest {
private const val TH2_PREFIX = "th2-"
private const val TH2_NAMESPACE = "${TH2_PREFIX}test"
private const val TH2_BOOK = "test_book"
private const val PUBLISH_PIN = "test-publish-pin"
private const val SUBSCRIBE_PIN = "test-subscribe-pin"
private const val CFG_FIELD = "test-cfg-field"
private const val CFG_VALUE = "test-cfg-value"
private const val IMAGE = "ghcr.io/th2-net/th2-estore"
Expand All @@ -539,6 +689,13 @@ class IntegrationTest {

private const val TEST_CONTENT = "test-content"

@JvmStatic
fun mqLinkArguments() = listOf(
Arguments.of(Th2Job::class.java, ::Th2Job, "th2-job", true),
Arguments.of(Th2Box::class.java, ::Th2Box, "th2-codec", false),
Arguments.of(Th2CoreBox::class.java, ::Th2CoreBox, "th2-rpt-data-provider", false),
)

private fun createOperatorConfig(rabbitMQ: RabbitMQContainer) =
OperatorConfig(
chart = ChartSpec(),
Expand Down Expand Up @@ -575,14 +732,6 @@ class IntegrationTest {
"${'$'}{RABBITMQ_PASS}",
)

private fun createRabbitMQClient(rabbitMQ: RabbitMQContainer) =
RabbitMQContext.createClient(
rabbitMQ.host,
rabbitMQ.httpPort,
rabbitMQ.adminUsername,
rabbitMQ.adminPassword,
)

private fun KubernetesClient.configureK3s() {
CRD_RESOURCE_NAMES
.asSequence()
Expand Down Expand Up @@ -714,7 +863,11 @@ class IntegrationTest {
deleteConfigMap(namespace, CRADLE_MANAGER_CM_NAME)
}

private fun HelmRelease.assertMinCfg(name: String, runAsJob: Boolean) {
private fun HelmRelease.assertMinCfg(
name: String,
runAsJob: Boolean,
queues: Map<String, Map<String, Any>> = emptyMap(),
) {
expectThat(componentValuesSection) {
getValue(BOOK_CONFIG_ALIAS).isA<Map<String, Any?>>().and {
hasSize(1)
Expand Down Expand Up @@ -780,7 +933,7 @@ class IntegrationTest {
getValue("exchange") isEqualTo RABBIT_MQ_TOPIC_EXCHANGE
}
getValue("queues").isA<Map<String, Any?>>().and {
hasSize(1)
hasSize(1 + queues.size)
getValue(EVENT_STORAGE_PIN_ALIAS).isA<Map<String, Any?>>().and {
hasSize(5)
getValue("attributes").isA<List<String>>() isEqualTo listOf("publish", "event")
Expand All @@ -789,6 +942,9 @@ class IntegrationTest {
getValue("name") isEqualTo "key[$TH2_NAMESPACE:$name:$EVENT_STORAGE_PIN_ALIAS]"
getValue("queue").isA<String>().isEmpty()
}
queues.forEach { (key, value) ->
getValue(key).isA<Map<String, Any?>>() isEqualTo value
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2024-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.exactpro.th2.infraoperator.integration

import com.exactpro.th2.infraoperator.spec.strategy.linkresolver.mq.RabbitMQContext
import com.rabbitmq.http.client.Client
import org.slf4j.LoggerFactory
import org.testcontainers.containers.RabbitMQContainer
import org.testcontainers.containers.output.Slf4jLogConsumer
import org.testcontainers.k3s.K3sContainer
import org.testcontainers.lifecycle.Startable
import org.testcontainers.utility.DockerImageName

private val K3S_DOCKER_IMAGE = DockerImageName.parse("rancher/k3s:v1.21.3-k3s1")
private val RABBITMQ_DOCKER_IMAGE = DockerImageName.parse("rabbitmq:3.12.6-management")

fun createK3sContainer(): K3sContainer = K3sContainer(K3S_DOCKER_IMAGE)
.withLogConsumer(Slf4jLogConsumer(LoggerFactory.getLogger("K3S")).withSeparateOutputStreams())
.also(Startable::start)

fun createRabbitMQContainer(): RabbitMQContainer = RabbitMQContainer(RABBITMQ_DOCKER_IMAGE)
.withLogConsumer(Slf4jLogConsumer(LoggerFactory.getLogger("RABBIT_MQ")).withSeparateOutputStreams())
.also(Startable::start)

fun createRabbitMQClient(rabbitMQ: RabbitMQContainer): Client =
RabbitMQContext.createClient(
rabbitMQ.host,
rabbitMQ.httpPort,
rabbitMQ.adminUsername,
rabbitMQ.adminPassword,
)
Original file line number Diff line number Diff line change
Expand Up @@ -177,12 +177,13 @@ fun Client.assertBindings(
)
}

fun Client.assertNoQueue(
queue: String,
fun Client.assertNoQueues(
queuePattern: String,
vHost: String,
timeout: Long = 5_000,
unit: TimeUnit = TimeUnit.MILLISECONDS,
) {
await("assertQueue('$queue')")
await("assertNoQueues('$queuePattern')")
.timeout(timeout, unit)
.until { queues.firstOrNull { it.name == queue } == null }
}
.until { queues.map { it.name.matches(Regex(queuePattern)) && it.vhost == vHost }.isEmpty() }
}

0 comments on commit 5dc5513

Please sign in to comment.