From 8d4c5815f6da4cc055f3d80cc2134dc10dcd6859 Mon Sep 17 00:00:00 2001 From: Jean-Louis Leysens Date: Thu, 16 Sep 2021 13:17:27 +0200 Subject: [PATCH 1/6] WIP, switched CSV export to use PIT instead of scroll --- .../generate_csv/generate_csv.ts | 81 +++++++++++++------ .../reporting/server/routes/lib/jobs_query.ts | 2 +- .../functional/apps/discover/reporting.ts | 2 +- 3 files changed, 58 insertions(+), 27 deletions(-) diff --git a/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.ts b/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.ts index dc5c560d29546..6419b2519cd21 100644 --- a/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.ts +++ b/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.ts @@ -105,16 +105,31 @@ export class CsvGenerator { return results; } - private async scroll(scrollId: string, scrollSettings: CsvExportSettings['scroll']) { + private async scroll( + pitId: string, + settings: CsvExportSettings, + searchSource: ISearchSource, + searchAfter?: estypes.SearchSortResults + ) { this.logger.debug(`executing scroll request`); - const results = ( - await this.clients.es.asCurrentUser.scroll({ - body: { - scroll: scrollSettings.duration, - scroll_id: scrollId, + const { scroll: scrollSettings, includeFrozen } = settings; + const searchBody = searchSource.getSearchRequestBody(); + const searchParams: estypes.SearchRequest = { + body: { + ...searchBody, + pit: { + id: pitId, + keep_alive: scrollSettings.duration, }, - }) - ).body; + search_after: searchAfter, + }, + size: scrollSettings.size, + ignore_throttled: !includeFrozen, + }; + + const results = (await this.clients.es.asCurrentUser.search(searchParams)) + .body as estypes.SearchResponse; + return results; } @@ -294,14 +309,15 @@ export class CsvGenerator { throw new Error(`The search must have a reference to an index pattern!`); } - const { maxSizeBytes, bom, escapeFormulaValues, scroll: scrollSettings } = settings; + const { maxSizeBytes, bom, escapeFormulaValues } = settings; const builder = new MaxSizeStringBuilder(this.stream, byteSizeValueToNumber(maxSizeBytes), bom); const warnings: string[] = []; let first = true; let currentRecord = -1; let totalRecords = 0; - let scrollId: string | undefined; + let pitId: string | undefined; + let searchAfter: undefined | estypes.SearchSortResults; // apply timezone from the job to all date field formatters try { @@ -326,24 +342,36 @@ export class CsvGenerator { if (this.cancellationToken.isCancelled()) { break; } - let results: estypes.SearchResponse | undefined; - if (scrollId == null) { - // open a scroll cursor in Elasticsearch - results = await this.scan(index, searchSource, settings); - scrollId = results?._scroll_id; - if (results.hits?.total != null) { - totalRecords = results.hits.total as number; - this.logger.debug(`Total search results: ${totalRecords}`); - } - } else { - // use the scroll cursor in Elasticsearch - results = await this.scroll(scrollId, scrollSettings); + if (pitId == null) { + pitId = ( + await this.clients.es.asCurrentUser.openPointInTime({ + index: index.title, + keep_alive: settings.scroll.duration, + }) + ).body.id; + } + + // use the scroll cursor in Elasticsearch + const results = await this.scroll(pitId, settings, searchSource, searchAfter); + if (results.hits?.total != null && !totalRecords) { + totalRecords = + typeof results.hits.total === 'number' ? results.hits.total : results.hits.total.value; + console.log('total hits', totalRecords); + this.logger.debug(`Total search results: ${totalRecords}`); } + pitId = results.pit_id ? results.pit_id : pitId; + if (!results) { this.logger.warning(`Search results are undefined!`); break; } + console.log('results', results.hits.hits.length); + + const totalHits = results.hits.hits.length; + if (totalHits) { + searchAfter = results.hits.hits[totalHits - 1].sort; + } let table: Datatable | undefined; try { @@ -386,15 +414,16 @@ export class CsvGenerator { } } catch (err) { this.logger.error(err); + console.log(JSON.stringify(err, null, 2)); if (err instanceof KbnServerError && err.errBody) { throw JSON.stringify(err.errBody.error); } } finally { // clear scrollID - if (scrollId) { - this.logger.debug(`executing clearScroll request`); + if (pitId) { + this.logger.debug(`executing close PIT request`); try { - await this.clients.es.asCurrentUser.clearScroll({ body: { scroll_id: [scrollId] } }); + await this.clients.es.asCurrentUser.closePointInTime({ body: { id: pitId } }); } catch (err) { this.logger.error(err); } @@ -405,6 +434,8 @@ export class CsvGenerator { this.logger.debug(`Finished generating. Row count: ${this.csvRowCount}.`); + console.log('this.csvRowCount', this.csvRowCount); + return { content_type: CONTENT_TYPE_CSV, csv_contains_formulas: this.csvContainsFormulas && !escapeFormulaValues, diff --git a/x-pack/plugins/reporting/server/routes/lib/jobs_query.ts b/x-pack/plugins/reporting/server/routes/lib/jobs_query.ts index afa83ed331672..969c9dcf5a532 100644 --- a/x-pack/plugins/reporting/server/routes/lib/jobs_query.ts +++ b/x-pack/plugins/reporting/server/routes/lib/jobs_query.ts @@ -148,7 +148,7 @@ export function jobsQueryFactory(reportingCore: ReportingCore): JobsQueryFactory constant_score: { filter: { bool: { - must: [{ term: { _id: id } }, { term: { created_by: username } }], + must: [{ term: { _id: id } }], }, }, }, diff --git a/x-pack/test/functional/apps/discover/reporting.ts b/x-pack/test/functional/apps/discover/reporting.ts index bdf6b9e62c5f3..0b5d70a5c1c26 100644 --- a/x-pack/test/functional/apps/discover/reporting.ts +++ b/x-pack/test/functional/apps/discover/reporting.ts @@ -23,7 +23,7 @@ export default function ({ getService, getPageObjects }: FtrProviderContext) { }; // Failing: See https://github.com/elastic/kibana/issues/112164 - describe.skip('Discover CSV Export', () => { + describe.only('Discover CSV Export', () => { before('initialize tests', async () => { log.debug('ReportingPage:initTests'); await esArchiver.load('x-pack/test/functional/es_archives/reporting/ecommerce'); From f2026220d32504976d9100de62ac5377f9109824 Mon Sep 17 00:00:00 2001 From: Jean-Louis Leysens Date: Thu, 16 Sep 2021 13:34:52 +0200 Subject: [PATCH 2/6] remove .skip --- x-pack/test/functional/apps/discover/reporting.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/x-pack/test/functional/apps/discover/reporting.ts b/x-pack/test/functional/apps/discover/reporting.ts index 0b5d70a5c1c26..a5cc54ae8fc6d 100644 --- a/x-pack/test/functional/apps/discover/reporting.ts +++ b/x-pack/test/functional/apps/discover/reporting.ts @@ -22,8 +22,7 @@ export default function ({ getService, getPageObjects }: FtrProviderContext) { await kibanaServer.uiSettings.update({ 'discover:searchFieldsFromSource': setValue }); }; - // Failing: See https://github.com/elastic/kibana/issues/112164 - describe.only('Discover CSV Export', () => { + describe('Discover CSV Export', () => { before('initialize tests', async () => { log.debug('ReportingPage:initTests'); await esArchiver.load('x-pack/test/functional/es_archives/reporting/ecommerce'); From e059f08a683caac8bba12098a40f74cb5e07ab87 Mon Sep 17 00:00:00 2001 From: Jean-Louis Leysens Date: Thu, 16 Sep 2021 16:40:31 +0200 Subject: [PATCH 3/6] clean up the generate csv file --- .../generate_csv/generate_csv.ts | 38 ++----------------- 1 file changed, 3 insertions(+), 35 deletions(-) diff --git a/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.ts b/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.ts index 6419b2519cd21..c1bf741f3754a 100644 --- a/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.ts +++ b/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.ts @@ -14,8 +14,6 @@ import { Datatable } from 'src/plugins/expressions/server'; import { ReportingConfig } from '../../..'; import { cellHasFormulas, - ES_SEARCH_STRATEGY, - IndexPattern, ISearchSource, ISearchStartSearchSource, SearchFieldValue, @@ -80,31 +78,6 @@ export class CsvGenerator { private stream: Writable ) {} - private async scan( - index: IndexPattern, - searchSource: ISearchSource, - settings: CsvExportSettings - ) { - const { scroll: scrollSettings, includeFrozen } = settings; - const searchBody = searchSource.getSearchRequestBody(); - this.logger.debug(`executing search request`); - const searchParams = { - params: { - body: searchBody, - index: index.title, - scroll: scrollSettings.duration, - size: scrollSettings.size, - ignore_throttled: !includeFrozen, - }, - }; - - const results = ( - await this.clients.data.search(searchParams, { strategy: ES_SEARCH_STRATEGY }).toPromise() - ).rawResponse as estypes.SearchResponse; - - return results; - } - private async scroll( pitId: string, settings: CsvExportSettings, @@ -356,7 +329,6 @@ export class CsvGenerator { if (results.hits?.total != null && !totalRecords) { totalRecords = typeof results.hits.total === 'number' ? results.hits.total : results.hits.total.value; - console.log('total hits', totalRecords); this.logger.debug(`Total search results: ${totalRecords}`); } @@ -366,7 +338,6 @@ export class CsvGenerator { this.logger.warning(`Search results are undefined!`); break; } - console.log('results', results.hits.hits.length); const totalHits = results.hits.hits.length; if (totalHits) { @@ -414,28 +385,25 @@ export class CsvGenerator { } } catch (err) { this.logger.error(err); - console.log(JSON.stringify(err, null, 2)); if (err instanceof KbnServerError && err.errBody) { throw JSON.stringify(err.errBody.error); } } finally { - // clear scrollID + // close PIT if (pitId) { - this.logger.debug(`executing close PIT request`); + this.logger.debug(`executing close point-in-time request`); try { await this.clients.es.asCurrentUser.closePointInTime({ body: { id: pitId } }); } catch (err) { this.logger.error(err); } } else { - this.logger.warn(`No scrollId to clear!`); + this.logger.warn(`No point-in-time to close!`); } } this.logger.debug(`Finished generating. Row count: ${this.csvRowCount}.`); - console.log('this.csvRowCount', this.csvRowCount); - return { content_type: CONTENT_TYPE_CSV, csv_contains_formulas: this.csvContainsFormulas && !escapeFormulaValues, From 4c09af6653a280d4ab08b4f8eb13729abc6664f7 Mon Sep 17 00:00:00 2001 From: Jean-Louis Leysens Date: Thu, 16 Sep 2021 16:45:52 +0200 Subject: [PATCH 4/6] re-add username to get query --- x-pack/plugins/reporting/server/routes/lib/jobs_query.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugins/reporting/server/routes/lib/jobs_query.ts b/x-pack/plugins/reporting/server/routes/lib/jobs_query.ts index 969c9dcf5a532..afa83ed331672 100644 --- a/x-pack/plugins/reporting/server/routes/lib/jobs_query.ts +++ b/x-pack/plugins/reporting/server/routes/lib/jobs_query.ts @@ -148,7 +148,7 @@ export function jobsQueryFactory(reportingCore: ReportingCore): JobsQueryFactory constant_score: { filter: { bool: { - must: [{ term: { _id: id } }], + must: [{ term: { _id: id } }, { term: { created_by: username } }], }, }, }, From 14582f99ccab8c0b046c4c32718ebe8b5f761acc Mon Sep 17 00:00:00 2001 From: Jean-Louis Leysens Date: Thu, 16 Sep 2021 17:58:51 +0200 Subject: [PATCH 5/6] set ignore_unavailable to undefined --- .../csv_searchsource/generate_csv/generate_csv.ts | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.ts b/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.ts index c1bf741f3754a..3814005f2f858 100644 --- a/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.ts +++ b/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.ts @@ -16,6 +16,7 @@ import { cellHasFormulas, ISearchSource, ISearchStartSearchSource, + ES_SEARCH_STRATEGY, SearchFieldValue, SearchSourceFields, tabifyDocs, @@ -98,10 +99,14 @@ export class CsvGenerator { }, size: scrollSettings.size, ignore_throttled: !includeFrozen, + ignore_unavailable: undefined, }; - const results = (await this.clients.es.asCurrentUser.search(searchParams)) - .body as estypes.SearchResponse; + const results = ( + await this.clients.data + .search({ params: searchParams }, { strategy: ES_SEARCH_STRATEGY }) + .toPromise() + ).rawResponse as estypes.SearchResponse; return results; } @@ -324,7 +329,6 @@ export class CsvGenerator { ).body.id; } - // use the scroll cursor in Elasticsearch const results = await this.scroll(pitId, settings, searchSource, searchAfter); if (results.hits?.total != null && !totalRecords) { totalRecords = From 7db7614aec91a4efb626b20f92c77a7dd3a87f20 Mon Sep 17 00:00:00 2001 From: Jean-Louis Leysens Date: Mon, 20 Sep 2021 10:24:57 +0200 Subject: [PATCH 6/6] revert back to using _scroll, unskip the CSV export tests --- .../generate_csv/generate_csv.ts | 95 +++++++++---------- 1 file changed, 46 insertions(+), 49 deletions(-) diff --git a/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.ts b/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.ts index 3814005f2f858..dc5c560d29546 100644 --- a/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.ts +++ b/x-pack/plugins/reporting/server/export_types/csv_searchsource/generate_csv/generate_csv.ts @@ -14,9 +14,10 @@ import { Datatable } from 'src/plugins/expressions/server'; import { ReportingConfig } from '../../..'; import { cellHasFormulas, + ES_SEARCH_STRATEGY, + IndexPattern, ISearchSource, ISearchStartSearchSource, - ES_SEARCH_STRATEGY, SearchFieldValue, SearchSourceFields, tabifyDocs, @@ -79,38 +80,44 @@ export class CsvGenerator { private stream: Writable ) {} - private async scroll( - pitId: string, - settings: CsvExportSettings, + private async scan( + index: IndexPattern, searchSource: ISearchSource, - searchAfter?: estypes.SearchSortResults + settings: CsvExportSettings ) { - this.logger.debug(`executing scroll request`); const { scroll: scrollSettings, includeFrozen } = settings; const searchBody = searchSource.getSearchRequestBody(); - const searchParams: estypes.SearchRequest = { - body: { - ...searchBody, - pit: { - id: pitId, - keep_alive: scrollSettings.duration, - }, - search_after: searchAfter, + this.logger.debug(`executing search request`); + const searchParams = { + params: { + body: searchBody, + index: index.title, + scroll: scrollSettings.duration, + size: scrollSettings.size, + ignore_throttled: !includeFrozen, }, - size: scrollSettings.size, - ignore_throttled: !includeFrozen, - ignore_unavailable: undefined, }; const results = ( - await this.clients.data - .search({ params: searchParams }, { strategy: ES_SEARCH_STRATEGY }) - .toPromise() + await this.clients.data.search(searchParams, { strategy: ES_SEARCH_STRATEGY }).toPromise() ).rawResponse as estypes.SearchResponse; return results; } + private async scroll(scrollId: string, scrollSettings: CsvExportSettings['scroll']) { + this.logger.debug(`executing scroll request`); + const results = ( + await this.clients.es.asCurrentUser.scroll({ + body: { + scroll: scrollSettings.duration, + scroll_id: scrollId, + }, + }) + ).body; + return results; + } + /* * Load field formats for each field in the list */ @@ -287,15 +294,14 @@ export class CsvGenerator { throw new Error(`The search must have a reference to an index pattern!`); } - const { maxSizeBytes, bom, escapeFormulaValues } = settings; + const { maxSizeBytes, bom, escapeFormulaValues, scroll: scrollSettings } = settings; const builder = new MaxSizeStringBuilder(this.stream, byteSizeValueToNumber(maxSizeBytes), bom); const warnings: string[] = []; let first = true; let currentRecord = -1; let totalRecords = 0; - let pitId: string | undefined; - let searchAfter: undefined | estypes.SearchSortResults; + let scrollId: string | undefined; // apply timezone from the job to all date field formatters try { @@ -320,34 +326,25 @@ export class CsvGenerator { if (this.cancellationToken.isCancelled()) { break; } - if (pitId == null) { - pitId = ( - await this.clients.es.asCurrentUser.openPointInTime({ - index: index.title, - keep_alive: settings.scroll.duration, - }) - ).body.id; - } - - const results = await this.scroll(pitId, settings, searchSource, searchAfter); - if (results.hits?.total != null && !totalRecords) { - totalRecords = - typeof results.hits.total === 'number' ? results.hits.total : results.hits.total.value; - this.logger.debug(`Total search results: ${totalRecords}`); + let results: estypes.SearchResponse | undefined; + if (scrollId == null) { + // open a scroll cursor in Elasticsearch + results = await this.scan(index, searchSource, settings); + scrollId = results?._scroll_id; + if (results.hits?.total != null) { + totalRecords = results.hits.total as number; + this.logger.debug(`Total search results: ${totalRecords}`); + } + } else { + // use the scroll cursor in Elasticsearch + results = await this.scroll(scrollId, scrollSettings); } - pitId = results.pit_id ? results.pit_id : pitId; - if (!results) { this.logger.warning(`Search results are undefined!`); break; } - const totalHits = results.hits.hits.length; - if (totalHits) { - searchAfter = results.hits.hits[totalHits - 1].sort; - } - let table: Datatable | undefined; try { table = tabifyDocs(results, index, { shallow: true, meta: true }); @@ -393,16 +390,16 @@ export class CsvGenerator { throw JSON.stringify(err.errBody.error); } } finally { - // close PIT - if (pitId) { - this.logger.debug(`executing close point-in-time request`); + // clear scrollID + if (scrollId) { + this.logger.debug(`executing clearScroll request`); try { - await this.clients.es.asCurrentUser.closePointInTime({ body: { id: pitId } }); + await this.clients.es.asCurrentUser.clearScroll({ body: { scroll_id: [scrollId] } }); } catch (err) { this.logger.error(err); } } else { - this.logger.warn(`No point-in-time to close!`); + this.logger.warn(`No scrollId to clear!`); } }