Skip to content

Commit

Permalink
fix: handling last package in streaming mode
Browse files Browse the repository at this point in the history
  • Loading branch information
noble-varghese committed Dec 6, 2023
1 parent 56c853b commit 90b8889
Showing 1 changed file with 23 additions and 19 deletions.
42 changes: 23 additions & 19 deletions src/streaming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import { APIError } from "./error";
type Bytes = string | ArrayBuffer | Uint8Array | Buffer | null | undefined;

type ServerSentEvent = {
event: string | null;
data: string;
raw: string[];
event: string | null;
data: string;
raw: string[];
};

export const safeJSON = (text: string) => {
Expand All @@ -26,7 +26,7 @@ export const createResponseHeaders = (
headers.entries(),
),
{
get (target, name) {
get(target, name) {
const key = name.toString();
return target[key.toLowerCase()] || target[key];
},
Expand All @@ -40,12 +40,12 @@ export class Stream<Item> implements AsyncIterable<Item> {
private response: Response;
private decoder: SSEDecoder;

constructor (response: Response) {
constructor(response: Response) {
this.response = response;
this.decoder = new SSEDecoder();
}

private async *iterMessages (): AsyncGenerator<ServerSentEvent, void, unknown> {
private async *iterMessages(): AsyncGenerator<ServerSentEvent, void, unknown> {
if (!this.response.body) {
throw new Error("Attempted to iterate over a response with no body");
}
Expand All @@ -65,13 +65,17 @@ export class Stream<Item> implements AsyncIterable<Item> {
}
}

async *[Symbol.asyncIterator] (): AsyncIterator<Item, any, undefined> {
async *[Symbol.asyncIterator](): AsyncIterator<Item, any, undefined> {
let done = false;
try {
for await (const sse of this.iterMessages()) {
if (sse.data.startsWith('[DONE]')) {
done = true;
continue;
}
if (sse.event === null) {
try {
yield sse.data === "[DONE]" ? { "model": "", "choices": [{}] } : JSON.parse(sse.data)
yield JSON.parse(sse.data)
} catch (e) {
console.error("Could not parse message into JSON:", sse.data);
console.error("From chunk:", sse.raw);
Expand Down Expand Up @@ -100,13 +104,13 @@ class SSEDecoder {
private event: string | null;
private chunks: string[];

constructor () {
constructor() {
this.event = null;
this.data = [];
this.chunks = [];
}

decode (line: string) {
decode(line: string) {
if (line.endsWith("\r")) {
line = line.substring(0, line.length - 1);
}
Expand Down Expand Up @@ -165,12 +169,12 @@ class LineDecoder {
trailingCR: boolean;
textDecoder: any; // TextDecoder found in browsers; not typed to avoid pulling in either "dom" or "node" types.

constructor () {
constructor() {
this.buffer = [];
this.trailingCR = false;
}

decode (chunk: Bytes): string[] {
decode(chunk: Bytes): string[] {
let text = this.decodeText(chunk);

if (this.trailingCR) {
Expand Down Expand Up @@ -206,7 +210,7 @@ class LineDecoder {
return lines;
}

decodeText (bytes: Bytes): string {
decodeText(bytes: Bytes): string {
if (bytes == null) return "";
if (typeof bytes === "string") return bytes;

Expand Down Expand Up @@ -243,7 +247,7 @@ class LineDecoder {
);
}

flush (): string[] {
flush(): string[] {
if (!this.buffer.length && !this.trailingCR) {
return [];
}
Expand All @@ -255,7 +259,7 @@ class LineDecoder {
}
}

function partition (str: string, delimiter: string): [string, string, string] {
function partition(str: string, delimiter: string): [string, string, string] {
const index = str.indexOf(delimiter);
if (index !== -1) {
return [str.substring(0, index), delimiter, str.substring(index + delimiter.length)];
Expand All @@ -270,12 +274,12 @@ function partition (str: string, delimiter: string): [string, string, string] {
*
* This polyfill was pulled from https://github.com/MattiasBuelens/web-streams-polyfill/pull/122#issuecomment-1627354490
*/
function readableStreamAsyncIterable<T> (stream: any): AsyncIterableIterator<T> {
function readableStreamAsyncIterable<T>(stream: any): AsyncIterableIterator<T> {
if (stream[Symbol.asyncIterator]) return stream;

const reader = stream.getReader();
return {
async next () {
async next() {
try {
const result = await reader.read();
if (result?.done) reader.releaseLock(); // release lock when stream becomes closed
Expand All @@ -285,13 +289,13 @@ function readableStreamAsyncIterable<T> (stream: any): AsyncIterableIterator<T>
throw e;
}
},
async return () {
async return() {
const cancelPromise = reader.cancel();
reader.releaseLock();
await cancelPromise;
return { done: true, value: undefined };
},
[Symbol.asyncIterator] () {
[Symbol.asyncIterator]() {
return this;
},
};
Expand Down

0 comments on commit 90b8889

Please sign in to comment.