From bab56811fce021365d74b06113ac252d1b6fddac Mon Sep 17 00:00:00 2001 From: Timothy Sullivan Date: Wed, 21 Feb 2024 12:24:30 -0700 Subject: [PATCH 1/7] Fix job notifications poller --- .../reporting_api_client.ts | 4 +- .../reporting/public/lib/stream_handler.ts | 147 ++++++++++-------- .../notifier/job_completion_notifications.ts | 70 ++++++--- x-pack/plugins/reporting/public/plugin.ts | 42 +---- 4 files changed, 136 insertions(+), 127 deletions(-) diff --git a/x-pack/plugins/reporting/public/lib/reporting_api_client/reporting_api_client.ts b/x-pack/plugins/reporting/public/lib/reporting_api_client/reporting_api_client.ts index 37b8c60f4fe16..a395f9c40ea5a 100644 --- a/x-pack/plugins/reporting/public/lib/reporting_api_client/reporting_api_client.ts +++ b/x-pack/plugins/reporting/public/lib/reporting_api_client/reporting_api_client.ts @@ -18,7 +18,7 @@ import rison from '@kbn/rison'; import moment from 'moment'; import { stringify } from 'query-string'; import { INTERNAL_ROUTES, PUBLIC_ROUTES } from '../../../common/constants'; -import { add } from '../../notifier/job_completion_notifications'; +import { addPendingJobId } from '../../notifier/job_completion_notifications'; import { Job } from '../job'; /* @@ -180,7 +180,7 @@ export class ReportingAPIClient implements IReportingAPI { body: JSON.stringify({ jobParams: jobParamsRison }), } ); - add(resp.job.id); + await addPendingJobId(resp.job.id); return new Job(resp.job); } diff --git a/x-pack/plugins/reporting/public/lib/stream_handler.ts b/x-pack/plugins/reporting/public/lib/stream_handler.ts index 6bdba0b9bf84d..ddd48d78f6716 100644 --- a/x-pack/plugins/reporting/public/lib/stream_handler.ts +++ b/x-pack/plugins/reporting/public/lib/stream_handler.ts @@ -6,11 +6,11 @@ */ import * as Rx from 'rxjs'; -import { catchError, map } from 'rxjs/operators'; +import { catchError, filter, mergeMap, takeUntil } from 'rxjs/operators'; import { DocLinksStart, NotificationsSetup, ThemeServiceStart } from '@kbn/core/public'; import { i18n } from '@kbn/i18n'; -import { JOB_COMPLETION_NOTIFICATIONS_SESSION_KEY, JOB_STATUS } from '@kbn/reporting-common'; +import { JOB_STATUS } from '@kbn/reporting-common'; import { JobId } from '@kbn/reporting-common/types'; import { @@ -21,6 +21,7 @@ import { getWarningMaxSizeToast, getWarningToast, } from '../notifier'; +import { getPendingJobIds, setPendingJobIds } from '../notifier/job_completion_notifications'; import { JobSummary, JobSummarySet } from '../types'; import { Job } from './job'; import { ReportingAPIClient } from './reporting_api_client'; @@ -31,10 +32,6 @@ import { ReportingAPIClient } from './reporting_api_client'; */ const COMPLETED_JOB_TOAST_TIMEOUT = 24 * 60 * 60 * 1000; // 24 hours -function updateStored(jobIds: JobId[]): void { - sessionStorage.setItem(JOB_COMPLETION_NOTIFICATIONS_SESSION_KEY, JSON.stringify(jobIds)); -} - function getReportStatus(src: Job): JobSummary { return { id: src.id, @@ -47,6 +44,24 @@ function getReportStatus(src: Job): JobSummary { }; } +function handleError( + err: Error, + notifications: NotificationsSetup, + theme: ThemeServiceStart +): Rx.Observable { + notifications.toasts.addDanger( + getGeneralErrorToast( + i18n.translate('xpack.reporting.publicNotifier.pollingErrorMessage', { + defaultMessage: 'Reporting notifier error!', + }), + err, + theme + ) + ); + window.console.error(err); + return Rx.of({ completed: [], failed: [] }); +} + export class ReportingNotifierStreamHandler { constructor( private notifications: NotificationsSetup, @@ -55,56 +70,62 @@ export class ReportingNotifierStreamHandler { private docLinks: DocLinksStart ) {} + public startPolling(interval: number, stop$: Rx.Observable) { + Rx.timer(0, interval) + .pipe( + takeUntil(stop$), // stop the interval when stop method is called + mergeMap(getPendingJobIds), // read all pending job IDs from session storage + filter((previousPending) => previousPending.length > 0), // stop the pipeline here if there are none pending + mergeMap((previousPending) => this.findChangedStatusJobs(previousPending)), // look up the latest status of all pending jobs on the server + mergeMap(({ completed, failed }) => this.showNotifications({ completed, failed })), + catchError((err) => { + // eslint-disable-next-line no-console + console.error(err); + return handleError(err, this.notifications, this.theme); + }) + ) + .subscribe(); + } + /* * Use Kibana Toast API to show our messages + * + * Public for purposes of testing */ public showNotifications({ completed: completedJobs, failed: failedJobs, }: JobSummarySet): Rx.Observable { + const notifications = this.notifications; + const apiClient = this.apiClient; + const theme = this.theme; + const docLinks = this.docLinks; + const getManagementLink = apiClient.getManagementLink.bind(apiClient); + const getDownloadLink = apiClient.getDownloadLink.bind(apiClient); + const showNotificationsAsync = async () => { const completedOptions = { toastLifeTimeMs: COMPLETED_JOB_TOAST_TIMEOUT }; // notifications with download link for (const job of completedJobs) { if (job.csvContainsFormulas) { - this.notifications.toasts.addWarning( - getWarningFormulasToast( - job, - this.apiClient.getManagementLink, - this.apiClient.getDownloadLink, - this.theme - ), + notifications.toasts.addWarning( + getWarningFormulasToast(job, getManagementLink, getDownloadLink, theme), completedOptions ); } else if (job.maxSizeReached) { - this.notifications.toasts.addWarning( - getWarningMaxSizeToast( - job, - this.apiClient.getManagementLink, - this.apiClient.getDownloadLink, - this.theme - ), + notifications.toasts.addWarning( + getWarningMaxSizeToast(job, getManagementLink, getDownloadLink, theme), completedOptions ); } else if (job.status === JOB_STATUS.WARNINGS) { - this.notifications.toasts.addWarning( - getWarningToast( - job, - this.apiClient.getManagementLink, - this.apiClient.getDownloadLink, - this.theme - ), + notifications.toasts.addWarning( + getWarningToast(job, getManagementLink, getDownloadLink, theme), completedOptions ); } else { - this.notifications.toasts.addSuccess( - getSuccessToast( - job, - this.apiClient.getManagementLink, - this.apiClient.getDownloadLink, - this.theme - ), + notifications.toasts.addSuccess( + getSuccessToast(job, getManagementLink, getDownloadLink, theme), completedOptions ); } @@ -112,15 +133,9 @@ export class ReportingNotifierStreamHandler { // no download link available for (const job of failedJobs) { - const errorText = await this.apiClient.getError(job.id); + const errorText = await apiClient.getError(job.id); this.notifications.toasts.addDanger( - getFailureToast( - errorText, - job, - this.apiClient.getManagementLink, - this.theme, - this.docLinks - ) + getFailureToast(errorText, job, getManagementLink, theme, docLinks) ); } return { completed: completedJobs, failed: failedJobs }; @@ -132,30 +147,38 @@ export class ReportingNotifierStreamHandler { /* * An observable that finds jobs that are known to be "processing" (stored in * session storage) but have non-processing job status on the server + * + * Public for purposes of testing */ - public findChangedStatusJobs(storedJobs: JobId[]): Rx.Observable { - return Rx.from(this.apiClient.findForJobIds(storedJobs)).pipe( - map((jobs) => { - const completedJobs: JobSummary[] = []; - const failedJobs: JobSummary[] = []; - const pending: JobId[] = []; - - // add side effects to storage - for (const job of jobs) { - const { id: jobId, status: jobStatus } = job; - if (storedJobs.includes(jobId)) { - if (jobStatus === JOB_STATUS.COMPLETED || jobStatus === JOB_STATUS.WARNINGS) { - completedJobs.push(getReportStatus(job)); - } else if (jobStatus === JOB_STATUS.FAILED) { - failedJobs.push(getReportStatus(job)); - } else { - pending.push(jobId); - } + public findChangedStatusJobs(previousPending: JobId[]): Rx.Observable { + return Rx.from(this.apiClient.findForJobIds(previousPending)).pipe( + mergeMap(async (jobs) => { + const newCompleted: JobSummary[] = []; + const newFailed: JobSummary[] = []; + const newPending: JobId[] = []; + + for (const pendingJobId of previousPending) { + const updatedJob = jobs.find(({ id }) => id === pendingJobId); + if ( + updatedJob?.status === JOB_STATUS.COMPLETED || + updatedJob?.status === JOB_STATUS.WARNINGS + ) { + newCompleted.push(getReportStatus(updatedJob)); + } else if (updatedJob?.status === JOB_STATUS.FAILED) { + newFailed.push(getReportStatus(updatedJob)); + } else { + // Keep job tracked in storage if is pending. It also + // may not be present in apiClient.findForJobIds + // response if index refresh is slow + newPending.push(pendingJobId); } } - updateStored(pending); // refresh the storage of pending job IDs, minus completed and failed job IDs - return { completed: completedJobs, failed: failedJobs }; + // refresh the storage of pending job IDs, minus + // completed and failed job IDs + await setPendingJobIds(newPending); + + return { completed: newCompleted, failed: newFailed }; }), catchError((err) => { // show connection refused toast diff --git a/x-pack/plugins/reporting/public/notifier/job_completion_notifications.ts b/x-pack/plugins/reporting/public/notifier/job_completion_notifications.ts index 61db874e75208..b98d5a35d6e07 100644 --- a/x-pack/plugins/reporting/public/notifier/job_completion_notifications.ts +++ b/x-pack/plugins/reporting/public/notifier/job_completion_notifications.ts @@ -8,29 +8,53 @@ import { JOB_COMPLETION_NOTIFICATIONS_SESSION_KEY } from '@kbn/reporting-common'; import { JobId } from '@kbn/reporting-common/types'; -const set = (jobs: string[]) => { - sessionStorage.setItem(JOB_COMPLETION_NOTIFICATIONS_SESSION_KEY, JSON.stringify(jobs)); -}; +// Reading and writing to the local storage must be atomic, +// i.e. performed in a single operation. This storage queue +// allows operations to process one at a time. +let operationQueue = Promise.resolve(); +async function addToQueue(func: (error: Error | null) => void) { + operationQueue = operationQueue.then(() => func(null)).catch(func); + await operationQueue; +} -const getAll = (): string[] => { - const sessionValue = sessionStorage.getItem(JOB_COMPLETION_NOTIFICATIONS_SESSION_KEY); - return sessionValue ? JSON.parse(sessionValue) : []; -}; +export async function getPendingJobIds(): Promise { + let jobs: JobId[] = []; + await addToQueue(async () => { + // get the current jobs + const jobsData = localStorage.getItem(JOB_COMPLETION_NOTIFICATIONS_SESSION_KEY); + jobs = jobsData ? JSON.parse(jobsData) : []; + }); + return jobs; +} -export const add = (jobId: JobId) => { - const jobs = getAll(); - jobs.push(jobId); - set(jobs); -}; +export async function addPendingJobId(jobId: JobId) { + addToQueue(async (error: Error | null) => { + return new Promise((resolve, reject) => { + if (error) { + window.console.error(error); + reject(error); + } + // get the current jobs synchronously + const jobsData = localStorage.getItem(JOB_COMPLETION_NOTIFICATIONS_SESSION_KEY); + const jobs: JobId[] = jobsData ? JSON.parse(jobsData) : []; + // add the new job + jobs.push(jobId); + // write back to local storage + localStorage.setItem(JOB_COMPLETION_NOTIFICATIONS_SESSION_KEY, JSON.stringify(jobs)); + resolve(); + }); + }); +} -export const remove = (jobId: JobId) => { - const jobs = getAll(); - const index = jobs.indexOf(jobId); - - if (!index) { - throw new Error('Unable to find job to remove it'); - } - - jobs.splice(index, 1); - set(jobs); -}; +export async function setPendingJobIds(jobIds: JobId[]) { + addToQueue(async (error: Error | null) => { + return new Promise((resolve, reject) => { + if (error) { + reject(error); + } + // write update jobs back to local storage + localStorage.setItem(JOB_COMPLETION_NOTIFICATIONS_SESSION_KEY, JSON.stringify(jobIds)); + resolve(); + }); + }); +} diff --git a/x-pack/plugins/reporting/public/plugin.ts b/x-pack/plugins/reporting/public/plugin.ts index 91fe945b6e09d..6e4da02f3d3c5 100644 --- a/x-pack/plugins/reporting/public/plugin.ts +++ b/x-pack/plugins/reporting/public/plugin.ts @@ -6,17 +6,14 @@ */ import * as Rx from 'rxjs'; -import { catchError, filter, map, mergeMap, takeUntil } from 'rxjs/operators'; import { CoreSetup, CoreStart, HttpSetup, IUiSettingsClient, - NotificationsSetup, Plugin, PluginInitializerContext, - ThemeServiceStart, } from '@kbn/core/public'; import type { DataPublicPluginStart } from '@kbn/data-plugin/public'; import { CONTEXT_MENU_TRIGGER } from '@kbn/embeddable-plugin/public'; @@ -26,8 +23,7 @@ import type { LicensingPluginStart } from '@kbn/licensing-plugin/public'; import type { ManagementSetup, ManagementStart } from '@kbn/management-plugin/public'; import type { ScreenshotModePluginSetup } from '@kbn/screenshot-mode-plugin/public'; -import { JOB_COMPLETION_NOTIFICATIONS_SESSION_KEY, durationToNumber } from '@kbn/reporting-common'; -import type { JobId } from '@kbn/reporting-common/types'; +import { durationToNumber } from '@kbn/reporting-common'; import type { ClientConfigType } from '@kbn/reporting-public'; import type { SharePluginSetup, SharePluginStart } from '@kbn/share-plugin/public'; import type { UiActionsSetup, UiActionsStart } from '@kbn/ui-actions-plugin/public'; @@ -35,35 +31,10 @@ import type { UiActionsSetup, UiActionsStart } from '@kbn/ui-actions-plugin/publ import type { ReportingSetup, ReportingStart } from '.'; import { ReportingAPIClient } from './lib/reporting_api_client'; import { ReportingNotifierStreamHandler as StreamHandler } from './lib/stream_handler'; -import { getGeneralErrorToast } from './notifier'; import { ReportingCsvPanelAction } from './panel_actions/get_csv_panel_action'; import { reportingCsvShareProvider } from './share_context_menu/register_csv_reporting'; import { reportingScreenshotShareProvider } from './share_context_menu/register_pdf_png_reporting'; import { getSharedComponents } from './shared'; -import type { JobSummarySet } from './types'; - -function getStored(): JobId[] { - const sessionValue = sessionStorage.getItem(JOB_COMPLETION_NOTIFICATIONS_SESSION_KEY); - return sessionValue ? JSON.parse(sessionValue) : []; -} - -function handleError( - notifications: NotificationsSetup, - err: Error, - theme: ThemeServiceStart -): Rx.Observable { - notifications.toasts.addDanger( - getGeneralErrorToast( - i18n.translate('xpack.reporting.publicNotifier.pollingErrorMessage', { - defaultMessage: 'Reporting notifier error!', - }), - err, - theme - ) - ); - window.console.error(err); - return Rx.of({ completed: [], failed: [] }); -} export interface ReportingPublicPluginSetupDependencies { home: HomePublicPluginSetup; @@ -264,16 +235,7 @@ export class ReportingPublicPlugin const apiClient = this.getApiClient(core.http, core.uiSettings); const streamHandler = new StreamHandler(notifications, apiClient, core.theme, docLinks); const interval = durationToNumber(this.config.poll.jobsRefresh.interval); - Rx.timer(0, interval) - .pipe( - takeUntil(this.stop$), // stop the interval when stop method is called - map(() => getStored()), // read all pending job IDs from session storage - filter((storedJobs) => storedJobs.length > 0), // stop the pipeline here if there are none pending - mergeMap((storedJobs) => streamHandler.findChangedStatusJobs(storedJobs)), // look up the latest status of all pending jobs on the server - mergeMap(({ completed, failed }) => streamHandler.showNotifications({ completed, failed })), - catchError((err) => handleError(notifications, err, core.theme)) - ) - .subscribe(); + streamHandler.startPolling(interval, this.stop$); return this.getContract(); } From 641a4c24c8ebf22aa664a3473ea0fdfe265cf747 Mon Sep 17 00:00:00 2001 From: Tim Sullivan Date: Thu, 22 Feb 2024 08:06:18 -0700 Subject: [PATCH 2/7] updates to comments --- x-pack/plugins/reporting/public/lib/stream_handler.ts | 2 +- .../reporting/public/notifier/job_completion_notifications.ts | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/x-pack/plugins/reporting/public/lib/stream_handler.ts b/x-pack/plugins/reporting/public/lib/stream_handler.ts index ddd48d78f6716..847ad7fda941b 100644 --- a/x-pack/plugins/reporting/public/lib/stream_handler.ts +++ b/x-pack/plugins/reporting/public/lib/stream_handler.ts @@ -175,7 +175,7 @@ export class ReportingNotifierStreamHandler { } // refresh the storage of pending job IDs, minus - // completed and failed job IDs + // the newly completed and failed jobs await setPendingJobIds(newPending); return { completed: newCompleted, failed: newFailed }; diff --git a/x-pack/plugins/reporting/public/notifier/job_completion_notifications.ts b/x-pack/plugins/reporting/public/notifier/job_completion_notifications.ts index b98d5a35d6e07..66066b7352f11 100644 --- a/x-pack/plugins/reporting/public/notifier/job_completion_notifications.ts +++ b/x-pack/plugins/reporting/public/notifier/job_completion_notifications.ts @@ -10,7 +10,9 @@ import { JobId } from '@kbn/reporting-common/types'; // Reading and writing to the local storage must be atomic, // i.e. performed in a single operation. This storage queue -// allows operations to process one at a time. +// Operations on the localStorage key can happen from various +// parts of code. Using a queue to manage async operations allows +// operations to process one at a time let operationQueue = Promise.resolve(); async function addToQueue(func: (error: Error | null) => void) { operationQueue = operationQueue.then(() => func(null)).catch(func); From f35b856dcee0190e613ea2748d644233e11f0eac Mon Sep 17 00:00:00 2001 From: Timothy Sullivan Date: Thu, 22 Feb 2024 11:27:56 -0700 Subject: [PATCH 3/7] fix a ts error from merge conflict --- x-pack/plugins/reporting/public/lib/stream_handler.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/x-pack/plugins/reporting/public/lib/stream_handler.ts b/x-pack/plugins/reporting/public/lib/stream_handler.ts index 6663759f78755..23cf4aef5ae92 100644 --- a/x-pack/plugins/reporting/public/lib/stream_handler.ts +++ b/x-pack/plugins/reporting/public/lib/stream_handler.ts @@ -13,7 +13,7 @@ import { i18n } from '@kbn/i18n'; import { JOB_STATUS } from '@kbn/reporting-common'; import { JobId } from '@kbn/reporting-common/types'; -import { Job, ReportingAPIClient } from '@kbn/reporting-public'; +import { Job, ReportingAPIClient, getPendingJobIds, setPendingJobIds } from '@kbn/reporting-public'; import { getFailureToast, getGeneralErrorToast, @@ -22,7 +22,6 @@ import { getWarningMaxSizeToast, getWarningToast, } from '../notifier'; -import { getPendingJobIds, setPendingJobIds } from '../notifier/job_completion_notifications'; import { JobSummary, JobSummarySet } from '../types'; /** From cc7263165736c6d361ae3c53279aa8359fd3a71b Mon Sep 17 00:00:00 2001 From: Timothy Sullivan Date: Thu, 22 Feb 2024 13:02:32 -0700 Subject: [PATCH 4/7] add unit test for update module --- .../job_completion_notifications.test.ts | 35 +++++++++++++++++++ .../public/job_completion_notifications.ts | 4 +-- 2 files changed, 37 insertions(+), 2 deletions(-) create mode 100644 packages/kbn-reporting/public/job_completion_notifications.test.ts diff --git a/packages/kbn-reporting/public/job_completion_notifications.test.ts b/packages/kbn-reporting/public/job_completion_notifications.test.ts new file mode 100644 index 0000000000000..0fdd126972c84 --- /dev/null +++ b/packages/kbn-reporting/public/job_completion_notifications.test.ts @@ -0,0 +1,35 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import { + getPendingJobIds, + addPendingJobId, + setPendingJobIds, +} from './job_completion_notifications'; + +describe('Job completion notifications', () => { + afterEach(async () => { + await setPendingJobIds([]); + }); + + it('initially contains not job IDs', async () => { + expect(await getPendingJobIds()).toEqual([]); + }); + + it('handles multiple job ID additions', async () => { + await addPendingJobId('job-123'); + await addPendingJobId('job-456'); + await addPendingJobId('job-789'); + expect(await getPendingJobIds()).toEqual(['job-123', 'job-456', 'job-789']); + }); + + it('handles setting a total of amount of job ID', async () => { + await setPendingJobIds(['job-abc', 'job-def', 'job-ghi']); + expect(await getPendingJobIds()).toEqual(['job-abc', 'job-def', 'job-ghi']); + }); +}); diff --git a/packages/kbn-reporting/public/job_completion_notifications.ts b/packages/kbn-reporting/public/job_completion_notifications.ts index 48f3c31953593..34718ff98b7b0 100644 --- a/packages/kbn-reporting/public/job_completion_notifications.ts +++ b/packages/kbn-reporting/public/job_completion_notifications.ts @@ -31,7 +31,7 @@ export async function getPendingJobIds(): Promise { } export async function addPendingJobId(jobId: JobId) { - addToQueue(async (error: Error | null) => { + await addToQueue(async (error: Error | null) => { return new Promise((resolve, reject) => { if (error) { window.console.error(error); @@ -50,7 +50,7 @@ export async function addPendingJobId(jobId: JobId) { } export async function setPendingJobIds(jobIds: JobId[]) { - addToQueue(async (error: Error | null) => { + await addToQueue(async (error: Error | null) => { return new Promise((resolve, reject) => { if (error) { reject(error); From d77186216a895adccc03a19c32b514e3c6fdfea9 Mon Sep 17 00:00:00 2001 From: Timothy Sullivan Date: Fri, 23 Feb 2024 09:32:23 -0700 Subject: [PATCH 5/7] Use revealing module pattern for jobCompletionNotifications --- .../job_completion_notifications.test.ts | 8 +- .../public/job_completion_notifications.ts | 100 ++++++++++-------- .../public/reporting_api_client.ts | 5 +- .../reporting/public/lib/stream_handler.ts | 8 +- 4 files changed, 65 insertions(+), 56 deletions(-) diff --git a/packages/kbn-reporting/public/job_completion_notifications.test.ts b/packages/kbn-reporting/public/job_completion_notifications.test.ts index 0fdd126972c84..969a6d95aec9a 100644 --- a/packages/kbn-reporting/public/job_completion_notifications.test.ts +++ b/packages/kbn-reporting/public/job_completion_notifications.test.ts @@ -6,13 +6,11 @@ * Side Public License, v 1. */ -import { - getPendingJobIds, - addPendingJobId, - setPendingJobIds, -} from './job_completion_notifications'; +import { jobCompletionNotifications } from './job_completion_notifications'; describe('Job completion notifications', () => { + const { setPendingJobIds, getPendingJobIds, addPendingJobId } = jobCompletionNotifications(); + afterEach(async () => { await setPendingJobIds([]); }); diff --git a/packages/kbn-reporting/public/job_completion_notifications.ts b/packages/kbn-reporting/public/job_completion_notifications.ts index 34718ff98b7b0..4b1b6a2150113 100644 --- a/packages/kbn-reporting/public/job_completion_notifications.ts +++ b/packages/kbn-reporting/public/job_completion_notifications.ts @@ -9,55 +9,63 @@ import { JOB_COMPLETION_NOTIFICATIONS_SESSION_KEY } from '@kbn/reporting-common'; import { JobId } from '@kbn/reporting-common/types'; -// Reading and writing to the local storage must be atomic, -// i.e. performed in a single operation. This storage queue -// Operations on the localStorage key can happen from various -// parts of code. Using a queue to manage async operations allows -// operations to process one at a time -let operationQueue = Promise.resolve(); -async function addToQueue(func: (error: Error | null) => void) { - operationQueue = operationQueue.then(() => func(null)).catch(func); - await operationQueue; -} - -export async function getPendingJobIds(): Promise { - let jobs: JobId[] = []; - await addToQueue(async () => { - // get the current jobs - const jobsData = localStorage.getItem(JOB_COMPLETION_NOTIFICATIONS_SESSION_KEY); - jobs = jobsData ? JSON.parse(jobsData) : []; - }); - return jobs; -} +export function jobCompletionNotifications() { + // Reading and writing to the local storage must be atomic, + // i.e. performed in a single operation. This storage queue + // Operations on the localStorage key can happen from various + // parts of code. Using a queue to manage async operations allows + // operations to process one at a time + let operationQueue = Promise.resolve(); + async function addToQueue(func: (error: Error | null) => void) { + operationQueue = operationQueue.then(() => func(null)).catch(func); + await operationQueue; + } -export async function addPendingJobId(jobId: JobId) { - await addToQueue(async (error: Error | null) => { - return new Promise((resolve, reject) => { - if (error) { - window.console.error(error); - reject(error); - } - // get the current jobs synchronously + async function getPendingJobIds(): Promise { + let jobs: JobId[] = []; + await addToQueue(async () => { + // get the current jobs const jobsData = localStorage.getItem(JOB_COMPLETION_NOTIFICATIONS_SESSION_KEY); - const jobs: JobId[] = jobsData ? JSON.parse(jobsData) : []; - // add the new job - jobs.push(jobId); - // write back to local storage - localStorage.setItem(JOB_COMPLETION_NOTIFICATIONS_SESSION_KEY, JSON.stringify(jobs)); - resolve(); + jobs = jobsData ? JSON.parse(jobsData) : []; }); - }); -} + return jobs; + } -export async function setPendingJobIds(jobIds: JobId[]) { - await addToQueue(async (error: Error | null) => { - return new Promise((resolve, reject) => { - if (error) { - reject(error); - } - // write update jobs back to local storage - localStorage.setItem(JOB_COMPLETION_NOTIFICATIONS_SESSION_KEY, JSON.stringify(jobIds)); - resolve(); + async function addPendingJobId(jobId: JobId) { + await addToQueue(async (error: Error | null) => { + return new Promise((resolve, reject) => { + if (error) { + window.console.error(error); + reject(error); + } + // get the current jobs synchronously + const jobsData = localStorage.getItem(JOB_COMPLETION_NOTIFICATIONS_SESSION_KEY); + const jobs: JobId[] = jobsData ? JSON.parse(jobsData) : []; + // add the new job + jobs.push(jobId); + // write back to local storage + localStorage.setItem(JOB_COMPLETION_NOTIFICATIONS_SESSION_KEY, JSON.stringify(jobs)); + resolve(); + }); }); - }); + } + + async function setPendingJobIds(jobIds: JobId[]) { + await addToQueue(async (error: Error | null) => { + return new Promise((resolve, reject) => { + if (error) { + reject(error); + } + // write update jobs back to local storage + localStorage.setItem(JOB_COMPLETION_NOTIFICATIONS_SESSION_KEY, JSON.stringify(jobIds)); + resolve(); + }); + }); + } + + return { + getPendingJobIds, + addPendingJobId, + setPendingJobIds, + }; } diff --git a/packages/kbn-reporting/public/reporting_api_client.ts b/packages/kbn-reporting/public/reporting_api_client.ts index e273482aed8f2..ccd1ab71143fc 100644 --- a/packages/kbn-reporting/public/reporting_api_client.ts +++ b/packages/kbn-reporting/public/reporting_api_client.ts @@ -22,7 +22,7 @@ import rison from '@kbn/rison'; import moment from 'moment'; import { stringify } from 'query-string'; import { Job } from '.'; -import { addPendingJobId } from './job_completion_notifications'; +import { jobCompletionNotifications } from './job_completion_notifications'; /* * For convenience, apps do not have to provide the browserTimezone and Kibana version. @@ -67,6 +67,7 @@ interface IReportingAPI { */ export class ReportingAPIClient implements IReportingAPI { private http: HttpSetup; + private addPendingJobId = jobCompletionNotifications().addPendingJobId; constructor( http: HttpSetup, @@ -183,7 +184,7 @@ export class ReportingAPIClient implements IReportingAPI { body: JSON.stringify({ jobParams: jobParamsRison }), } ); - await addPendingJobId(resp.job.id); + await this.addPendingJobId(resp.job.id); return new Job(resp.job); } diff --git a/x-pack/plugins/reporting/public/lib/stream_handler.ts b/x-pack/plugins/reporting/public/lib/stream_handler.ts index 23cf4aef5ae92..c8ecdf71ce45a 100644 --- a/x-pack/plugins/reporting/public/lib/stream_handler.ts +++ b/x-pack/plugins/reporting/public/lib/stream_handler.ts @@ -13,7 +13,7 @@ import { i18n } from '@kbn/i18n'; import { JOB_STATUS } from '@kbn/reporting-common'; import { JobId } from '@kbn/reporting-common/types'; -import { Job, ReportingAPIClient, getPendingJobIds, setPendingJobIds } from '@kbn/reporting-public'; +import { Job, ReportingAPIClient, jobCompletionNotifications } from '@kbn/reporting-public'; import { getFailureToast, getGeneralErrorToast, @@ -61,6 +61,8 @@ function handleError( } export class ReportingNotifierStreamHandler { + private jobCompletionNotifications = jobCompletionNotifications(); + constructor( private notifications: NotificationsSetup, private apiClient: ReportingAPIClient, @@ -72,7 +74,7 @@ export class ReportingNotifierStreamHandler { Rx.timer(0, interval) .pipe( takeUntil(stop$), // stop the interval when stop method is called - mergeMap(getPendingJobIds), // read all pending job IDs from session storage + mergeMap(this.jobCompletionNotifications.getPendingJobIds), // read all pending job IDs from session storage filter((previousPending) => previousPending.length > 0), // stop the pipeline here if there are none pending mergeMap((previousPending) => this.findChangedStatusJobs(previousPending)), // look up the latest status of all pending jobs on the server mergeMap(({ completed, failed }) => this.showNotifications({ completed, failed })), @@ -174,7 +176,7 @@ export class ReportingNotifierStreamHandler { // refresh the storage of pending job IDs, minus // the newly completed and failed jobs - await setPendingJobIds(newPending); + await this.jobCompletionNotifications.setPendingJobIds(newPending); return { completed: newCompleted, failed: newFailed }; }), From f2268b20f99a98554bfccb04afa93269a03e5199 Mon Sep 17 00:00:00 2001 From: Timothy Sullivan Date: Fri, 23 Feb 2024 10:29:28 -0700 Subject: [PATCH 6/7] Update test using Sinon to jest mocks --- .../__snapshots__/stream_handler.test.ts.snap | 316 +++++++++--------- .../public/lib/stream_handler.test.ts | 101 +++--- .../reporting/public/lib/stream_handler.ts | 8 +- 3 files changed, 217 insertions(+), 208 deletions(-) diff --git a/x-pack/plugins/reporting/public/lib/__snapshots__/stream_handler.test.ts.snap b/x-pack/plugins/reporting/public/lib/__snapshots__/stream_handler.test.ts.snap index ab6a5109a1066..8f277f6d92c31 100644 --- a/x-pack/plugins/reporting/public/lib/__snapshots__/stream_handler.test.ts.snap +++ b/x-pack/plugins/reporting/public/lib/__snapshots__/stream_handler.test.ts.snap @@ -38,201 +38,209 @@ Object { exports[`stream handler showNotifications show csv formulas warning 1`] = ` Array [ - Object { - "data-test-subj": "completeReportCsvFormulasWarning", - "text": MountPoint { - "reactNode": -

- +

+ +

+

+ +

+ -

-

- -

- , + }, + "title": MountPoint { + "reactNode": -
, + />, + }, }, - "title": MountPoint { - "reactNode": , + Object { + "toastLifeTimeMs": 86400000, }, - }, - Object { - "toastLifeTimeMs": 86400000, - }, + ], ] `; exports[`stream handler showNotifications show failed job toast 1`] = ` Array [ - Object { - "data-test-subj": "completeReportFailure", - "iconType": undefined, - "text": MountPoint { - "reactNode": - - this is the failed report error - - -

- - - , + Array [ + Object { + "data-test-subj": "completeReportFailure", + "iconType": undefined, + "text": MountPoint { + "reactNode": + + this is the failed report error + + +

+ + + , + } } + /> +

+
, + }, + "title": MountPoint { + "reactNode": -

- , - }, - "title": MountPoint { - "reactNode": , + />, + }, }, - }, + ], ] `; exports[`stream handler showNotifications show max length warning 1`] = ` Array [ - Object { - "data-test-subj": "completeReportMaxSizeWarning", - "text": MountPoint { - "reactNode": -

- -

-

- +

+ +

+

+ +

+ -

- , + }, + "title": MountPoint { + "reactNode": -
, + />, + }, }, - "title": MountPoint { - "reactNode": , + Object { + "toastLifeTimeMs": 86400000, }, - }, - Object { - "toastLifeTimeMs": 86400000, - }, + ], ] `; exports[`stream handler showNotifications show success 1`] = ` Array [ - Object { - "color": "success", - "data-test-subj": "completeReportSuccess", - "text": MountPoint { - "reactNode": -

- +

+ +

+ -

- , + }, + "title": MountPoint { + "reactNode": -
, + />, + }, }, - "title": MountPoint { - "reactNode": , + Object { + "toastLifeTimeMs": 86400000, }, - }, - Object { - "toastLifeTimeMs": 86400000, - }, + ], ] `; diff --git a/x-pack/plugins/reporting/public/lib/stream_handler.test.ts b/x-pack/plugins/reporting/public/lib/stream_handler.test.ts index 8cbe47e8b6e80..37ef7967ae287 100644 --- a/x-pack/plugins/reporting/public/lib/stream_handler.test.ts +++ b/x-pack/plugins/reporting/public/lib/stream_handler.test.ts @@ -5,22 +5,27 @@ * 2.0. */ -import sinon, { stub } from 'sinon'; - import { NotificationsStart } from '@kbn/core/public'; import { coreMock, docLinksServiceMock, themeServiceMock } from '@kbn/core/public/mocks'; -import { ReportApiJSON } from '@kbn/reporting-common/types'; +import { JobId, ReportApiJSON } from '@kbn/reporting-common/types'; -import { JobSummary } from '../types'; +import { JobSummary, JobSummarySet } from '../types'; import { Job, ReportingAPIClient } from '@kbn/reporting-public'; import { ReportingNotifierStreamHandler } from './stream_handler'; -Object.defineProperty(window, 'sessionStorage', { - value: { - setItem: jest.fn(() => null), - }, - writable: true, -}); +/** + * A test class that subclasses the main class with testable + * methods that access private methods indirectly. + */ +class TestReportingNotifierStreamHandler extends ReportingNotifierStreamHandler { + public testFindChangedStatusJobs(previousPending: JobId[]) { + return this.findChangedStatusJobs(previousPending); + } + + public testShowNotifications(jobs: JobSummarySet) { + return this.showNotifications(jobs); + } +} const mockJobsFound: Job[] = [ { id: 'job-source-mock1', status: 'completed', output: { csv_contains_formulas: false, max_size_reached: false }, payload: { title: 'specimen' } }, @@ -38,9 +43,9 @@ jobQueueClientMock.getError = () => Promise.resolve('this is the failed report e jobQueueClientMock.getManagementLink = () => '/#management'; jobQueueClientMock.getReportURL = () => '/reporting/download/job-123'; -const mockShowDanger = stub(); -const mockShowSuccess = stub(); -const mockShowWarning = stub(); +const mockShowDanger = jest.fn(); +const mockShowSuccess = jest.fn(); +const mockShowWarning = jest.fn(); const notificationsMock = { toasts: { addDanger: mockShowDanger, @@ -54,11 +59,11 @@ const docLink = docLinksServiceMock.createStartContract(); describe('stream handler', () => { afterEach(() => { - sinon.reset(); + jest.resetAllMocks(); }); it('constructs', () => { - const sh = new ReportingNotifierStreamHandler( + const sh = new TestReportingNotifierStreamHandler( notificationsMock, jobQueueClientMock, theme, @@ -69,13 +74,13 @@ describe('stream handler', () => { describe('findChangedStatusJobs', () => { it('finds no changed status jobs from empty', (done) => { - const sh = new ReportingNotifierStreamHandler( + const sh = new TestReportingNotifierStreamHandler( notificationsMock, jobQueueClientMock, theme, docLink ); - const findJobs = sh.findChangedStatusJobs([]); + const findJobs = sh.testFindChangedStatusJobs([]); findJobs.subscribe((data) => { expect(data).toEqual({ completed: [], failed: [] }); done(); @@ -83,13 +88,13 @@ describe('stream handler', () => { }); it('finds changed status jobs', (done) => { - const sh = new ReportingNotifierStreamHandler( + const sh = new TestReportingNotifierStreamHandler( notificationsMock, jobQueueClientMock, theme, docLink ); - const findJobs = sh.findChangedStatusJobs([ + const findJobs = sh.testFindChangedStatusJobs([ 'job-source-mock1', 'job-source-mock2', 'job-source-mock3', @@ -105,13 +110,13 @@ describe('stream handler', () => { describe('showNotifications', () => { it('show success', (done) => { - const sh = new ReportingNotifierStreamHandler( + const sh = new TestReportingNotifierStreamHandler( notificationsMock, jobQueueClientMock, theme, docLink ); - sh.showNotifications({ + sh.testShowNotifications({ completed: [ { id: 'yas1', @@ -122,22 +127,22 @@ describe('stream handler', () => { ], failed: [], }).subscribe(() => { - expect(mockShowDanger.callCount).toBe(0); - expect(mockShowSuccess.callCount).toBe(1); - expect(mockShowWarning.callCount).toBe(0); - expect(mockShowSuccess.args[0]).toMatchSnapshot(); + expect(mockShowDanger).not.toBeCalled(); + expect(mockShowSuccess).toBeCalledTimes(1); + expect(mockShowWarning).not.toBeCalled(); + expect(mockShowSuccess.mock.calls).toMatchSnapshot(); done(); }); }); it('show max length warning', (done) => { - const sh = new ReportingNotifierStreamHandler( + const sh = new TestReportingNotifierStreamHandler( notificationsMock, jobQueueClientMock, theme, docLink ); - sh.showNotifications({ + sh.testShowNotifications({ completed: [ { id: 'yas2', @@ -149,22 +154,22 @@ describe('stream handler', () => { ], failed: [], }).subscribe(() => { - expect(mockShowDanger.callCount).toBe(0); - expect(mockShowSuccess.callCount).toBe(0); - expect(mockShowWarning.callCount).toBe(1); - expect(mockShowWarning.args[0]).toMatchSnapshot(); + expect(mockShowDanger).not.toBeCalled(); + expect(mockShowSuccess).not.toBeCalled(); + expect(mockShowWarning).toBeCalledTimes(1); + expect(mockShowWarning.mock.calls).toMatchSnapshot(); done(); }); }); it('show csv formulas warning', (done) => { - const sh = new ReportingNotifierStreamHandler( + const sh = new TestReportingNotifierStreamHandler( notificationsMock, jobQueueClientMock, theme, docLink ); - sh.showNotifications({ + sh.testShowNotifications({ completed: [ { id: 'yas3', @@ -176,22 +181,22 @@ describe('stream handler', () => { ], failed: [], }).subscribe(() => { - expect(mockShowDanger.callCount).toBe(0); - expect(mockShowSuccess.callCount).toBe(0); - expect(mockShowWarning.callCount).toBe(1); - expect(mockShowWarning.args[0]).toMatchSnapshot(); + expect(mockShowDanger).not.toBeCalled(); + expect(mockShowSuccess).not.toBeCalled(); + expect(mockShowWarning).toBeCalledTimes(1); + expect(mockShowWarning.mock.calls).toMatchSnapshot(); done(); }); }); it('show failed job toast', (done) => { - const sh = new ReportingNotifierStreamHandler( + const sh = new TestReportingNotifierStreamHandler( notificationsMock, jobQueueClientMock, theme, docLink ); - sh.showNotifications({ + sh.testShowNotifications({ completed: [], failed: [ { @@ -202,22 +207,22 @@ describe('stream handler', () => { } as JobSummary, ], }).subscribe(() => { - expect(mockShowSuccess.callCount).toBe(0); - expect(mockShowWarning.callCount).toBe(0); - expect(mockShowDanger.callCount).toBe(1); - expect(mockShowDanger.args[0]).toMatchSnapshot(); + expect(mockShowSuccess).not.toBeCalled(); + expect(mockShowWarning).not.toBeCalled(); + expect(mockShowDanger).toBeCalledTimes(1); + expect(mockShowDanger.mock.calls).toMatchSnapshot(); done(); }); }); it('show multiple toast', (done) => { - const sh = new ReportingNotifierStreamHandler( + const sh = new TestReportingNotifierStreamHandler( notificationsMock, jobQueueClientMock, theme, docLink ); - sh.showNotifications({ + sh.testShowNotifications({ completed: [ { id: 'yas8', @@ -249,9 +254,9 @@ describe('stream handler', () => { } as JobSummary, ], }).subscribe(() => { - expect(mockShowSuccess.callCount).toBe(1); - expect(mockShowWarning.callCount).toBe(2); - expect(mockShowDanger.callCount).toBe(1); + expect(mockShowSuccess).toBeCalledTimes(1); + expect(mockShowWarning).toBeCalledTimes(2); + expect(mockShowDanger).toBeCalledTimes(1); done(); }); }); diff --git a/x-pack/plugins/reporting/public/lib/stream_handler.ts b/x-pack/plugins/reporting/public/lib/stream_handler.ts index c8ecdf71ce45a..212e501dad9ec 100644 --- a/x-pack/plugins/reporting/public/lib/stream_handler.ts +++ b/x-pack/plugins/reporting/public/lib/stream_handler.ts @@ -89,10 +89,8 @@ export class ReportingNotifierStreamHandler { /* * Use Kibana Toast API to show our messages - * - * Public for purposes of testing */ - public showNotifications({ + protected showNotifications({ completed: completedJobs, failed: failedJobs, }: JobSummarySet): Rx.Observable { @@ -147,10 +145,8 @@ export class ReportingNotifierStreamHandler { /* * An observable that finds jobs that are known to be "processing" (stored in * session storage) but have non-processing job status on the server - * - * Public for purposes of testing */ - public findChangedStatusJobs(previousPending: JobId[]): Rx.Observable { + protected findChangedStatusJobs(previousPending: JobId[]): Rx.Observable { return Rx.from(this.apiClient.findForJobIds(previousPending)).pipe( mergeMap(async (jobs) => { const newCompleted: JobSummary[] = []; From a471462cb72e55b92329854aa74bea9b0ab71610 Mon Sep 17 00:00:00 2001 From: Timothy Sullivan Date: Tue, 27 Feb 2024 22:46:18 +0100 Subject: [PATCH 7/7] Ditch the store mgmt queue and use synchronous loops --- .../job_completion_notifications.test.ts | 22 ++++-- .../public/job_completion_notifications.ts | 73 +++++++------------ .../public/reporting_api_client.ts | 2 +- .../reporting/public/lib/stream_handler.ts | 14 ++-- x-pack/plugins/reporting/public/types.ts | 4 +- 5 files changed, 51 insertions(+), 64 deletions(-) diff --git a/packages/kbn-reporting/public/job_completion_notifications.test.ts b/packages/kbn-reporting/public/job_completion_notifications.test.ts index 969a6d95aec9a..6934bd5add406 100644 --- a/packages/kbn-reporting/public/job_completion_notifications.test.ts +++ b/packages/kbn-reporting/public/job_completion_notifications.test.ts @@ -12,22 +12,28 @@ describe('Job completion notifications', () => { const { setPendingJobIds, getPendingJobIds, addPendingJobId } = jobCompletionNotifications(); afterEach(async () => { - await setPendingJobIds([]); + setPendingJobIds([]); }); it('initially contains not job IDs', async () => { - expect(await getPendingJobIds()).toEqual([]); + expect(getPendingJobIds()).toEqual([]); }); it('handles multiple job ID additions', async () => { - await addPendingJobId('job-123'); - await addPendingJobId('job-456'); - await addPendingJobId('job-789'); - expect(await getPendingJobIds()).toEqual(['job-123', 'job-456', 'job-789']); + addPendingJobId('job-123'); + addPendingJobId('job-456'); + addPendingJobId('job-789'); + expect(getPendingJobIds()).toEqual(['job-123', 'job-456', 'job-789']); }); it('handles setting a total of amount of job ID', async () => { - await setPendingJobIds(['job-abc', 'job-def', 'job-ghi']); - expect(await getPendingJobIds()).toEqual(['job-abc', 'job-def', 'job-ghi']); + setPendingJobIds(['job-abc', 'job-def', 'job-ghi']); + expect(getPendingJobIds()).toEqual(['job-abc', 'job-def', 'job-ghi']); + }); + + it('able to clear all jobIds', async () => { + setPendingJobIds(['job-abc', 'job-def', 'job-ghi']); + setPendingJobIds([]); + expect(getPendingJobIds()).toEqual([]); }); }); diff --git a/packages/kbn-reporting/public/job_completion_notifications.ts b/packages/kbn-reporting/public/job_completion_notifications.ts index 4b1b6a2150113..c5f1d0f9b3cab 100644 --- a/packages/kbn-reporting/public/job_completion_notifications.ts +++ b/packages/kbn-reporting/public/job_completion_notifications.ts @@ -10,57 +10,38 @@ import { JOB_COMPLETION_NOTIFICATIONS_SESSION_KEY } from '@kbn/reporting-common' import { JobId } from '@kbn/reporting-common/types'; export function jobCompletionNotifications() { - // Reading and writing to the local storage must be atomic, - // i.e. performed in a single operation. This storage queue - // Operations on the localStorage key can happen from various - // parts of code. Using a queue to manage async operations allows - // operations to process one at a time - let operationQueue = Promise.resolve(); - async function addToQueue(func: (error: Error | null) => void) { - operationQueue = operationQueue.then(() => func(null)).catch(func); - await operationQueue; - } - - async function getPendingJobIds(): Promise { - let jobs: JobId[] = []; - await addToQueue(async () => { - // get the current jobs - const jobsData = localStorage.getItem(JOB_COMPLETION_NOTIFICATIONS_SESSION_KEY); - jobs = jobsData ? JSON.parse(jobsData) : []; - }); + function getPendingJobIds(): JobId[] { + const jobs: JobId[] = []; + // get all current jobs + for (const key in localStorage) { + // check if key belongs to us + if (key.indexOf(JOB_COMPLETION_NOTIFICATIONS_SESSION_KEY) === 0) { + // get jobId from key + const jobId = key.replace(`${JOB_COMPLETION_NOTIFICATIONS_SESSION_KEY}-`, ''); + jobs.push(jobId); + } + } return jobs; } - async function addPendingJobId(jobId: JobId) { - await addToQueue(async (error: Error | null) => { - return new Promise((resolve, reject) => { - if (error) { - window.console.error(error); - reject(error); - } - // get the current jobs synchronously - const jobsData = localStorage.getItem(JOB_COMPLETION_NOTIFICATIONS_SESSION_KEY); - const jobs: JobId[] = jobsData ? JSON.parse(jobsData) : []; - // add the new job - jobs.push(jobId); - // write back to local storage - localStorage.setItem(JOB_COMPLETION_NOTIFICATIONS_SESSION_KEY, JSON.stringify(jobs)); - resolve(); - }); - }); + function addPendingJobId(jobId: JobId) { + // write back to local storage, value doesn't matter + localStorage.setItem(`${JOB_COMPLETION_NOTIFICATIONS_SESSION_KEY}-${jobId}`, jobId); } - async function setPendingJobIds(jobIds: JobId[]) { - await addToQueue(async (error: Error | null) => { - return new Promise((resolve, reject) => { - if (error) { - reject(error); - } - // write update jobs back to local storage - localStorage.setItem(JOB_COMPLETION_NOTIFICATIONS_SESSION_KEY, JSON.stringify(jobIds)); - resolve(); - }); - }); + function setPendingJobIds(jobIds: JobId[]) { + // clear reporting jobIds + for (const key in localStorage) { + if (key.indexOf(JOB_COMPLETION_NOTIFICATIONS_SESSION_KEY) === 0) { + localStorage.removeItem(key); + } + } + + // write update jobs back to local storage + for (let j = 0; j < jobIds.length; j++) { + const jobId = jobIds[j]; + localStorage.setItem(`${JOB_COMPLETION_NOTIFICATIONS_SESSION_KEY}-${jobId}`, jobId); + } } return { diff --git a/packages/kbn-reporting/public/reporting_api_client.ts b/packages/kbn-reporting/public/reporting_api_client.ts index ccd1ab71143fc..9086a013f1c76 100644 --- a/packages/kbn-reporting/public/reporting_api_client.ts +++ b/packages/kbn-reporting/public/reporting_api_client.ts @@ -184,7 +184,7 @@ export class ReportingAPIClient implements IReportingAPI { body: JSON.stringify({ jobParams: jobParamsRison }), } ); - await this.addPendingJobId(resp.job.id); + this.addPendingJobId(resp.job.id); return new Job(resp.job); } diff --git a/x-pack/plugins/reporting/public/lib/stream_handler.ts b/x-pack/plugins/reporting/public/lib/stream_handler.ts index 212e501dad9ec..2a7124e1e5e3c 100644 --- a/x-pack/plugins/reporting/public/lib/stream_handler.ts +++ b/x-pack/plugins/reporting/public/lib/stream_handler.ts @@ -6,7 +6,7 @@ */ import * as Rx from 'rxjs'; -import { catchError, filter, mergeMap, takeUntil } from 'rxjs/operators'; +import { catchError, filter, map, mergeMap, takeUntil } from 'rxjs/operators'; import { DocLinksStart, NotificationsSetup, ThemeServiceStart } from '@kbn/core/public'; import { i18n } from '@kbn/i18n'; @@ -74,7 +74,7 @@ export class ReportingNotifierStreamHandler { Rx.timer(0, interval) .pipe( takeUntil(stop$), // stop the interval when stop method is called - mergeMap(this.jobCompletionNotifications.getPendingJobIds), // read all pending job IDs from session storage + map(this.jobCompletionNotifications.getPendingJobIds), // read all pending job IDs from session storage filter((previousPending) => previousPending.length > 0), // stop the pipeline here if there are none pending mergeMap((previousPending) => this.findChangedStatusJobs(previousPending)), // look up the latest status of all pending jobs on the server mergeMap(({ completed, failed }) => this.showNotifications({ completed, failed })), @@ -105,7 +105,7 @@ export class ReportingNotifierStreamHandler { const completedOptions = { toastLifeTimeMs: COMPLETED_JOB_TOAST_TIMEOUT }; // notifications with download link - for (const job of completedJobs) { + for (const job of completedJobs ?? []) { if (job.csvContainsFormulas) { notifications.toasts.addWarning( getWarningFormulasToast(job, getManagementLink, getDownloadLink, theme), @@ -130,7 +130,7 @@ export class ReportingNotifierStreamHandler { } // no download link available - for (const job of failedJobs) { + for (const job of failedJobs ?? []) { const errorText = await apiClient.getError(job.id); this.notifications.toasts.addDanger( getFailureToast(errorText, job, getManagementLink, theme, docLinks) @@ -172,7 +172,7 @@ export class ReportingNotifierStreamHandler { // refresh the storage of pending job IDs, minus // the newly completed and failed jobs - await this.jobCompletionNotifications.setPendingJobIds(newPending); + this.jobCompletionNotifications.setPendingJobIds(newPending); return { completed: newCompleted, failed: newFailed }; }), @@ -186,9 +186,9 @@ export class ReportingNotifierStreamHandler { err, this.theme ) - ); // prettier-ignore + ); window.console.error(err); - return Rx.of({ completed: [], failed: [] }); // log the error and resume + return Rx.of({}); }) ); } diff --git a/x-pack/plugins/reporting/public/types.ts b/x-pack/plugins/reporting/public/types.ts index dcc887ef9a84d..d5af032db617c 100644 --- a/x-pack/plugins/reporting/public/types.ts +++ b/x-pack/plugins/reporting/public/types.ts @@ -27,6 +27,6 @@ export interface JobSummary { * @internal */ export interface JobSummarySet { - completed: JobSummary[]; - failed: JobSummary[]; + completed?: JobSummary[]; + failed?: JobSummary[]; }