Skip to content

Commit

Permalink
[Obs AI Assistant] Keep connection open, limit no of fields (#186811)
Browse files Browse the repository at this point in the history
Keeps the connection open even when there is no data, to prevent
long-running operations from timing out. Additionally, puts an upper
limit of field names to be analyzed.
  • Loading branch information
dgieselaar authored Jun 26, 2024
1 parent acde027 commit 4aeb790
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ export async function getRelevantFieldNames({
messages: Message[];
chat: FunctionCallChatFunction;
signal: AbortSignal;
}): Promise<{ fields: string[] }> {
}): Promise<{ fields: string[]; stats: { analyzed: number; total: number } }> {
const dataViewsService = await dataViews.dataViewsServiceFactory(savedObjectsClient, esClient);

const hasAnyHitsResponse = await esClient.search({
Expand Down Expand Up @@ -89,8 +89,13 @@ export async function getRelevantFieldNames({

const shortIdTable = new ShortIdTable();

const MAX_CHUNKS = 5;
const FIELD_NAMES_PER_CHUNK = 250;

const fieldNamesToAnalyze = fieldNames.slice(0, MAX_CHUNKS * FIELD_NAMES_PER_CHUNK);

const relevantFields = await Promise.all(
chunk(fieldNames, 250).map(async (fieldsInChunk) => {
chunk(fieldNamesToAnalyze, FIELD_NAMES_PER_CHUNK).map(async (fieldsInChunk) => {
const chunkResponse$ = (
await chat('get_relevant_dataset_names', {
signal,
Expand Down Expand Up @@ -165,5 +170,8 @@ export async function getRelevantFieldNames({
})
);

return { fields: relevantFields.flat() };
return {
fields: relevantFields.flat(),
stats: { analyzed: fieldNamesToAnalyze.length, total: fieldNames.length },
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ export function registerGetDatasetInfoFunction({

try {
const body = await esClient.asCurrentUser.indices.resolveIndex({
name: index === '' ? '*' : index,
name: index === '' ? '*' : index.split(','),
expand_wildcards: 'open',
});
indices = [
Expand Down Expand Up @@ -87,11 +87,11 @@ export function registerGetDatasetInfoFunction({
signal,
chat,
});

return {
content: {
indices: [index],
fields: relevantFieldNames.fields,
stats: relevantFieldNames.stats,
},
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
*/

import { repeat } from 'lodash';
import { identity, Observable, OperatorFunction } from 'rxjs';
import { Observable, OperatorFunction } from 'rxjs';
import {
BufferFlushEvent,
StreamingChatResponseEventType,
Expand All @@ -22,10 +22,6 @@ import {
export function flushBuffer<T extends StreamingChatResponseEventWithoutError | TokenCountEvent>(
isCloud: boolean
): OperatorFunction<T, T | BufferFlushEvent> {
if (!isCloud) {
return identity;
}

return (source: Observable<T>) =>
new Observable<T | BufferFlushEvent>((subscriber) => {
const cloudProxyBufferSize = 4096;
Expand All @@ -41,7 +37,15 @@ export function flushBuffer<T extends StreamingChatResponseEventWithoutError | T
}
};

const intervalId = setInterval(flushBufferIfNeeded, 250);
const keepAlive = () => {
subscriber.next({
data: '0',
type: StreamingChatResponseEventType.BufferFlush,
});
};

const flushIntervalId = isCloud ? setInterval(flushBufferIfNeeded, 250) : undefined;
const keepAliveIntervalId = setInterval(keepAlive, 30_000);

source.subscribe({
next: (value) => {
Expand All @@ -52,11 +56,13 @@ export function flushBuffer<T extends StreamingChatResponseEventWithoutError | T
subscriber.next(value);
},
error: (error) => {
clearInterval(intervalId);
clearInterval(flushIntervalId);
clearInterval(keepAliveIntervalId);
subscriber.error(error);
},
complete: () => {
clearInterval(intervalId);
clearInterval(flushIntervalId);
clearInterval(keepAliveIntervalId);
subscriber.complete();
},
});
Expand Down

0 comments on commit 4aeb790

Please sign in to comment.