diff --git a/x-pack/plugins/stack_connectors/server/connector_types/openai/openai.test.ts b/x-pack/plugins/stack_connectors/server/connector_types/openai/openai.test.ts index dfc6811168c8a..7769dd8592faf 100644 --- a/x-pack/plugins/stack_connectors/server/connector_types/openai/openai.test.ts +++ b/x-pack/plugins/stack_connectors/server/connector_types/openai/openai.test.ts @@ -274,18 +274,22 @@ describe('OpenAIConnector', () => { }); describe('invokeStream', () => { - let stream; + const mockStream = ( + dataToStream: string[] = [ + 'data: {"object":"chat.completion.chunk","choices":[{"delta":{"content":"My"}}]}\ndata: {"object":"chat.completion.chunk","choices":[{"delta":{"content":" new"}}]}', + ] + ) => { + const streamMock = createStreamMock(); + dataToStream.forEach((chunk) => { + streamMock.write(chunk); + }); + streamMock.complete(); + mockRequest = jest.fn().mockResolvedValue({ ...mockResponse, data: streamMock.transform }); + return mockRequest; + }; beforeEach(() => { - const chunk1 = - 'data: {"object":"chat.completion.chunk","choices":[{"delta":{"content":"My"}}]}\ndata: {"object":"chat.completion.chunk","choices":[{"delta":{"content":" new"}}]}'; - - stream = createStreamMock(); - stream.write(chunk1); - // .write(`data: ${JSON.stringify(mockResponse.data)}`); - mockRequest = jest.fn().mockResolvedValue({ ...mockResponse, data: stream.transform }); // @ts-ignore - connector.request = mockRequest; - stream.complete(); + connector.request = mockStream(); }); it('the API call is successful with correct request parameters', async () => { @@ -312,6 +316,8 @@ describe('OpenAIConnector', () => { }); it('transforms the response into a string', async () => { + // @ts-ignore + connector.request = mockStream(); const response = await connector.invokeStream(sampleOpenAiBody); let responseBody: string = ''; @@ -323,17 +329,11 @@ describe('OpenAIConnector', () => { }); }); it('correctly buffers stream of json lines', async () => { - stream = createStreamMock(); - const chunk1 = - 'data: {"object":"chat.completion.chunk","choices":[{"delta":{"content":"My"}}]}\ndata: {"object":"chat.completion.chunk","choices":[{"delta":{"content":" new"}}]}'; - const chunk2 = - '\ndata: {"object":"chat.completion.chunk","choices":[{"delta":{"content":" message"}}]}\ndata: [DONE]'; - - stream.write(chunk1); - stream.write(chunk2); - mockRequest = jest.fn().mockResolvedValue({ ...mockResponse, data: stream.transform }); + const chunk1 = `data: {"object":"chat.completion.chunk","choices":[{"delta":{"content":"My"}}]}\ndata: {"object":"chat.completion.chunk","choices":[{"delta":{"content":" new"}}]}`; + const chunk2 = `\ndata: {"object":"chat.completion.chunk","choices":[{"delta":{"content":" message"}}]}\ndata: [DONE]`; + // @ts-ignore - connector.request = mockRequest; + connector.request = mockStream([chunk1, chunk2]); const response = await connector.invokeStream(sampleOpenAiBody); @@ -346,17 +346,12 @@ describe('OpenAIConnector', () => { }); }); it('correctly buffers partial lines', async () => { - stream = createStreamMock(); - const chunk1 = - 'data: {"object":"chat.completion.chunk","choices":[{"delta":{"content":"My"}}]}\ndata: {"object":"chat.completion.chunk","choices":[{"delta":{"content":" new"'; - const chunk2 = - '}}]}\ndata: {"object":"chat.completion.chunk","choices":[{"delta":{"content":" message"}}]}\ndata: [DONE]'; - - stream.write(chunk1); - stream.write(chunk2); - mockRequest = jest.fn().mockResolvedValue({ ...mockResponse, data: stream.transform }); + const chunk1 = `data: {"object":"chat.completion.chunk","choices":[{"delta":{"content":"My"}}]}\ndata: {"object":"chat.completion.chunk","choices":[{"delta":{"content":" new"`; + + const chunk2 = `}}]}\ndata: {"object":"chat.completion.chunk","choices":[{"delta":{"content":" message"}}]}\ndata: [DONE]`; + // @ts-ignore - connector.request = mockRequest; + connector.request = mockStream([chunk1, chunk2]); const response = await connector.invokeStream(sampleOpenAiBody); @@ -706,7 +701,7 @@ function createStreamMock() { return { write: (data: string) => { - transform.push(`${data}\n`); + transform.push(data); }, fail: () => { transform.emit('error', new Error('Stream failed')); diff --git a/x-pack/plugins/stack_connectors/server/connector_types/openai/openai.ts b/x-pack/plugins/stack_connectors/server/connector_types/openai/openai.ts index 4f839b2730f7f..78fca4bd84198 100644 --- a/x-pack/plugins/stack_connectors/server/connector_types/openai/openai.ts +++ b/x-pack/plugins/stack_connectors/server/connector_types/openai/openai.ts @@ -238,46 +238,35 @@ export class OpenAIConnector extends SubActionConnector { const transformToString = () => { let lineBuffer: string = ''; const decoder = new TextDecoder(); - const encoder = new TextEncoder(); return new Transform({ transform(chunk, encoding, callback) { const chunks = decoder.decode(chunk); const lines = chunks.split('\n'); - if (lines[lines.length - 1] === '') { - lines.pop(); - } lines[0] = lineBuffer + lines[0]; lineBuffer = lines.pop() || ''; - const nextChunk = lines - .map((str) => str.substring(6)) - .filter((str) => !!str && str !== '[DONE]') - .map((line) => { - const openaiResponse = JSON.parse(line); - return openaiResponse.choices[0]?.delta.content ?? ''; - }) - .join(''); - const newChunk = encoder.encode(nextChunk); - callback(null, newChunk); + callback(null, getNextChunk(lines)); }, flush(callback) { // Emit an additional chunk with the content of lineBuffer if it has length if (lineBuffer.length > 0) { - const nextChunk = [lineBuffer] - .map((str) => str.substring(6)) - .filter((str) => !!str && str !== '[DONE]') - .map((line) => { - const openaiResponse = JSON.parse(line); - return openaiResponse.choices[0]?.delta.content ?? ''; - }) - .join(''); - const newChunk = encoder.encode(nextChunk); - callback(null, newChunk); - // const additionalChunk = encoder.encode(lineBuffer); - // callback(null, additionalChunk); + callback(null, getNextChunk([lineBuffer])); } else { callback(); } }, }); }; + +const getNextChunk = (lines: string[]) => { + const encoder = new TextEncoder(); + const nextChunk = lines + .map((str) => str.substring(6)) + .filter((str) => !!str && str !== '[DONE]') + .map((line) => { + const openaiResponse = JSON.parse(line); + return openaiResponse.choices[0]?.delta.content ?? ''; + }) + .join(''); + return encoder.encode(nextChunk); +};