Skip to content

Commit

Permalink
Implement sse (#2)
Browse files Browse the repository at this point in the history
Signed-off-by: Marcos Candeia <[email protected]>
  • Loading branch information
mcandeia authored Sep 24, 2024
1 parent b6e5c83 commit f65fe3c
Show file tree
Hide file tree
Showing 6 changed files with 165 additions and 2 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,4 @@ dist


.vscode
kv
kv*
1 change: 1 addition & 0 deletions deno.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"@hono/hono": "jsr:@hono/hono@^4.6.2",
"@std/assert": "jsr:@std/assert@^1.0.5",
"@std/async": "jsr:@std/async@^1.0.5",
"@std/http": "jsr:@std/http@^1.0.6",
"@std/path": "jsr:@std/path@^1.0.6"
},
"tasks": {
Expand Down
15 changes: 15 additions & 0 deletions src/actors/proxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
ACTOR_ID_HEADER_NAME,
type ActorConstructor,
} from "./runtime.ts";
import { EVENT_STREAM_RESPONSE_HEADER, readFromStream } from "./stream.ts";

/**
* options to create a new actor proxy.
Expand Down Expand Up @@ -41,12 +42,14 @@ export const actors = {
return new Proxy<Promisify<TInstance>>({} as Promisify<TInstance>, {
get: (_, prop) => {
return async (...args: unknown[]) => {
const abortCtrl = new AbortController();
const resp = await fetch(
`${server}/actors/${
typeof actor === "string" ? actor : actor.name
}/invoke/${String(prop)}`,
{
method: "POST",
signal: abortCtrl.signal,
headers: {
"Content-Type": "application/json",
[ACTOR_ID_HEADER_NAME]: id,
Expand All @@ -59,6 +62,18 @@ export const actors = {
}),
},
);
if (
resp.headers.get("content-type") ===
EVENT_STREAM_RESPONSE_HEADER
) {
const iterator = readFromStream(resp);
const retn = iterator.return;
iterator.return = function (val) {
abortCtrl.abort();
return retn?.call(iterator, val) ?? val;
};
return iterator;
}
return resp.json();
};
},
Expand Down
63 changes: 62 additions & 1 deletion src/actors/runtime.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import type { ActorState } from "./state.ts";

class Counter {
private count: number;
private subscribers: Record<string, (count: number) => void> = {};

constructor(protected state: ActorState) {
this.count = 0;
Expand All @@ -14,13 +15,61 @@ class Counter {
}

async increment(): Promise<number> {
await this.state.storage.put("counter", ++this.count);
this.count++;
await this.state.storage.put("counter", this.count);
this.notifySubscribers();
return this.count;
}

async decrement(): Promise<number> {
this.count--;
await this.state.storage.put("counter", this.count);
this.notifySubscribers();
return this.count;
}

getCount(): number {
return this.count;
}

watch(): AsyncIterableIterator<number> {
const subscription = crypto.randomUUID();
const queue: Array<(value: IteratorResult<number>) => void> = [];

const pushQueue = (value: IteratorResult<number>) => {
queue.forEach((resolve) => resolve(value));
};

const nextPromise = () =>
new Promise<IteratorResult<number>>((resolve) => {
queue.push(resolve);
});

const iterator: AsyncIterableIterator<number> = {
next: () => nextPromise(),
return: () => {
// Clean up the subscription when iterator.return() is called
delete this.subscribers[subscription];
// Return the "done" value for the iterator
return Promise.resolve({ value: undefined, done: true });
},
[Symbol.asyncIterator]() {
return this;
},
};

this.subscribers[subscription] = (count: number) => {
pushQueue({ value: count, done: false });
};

return iterator;
}

private notifySubscribers() {
Object.values(this.subscribers).forEach((subscriber) =>
subscriber(this.count)
);
}
}

const runServer = (rt: ActorRuntime): AsyncDisposable => {
Expand All @@ -39,6 +88,7 @@ Deno.test("counter increment and getCount", async () => {
const counterProxy = actors.proxy(Counter);

const actor = counterProxy.id(actorId);
const watcher = await actor.watch();
// Test increment
const number = await actor.increment();
assertEquals(number, 1);
Expand All @@ -51,4 +101,15 @@ Deno.test("counter increment and getCount", async () => {

// Test getCount again
assertEquals(await actor.getCount(), 2);

assertEquals(await actor.decrement(), 1);

const counters = [1, 2, 1];
let idx = 0;
while (idx < counters.length) {
const { value, done } = await watcher.next();
assertEquals(value, counters[idx++]);
assertEquals(done, false);
}
watcher.return?.();
});
39 changes: 39 additions & 0 deletions src/actors/runtime.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,23 @@
import { type ServerSentEventMessage, ServerSentEventStream } from "@std/http";
import { ActorState } from "./state.ts";
import { DenoKvActorStorage } from "./storage/denoKv.ts";
import { EVENT_STREAM_RESPONSE_HEADER } from "./stream.ts";

/**
* Represents an actor.
*/
// deno-lint-ignore no-empty-interface
export interface Actor {
}

const isEventStreamResponse = (
invokeResponse: unknown | AsyncIterableIterator<unknown>,
): invokeResponse is AsyncIterableIterator<unknown> => {
return (
typeof (invokeResponse as AsyncIterableIterator<unknown>)?.next ===
"function"
);
};
/**
* The name of the header used to specify the actor ID.
*/
Expand Down Expand Up @@ -150,6 +161,34 @@ export class ActorRuntime {
const res = await (methodImpl as Function).bind(actor)(
...Array.isArray(args) ? args : [args],
);
if (isEventStreamResponse(res)) {
req.signal.onabort = () => {
res?.return?.();
};

return new Response(
new ReadableStream<ServerSentEventMessage>({
async pull(controller) {
for await (const content of res) {
controller.enqueue({
data: encodeURIComponent(JSON.stringify(content)),
id: Date.now(),
event: "message",
});
}
controller.close();
},
cancel() {
res?.return?.();
},
}).pipeThrough(new ServerSentEventStream()),
{
headers: {
"Content-Type": EVENT_STREAM_RESPONSE_HEADER,
},
},
);
}
return Response.json(res);
}
}
47 changes: 47 additions & 0 deletions src/actors/stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
export const EVENT_STREAM_RESPONSE_HEADER: string = "text/event-stream";
export async function* readFromStream<T>(
response: Response,
): AsyncIterableIterator<T> {
if (!response.body) {
return;
}

const reader = response.body.pipeThrough(new TextDecoderStream()).getReader();

let buffer = "";

while (true) {
const { value, done } = await reader.read();

if (done) {
break;
}

buffer += value;
const lines = buffer.split("\n");
buffer = lines.pop() ?? "";

for (const data of lines) {
if (!data.startsWith("data:")) {
continue;
}

try {
const chunk = data.replace("data:", "");
yield JSON.parse(decodeURIComponent(chunk));
} catch (_err) {
console.log("error parsing data", _err, data);
continue;
}
}
}

// Process any remaining buffer after the stream ends
if (buffer.length > 0 && buffer.startsWith("data:")) {
try {
yield JSON.parse(decodeURIComponent(buffer.replace("data:", "")));
} catch (_err) {
console.log("error parsing data", _err, buffer);
}
}
}

0 comments on commit f65fe3c

Please sign in to comment.