diff --git a/src/plugins/data/public/search/search_interceptor/search_interceptor.ts b/src/plugins/data/public/search/search_interceptor/search_interceptor.ts index 7f54c63592b14..98a325462ecf3 100644 --- a/src/plugins/data/public/search/search_interceptor/search_interceptor.ts +++ b/src/plugins/data/public/search/search_interceptor/search_interceptor.ts @@ -453,6 +453,16 @@ export class SearchInterceptor { ...searchOptions, }), }) + .then((response) => { + return response.rawResponse + ? response + : { + id: response.id, + rawResponse: response, + isPartial: response.is_partial, + isRunning: response.is_running, + }; + }) .catch((e: IHttpFetchError) => { if (e?.body) { throw e.body; diff --git a/src/plugins/data/server/search/routes/search.ts b/src/plugins/data/server/search/routes/search.ts index 7d05c27cd5781..cc783b369d4b5 100644 --- a/src/plugins/data/server/search/routes/search.ts +++ b/src/plugins/data/server/search/routes/search.ts @@ -9,6 +9,7 @@ import { first } from 'rxjs'; import { schema } from '@kbn/config-schema'; import { reportServerError } from '@kbn/kibana-utils-plugin/server'; +import { PassThrough } from 'stream'; import { reportSearchError } from '../report_search_error'; import { getRequestAbortedSignal } from '../../lib'; import type { DataPluginRouter } from '../types'; @@ -73,7 +74,13 @@ export function registerSearchRoute(router: DataPluginRouter): void { .pipe(first()) .toPromise(); - return res.ok({ body: response }); + if (response.rawResponse.pipe) { + const stream = new PassThrough(); + response.rawResponse.pipe(stream); + return res.ok({ body: stream }); + } else { + return res.ok({ body: response }); + } } catch (err) { return reportSearchError(res, err); } diff --git a/src/plugins/data/server/search/search_service.ts b/src/plugins/data/server/search/search_service.ts index 09bee9bb8bab8..8c6edfd1734a3 100644 --- a/src/plugins/data/server/search/search_service.ts +++ b/src/plugins/data/server/search/search_service.ts @@ -394,11 +394,7 @@ export class SearchService implements Plugin { switchMap((searchRequest) => strategy.search(searchRequest, options, deps).pipe( concatMap((response) => { - response = { - ...response, - isRestored: !!searchRequest.id, - }; - + response.isRestored = !!searchRequest.id; if ( options.sessionId && // if within search session options.isStored && // and search session was saved (saved object exists) diff --git a/src/plugins/data/server/search/strategies/esql_async_search/esql_async_search_strategy.ts b/src/plugins/data/server/search/strategies/esql_async_search/esql_async_search_strategy.ts index a204b6ca69cca..772e439eacaf8 100644 --- a/src/plugins/data/server/search/strategies/esql_async_search/esql_async_search_strategy.ts +++ b/src/plugins/data/server/search/strategies/esql_async_search/esql_async_search_strategy.ts @@ -21,8 +21,8 @@ import { pollSearch } from '../../../../common'; import { getKbnSearchError } from '../../report_search_error'; import type { ISearchStrategy, SearchStrategyDependencies } from '../../types'; import type { IAsyncSearchOptions } from '../../../../common'; -import { toAsyncKibanaSearchResponse } from './response_utils'; import { SearchConfigSchema } from '../../../../config'; +import { sanitizeRequestParams } from '../../sanitize_request_params'; // `drop_null_columns` is going to change the response // now we get `all_columns` and `columns` @@ -73,14 +73,14 @@ export const esqlAsyncSearchStrategyProvider = ( ...(await getCommonDefaultAsyncSubmitParams(searchConfig, options)), ...requestParams, }; - const { body, headers, meta } = id + const response = id ? await client.transport.request( { method: 'GET', path: `/_query/async/${id}`, querystring: { ...params }, }, - { ...options.transport, signal: options.abortSignal, meta: true } + { ...options.transport, signal: options.abortSignal, meta: true, asStream: true } ) : await client.transport.request( { @@ -89,16 +89,16 @@ export const esqlAsyncSearchStrategyProvider = ( body: params, querystring: dropNullColumns ? 'drop_null_columns' : '', }, - { ...options.transport, signal: options.abortSignal, meta: true } + { ...options.transport, signal: options.abortSignal, meta: true, asStream: true } ); - - const finalResponse = toAsyncKibanaSearchResponse( - body, - headers?.warning, - // do not return requestParams on polling calls - id ? undefined : meta?.request?.params - ); - return finalResponse; + const { body, headers, meta } = response; + return { + id: headers['x-elasticsearch-async-id'], + rawResponse: body, + isRunning: headers['x-elasticsearch-async-is-running'] === '?1', + ...(headers?.warning ? { warning: headers?.warning } : {}), + ...(requestParams ? { requestParams: sanitizeRequestParams(meta?.request?.params) } : {}), + }; }; const cancel = async () => {