Skip to content

Commit

Permalink
[Obs AI Assistant] Filter token event in evaluation framework (#179692)
Browse files Browse the repository at this point in the history
  • Loading branch information
dgieselaar authored Mar 29, 2024
1 parent b65a751 commit 5f22d4a
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import {
StreamingChatResponseEventType,
type StreamingChatResponseEventWithoutError,
type StreamingChatResponseEvent,
TokenCountEvent,
} from '../../common/conversation_complete';
import {
FunctionRegistry,
Expand Down Expand Up @@ -163,10 +164,17 @@ export async function createChatService({

const subscription = toObservable(response)
.pipe(
map((line) => JSON.parse(line) as StreamingChatResponseEvent | BufferFlushEvent),
map(
(line) =>
JSON.parse(line) as
| StreamingChatResponseEvent
| BufferFlushEvent
| TokenCountEvent
),
filter(
(line): line is StreamingChatResponseEvent =>
line.type !== StreamingChatResponseEventType.BufferFlush
line.type !== StreamingChatResponseEventType.BufferFlush &&
line.type !== StreamingChatResponseEventType.TokenCount
),
throwSerializedChatCompletionErrors()
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import {
MessageAddEvent,
StreamingChatResponseEvent,
StreamingChatResponseEventType,
TokenCountEvent,
} from '../../common/conversation_complete';
import { ObservabilityAIAssistantScreenContext } from '../../common/types';
import { concatenateChatCompletionChunks } from '../../common/utils/concatenate_chat_completion_chunks';
Expand Down Expand Up @@ -240,11 +241,15 @@ export class KibanaClient {
.split('\n')
.map((line) => line.trim())
.filter(Boolean)
.map((line) => JSON.parse(line) as StreamingChatResponseEvent | BufferFlushEvent)
.map(
(line) =>
JSON.parse(line) as StreamingChatResponseEvent | BufferFlushEvent | TokenCountEvent
)
),
filter(
(line): line is ChatCompletionChunkEvent | ChatCompletionErrorEvent =>
line.type !== StreamingChatResponseEventType.BufferFlush
line.type === StreamingChatResponseEventType.ChatCompletionChunk ||
line.type === StreamingChatResponseEventType.ChatCompletionError
),
throwSerializedChatCompletionErrors(),
concatenateChatCompletionChunks()
Expand Down Expand Up @@ -329,7 +334,13 @@ export class KibanaClient {
.split('\n')
.map((line) => line.trim())
.filter(Boolean)
.map((line) => JSON.parse(line) as StreamingChatResponseEvent | BufferFlushEvent)
.map(
(line) =>
JSON.parse(line) as
| StreamingChatResponseEvent
| BufferFlushEvent
| TokenCountEvent
)
),
filter(
(event): event is MessageAddEvent | ConversationCreateEvent =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import { identity, pickBy } from 'lodash';
export type KibanaConfig = ReturnType<typeof readKibanaConfig>;

export const readKibanaConfig = () => {
const kibanaConfigDir = path.join(__filename, '../../../../../../config');
const kibanaConfigDir = path.join(__filename, '../../../../../../../config');
const kibanaDevConfig = path.join(kibanaConfigDir, 'kibana.dev.yml');
const kibanaConfig = path.join(kibanaConfigDir, 'kibana.yml');

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ export class ObservabilityAIAssistantClient {
`Token count for conversation: ${JSON.stringify(tokenCountResult)}`
);

apm.addLabels({
apm.currentTransaction?.addLabels({
tokenCountPrompt: tokenCountResult.prompt,
tokenCountCompletion: tokenCountResult.completion,
tokenCountTotal: tokenCountResult.total,
Expand Down Expand Up @@ -632,28 +632,37 @@ export class ObservabilityAIAssistantClient {
signal.addEventListener('abort', () => response.destroy());

const response$ = adapter.streamIntoObservable(response).pipe(shareReplay());

response$
.pipe(rejectTokenCountEvents(), concatenateChatCompletionChunks(), lastOperator())
.subscribe({
error: (error) => {
this.dependencies.logger.debug('Error in chat response');
this.dependencies.logger.debug(error);
span?.setOutcome('failure');
span?.end();
},
next: (message) => {
this.dependencies.logger.debug(`Received message:\n${JSON.stringify(message)}`);
},
complete: () => {
span?.setOutcome('success');
span?.end();
},
});

lastValueFrom(response$)
.then(() => {
span?.setOutcome('success');
})
.catch(() => {
span?.setOutcome('failure');
})
.finally(() => {
span?.end();
});
response$.subscribe({
next: (event) => {
if (event.type === StreamingChatResponseEventType.TokenCount) {
span?.addLabels({
tokenCountPrompt: event.tokens.prompt,
tokenCountCompletion: event.tokens.completion,
tokenCountTotal: event.tokens.total,
});
}
},
error: () => {},
});

return response$;
} catch (error) {
Expand Down

0 comments on commit 5f22d4a

Please sign in to comment.