From 90b8889352394323ba0161935ef9527caf96e556 Mon Sep 17 00:00:00 2001 From: noble-varghese Date: Wed, 6 Dec 2023 19:32:09 +0530 Subject: [PATCH] fix: handling last package in streaming mode --- src/streaming.ts | 42 +++++++++++++++++++++++------------------- 1 file changed, 23 insertions(+), 19 deletions(-) diff --git a/src/streaming.ts b/src/streaming.ts index 883ec2c..371c827 100644 --- a/src/streaming.ts +++ b/src/streaming.ts @@ -4,9 +4,9 @@ import { APIError } from "./error"; type Bytes = string | ArrayBuffer | Uint8Array | Buffer | null | undefined; type ServerSentEvent = { - event: string | null; - data: string; - raw: string[]; + event: string | null; + data: string; + raw: string[]; }; export const safeJSON = (text: string) => { @@ -26,7 +26,7 @@ export const createResponseHeaders = ( headers.entries(), ), { - get (target, name) { + get(target, name) { const key = name.toString(); return target[key.toLowerCase()] || target[key]; }, @@ -40,12 +40,12 @@ export class Stream implements AsyncIterable { private response: Response; private decoder: SSEDecoder; - constructor (response: Response) { + constructor(response: Response) { this.response = response; this.decoder = new SSEDecoder(); } - private async *iterMessages (): AsyncGenerator { + private async *iterMessages(): AsyncGenerator { if (!this.response.body) { throw new Error("Attempted to iterate over a response with no body"); } @@ -65,13 +65,17 @@ export class Stream implements AsyncIterable { } } - async *[Symbol.asyncIterator] (): AsyncIterator { + async *[Symbol.asyncIterator](): AsyncIterator { let done = false; try { for await (const sse of this.iterMessages()) { + if (sse.data.startsWith('[DONE]')) { + done = true; + continue; + } if (sse.event === null) { try { - yield sse.data === "[DONE]" ? { "model": "", "choices": [{}] } : JSON.parse(sse.data) + yield JSON.parse(sse.data) } catch (e) { console.error("Could not parse message into JSON:", sse.data); console.error("From chunk:", sse.raw); @@ -100,13 +104,13 @@ class SSEDecoder { private event: string | null; private chunks: string[]; - constructor () { + constructor() { this.event = null; this.data = []; this.chunks = []; } - decode (line: string) { + decode(line: string) { if (line.endsWith("\r")) { line = line.substring(0, line.length - 1); } @@ -165,12 +169,12 @@ class LineDecoder { trailingCR: boolean; textDecoder: any; // TextDecoder found in browsers; not typed to avoid pulling in either "dom" or "node" types. - constructor () { + constructor() { this.buffer = []; this.trailingCR = false; } - decode (chunk: Bytes): string[] { + decode(chunk: Bytes): string[] { let text = this.decodeText(chunk); if (this.trailingCR) { @@ -206,7 +210,7 @@ class LineDecoder { return lines; } - decodeText (bytes: Bytes): string { + decodeText(bytes: Bytes): string { if (bytes == null) return ""; if (typeof bytes === "string") return bytes; @@ -243,7 +247,7 @@ class LineDecoder { ); } - flush (): string[] { + flush(): string[] { if (!this.buffer.length && !this.trailingCR) { return []; } @@ -255,7 +259,7 @@ class LineDecoder { } } -function partition (str: string, delimiter: string): [string, string, string] { +function partition(str: string, delimiter: string): [string, string, string] { const index = str.indexOf(delimiter); if (index !== -1) { return [str.substring(0, index), delimiter, str.substring(index + delimiter.length)]; @@ -270,12 +274,12 @@ function partition (str: string, delimiter: string): [string, string, string] { * * This polyfill was pulled from https://github.com/MattiasBuelens/web-streams-polyfill/pull/122#issuecomment-1627354490 */ -function readableStreamAsyncIterable (stream: any): AsyncIterableIterator { +function readableStreamAsyncIterable(stream: any): AsyncIterableIterator { if (stream[Symbol.asyncIterator]) return stream; const reader = stream.getReader(); return { - async next () { + async next() { try { const result = await reader.read(); if (result?.done) reader.releaseLock(); // release lock when stream becomes closed @@ -285,13 +289,13 @@ function readableStreamAsyncIterable (stream: any): AsyncIterableIterator throw e; } }, - async return () { + async return() { const cancelPromise = reader.cancel(); reader.releaseLock(); await cancelPromise; return { done: true, value: undefined }; }, - [Symbol.asyncIterator] () { + [Symbol.asyncIterator]() { return this; }, };