diff --git a/services/node-services/src/app.ts b/services/node-services/src/app.ts index 2c1a358b..8c23a4bf 100644 --- a/services/node-services/src/app.ts +++ b/services/node-services/src/app.ts @@ -37,11 +37,19 @@ import { AwakeableHolderService, AwakeableHolderServiceFQN, } from "./awakeable_holder"; -import { HandlerAPIEchoTestFQN, HandlerApiEchoRouter } from "./handler_api"; +import { + HandlerAPIEchoTestFQN, + HandlerAPIEchoRouter, + CounterHandlerAPIFQN, + CounterHandlerAPIRouter, +} from "./handler_api"; let serverBuilder = restate.createServer(); -const services = new Map([ +const services = new Map< + string, + restate.ServiceOpts | { router: any } | { keyedRouter: any } +>([ [ CounterServiceFQN, { @@ -149,7 +157,13 @@ const services = new Map([ [ HandlerAPIEchoTestFQN, { - router: HandlerApiEchoRouter, + router: HandlerAPIEchoRouter, + }, + ], + [ + CounterHandlerAPIFQN, + { + keyedRouter: CounterHandlerAPIRouter, }, ], ]); @@ -171,12 +185,20 @@ for (let service of servicesEnv) { serverBuilder = serverBuilder.bindService( foundService as restate.ServiceOpts ); - } else { + } else if ( + (foundService as restate.UnKeyedRouter).router !== undefined + ) { console.log("Mounting router " + service); serverBuilder = serverBuilder.bindRouter( service, (foundService as { router: any }).router ); + } else { + console.log("Mounting keyed router " + service); + serverBuilder = serverBuilder.bindKeyedRouter( + service, + (foundService as { keyedRouter: any }).keyedRouter + ); } } diff --git a/services/node-services/src/counter.ts b/services/node-services/src/counter.ts index 991a5fa4..d2b89108 100644 --- a/services/node-services/src/counter.ts +++ b/services/node-services/src/counter.ts @@ -8,9 +8,7 @@ import { GetResponse, protobufPackage, } from "./generated/counter"; -import { - StringKeyedEvent -} from "./generated/dev/restate/events"; +import { StringKeyedEvent } from "./generated/dev/restate/events"; import { Empty } from "./generated/google/protobuf/empty"; import { AwakeableHolderServiceClientImpl } from "./generated/awakeable_holder"; @@ -89,13 +87,13 @@ export class CounterService implements Counter { } } - async handleEvent(request: StringKeyedEvent): Promise { - console.log("handleEvent: " + JSON.stringify(request)); - const ctx = restate.useContext(this); + async handleEvent(request: StringKeyedEvent): Promise { + console.log("handleEvent: " + JSON.stringify(request)); + const ctx = restate.useContext(this); - const value = (await ctx.get(COUNTER_KEY)) || 0; - ctx.set(COUNTER_KEY, value + parseInt(request.payload.toString())); + const value = (await ctx.get(COUNTER_KEY)) || 0; + ctx.set(COUNTER_KEY, value + parseInt(request.payload.toString())); - return Empty.create({}); - } + return Empty.create({}); + } } diff --git a/services/node-services/src/handler_api.ts b/services/node-services/src/handler_api.ts index 75065b24..b13b2760 100644 --- a/services/node-services/src/handler_api.ts +++ b/services/node-services/src/handler_api.ts @@ -2,6 +2,8 @@ import * as restate from "@restatedev/restate-sdk"; export const HandlerAPIEchoTestFQN = "handlerapi.HandlerAPIEchoTest"; +const COUNTER_KEY = "counter"; + // These two handlers just test the correct propagation of the input message in the output const echo = (ctx: restate.RpcContext, msg: any): Promise => { return msg; @@ -10,11 +12,41 @@ const echoEcho = async (ctx: restate.RpcContext, msg: any): Promise => { return await ctx.rpc(handlerApi).echo(msg); }; -const handlerApi: restate.ServiceApi = { +const handlerApi: restate.ServiceApi = { path: HandlerAPIEchoTestFQN, }; -export const HandlerApiEchoRouter = restate.router({ +export const HandlerAPIEchoRouter = restate.router({ echo, echoEcho, }); + +// -- Counter service + +const get = async ( + ctx: restate.RpcContext, + key: string, + value: any +): Promise<{ counter: number }> => { + console.log("get: " + JSON.stringify(key) + " " + JSON.stringify(value)); + + const counter = (await ctx.get(COUNTER_KEY)) || 0; + + return { counter }; +}; + +const handleEvent = async (ctx: restate.RpcContext, request: restate.Event) => { + console.log("handleEvent: " + JSON.stringify(request)); + + const value = (await ctx.get(COUNTER_KEY)) || 0; + const eventValue = parseInt(new TextDecoder().decode(request.body())); + console.log("Event value: " + eventValue); + ctx.set(COUNTER_KEY, value + eventValue); +}; + +export const CounterHandlerAPIFQN = "handlerapi.Counter"; + +export const CounterHandlerAPIRouter = restate.keyedRouter({ + get, + handleEvent: restate.keyedEventHandler(handleEvent), +}); diff --git a/tests/build.gradle.kts b/tests/build.gradle.kts index c04f3d3d..61175af7 100644 --- a/tests/build.gradle.kts +++ b/tests/build.gradle.kts @@ -35,7 +35,9 @@ tasks { (if (System.getenv("RESTATE_RUNTIME_CONTAINER").isNullOrEmpty()) "ghcr.io/restatedev/restate:main" else System.getenv("RESTATE_RUNTIME_CONTAINER")), - "RUST_LOG" to (System.getenv("RUST_LOG") ?: "info,restate_invoker=trace,restate=debug"), + "RUST_LOG" to + (System.getenv("RUST_LOG") + ?: "info,restate_invoker=trace,restate_ingress_kafka=trace,restate=debug"), "RUST_BACKTRACE" to "full") test { diff --git a/tests/src/test/kotlin/dev/restate/e2e/Containers.kt b/tests/src/test/kotlin/dev/restate/e2e/Containers.kt index 90fe2aac..99c4ebb6 100644 --- a/tests/src/test/kotlin/dev/restate/e2e/Containers.kt +++ b/tests/src/test/kotlin/dev/restate/e2e/Containers.kt @@ -104,6 +104,7 @@ object Containers { nodeServicesContainer("node-proxy", ProxyServiceGrpc.SERVICE_NAME).build() const val HANDLER_API_ECHO_TEST_SERVICE_NAME = "handlerapi.HandlerAPIEchoTest" + const val HANDLER_API_COUNTER_SERVICE_NAME = "handlerapi.Counter" val NODE_HANDLER_API_ECHO_TEST_SERVICE_SPEC = nodeServicesContainer("node-proxy", HANDLER_API_ECHO_TEST_SERVICE_NAME).build() diff --git a/tests/src/test/kotlin/dev/restate/e2e/KafkaIngressTest.kt b/tests/src/test/kotlin/dev/restate/e2e/KafkaIngressTest.kt index 81f57327..8e30f16b 100644 --- a/tests/src/test/kotlin/dev/restate/e2e/KafkaIngressTest.kt +++ b/tests/src/test/kotlin/dev/restate/e2e/KafkaIngressTest.kt @@ -8,7 +8,10 @@ import dev.restate.e2e.utils.* import dev.restate.e2e.utils.config.* import dev.restate.e2e.utils.meta.client.SubscriptionsClient import dev.restate.e2e.utils.meta.models.CreateSubscriptionRequest +import java.net.URI import java.net.URL +import java.net.http.HttpClient +import java.net.http.HttpRequest import java.util.* import okhttp3.OkHttpClient import org.apache.kafka.clients.producer.KafkaProducer @@ -17,6 +20,7 @@ import org.apache.kafka.clients.producer.ProducerRecord import org.assertj.core.api.Assertions.assertThat import org.awaitility.kotlin.await import org.awaitility.kotlin.matches +import org.awaitility.kotlin.untilAsserted import org.awaitility.kotlin.untilCallTo import org.junit.jupiter.api.Test import org.junit.jupiter.api.extension.RegisterExtension @@ -67,24 +71,6 @@ class NodeKafkaIngressTest : BaseKafkaIngressTest() { abstract class BaseKafkaIngressTest { - private fun produceMessageToKafka( - bootstrapServer: String, - topic: String, - key: String, - values: List - ) { - val props = Properties() - props["bootstrap.servers"] = bootstrapServer - props["key.serializer"] = "org.apache.kafka.common.serialization.StringSerializer" - props["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer" - - val producer: Producer = KafkaProducer(props) - for (value in values) { - producer.send(ProducerRecord(topic, key, value)) - } - producer.close() - } - @Test fun handleKeyedEvent( @InjectMetaURL metaURL: URL, @@ -119,3 +105,85 @@ abstract class BaseKafkaIngressTest { } } } + +class NodeHandlerAPIKafkaIngressTest { + + companion object { + @RegisterExtension + val deployerExt: RestateDeployerExtension = + RestateDeployerExtension( + RestateDeployer.Builder() + .withEnv(Containers.getRestateEnvironment()) + .withServiceEndpoint( + Containers.nodeServicesContainer( + "node-counter", Containers.HANDLER_API_COUNTER_SERVICE_NAME)) + .withContainer("kafka", KafkaContainer(TOPIC)) + .withConfig(kafkaClusterOptions()) + .build()) + } + + @Test + fun handleKeyedEvent( + @InjectMetaURL metaURL: URL, + @InjectContainerPort(hostName = "kafka", port = KafkaContainer.EXTERNAL_PORT) kafkaPort: Int, + @InjectGrpcIngressURL httpEndpointURL: URL + ) { + val counter = UUID.randomUUID().toString() + + // Create subscription + val subscriptionsClient = + SubscriptionsClient(ObjectMapper(), metaURL.toString(), OkHttpClient()) + assertThat( + subscriptionsClient.createSubscription( + CreateSubscriptionRequest( + source = "kafka://my-cluster/$TOPIC", + sink = "service://${Containers.HANDLER_API_COUNTER_SERVICE_NAME}/handleEvent", + options = mapOf("auto.offset.reset" to "earliest")))) + .extracting { it.statusCode } + .isEqualTo(201) + + // Produce message to kafka + produceMessageToKafka("PLAINTEXT://localhost:$kafkaPort", TOPIC, counter, listOf("1", "2", "3")) + + val client = HttpClient.newHttpClient() + + // Now wait for the update to be visible + await untilAsserted + { + val req = + HttpRequest.newBuilder( + URI.create( + "${httpEndpointURL}${Containers.HANDLER_API_COUNTER_SERVICE_NAME}/get")) + .POST(Utils.jacksonBodyPublisher(mapOf("key" to counter))) + .headers("Content-Type", "application/json") + .build() + + val response = client.send(req, Utils.jacksonBodyHandler()) + + assertThat(response.statusCode()).isEqualTo(200) + assertThat(response.headers().firstValue("content-type")) + .get() + .asString() + .contains("application/json") + assertThat(response.body().get("response").get("counter").asInt()).isEqualTo(6) + } + } +} + +private fun produceMessageToKafka( + bootstrapServer: String, + topic: String, + key: String, + values: List +) { + val props = Properties() + props["bootstrap.servers"] = bootstrapServer + props["key.serializer"] = "org.apache.kafka.common.serialization.StringSerializer" + props["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer" + + val producer: Producer = KafkaProducer(props) + for (value in values) { + producer.send(ProducerRecord(topic, key, value)) + } + producer.close() +}