diff --git a/src/plugins/data/common/search/search_source/search_source.test.ts b/src/plugins/data/common/search/search_source/search_source.test.ts index b5cabc654a3f7..4e6192d24e8eb 100644 --- a/src/plugins/data/common/search/search_source/search_source.test.ts +++ b/src/plugins/data/common/search/search_source/search_source.test.ts @@ -903,6 +903,13 @@ describe('SearchSource', () => { expect(Object.keys(JSON.parse(searchSourceJSON))).toEqual(['highlightAll', 'from', 'sort']); }); + test('should add pit', () => { + const pit = { id: 'flimflam', keep_alive: '1m' }; + searchSource.setField('pit', pit); + const { searchSourceJSON } = searchSource.serialize(); + expect(searchSourceJSON).toBe(JSON.stringify({ pit })); + }); + test('should serialize filters', () => { const filter = [ { diff --git a/src/plugins/data/common/search/search_source/search_source.ts b/src/plugins/data/common/search/search_source/search_source.ts index 497a247668694..fad799c7915b1 100644 --- a/src/plugins/data/common/search/search_source/search_source.ts +++ b/src/plugins/data/common/search/search_source/search_source.ts @@ -667,6 +667,8 @@ export class SearchSource { getConfig(UI_SETTINGS.SORT_OPTIONS) ); return addToBody(key, sort); + case 'pit': + return addToRoot(key, val); case 'aggs': if ((val as unknown) instanceof AggConfigs) { return addToBody('aggs', val.toDsl()); @@ -768,7 +770,7 @@ export class SearchSource { const { getConfig } = this.dependencies; const searchRequest = this.mergeProps(); searchRequest.body = searchRequest.body || {}; - const { body, index, query, filters, highlightAll } = searchRequest; + const { body, index, query, filters, highlightAll, pit } = searchRequest; searchRequest.indexType = this.getIndexType(index); const metaFields = getConfig(UI_SETTINGS.META_FIELDS) ?? []; @@ -911,6 +913,10 @@ export class SearchSource { delete searchRequest.highlightAll; } + if (pit) { + body.pit = pit; + } + return searchRequest; } diff --git a/src/plugins/data/common/search/search_source/types.ts b/src/plugins/data/common/search/search_source/types.ts index a583a1d1112cc..140c2dd59a59d 100644 --- a/src/plugins/data/common/search/search_source/types.ts +++ b/src/plugins/data/common/search/search_source/types.ts @@ -39,6 +39,9 @@ export interface ISearchStartSearchSource createEmpty: () => ISearchSource; } +/** + * @deprecated use {@link estypes.SortResults} instead. + */ export type EsQuerySearchAfter = [string | number, string | number]; export enum SortDirection { @@ -112,9 +115,13 @@ export interface SearchSourceFields { * {@link IndexPatternService} */ index?: DataView; - searchAfter?: EsQuerySearchAfter; timeout?: string; terminate_after?: number; + searchAfter?: estypes.SortResults; + /** + * Allow querying to use a point-in-time ID for paging results + */ + pit?: estypes.SearchPointInTimeReference; parent?: SearchSourceFields; } @@ -160,7 +167,7 @@ export type SerializedSearchSourceFields = { * {@link IndexPatternService} */ index?: string | DataViewSpec; - searchAfter?: EsQuerySearchAfter; + searchAfter?: estypes.SortResults; timeout?: string; terminate_after?: number; diff --git a/src/plugins/data/server/search/strategies/es_search/es_search_strategy.ts b/src/plugins/data/server/search/strategies/es_search/es_search_strategy.ts index 73a3b58704877..b2aed5804f248 100644 --- a/src/plugins/data/server/search/strategies/es_search/es_search_strategy.ts +++ b/src/plugins/data/server/search/strategies/es_search/es_search_strategy.ts @@ -35,13 +35,17 @@ export const esSearchStrategyProvider = ( throw new KbnServerError(`Unsupported index pattern type ${request.indexType}`, 400); } + const isPit = request.params?.body?.pit != null; + const search = async () => { try { const config = await firstValueFrom(config$); // @ts-expect-error params fall back to any, but should be valid SearchRequest params const { terminateAfter, ...requestParams } = request.params ?? {}; + const defaults = await getDefaultSearchParams(uiSettingsClient, { isPit }); + const params = { - ...(await getDefaultSearchParams(uiSettingsClient)), + ...defaults, ...getShardTimeout(config), ...(terminateAfter ? { terminate_after: terminateAfter } : {}), ...requestParams, diff --git a/src/plugins/data/server/search/strategies/es_search/request_utils.ts b/src/plugins/data/server/search/strategies/es_search/request_utils.ts index 2418ccfb49a0c..11fd271902e1f 100644 --- a/src/plugins/data/server/search/strategies/es_search/request_utils.ts +++ b/src/plugins/data/server/search/strategies/es_search/request_utils.ts @@ -18,19 +18,29 @@ export function getShardTimeout( } export async function getDefaultSearchParams( - uiSettingsClient: Pick + uiSettingsClient: Pick, + options = { isPit: false } ): Promise<{ max_concurrent_shard_requests?: number; - ignore_unavailable: boolean; + ignore_unavailable?: boolean; track_total_hits: boolean; }> { const maxConcurrentShardRequests = await uiSettingsClient.get( UI_SETTINGS.COURIER_MAX_CONCURRENT_SHARD_REQUESTS ); - return { + + const defaults: Awaited> = { max_concurrent_shard_requests: maxConcurrentShardRequests > 0 ? maxConcurrentShardRequests : undefined, - ignore_unavailable: true, // Don't fail if the index/indices don't exist track_total_hits: true, }; + + // If the request has a point-in-time ID attached, it can not include ignore_unavailable from {@link estypes.IndicesOptions}. + // ES will reject the request as that option was set when the point-in-time was created. + // Otherwise, this option allows search to not fail when the index/indices don't exist + if (!options.isPit) { + defaults.ignore_unavailable = true; + } + + return defaults; } diff --git a/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/__snapshots__/generate_csv.test.ts.snap b/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/__snapshots__/generate_csv.test.ts.snap index 855b447d85ced..c10911d7687d3 100644 --- a/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/__snapshots__/generate_csv.test.ts.snap +++ b/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/__snapshots__/generate_csv.test.ts.snap @@ -73,7 +73,7 @@ exports[`keeps order of the columns during the scroll 1`] = ` " `; -exports[`uses the scrollId to page all the data 1`] = ` +exports[`uses the pit ID to page all the data 1`] = ` "date,ip,message \\"2020-12-31T00:14:28.000Z\\",\\"110.135.176.89\\",\\"hit from the initial search\\" \\"2020-12-31T00:14:28.000Z\\",\\"110.135.176.89\\",\\"hit from the initial search\\" diff --git a/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.test.ts b/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.test.ts index ee00ea28cc05e..804fa4bcdd4a6 100644 --- a/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.test.ts +++ b/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.test.ts @@ -5,7 +5,7 @@ * 2.0. */ -import { errors as esErrors } from '@elastic/elasticsearch'; +import { errors as esErrors, estypes } from '@elastic/elasticsearch'; import type { SearchResponse } from '@elastic/elasticsearch/lib/api/types'; import type { IScopedClusterClient, IUiSettingsClient, Logger } from '@kbn/core/server'; import { @@ -50,6 +50,7 @@ const searchSourceMock = { ...searchSourceInstanceMock, getSearchRequestBody: jest.fn(() => ({})), }; + const mockSearchSourceService: jest.Mocked = { create: jest.fn().mockReturnValue(searchSourceMock), createEmpty: jest.fn().mockReturnValue(searchSourceMock), @@ -58,19 +59,21 @@ const mockSearchSourceService: jest.Mocked = { extract: jest.fn(), getAllMigrations: jest.fn(), }; + +const mockPitId = 'oju9fs3698s3902f02-8qg3-u9w36oiewiuyew6'; + +const getMockRawResponse = (hits: Array> = [], total = hits.length) => ({ + took: 1, + timed_out: false, + pit_id: mockPitId, + _shards: { total: 1, successful: 1, failed: 0, skipped: 0 }, + hits: { hits, total, max_score: 0 }, +}); + const mockDataClientSearchDefault = jest.fn().mockImplementation( (): Rx.Observable<{ rawResponse: SearchResponse }> => Rx.of({ - rawResponse: { - took: 1, - timed_out: false, - _shards: { total: 1, successful: 1, failed: 0, skipped: 0 }, - hits: { - hits: [], - total: 0, - max_score: 0, - }, - }, + rawResponse: getMockRawResponse(), }) ); @@ -92,6 +95,8 @@ beforeEach(async () => { mockDataClient = dataPluginMock.createStartContract().search.asScoped({} as any); mockDataClient.search = mockDataClientSearchDefault; + mockEsClient.asCurrentUser.openPointInTime = jest.fn().mockResolvedValueOnce({ id: mockPitId }); + uiSettingsClient = uiSettingsServiceMock .createStartContract() .asScopedToClient(savedObjectsClientMock.create()); @@ -117,6 +122,8 @@ beforeEach(async () => { searchSourceMock.getField = jest.fn((key: string) => { switch (key) { + case 'pit': + return { id: mockPitId }; case 'index': return { fields: { @@ -125,6 +132,7 @@ beforeEach(async () => { }, metaFields: ['_id', '_index', '_type', '_score'], getFormatterForField: jest.fn(), + getIndexPattern: () => 'logstash-*', }; } }); @@ -157,20 +165,15 @@ it('formats an empty search result to CSV content', async () => { it('formats a search result to CSV content', async () => { mockDataClient.search = jest.fn().mockImplementation(() => Rx.of({ - rawResponse: { - hits: { - hits: [ - { - fields: { - date: `["2020-12-31T00:14:28.000Z"]`, - ip: `["110.135.176.89"]`, - message: `["This is a great message!"]`, - }, - }, - ], - total: 1, - }, - }, + rawResponse: getMockRawResponse([ + { + fields: { + date: `["2020-12-31T00:14:28.000Z"]`, + ip: `["110.135.176.89"]`, + message: `["This is a great message!"]`, + }, + } as unknown as estypes.SearchHit, + ]), }) ); const generateCsv = new CsvGenerator( @@ -199,16 +202,16 @@ const HITS_TOTAL = 100; it('calculates the bytes of the content', async () => { mockDataClient.search = jest.fn().mockImplementation(() => Rx.of({ - rawResponse: { - hits: { - hits: range(0, HITS_TOTAL).map(() => ({ - fields: { - message: ['this is a great message'], - }, - })), - total: HITS_TOTAL, - }, - }, + rawResponse: getMockRawResponse( + range(0, HITS_TOTAL).map( + () => + ({ + fields: { + message: ['this is a great message'], + }, + } as unknown as estypes.SearchHit) + ) + ), }) ); @@ -246,18 +249,18 @@ it('warns if max size was reached', async () => { mockDataClient.search = jest.fn().mockImplementation(() => Rx.of({ - rawResponse: { - hits: { - hits: range(0, HITS_TOTAL).map(() => ({ - fields: { - date: ['2020-12-31T00:14:28.000Z'], - ip: ['110.135.176.89'], - message: ['super cali fragile istic XPLA docious'], - }, - })), - total: HITS_TOTAL, - }, - }, + rawResponse: getMockRawResponse( + range(0, HITS_TOTAL).map( + () => + ({ + fields: { + date: ['2020-12-31T00:14:28.000Z'], + ip: ['110.135.176.89'], + message: ['super cali fragile istic XPLA docious'], + }, + } as unknown as estypes.SearchHit) + ) + ), }) ); @@ -283,36 +286,42 @@ it('warns if max size was reached', async () => { expect(content).toMatchSnapshot(); }); -it('uses the scrollId to page all the data', async () => { - mockDataClient.search = jest.fn().mockImplementation(() => - Rx.of({ - rawResponse: { - _scroll_id: 'awesome-scroll-hero', - hits: { - hits: range(0, HITS_TOTAL / 10).map(() => ({ - fields: { - date: ['2020-12-31T00:14:28.000Z'], - ip: ['110.135.176.89'], - message: ['hit from the initial search'], - }, - })), - total: HITS_TOTAL, - }, - }, - }) - ); - - mockEsClient.asCurrentUser.scroll = jest.fn().mockResolvedValue({ - hits: { - hits: range(0, HITS_TOTAL / 10).map(() => ({ - fields: { - date: ['2020-12-31T00:14:28.000Z'], - ip: ['110.135.176.89'], - message: ['hit from a subsequent scroll'], - }, - })), - }, - }); +it('uses the pit ID to page all the data', async () => { + mockDataClient.search = jest + .fn() + .mockImplementationOnce(() => + Rx.of({ + rawResponse: getMockRawResponse( + range(0, HITS_TOTAL / 10).map( + () => + ({ + fields: { + date: ['2020-12-31T00:14:28.000Z'], + ip: ['110.135.176.89'], + message: ['hit from the initial search'], + }, + } as unknown as estypes.SearchHit) + ), + HITS_TOTAL + ), + }) + ) + .mockImplementation(() => + Rx.of({ + rawResponse: getMockRawResponse( + range(0, HITS_TOTAL / 10).map( + () => + ({ + fields: { + date: ['2020-12-31T00:14:28.000Z'], + ip: ['110.135.176.89'], + message: ['hit from a subsequent scroll'], + }, + } as unknown as estypes.SearchHit) + ) + ), + }) + ); const generateCsv = new CsvGenerator( createMockJob({ columns: ['date', 'ip', 'message'] }), @@ -334,70 +343,55 @@ it('uses the scrollId to page all the data', async () => { expect(csvResult.warnings).toEqual([]); expect(content).toMatchSnapshot(); - expect(mockDataClient.search).toHaveBeenCalledTimes(1); + expect(mockDataClient.search).toHaveBeenCalledTimes(10); expect(mockDataClient.search).toBeCalledWith( - { params: { body: {}, ignore_throttled: undefined, scroll: '30s', size: 500 } }, + { params: { body: {}, ignore_throttled: undefined } }, { strategy: 'es', transport: { maxRetries: 0, requestTimeout: '30s' } } ); - // `scroll` and `clearScroll` must be called with scroll ID in the post body! - expect(mockEsClient.asCurrentUser.scroll).toHaveBeenCalledTimes(9); - expect(mockEsClient.asCurrentUser.scroll).toHaveBeenCalledWith({ - scroll: '30s', - scroll_id: 'awesome-scroll-hero', - }); + expect(mockEsClient.asCurrentUser.openPointInTime).toHaveBeenCalledTimes(1); + expect(mockEsClient.asCurrentUser.openPointInTime).toHaveBeenCalledWith( + { + ignore_unavailable: true, + index: 'logstash-*', + keep_alive: '30s', + }, + { maxRetries: 0, requestTimeout: '30s' } + ); - expect(mockEsClient.asCurrentUser.clearScroll).toHaveBeenCalledTimes(1); - expect(mockEsClient.asCurrentUser.clearScroll).toHaveBeenCalledWith({ - scroll_id: ['awesome-scroll-hero'], + expect(mockEsClient.asCurrentUser.closePointInTime).toHaveBeenCalledTimes(1); + expect(mockEsClient.asCurrentUser.closePointInTime).toHaveBeenCalledWith({ + body: { id: mockPitId }, }); }); it('keeps order of the columns during the scroll', async () => { - mockDataClient.search = jest.fn().mockImplementation(() => - Rx.of({ - rawResponse: { - _scroll_id: 'awesome-scroll-hero', - hits: { - hits: [ - { - fields: { - a: ['a1'], - b: ['b1'], - }, - }, - ], - total: 3, - }, - }, - }) - ); - - mockEsClient.asCurrentUser.scroll = jest + mockDataClient.search = jest .fn() - .mockResolvedValueOnce({ - hits: { - hits: [ - { - fields: { - b: ['b2'], - }, - }, - ], - }, - }) - .mockResolvedValueOnce({ - hits: { - hits: [ - { - fields: { - a: ['a3'], - c: ['c3'], - }, - }, - ], - }, - }); + .mockImplementationOnce(() => + Rx.of({ + rawResponse: getMockRawResponse( + [{ fields: { a: ['a1'], b: ['b1'] } } as unknown as estypes.SearchHit], + 3 + ), + }) + ) + .mockImplementationOnce(() => + Rx.of({ + rawResponse: getMockRawResponse( + [{ fields: { b: ['b2'] } } as unknown as estypes.SearchHit], + 3 + ), + }) + ) + .mockImplementationOnce(() => + Rx.of({ + rawResponse: getMockRawResponse( + [{ fields: { a: ['a3'], c: ['c3'] } } as unknown as estypes.SearchHit], + 3 + ), + }) + ); const generateCsv = new CsvGenerator( createMockJob({ searchSource: {}, columns: [] }), @@ -424,21 +418,16 @@ describe('fields from job.searchSource.getFields() (7.12 generated)', () => { it('cells can be multi-value', async () => { mockDataClient.search = jest.fn().mockImplementation(() => Rx.of({ - rawResponse: { - hits: { - hits: [ - { - _id: 'my-cool-id', - _index: 'my-cool-index', - _version: 4, - fields: { - sku: [`This is a cool SKU.`, `This is also a cool SKU.`], - }, - }, - ], - total: 1, + rawResponse: getMockRawResponse([ + { + _id: 'my-cool-id', + _index: 'my-cool-index', + _version: 4, + fields: { + sku: [`This is a cool SKU.`, `This is also a cool SKU.`], + }, }, - }, + ]), }) ); @@ -466,22 +455,17 @@ describe('fields from job.searchSource.getFields() (7.12 generated)', () => { it('provides top-level underscored fields as columns', async () => { mockDataClient.search = jest.fn().mockImplementation(() => Rx.of({ - rawResponse: { - hits: { - hits: [ - { - _id: 'my-cool-id', - _index: 'my-cool-index', - _version: 4, - fields: { - date: ['2020-12-31T00:14:28.000Z'], - message: [`it's nice to see you`], - }, - }, - ], - total: 1, + rawResponse: getMockRawResponse([ + { + _id: 'my-cool-id', + _index: 'my-cool-index', + _version: 4, + fields: { + date: ['2020-12-31T00:14:28.000Z'], + message: [`it's nice to see you`], + }, }, - }, + ]), }) ); @@ -520,28 +504,23 @@ describe('fields from job.searchSource.getFields() (7.12 generated)', () => { it('sorts the fields when they are to be used as table column names', async () => { mockDataClient.search = jest.fn().mockImplementation(() => Rx.of({ - rawResponse: { - hits: { - hits: [ - { - _id: 'my-cool-id', - _index: 'my-cool-index', - _version: 4, - fields: { - date: ['2020-12-31T00:14:28.000Z'], - message_z: [`test field Z`], - message_y: [`test field Y`], - message_x: [`test field X`], - message_w: [`test field W`], - message_v: [`test field V`], - message_u: [`test field U`], - message_t: [`test field T`], - }, - }, - ], - total: 1, + rawResponse: getMockRawResponse([ + { + _id: 'my-cool-id', + _index: 'my-cool-index', + _version: 4, + fields: { + date: ['2020-12-31T00:14:28.000Z'], + message_z: [`test field Z`], + message_y: [`test field Y`], + message_x: [`test field X`], + message_w: [`test field W`], + message_v: [`test field V`], + message_u: [`test field U`], + message_t: [`test field T`], + }, }, - }, + ]), }) ); @@ -581,22 +560,17 @@ describe('fields from job.columns (7.13+ generated)', () => { it('cells can be multi-value', async () => { mockDataClient.search = jest.fn().mockImplementation(() => Rx.of({ - rawResponse: { - hits: { - hits: [ - { - _id: 'my-cool-id', - _index: 'my-cool-index', - _version: 4, - fields: { - product: 'coconut', - category: [`cool`, `rad`], - }, - }, - ], - total: 1, + rawResponse: getMockRawResponse([ + { + _id: 'my-cool-id', + _index: 'my-cool-index', + _version: 4, + fields: { + product: 'coconut', + category: [`cool`, `rad`], + }, }, - }, + ]), }) ); @@ -624,22 +598,17 @@ describe('fields from job.columns (7.13+ generated)', () => { it('columns can be top-level fields such as _id and _index', async () => { mockDataClient.search = jest.fn().mockImplementation(() => Rx.of({ - rawResponse: { - hits: { - hits: [ - { - _id: 'my-cool-id', - _index: 'my-cool-index', - _version: 4, - fields: { - product: 'coconut', - category: [`cool`, `rad`], - }, - }, - ], - total: 1, + rawResponse: getMockRawResponse([ + { + _id: 'my-cool-id', + _index: 'my-cool-index', + _version: 4, + fields: { + product: 'coconut', + category: [`cool`, `rad`], + }, }, - }, + ]), }) ); @@ -667,22 +636,17 @@ describe('fields from job.columns (7.13+ generated)', () => { it('default column names come from tabify', async () => { mockDataClient.search = jest.fn().mockImplementation(() => Rx.of({ - rawResponse: { - hits: { - hits: [ - { - _id: 'my-cool-id', - _index: 'my-cool-index', - _version: 4, - fields: { - product: 'coconut', - category: [`cool`, `rad`], - }, - }, - ], - total: 1, + rawResponse: getMockRawResponse([ + { + _id: 'my-cool-id', + _index: 'my-cool-index', + _version: 4, + fields: { + product: 'coconut', + category: [`cool`, `rad`], + }, }, - }, + ]), }) ); @@ -714,20 +678,15 @@ describe('formulas', () => { it(`escapes formula values in a cell, doesn't warn the csv contains formulas`, async () => { mockDataClient.search = jest.fn().mockImplementation(() => Rx.of({ - rawResponse: { - hits: { - hits: [ - { - fields: { - date: ['2020-12-31T00:14:28.000Z'], - ip: ['110.135.176.89'], - message: [TEST_FORMULA], - }, - }, - ], - total: 1, - }, - }, + rawResponse: getMockRawResponse([ + { + fields: { + date: ['2020-12-31T00:14:28.000Z'], + ip: ['110.135.176.89'], + message: [TEST_FORMULA], + }, + } as unknown as estypes.SearchHit, + ]), }) ); @@ -757,20 +716,15 @@ describe('formulas', () => { it(`escapes formula values in a header, doesn't warn the csv contains formulas`, async () => { mockDataClient.search = jest.fn().mockImplementation(() => Rx.of({ - rawResponse: { - hits: { - hits: [ - { - fields: { - date: ['2020-12-31T00:14:28.000Z'], - ip: ['110.135.176.89'], - [TEST_FORMULA]: 'This is great data', - }, - }, - ], - total: 1, - }, - }, + rawResponse: getMockRawResponse([ + { + fields: { + date: ['2020-12-31T00:14:28.000Z'], + ip: ['110.135.176.89'], + [TEST_FORMULA]: 'This is great data', + }, + } as unknown as estypes.SearchHit, + ]), }) ); @@ -808,20 +762,15 @@ describe('formulas', () => { }); mockDataClient.search = jest.fn().mockImplementation(() => Rx.of({ - rawResponse: { - hits: { - hits: [ - { - fields: { - date: ['2020-12-31T00:14:28.000Z'], - ip: ['110.135.176.89'], - message: [TEST_FORMULA], - }, - }, - ], - total: 1, - }, - }, + rawResponse: getMockRawResponse([ + { + fields: { + date: ['2020-12-31T00:14:28.000Z'], + ip: ['110.135.176.89'], + message: [TEST_FORMULA], + }, + } as unknown as estypes.SearchHit, + ]), }) ); @@ -875,8 +824,6 @@ it('can override ignoring frozen indices', async () => { params: { body: {}, ignore_throttled: false, - scroll: '30s', - size: 500, }, }, { strategy: 'es', transport: { maxRetries: 0, requestTimeout: '30s' } } @@ -928,7 +875,7 @@ it('will return partial data if the scroll or search fails', async () => { expect(mockLogger.error.mock.calls).toMatchInlineSnapshot(` Array [ Array [ - "CSV export scan error: ResponseError: my error", + "CSV export search error: ResponseError: my error", ], Array [ [ResponseError: my error], @@ -978,27 +925,27 @@ it('handles unknown errors', async () => { describe('error codes', () => { it('returns the expected error code when authentication expires', async () => { - mockDataClient.search = jest.fn().mockImplementation(() => - Rx.of({ - rawResponse: { - _scroll_id: 'test', - hits: { - hits: range(0, 5).map(() => ({ + mockDataClient.search = jest + .fn() + .mockImplementationOnce(() => + Rx.of({ + rawResponse: getMockRawResponse( + range(0, 5).map(() => ({ + _index: 'lasdf', + _id: 'lasdf123', fields: { date: ['2020-12-31T00:14:28.000Z'], ip: ['110.135.176.89'], message: ['super cali fragile istic XPLA docious'], }, })), - total: 10, - }, - }, - }) - ); - - mockEsClient.asCurrentUser.scroll = jest.fn().mockImplementation(() => { - throw new esErrors.ResponseError({ statusCode: 403, meta: {} as any, warnings: [] }); - }); + 10 + ), + }) + ) + .mockImplementationOnce(() => { + throw new esErrors.ResponseError({ statusCode: 403, meta: {} as any, warnings: [] }); + }); const generateCsv = new CsvGenerator( createMockJob({ columns: ['date', 'ip', 'message'] }), @@ -1029,7 +976,7 @@ describe('error codes', () => { expect(mockLogger.error.mock.calls).toMatchInlineSnapshot(` Array [ Array [ - "CSV export scroll error: ResponseError: Response Error", + "CSV export search error: ResponseError: Response Error", ], Array [ [ResponseError: Response Error], diff --git a/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.ts b/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.ts index d287ec58530b9..f527956d5c7fa 100644 --- a/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.ts +++ b/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.ts @@ -5,15 +5,9 @@ * 2.0. */ -import { errors as esErrors } from '@elastic/elasticsearch'; -import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; +import { errors as esErrors, estypes } from '@elastic/elasticsearch'; import type { IScopedClusterClient, IUiSettingsClient, Logger } from '@kbn/core/server'; -import type { - DataView, - ISearchSource, - ISearchStartSearchSource, - SearchRequest, -} from '@kbn/data-plugin/common'; +import type { ISearchSource, ISearchStartSearchSource } from '@kbn/data-plugin/common'; import { cellHasFormulas, ES_SEARCH_STRATEGY, tabifyDocs } from '@kbn/data-plugin/common'; import type { IScopedSearchClient } from '@kbn/data-plugin/server'; import type { Datatable } from '@kbn/expressions-plugin/server'; @@ -61,21 +55,63 @@ export class CsvGenerator { private stream: Writable ) {} - private async scan(index: DataView, searchSource: ISearchSource, settings: CsvExportSettings) { + private async openPointInTime(indexPatternTitle: string, settings: CsvExportSettings) { + const { duration } = settings.scroll; + let pitId: string | undefined; + this.logger.debug(`Requesting point-in-time for: [${indexPatternTitle}]...`); + try { + // NOTE: if ES is overloaded, this request could time out + const response = await this.clients.es.asCurrentUser.openPointInTime( + { + index: indexPatternTitle, + keep_alive: duration, + ignore_unavailable: true, + }, + { + requestTimeout: duration, + maxRetries: 0, + } + ); + pitId = response.id; + } catch (err) { + this.logger.error(err); + } + + if (!pitId) { + throw new Error(`Could not receive a point-in-time ID!`); + } + + this.logger.debug(`Opened PIT ID: ${this.truncatePitId(pitId)}`); + + return pitId; + } + + private async doSearch( + searchSource: ISearchSource, + settings: CsvExportSettings, + searchAfter?: estypes.SortResults + ) { const { scroll: scrollSettings, includeFrozen } = settings; - const searchBody: SearchRequest | undefined = searchSource.getSearchRequestBody(); + searchSource.setField('size', scrollSettings.size); + + if (searchAfter) { + searchSource.setField('searchAfter', searchAfter); + } + + const pitId = searchSource.getField('pit')?.id; + this.logger.debug( + `Executing search request with PIT ID: [${this.truncatePitId(pitId)}]` + + (searchAfter ? ` search_after: [${searchAfter}]` : '') + ); + + const searchBody: estypes.SearchRequest = searchSource.getSearchRequestBody(); if (searchBody == null) { throw new Error('Could not retrieve the search body!'); } - this.logger.debug(`Tracking total hits with: track_total_hits=${searchBody.track_total_hits}`); - this.logger.info(`Executing search request...`); const searchParams = { params: { body: searchBody, - index: index.title, - scroll: scrollSettings.duration, - size: scrollSettings.size, ignore_throttled: includeFrozen ? false : undefined, // "true" will cause deprecation warnings logged in ES }, }; @@ -88,35 +124,19 @@ export class CsvGenerator { strategy: ES_SEARCH_STRATEGY, transport: { maxRetries: 0, // retrying reporting jobs is handled in the task manager scheduling logic - requestTimeout: this.config.scroll.duration, + requestTimeout: scrollSettings.duration, }, }) ) - ).rawResponse as estypes.SearchResponse; + ).rawResponse; } catch (err) { - this.logger.error(`CSV export scan error: ${err}`); + this.logger.error(`CSV export search error: ${err}`); throw err; } return results; } - private async scroll(scrollId: string, scrollSettings: CsvExportSettings['scroll']) { - this.logger.info(`Executing scroll request...`); - - let results: estypes.SearchResponse | undefined; - try { - results = await this.clients.es.asCurrentUser.scroll({ - scroll: scrollSettings.duration, - scroll_id: scrollId, - }); - } catch (err) { - this.logger.error(`CSV export scroll error: ${err}`); - throw err; - } - return results; - } - /* * Load field formats for each field in the list */ @@ -202,7 +222,7 @@ export class CsvGenerator { builder: MaxSizeStringBuilder, settings: CsvExportSettings ) { - this.logger.debug(`Building CSV header row...`); + this.logger.debug(`Building CSV header row`); const header = Array.from(columns).map(this.escapeValues(settings)).join(settings.separator) + '\n'; @@ -225,7 +245,7 @@ export class CsvGenerator { formatters: Record, settings: CsvExportSettings ) { - this.logger.debug(`Building ${table.rows.length} CSV data rows...`); + this.logger.debug(`Building ${table.rows.length} CSV data rows`); for (const dataTableRow of table.rows) { if (this.cancellationToken.isCancelled()) { break; @@ -293,26 +313,28 @@ export class CsvGenerator { throw new Error(`The search must have a reference to an index pattern!`); } - const { maxSizeBytes, bom, escapeFormulaValues, scroll: scrollSettings } = settings; - + const { maxSizeBytes, bom, escapeFormulaValues, timezone } = settings; + const indexPatternTitle = index.getIndexPattern(); const builder = new MaxSizeStringBuilder(this.stream, byteSizeValueToNumber(maxSizeBytes), bom); const warnings: string[] = []; let first = true; let currentRecord = -1; let totalRecords: number | undefined; let totalRelation = 'eq'; - let scrollId: string | undefined; + let searchAfter: estypes.SortResults | undefined; + + let pitId = await this.openPointInTime(indexPatternTitle, settings); // apply timezone from the job to all date field formatters try { index.fields.getByType('date').forEach(({ name }) => { - this.logger.debug(`setting timezone on ${name}`); + this.logger.debug(`Setting timezone on ${name}`); const format: FieldFormatConfig = { ...index.fieldFormatMap[name], id: index.fieldFormatMap[name]?.id || 'date', // allow id: date_nanos params: { ...index.fieldFormatMap[name]?.params, - timezone: settings.timezone, + timezone, }, }; index.setFieldFormat(name, format); @@ -327,24 +349,20 @@ export class CsvGenerator { if (this.cancellationToken.isCancelled()) { break; } - let results: estypes.SearchResponse | undefined; - if (scrollId == null) { - // open a scroll cursor in Elasticsearch - results = await this.scan(index, searchSource, settings); - scrollId = results?._scroll_id; - if (results?.hits?.total != null) { - const { hits } = results; - if (typeof hits.total === 'number') { - totalRecords = hits.total; - } else { - totalRecords = hits.total?.value; - totalRelation = hits.total?.relation ?? 'unknown'; - } - this.logger.info(`Total hits: [${totalRecords}].` + `Accuracy: ${totalRelation}`); + // set the latest pit, which could be different from the last request + searchSource.setField('pit', { id: pitId, keep_alive: settings.scroll.duration }); + + const results = await this.doSearch(searchSource, settings, searchAfter); + + const { hits } = results; + if (first && hits.total != null) { + if (typeof hits.total === 'number') { + totalRecords = hits.total; + } else { + totalRecords = hits.total?.value; + totalRelation = hits.total?.relation ?? 'unknown'; } - } else { - // use the scroll cursor in Elasticsearch - results = await this.scroll(scrollId, scrollSettings); + this.logger.info(`Total hits ${totalRelation} ${totalRecords}.`); } if (!results) { @@ -352,13 +370,35 @@ export class CsvGenerator { break; } - // TODO check for shard failures, log them and add a warning if found - { - const { - hits: { hits, ...hitsMeta }, - ...header - } = results; - this.logger.debug('Results metadata: ' + JSON.stringify({ header, hitsMeta })); + const { + hits: { hits: _hits, ...hitsMeta }, + ...headerWithPit + } = results; + + const { pit_id: newPitId, ...header } = headerWithPit; + + const logInfo = { + header: { pit_id: `${this.truncatePitId(newPitId)}`, ...header }, + hitsMeta, + }; + this.logger.debug(`Results metadata: ${JSON.stringify(logInfo)}`); + + // use the most recently received id for the next search request + this.logger.debug(`Received PIT ID: [${this.truncatePitId(results.pit_id)}]`); + pitId = results.pit_id ?? pitId; + + // Update last sort results for next query. PIT is used, so the sort results + // automatically include _shard_doc as a tiebreaker + searchAfter = hits.hits[hits.hits.length - 1]?.sort as estypes.SortResults | undefined; + this.logger.debug(`Received search_after: [${searchAfter}]`); + + // check for shard failures, log them and add a warning if found + const { _shards: shards } = header; + if (shards.failures) { + shards.failures.forEach(({ reason }) => { + warnings.push(`Shard failure: ${JSON.stringify(reason)}`); + this.logger.warn(JSON.stringify(reason)); + }); } let table: Datatable | undefined; @@ -411,16 +451,12 @@ export class CsvGenerator { warnings.push(i18nTexts.unknownError(err?.message ?? err)); } } finally { - // clear scrollID - if (scrollId) { - this.logger.debug(`Executing clearScroll request`); - try { - await this.clients.es.asCurrentUser.clearScroll({ scroll_id: [scrollId] }); - } catch (err) { - this.logger.error(err); - } + // + if (pitId) { + this.logger.debug(`Closing point-in-time`); + await this.clients.es.asCurrentUser.closePointInTime({ body: { id: pitId } }); } else { - this.logger.warn(`No scrollId to clear!`); + this.logger.warn(`No PIT ID to clear!`); } } @@ -429,7 +465,7 @@ export class CsvGenerator { if (!this.maxSizeReached && this.csvRowCount !== totalRecords) { this.logger.warn( `ES scroll returned fewer total hits than expected! ` + - `Search result total hits: ${totalRecords}. Row count: ${this.csvRowCount}.` + `Search result total hits: ${totalRecords}. Row count: ${this.csvRowCount}` ); warnings.push( i18nTexts.csvRowCountError({ expected: totalRecords ?? NaN, received: this.csvRowCount }) @@ -447,4 +483,8 @@ export class CsvGenerator { error_code: reportingError?.code, }; } + + private truncatePitId(pitId: string | undefined) { + return pitId?.substring(0, 12) + '...'; + } }