From 4aeb7905722fc4c3befa96ad7913fde8d26586f3 Mon Sep 17 00:00:00 2001 From: Dario Gieselaar Date: Wed, 26 Jun 2024 15:46:23 +0200 Subject: [PATCH] [Obs AI Assistant] Keep connection open, limit no of fields (#186811) Keeps the connection open even when there is no data, to prevent long-running operations from timing out. Additionally, puts an upper limit of field names to be analyzed. --- .../get_relevant_field_names.ts | 14 +++++++++--- .../functions/get_dataset_info/index.ts | 4 ++-- .../server/service/util/flush_buffer.ts | 22 ++++++++++++------- 3 files changed, 27 insertions(+), 13 deletions(-) diff --git a/x-pack/plugins/observability_solution/observability_ai_assistant/server/functions/get_dataset_info/get_relevant_field_names.ts b/x-pack/plugins/observability_solution/observability_ai_assistant/server/functions/get_dataset_info/get_relevant_field_names.ts index 557f09784c7f9..74d786bb6727d 100644 --- a/x-pack/plugins/observability_solution/observability_ai_assistant/server/functions/get_dataset_info/get_relevant_field_names.ts +++ b/x-pack/plugins/observability_solution/observability_ai_assistant/server/functions/get_dataset_info/get_relevant_field_names.ts @@ -33,7 +33,7 @@ export async function getRelevantFieldNames({ messages: Message[]; chat: FunctionCallChatFunction; signal: AbortSignal; -}): Promise<{ fields: string[] }> { +}): Promise<{ fields: string[]; stats: { analyzed: number; total: number } }> { const dataViewsService = await dataViews.dataViewsServiceFactory(savedObjectsClient, esClient); const hasAnyHitsResponse = await esClient.search({ @@ -89,8 +89,13 @@ export async function getRelevantFieldNames({ const shortIdTable = new ShortIdTable(); + const MAX_CHUNKS = 5; + const FIELD_NAMES_PER_CHUNK = 250; + + const fieldNamesToAnalyze = fieldNames.slice(0, MAX_CHUNKS * FIELD_NAMES_PER_CHUNK); + const relevantFields = await Promise.all( - chunk(fieldNames, 250).map(async (fieldsInChunk) => { + chunk(fieldNamesToAnalyze, FIELD_NAMES_PER_CHUNK).map(async (fieldsInChunk) => { const chunkResponse$ = ( await chat('get_relevant_dataset_names', { signal, @@ -165,5 +170,8 @@ export async function getRelevantFieldNames({ }) ); - return { fields: relevantFields.flat() }; + return { + fields: relevantFields.flat(), + stats: { analyzed: fieldNamesToAnalyze.length, total: fieldNames.length }, + }; } diff --git a/x-pack/plugins/observability_solution/observability_ai_assistant/server/functions/get_dataset_info/index.ts b/x-pack/plugins/observability_solution/observability_ai_assistant/server/functions/get_dataset_info/index.ts index 4b9128ed549f3..57cac3a4e0c0f 100644 --- a/x-pack/plugins/observability_solution/observability_ai_assistant/server/functions/get_dataset_info/index.ts +++ b/x-pack/plugins/observability_solution/observability_ai_assistant/server/functions/get_dataset_info/index.ts @@ -48,7 +48,7 @@ export function registerGetDatasetInfoFunction({ try { const body = await esClient.asCurrentUser.indices.resolveIndex({ - name: index === '' ? '*' : index, + name: index === '' ? '*' : index.split(','), expand_wildcards: 'open', }); indices = [ @@ -87,11 +87,11 @@ export function registerGetDatasetInfoFunction({ signal, chat, }); - return { content: { indices: [index], fields: relevantFieldNames.fields, + stats: relevantFieldNames.stats, }, }; } diff --git a/x-pack/plugins/observability_solution/observability_ai_assistant/server/service/util/flush_buffer.ts b/x-pack/plugins/observability_solution/observability_ai_assistant/server/service/util/flush_buffer.ts index eb494ec80bb50..a9826a180c969 100644 --- a/x-pack/plugins/observability_solution/observability_ai_assistant/server/service/util/flush_buffer.ts +++ b/x-pack/plugins/observability_solution/observability_ai_assistant/server/service/util/flush_buffer.ts @@ -6,7 +6,7 @@ */ import { repeat } from 'lodash'; -import { identity, Observable, OperatorFunction } from 'rxjs'; +import { Observable, OperatorFunction } from 'rxjs'; import { BufferFlushEvent, StreamingChatResponseEventType, @@ -22,10 +22,6 @@ import { export function flushBuffer( isCloud: boolean ): OperatorFunction { - if (!isCloud) { - return identity; - } - return (source: Observable) => new Observable((subscriber) => { const cloudProxyBufferSize = 4096; @@ -41,7 +37,15 @@ export function flushBuffer { + subscriber.next({ + data: '0', + type: StreamingChatResponseEventType.BufferFlush, + }); + }; + + const flushIntervalId = isCloud ? setInterval(flushBufferIfNeeded, 250) : undefined; + const keepAliveIntervalId = setInterval(keepAlive, 30_000); source.subscribe({ next: (value) => { @@ -52,11 +56,13 @@ export function flushBuffer { - clearInterval(intervalId); + clearInterval(flushIntervalId); + clearInterval(keepAliveIntervalId); subscriber.error(error); }, complete: () => { - clearInterval(intervalId); + clearInterval(flushIntervalId); + clearInterval(keepAliveIntervalId); subscriber.complete(); }, });