diff --git a/.gitignore b/.gitignore index 8bc5f04..f8f0793 100644 --- a/.gitignore +++ b/.gitignore @@ -131,4 +131,4 @@ dist .vscode -kv \ No newline at end of file +kv* \ No newline at end of file diff --git a/deno.json b/deno.json index 451f93c..77577cd 100644 --- a/deno.json +++ b/deno.json @@ -18,6 +18,7 @@ "@hono/hono": "jsr:@hono/hono@^4.6.2", "@std/assert": "jsr:@std/assert@^1.0.5", "@std/async": "jsr:@std/async@^1.0.5", + "@std/http": "jsr:@std/http@^1.0.6", "@std/path": "jsr:@std/path@^1.0.6" }, "tasks": { diff --git a/src/actors/proxy.ts b/src/actors/proxy.ts index 95dd164..e1fe53a 100644 --- a/src/actors/proxy.ts +++ b/src/actors/proxy.ts @@ -3,6 +3,7 @@ import { ACTOR_ID_HEADER_NAME, type ActorConstructor, } from "./runtime.ts"; +import { EVENT_STREAM_RESPONSE_HEADER, readFromStream } from "./stream.ts"; /** * options to create a new actor proxy. @@ -41,12 +42,14 @@ export const actors = { return new Proxy>({} as Promisify, { get: (_, prop) => { return async (...args: unknown[]) => { + const abortCtrl = new AbortController(); const resp = await fetch( `${server}/actors/${ typeof actor === "string" ? actor : actor.name }/invoke/${String(prop)}`, { method: "POST", + signal: abortCtrl.signal, headers: { "Content-Type": "application/json", [ACTOR_ID_HEADER_NAME]: id, @@ -59,6 +62,18 @@ export const actors = { }), }, ); + if ( + resp.headers.get("content-type") === + EVENT_STREAM_RESPONSE_HEADER + ) { + const iterator = readFromStream(resp); + const retn = iterator.return; + iterator.return = function (val) { + abortCtrl.abort(); + return retn?.call(iterator, val) ?? val; + }; + return iterator; + } return resp.json(); }; }, diff --git a/src/actors/runtime.test.ts b/src/actors/runtime.test.ts index 64384ec..fd34178 100644 --- a/src/actors/runtime.test.ts +++ b/src/actors/runtime.test.ts @@ -5,6 +5,7 @@ import type { ActorState } from "./state.ts"; class Counter { private count: number; + private subscribers: Record void> = {}; constructor(protected state: ActorState) { this.count = 0; @@ -14,13 +15,61 @@ class Counter { } async increment(): Promise { - await this.state.storage.put("counter", ++this.count); + this.count++; + await this.state.storage.put("counter", this.count); + this.notifySubscribers(); + return this.count; + } + + async decrement(): Promise { + this.count--; + await this.state.storage.put("counter", this.count); + this.notifySubscribers(); return this.count; } getCount(): number { return this.count; } + + watch(): AsyncIterableIterator { + const subscription = crypto.randomUUID(); + const queue: Array<(value: IteratorResult) => void> = []; + + const pushQueue = (value: IteratorResult) => { + queue.forEach((resolve) => resolve(value)); + }; + + const nextPromise = () => + new Promise>((resolve) => { + queue.push(resolve); + }); + + const iterator: AsyncIterableIterator = { + next: () => nextPromise(), + return: () => { + // Clean up the subscription when iterator.return() is called + delete this.subscribers[subscription]; + // Return the "done" value for the iterator + return Promise.resolve({ value: undefined, done: true }); + }, + [Symbol.asyncIterator]() { + return this; + }, + }; + + this.subscribers[subscription] = (count: number) => { + pushQueue({ value: count, done: false }); + }; + + return iterator; + } + + private notifySubscribers() { + Object.values(this.subscribers).forEach((subscriber) => + subscriber(this.count) + ); + } } const runServer = (rt: ActorRuntime): AsyncDisposable => { @@ -39,6 +88,7 @@ Deno.test("counter increment and getCount", async () => { const counterProxy = actors.proxy(Counter); const actor = counterProxy.id(actorId); + const watcher = await actor.watch(); // Test increment const number = await actor.increment(); assertEquals(number, 1); @@ -51,4 +101,15 @@ Deno.test("counter increment and getCount", async () => { // Test getCount again assertEquals(await actor.getCount(), 2); + + assertEquals(await actor.decrement(), 1); + + const counters = [1, 2, 1]; + let idx = 0; + while (idx < counters.length) { + const { value, done } = await watcher.next(); + assertEquals(value, counters[idx++]); + assertEquals(done, false); + } + watcher.return?.(); }); diff --git a/src/actors/runtime.ts b/src/actors/runtime.ts index 7207599..1c234ec 100644 --- a/src/actors/runtime.ts +++ b/src/actors/runtime.ts @@ -1,5 +1,8 @@ +import { type ServerSentEventMessage, ServerSentEventStream } from "@std/http"; import { ActorState } from "./state.ts"; import { DenoKvActorStorage } from "./storage/denoKv.ts"; +import { EVENT_STREAM_RESPONSE_HEADER } from "./stream.ts"; + /** * Represents an actor. */ @@ -7,6 +10,14 @@ import { DenoKvActorStorage } from "./storage/denoKv.ts"; export interface Actor { } +const isEventStreamResponse = ( + invokeResponse: unknown | AsyncIterableIterator, +): invokeResponse is AsyncIterableIterator => { + return ( + typeof (invokeResponse as AsyncIterableIterator)?.next === + "function" + ); +}; /** * The name of the header used to specify the actor ID. */ @@ -150,6 +161,34 @@ export class ActorRuntime { const res = await (methodImpl as Function).bind(actor)( ...Array.isArray(args) ? args : [args], ); + if (isEventStreamResponse(res)) { + req.signal.onabort = () => { + res?.return?.(); + }; + + return new Response( + new ReadableStream({ + async pull(controller) { + for await (const content of res) { + controller.enqueue({ + data: encodeURIComponent(JSON.stringify(content)), + id: Date.now(), + event: "message", + }); + } + controller.close(); + }, + cancel() { + res?.return?.(); + }, + }).pipeThrough(new ServerSentEventStream()), + { + headers: { + "Content-Type": EVENT_STREAM_RESPONSE_HEADER, + }, + }, + ); + } return Response.json(res); } } diff --git a/src/actors/stream.ts b/src/actors/stream.ts new file mode 100644 index 0000000..1c27a7e --- /dev/null +++ b/src/actors/stream.ts @@ -0,0 +1,47 @@ +export const EVENT_STREAM_RESPONSE_HEADER: string = "text/event-stream"; +export async function* readFromStream( + response: Response, +): AsyncIterableIterator { + if (!response.body) { + return; + } + + const reader = response.body.pipeThrough(new TextDecoderStream()).getReader(); + + let buffer = ""; + + while (true) { + const { value, done } = await reader.read(); + + if (done) { + break; + } + + buffer += value; + const lines = buffer.split("\n"); + buffer = lines.pop() ?? ""; + + for (const data of lines) { + if (!data.startsWith("data:")) { + continue; + } + + try { + const chunk = data.replace("data:", ""); + yield JSON.parse(decodeURIComponent(chunk)); + } catch (_err) { + console.log("error parsing data", _err, data); + continue; + } + } + } + + // Process any remaining buffer after the stream ends + if (buffer.length > 0 && buffer.startsWith("data:")) { + try { + yield JSON.parse(decodeURIComponent(buffer.replace("data:", ""))); + } catch (_err) { + console.log("error parsing data", _err, buffer); + } + } +}