Skip to content

Commit

Permalink
Revert code to parallel start containers on a single deployer.
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkydeveloper committed Aug 6, 2024
1 parent e88c866 commit daa5d2c
Showing 1 changed file with 34 additions and 81 deletions.
115 changes: 34 additions & 81 deletions src/main/kotlin/dev/restate/sdktesting/infra/RestateDeployer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@ import java.io.File
import java.net.URI
import java.net.http.HttpClient
import java.nio.file.Path
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.ThreadContext
import org.junit.jupiter.api.extension.ExtensionContext
import org.junit.jupiter.api.fail
import org.rnorth.ducttape.unreliables.Unreliables
import org.testcontainers.containers.*
import org.testcontainers.containers.wait.strategy.Wait
import org.testcontainers.images.builder.Transferable
Expand All @@ -48,7 +47,6 @@ private constructor(
private val LOG = LogManager.getLogger(RestateDeployer::class.java)

private val apiClient = ApiClient()
private val startContainersThreadPoolExecutor = Executors.newCachedThreadPool()

fun builder(): Builder {
return Builder()
Expand Down Expand Up @@ -200,21 +198,11 @@ private constructor(
// Configure logging
configureLogger(testReportDir)

if (config.deployInParallel) {
// Deploy all containers in parallel
val startFutures =
deployServicesConcurrently() +
deployAdditionalContainersConcurrently() +
listOf(deployRuntime()) +
listOf(deployProxy(testReportDir))
CompletableFuture.allOf(*startFutures.toTypedArray()).join()
} else {
// Deploy sequentially
deployServicesSequentially()
deployAdditionalContainersSequentially()
deployRuntime().join()
deployProxy(testReportDir).join()
}
// Deploy sequentially
deployServices()
deployAdditionalContainers()
deployRuntime()
deployProxy(testReportDir)

// Configure proxy
configureProxy()
Expand Down Expand Up @@ -245,19 +233,7 @@ private constructor(
runtimeContainer.configureLogger(testReportDir)
}

private fun deployServicesConcurrently(): List<CompletableFuture<*>> {
return serviceContainers.map { (serviceName, serviceContainer) ->
runOnStartupThreadPool {
serviceContainer.second.start()
LOG.debug(
"Started service container {} with endpoint {}",
serviceName,
serviceContainer.first.getEndpointUrl(config))
}
}
}

private fun deployServicesSequentially() {
private fun deployServices() {
return serviceContainers.forEach { (serviceName, serviceContainer) ->
serviceContainer.second.start()
LOG.debug(
Expand All @@ -267,31 +243,23 @@ private constructor(
}
}

private fun deployAdditionalContainersConcurrently(): List<CompletableFuture<*>> {
return additionalContainers.map { (containerHost, container) ->
runOnStartupThreadPool {
container.start()
LOG.debug("Started container {} with image {}", containerHost, container.dockerImageName)
}
}
}

private fun deployAdditionalContainersSequentially() {
private fun deployAdditionalContainers() {
return additionalContainers.forEach { (containerHost, container) ->
container.start()
LOG.debug("Started container {} with image {}", containerHost, container.dockerImageName)
}
}

private fun deployRuntime(): CompletableFuture<*> {
return runOnStartupThreadPool {
runtimeContainer.start()
LOG.debug("Restate runtime started. Container id {}", runtimeContainer.containerId)
}
private fun deployRuntime() {
runtimeContainer
.dependsOn(serviceContainers.values.map { it.second })
.dependsOn(additionalContainers.values)
.start()
LOG.debug("Restate runtime started. Container id {}", runtimeContainer.containerId)
}

private fun deployProxy(testReportDir: String): CompletableFuture<*> {
return runOnStartupThreadPool { proxyContainer.start(network, testReportDir) }
private fun deployProxy(testReportDir: String) {
proxyContainer.start(network, testReportDir)
}

private fun configureProxy() {
Expand Down Expand Up @@ -328,21 +296,24 @@ private constructor(
}

fun discoverDeployment(client: DeploymentApi, spec: ServiceSpec) {
val url = spec.getEndpointUrl(config)
if (spec.skipRegistration) {
LOG.debug("Skipping registration for endpoint {}", url)
return
}
Unreliables.retryUntilSuccess(5, TimeUnit.SECONDS) {
val url = spec.getEndpointUrl(config)
if (spec.skipRegistration) {
LOG.debug("Skipping registration for endpoint {}", url)
return@retryUntilSuccess
}

val request =
RegisterDeploymentRequest(RegisterDeploymentRequestAnyOf().uri(url.toString()).force(false))
try {
val response = client.createDeployment(request)
LOG.debug("Successfully executed discovery for endpoint {}. Result: {}", url, response)
} catch (e: ApiException) {
fail(
"Error when discovering endpoint $url, got status code ${e.code} with body: ${e.responseBody}",
e)
val request =
RegisterDeploymentRequest(
RegisterDeploymentRequestAnyOf().uri(url.toString()).force(false))
try {
val response = client.createDeployment(request)
LOG.debug("Successfully executed discovery for endpoint {}. Result: {}", url, response)
} catch (e: ApiException) {
fail(
"Error when discovering endpoint $url, got status code ${e.code} with body: ${e.responseBody}",
e)
}
}
}

Expand Down Expand Up @@ -417,24 +388,6 @@ private constructor(
return deployedContainers[hostName]!!
}

private fun runOnStartupThreadPool(fn: () -> Unit): CompletableFuture<Void> {
val contextMap: Map<String, String> = ThreadContext.getImmutableContext()
val contextStackTop: String = ThreadContext.peek()

return CompletableFuture.runAsync(
{
ThreadContext.putAll(contextMap)
ThreadContext.push(contextStackTop)
try {
fn()
} finally {
ThreadContext.pop()
ThreadContext.clearMap()
}
},
startContainersThreadPoolExecutor)
}

override fun close() {
teardownAll()
}
Expand Down

0 comments on commit daa5d2c

Please sign in to comment.