From 7b81bcf60a7fbc287fdfe2aa6b44009b04024229 Mon Sep 17 00:00:00 2001 From: Peter Pisljar Date: Thu, 7 Nov 2024 12:19:13 +0100 Subject: [PATCH] pass elasticsearch response to browser as a stream (#193060) Remove decoding and re-encoding of the ES-response from Kibana-server. Resolves https://github.com/elastic/kibana/issues/189640 --- packages/kbn-search-types/src/types.ts | 6 ++ .../search/strategies/es_search/index.ts | 1 + .../es_search/response_utils.test.ts | 30 +------ .../strategies/es_search/response_utils.ts | 19 ----- src/plugins/data/common/search/utils.ts | 6 +- .../search_interceptor/search_interceptor.ts | 85 +++++++++++++++++-- src/plugins/data/server/index.ts | 3 +- .../data/server/search/routes/search.ts | 28 +++++- .../es_search/es_search_strategy.test.ts | 30 ++++++- .../es_search/es_search_strategy.ts | 22 ++++- .../search/strategies/es_search/index.ts | 1 - .../ese_search/ese_search_strategy.test.ts | 21 +++++ .../ese_search/ese_search_strategy.ts | 30 +++---- .../strategies/ese_search/response_utils.ts | 20 +++-- .../esql_async_search_strategy.test.ts | 4 + .../esql_async_search_strategy.ts | 28 +++--- .../esql_async_search/response_utils.ts | 20 +++-- .../log_entry_search_strategy.test.ts | 12 ++- 18 files changed, 257 insertions(+), 109 deletions(-) rename src/plugins/data/{server => common}/search/strategies/es_search/response_utils.test.ts (77%) rename src/plugins/data/{server => common}/search/strategies/es_search/response_utils.ts (72%) diff --git a/packages/kbn-search-types/src/types.ts b/packages/kbn-search-types/src/types.ts index 0f6f3ecc9c06d..0f19dff76ccf7 100644 --- a/packages/kbn-search-types/src/types.ts +++ b/packages/kbn-search-types/src/types.ts @@ -114,6 +114,11 @@ export interface ISearchOptions { * To pass an abort signal, use {@link ISearchOptions.abortSignal} */ transport?: Omit; + + /** + * When set es results are streamed back to the caller without any parsing of the content. + */ + stream?: boolean; } /** @@ -130,4 +135,5 @@ export type ISearchOptionsSerializable = Pick< | 'isRestore' | 'retrieveResults' | 'executionContext' + | 'stream' >; diff --git a/src/plugins/data/common/search/strategies/es_search/index.ts b/src/plugins/data/common/search/strategies/es_search/index.ts index 34d385a3f5d62..978132b3130d7 100644 --- a/src/plugins/data/common/search/strategies/es_search/index.ts +++ b/src/plugins/data/common/search/strategies/es_search/index.ts @@ -8,3 +8,4 @@ */ export * from './types'; +export * from './response_utils'; diff --git a/src/plugins/data/server/search/strategies/es_search/response_utils.test.ts b/src/plugins/data/common/search/strategies/es_search/response_utils.test.ts similarity index 77% rename from src/plugins/data/server/search/strategies/es_search/response_utils.test.ts rename to src/plugins/data/common/search/strategies/es_search/response_utils.test.ts index add9653bf6a14..8c9fb108f23ef 100644 --- a/src/plugins/data/server/search/strategies/es_search/response_utils.test.ts +++ b/src/plugins/data/common/search/strategies/es_search/response_utils.test.ts @@ -7,7 +7,7 @@ * License v3.0 only", or the "Server Side Public License, v 1". */ -import { getTotalLoaded, toKibanaSearchResponse, shimHitsTotal } from './response_utils'; +import { getTotalLoaded, shimHitsTotal } from './response_utils'; import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; describe('response utils', () => { @@ -29,34 +29,6 @@ describe('response utils', () => { }); }); - describe('toKibanaSearchResponse', () => { - it('returns rawResponse, isPartial, isRunning, total, and loaded', () => { - const result = toKibanaSearchResponse({ - _shards: { - successful: 10, - failed: 5, - skipped: 5, - total: 100, - }, - } as unknown as estypes.SearchResponse); - - expect(result).toEqual({ - rawResponse: { - _shards: { - successful: 10, - failed: 5, - skipped: 5, - total: 100, - }, - }, - isRunning: false, - isPartial: false, - total: 100, - loaded: 15, - }); - }); - }); - describe('shimHitsTotal', () => { test('returns the total if it is already numeric', () => { const result = shimHitsTotal({ diff --git a/src/plugins/data/server/search/strategies/es_search/response_utils.ts b/src/plugins/data/common/search/strategies/es_search/response_utils.ts similarity index 72% rename from src/plugins/data/server/search/strategies/es_search/response_utils.ts rename to src/plugins/data/common/search/strategies/es_search/response_utils.ts index 09683dd22b45c..04d1a5d11cbee 100644 --- a/src/plugins/data/server/search/strategies/es_search/response_utils.ts +++ b/src/plugins/data/common/search/strategies/es_search/response_utils.ts @@ -7,10 +7,8 @@ * License v3.0 only", or the "Server Side Public License, v 1". */ -import type { ConnectionRequestParams } from '@elastic/transport'; import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; import { ISearchOptions } from '@kbn/search-types'; -import { sanitizeRequestParams } from '../../sanitize_request_params'; /** * Get the `total`/`loaded` for this response (see `IKibanaSearchResponse`). Note that `skipped` is @@ -23,23 +21,6 @@ export function getTotalLoaded(response: estypes.SearchResponse) { return { total, loaded }; } -/** - * Get the Kibana representation of this response (see `IKibanaSearchResponse`). - * @internal - */ -export function toKibanaSearchResponse( - rawResponse: estypes.SearchResponse, - requestParams?: ConnectionRequestParams -) { - return { - rawResponse, - isPartial: false, - isRunning: false, - ...(requestParams ? { requestParams: sanitizeRequestParams(requestParams) } : {}), - ...getTotalLoaded(rawResponse), - }; -} - /** * Temporary workaround until https://github.com/elastic/kibana/issues/26356 is addressed. * Since we are setting `track_total_hits` in the request, `hits.total` will be an object diff --git a/src/plugins/data/common/search/utils.ts b/src/plugins/data/common/search/utils.ts index 33c6b34ec3635..baf3fe79b1ac2 100644 --- a/src/plugins/data/common/search/utils.ts +++ b/src/plugins/data/common/search/utils.ts @@ -17,8 +17,10 @@ import { AggTypesDependencies } from '..'; /** * @returns true if response is abort */ -export const isAbortResponse = (response?: IKibanaSearchResponse) => { - return !response || !response.rawResponse; +export const isAbortResponse = ( + response?: IKibanaSearchResponse | { response: IKibanaSearchResponse } +) => { + return !response || !('rawResponse' in response || 'response' in response); }; /** diff --git a/src/plugins/data/public/search/search_interceptor/search_interceptor.ts b/src/plugins/data/public/search/search_interceptor/search_interceptor.ts index 174dc35697ebc..458171e64a1d3 100644 --- a/src/plugins/data/public/search/search_interceptor/search_interceptor.ts +++ b/src/plugins/data/public/search/search_interceptor/search_interceptor.ts @@ -60,11 +60,19 @@ import type { } from '@kbn/search-types'; import { createEsError, isEsError, renderSearchError } from '@kbn/search-errors'; import type { IKibanaSearchResponse, ISearchOptions } from '@kbn/search-types'; +import { + AsyncSearchGetResponse, + ErrorResponseBase, + SqlGetAsyncResponse, +} from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; import { ENHANCED_ES_SEARCH_STRATEGY, + ESQL_ASYNC_SEARCH_STRATEGY, + getTotalLoaded, IAsyncSearchOptions, isRunningResponse, pollSearch, + shimHitsTotal, UI_SETTINGS, } from '../../../common'; import { SearchUsageCollector } from '../collectors'; @@ -445,14 +453,75 @@ export class SearchInterceptor { if (this.bFetchDisabled) { const { executionContext, strategy, ...searchOptions } = this.getSerializableOptions(options); return this.deps.http - .post(`/internal/search/${strategy}${request.id ? `/${request.id}` : ''}`, { - version: '1', - signal: abortSignal, - context: executionContext, - body: JSON.stringify({ - ...request, - ...searchOptions, - }), + .post( + `/internal/search/${strategy}${request.id ? `/${request.id}` : ''}`, + { + version: '1', + signal: abortSignal, + context: executionContext, + body: JSON.stringify({ + ...request, + ...searchOptions, + stream: + strategy === ESQL_ASYNC_SEARCH_STRATEGY || + strategy === ENHANCED_ES_SEARCH_STRATEGY || + strategy === undefined, // undefined strategy is treated as enhanced ES + }), + asResponse: true, + } + ) + .then((rawResponse) => { + const warning = rawResponse.response?.headers.get('warning'); + const requestParams = + rawResponse.body && 'requestParams' in rawResponse.body + ? rawResponse.body.requestParams + : JSON.parse(rawResponse.response?.headers.get('kbn-search-request-params') || '{}'); + const isRestored = + rawResponse.body && 'isRestored' in rawResponse.body + ? rawResponse.body.isRestored + : rawResponse.response?.headers.get('kbn-search-is-restored') === '?1'; + + if (rawResponse.body && 'error' in rawResponse.body) { + // eslint-disable-next-line no-throw-literal + throw { + attributes: { + error: rawResponse.body.error, + rawResponse: rawResponse.body, + requestParams, + isRestored, + }, + }; + } + + switch (strategy) { + case ENHANCED_ES_SEARCH_STRATEGY: + if (rawResponse.body?.rawResponse) return rawResponse.body; + const typedResponse = rawResponse.body as unknown as AsyncSearchGetResponse; + const shimmedResponse = shimHitsTotal(typedResponse.response, { + legacyHitsTotal: searchOptions.legacyHitsTotal, + }); + return { + id: typedResponse.id, + isPartial: typedResponse.is_partial, + isRunning: typedResponse.is_running, + rawResponse: shimmedResponse, + warning, + requestParams, + isRestored, + ...getTotalLoaded(shimmedResponse), + }; + case ESQL_ASYNC_SEARCH_STRATEGY: + const esqlResponse = rawResponse.body as unknown as SqlGetAsyncResponse; + return { + id: esqlResponse.id, + rawResponse: esqlResponse, + isPartial: esqlResponse.is_partial, + isRunning: esqlResponse.is_running, + warning, + }; + default: + return rawResponse.body; + } }) .catch((e: IHttpFetchError) => { if (e?.body) { diff --git a/src/plugins/data/server/index.ts b/src/plugins/data/server/index.ts index ea54bfb9f70ca..b4fe8438a46b1 100644 --- a/src/plugins/data/server/index.ts +++ b/src/plugins/data/server/index.ts @@ -68,12 +68,13 @@ export type { AsyncSearchStatusResponse, } from './search'; export { - shimHitsTotal, SearchSessionService, NoSearchIdInSessionError, INITIAL_SEARCH_SESSION_REST_VERSION, } from './search'; +export { shimHitsTotal } from '../common/search'; + // Search namespace export const search = { aggs: { diff --git a/src/plugins/data/server/search/routes/search.ts b/src/plugins/data/server/search/routes/search.ts index 3f92583236ef6..24ece630e7368 100644 --- a/src/plugins/data/server/search/routes/search.ts +++ b/src/plugins/data/server/search/routes/search.ts @@ -10,6 +10,7 @@ import { first } from 'rxjs'; import { schema } from '@kbn/config-schema'; import { reportServerError } from '@kbn/kibana-utils-plugin/server'; +import { IncomingMessage } from 'http'; import { reportSearchError } from '../report_search_error'; import { getRequestAbortedSignal } from '../../lib'; import type { DataPluginRouter } from '../types'; @@ -25,6 +26,12 @@ export function registerSearchRoute(router: DataPluginRouter): void { .addVersion( { version: '1', + security: { + authz: { + enabled: false, + reason: 'This route is opted out from authorization', + }, + }, validate: { request: { params: schema.object({ @@ -38,6 +45,7 @@ export function registerSearchRoute(router: DataPluginRouter): void { isStored: schema.maybe(schema.boolean()), isRestore: schema.maybe(schema.boolean()), retrieveResults: schema.maybe(schema.boolean()), + stream: schema.maybe(schema.boolean()), }, { unknowns: 'allow' } ), @@ -51,6 +59,7 @@ export function registerSearchRoute(router: DataPluginRouter): void { isStored, isRestore, retrieveResults, + stream, ...searchRequest } = request.body; const { strategy, id } = request.params; @@ -69,12 +78,23 @@ export function registerSearchRoute(router: DataPluginRouter): void { isStored, isRestore, retrieveResults, + stream, } ) .pipe(first()) .toPromise(); - return res.ok({ body: response }); + if (response && (response.rawResponse as unknown as IncomingMessage).pipe) { + return res.ok({ + body: response.rawResponse, + headers: { + 'kbn-search-is-restored': response.isRestored ? '?1' : '?0', + 'kbn-search-request-params': JSON.stringify(response.requestParams), + }, + }); + } else { + return res.ok({ body: response }); + } } catch (err) { return reportSearchError(res, err); } @@ -89,6 +109,12 @@ export function registerSearchRoute(router: DataPluginRouter): void { .addVersion( { version: '1', + security: { + authz: { + enabled: false, + reason: 'This route is opted out from authorization', + }, + }, validate: { request: { params: schema.object({ diff --git a/src/plugins/data/server/search/strategies/es_search/es_search_strategy.test.ts b/src/plugins/data/server/search/strategies/es_search/es_search_strategy.test.ts index 65ea7f6016729..c771cc08b5a5d 100644 --- a/src/plugins/data/server/search/strategies/es_search/es_search_strategy.test.ts +++ b/src/plugins/data/server/search/strategies/es_search/es_search_strategy.test.ts @@ -10,7 +10,7 @@ import * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; import { elasticsearchServiceMock } from '@kbn/core/server/mocks'; import { pluginInitializerContextConfigMock } from '@kbn/core/server/mocks'; -import { esSearchStrategyProvider } from './es_search_strategy'; +import { esSearchStrategyProvider, toKibanaSearchResponse } from './es_search_strategy'; import { SearchStrategyDependencies } from '../../types'; import indexNotFoundException from '../../../../common/search/test_data/index_not_found_exception.json'; @@ -211,3 +211,31 @@ describe('ES search strategy', () => { } }); }); + +describe('toKibanaSearchResponse', () => { + it('returns rawResponse, isPartial, isRunning, total, and loaded', () => { + const result = toKibanaSearchResponse({ + _shards: { + successful: 10, + failed: 5, + skipped: 5, + total: 100, + }, + } as unknown as estypes.SearchResponse); + + expect(result).toEqual({ + rawResponse: { + _shards: { + successful: 10, + failed: 5, + skipped: 5, + total: 100, + }, + }, + isRunning: false, + isPartial: false, + total: 100, + loaded: 15, + }); + }); +}); diff --git a/src/plugins/data/server/search/strategies/es_search/es_search_strategy.ts b/src/plugins/data/server/search/strategies/es_search/es_search_strategy.ts index 9955dec39866f..39e2b6616239d 100644 --- a/src/plugins/data/server/search/strategies/es_search/es_search_strategy.ts +++ b/src/plugins/data/server/search/strategies/es_search/es_search_strategy.ts @@ -8,15 +8,35 @@ */ import { firstValueFrom, from, Observable } from 'rxjs'; +import type { ConnectionRequestParams } from '@elastic/transport'; import { tap } from 'rxjs'; import type { Logger, SharedGlobalConfig } from '@kbn/core/server'; +import { estypes } from '@elastic/elasticsearch'; +import { shimHitsTotal, getTotalLoaded } from '../../../../common'; +import { sanitizeRequestParams } from '../../sanitize_request_params'; import { getKbnSearchError, KbnSearchError } from '../../report_search_error'; import type { ISearchStrategy } from '../../types'; import type { SearchUsage } from '../../collectors/search'; import { getDefaultSearchParams, getShardTimeout } from './request_utils'; -import { shimHitsTotal, toKibanaSearchResponse } from './response_utils'; import { searchUsageObserver } from '../../collectors/search/usage'; +/** + * Get the Kibana representation of this response (see `IKibanaSearchResponse`). + * @internal + */ +export function toKibanaSearchResponse( + rawResponse: estypes.SearchResponse, + requestParams?: ConnectionRequestParams +) { + return { + rawResponse, + isPartial: false, + isRunning: false, + ...(requestParams ? { requestParams: sanitizeRequestParams(requestParams) } : {}), + ...getTotalLoaded(rawResponse), + }; +} + export const esSearchStrategyProvider = ( config$: Observable, logger: Logger, diff --git a/src/plugins/data/server/search/strategies/es_search/index.ts b/src/plugins/data/server/search/strategies/es_search/index.ts index 5d2104e47e1c0..2bd2a0f72d13a 100644 --- a/src/plugins/data/server/search/strategies/es_search/index.ts +++ b/src/plugins/data/server/search/strategies/es_search/index.ts @@ -9,5 +9,4 @@ export { esSearchStrategyProvider } from './es_search_strategy'; export * from './request_utils'; -export * from './response_utils'; export { ES_SEARCH_STRATEGY } from '../../../../common'; diff --git a/src/plugins/data/server/search/strategies/ese_search/ese_search_strategy.test.ts b/src/plugins/data/server/search/strategies/ese_search/ese_search_strategy.test.ts index 2274cdf952309..a778ebbc89675 100644 --- a/src/plugins/data/server/search/strategies/ese_search/ese_search_strategy.test.ts +++ b/src/plugins/data/server/search/strategies/ese_search/ese_search_strategy.test.ts @@ -33,6 +33,11 @@ const mockAsyncStatusResponse = (isComplete = false) => ({ failed: 0, }, }, + headers: { + 'x-elasticsearch-async-id': + 'FlVYVkw0clJIUS1TMHpHdXA3a29pZUEedldKX1c1bnBRVXFmalZ4emV1cjFCUToxNjYzMDgx', + 'x-elasticsearch-async-is-running': isComplete ? '?0' : '?1', + }, }); const mockAsyncResponse = { @@ -47,6 +52,10 @@ const mockAsyncResponse = { }, }, }, + headers: { + 'x-elasticsearch-async-id': 'foo', + 'x-elasticsearch-async-is-running': '?0', + }, }; const mockRollupResponse = { @@ -335,6 +344,10 @@ describe('ES search strategy', () => { ...mockAsyncResponse.body, is_running: true, }, + headers: { + ...mockAsyncResponse.headers, + 'x-elasticsearch-async-is-running': '?1', + }, }); const params = { index: 'logstash-*', body: { query: {} } }; @@ -367,6 +380,10 @@ describe('ES search strategy', () => { ...mockAsyncResponse.body, is_running: true, }, + headers: { + ...mockAsyncResponse.headers, + 'x-elasticsearch-async-is-running': '?1', + }, }); const errResponse = new errors.ResponseError({ @@ -518,6 +535,10 @@ describe('ES search strategy', () => { ...mockAsyncResponse.body, is_running: true, }, + headers: { + ...mockAsyncResponse.headers, + 'x-elasticsearch-async-is-running': '?1', + }, }); const params = { index: 'logstash-*', body: { query: {} } }; diff --git a/src/plugins/data/server/search/strategies/ese_search/ese_search_strategy.ts b/src/plugins/data/server/search/strategies/ese_search/ese_search_strategy.ts index 7e746b3adf571..dbaf97d03a633 100644 --- a/src/plugins/data/server/search/strategies/ese_search/ese_search_strategy.ts +++ b/src/plugins/data/server/search/strategies/ese_search/ese_search_strategy.ts @@ -26,12 +26,8 @@ import { } from './request_utils'; import { toAsyncKibanaSearchResponse, toAsyncKibanaSearchStatusResponse } from './response_utils'; import { SearchUsage, searchUsageObserver } from '../../collectors/search'; -import { - getDefaultSearchParams, - getShardTimeout, - getTotalLoaded, - shimHitsTotal, -} from '../es_search'; +import { getDefaultSearchParams, getShardTimeout } from '../es_search'; +import { getTotalLoaded, shimHitsTotal } from '../../../../common/search/strategies/es_search'; import { SearchConfigSchema } from '../../../config'; import { sanitizeRequestParams } from '../../sanitize_request_params'; @@ -85,12 +81,17 @@ export const enhancedEsSearchStrategyProvider = ( ? { wait_for_completion_timeout: request.params.wait_for_completion_timeout } : {}), }; - const { body, headers } = await client.asyncSearch.get( + const { body, headers, meta } = await client.asyncSearch.get( { ...params, id: id! }, - { ...options.transport, signal: options.abortSignal, meta: true } + { + ...options.transport, + signal: options.abortSignal, + meta: true, + asStream: options.stream, + } ); - const response = shimHitsTotal(body.response, options); - return toAsyncKibanaSearchResponse({ ...body, response }, headers?.warning); + + return toAsyncKibanaSearchResponse(body, headers, meta?.request?.params, options); } async function submitAsyncSearch( @@ -107,13 +108,10 @@ export const enhancedEsSearchStrategyProvider = ( ...options.transport, signal: options.abortSignal, meta: true, + asStream: options.stream, }); - const response = shimHitsTotal(body.response, options); - return toAsyncKibanaSearchResponse( - { ...body, response }, - headers?.warning, - meta?.request?.params - ); + + return toAsyncKibanaSearchResponse(body, headers, meta?.request?.params, options); } function asyncSearch( diff --git a/src/plugins/data/server/search/strategies/ese_search/response_utils.ts b/src/plugins/data/server/search/strategies/ese_search/response_utils.ts index af9a122d57479..cb8470a90e41a 100644 --- a/src/plugins/data/server/search/strategies/ese_search/response_utils.ts +++ b/src/plugins/data/server/search/strategies/ese_search/response_utils.ts @@ -9,10 +9,11 @@ import type { ConnectionRequestParams } from '@elastic/transport'; import type { IKibanaSearchResponse } from '@kbn/search-types'; +import { IncomingHttpHeaders } from 'http'; import type { AsyncSearchResponse } from './types'; -import { getTotalLoaded } from '../es_search'; import { sanitizeRequestParams } from '../../sanitize_request_params'; import { AsyncSearchStatusResponse } from './types'; +import { shimHitsTotal, getTotalLoaded, IAsyncSearchOptions } from '../../../../common'; /** * Get the Kibana representation of an async search status response. @@ -35,16 +36,17 @@ export function toAsyncKibanaSearchStatusResponse( */ export function toAsyncKibanaSearchResponse( response: AsyncSearchResponse, - warning?: string, - requestParams?: ConnectionRequestParams + headers: IncomingHttpHeaders, + requestParams?: ConnectionRequestParams, + options?: IAsyncSearchOptions ): IKibanaSearchResponse { return { - id: response.id, - rawResponse: response.response, - isPartial: response.is_partial, - isRunning: response.is_running, - ...(warning ? { warning } : {}), + id: headers['x-elasticsearch-async-id'] as string, + rawResponse: response.response ? shimHitsTotal(response.response, options) : response, + isPartial: headers['x-elasticsearch-async-is-running'] === '?1', + isRunning: headers['x-elasticsearch-async-is-running'] === '?1', + ...(headers.warning ? { warning: headers.warning } : {}), ...(requestParams ? { requestParams: sanitizeRequestParams(requestParams) } : {}), - ...getTotalLoaded(response.response), + ...(response.response ? getTotalLoaded(response.response) : {}), }; } diff --git a/src/plugins/data/server/search/strategies/esql_async_search/esql_async_search_strategy.test.ts b/src/plugins/data/server/search/strategies/esql_async_search/esql_async_search_strategy.test.ts index f75d56a481eaa..3d1f32e81a7f1 100644 --- a/src/plugins/data/server/search/strategies/esql_async_search/esql_async_search_strategy.test.ts +++ b/src/plugins/data/server/search/strategies/esql_async_search/esql_async_search_strategy.test.ts @@ -29,6 +29,10 @@ const mockAsyncResponse = { }, }, }, + headers: { + 'x-elasticsearch-async-id': 'foo', + 'x-elasticsearch-async-is-running': '?0', + }, }; describe('ES|QL async search strategy', () => { diff --git a/src/plugins/data/server/search/strategies/esql_async_search/esql_async_search_strategy.ts b/src/plugins/data/server/search/strategies/esql_async_search/esql_async_search_strategy.ts index 83b67e5ecb4fd..f8c686d8e4b9a 100644 --- a/src/plugins/data/server/search/strategies/esql_async_search/esql_async_search_strategy.ts +++ b/src/plugins/data/server/search/strategies/esql_async_search/esql_async_search_strategy.ts @@ -14,6 +14,7 @@ import type { IKibanaSearchResponse, IKibanaSearchRequest } from '@kbn/search-ty import { SqlQueryRequest } from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; import { SqlGetAsyncResponse } from '@elastic/elasticsearch/lib/api/types'; import type { ESQLSearchParams } from '@kbn/es-types'; +import { toAsyncKibanaSearchResponse } from './response_utils'; import { getCommonDefaultAsyncSubmitParams, getCommonDefaultAsyncGetParams, @@ -22,7 +23,6 @@ import { pollSearch } from '../../../../common'; import { getKbnSearchError } from '../../report_search_error'; import type { ISearchStrategy, SearchStrategyDependencies } from '../../types'; import type { IAsyncSearchOptions } from '../../../../common'; -import { toAsyncKibanaSearchResponse } from './response_utils'; import { SearchConfigSchema } from '../../../config'; // `drop_null_columns` is going to change the response @@ -74,14 +74,19 @@ export const esqlAsyncSearchStrategyProvider = ( ...(await getCommonDefaultAsyncSubmitParams(searchConfig, options)), ...requestParams, }; - const { body, headers, meta } = id + const response = id ? await client.transport.request( { method: 'GET', path: `/_query/async/${id}`, querystring: { ...params, drop_null_columns: dropNullColumns }, }, - { ...options.transport, signal: options.abortSignal, meta: true } + { + ...options.transport, + signal: options.abortSignal, + meta: true, + asStream: options.stream, + } ) : await client.transport.request( { @@ -90,16 +95,17 @@ export const esqlAsyncSearchStrategyProvider = ( body: params, querystring: dropNullColumns ? 'drop_null_columns' : '', }, - { ...options.transport, signal: options.abortSignal, meta: true } + { + ...options.transport, + signal: options.abortSignal, + meta: true, + asStream: options.stream, + } ); - const finalResponse = toAsyncKibanaSearchResponse( - body, - headers?.warning, - // do not return requestParams on polling calls - id ? undefined : meta?.request?.params - ); - return finalResponse; + const { body, headers, meta } = response; + + return toAsyncKibanaSearchResponse(body, headers, meta?.request?.params); }; const cancel = async () => { diff --git a/src/plugins/data/server/search/strategies/esql_async_search/response_utils.ts b/src/plugins/data/server/search/strategies/esql_async_search/response_utils.ts index 1c29906ea336e..0d7a63529314c 100644 --- a/src/plugins/data/server/search/strategies/esql_async_search/response_utils.ts +++ b/src/plugins/data/server/search/strategies/esql_async_search/response_utils.ts @@ -10,6 +10,7 @@ import type { ConnectionRequestParams } from '@elastic/transport'; import { SqlGetAsyncResponse } from '@elastic/elasticsearch/lib/api/types'; import type { IKibanaSearchResponse } from '@kbn/search-types'; +import { IncomingHttpHeaders } from 'http'; import { sanitizeRequestParams } from '../../sanitize_request_params'; /** @@ -17,17 +18,20 @@ import { sanitizeRequestParams } from '../../sanitize_request_params'; */ export function toAsyncKibanaSearchResponse( response: SqlGetAsyncResponse, - warning?: string, + headers: IncomingHttpHeaders, requestParams?: ConnectionRequestParams ): IKibanaSearchResponse { + const responseIsStream = response.id === undefined; return { - id: response.id, - rawResponse: { - ...response, - }, - isPartial: response.is_partial, - isRunning: response.is_running, - ...(warning ? { warning } : {}), + id: responseIsStream ? (headers['x-elasticsearch-async-id'] as string) : response.id, + rawResponse: response, + isRunning: responseIsStream + ? headers['x-elasticsearch-async-is-running'] === '?1' + : response.is_running, + isPartial: responseIsStream + ? headers['x-elasticsearch-async-is-partial'] === '?1' + : response.is_partial, + ...(headers?.warning ? { warning: headers?.warning } : {}), ...(requestParams ? { requestParams: sanitizeRequestParams(requestParams) } : {}), }; } diff --git a/x-pack/plugins/observability_solution/logs_shared/server/services/log_entries/log_entry_search_strategy.test.ts b/x-pack/plugins/observability_solution/logs_shared/server/services/log_entries/log_entry_search_strategy.test.ts index 7c46fe37d649e..5a8cac9e223bb 100644 --- a/x-pack/plugins/observability_solution/logs_shared/server/services/log_entries/log_entry_search_strategy.test.ts +++ b/x-pack/plugins/observability_solution/logs_shared/server/services/log_entries/log_entry_search_strategy.test.ts @@ -48,7 +48,11 @@ describe('LogEntry search strategy', () => { start_time_in_millis: 0, }, statusCode: 200, - headers: {}, + headers: { + 'x-elasticsearch-async-id': 'ASYNC_REQUEST_ID', + 'x-elasticsearch-async-is-running': '?0', + 'x-elasticsearch-async-is-partial': '?0', + }, warnings: [], meta: {} as any, } as TransportResult as any); // type inference for the mock fails @@ -149,7 +153,11 @@ describe('LogEntry search strategy', () => { start_time_in_millis: 0, }, statusCode: 200, - headers: {}, + headers: { + 'x-elasticsearch-async-id': 'ASYNC_REQUEST_ID', + 'x-elasticsearch-async-is-running': '?0', + 'x-elasticsearch-async-is-partial': '?0', + }, warnings: [], meta: {} as any, } as TransportResult as any);