Skip to content

Commit

Permalink
Add Handler API Kafka test (#198)
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkydeveloper authored Sep 21, 2023
1 parent 833aad7 commit 4da62d2
Show file tree
Hide file tree
Showing 6 changed files with 158 additions and 35 deletions.
30 changes: 26 additions & 4 deletions services/node-services/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,19 @@ import {
AwakeableHolderService,
AwakeableHolderServiceFQN,
} from "./awakeable_holder";
import { HandlerAPIEchoTestFQN, HandlerApiEchoRouter } from "./handler_api";
import {
HandlerAPIEchoTestFQN,
HandlerAPIEchoRouter,
CounterHandlerAPIFQN,
CounterHandlerAPIRouter,
} from "./handler_api";

let serverBuilder = restate.createServer();

const services = new Map<string, restate.ServiceOpts | { router: any }>([
const services = new Map<
string,
restate.ServiceOpts | { router: any } | { keyedRouter: any }

Check warning on line 51 in services/node-services/src/app.ts

View workflow job for this annotation

GitHub Actions / build

Unexpected any. Specify a different type

Check warning on line 51 in services/node-services/src/app.ts

View workflow job for this annotation

GitHub Actions / build

Unexpected any. Specify a different type

Check warning on line 51 in services/node-services/src/app.ts

View workflow job for this annotation

GitHub Actions / build

Unexpected any. Specify a different type

Check warning on line 51 in services/node-services/src/app.ts

View workflow job for this annotation

GitHub Actions / build

Unexpected any. Specify a different type

Check warning on line 51 in services/node-services/src/app.ts

View workflow job for this annotation

GitHub Actions / build

Unexpected any. Specify a different type

Check warning on line 51 in services/node-services/src/app.ts

View workflow job for this annotation

GitHub Actions / build

Unexpected any. Specify a different type
>([
[
CounterServiceFQN,
{
Expand Down Expand Up @@ -149,7 +157,13 @@ const services = new Map<string, restate.ServiceOpts | { router: any }>([
[
HandlerAPIEchoTestFQN,
{
router: HandlerApiEchoRouter,
router: HandlerAPIEchoRouter,
},
],
[
CounterHandlerAPIFQN,
{
keyedRouter: CounterHandlerAPIRouter,
},
],
]);
Expand All @@ -171,12 +185,20 @@ for (let service of servicesEnv) {
serverBuilder = serverBuilder.bindService(
foundService as restate.ServiceOpts
);
} else {
} else if (
(foundService as restate.UnKeyedRouter<any>).router !== undefined

Check warning on line 189 in services/node-services/src/app.ts

View workflow job for this annotation

GitHub Actions / build

Unexpected any. Specify a different type

Check warning on line 189 in services/node-services/src/app.ts

View workflow job for this annotation

GitHub Actions / build

Unexpected any. Specify a different type

Check warning on line 189 in services/node-services/src/app.ts

View workflow job for this annotation

GitHub Actions / build

Unexpected any. Specify a different type
) {
console.log("Mounting router " + service);
serverBuilder = serverBuilder.bindRouter(
service,
(foundService as { router: any }).router

Check warning on line 194 in services/node-services/src/app.ts

View workflow job for this annotation

GitHub Actions / build

Unexpected any. Specify a different type

Check warning on line 194 in services/node-services/src/app.ts

View workflow job for this annotation

GitHub Actions / build

Unexpected any. Specify a different type

Check warning on line 194 in services/node-services/src/app.ts

View workflow job for this annotation

GitHub Actions / build

Unexpected any. Specify a different type
);
} else {
console.log("Mounting keyed router " + service);
serverBuilder = serverBuilder.bindKeyedRouter(
service,
(foundService as { keyedRouter: any }).keyedRouter

Check warning on line 200 in services/node-services/src/app.ts

View workflow job for this annotation

GitHub Actions / build

Unexpected any. Specify a different type

Check warning on line 200 in services/node-services/src/app.ts

View workflow job for this annotation

GitHub Actions / build

Unexpected any. Specify a different type

Check warning on line 200 in services/node-services/src/app.ts

View workflow job for this annotation

GitHub Actions / build

Unexpected any. Specify a different type
);
}
}

Expand Down
18 changes: 8 additions & 10 deletions services/node-services/src/counter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@ import {
GetResponse,
protobufPackage,
} from "./generated/counter";
import {
StringKeyedEvent
} from "./generated/dev/restate/events";
import { StringKeyedEvent } from "./generated/dev/restate/events";
import { Empty } from "./generated/google/protobuf/empty";
import { AwakeableHolderServiceClientImpl } from "./generated/awakeable_holder";

Expand Down Expand Up @@ -89,13 +87,13 @@ export class CounterService implements Counter {
}
}

async handleEvent(request: StringKeyedEvent): Promise<Empty> {
console.log("handleEvent: " + JSON.stringify(request));
const ctx = restate.useContext(this);
async handleEvent(request: StringKeyedEvent): Promise<Empty> {
console.log("handleEvent: " + JSON.stringify(request));
const ctx = restate.useContext(this);

const value = (await ctx.get<number>(COUNTER_KEY)) || 0;
ctx.set(COUNTER_KEY, value + parseInt(request.payload.toString()));
const value = (await ctx.get<number>(COUNTER_KEY)) || 0;
ctx.set(COUNTER_KEY, value + parseInt(request.payload.toString()));

return Empty.create({});
}
return Empty.create({});
}
}
36 changes: 34 additions & 2 deletions services/node-services/src/handler_api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ import * as restate from "@restatedev/restate-sdk";

export const HandlerAPIEchoTestFQN = "handlerapi.HandlerAPIEchoTest";

const COUNTER_KEY = "counter";

// These two handlers just test the correct propagation of the input message in the output
const echo = (ctx: restate.RpcContext, msg: any): Promise<any> => {

Check warning on line 8 in services/node-services/src/handler_api.ts

View workflow job for this annotation

GitHub Actions / build

Unexpected any. Specify a different type

Check warning on line 8 in services/node-services/src/handler_api.ts

View workflow job for this annotation

GitHub Actions / build

Unexpected any. Specify a different type

Check warning on line 8 in services/node-services/src/handler_api.ts

View workflow job for this annotation

GitHub Actions / build

Unexpected any. Specify a different type

Check warning on line 8 in services/node-services/src/handler_api.ts

View workflow job for this annotation

GitHub Actions / build

Unexpected any. Specify a different type

Check warning on line 8 in services/node-services/src/handler_api.ts

View workflow job for this annotation

GitHub Actions / build

Unexpected any. Specify a different type

Check warning on line 8 in services/node-services/src/handler_api.ts

View workflow job for this annotation

GitHub Actions / build

Unexpected any. Specify a different type
return msg;
Expand All @@ -10,11 +12,41 @@ const echoEcho = async (ctx: restate.RpcContext, msg: any): Promise<any> => {
return await ctx.rpc(handlerApi).echo(msg);
};

const handlerApi: restate.ServiceApi<typeof HandlerApiEchoRouter> = {
const handlerApi: restate.ServiceApi<typeof HandlerAPIEchoRouter> = {
path: HandlerAPIEchoTestFQN,
};

export const HandlerApiEchoRouter = restate.router({
export const HandlerAPIEchoRouter = restate.router({
echo,
echoEcho,
});

// -- Counter service

const get = async (
ctx: restate.RpcContext,
key: string,
value: any
): Promise<{ counter: number }> => {
console.log("get: " + JSON.stringify(key) + " " + JSON.stringify(value));

const counter = (await ctx.get<number>(COUNTER_KEY)) || 0;

return { counter };
};

const handleEvent = async (ctx: restate.RpcContext, request: restate.Event) => {
console.log("handleEvent: " + JSON.stringify(request));

const value = (await ctx.get<number>(COUNTER_KEY)) || 0;
const eventValue = parseInt(new TextDecoder().decode(request.body()));
console.log("Event value: " + eventValue);
ctx.set(COUNTER_KEY, value + eventValue);
};

export const CounterHandlerAPIFQN = "handlerapi.Counter";

export const CounterHandlerAPIRouter = restate.keyedRouter({
get,
handleEvent: restate.keyedEventHandler(handleEvent),
});
4 changes: 3 additions & 1 deletion tests/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ tasks {
(if (System.getenv("RESTATE_RUNTIME_CONTAINER").isNullOrEmpty())
"ghcr.io/restatedev/restate:main"
else System.getenv("RESTATE_RUNTIME_CONTAINER")),
"RUST_LOG" to (System.getenv("RUST_LOG") ?: "info,restate_invoker=trace,restate=debug"),
"RUST_LOG" to
(System.getenv("RUST_LOG")
?: "info,restate_invoker=trace,restate_ingress_kafka=trace,restate=debug"),
"RUST_BACKTRACE" to "full")

test {
Expand Down
1 change: 1 addition & 0 deletions tests/src/test/kotlin/dev/restate/e2e/Containers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ object Containers {
nodeServicesContainer("node-proxy", ProxyServiceGrpc.SERVICE_NAME).build()

const val HANDLER_API_ECHO_TEST_SERVICE_NAME = "handlerapi.HandlerAPIEchoTest"
const val HANDLER_API_COUNTER_SERVICE_NAME = "handlerapi.Counter"

val NODE_HANDLER_API_ECHO_TEST_SERVICE_SPEC =
nodeServicesContainer("node-proxy", HANDLER_API_ECHO_TEST_SERVICE_NAME).build()
Expand Down
104 changes: 86 additions & 18 deletions tests/src/test/kotlin/dev/restate/e2e/KafkaIngressTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ import dev.restate.e2e.utils.*
import dev.restate.e2e.utils.config.*
import dev.restate.e2e.utils.meta.client.SubscriptionsClient
import dev.restate.e2e.utils.meta.models.CreateSubscriptionRequest
import java.net.URI
import java.net.URL
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.util.*
import okhttp3.OkHttpClient
import org.apache.kafka.clients.producer.KafkaProducer
Expand All @@ -17,6 +20,7 @@ import org.apache.kafka.clients.producer.ProducerRecord
import org.assertj.core.api.Assertions.assertThat
import org.awaitility.kotlin.await
import org.awaitility.kotlin.matches
import org.awaitility.kotlin.untilAsserted
import org.awaitility.kotlin.untilCallTo
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.RegisterExtension
Expand Down Expand Up @@ -67,24 +71,6 @@ class NodeKafkaIngressTest : BaseKafkaIngressTest() {

abstract class BaseKafkaIngressTest {

private fun produceMessageToKafka(
bootstrapServer: String,
topic: String,
key: String,
values: List<String>
) {
val props = Properties()
props["bootstrap.servers"] = bootstrapServer
props["key.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
props["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"

val producer: Producer<String, String> = KafkaProducer(props)
for (value in values) {
producer.send(ProducerRecord(topic, key, value))
}
producer.close()
}

@Test
fun handleKeyedEvent(
@InjectMetaURL metaURL: URL,
Expand Down Expand Up @@ -119,3 +105,85 @@ abstract class BaseKafkaIngressTest {
}
}
}

class NodeHandlerAPIKafkaIngressTest {

companion object {
@RegisterExtension
val deployerExt: RestateDeployerExtension =
RestateDeployerExtension(
RestateDeployer.Builder()
.withEnv(Containers.getRestateEnvironment())
.withServiceEndpoint(
Containers.nodeServicesContainer(
"node-counter", Containers.HANDLER_API_COUNTER_SERVICE_NAME))
.withContainer("kafka", KafkaContainer(TOPIC))
.withConfig(kafkaClusterOptions())
.build())
}

@Test
fun handleKeyedEvent(
@InjectMetaURL metaURL: URL,
@InjectContainerPort(hostName = "kafka", port = KafkaContainer.EXTERNAL_PORT) kafkaPort: Int,
@InjectGrpcIngressURL httpEndpointURL: URL
) {
val counter = UUID.randomUUID().toString()

// Create subscription
val subscriptionsClient =
SubscriptionsClient(ObjectMapper(), metaURL.toString(), OkHttpClient())
assertThat(
subscriptionsClient.createSubscription(
CreateSubscriptionRequest(
source = "kafka://my-cluster/$TOPIC",
sink = "service://${Containers.HANDLER_API_COUNTER_SERVICE_NAME}/handleEvent",
options = mapOf("auto.offset.reset" to "earliest"))))
.extracting { it.statusCode }
.isEqualTo(201)

// Produce message to kafka
produceMessageToKafka("PLAINTEXT://localhost:$kafkaPort", TOPIC, counter, listOf("1", "2", "3"))

val client = HttpClient.newHttpClient()

// Now wait for the update to be visible
await untilAsserted
{
val req =
HttpRequest.newBuilder(
URI.create(
"${httpEndpointURL}${Containers.HANDLER_API_COUNTER_SERVICE_NAME}/get"))
.POST(Utils.jacksonBodyPublisher(mapOf("key" to counter)))
.headers("Content-Type", "application/json")
.build()

val response = client.send(req, Utils.jacksonBodyHandler())

assertThat(response.statusCode()).isEqualTo(200)
assertThat(response.headers().firstValue("content-type"))
.get()
.asString()
.contains("application/json")
assertThat(response.body().get("response").get("counter").asInt()).isEqualTo(6)
}
}
}

private fun produceMessageToKafka(
bootstrapServer: String,
topic: String,
key: String,
values: List<String>
) {
val props = Properties()
props["bootstrap.servers"] = bootstrapServer
props["key.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
props["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"

val producer: Producer<String, String> = KafkaProducer(props)
for (value in values) {
producer.send(ProducerRecord(topic, key, value))
}
producer.close()
}

0 comments on commit 4da62d2

Please sign in to comment.