Skip to content

Commit

Permalink
fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
stephmilovic committed Nov 16, 2023
1 parent 5b7cf70 commit cd6cb1e
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -274,18 +274,22 @@ describe('OpenAIConnector', () => {
});

describe('invokeStream', () => {
let stream;
// Preload a mock SSE stream with the supplied chunks, mark it complete,
// and point the shared `mockRequest` spy at it so the connector's
// `request` resolves with the stream as its `data` payload.
const mockStream = (
  dataToStream: string[] = [
    'data: {"object":"chat.completion.chunk","choices":[{"delta":{"content":"My"}}]}\ndata: {"object":"chat.completion.chunk","choices":[{"delta":{"content":" new"}}]}',
  ]
) => {
  const preloaded = createStreamMock();
  for (const chunk of dataToStream) {
    preloaded.write(chunk);
  }
  preloaded.complete();
  mockRequest = jest.fn().mockResolvedValue({ ...mockResponse, data: preloaded.transform });
  return mockRequest;
};
beforeEach(() => {
const chunk1 =
'data: {"object":"chat.completion.chunk","choices":[{"delta":{"content":"My"}}]}\ndata: {"object":"chat.completion.chunk","choices":[{"delta":{"content":" new"}}]}';

stream = createStreamMock();
stream.write(chunk1);
// .write(`data: ${JSON.stringify(mockResponse.data)}`);
mockRequest = jest.fn().mockResolvedValue({ ...mockResponse, data: stream.transform });
// @ts-ignore
connector.request = mockRequest;
stream.complete();
connector.request = mockStream();
});

it('the API call is successful with correct request parameters', async () => {
Expand All @@ -312,6 +316,8 @@ describe('OpenAIConnector', () => {
});

it('transforms the response into a string', async () => {
// @ts-ignore
connector.request = mockStream();
const response = await connector.invokeStream(sampleOpenAiBody);

let responseBody: string = '';
Expand All @@ -323,17 +329,11 @@ describe('OpenAIConnector', () => {
});
});
it('correctly buffers stream of json lines', async () => {
stream = createStreamMock();
const chunk1 =
'data: {"object":"chat.completion.chunk","choices":[{"delta":{"content":"My"}}]}\ndata: {"object":"chat.completion.chunk","choices":[{"delta":{"content":" new"}}]}';
const chunk2 =
'\ndata: {"object":"chat.completion.chunk","choices":[{"delta":{"content":" message"}}]}\ndata: [DONE]';

stream.write(chunk1);
stream.write(chunk2);
mockRequest = jest.fn().mockResolvedValue({ ...mockResponse, data: stream.transform });
const chunk1 = `data: {"object":"chat.completion.chunk","choices":[{"delta":{"content":"My"}}]}\ndata: {"object":"chat.completion.chunk","choices":[{"delta":{"content":" new"}}]}`;
const chunk2 = `\ndata: {"object":"chat.completion.chunk","choices":[{"delta":{"content":" message"}}]}\ndata: [DONE]`;

// @ts-ignore
connector.request = mockRequest;
connector.request = mockStream([chunk1, chunk2]);

const response = await connector.invokeStream(sampleOpenAiBody);

Expand All @@ -346,17 +346,12 @@ describe('OpenAIConnector', () => {
});
});
it('correctly buffers partial lines', async () => {
stream = createStreamMock();
const chunk1 =
'data: {"object":"chat.completion.chunk","choices":[{"delta":{"content":"My"}}]}\ndata: {"object":"chat.completion.chunk","choices":[{"delta":{"content":" new"';
const chunk2 =
'}}]}\ndata: {"object":"chat.completion.chunk","choices":[{"delta":{"content":" message"}}]}\ndata: [DONE]';

stream.write(chunk1);
stream.write(chunk2);
mockRequest = jest.fn().mockResolvedValue({ ...mockResponse, data: stream.transform });
const chunk1 = `data: {"object":"chat.completion.chunk","choices":[{"delta":{"content":"My"}}]}\ndata: {"object":"chat.completion.chunk","choices":[{"delta":{"content":" new"`;

const chunk2 = `}}]}\ndata: {"object":"chat.completion.chunk","choices":[{"delta":{"content":" message"}}]}\ndata: [DONE]`;

// @ts-ignore
connector.request = mockRequest;
connector.request = mockStream([chunk1, chunk2]);

const response = await connector.invokeStream(sampleOpenAiBody);

Expand Down Expand Up @@ -706,7 +701,7 @@ function createStreamMock() {

return {
write: (data: string) => {
transform.push(`${data}\n`);
transform.push(data);
},
fail: () => {
transform.emit('error', new Error('Stream failed'));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,46 +238,35 @@ export class OpenAIConnector extends SubActionConnector<Config, Secrets> {
const transformToString = () => {
let lineBuffer: string = '';
const decoder = new TextDecoder();
const encoder = new TextEncoder();

return new Transform({
transform(chunk, encoding, callback) {
const chunks = decoder.decode(chunk);
const lines = chunks.split('\n');
if (lines[lines.length - 1] === '') {
lines.pop();
}
lines[0] = lineBuffer + lines[0];
lineBuffer = lines.pop() || '';
const nextChunk = lines
.map((str) => str.substring(6))
.filter((str) => !!str && str !== '[DONE]')
.map((line) => {
const openaiResponse = JSON.parse(line);
return openaiResponse.choices[0]?.delta.content ?? '';
})
.join('');
const newChunk = encoder.encode(nextChunk);
callback(null, newChunk);
callback(null, getNextChunk(lines));
},
flush(callback) {
// Emit an additional chunk with the content of lineBuffer if it has length
if (lineBuffer.length > 0) {
const nextChunk = [lineBuffer]
.map((str) => str.substring(6))
.filter((str) => !!str && str !== '[DONE]')
.map((line) => {
const openaiResponse = JSON.parse(line);
return openaiResponse.choices[0]?.delta.content ?? '';
})
.join('');
const newChunk = encoder.encode(nextChunk);
callback(null, newChunk);
// const additionalChunk = encoder.encode(lineBuffer);
// callback(null, additionalChunk);
callback(null, getNextChunk([lineBuffer]));
} else {
callback();
}
},
});
};

// Convert raw SSE lines from an OpenAI chat-completions stream into the
// UTF-8 bytes of their concatenated delta text. Each line is expected to
// carry a `data: ` prefix (6 chars), which is stripped; blank payloads and
// the terminal `[DONE]` sentinel are dropped; every remaining payload is
// parsed as a completion chunk and its `choices[0].delta.content` (or ''
// when the delta has no content) contributes to the output.
const getNextChunk = (lines: string[]) => {
  const pieces: string[] = [];
  for (const raw of lines) {
    const payload = raw.substring(6);
    if (!payload || payload === '[DONE]') {
      continue;
    }
    const openaiResponse = JSON.parse(payload);
    pieces.push(openaiResponse.choices[0]?.delta.content ?? '');
  }
  return new TextEncoder().encode(pieces.join(''));
};

0 comments on commit cd6cb1e

Please sign in to comment.