From 92da61d92121416fae2d45c2191728abb74c09e5 Mon Sep 17 00:00:00 2001 From: Lukas Olson Date: Tue, 20 Aug 2024 15:22:48 -0700 Subject: [PATCH] ESQL streaming PoC --- .../data/common/search/expressions/esql.ts | 15 +++++++++++- .../search_interceptor/search_interceptor.ts | 10 ++++++++ .../data/server/search/routes/search.ts | 9 ++++++- .../data/server/search/search_service.ts | 6 +---- .../esql_async_search_strategy.ts | 24 +++++++++---------- 5 files changed, 45 insertions(+), 19 deletions(-) diff --git a/src/plugins/data/common/search/expressions/esql.ts b/src/plugins/data/common/search/expressions/esql.ts index d7af8ac5ffed1..16ccd934e3a21 100644 --- a/src/plugins/data/common/search/expressions/esql.ts +++ b/src/plugins/data/common/search/expressions/esql.ts @@ -166,10 +166,23 @@ export const getEsqlFn = ({ getStartDependencies }: EsqlFnArguments) => { fieldName: timeField, }); + const delayFilter = { + error_query: { + indices: [ + { + name: 'qbserve-*', + error_type: 'warning', + message: "'Watch out!'", + stall_time_seconds: 5, + }, + ], + }, + }; + params.filter = buildEsQuery( undefined, input.query || [], - [...(input.filters ?? []), ...(timeFilter ? [timeFilter] : [])], + [delayFilter, ...(input.filters ?? []), ...(timeFilter ? [timeFilter] : [])], esQueryConfigs ); } diff --git a/src/plugins/data/public/search/search_interceptor/search_interceptor.ts b/src/plugins/data/public/search/search_interceptor/search_interceptor.ts index 7f54c63592b14..98a325462ecf3 100644 --- a/src/plugins/data/public/search/search_interceptor/search_interceptor.ts +++ b/src/plugins/data/public/search/search_interceptor/search_interceptor.ts @@ -453,6 +453,16 @@ export class SearchInterceptor { ...searchOptions, }), }) + .then((response) => { + return response.rawResponse + ? response + : { + id: response.id, + rawResponse: response, + isPartial: response.is_partial, + isRunning: response.is_running, + }; + }) .catch((e: IHttpFetchError) => { if (e?.body) { throw e.body; diff --git a/src/plugins/data/server/search/routes/search.ts b/src/plugins/data/server/search/routes/search.ts index 7d05c27cd5781..cc783b369d4b5 100644 --- a/src/plugins/data/server/search/routes/search.ts +++ b/src/plugins/data/server/search/routes/search.ts @@ -9,6 +9,7 @@ import { first } from 'rxjs'; import { schema } from '@kbn/config-schema'; import { reportServerError } from '@kbn/kibana-utils-plugin/server'; +import { PassThrough } from 'stream'; import { reportSearchError } from '../report_search_error'; import { getRequestAbortedSignal } from '../../lib'; import type { DataPluginRouter } from '../types'; @@ -73,7 +74,13 @@ export function registerSearchRoute(router: DataPluginRouter): void { .pipe(first()) .toPromise(); - return res.ok({ body: response }); + if (response.rawResponse.pipe) { + const stream = new PassThrough(); + response.rawResponse.pipe(stream); + return res.ok({ body: stream }); + } else { + return res.ok({ body: response }); + } } catch (err) { return reportSearchError(res, err); } diff --git a/src/plugins/data/server/search/search_service.ts b/src/plugins/data/server/search/search_service.ts index 09bee9bb8bab8..8c6edfd1734a3 100644 --- a/src/plugins/data/server/search/search_service.ts +++ b/src/plugins/data/server/search/search_service.ts @@ -394,11 +394,7 @@ export class SearchService implements Plugin { switchMap((searchRequest) => strategy.search(searchRequest, options, deps).pipe( concatMap((response) => { - response = { - ...response, - isRestored: !!searchRequest.id, - }; - + response.isRestored = !!searchRequest.id; if ( options.sessionId && // if within search session options.isStored && // and search session was saved (saved object exists) diff --git a/src/plugins/data/server/search/strategies/esql_async_search/esql_async_search_strategy.ts b/src/plugins/data/server/search/strategies/esql_async_search/esql_async_search_strategy.ts index a204b6ca69cca..772e439eacaf8 100644 --- a/src/plugins/data/server/search/strategies/esql_async_search/esql_async_search_strategy.ts +++ b/src/plugins/data/server/search/strategies/esql_async_search/esql_async_search_strategy.ts @@ -21,8 +21,8 @@ import { pollSearch } from '../../../../common'; import { getKbnSearchError } from '../../report_search_error'; import type { ISearchStrategy, SearchStrategyDependencies } from '../../types'; import type { IAsyncSearchOptions } from '../../../../common'; -import { toAsyncKibanaSearchResponse } from './response_utils'; import { SearchConfigSchema } from '../../../../config'; +import { sanitizeRequestParams } from '../../sanitize_request_params'; // `drop_null_columns` is going to change the response // now we get `all_columns` and `columns` @@ -73,14 +73,14 @@ export const esqlAsyncSearchStrategyProvider = ( ...(await getCommonDefaultAsyncSubmitParams(searchConfig, options)), ...requestParams, }; - const { body, headers, meta } = id + const response = id ? await client.transport.request( { method: 'GET', path: `/_query/async/${id}`, querystring: { ...params }, }, - { ...options.transport, signal: options.abortSignal, meta: true } + { ...options.transport, signal: options.abortSignal, meta: true, asStream: true } ) : await client.transport.request( { @@ -89,16 +89,16 @@ export const esqlAsyncSearchStrategyProvider = ( body: params, querystring: dropNullColumns ? 'drop_null_columns' : '', }, - { ...options.transport, signal: options.abortSignal, meta: true } + { ...options.transport, signal: options.abortSignal, meta: true, asStream: true } ); - - const finalResponse = toAsyncKibanaSearchResponse( - body, - headers?.warning, - // do not return requestParams on polling calls - id ? undefined : meta?.request?.params - ); - return finalResponse; + const { body, headers, meta } = response; + return { + id: headers['x-elasticsearch-async-id'], + rawResponse: body, + isRunning: headers['x-elasticsearch-async-is-running'] === '?1', + ...(headers?.warning ? { warning: headers?.warning } : {}), + ...(requestParams ? { requestParams: sanitizeRequestParams(meta?.request?.params) } : {}), + }; }; const cancel = async () => {