Skip to content

Commit

Permalink
pass elasticsearch response to browser as a stream (elastic#193060)
Browse files Browse the repository at this point in the history
Remove decoding and re-encoding of the ES-response from Kibana-server.
Resolves elastic#189640
  • Loading branch information
ppisljar authored and mbondyra committed Nov 8, 2024
1 parent 62ba1fc commit 743cbc1
Show file tree
Hide file tree
Showing 18 changed files with 257 additions and 109 deletions.
6 changes: 6 additions & 0 deletions packages/kbn-search-types/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ export interface ISearchOptions {
* To pass an abort signal, use {@link ISearchOptions.abortSignal}
*/
transport?: Omit<TransportRequestOptions, 'signal'>;

/**
* When set es results are streamed back to the caller without any parsing of the content.
*/
stream?: boolean;
}

/**
Expand All @@ -130,4 +135,5 @@ export type ISearchOptionsSerializable = Pick<
| 'isRestore'
| 'retrieveResults'
| 'executionContext'
| 'stream'
>;
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@
*/

export * from './types';
export * from './response_utils';
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
* License v3.0 only", or the "Server Side Public License, v 1".
*/

import { getTotalLoaded, toKibanaSearchResponse, shimHitsTotal } from './response_utils';
import { getTotalLoaded, shimHitsTotal } from './response_utils';
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';

describe('response utils', () => {
Expand All @@ -29,34 +29,6 @@ describe('response utils', () => {
});
});

describe('toKibanaSearchResponse', () => {
it('returns rawResponse, isPartial, isRunning, total, and loaded', () => {
const result = toKibanaSearchResponse({
_shards: {
successful: 10,
failed: 5,
skipped: 5,
total: 100,
},
} as unknown as estypes.SearchResponse<unknown>);

expect(result).toEqual({
rawResponse: {
_shards: {
successful: 10,
failed: 5,
skipped: 5,
total: 100,
},
},
isRunning: false,
isPartial: false,
total: 100,
loaded: 15,
});
});
});

describe('shimHitsTotal', () => {
test('returns the total if it is already numeric', () => {
const result = shimHitsTotal({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,8 @@
* License v3.0 only", or the "Server Side Public License, v 1".
*/

import type { ConnectionRequestParams } from '@elastic/transport';
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { ISearchOptions } from '@kbn/search-types';
import { sanitizeRequestParams } from '../../sanitize_request_params';

/**
* Get the `total`/`loaded` for this response (see `IKibanaSearchResponse`). Note that `skipped` is
Expand All @@ -23,23 +21,6 @@ export function getTotalLoaded(response: estypes.SearchResponse<unknown>) {
return { total, loaded };
}

/**
* Get the Kibana representation of this response (see `IKibanaSearchResponse`).
* @internal
*/
export function toKibanaSearchResponse(
rawResponse: estypes.SearchResponse<unknown>,
requestParams?: ConnectionRequestParams
) {
return {
rawResponse,
isPartial: false,
isRunning: false,
...(requestParams ? { requestParams: sanitizeRequestParams(requestParams) } : {}),
...getTotalLoaded(rawResponse),
};
}

/**
* Temporary workaround until https://github.com/elastic/kibana/issues/26356 is addressed.
* Since we are setting `track_total_hits` in the request, `hits.total` will be an object
Expand Down
6 changes: 4 additions & 2 deletions src/plugins/data/common/search/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ import { AggTypesDependencies } from '..';
/**
* @returns true if response is abort
*/
export const isAbortResponse = (response?: IKibanaSearchResponse) => {
return !response || !response.rawResponse;
export const isAbortResponse = (
response?: IKibanaSearchResponse | { response: IKibanaSearchResponse }
) => {
return !response || !('rawResponse' in response || 'response' in response);
};

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,19 @@ import type {
} from '@kbn/search-types';
import { createEsError, isEsError, renderSearchError } from '@kbn/search-errors';
import type { IKibanaSearchResponse, ISearchOptions } from '@kbn/search-types';
import {
AsyncSearchGetResponse,
ErrorResponseBase,
SqlGetAsyncResponse,
} from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import {
ENHANCED_ES_SEARCH_STRATEGY,
ESQL_ASYNC_SEARCH_STRATEGY,
getTotalLoaded,
IAsyncSearchOptions,
isRunningResponse,
pollSearch,
shimHitsTotal,
UI_SETTINGS,
} from '../../../common';
import { SearchUsageCollector } from '../collectors';
Expand Down Expand Up @@ -445,14 +453,75 @@ export class SearchInterceptor {
if (this.bFetchDisabled) {
const { executionContext, strategy, ...searchOptions } = this.getSerializableOptions(options);
return this.deps.http
.post(`/internal/search/${strategy}${request.id ? `/${request.id}` : ''}`, {
version: '1',
signal: abortSignal,
context: executionContext,
body: JSON.stringify({
...request,
...searchOptions,
}),
.post<IKibanaSearchResponse | ErrorResponseBase>(
`/internal/search/${strategy}${request.id ? `/${request.id}` : ''}`,
{
version: '1',
signal: abortSignal,
context: executionContext,
body: JSON.stringify({
...request,
...searchOptions,
stream:
strategy === ESQL_ASYNC_SEARCH_STRATEGY ||
strategy === ENHANCED_ES_SEARCH_STRATEGY ||
strategy === undefined, // undefined strategy is treated as enhanced ES
}),
asResponse: true,
}
)
.then((rawResponse) => {
const warning = rawResponse.response?.headers.get('warning');
const requestParams =
rawResponse.body && 'requestParams' in rawResponse.body
? rawResponse.body.requestParams
: JSON.parse(rawResponse.response?.headers.get('kbn-search-request-params') || '{}');
const isRestored =
rawResponse.body && 'isRestored' in rawResponse.body
? rawResponse.body.isRestored
: rawResponse.response?.headers.get('kbn-search-is-restored') === '?1';

if (rawResponse.body && 'error' in rawResponse.body) {
// eslint-disable-next-line no-throw-literal
throw {
attributes: {
error: rawResponse.body.error,
rawResponse: rawResponse.body,
requestParams,
isRestored,
},
};
}

switch (strategy) {
case ENHANCED_ES_SEARCH_STRATEGY:
if (rawResponse.body?.rawResponse) return rawResponse.body;
const typedResponse = rawResponse.body as unknown as AsyncSearchGetResponse;
const shimmedResponse = shimHitsTotal(typedResponse.response, {
legacyHitsTotal: searchOptions.legacyHitsTotal,
});
return {
id: typedResponse.id,
isPartial: typedResponse.is_partial,
isRunning: typedResponse.is_running,
rawResponse: shimmedResponse,
warning,
requestParams,
isRestored,
...getTotalLoaded(shimmedResponse),
};
case ESQL_ASYNC_SEARCH_STRATEGY:
const esqlResponse = rawResponse.body as unknown as SqlGetAsyncResponse;
return {
id: esqlResponse.id,
rawResponse: esqlResponse,
isPartial: esqlResponse.is_partial,
isRunning: esqlResponse.is_running,
warning,
};
default:
return rawResponse.body;
}
})
.catch((e: IHttpFetchError<KibanaServerError>) => {
if (e?.body) {
Expand Down
3 changes: 2 additions & 1 deletion src/plugins/data/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,13 @@ export type {
AsyncSearchStatusResponse,
} from './search';
export {
shimHitsTotal,
SearchSessionService,
NoSearchIdInSessionError,
INITIAL_SEARCH_SESSION_REST_VERSION,
} from './search';

export { shimHitsTotal } from '../common/search';

// Search namespace
export const search = {
aggs: {
Expand Down
28 changes: 27 additions & 1 deletion src/plugins/data/server/search/routes/search.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import { first } from 'rxjs';
import { schema } from '@kbn/config-schema';
import { reportServerError } from '@kbn/kibana-utils-plugin/server';
import { IncomingMessage } from 'http';
import { reportSearchError } from '../report_search_error';
import { getRequestAbortedSignal } from '../../lib';
import type { DataPluginRouter } from '../types';
Expand All @@ -25,6 +26,12 @@ export function registerSearchRoute(router: DataPluginRouter): void {
.addVersion(
{
version: '1',
security: {
authz: {
enabled: false,
reason: 'This route is opted out from authorization',
},
},
validate: {
request: {
params: schema.object({
Expand All @@ -38,6 +45,7 @@ export function registerSearchRoute(router: DataPluginRouter): void {
isStored: schema.maybe(schema.boolean()),
isRestore: schema.maybe(schema.boolean()),
retrieveResults: schema.maybe(schema.boolean()),
stream: schema.maybe(schema.boolean()),
},
{ unknowns: 'allow' }
),
Expand All @@ -51,6 +59,7 @@ export function registerSearchRoute(router: DataPluginRouter): void {
isStored,
isRestore,
retrieveResults,
stream,
...searchRequest
} = request.body;
const { strategy, id } = request.params;
Expand All @@ -69,12 +78,23 @@ export function registerSearchRoute(router: DataPluginRouter): void {
isStored,
isRestore,
retrieveResults,
stream,
}
)
.pipe(first())
.toPromise();

return res.ok({ body: response });
if (response && (response.rawResponse as unknown as IncomingMessage).pipe) {
return res.ok({
body: response.rawResponse,
headers: {
'kbn-search-is-restored': response.isRestored ? '?1' : '?0',
'kbn-search-request-params': JSON.stringify(response.requestParams),
},
});
} else {
return res.ok({ body: response });
}
} catch (err) {
return reportSearchError(res, err);
}
Expand All @@ -89,6 +109,12 @@ export function registerSearchRoute(router: DataPluginRouter): void {
.addVersion(
{
version: '1',
security: {
authz: {
enabled: false,
reason: 'This route is opted out from authorization',
},
},
validate: {
request: {
params: schema.object({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { elasticsearchServiceMock } from '@kbn/core/server/mocks';
import { pluginInitializerContextConfigMock } from '@kbn/core/server/mocks';
import { esSearchStrategyProvider } from './es_search_strategy';
import { esSearchStrategyProvider, toKibanaSearchResponse } from './es_search_strategy';
import { SearchStrategyDependencies } from '../../types';

import indexNotFoundException from '../../../../common/search/test_data/index_not_found_exception.json';
Expand Down Expand Up @@ -211,3 +211,31 @@ describe('ES search strategy', () => {
}
});
});

describe('toKibanaSearchResponse', () => {
it('returns rawResponse, isPartial, isRunning, total, and loaded', () => {
const result = toKibanaSearchResponse({
_shards: {
successful: 10,
failed: 5,
skipped: 5,
total: 100,
},
} as unknown as estypes.SearchResponse<unknown>);

expect(result).toEqual({
rawResponse: {
_shards: {
successful: 10,
failed: 5,
skipped: 5,
total: 100,
},
},
isRunning: false,
isPartial: false,
total: 100,
loaded: 15,
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,35 @@
*/

import { firstValueFrom, from, Observable } from 'rxjs';
import type { ConnectionRequestParams } from '@elastic/transport';
import { tap } from 'rxjs';
import type { Logger, SharedGlobalConfig } from '@kbn/core/server';
import { estypes } from '@elastic/elasticsearch';
import { shimHitsTotal, getTotalLoaded } from '../../../../common';
import { sanitizeRequestParams } from '../../sanitize_request_params';
import { getKbnSearchError, KbnSearchError } from '../../report_search_error';
import type { ISearchStrategy } from '../../types';
import type { SearchUsage } from '../../collectors/search';
import { getDefaultSearchParams, getShardTimeout } from './request_utils';
import { shimHitsTotal, toKibanaSearchResponse } from './response_utils';
import { searchUsageObserver } from '../../collectors/search/usage';

/**
* Get the Kibana representation of this response (see `IKibanaSearchResponse`).
* @internal
*/
export function toKibanaSearchResponse(
rawResponse: estypes.SearchResponse<unknown>,
requestParams?: ConnectionRequestParams
) {
return {
rawResponse,
isPartial: false,
isRunning: false,
...(requestParams ? { requestParams: sanitizeRequestParams(requestParams) } : {}),
...getTotalLoaded(rawResponse),
};
}

export const esSearchStrategyProvider = (
config$: Observable<SharedGlobalConfig>,
logger: Logger,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,4 @@

export { esSearchStrategyProvider } from './es_search_strategy';
export * from './request_utils';
export * from './response_utils';
export { ES_SEARCH_STRATEGY } from '../../../../common';
Loading

0 comments on commit 743cbc1

Please sign in to comment.