Skip to content

Commit

Permalink
Adapt Kafka ingress tests to event shape changes (#203)
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkydeveloper authored Oct 16, 2023
1 parent 7d03797 commit 895b16e
Show file tree
Hide file tree
Showing 9 changed files with 142 additions and 21 deletions.
9 changes: 7 additions & 2 deletions contracts/src/main/proto/counter.proto
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ option java_outer_classname = "CounterProto";

import "google/protobuf/empty.proto";
import "dev/restate/ext.proto";
import "dev/restate/events.proto";

package counter;

Expand All @@ -18,7 +17,7 @@ service Counter {
rpc Get (CounterRequest) returns (GetResponse);
rpc GetAndAdd (CounterAddRequest) returns (CounterUpdateResult);
rpc InfiniteIncrementLoop (CounterRequest) returns (google.protobuf.Empty);
rpc HandleEvent(dev.restate.StringKeyedEvent) returns (google.protobuf.Empty);
rpc HandleEvent(UpdateCounterEvent) returns (google.protobuf.Empty);
}

service ProxyCounter {
Expand All @@ -43,4 +42,10 @@ message GetResponse {
message CounterUpdateResult {
int64 old_value = 1;
int64 new_value = 2;
}

message UpdateCounterEvent {
string counter_name = 1 [(dev.restate.ext.field) = KEY];
bytes payload = 2 [(dev.restate.ext.field) = EVENT_PAYLOAD];
map<string, string> metadata = 3 [(dev.restate.ext.field) = EVENT_METADATA];
}
16 changes: 16 additions & 0 deletions contracts/src/main/proto/event_handler.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
syntax = "proto3";

option java_package = "dev.restate.e2e.services.eventhandler";
option java_outer_classname = "EventHandlerProto";

import "google/protobuf/empty.proto";
import "dev/restate/ext.proto";
import "dev/restate/events.proto";

package eventhandler;

service EventHandler {
option (dev.restate.ext.service_type) = KEYED;

rpc Handle (dev.restate.Event) returns (google.protobuf.Empty);
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import dev.restate.e2e.services.counter.SingletonCounterService;
import dev.restate.e2e.services.errors.FailingService;
import dev.restate.e2e.services.errors.FailingServiceGrpc;
import dev.restate.e2e.services.eventhandler.EventHandlerGrpc;
import dev.restate.e2e.services.eventhandler.EventHandlerService;
import dev.restate.e2e.services.externalcall.RandomNumberListGeneratorGrpc;
import dev.restate.e2e.services.externalcall.RandomNumberListGeneratorService;
import dev.restate.e2e.services.nondeterminism.NonDeterministicService;
Expand Down Expand Up @@ -67,6 +69,9 @@ public static void main(String[] args) {
case UpgradeTestServiceGrpc.SERVICE_NAME:
restateHttpEndpointBuilder.withService(new UpgradeTestService());
break;
case EventHandlerGrpc.SERVICE_NAME:
restateHttpEndpointBuilder.withService(new EventHandlerService());
break;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import static dev.restate.e2e.services.counter.CounterProto.*;

import com.google.protobuf.Empty;
import dev.restate.generated.StringKeyedEvent;
import dev.restate.sdk.blocking.RestateBlockingService;
import dev.restate.sdk.core.StateKey;
import io.grpc.Status;
Expand Down Expand Up @@ -64,7 +63,10 @@ public void addThenFail(CounterAddRequest request, StreamObserver<Empty> respons
public void get(CounterRequest request, StreamObserver<GetResponse> responseObserver) {
var ctx = restateContext();

GetResponse result = GetResponse.newBuilder().setValue(ctx.get(COUNTER_KEY).orElse(0L)).build();
long counter = ctx.get(COUNTER_KEY).orElse(0L);
logger.info("Get counter '{}' value: {}", request.getCounterName(), counter);

GetResponse result = GetResponse.newBuilder().setValue(counter).build();

responseObserver.onNext(result);
responseObserver.onCompleted();
Expand All @@ -88,16 +90,16 @@ public void getAndAdd(
}

@Override
public void handleEvent(StringKeyedEvent request, StreamObserver<Empty> responseObserver) {
public void handleEvent(UpdateCounterEvent request, StreamObserver<Empty> responseObserver) {
var ctx = restateContext();

long counter = ctx.get(COUNTER_KEY).orElse(0L);
logger.info("Old counter '{}' value: {}", request.getKey(), counter);
logger.info("Old counter '{}' value: {}", request.getCounterName(), counter);

counter += Long.parseLong(request.getPayload().toStringUtf8());
ctx.set(COUNTER_KEY, counter);

logger.info("New counter '{}' value: {}", request.getKey(), counter);
logger.info("New counter '{}' value: {}", request.getCounterName(), counter);

responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package dev.restate.e2e.services.eventhandler;

import com.google.protobuf.Empty;
import dev.restate.e2e.services.counter.CounterGrpc;
import dev.restate.e2e.services.counter.CounterProto;
import dev.restate.generated.Event;
import dev.restate.sdk.blocking.RestateBlockingService;
import io.grpc.stub.StreamObserver;

public class EventHandlerService extends EventHandlerGrpc.EventHandlerImplBase
implements RestateBlockingService {

@Override
public void handle(Event event, StreamObserver<Empty> responseObserver) {
restateContext()
.oneWayCall(
CounterGrpc.getAddMethod(),
CounterProto.CounterAddRequest.newBuilder()
.setCounterName(event.getKey().toStringUtf8())
.setValue(Long.parseLong(event.getPayload().toStringUtf8()))
.build());
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
}
}
10 changes: 10 additions & 0 deletions services/node-services/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { protoMetadata as sideEffectProtoMetadata } from "./generated/side_effec
import { protoMetadata as proxyProtoMetadata } from "./generated/proxy";
import { protoMetadata as rngProtoMetadata } from "./generated/rng";
import { protoMetadata as awakeableHolderProtoMetadata } from "./generated/awakeable_holder";
import { protoMetadata as eventHandlerProtoMetadata } from "./generated/event_handler";
import { CounterService, CounterServiceFQN } from "./counter";
import { ListService, ListServiceFQN } from "./collections";
import { FailingService, FailingServiceFQN } from "./errors";
Expand Down Expand Up @@ -43,6 +44,7 @@ import {
CounterHandlerAPIFQN,
CounterHandlerAPIRouter,
} from "./handler_api";
import { EventHandlerFQN, EventHandlerService } from "./event_handler";

let serverBuilder = restate.createServer();

Expand Down Expand Up @@ -166,6 +168,14 @@ const services = new Map<
keyedRouter: CounterHandlerAPIRouter,
},
],
[
EventHandlerFQN,
{
descriptor: eventHandlerProtoMetadata,
service: "EventHandler",
instance: new EventHandlerService(),
},
],
]);

console.log(services.keys());
Expand Down
3 changes: 2 additions & 1 deletion services/node-services/src/counter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
CounterRequest,
CounterAddRequest,
CounterUpdateResult,
UpdateCounterEvent,
GetResponse,
protobufPackage,
} from "./generated/counter";
Expand Down Expand Up @@ -87,7 +88,7 @@ export class CounterService implements Counter {
}
}

async handleEvent(request: StringKeyedEvent): Promise<Empty> {
async handleEvent(request: UpdateCounterEvent): Promise<Empty> {
console.log("handleEvent: " + JSON.stringify(request));
const ctx = restate.useContext(this);

Expand Down
25 changes: 25 additions & 0 deletions services/node-services/src/event_handler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import * as restate from "@restatedev/restate-sdk";

import { Empty } from "./generated/google/protobuf/empty";
import { EventHandler, protobufPackage } from "./generated/event_handler";
import { Event } from "./generated/dev/restate/events";
import { CounterClientImpl } from "./generated/counter";

export const EventHandlerFQN = protobufPackage + ".EventHandler";

export class EventHandlerService implements EventHandler {
async handle(event: Event): Promise<Empty> {
console.log("handleEvent: " + JSON.stringify(event));
const ctx = restate.useContext(this);

const counterClient = new CounterClientImpl(ctx);
await ctx.oneWayCall(() =>
counterClient.add({
counterName: event.key.toString(),
value: parseInt(event.payload.toString()),
})
);

return Empty.create({});
}
}
58 changes: 45 additions & 13 deletions tests/src/test/kotlin/dev/restate/e2e/KafkaIngressTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import com.fasterxml.jackson.databind.ObjectMapper
import dev.restate.e2e.services.counter.CounterGrpc
import dev.restate.e2e.services.counter.CounterGrpc.CounterBlockingStub
import dev.restate.e2e.services.counter.counterRequest
import dev.restate.e2e.services.eventhandler.EventHandlerGrpc
import dev.restate.e2e.utils.*
import dev.restate.e2e.utils.config.*
import dev.restate.e2e.utils.meta.client.SubscriptionsClient
import dev.restate.e2e.utils.meta.models.CreateSubscriptionRequest
import io.grpc.MethodDescriptor
import java.net.URI
import java.net.URL
import java.net.http.HttpClient
Expand All @@ -24,8 +26,11 @@ import org.awaitility.kotlin.untilAsserted
import org.awaitility.kotlin.untilCallTo
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.RegisterExtension
import org.junit.jupiter.api.parallel.Execution
import org.junit.jupiter.api.parallel.ExecutionMode

private const val TOPIC = "my-topic"
private const val COUNTER_TOPIC = "counter"
private const val EVENT_HANDLER_TOPIC = "event-handler"

private fun kafkaClusterOptions(): RestateConfigSchema {
return RestateConfigSchema()
Expand All @@ -48,8 +53,10 @@ class JavaKafkaIngressTest : BaseKafkaIngressTest() {
RestateDeployerExtension(
RestateDeployer.Builder()
.withEnv(Containers.getRestateEnvironment())
.withServiceEndpoint(Containers.JAVA_COUNTER_SERVICE_SPEC)
.withContainer("kafka", KafkaContainer(TOPIC))
.withServiceEndpoint(
Containers.javaServicesContainer(
"java-counter", CounterGrpc.SERVICE_NAME, EventHandlerGrpc.SERVICE_NAME))
.withContainer("kafka", KafkaContainer(COUNTER_TOPIC, EVENT_HANDLER_TOPIC))
.withConfig(kafkaClusterOptions())
.build())
}
Expand All @@ -62,8 +69,10 @@ class NodeKafkaIngressTest : BaseKafkaIngressTest() {
RestateDeployerExtension(
RestateDeployer.Builder()
.withEnv(Containers.getRestateEnvironment())
.withServiceEndpoint(Containers.NODE_COUNTER_SERVICE_SPEC)
.withContainer("kafka", KafkaContainer(TOPIC))
.withServiceEndpoint(
Containers.nodeServicesContainer(
"node-counter", CounterGrpc.SERVICE_NAME, EventHandlerGrpc.SERVICE_NAME))
.withContainer("kafka", KafkaContainer(COUNTER_TOPIC, EVENT_HANDLER_TOPIC))
.withConfig(kafkaClusterOptions())
.build())
}
Expand All @@ -72,10 +81,33 @@ class NodeKafkaIngressTest : BaseKafkaIngressTest() {
abstract class BaseKafkaIngressTest {

@Test
fun handleKeyedEvent(
@Execution(ExecutionMode.CONCURRENT)
fun handleEventInCounterService(
@InjectMetaURL metaURL: URL,
@InjectContainerPort(hostName = "kafka", port = KafkaContainer.EXTERNAL_PORT) kafkaPort: Int,
@InjectBlockingStub counterClient: CounterBlockingStub
) {
counterEventsTest(
metaURL, kafkaPort, counterClient, COUNTER_TOPIC, CounterGrpc.getHandleEventMethod())
}

@Test
@Execution(ExecutionMode.CONCURRENT)
fun handleEventInEventHandler(
@InjectMetaURL metaURL: URL,
@InjectContainerPort(hostName = "kafka", port = KafkaContainer.EXTERNAL_PORT) kafkaPort: Int,
@InjectBlockingStub counterClient: CounterBlockingStub
) {
counterEventsTest(
metaURL, kafkaPort, counterClient, EVENT_HANDLER_TOPIC, EventHandlerGrpc.getHandleMethod())
}

fun counterEventsTest(
metaURL: URL,
kafkaPort: Int,
counterClient: CounterBlockingStub,
topic: String,
methodDescriptor: MethodDescriptor<*, *>
) {
val counter = UUID.randomUUID().toString()

Expand All @@ -85,15 +117,14 @@ abstract class BaseKafkaIngressTest {
assertThat(
subscriptionsClient.createSubscription(
CreateSubscriptionRequest(
source = "kafka://my-cluster/$TOPIC",
sink =
"service://${CounterGrpc.SERVICE_NAME}/${CounterGrpc.getHandleEventMethod().bareMethodName}",
source = "kafka://my-cluster/$topic",
sink = "service://${methodDescriptor.fullMethodName}",
options = mapOf("auto.offset.reset" to "earliest"))))
.extracting { it.statusCode }
.isEqualTo(201)

// Produce message to kafka
produceMessageToKafka("PLAINTEXT://localhost:$kafkaPort", TOPIC, counter, listOf("1", "2", "3"))
produceMessageToKafka("PLAINTEXT://localhost:$kafkaPort", topic, counter, listOf("1", "2", "3"))

// Now wait for the update to be visible
await untilCallTo
Expand All @@ -117,7 +148,7 @@ class NodeHandlerAPIKafkaIngressTest {
.withServiceEndpoint(
Containers.nodeServicesContainer(
"node-counter", Containers.HANDLER_API_COUNTER_SERVICE_NAME))
.withContainer("kafka", KafkaContainer(TOPIC))
.withContainer("kafka", KafkaContainer(COUNTER_TOPIC))
.withConfig(kafkaClusterOptions())
.build())
}
Expand All @@ -136,14 +167,15 @@ class NodeHandlerAPIKafkaIngressTest {
assertThat(
subscriptionsClient.createSubscription(
CreateSubscriptionRequest(
source = "kafka://my-cluster/$TOPIC",
source = "kafka://my-cluster/$COUNTER_TOPIC",
sink = "service://${Containers.HANDLER_API_COUNTER_SERVICE_NAME}/handleEvent",
options = mapOf("auto.offset.reset" to "earliest"))))
.extracting { it.statusCode }
.isEqualTo(201)

// Produce message to kafka
produceMessageToKafka("PLAINTEXT://localhost:$kafkaPort", TOPIC, counter, listOf("1", "2", "3"))
produceMessageToKafka(
"PLAINTEXT://localhost:$kafkaPort", COUNTER_TOPIC, counter, listOf("1", "2", "3"))

val client = HttpClient.newHttpClient()

Expand Down

0 comments on commit 895b16e

Please sign in to comment.