From daa5d2c943adf6bbd0831c39ba6f9522ab872810 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Tue, 6 Aug 2024 17:37:01 +0200 Subject: [PATCH] Revert code to parallel start containers on a single deployer. --- .../sdktesting/infra/RestateDeployer.kt | 115 ++++++------------ 1 file changed, 34 insertions(+), 81 deletions(-) diff --git a/src/main/kotlin/dev/restate/sdktesting/infra/RestateDeployer.kt b/src/main/kotlin/dev/restate/sdktesting/infra/RestateDeployer.kt index ae91e9e..a29d44b 100644 --- a/src/main/kotlin/dev/restate/sdktesting/infra/RestateDeployer.kt +++ b/src/main/kotlin/dev/restate/sdktesting/infra/RestateDeployer.kt @@ -19,12 +19,11 @@ import java.io.File import java.net.URI import java.net.http.HttpClient import java.nio.file.Path -import java.util.concurrent.CompletableFuture -import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit import org.apache.logging.log4j.LogManager -import org.apache.logging.log4j.ThreadContext import org.junit.jupiter.api.extension.ExtensionContext import org.junit.jupiter.api.fail +import org.rnorth.ducttape.unreliables.Unreliables import org.testcontainers.containers.* import org.testcontainers.containers.wait.strategy.Wait import org.testcontainers.images.builder.Transferable @@ -48,7 +47,6 @@ private constructor( private val LOG = LogManager.getLogger(RestateDeployer::class.java) private val apiClient = ApiClient() - private val startContainersThreadPoolExecutor = Executors.newCachedThreadPool() fun builder(): Builder { return Builder() @@ -200,21 +198,11 @@ private constructor( // Configure logging configureLogger(testReportDir) - if (config.deployInParallel) { - // Deploy all containers in parallel - val startFutures = - deployServicesConcurrently() + - deployAdditionalContainersConcurrently() + - listOf(deployRuntime()) + - listOf(deployProxy(testReportDir)) - CompletableFuture.allOf(*startFutures.toTypedArray()).join() - } else { - // Deploy sequentially - deployServicesSequentially() - deployAdditionalContainersSequentially() - deployRuntime().join() - deployProxy(testReportDir).join() - } + // Deploy sequentially + deployServices() + deployAdditionalContainers() + deployRuntime() + deployProxy(testReportDir) // Configure proxy configureProxy() @@ -245,19 +233,7 @@ private constructor( runtimeContainer.configureLogger(testReportDir) } - private fun deployServicesConcurrently(): List> { - return serviceContainers.map { (serviceName, serviceContainer) -> - runOnStartupThreadPool { - serviceContainer.second.start() - LOG.debug( - "Started service container {} with endpoint {}", - serviceName, - serviceContainer.first.getEndpointUrl(config)) - } - } - } - - private fun deployServicesSequentially() { + private fun deployServices() { return serviceContainers.forEach { (serviceName, serviceContainer) -> serviceContainer.second.start() LOG.debug( @@ -267,31 +243,23 @@ private constructor( } } - private fun deployAdditionalContainersConcurrently(): List> { - return additionalContainers.map { (containerHost, container) -> - runOnStartupThreadPool { - container.start() - LOG.debug("Started container {} with image {}", containerHost, container.dockerImageName) - } - } - } - - private fun deployAdditionalContainersSequentially() { + private fun deployAdditionalContainers() { return additionalContainers.forEach { (containerHost, container) -> container.start() LOG.debug("Started container {} with image {}", containerHost, container.dockerImageName) } } - private fun deployRuntime(): CompletableFuture<*> { - return runOnStartupThreadPool { - runtimeContainer.start() - LOG.debug("Restate runtime started. Container id {}", runtimeContainer.containerId) - } + private fun deployRuntime() { + runtimeContainer + .dependsOn(serviceContainers.values.map { it.second }) + .dependsOn(additionalContainers.values) + .start() + LOG.debug("Restate runtime started. Container id {}", runtimeContainer.containerId) } - private fun deployProxy(testReportDir: String): CompletableFuture<*> { - return runOnStartupThreadPool { proxyContainer.start(network, testReportDir) } + private fun deployProxy(testReportDir: String) { + proxyContainer.start(network, testReportDir) } private fun configureProxy() { @@ -328,21 +296,24 @@ private constructor( } fun discoverDeployment(client: DeploymentApi, spec: ServiceSpec) { - val url = spec.getEndpointUrl(config) - if (spec.skipRegistration) { - LOG.debug("Skipping registration for endpoint {}", url) - return - } + Unreliables.retryUntilSuccess(5, TimeUnit.SECONDS) { + val url = spec.getEndpointUrl(config) + if (spec.skipRegistration) { + LOG.debug("Skipping registration for endpoint {}", url) + return@retryUntilSuccess + } - val request = - RegisterDeploymentRequest(RegisterDeploymentRequestAnyOf().uri(url.toString()).force(false)) - try { - val response = client.createDeployment(request) - LOG.debug("Successfully executed discovery for endpoint {}. Result: {}", url, response) - } catch (e: ApiException) { - fail( - "Error when discovering endpoint $url, got status code ${e.code} with body: ${e.responseBody}", - e) + val request = + RegisterDeploymentRequest( + RegisterDeploymentRequestAnyOf().uri(url.toString()).force(false)) + try { + val response = client.createDeployment(request) + LOG.debug("Successfully executed discovery for endpoint {}. Result: {}", url, response) + } catch (e: ApiException) { + fail( + "Error when discovering endpoint $url, got status code ${e.code} with body: ${e.responseBody}", + e) + } } } @@ -417,24 +388,6 @@ private constructor( return deployedContainers[hostName]!! } - private fun runOnStartupThreadPool(fn: () -> Unit): CompletableFuture { - val contextMap: Map = ThreadContext.getImmutableContext() - val contextStackTop: String = ThreadContext.peek() - - return CompletableFuture.runAsync( - { - ThreadContext.putAll(contextMap) - ThreadContext.push(contextStackTop) - try { - fn() - } finally { - ThreadContext.pop() - ThreadContext.clearMap() - } - }, - startContainersThreadPoolExecutor) - } - override fun close() { teardownAll() }