From e4c415e9bfb0f6c0c5f8f9e45e0038910316d6a1 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Mon, 23 Dec 2024 17:37:52 +0100 Subject: [PATCH] Update tests to provision 3 nodes cluster tests with a replicated loglet with replication property 2 This commit adds the NodeCtl grpc svc to the repo to generate a grpc client to manually provision a replicated loglet with a replciation property 2. W/o manually provisioning the cluster, the replication property defaults to 1. Note: Whenever the *.proto files change in the restate repo, they need to be updated in this repository as well if there is an incompatible change. This fixes #26. --- build.gradle.kts | 58 +++++++- config/allowed-licenses.json | 3 + settings.gradle.kts | 17 +++ .../sdktesting/infra/RestateContainer.kt | 21 ++- .../sdktesting/infra/RestateDeployer.kt | 66 ++++++++- src/main/proto/node_ctl_svc.proto | 73 ++++++++++ src/main/proto/restate/cluster.proto | 114 +++++++++++++++ src/main/proto/restate/common.proto | 134 ++++++++++++++++++ src/main/proto/restate/node.proto | 93 ++++++++++++ 9 files changed, 568 insertions(+), 11 deletions(-) create mode 100644 src/main/proto/node_ctl_svc.proto create mode 100644 src/main/proto/restate/cluster.proto create mode 100644 src/main/proto/restate/common.proto create mode 100644 src/main/proto/restate/node.proto diff --git a/build.gradle.kts b/build.gradle.kts index 9e8eac9..e7305ac 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -9,6 +9,8 @@ plugins { id("com.diffplug.spotless") version "6.25.0" id("com.gradleup.shadow") version "8.3.5" id("com.github.jk1.dependency-license-report") version "2.9" + + alias(libs.plugins.protobuf) } group = "dev.restate.sdktesting" @@ -19,6 +21,8 @@ repositories { mavenCentral() // OSSRH Snapshots repo maven { url = uri("https://s01.oss.sonatype.org/content/repositories/snapshots/") } + // for protobuf-gradle-plugin dependencies + google() } dependencies { @@ -57,13 +61,32 @@ dependencies { implementation(libs.assertj) implementation(libs.awaitility) + + // grpc + implementation(libs.grpc.kotlin.stub) + implementation(libs.grpc.protobuf) + implementation(libs.grpc.netty.shaded) + implementation(libs.protobuf.kotlin) } kotlin { jvmToolchain(21) } val generatedJ2SPDir = layout.buildDirectory.dir("generated/j2sp") - -sourceSets { main { java.srcDir(generatedJ2SPDir) } } +val generatedProto = layout.buildDirectory.dir("generated/source/proto/main") + +sourceSets { + main { + java { + srcDir(generatedJ2SPDir) + srcDir(generatedProto.get().dir("java")) + srcDir(generatedProto.get().dir("grpc")) + } + kotlin { + srcDir(generatedProto.get().dir("kotlin")) + srcDir(generatedProto.get().dir("grpckt")) + } + } +} jsonSchema2Pojo { setSource(files("$projectDir/src/main/json")) @@ -76,12 +99,41 @@ jsonSchema2Pojo { generateBuilders = true } +protobuf { + protoc { artifact = libs.protoc.asProvider().get().toString() } + plugins { + create("grpc") { artifact = libs.protoc.gen.grpc.java.get().toString() } + create("grpckt") { artifact = libs.protoc.gen.grpc.kotlin.get().toString() + ":jdk8@jar" } + } + generateProtoTasks { + all().forEach { + it.plugins { + create("grpc") + create("grpckt") + } + it.builtins { create("kotlin") } + } + } +} + tasks { - getByName("compileKotlin") { dependsOn(generateJsonSchema2Pojo) } + getByName("compileKotlin") { + dependsOn(generateJsonSchema2Pojo) + dependsOn(generateProto) + } test { useJUnitPlatform() } } +// Workaround to enforce that kspKotlin runs after the generateProto task. Otherwise, gradle +// complains about an implicit +// dependency. See https://github.com/google/protobuf-gradle-plugin/issues/694. +tasks.configureEach { + if (name == "kspKotlin") { + mustRunAfter(tasks.generateProto) + } +} + spotless { kotlin { ktfmt() diff --git a/config/allowed-licenses.json b/config/allowed-licenses.json index 81867ea..6d78d35 100644 --- a/config/allowed-licenses.json +++ b/config/allowed-licenses.json @@ -20,6 +20,9 @@ }, { "moduleLicense": "Bouncy Castle Licence" + }, + { + "moduleLicense": "COMMON DEVELOPMENT AND DISTRIBUTION LICENSE (CDDL) Version 1.0" } ] } \ No newline at end of file diff --git a/settings.gradle.kts b/settings.gradle.kts index d942a6e..2a4f252 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -7,6 +7,8 @@ dependencyResolutionManagement { mavenCentral() // OSSRH Snapshots repo maven { url = uri("https://s01.oss.sonatype.org/content/repositories/snapshots/") } + // for protobuf-gradle-plugin dependencies + google() } versionCatalogs { @@ -75,6 +77,21 @@ dependencyResolutionManagement { library("symbol-processing-api", "com.google.devtools.ksp", "symbol-processing-api") .versionRef("ksp") plugin("ksp", "com.google.devtools.ksp").versionRef("ksp") + + library("grpc-kotlin-stub", "io.grpc", "grpc-kotlin-stub").version("1.4.1") + + version("grpc", "1.69.0") + library("grpc-protobuf", "io.grpc", "grpc-protobuf").versionRef("grpc") + library("grpc-netty-shaded", "io.grpc", "grpc-netty-shaded").versionRef("grpc") + library("protoc-gen-grpc-java", "io.grpc", "protoc-gen-grpc-java").versionRef("grpc") + + library("protoc-gen-grpc-kotlin", "io.grpc", "protoc-gen-grpc-kotlin").version("1.4.1") + + version("protobuf", "4.29.2") + library("protobuf-kotlin", "com.google.protobuf", "protobuf-kotlin").versionRef("protobuf") + library("protoc", "com.google.protobuf", "protoc").versionRef("protobuf") + + plugin("protobuf", "com.google.protobuf").version("0.9.4") } } } diff --git a/src/main/kotlin/dev/restate/sdktesting/infra/RestateContainer.kt b/src/main/kotlin/dev/restate/sdktesting/infra/RestateContainer.kt index 0e52db4..4f11552 100644 --- a/src/main/kotlin/dev/restate/sdktesting/infra/RestateContainer.kt +++ b/src/main/kotlin/dev/restate/sdktesting/infra/RestateContainer.kt @@ -46,16 +46,20 @@ class RestateContainer( private val WAIT_STARTUP_STRATEGY = WaitAllStrategy() .withStrategy( - Wait.forHttp("/restate/health") - .forPort(RUNTIME_INGRESS_ENDPOINT_PORT) + Wait.forHttp("/metrics") + .forPort(RUNTIME_NODE_PORT) .withRateLimiter( RateLimiterBuilder.newBuilder() .withRate(200, TimeUnit.MILLISECONDS) .withConstantThroughput() .build())) + .withStartupTimeout(120.seconds.toJavaDuration()) + + private val WAIT_INGRESS_READY_STRATEGY = + WaitAllStrategy() .withStrategy( - Wait.forHttp("/health") - .forPort(RUNTIME_ADMIN_ENDPOINT_PORT) + Wait.forHttp("/restate/health") + .forPort(RUNTIME_INGRESS_ENDPOINT_PORT) .withRateLimiter( RateLimiterBuilder.newBuilder() .withRate(200, TimeUnit.MILLISECONDS) @@ -63,7 +67,7 @@ class RestateContainer( .build())) .withStartupTimeout(120.seconds.toJavaDuration()) - fun bootstrapRestateCluster( + fun createRestateContainers( config: RestateDeployerConfig, network: Network, envs: Map, @@ -80,13 +84,14 @@ class RestateContainer( mapOf( "RESTATE_CLUSTER_NAME" to clusterId, "RESTATE_BIFROST__DEFAULT_PROVIDER" to "replicated", - "RESTATE_ALLOW_BOOTSTRAP" to "true", + "RESTATE_ALLOW_BOOTSTRAP" to "false", "RESTATE_ROLES" to "[worker,log-server,admin,metadata-store]", ) val followerEnvs = mapOf( "RESTATE_CLUSTER_NAME" to clusterId, "RESTATE_BIFROST__DEFAULT_PROVIDER" to "replicated", + "RESTATE_ALLOW_BOOTSTRAP" to "false", "RESTATE_ROLES" to "[worker,admin,log-server]", "RESTATE_METADATA_STORE_CLIENT__ADDRESS" to "http://$RESTATE_RUNTIME:$RUNTIME_NODE_PORT") @@ -191,6 +196,10 @@ class RestateContainer( WAIT_STARTUP_STRATEGY.waitUntilReady(this) } + fun waitIngressReady() { + WAIT_INGRESS_READY_STRATEGY.waitUntilReady(this) + } + fun dumpConfiguration() { check(isRunning) { "The container is not running, can't dump configuration" } dockerClient.killContainerCmd(containerId).withSignal("SIGUSR1").exec() diff --git a/src/main/kotlin/dev/restate/sdktesting/infra/RestateDeployer.kt b/src/main/kotlin/dev/restate/sdktesting/infra/RestateDeployer.kt index df26882..518bfc6 100644 --- a/src/main/kotlin/dev/restate/sdktesting/infra/RestateDeployer.kt +++ b/src/main/kotlin/dev/restate/sdktesting/infra/RestateDeployer.kt @@ -15,6 +15,9 @@ import dev.restate.admin.model.RegisterDeploymentRequest import dev.restate.admin.model.RegisterDeploymentRequestAnyOf import dev.restate.sdktesting.infra.runtimeconfig.IngressOptions import dev.restate.sdktesting.infra.runtimeconfig.RestateConfigSchema +import io.grpc.ManagedChannelBuilder +import io.grpc.Status.Code +import io.grpc.StatusException import java.io.File import java.net.URI import java.net.http.HttpClient @@ -28,6 +31,7 @@ import java.util.concurrent.Executors import java.util.concurrent.TimeUnit import kotlin.time.Duration.Companion.seconds import kotlin.time.toJavaDuration +import kotlinx.coroutines.runBlocking import org.apache.logging.log4j.CloseableThreadContext import org.apache.logging.log4j.LogManager import org.apache.logging.log4j.ThreadContext @@ -38,6 +42,9 @@ import org.testcontainers.images.builder.Transferable import org.testcontainers.shaded.com.fasterxml.jackson.databind.ser.impl.SimpleBeanPropertyFilter import org.testcontainers.shaded.com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider import org.testcontainers.shaded.com.github.dockerjava.core.DockerClientConfig +import restate.cluster.* +import restate.node_ctl_svc.NodeCtlSvcGrpcKt +import restate.node_ctl_svc.provisionClusterRequest class RestateDeployer private constructor( @@ -169,12 +176,23 @@ private constructor( } .associate { it.second.first to (it.first to it.second.second) } private val runtimeContainers: List = - RestateContainer.bootstrapRestateCluster( + RestateContainer.createRestateContainers( config, network, runtimeContainerEnvs, configSchema, copyToContainer, config.restateNodes) private val deployedContainers: Map = (runtimeContainers.map { - it.hostname to ContainerHandle(it, restartWaitStrategy = { it.waitStartup() }) + it.hostname to + ContainerHandle( + it, + restartWaitStrategy = { + it.waitStartup() + // For the KillRuntime and StopRuntime tests, we need to wait for the ingress + // to be ready. + // Otherwise, the client seems to be stuck when invoking services. Maybe it's + // because the new + // route has not been set up by Docker? + it.waitIngressReady() + }) } + serviceContainers.map { it.key to ContainerHandle(it.value.second) } + additionalContainers.map { it.key to ContainerHandle(it.value) }) @@ -282,9 +300,53 @@ private constructor( .toTypedArray()) .get(150, TimeUnit.SECONDS) + if (config.restateNodes > 1) { + provisionCluster() + } + executor.shutdown() } + private fun provisionCluster() { + LOG.debug("Manually provisioning the cluster") + + val nodePort = getContainerPort(RESTATE_RUNTIME, RUNTIME_NODE_PORT) + + val channel = ManagedChannelBuilder.forAddress("localhost", nodePort).usePlaintext().build() + val client = NodeCtlSvcGrpcKt.NodeCtlSvcCoroutineStub(channel) + val request = provisionClusterRequest { + dryRun = false + logProvider = defaultProvider { + provider = "replicated" + replicatedConfig = replicatedProviderConfig { + replicationProperty = "2" + nodesetSelectionStrategy = nodeSetSelectionStrategy { + kind = Cluster.NodeSetSelectionStrategyKind.StrictFaultTolerantGreedy + } + } + } + } + + Unreliables.retryUntilSuccess(20, TimeUnit.SECONDS) { + try { + runBlocking { + client.provisionCluster(request) + Unit + } + } catch (e: StatusException) { + // already exists code indicates that the cluster has been already provisioned + if (e.status.code != Code.ALREADY_EXISTS) { + Thread.sleep(100) + throw Exception("Failed provisioning the cluster. Retrying.", e) + } + } + } + + channel.shutdown() + + LOG.debug("Cluster has been provisioned") + } + private fun discoverDeployment(client: DeploymentApi, spec: ServiceSpec) { val url = spec.getEndpointUrl(config) if (spec.skipRegistration) { diff --git a/src/main/proto/node_ctl_svc.proto b/src/main/proto/node_ctl_svc.proto new file mode 100644 index 0000000..bcaa8db --- /dev/null +++ b/src/main/proto/node_ctl_svc.proto @@ -0,0 +1,73 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH +// +// This file is part of the Restate service protocol, which is +// released under the MIT license. +// +// You can find a copy of the license in file LICENSE in the root +// directory of this repository or package, or at +// https://github.com/restatedev/proto/blob/main/LICENSE + +syntax = "proto3"; + +import "google/protobuf/empty.proto"; +import "restate/cluster.proto"; +import "restate/common.proto"; +import "restate/node.proto"; + +package restate.node_ctl_svc; + +service NodeCtlSvc { + // Get identity information from this node. + rpc GetIdent(google.protobuf.Empty) returns (IdentResponse); + + rpc GetMetadata(GetMetadataRequest) returns (GetMetadataResponse); + + // Provision the Restate cluster on this node. + rpc ProvisionCluster(ProvisionClusterRequest) returns (ProvisionClusterResponse); +} + +message ProvisionClusterRequest { + bool dry_run = 1; + // if unset then the configured cluster num partitions will be used + optional uint32 num_partitions = 2; + // if unset then the configured cluster placement strategy will be used + optional restate.cluster.ReplicationStrategy placement_strategy = 3; + // if unset then the configured cluster default log provider will be used + optional restate.cluster.DefaultProvider log_provider = 4; +} + +message ProvisionClusterResponse { + bool dry_run = 1; + restate.cluster.ClusterConfiguration cluster_configuration = 2; +} + +message IdentResponse { + restate.common.NodeStatus status = 1; + restate.common.NodeId node_id = 2; + string cluster_name = 3; + // indicates which roles are enabled on this node + repeated string roles = 4; + // Age of the running node in seconds (how many seconds since the daemon + // started) + uint64 age_s = 5; + restate.common.AdminStatus admin_status = 6; + restate.common.WorkerStatus worker_status = 7; + restate.common.LogServerStatus log_server_status = 8; + restate.common.MetadataServerStatus metadata_server_status = 9; + uint32 nodes_config_version = 10; + uint32 logs_version = 11; + uint32 schema_version = 12; + uint32 partition_table_version = 13; +} + +message GetMetadataRequest { + // If set, we'll first sync with metadata store to esnure we are returning the latest value. + // Otherwise, we'll return the local value on this node. + bool sync = 1; + restate.node.MetadataKind kind = 2; +} + +message GetMetadataResponse { + // polymorphic. The value depends on the MetadataKind requested + bytes encoded = 1; +} diff --git a/src/main/proto/restate/cluster.proto b/src/main/proto/restate/cluster.proto new file mode 100644 index 0000000..d5dfb65 --- /dev/null +++ b/src/main/proto/restate/cluster.proto @@ -0,0 +1,114 @@ +// Copyright (c) 2024 - 2025 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +syntax = "proto3"; + +import "restate/common.proto"; +import "google/protobuf/empty.proto"; +import "google/protobuf/duration.proto"; +import "google/protobuf/timestamp.proto"; + +package restate.cluster; + +message ClusterState { + google.protobuf.Duration last_refreshed = 1; + restate.common.Version nodes_config_version = 2; + restate.common.Version partition_table_version = 3; + map nodes = 4; + restate.common.Version logs_metadata_version = 5; +} + +message NodeState { + oneof state { + AliveNode alive = 1; + DeadNode dead = 2; + SuspectNode suspect = 3; + } +} + +message SuspectNode { + restate.common.NodeId generational_node_id = 1; + google.protobuf.Timestamp last_attempt = 2; +} + +message AliveNode { + restate.common.NodeId generational_node_id = 1; + google.protobuf.Timestamp last_heartbeat_at = 2; + // partition id is u16 but protobuf doesn't support u16. This must be a value + // that's safe to convert to u16 + map partitions = 3; +} + +message DeadNode { google.protobuf.Timestamp last_seen_alive = 1; } + +enum RunMode { + RunMode_UNKNOWN = 0; + LEADER = 1; + FOLLOWER = 2; +} + +enum ReplayStatus { + ReplayStatus_UNKNOWN = 0; + STARTING = 1; + ACTIVE = 2; + CATCHING_UP = 3; +} + +message PartitionProcessorStatus { + google.protobuf.Timestamp updated_at = 1; + RunMode planned_mode = 2; + RunMode effective_mode = 3; + optional restate.common.LeaderEpoch last_observed_leader_epoch = 4; + optional restate.common.NodeId last_observed_leader_node = 5; + optional restate.common.Lsn last_applied_log_lsn = 6; + optional google.protobuf.Timestamp last_record_applied_at = 7; + uint64 num_skipped_records = 8; + ReplayStatus replay_status = 9; + optional restate.common.Lsn last_persisted_log_lsn = 10; + optional restate.common.Lsn last_archived_log_lsn = 12; + // Set if replay_status is CATCHING_UP + optional restate.common.Lsn target_tail_lsn = 11; +} + +enum NodeSetSelectionStrategyKind { + NodeSetSelectionStrategyKind_UNKNOWN = 0; + StrictFaultTolerantGreedy = 1; +} + +message NodeSetSelectionStrategy { NodeSetSelectionStrategyKind kind = 1; } + +message ReplicatedProviderConfig { + string replication_property = 1; + NodeSetSelectionStrategy nodeset_selection_strategy = 2; +} + +message DefaultProvider { + string provider = 1; + // only required if provider = "replicated" + optional ReplicatedProviderConfig replicated_config = 2; +} + +enum ReplicationStrategyKind { + ReplicationStrategyKind_UNKNOWN = 0; + OnAllNodes = 1; + Factor = 2; +} + +message ReplicationStrategy { + ReplicationStrategyKind kind = 1; + // required if kind == "Factor" + optional uint32 factor = 2; +} + +message ClusterConfiguration { + uint32 num_partitions = 1; + ReplicationStrategy replication_strategy = 2; + DefaultProvider default_provider = 3; +} diff --git a/src/main/proto/restate/common.proto b/src/main/proto/restate/common.proto new file mode 100644 index 0000000..eb8488b --- /dev/null +++ b/src/main/proto/restate/common.proto @@ -0,0 +1,134 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH +// +// This file is part of the Restate service protocol, which is +// released under the MIT license. +// +// You can find a copy of the license in file LICENSE in the root +// directory of this repository or package, or at +// https://github.com/restatedev/proto/blob/main/LICENSE + +syntax = "proto3"; + +package restate.common; + +enum ProtocolVersion { + ProtocolVersion_UNKNOWN = 0; + FLEXBUFFERS = 1; +} + +message NodeId { + uint32 id = 1; + optional uint32 generation = 2; +} + +// Partition Processor leadership epoch number +message LeaderEpoch { uint64 value = 1; } + +// Log sequence number +message Lsn { uint64 value = 1; } + +// A generic type for versioned metadata +message Version { uint32 value = 1; } + +// The handle name or type tag of the message. For every target there must be +// exactly one message handler implementation. +enum TargetName { + reserved 7, 8; + TargetName_UNKNOWN = 0; + METADATA_MANAGER = 1; + INGRESS = 2; + LOCAL_METADATA_STORE = 3; + LOCAL_METADATA_STORE_CLIENT = 4; + ATTACH_REQUEST = 5; + ATTACH_RESPONSE = 6; + CONTROL_PROCESSORS = 9; + // LogServer + LOG_SERVER_STORE = 10; + LOG_SERVER_STORED = 11; + LOG_SERVER_RELEASE = 12; + LOG_SERVER_RELEASED = 13; + LOG_SERVER_SEAL = 14; + LOG_SERVER_SEALED = 15; + LOG_SERVER_GET_LOGLET_INFO = 16; + LOG_SERVER_LOGLET_INFO = 17; + LOG_SERVER_GET_RECORDS = 18; + LOG_SERVER_RECORDS = 19; + LOG_SERVER_TRIM = 20; + LOG_SERVER_TRIMMED = 21; + LOG_SERVER_WAIT_FOR_TAIL = 22; + LOG_SERVER_TAIL_UPDATED = 23; + LOG_SERVER_GET_DIGEST = 24; + LOG_SERVER_DIGEST = 25; + // Reserving space for more log-server messages + // ReplicatedLoglet + REPLICATED_LOGLET_APPEND = 40; + REPLICATED_LOGLET_APPENDED = 41; + REPLICATED_LOGLET_GET_SEQUENCER_STATE = 42; + REPLICATED_LOGLET_SEQUENCER_STATE = 43; + // Partition Processor + PARTITION_CREATE_SNAPSHOT_REQUEST = 50; + PARTITION_CREATE_SNAPSHOT_RESPONSE = 51; + PARTITION_PROCESSOR_RPC = 52; + PARTITION_PROCESSOR_RPC_RESPONSE = 53; + // Node + NODE_GET_NODE_STATE_REQUEST = 60; + NODE_GET_NODE_STATE_RESPONSE = 61; + // Remote Scanner + REMOTE_QUERY_SCANNER_OPEN = 80; + REMOTE_QUERY_SCANNER_OPENED = 81; + REMOTE_QUERY_SCANNER_NEXT = 82; + REMOTE_QUERY_SCANNER_NEXT_RESULT = 83; + REMOTE_QUERY_SCANNER_CLOSE = 84; + REMOTE_QUERY_SCANNER_CLOSED = 85; +} + +// ** Health & Per-role Status + +enum NodeStatus { + NodeStatus_UNKNOWN = 0; + // The node has joined the cluster and is fully operational. + ALIVE = 1; + // The node is not fully running yet. + STARTING_UP = 2; + // The node is performing a graceful shutdown. + SHUTTING_DOWN = 3; +} + +enum NodeRpcStatus { + NodeRpcStatus_UNKNOWN = 0; + NodeRpcStatus_READY = 1; + NodeRpcStatus_STARTING_UP = 2; + NodeRpcStatus_STOPPING = 3; +} + +enum WorkerStatus { + WorkerStatus_UNKNOWN = 0; + WorkerStatus_READY = 1; + WorkerStatus_STARTING_UP = 2; +} + +enum AdminStatus { + AdminStatus_UNKNOWN = 0; + AdminStatus_READY = 1; + AdminStatus_STARTING_UP = 2; +} + +enum LogServerStatus { + LogServerStatus_UNKNOWN = 0; + LogServerStatus_READY = 1; + LogServerStatus_STARTING_UP = 2; + LogServerStatus_FAILSAFE = 3; + LogServerStatus_STOPPING = 4; +} + +enum MetadataServerStatus { + MetadataServerStatus_UNKNOWN = 0; + MetadataServerStatus_READY = 1; + MetadataServerStatus_STARTING_UP = 2; +} + +enum IngressStatus { + IngressStatus_UNKNOWN = 0; + IngressStatus_READY = 1; + IngressStatus_STARTING_UP = 2; +} diff --git a/src/main/proto/restate/node.proto b/src/main/proto/restate/node.proto new file mode 100644 index 0000000..60e5a84 --- /dev/null +++ b/src/main/proto/restate/node.proto @@ -0,0 +1,93 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH +// +// This file is part of the Restate service protocol, which is +// released under the MIT license. +// +// You can find a copy of the license in file LICENSE in the root +// directory of this repository or package, or at +// https://github.com/restatedev/proto/blob/main/LICENSE + +syntax = "proto3"; + +import "restate/common.proto"; + +package restate.node; + +// +// # Wire Protocol Of Streaming Connections +// ------------------------------------- +// +message Header { + restate.common.Version my_nodes_config_version = 1; + optional restate.common.Version my_logs_version = 2; + optional restate.common.Version my_schema_version = 3; + optional restate.common.Version my_partition_table_version = 4; + /// A unique monotonically increasing identifier of this message/request per + /// producer. The uniqueness domain is the generational node id. This is + /// always set for all messages (whether it's a request or a response) + uint64 msg_id = 5; + /// The msg_id at which we are responding to. Unset if this not to be + /// considered a response. Note: If this is set, it's the responsibility of + /// the message producer to ensure that the response is sent to the original + /// producer (generational node id). + optional uint64 in_response_to = 6; + optional SpanContext span_context = 7; +} + +message SpanContext { map fields = 1; } + +// First message sent to an ingress after starting the connection. The message +// must be sent before any other message. +message Hello { + restate.common.ProtocolVersion min_protocol_version = 1; + restate.common.ProtocolVersion max_protocol_version = 2; + // generational node id of sender (who am I) + restate.common.NodeId my_node_id = 3; + string cluster_name = 4; +} + +message Welcome { + restate.common.ProtocolVersion protocol_version = 2; + // generational node id of sender + restate.common.NodeId my_node_id = 3; +} + +// Bidirectional Communication +message Message { + enum Signal { + Signal_UNKNOWN = 0; + SHUTDOWN = 1; + // Connection will be dropped + DRAIN_CONNECTION = 2; + CODEC_ERROR = 3; + } + message ConnectionControl { + Signal signal = 1; + string message = 2; + } + + message BinaryMessage { + restate.common.TargetName target = 1; + bytes payload = 2; + } + + Header header = 1; + oneof body { + ConnectionControl connection_control = 2; + // Sent as first message + Hello hello = 3; + // Sent as first response + Welcome welcome = 4; + BinaryMessage encoded = 5; + } +} + +// # Common node types + +enum MetadataKind { + MetadataKind_UNKNOWN = 0; + NODES_CONFIGURATION = 1; + SCHEMA = 2; + PARTITION_TABLE = 3; + LOGS = 4; +}