Skip to content

Commit

Permalink
Update tests to provision 3 nodes cluster tests with a replicated log…
Browse files Browse the repository at this point in the history
…let with replication property 2

This commit adds the NodeCtl grpc svc to the repo to generate a grpc client to manually
provision a replicated loglet with a replication property of 2. Without manually provisioning the
cluster, the replication property defaults to 1.

Note: Whenever the *.proto files change in the restate repo, they need to be updated in this
repository as well if there is an incompatible change.

This fixes #26.
  • Loading branch information
tillrohrmann committed Dec 23, 2024
1 parent 05346b1 commit 033cf50
Show file tree
Hide file tree
Showing 9 changed files with 554 additions and 10 deletions.
44 changes: 42 additions & 2 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ plugins {
id("com.diffplug.spotless") version "6.25.0"
id("com.gradleup.shadow") version "8.3.5"
id("com.github.jk1.dependency-license-report") version "2.9"

alias(libs.plugins.protobuf)
}

group = "dev.restate.sdktesting"
Expand All @@ -19,6 +21,8 @@ repositories {
mavenCentral()
// OSSRH Snapshots repo
maven { url = uri("https://s01.oss.sonatype.org/content/repositories/snapshots/") }
// for protobuf-gradle-plugin dependencies
google()
}

dependencies {
Expand Down Expand Up @@ -57,13 +61,32 @@ dependencies {

implementation(libs.assertj)
implementation(libs.awaitility)

// grpc
implementation(libs.grpc.kotlin.stub)
implementation(libs.grpc.protobuf)
implementation(libs.grpc.netty.shaded)
implementation(libs.protobuf.kotlin)
}

kotlin { jvmToolchain(21) }

val generatedJ2SPDir = layout.buildDirectory.dir("generated/j2sp")

sourceSets { main { java.srcDir(generatedJ2SPDir) } }
val generatedProto = layout.buildDirectory.dir("generated/source/proto/main")

sourceSets {
main {
java {
srcDir(generatedJ2SPDir)
srcDir(generatedProto.get().dir("java"))
srcDir(generatedProto.get().dir("grpc"))
}
kotlin {
srcDir(generatedProto.get().dir("kotlin"))
srcDir(generatedProto.get().dir("grpckt"))
}
}
}

jsonSchema2Pojo {
setSource(files("$projectDir/src/main/json"))
Expand All @@ -76,6 +99,23 @@ jsonSchema2Pojo {
generateBuilders = true
}

// Protobuf/gRPC code generation for the .proto files of this project.
protobuf {
// protoc compiler artifact, resolved from the version catalog.
protoc { artifact = libs.protoc.asProvider().get().toString() }
plugins {
// Registers the gRPC Java and gRPC Kotlin protoc plugins under the ids "grpc" and "grpckt".
create("grpc") { artifact = libs.protoc.gen.grpc.java.get().toString() }
create("grpckt") { artifact = libs.protoc.gen.grpc.kotlin.get().toString() + ":jdk8@jar" }
}
generateProtoTasks {
// Enable both plugins and the "kotlin" builtin on every proto generation task.
all().forEach {
it.plugins {
create("grpc")
create("grpckt")
}
it.builtins { create("kotlin") }
}
}
}

tasks {
getByName("compileKotlin") { dependsOn(generateJsonSchema2Pojo) }

Expand Down
3 changes: 3 additions & 0 deletions config/allowed-licenses.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
},
{
"moduleLicense": "Bouncy Castle Licence"
},
{
"moduleLicense": "COMMON DEVELOPMENT AND DISTRIBUTION LICENSE (CDDL) Version 1.0"
}
]
}
17 changes: 17 additions & 0 deletions settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ dependencyResolutionManagement {
mavenCentral()
// OSSRH Snapshots repo
maven { url = uri("https://s01.oss.sonatype.org/content/repositories/snapshots/") }
// for protobuf-gradle-plugin dependencies
google()
}

versionCatalogs {
Expand Down Expand Up @@ -75,6 +77,21 @@ dependencyResolutionManagement {
library("symbol-processing-api", "com.google.devtools.ksp", "symbol-processing-api")
.versionRef("ksp")
plugin("ksp", "com.google.devtools.ksp").versionRef("ksp")

library("grpc-kotlin-stub", "io.grpc", "grpc-kotlin-stub").version("1.4.1")

version("grpc", "1.69.0")
library("grpc-protobuf", "io.grpc", "grpc-protobuf").versionRef("grpc")
library("grpc-netty-shaded", "io.grpc", "grpc-netty-shaded").versionRef("grpc")
library("protoc-gen-grpc-java", "io.grpc", "protoc-gen-grpc-java").versionRef("grpc")

library("protoc-gen-grpc-kotlin", "io.grpc", "protoc-gen-grpc-kotlin").version("1.4.1")

version("protobuf", "4.29.2")
library("protobuf-kotlin", "com.google.protobuf", "protobuf-kotlin").versionRef("protobuf")
library("protoc", "com.google.protobuf", "protoc").versionRef("protobuf")

plugin("protobuf", "com.google.protobuf").version("0.9.4")
}
}
}
21 changes: 15 additions & 6 deletions src/main/kotlin/dev/restate/sdktesting/infra/RestateContainer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -46,24 +46,28 @@ class RestateContainer(
private val WAIT_STARTUP_STRATEGY =
WaitAllStrategy()
.withStrategy(
Wait.forHttp("/restate/health")
.forPort(RUNTIME_INGRESS_ENDPOINT_PORT)
Wait.forHttp("/metrics")
.forPort(RUNTIME_NODE_PORT)
.withRateLimiter(
RateLimiterBuilder.newBuilder()
.withRate(200, TimeUnit.MILLISECONDS)
.withConstantThroughput()
.build()))
.withStartupTimeout(120.seconds.toJavaDuration())

private val WAIT_INGRESS_READY_STRATEGY =
WaitAllStrategy()
.withStrategy(
Wait.forHttp("/health")
.forPort(RUNTIME_ADMIN_ENDPOINT_PORT)
Wait.forHttp("/restate/health")
.forPort(RUNTIME_INGRESS_ENDPOINT_PORT)
.withRateLimiter(
RateLimiterBuilder.newBuilder()
.withRate(200, TimeUnit.MILLISECONDS)
.withConstantThroughput()
.build()))
.withStartupTimeout(120.seconds.toJavaDuration())

fun bootstrapRestateCluster(
fun createRestateContainers(
config: RestateDeployerConfig,
network: Network,
envs: Map<String, String>,
Expand All @@ -80,13 +84,14 @@ class RestateContainer(
mapOf<String, String>(
"RESTATE_CLUSTER_NAME" to clusterId,
"RESTATE_BIFROST__DEFAULT_PROVIDER" to "replicated",
"RESTATE_ALLOW_BOOTSTRAP" to "true",
"RESTATE_ALLOW_BOOTSTRAP" to "false",
"RESTATE_ROLES" to "[worker,log-server,admin,metadata-store]",
)
val followerEnvs =
mapOf<String, String>(
"RESTATE_CLUSTER_NAME" to clusterId,
"RESTATE_BIFROST__DEFAULT_PROVIDER" to "replicated",
"RESTATE_ALLOW_BOOTSTRAP" to "false",
"RESTATE_ROLES" to "[worker,admin,log-server]",
"RESTATE_METADATA_STORE_CLIENT__ADDRESS" to
"http://$RESTATE_RUNTIME:$RUNTIME_NODE_PORT")
Expand Down Expand Up @@ -191,6 +196,10 @@ class RestateContainer(
WAIT_STARTUP_STRATEGY.waitUntilReady(this)
}

/** Applies [WAIT_INGRESS_READY_STRATEGY] to this container and returns once it reports ready. */
fun waitIngressReady(): Unit = WAIT_INGRESS_READY_STRATEGY.waitUntilReady(this)

fun dumpConfiguration() {
check(isRunning) { "The container is not running, can't dump configuration" }
dockerClient.killContainerCmd(containerId).withSignal("SIGUSR1").exec()
Expand Down
60 changes: 58 additions & 2 deletions src/main/kotlin/dev/restate/sdktesting/infra/RestateDeployer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import dev.restate.admin.model.RegisterDeploymentRequest
import dev.restate.admin.model.RegisterDeploymentRequestAnyOf
import dev.restate.sdktesting.infra.runtimeconfig.IngressOptions
import dev.restate.sdktesting.infra.runtimeconfig.RestateConfigSchema
import io.grpc.ManagedChannelBuilder
import java.io.File
import java.net.URI
import java.net.http.HttpClient
Expand All @@ -28,6 +29,7 @@ import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import kotlin.time.Duration.Companion.seconds
import kotlin.time.toJavaDuration
import kotlinx.coroutines.runBlocking
import org.apache.logging.log4j.CloseableThreadContext
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.ThreadContext
Expand All @@ -38,6 +40,9 @@ import org.testcontainers.images.builder.Transferable
import org.testcontainers.shaded.com.fasterxml.jackson.databind.ser.impl.SimpleBeanPropertyFilter
import org.testcontainers.shaded.com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider
import org.testcontainers.shaded.com.github.dockerjava.core.DockerClientConfig
import restate.cluster.*
import restate.node_ctl_svc.NodeCtlSvcGrpcKt
import restate.node_ctl_svc.provisionClusterRequest

class RestateDeployer
private constructor(
Expand Down Expand Up @@ -169,12 +174,23 @@ private constructor(
}
.associate { it.second.first to (it.first to it.second.second) }
private val runtimeContainers: List<RestateContainer> =
RestateContainer.bootstrapRestateCluster(
RestateContainer.createRestateContainers(
config, network, runtimeContainerEnvs, configSchema, copyToContainer, config.restateNodes)

private val deployedContainers: Map<String, ContainerHandle> =
(runtimeContainers.map {
it.hostname to ContainerHandle(it, restartWaitStrategy = { it.waitStartup() })
it.hostname to
ContainerHandle(
it,
restartWaitStrategy = {
it.waitStartup()
// For the KillRuntime and StopRuntime tests, we need to wait for the ingress to be
// ready. Otherwise, the client seems to be stuck when invoking services. Maybe this is
// because the new route has not been set up by Docker yet?
it.waitIngressReady()
})
} +
serviceContainers.map { it.key to ContainerHandle(it.value.second) } +
additionalContainers.map { it.key to ContainerHandle(it.value) })
Expand Down Expand Up @@ -282,9 +298,49 @@ private constructor(
.toTypedArray())
.get(150, TimeUnit.SECONDS)

if (config.restateNodes > 1) {
provisionCluster()
}

executor.shutdown()
}

/**
 * Manually provisions the cluster through the NodeCtlSvc gRPC service of the [RESTATE_RUNTIME]
 * container, requesting the "replicated" log provider with replication property 2.
 *
 * The call is retried for up to 20 seconds. The gRPC channel is always shut down, also when
 * provisioning ultimately fails.
 */
private fun provisionCluster() {
  LOG.debug("Manually provisioning the cluster")

  val nodePort = getContainerPort(RESTATE_RUNTIME, RUNTIME_NODE_PORT)

  val channel = ManagedChannelBuilder.forAddress("localhost", nodePort).usePlaintext().build()
  try {
    val client = NodeCtlSvcGrpcKt.NodeCtlSvcCoroutineStub(channel)
    val request = provisionClusterRequest {
      dryRun = false
      logProvider = defaultProvider {
        provider = "replicated"
        replicatedConfig = replicatedProviderConfig {
          replicationProperty = "2"
          nodesetSelectionStrategy = nodeSetSelectionStrategy {
            kind = Cluster.NodeSetSelectionStrategyKind.StrictFaultTolerantGreedy
          }
        }
      }
    }

    Unreliables.retryUntilSuccess(20, TimeUnit.SECONDS) {
      try {
        return@retryUntilSuccess runBlocking { client.provisionCluster(request) }
      } catch (e: io.grpc.StatusException) {
        // The coroutine stub reports RPC failures as StatusException (not the admin client's
        // ApiException). Back off briefly so that the retry loop does not spin.
        Thread.sleep(30)
        throw IllegalStateException(
            "Error when provisioning cluster, got gRPC status: ${e.status}", e)
      }
    }
  } finally {
    // Release the channel even if provisioning timed out.
    channel.shutdown()
  }

  LOG.debug("Cluster has been provisioned")
}

private fun discoverDeployment(client: DeploymentApi, spec: ServiceSpec) {
val url = spec.getEndpointUrl(config)
if (spec.skipRegistration) {
Expand Down
78 changes: 78 additions & 0 deletions src/main/proto/node_ctl_svc.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH
//
// This file is part of the Restate service protocol, which is
// released under the MIT license.
//
// You can find a copy of the license in file LICENSE in the root
// directory of this repository or package, or at
// https://github.com/restatedev/proto/blob/main/LICENSE

syntax = "proto3";

import "google/protobuf/empty.proto";
import "restate/cluster.proto";
import "restate/common.proto";
import "restate/node.proto";

package restate.node_ctl_svc;

service NodeCtlSvc {
// Get identity information from this node.
rpc GetIdent(google.protobuf.Empty) returns (IdentResponse);

rpc GetMetadata(GetMetadataRequest) returns (GetMetadataResponse);

// Provision the Restate cluster on this node.
rpc ProvisionCluster(ProvisionClusterRequest) returns (ProvisionClusterResponse);
}

message ProvisionClusterRequest {
bool dry_run = 1;
optional uint32 num_partitions = 2;
optional restate.cluster.ReplicationStrategy placement_strategy = 3;
optional restate.cluster.DefaultProvider log_provider = 4;
}

enum ProvisionClusterResponseKind {
ProvisionClusterResponseType_UNKNOWN = 0;
DRY_RUN = 1;
NEWLY_PROVISIONED = 2;
ALREADY_PROVISIONED = 3;
}

message ProvisionClusterResponse {
ProvisionClusterResponseKind kind = 1;
// This field will be empty if the cluster is already provisioned
optional restate.cluster.ClusterConfiguration cluster_configuration = 3;
}

message IdentResponse {
restate.common.NodeStatus status = 1;
restate.common.NodeId node_id = 2;
string cluster_name = 3;
// indicates which roles are enabled on this node
repeated string roles = 4;
// Age of the running node in seconds (how many seconds since the daemon
// started)
uint64 age_s = 5;
restate.common.AdminStatus admin_status = 6;
restate.common.WorkerStatus worker_status = 7;
restate.common.LogServerStatus log_server_status = 8;
restate.common.MetadataServerStatus metadata_server_status = 9;
uint32 nodes_config_version = 10;
uint32 logs_version = 11;
uint32 schema_version = 12;
uint32 partition_table_version = 13;
}

message GetMetadataRequest {
// If set, we'll first sync with the metadata store to ensure we are returning the latest value.
// Otherwise, we'll return the local value on this node.
bool sync = 1;
restate.node.MetadataKind kind = 2;
}

message GetMetadataResponse {
// polymorphic. The value depends on the MetadataKind requested
bytes encoded = 1;
}
Loading

0 comments on commit 033cf50

Please sign in to comment.