Skip to content

Commit

Permalink
streaming and serialization fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
berekuk committed Nov 19, 2024
1 parent ec57655 commit 7c0b2e7
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 19 deletions.
31 changes: 18 additions & 13 deletions packages/ai/src/LLMStepInstance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -404,23 +404,28 @@ export function serializeStepParams(
params: StepParams<IOShape>,
visitor: AiSerializationVisitor
): SerializedStep {
let serializedState: SerializedState<IOShape>;
if (params.state.kind === "DONE") {
const { outputs, ...stateWithoutOutputs } = params.state;
serializedState = {
...stateWithoutOutputs,
outputIds: Object.fromEntries(
Object.entries(outputs)
.map(([key, output]) =>
output ? [key, visitor.artifact(output)] : undefined
)
.filter((x) => x !== undefined)
),
};
} else {
serializedState = params.state;
}

return {
id: params.id,
sequentialId: params.sequentialId,
templateName: params.template.name,
state:
params.state.kind === "DONE"
? {
...params.state,
outputIds: Object.fromEntries(
Object.entries(params.state.outputs)
.map(([key, output]) =>
output ? [key, visitor.artifact(output)] : undefined
)
.filter((x) => x !== undefined)
),
}
: params.state,
state: serializedState,
startTime: params.startTime,
conversationMessages: params.conversationMessages,
llmMetricsList: params.llmMetricsList,
Expand Down
10 changes: 4 additions & 6 deletions packages/ai/src/workflows/Workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,25 +148,23 @@ export class Workflow<Shape extends IOShape = IOShape> {
if (this.steps.length) {
throw new Error("Workflow already started");
}

this.dispatchEvent({ type: "workflowStarted" });

// add the first step
const initialStep = this.template.getInitialStep(this);
this.addStep(initialStep);
}

// Run workflow to the ReadableStream, appropriate for streaming in Next.js routes
runAsStream(): ReadableStream<string> {
this.startOrThrow();

const stream = new ReadableStream<string>({
start: async (controller) => {
addStreamingListeners(this, controller);

// We need to dispatch this event after we configured the event
// handlers, but before we add any steps.
this.dispatchEvent({ type: "workflowStarted" });
this.startOrThrow();

await this.runUntilComplete();
controller.close();
},
});

Expand Down
1 change: 1 addition & 0 deletions packages/ai/src/workflows/streaming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ export function addStreamingListeners<Shape extends IOShape>(
kind: "finalResult",
content: event.workflow.getFinalResult(),
});
controller.close();
});
}

Expand Down

0 comments on commit 7c0b2e7

Please sign in to comment.