diff --git a/contracts/src/main/proto/counter.proto b/contracts/src/main/proto/counter.proto index 322d2a51..0c3dc82f 100644 --- a/contracts/src/main/proto/counter.proto +++ b/contracts/src/main/proto/counter.proto @@ -5,7 +5,6 @@ option java_outer_classname = "CounterProto"; import "google/protobuf/empty.proto"; import "dev/restate/ext.proto"; -import "dev/restate/events.proto"; package counter; @@ -18,7 +17,7 @@ service Counter { rpc Get (CounterRequest) returns (GetResponse); rpc GetAndAdd (CounterAddRequest) returns (CounterUpdateResult); rpc InfiniteIncrementLoop (CounterRequest) returns (google.protobuf.Empty); - rpc HandleEvent(dev.restate.StringKeyedEvent) returns (google.protobuf.Empty); + rpc HandleEvent(UpdateCounterEvent) returns (google.protobuf.Empty); } service ProxyCounter { @@ -43,4 +42,10 @@ message GetResponse { message CounterUpdateResult { int64 old_value = 1; int64 new_value = 2; +} + +message UpdateCounterEvent { + string counter_name = 1 [(dev.restate.ext.field) = KEY]; + bytes payload = 2 [(dev.restate.ext.field) = EVENT_PAYLOAD]; + map metadata = 3 [(dev.restate.ext.field) = EVENT_METADATA]; } \ No newline at end of file diff --git a/contracts/src/main/proto/event_handler.proto b/contracts/src/main/proto/event_handler.proto new file mode 100644 index 00000000..cb08d9e3 --- /dev/null +++ b/contracts/src/main/proto/event_handler.proto @@ -0,0 +1,16 @@ +syntax = "proto3"; + +option java_package = "dev.restate.e2e.services.eventhandler"; +option java_outer_classname = "EventHandlerProto"; + +import "google/protobuf/empty.proto"; +import "dev/restate/ext.proto"; +import "dev/restate/events.proto"; + +package eventhandler; + +service EventHandler { + option (dev.restate.ext.service_type) = KEYED; + + rpc Handle (dev.restate.Event) returns (google.protobuf.Empty); +} diff --git a/services/java-services/src/main/java/dev/restate/e2e/services/Main.java b/services/java-services/src/main/java/dev/restate/e2e/services/Main.java index 3ed2402f..e48e8376 100644 --- a/services/java-services/src/main/java/dev/restate/e2e/services/Main.java +++ b/services/java-services/src/main/java/dev/restate/e2e/services/Main.java @@ -10,6 +10,8 @@ import dev.restate.e2e.services.counter.SingletonCounterService; import dev.restate.e2e.services.errors.FailingService; import dev.restate.e2e.services.errors.FailingServiceGrpc; +import dev.restate.e2e.services.eventhandler.EventHandlerGrpc; +import dev.restate.e2e.services.eventhandler.EventHandlerService; import dev.restate.e2e.services.externalcall.RandomNumberListGeneratorGrpc; import dev.restate.e2e.services.externalcall.RandomNumberListGeneratorService; import dev.restate.e2e.services.nondeterminism.NonDeterministicService; @@ -67,6 +69,9 @@ public static void main(String[] args) { case UpgradeTestServiceGrpc.SERVICE_NAME: restateHttpEndpointBuilder.withService(new UpgradeTestService()); break; + case EventHandlerGrpc.SERVICE_NAME: + restateHttpEndpointBuilder.withService(new EventHandlerService()); + break; } } diff --git a/services/java-services/src/main/java/dev/restate/e2e/services/counter/CounterService.java b/services/java-services/src/main/java/dev/restate/e2e/services/counter/CounterService.java index c3e802df..09ee2d52 100644 --- a/services/java-services/src/main/java/dev/restate/e2e/services/counter/CounterService.java +++ b/services/java-services/src/main/java/dev/restate/e2e/services/counter/CounterService.java @@ -3,7 +3,6 @@ import static dev.restate.e2e.services.counter.CounterProto.*; import com.google.protobuf.Empty; -import dev.restate.generated.StringKeyedEvent; import dev.restate.sdk.blocking.RestateBlockingService; import dev.restate.sdk.core.StateKey; import io.grpc.Status; @@ -64,7 +63,10 @@ public void addThenFail(CounterAddRequest request, StreamObserver respons public void get(CounterRequest request, StreamObserver responseObserver) { var ctx = restateContext(); - GetResponse result = GetResponse.newBuilder().setValue(ctx.get(COUNTER_KEY).orElse(0L)).build(); + long counter = ctx.get(COUNTER_KEY).orElse(0L); + logger.info("Get counter '{}' value: {}", request.getCounterName(), counter); + + GetResponse result = GetResponse.newBuilder().setValue(counter).build(); responseObserver.onNext(result); responseObserver.onCompleted(); @@ -88,16 +90,16 @@ public void getAndAdd( } @Override - public void handleEvent(StringKeyedEvent request, StreamObserver responseObserver) { + public void handleEvent(UpdateCounterEvent request, StreamObserver responseObserver) { var ctx = restateContext(); long counter = ctx.get(COUNTER_KEY).orElse(0L); - logger.info("Old counter '{}' value: {}", request.getKey(), counter); + logger.info("Old counter '{}' value: {}", request.getCounterName(), counter); counter += Long.parseLong(request.getPayload().toStringUtf8()); ctx.set(COUNTER_KEY, counter); - logger.info("New counter '{}' value: {}", request.getKey(), counter); + logger.info("New counter '{}' value: {}", request.getCounterName(), counter); responseObserver.onNext(Empty.getDefaultInstance()); responseObserver.onCompleted(); diff --git a/services/java-services/src/main/java/dev/restate/e2e/services/eventhandler/EventHandlerService.java b/services/java-services/src/main/java/dev/restate/e2e/services/eventhandler/EventHandlerService.java new file mode 100644 index 00000000..59bfccb8 --- /dev/null +++ b/services/java-services/src/main/java/dev/restate/e2e/services/eventhandler/EventHandlerService.java @@ -0,0 +1,25 @@ +package dev.restate.e2e.services.eventhandler; + +import com.google.protobuf.Empty; +import dev.restate.e2e.services.counter.CounterGrpc; +import dev.restate.e2e.services.counter.CounterProto; +import dev.restate.generated.Event; +import dev.restate.sdk.blocking.RestateBlockingService; +import io.grpc.stub.StreamObserver; + +public class EventHandlerService extends EventHandlerGrpc.EventHandlerImplBase + implements RestateBlockingService { + + @Override + public void handle(Event event, StreamObserver responseObserver) { + restateContext() + .oneWayCall( + CounterGrpc.getAddMethod(), + CounterProto.CounterAddRequest.newBuilder() + .setCounterName(event.getKey().toStringUtf8()) + .setValue(Long.parseLong(event.getPayload().toStringUtf8())) + .build()); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } +} diff --git a/services/node-services/src/app.ts b/services/node-services/src/app.ts index 8c23a4bf..2796f1da 100644 --- a/services/node-services/src/app.ts +++ b/services/node-services/src/app.ts @@ -12,6 +12,7 @@ import { protoMetadata as sideEffectProtoMetadata } from "./generated/side_effec import { protoMetadata as proxyProtoMetadata } from "./generated/proxy"; import { protoMetadata as rngProtoMetadata } from "./generated/rng"; import { protoMetadata as awakeableHolderProtoMetadata } from "./generated/awakeable_holder"; +import { protoMetadata as eventHandlerProtoMetadata } from "./generated/event_handler"; import { CounterService, CounterServiceFQN } from "./counter"; import { ListService, ListServiceFQN } from "./collections"; import { FailingService, FailingServiceFQN } from "./errors"; @@ -43,6 +44,7 @@ import { CounterHandlerAPIFQN, CounterHandlerAPIRouter, } from "./handler_api"; +import { EventHandlerFQN, EventHandlerService } from "./event_handler"; let serverBuilder = restate.createServer(); @@ -166,6 +168,14 @@ const services = new Map< keyedRouter: CounterHandlerAPIRouter, }, ], + [ + EventHandlerFQN, + { + descriptor: eventHandlerProtoMetadata, + service: "EventHandler", + instance: new EventHandlerService(), + }, + ], ]); console.log(services.keys()); diff --git a/services/node-services/src/counter.ts b/services/node-services/src/counter.ts index d2b89108..47934638 100644 --- a/services/node-services/src/counter.ts +++ b/services/node-services/src/counter.ts @@ -5,6 +5,7 @@ import { CounterRequest, CounterAddRequest, CounterUpdateResult, + UpdateCounterEvent, GetResponse, protobufPackage, } from "./generated/counter"; @@ -87,7 +88,7 @@ export class CounterService implements Counter { } } - async handleEvent(request: StringKeyedEvent): Promise { + async handleEvent(request: UpdateCounterEvent): Promise { console.log("handleEvent: " + JSON.stringify(request)); const ctx = restate.useContext(this); diff --git a/services/node-services/src/event_handler.ts b/services/node-services/src/event_handler.ts new file mode 100644 index 00000000..f49d0658 --- /dev/null +++ b/services/node-services/src/event_handler.ts @@ -0,0 +1,25 @@ +import * as restate from "@restatedev/restate-sdk"; + +import { Empty } from "./generated/google/protobuf/empty"; +import { EventHandler, protobufPackage } from "./generated/event_handler"; +import { Event } from "./generated/dev/restate/events"; +import { CounterClientImpl } from "./generated/counter"; + +export const EventHandlerFQN = protobufPackage + ".EventHandler"; + +export class EventHandlerService implements EventHandler { + async handle(event: Event): Promise { + console.log("handleEvent: " + JSON.stringify(event)); + const ctx = restate.useContext(this); + + const counterClient = new CounterClientImpl(ctx); + await ctx.oneWayCall(() => + counterClient.add({ + counterName: event.key.toString(), + value: parseInt(event.payload.toString()), + }) + ); + + return Empty.create({}); + } +} diff --git a/tests/src/test/kotlin/dev/restate/e2e/KafkaIngressTest.kt b/tests/src/test/kotlin/dev/restate/e2e/KafkaIngressTest.kt index 8e30f16b..9d3fffd1 100644 --- a/tests/src/test/kotlin/dev/restate/e2e/KafkaIngressTest.kt +++ b/tests/src/test/kotlin/dev/restate/e2e/KafkaIngressTest.kt @@ -4,10 +4,12 @@ import com.fasterxml.jackson.databind.ObjectMapper import dev.restate.e2e.services.counter.CounterGrpc import dev.restate.e2e.services.counter.CounterGrpc.CounterBlockingStub import dev.restate.e2e.services.counter.counterRequest +import dev.restate.e2e.services.eventhandler.EventHandlerGrpc import dev.restate.e2e.utils.* import dev.restate.e2e.utils.config.* import dev.restate.e2e.utils.meta.client.SubscriptionsClient import dev.restate.e2e.utils.meta.models.CreateSubscriptionRequest +import io.grpc.MethodDescriptor import java.net.URI import java.net.URL import java.net.http.HttpClient @@ -24,8 +26,11 @@ import org.awaitility.kotlin.untilAsserted import org.awaitility.kotlin.untilCallTo import org.junit.jupiter.api.Test import org.junit.jupiter.api.extension.RegisterExtension +import org.junit.jupiter.api.parallel.Execution +import org.junit.jupiter.api.parallel.ExecutionMode -private const val TOPIC = "my-topic" +private const val COUNTER_TOPIC = "counter" +private const val EVENT_HANDLER_TOPIC = "event-handler" private fun kafkaClusterOptions(): RestateConfigSchema { return RestateConfigSchema() @@ -48,8 +53,10 @@ class JavaKafkaIngressTest : BaseKafkaIngressTest() { RestateDeployerExtension( RestateDeployer.Builder() .withEnv(Containers.getRestateEnvironment()) - .withServiceEndpoint(Containers.JAVA_COUNTER_SERVICE_SPEC) - .withContainer("kafka", KafkaContainer(TOPIC)) + .withServiceEndpoint( + Containers.javaServicesContainer( + "java-counter", CounterGrpc.SERVICE_NAME, EventHandlerGrpc.SERVICE_NAME)) + .withContainer("kafka", KafkaContainer(COUNTER_TOPIC, EVENT_HANDLER_TOPIC)) .withConfig(kafkaClusterOptions()) .build()) } @@ -62,8 +69,10 @@ class NodeKafkaIngressTest : BaseKafkaIngressTest() { RestateDeployerExtension( RestateDeployer.Builder() .withEnv(Containers.getRestateEnvironment()) - .withServiceEndpoint(Containers.NODE_COUNTER_SERVICE_SPEC) - .withContainer("kafka", KafkaContainer(TOPIC)) + .withServiceEndpoint( + Containers.nodeServicesContainer( + "node-counter", CounterGrpc.SERVICE_NAME, EventHandlerGrpc.SERVICE_NAME)) + .withContainer("kafka", KafkaContainer(COUNTER_TOPIC, EVENT_HANDLER_TOPIC)) .withConfig(kafkaClusterOptions()) .build()) } @@ -72,10 +81,33 @@ class NodeKafkaIngressTest : BaseKafkaIngressTest() { abstract class BaseKafkaIngressTest { @Test - fun handleKeyedEvent( + @Execution(ExecutionMode.CONCURRENT) + fun handleEventInCounterService( + @InjectMetaURL metaURL: URL, + @InjectContainerPort(hostName = "kafka", port = KafkaContainer.EXTERNAL_PORT) kafkaPort: Int, + @InjectBlockingStub counterClient: CounterBlockingStub + ) { + counterEventsTest( + metaURL, kafkaPort, counterClient, COUNTER_TOPIC, CounterGrpc.getHandleEventMethod()) + } + + @Test + @Execution(ExecutionMode.CONCURRENT) + fun handleEventInEventHandler( @InjectMetaURL metaURL: URL, @InjectContainerPort(hostName = "kafka", port = KafkaContainer.EXTERNAL_PORT) kafkaPort: Int, @InjectBlockingStub counterClient: CounterBlockingStub + ) { + counterEventsTest( + metaURL, kafkaPort, counterClient, EVENT_HANDLER_TOPIC, EventHandlerGrpc.getHandleMethod()) + } + + fun counterEventsTest( + metaURL: URL, + kafkaPort: Int, + counterClient: CounterBlockingStub, + topic: String, + methodDescriptor: MethodDescriptor<*, *> ) { val counter = UUID.randomUUID().toString() @@ -85,15 +117,14 @@ abstract class BaseKafkaIngressTest { assertThat( subscriptionsClient.createSubscription( CreateSubscriptionRequest( - source = "kafka://my-cluster/$TOPIC", - sink = - "service://${CounterGrpc.SERVICE_NAME}/${CounterGrpc.getHandleEventMethod().bareMethodName}", + source = "kafka://my-cluster/$topic", + sink = "service://${methodDescriptor.fullMethodName}", options = mapOf("auto.offset.reset" to "earliest")))) .extracting { it.statusCode } .isEqualTo(201) // Produce message to kafka - produceMessageToKafka("PLAINTEXT://localhost:$kafkaPort", TOPIC, counter, listOf("1", "2", "3")) + produceMessageToKafka("PLAINTEXT://localhost:$kafkaPort", topic, counter, listOf("1", "2", "3")) // Now wait for the update to be visible await untilCallTo @@ -117,7 +148,7 @@ class NodeHandlerAPIKafkaIngressTest { .withServiceEndpoint( Containers.nodeServicesContainer( "node-counter", Containers.HANDLER_API_COUNTER_SERVICE_NAME)) - .withContainer("kafka", KafkaContainer(TOPIC)) + .withContainer("kafka", KafkaContainer(COUNTER_TOPIC)) .withConfig(kafkaClusterOptions()) .build()) } @@ -136,14 +167,15 @@ class NodeHandlerAPIKafkaIngressTest { assertThat( subscriptionsClient.createSubscription( CreateSubscriptionRequest( - source = "kafka://my-cluster/$TOPIC", + source = "kafka://my-cluster/$COUNTER_TOPIC", sink = "service://${Containers.HANDLER_API_COUNTER_SERVICE_NAME}/handleEvent", options = mapOf("auto.offset.reset" to "earliest")))) .extracting { it.statusCode } .isEqualTo(201) // Produce message to kafka - produceMessageToKafka("PLAINTEXT://localhost:$kafkaPort", TOPIC, counter, listOf("1", "2", "3")) + produceMessageToKafka( + "PLAINTEXT://localhost:$kafkaPort", COUNTER_TOPIC, counter, listOf("1", "2", "3")) val client = HttpClient.newHttpClient()