Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Security solution] Bedrock streaming and token tracking #170815

Merged
merged 38 commits into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
a1929d0
bedrock magic
stephmilovic Nov 7, 2023
f49cf23
cleanup
stephmilovic Nov 7, 2023
8738b15
cleanup and openai wip
stephmilovic Nov 8, 2023
427f6d2
better
stephmilovic Nov 8, 2023
e32ffdb
wip
stephmilovic Nov 8, 2023
237f6fc
token tracking for bedrock
stephmilovic Nov 9, 2023
4604df4
cleanup
stephmilovic Nov 9, 2023
9e69031
rm
stephmilovic Nov 9, 2023
1a396f5
cleanup
stephmilovic Nov 9, 2023
8696210
fix api tests
stephmilovic Nov 9, 2023
2eabb1b
token tests
stephmilovic Nov 10, 2023
b1154db
security solution tests
stephmilovic Nov 10, 2023
a375387
stack connector tests
stephmilovic Nov 10, 2023
886ad47
Merge branch 'main' into bedrock_streaming
stephmilovic Nov 14, 2023
ab85ac4
WIP
stephmilovic Nov 14, 2023
f5c8a85
update package.json whitespace?
stephmilovic Nov 14, 2023
a57fae7
Merge branch 'bedrock_streaming' into bedrock_streaming_integration_t…
stephmilovic Nov 14, 2023
fe858d7
cleanup
stephmilovic Nov 14, 2023
b1e70a7
fix
stephmilovic Nov 14, 2023
ef6fc8e
make streamApi private
stephmilovic Nov 14, 2023
4f95837
Merge branch 'bedrock_streaming' into bedrock_streaming_integration_t…
stephmilovic Nov 14, 2023
b16913f
comment the code better
stephmilovic Nov 14, 2023
6875c63
fix comment
stephmilovic Nov 14, 2023
9ab90b4
Merge branch 'bedrock_streaming' into bedrock_streaming_integration_t…
stephmilovic Nov 15, 2023
b291fe3
Sergi PR changes
stephmilovic Nov 15, 2023
c8957c5
Sergi was right
stephmilovic Nov 15, 2023
ae4e85d
one more!
stephmilovic Nov 15, 2023
40bd1c9
Merge branch 'bedrock_streaming' into bedrock_streaming_integration_t…
stephmilovic Nov 15, 2023
3f70494
fix whoops
stephmilovic Nov 15, 2023
902e7ba
add tests for shouldTrackGenAiToken
stephmilovic Nov 15, 2023
10c188b
Merge branch 'bedrock_streaming' into bedrock_streaming_integration_t…
stephmilovic Nov 15, 2023
325b0ff
commit
stephmilovic Nov 15, 2023
b5e8e83
done?
stephmilovic Nov 15, 2023
ce6668f
really
stephmilovic Nov 15, 2023
33d1f92
better error handling
stephmilovic Nov 15, 2023
5b7cf70
tests for line buffer
stephmilovic Nov 16, 2023
cd6cb1e
fixed
stephmilovic Nov 16, 2023
25ffb7e
Merge branch 'main' into bedrock_streaming
stephmilovic Nov 16, 2023
File filter

Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -276,11 +276,16 @@ describe('OpenAIConnector', () => {
describe('invokeStream', () => {
let stream;
beforeEach(() => {
const chunk1 =
'data: {"object":"chat.completion.chunk","choices":[{"delta":{"content":"My"}}]}\ndata: {"object":"chat.completion.chunk","choices":[{"delta":{"content":" new"}}]}';

stream = createStreamMock();
stream.write(`data: ${JSON.stringify(mockResponse.data)}`);
stream.write(chunk1);
// .write(`data: ${JSON.stringify(mockResponse.data)}`);
mockRequest = jest.fn().mockResolvedValue({ ...mockResponse, data: stream.transform });
// @ts-ignore
connector.request = mockRequest;
stream.complete();
// (GitHub review-thread marker — "conversation resolved" — not part of the test code)
});

it('the API call is successful with correct request parameters', async () => {
Expand Down Expand Up @@ -314,7 +319,53 @@ describe('OpenAIConnector', () => {
responseBody += data.toString();
});
await waitFor(() => {
expect(responseBody).toEqual(mockResponseString);
expect(responseBody).toEqual('My new');
});
});
// Verifies that a server-sent-events payload split across two stream writes is
// reassembled correctly: each "data: " line is parsed as an OpenAI
// chat-completion chunk and the delta contents are concatenated in order.
it('correctly buffers stream of json lines', async () => {
stream = createStreamMock();
// chunk1 contains two complete "data: " lines but NO trailing newline;
// chunk2 begins with the newline that terminates chunk1's last line, so the
// transform's internal line buffer must carry state between writes.
const chunk1 =
'data: {"object":"chat.completion.chunk","choices":[{"delta":{"content":"My"}}]}\ndata: {"object":"chat.completion.chunk","choices":[{"delta":{"content":" new"}}]}';
const chunk2 =
'\ndata: {"object":"chat.completion.chunk","choices":[{"delta":{"content":" message"}}]}\ndata: [DONE]';

stream.write(chunk1);
stream.write(chunk2);
// Resolve the mocked HTTP request with the mock stream as the response body.
mockRequest = jest.fn().mockResolvedValue({ ...mockResponse, data: stream.transform });
// @ts-ignore
connector.request = mockRequest;

const response = await connector.invokeStream(sampleOpenAiBody);

// Accumulate everything the transformed stream emits.
let responseBody: string = '';
response.on('data', (data: string) => {
responseBody += data.toString();
});
await waitFor(() => {
// The "[DONE]" sentinel line is filtered out; only the three delta
// contents ("My", " new", " message") should remain, joined in order.
expect(responseBody).toEqual('My new message');
});
});
// Verifies the harder buffering case: a chunk boundary that falls in the
// MIDDLE of a JSON line. The transform must hold the partial line until the
// next write completes it, rather than attempting to JSON.parse a fragment.
it('correctly buffers partial lines', async () => {
stream = createStreamMock();
// chunk1 ends mid-JSON ('{"content":" new"' is truncated); chunk2 starts
// with the closing '}}]}' that completes it, followed by the final line and
// the "[DONE]" sentinel.
const chunk1 =
'data: {"object":"chat.completion.chunk","choices":[{"delta":{"content":"My"}}]}\ndata: {"object":"chat.completion.chunk","choices":[{"delta":{"content":" new"';
const chunk2 =
'}}]}\ndata: {"object":"chat.completion.chunk","choices":[{"delta":{"content":" message"}}]}\ndata: [DONE]';

stream.write(chunk1);
stream.write(chunk2);
// Resolve the mocked HTTP request with the mock stream as the response body.
mockRequest = jest.fn().mockResolvedValue({ ...mockResponse, data: stream.transform });
// @ts-ignore
connector.request = mockRequest;

const response = await connector.invokeStream(sampleOpenAiBody);

// Accumulate everything the transformed stream emits.
let responseBody: string = '';
response.on('data', (data: string) => {
responseBody += data.toString();
});
await waitFor(() => {
// The split JSON line must be reassembled before parsing, yielding the
// same concatenated delta contents as the unsplit case.
expect(responseBody).toEqual('My new message');
});
});
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ export class OpenAIConnector extends SubActionConnector<Config, Secrets> {
body: JSON.stringify(body),
stream: true,
})) as unknown as IncomingMessage;

return res.pipe(new PassThrough()).pipe(transformToString());
}

Expand Down Expand Up @@ -236,20 +237,20 @@ export class OpenAIConnector extends SubActionConnector<Config, Secrets> {
*/
const transformToString = () => {
let lineBuffer: string = '';
const decoder = new TextDecoder();
const encoder = new TextEncoder();

return new Transform({
transform(chunk, encoding, callback) {
const decoder = new TextDecoder();
const encoder = new TextEncoder();
const lines = decoder.decode(chunk).split('\n');
const chunks = decoder.decode(chunk);
const lines = chunks.split('\n');
if (lines[lines.length - 1] === '') {
// (GitHub review-thread marker — "conversation resolved" — not part of the diff's code)
lines.pop();
}
lines[0] = lineBuffer + lines[0];

lineBuffer = lines.pop() || '';

const nextChunk = lines
// every line starts with "data: ", we remove it and are left with stringified JSON or the string "[DONE]"
.map((str) => str.substring(6))
// filter out empty lines and the "[DONE]" string
.filter((str) => !!str && str !== '[DONE]')
.map((line) => {
const openaiResponse = JSON.parse(line);
Expand All @@ -259,5 +260,24 @@ const transformToString = () => {
const newChunk = encoder.encode(nextChunk);
callback(null, newChunk);
},
flush(callback) {
// Emit an additional chunk with the content of lineBuffer if it has length
if (lineBuffer.length > 0) {
const nextChunk = [lineBuffer]
.map((str) => str.substring(6))
.filter((str) => !!str && str !== '[DONE]')
.map((line) => {
const openaiResponse = JSON.parse(line);
return openaiResponse.choices[0]?.delta.content ?? '';
})
.join('');
const newChunk = encoder.encode(nextChunk);
callback(null, newChunk);
// (GitHub review-thread marker — "conversation resolved" — not part of the diff's code)
// const additionalChunk = encoder.encode(lineBuffer);
// callback(null, additionalChunk);
} else {
callback();
}
},
});
};