Skip to content

Commit

Permalink
Create readme for event processing - enrichment example
Browse files Browse the repository at this point in the history
  • Loading branch information
gvdongen committed Dec 13, 2024
1 parent af6f5a8 commit e7becda
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Hello world - TypeScript example
# Async Tasks: Parallelizing work

Sample project configuration of a Restate service using the TypeScript SDK.
This example shows how to use the Restate SDK to **execute a list of tasks in parallel and then gather their result**.
Also known as fan-out, fan-in.

Have a look at the [TypeScript Quickstart guide](https://docs.restate.dev/get_started/quickstart?sdk=ts) for more information on how to use this project.
The example implements a [worker service](src/worker_service.ts), that takes a task as input.
It then splits the task into subtasks, executes them in parallel, and then gathers the results.

Restate guarantees and manages the execution of all the subtasks across failures.
You can run this on FaaS infrastructure, like AWS Lambda, and it will scale automatically.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ const workerService = restate.service({
runSubtask: async (ctx: Context, subtask: SubTask) => {
// Processing logic goes here ...
// Can be moved to a separate service to scale independently
console.info
},
},
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,81 @@

This example shows an example of:
- **Event enrichment** over different sources: RPC and Kafka
- **Stateful actors** updated over Kafka
- **Digital twins** updated over Kafka
- **Stateful actors / Digital twins** updated over Kafka
- **Streaming join**
- Populating state from events and making it queryable via RPC handlers.

The example implements a package delivery tracking service.
Packages are registered via an RPC handler, and their location is updated via Kafka events.
The Package Tracker Virtual Object tracks the package details and its location history.

## Running the example

1. Make sure you have installed the dependencies: `npm install`.

2. Start the Kafka broker via Docker Compose: `docker compose up -d`.

3. Start Restate Server with the Kafka broker configuration in a separate shell: `npx restate-server --config-file restate.toml`

4. Start the data upload service: `npm run app-dev`

5. Register the example at Restate server by calling
`npx restate -y deployment register "localhost:9080"`.

6. Let Restate subscribe to the Kafka topic `package-location-updates` and invoke `package-tracker/updateLocation` on each message.
```shell
curl localhost:9070/subscriptions -H 'content-type: application/json' \
-d '{
"source": "kafka://my-cluster/package-location-updates",
"sink": "service://package-tracker/updateLocation",
"options": {"auto.offset.reset": "earliest"}
}'
```

## Demo scenario

1. Register a new package via the RPC handler:
```shell
curl localhost:8080/package-tracker/package1/registerPackage \
-H 'content-type: application/json' -d '{"finalDestination": "Bridge 6, Amsterdam"}'
```

2. Start a Kafka producer and send some messages to update the location of the package on the `package-location-updates` topic:
```shell
docker exec -it broker kafka-console-producer --bootstrap-server broker:29092 --topic package-location-updates --property parse.key=true --property key.separator=:
```
Send messages like
```
package1:{"timestamp": "2024-10-10 13:00", "location": "Pinetree Road 5, Paris"}
package1:{"timestamp": "2024-10-10 14:00", "location": "Mountain Road 155, Brussels"}
```

3. Query the package location via the RPC handler:
```shell
curl localhost:8080/package-tracker/package1/getLocation
```
or via the CLI: `npx restate kv get package-tracker package1`

You can see how the state was enriched by the initial RPC event and the subsequent Kafka events:
```
🤖 State:
―――――――――
Service package-tracker
Key package1
KEY VALUE
details {
"finalDestination": "Bridge 6, Amsterdam",
"locations": [
{
"location": "Pinetree Road 5, Paris",
"timestamp": "2024-10-10 13:00"
},
{
"location": "Mountain Road 155, Brussels",
"timestamp": "2024-10-10 14:00"
}
]
}
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
version: '3'
services:
broker:
image: confluentinc/cp-kafka:7.5.0
container_name: broker
ports:
- "9092:9092"
- "9101:9101"
environment:
KAFKA_BROKER_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_NODE_ID: 1
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:29093
KAFKA_LISTENERS: PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk

init-kafka:
image: confluentinc/cp-kafka:7.5.0
depends_on:
- broker
entrypoint: [ '/bin/sh', '-c' ]
command: |
"# blocks until kafka is reachable
kafka-topics --bootstrap-server broker:29092 --list
echo -e 'Creating kafka topics'
kafka-topics --bootstrap-server broker:29092 --create --if-not-exists --topic package-location-updates --replication-factor 1 --partitions 1
echo -e 'Successfully created the following topics:'
kafka-topics --bootstrap-server broker:29092 --list"
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[[ingress.kafka-clusters]]
name = "my-cluster"
brokers = ["PLAINTEXT://localhost:9092"]
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ const packageTracker = restate.object({

// Connected to a Kafka topic for real-time location updates
updateLocation: async (ctx: ObjectContext, locationUpdate: LocationUpdate) => {
const packageInfo = await ctx.get<PackageInfo>("package");
const packageInfo = await ctx.get<PackageInfo>("details");
if (!packageInfo) {
throw new TerminalError("Package not found");
throw new TerminalError(`Package ${ctx.key} not found`);
}

// Update the package details in the state
packageInfo.locations.push(locationUpdate);
(packageInfo.locations ??= []).push(locationUpdate);
ctx.set("details", packageInfo);
},

Expand All @@ -51,9 +51,8 @@ curl localhost:8080/package-tracker/package123/getPackageInfo
*/

type PackageInfo = {
id: string;
finalDestination: string;
locations: LocationUpdate[];
locations?: LocationUpdate[];
};

type LocationUpdate = {
Expand Down

0 comments on commit e7becda

Please sign in to comment.