Skip to content

Commit

Permalink
[search source] remove is_partial check (#164506)
Browse files Browse the repository at this point in the history
Closes #164893

### Background

"is partial" has 2 meanings
1) Results are incomplete because search is still running
2) Search is finished. Results are incomplete because there are shard
failures (either in local or remote clusters)

[async
search](https://www.elastic.co/guide/en/elasticsearch/reference/current/async-search.html)
defines 2 flags.
1) `is_running`: Whether the search is still being executed or it has
completed
2) `is_partial`: When the query is no longer running, indicates whether
the search failed or was successfully completed on all shards. While the
query is being executed, is_partial is always set to true
**note**: there is a bug in async search where requests to only local
clusters return `is_partial:false` when there are shard errors on the
local cluster. See
elastic/elasticsearch#98725. This should be
resolved in 8.11

Kibana's existing search implementation does not align with
Elasticsearch's `is_running` and `is_partial` flags. Kibana defines "is
partial" as definition "1)". Elasticsearch async search defines "is
partial" as definition "2)".

This PR aligns Kibana's usage of "is partial" with Elasticsearch's
definition. This required the following changes
1) `isErrorResponse` renamed to `isAbortedResponse`. Method no longer
returns true when `!response.isRunning && !!response.isPartial`. Kibana
handles results with incomplete data. **Note** removed export of
`isErrorResponse` from data plugin since its use outside of data plugin
does not make sense.
2) Replace `isPartialResponse` with `isRunningResponse`. This aligns
Kibana's definition with Elasticsearch async search flags.
3) Remove `isCompleteResponse`. The word "complete" is ambiguous. Does
it mean the search is finished (no longer running)? Or does it mean the
search has all results and there are no shard failures?

---------

Co-authored-by: kibanamachine <[email protected]>
Co-authored-by: Jatin Kathuria <[email protected]>
Co-authored-by: Patryk Kopyciński <[email protected]>
  • Loading branch information
4 people authored Oct 2, 2023
1 parent 236eff4 commit b5bcf69
Show file tree
Hide file tree
Showing 47 changed files with 192 additions and 441 deletions.
6 changes: 3 additions & 3 deletions examples/search_examples/public/search/app.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import { IInspectorInfo } from '@kbn/data-plugin/common';
import {
DataPublicPluginStart,
IKibanaSearchResponse,
isCompleteResponse,
isRunningResponse,
} from '@kbn/data-plugin/public';
import { SearchResponseWarning } from '@kbn/data-plugin/public/search/types';
import type { DataView, DataViewField } from '@kbn/data-views-plugin/public';
Expand Down Expand Up @@ -209,7 +209,7 @@ export const SearchExamplesApp = ({
})
.subscribe({
next: (res) => {
if (isCompleteResponse(res)) {
if (!isRunningResponse(res)) {
setIsLoading(false);
setResponse(res);
const aggResult: number | undefined = res.rawResponse.aggregations
Expand Down Expand Up @@ -389,7 +389,7 @@ export const SearchExamplesApp = ({
.subscribe({
next: (res) => {
setResponse(res);
if (isCompleteResponse(res)) {
if (!isRunningResponse(res)) {
setIsLoading(false);
notifications.toasts.addSuccess({
title: 'Query result',
Expand Down
4 changes: 2 additions & 2 deletions examples/search_examples/public/search_sessions/app.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import {
DataPublicPluginStart,
IEsSearchRequest,
IEsSearchResponse,
isCompleteResponse,
isRunningResponse,
QueryState,
SearchSessionState,
} from '@kbn/data-plugin/public';
Expand Down Expand Up @@ -706,7 +706,7 @@ function doSearch(
return lastValueFrom(
data.search.search(req, { sessionId }).pipe(
tap((res) => {
if (isCompleteResponse(res)) {
if (!isRunningResponse(res)) {
const avgResult: number | undefined = res.rawResponse.aggregations
? // @ts-expect-error @elastic/elasticsearch no way to declare a type for aggregation in the search response
res.rawResponse.aggregations[1]?.value ?? res.rawResponse.aggregations[2]?.value
Expand Down
4 changes: 2 additions & 2 deletions examples/search_examples/public/sql_search/app.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import { CoreStart } from '@kbn/core/public';
import {
DataPublicPluginStart,
IKibanaSearchResponse,
isCompleteResponse,
isRunningResponse,
} from '@kbn/data-plugin/public';
import {
SQL_SEARCH_STRATEGY,
Expand Down Expand Up @@ -66,7 +66,7 @@ export const SqlSearchExampleApp = ({ notifications, data }: SearchExamplesAppDe
})
.subscribe({
next: (res) => {
if (isCompleteResponse(res)) {
if (!isRunningResponse(res)) {
setIsLoading(false);
setResponse(res);
}
Expand Down
4 changes: 2 additions & 2 deletions src/plugins/data/README.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -158,12 +158,12 @@ The `SearchSource` API is a convenient way to construct and run an Elasticsearch
One benefit of using the low-level search API, is partial response support, allowing for a better and more responsive user experience.

```.ts
import { isCompleteResponse } from '../plugins/data/public';
import { isRunningResponse } from '../plugins/data/public';

const search$ = data.search.search(request)
.subscribe({
next: (response) => {
if (isCompleteResponse(response)) {
if (!isRunningResponse(response)) {
// Final result
search$.unsubscribe();
} else {
Expand Down
13 changes: 2 additions & 11 deletions src/plugins/data/common/search/poll_search.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { pollSearch } from './poll_search';
import { AbortError } from '@kbn/kibana-utils-plugin/common';

describe('pollSearch', () => {
function getMockedSearch$(resolveOnI = 1, finishWithError = false) {
function getMockedSearch$(resolveOnI = 1) {
let counter = 0;
return jest.fn().mockImplementation(() => {
counter++;
Expand All @@ -19,7 +19,7 @@ describe('pollSearch', () => {
if (lastCall) {
resolve({
isRunning: false,
isPartial: finishWithError,
isPartial: false,
rawResponse: {},
});
} else {
Expand Down Expand Up @@ -57,15 +57,6 @@ describe('pollSearch', () => {
expect(cancelFn).toBeCalledTimes(0);
});

test('Throws Error on ES error response', async () => {
const searchFn = getMockedSearch$(2, true);
const cancelFn = jest.fn();
const poll = pollSearch(searchFn, cancelFn).toPromise();
await expect(poll).rejects.toThrow(Error);
expect(searchFn).toBeCalledTimes(2);
expect(cancelFn).toBeCalledTimes(0);
});

test('Throws AbortError on empty response', async () => {
const searchFn = jest.fn().mockResolvedValue(undefined);
const cancelFn = jest.fn();
Expand Down
8 changes: 4 additions & 4 deletions src/plugins/data/common/search/poll_search.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { from, Observable, timer, defer, fromEvent, EMPTY } from 'rxjs';
import { expand, map, switchMap, takeUntil, takeWhile, tap } from 'rxjs/operators';
import { AbortError } from '@kbn/kibana-utils-plugin/common';
import type { IAsyncSearchOptions, IKibanaSearchResponse } from '..';
import { isErrorResponse, isPartialResponse } from '..';
import { isAbortResponse, isRunningResponse } from '..';

export const pollSearch = <Response extends IKibanaSearchResponse>(
search: () => Promise<Response>,
Expand Down Expand Up @@ -57,11 +57,11 @@ export const pollSearch = <Response extends IKibanaSearchResponse>(
return timer(getPollInterval(elapsedTime)).pipe(switchMap(search));
}),
tap((response) => {
if (isErrorResponse(response)) {
throw response ? new Error('Received partial response') : new AbortError();
if (isAbortResponse(response)) {
throw new AbortError();
}
}),
takeWhile<Response>(isPartialResponse, true),
takeWhile<Response>(isRunningResponse, true),
takeUntil<Response>(aborted$)
);
});
Expand Down
12 changes: 3 additions & 9 deletions src/plugins/data/common/search/search_source/search_source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,7 @@ import { getSearchParamsFromRequest, RequestFailure } from './fetch';
import type { FetchHandlers, SearchRequest } from './fetch';
import { getRequestInspectorStats, getResponseInspectorStats } from './inspect';

import {
getEsQueryConfig,
IKibanaSearchResponse,
isPartialResponse,
isCompleteResponse,
UI_SETTINGS,
} from '../..';
import { getEsQueryConfig, IKibanaSearchResponse, isRunningResponse, UI_SETTINGS } from '../..';
import { AggsStart } from '../aggs';
import { extractReferences } from './extract_references';
import {
Expand Down Expand Up @@ -546,7 +540,7 @@ export class SearchSource {
// For testing timeout messages in UI, uncomment the next line
// response.rawResponse.timed_out = true;
return new Observable<IKibanaSearchResponse<unknown>>((obs) => {
if (isPartialResponse(response)) {
if (isRunningResponse(response)) {
obs.next(this.postFlightTransform(response));
} else {
if (!this.hasPostFlightRequests()) {
Expand Down Expand Up @@ -582,7 +576,7 @@ export class SearchSource {
});
}),
map((response) => {
if (!isCompleteResponse(response)) {
if (isRunningResponse(response)) {
return response;
}
return onResponse(searchRequest, response, options);
Expand Down
198 changes: 15 additions & 183 deletions src/plugins/data/common/search/utils.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,210 +6,42 @@
* Side Public License, v 1.
*/

import { isErrorResponse, isCompleteResponse, isPartialResponse } from './utils';
import type { IKibanaSearchResponse } from './types';
import { isAbortResponse, isRunningResponse } from './utils';

describe('utils', () => {
describe('isErrorResponse', () => {
describe('isAbortResponse', () => {
it('returns `true` if the response is undefined', () => {
const isError = isErrorResponse();
const isError = isAbortResponse();
expect(isError).toBe(true);
});

it('returns `true` if the response is not running and partial', () => {
const isError = isErrorResponse({
isPartial: true,
isRunning: false,
rawResponse: {},
});
expect(isError).toBe(true);
});

it('returns `false` if the response is not running and partial and contains failure details', () => {
const isError = isErrorResponse({
isPartial: true,
isRunning: false,
rawResponse: {
took: 7,
timed_out: false,
_shards: {
total: 2,
successful: 1,
skipped: 0,
failed: 1,
failures: [
{
shard: 0,
index: 'remote:tmp-00002',
node: '9SNgMgppT2-6UHJNXwio3g',
reason: {
type: 'script_exception',
reason: 'runtime error',
script_stack: [
'[email protected]/org.elasticsearch.search.lookup.LeafDocLookup.getFactoryForDoc(LeafDocLookup.java:148)',
'[email protected]/org.elasticsearch.search.lookup.LeafDocLookup.get(LeafDocLookup.java:191)',
'[email protected]/org.elasticsearch.search.lookup.LeafDocLookup.get(LeafDocLookup.java:32)',
"doc['bar'].value < 10",
' ^---- HERE',
],
script: "doc['bar'].value < 10",
lang: 'painless',
position: {
offset: 4,
start: 0,
end: 21,
},
caused_by: {
type: 'illegal_argument_exception',
reason: 'No field found for [bar] in mapping',
},
},
},
],
},
_clusters: {
total: 1,
successful: 1,
skipped: 0,
details: {
remote: {
status: 'partial',
indices: 'tmp-*',
took: 3,
timed_out: false,
_shards: {
total: 2,
successful: 1,
skipped: 0,
failed: 1,
},
failures: [
{
shard: 0,
index: 'remote:tmp-00002',
node: '9SNgMgppT2-6UHJNXwio3g',
reason: {
type: 'script_exception',
reason: 'runtime error',
script_stack: [
'[email protected]/org.elasticsearch.search.lookup.LeafDocLookup.getFactoryForDoc(LeafDocLookup.java:148)',
'[email protected]/org.elasticsearch.search.lookup.LeafDocLookup.get(LeafDocLookup.java:191)',
'[email protected]/org.elasticsearch.search.lookup.LeafDocLookup.get(LeafDocLookup.java:32)',
"doc['bar'].value < 10",
' ^---- HERE',
],
script: "doc['bar'].value < 10",
lang: 'painless',
position: {
offset: 4,
start: 0,
end: 21,
},
caused_by: {
type: 'illegal_argument_exception',
reason: 'No field found for [bar] in mapping',
},
},
},
],
},
},
},
hits: {
total: {
value: 1,
relation: 'eq',
},
max_score: 0,
hits: [
{
_index: 'remote:tmp-00001',
_id: 'd8JNlYoBFqAcOBVnvdqx',
_score: 0,
_source: {
foo: 'bar',
bar: 1,
},
},
],
},
},
});
expect(isError).toBe(false);
});

it('returns `false` if the response is running and partial', () => {
const isError = isErrorResponse({
isPartial: true,
isRunning: true,
rawResponse: {},
});
expect(isError).toBe(false);
});

it('returns `false` if the response is complete', () => {
const isError = isErrorResponse({
isPartial: false,
isRunning: false,
rawResponse: {},
});
expect(isError).toBe(false);
});
});

describe('isCompleteResponse', () => {
it('returns `false` if the response is undefined', () => {
const isError = isCompleteResponse();
expect(isError).toBe(false);
});

it('returns `false` if the response is running and partial', () => {
const isError = isCompleteResponse({
isPartial: true,
isRunning: true,
rawResponse: {},
});
expect(isError).toBe(false);
});

it('returns `true` if the response is complete', () => {
const isError = isCompleteResponse({
isPartial: false,
isRunning: false,
rawResponse: {},
});
expect(isError).toBe(true);
});

it('returns `true` if the response does not indicate isRunning', () => {
const isError = isCompleteResponse({
rawResponse: {},
});
it('returns `true` if rawResponse is undefined', () => {
const isError = isAbortResponse({} as unknown as IKibanaSearchResponse);
expect(isError).toBe(true);
});
});

describe('isPartialResponse', () => {
describe('isRunningResponse', () => {
it('returns `false` if the response is undefined', () => {
const isError = isPartialResponse();
expect(isError).toBe(false);
const isRunning = isRunningResponse();
expect(isRunning).toBe(false);
});

it('returns `true` if the response is running and partial', () => {
const isError = isPartialResponse({
isPartial: true,
it('returns `true` if the response is running', () => {
const isRunning = isRunningResponse({
isRunning: true,
rawResponse: {},
});
expect(isError).toBe(true);
expect(isRunning).toBe(true);
});

it('returns `false` if the response is complete', () => {
const isError = isPartialResponse({
isPartial: false,
it('returns `false` if the response is finished running', () => {
const isRunning = isRunningResponse({
isRunning: false,
rawResponse: {},
});
expect(isError).toBe(false);
expect(isRunning).toBe(false);
});
});
});
Loading

0 comments on commit b5bcf69

Please sign in to comment.