From a471462cb72e55b92329854aa74bea9b0ab71610 Mon Sep 17 00:00:00 2001 From: Timothy Sullivan Date: Tue, 27 Feb 2024 22:46:18 +0100 Subject: [PATCH] Ditch the store mgmt queue and use synchronous loops --- .../job_completion_notifications.test.ts | 22 ++++-- .../public/job_completion_notifications.ts | 73 +++++++------------ .../public/reporting_api_client.ts | 2 +- .../reporting/public/lib/stream_handler.ts | 14 ++-- x-pack/plugins/reporting/public/types.ts | 4 +- 5 files changed, 51 insertions(+), 64 deletions(-) diff --git a/packages/kbn-reporting/public/job_completion_notifications.test.ts b/packages/kbn-reporting/public/job_completion_notifications.test.ts index 969a6d95aec9..6934bd5add40 100644 --- a/packages/kbn-reporting/public/job_completion_notifications.test.ts +++ b/packages/kbn-reporting/public/job_completion_notifications.test.ts @@ -12,22 +12,28 @@ describe('Job completion notifications', () => { const { setPendingJobIds, getPendingJobIds, addPendingJobId } = jobCompletionNotifications(); afterEach(async () => { - await setPendingJobIds([]); + setPendingJobIds([]); }); it('initially contains not job IDs', async () => { - expect(await getPendingJobIds()).toEqual([]); + expect(getPendingJobIds()).toEqual([]); }); it('handles multiple job ID additions', async () => { - await addPendingJobId('job-123'); - await addPendingJobId('job-456'); - await addPendingJobId('job-789'); - expect(await getPendingJobIds()).toEqual(['job-123', 'job-456', 'job-789']); + addPendingJobId('job-123'); + addPendingJobId('job-456'); + addPendingJobId('job-789'); + expect(getPendingJobIds()).toEqual(['job-123', 'job-456', 'job-789']); }); it('handles setting a total of amount of job ID', async () => { - await setPendingJobIds(['job-abc', 'job-def', 'job-ghi']); - expect(await getPendingJobIds()).toEqual(['job-abc', 'job-def', 'job-ghi']); + setPendingJobIds(['job-abc', 'job-def', 'job-ghi']); + expect(getPendingJobIds()).toEqual(['job-abc', 'job-def', 'job-ghi']); + }); + + it('able to clear all jobIds', async () => { + setPendingJobIds(['job-abc', 'job-def', 'job-ghi']); + setPendingJobIds([]); + expect(getPendingJobIds()).toEqual([]); }); }); diff --git a/packages/kbn-reporting/public/job_completion_notifications.ts b/packages/kbn-reporting/public/job_completion_notifications.ts index 4b1b6a215011..c5f1d0f9b3ca 100644 --- a/packages/kbn-reporting/public/job_completion_notifications.ts +++ b/packages/kbn-reporting/public/job_completion_notifications.ts @@ -10,57 +10,38 @@ import { JOB_COMPLETION_NOTIFICATIONS_SESSION_KEY } from '@kbn/reporting-common' import { JobId } from '@kbn/reporting-common/types'; export function jobCompletionNotifications() { - // Reading and writing to the local storage must be atomic, - // i.e. performed in a single operation. This storage queue - // Operations on the localStorage key can happen from various - // parts of code. Using a queue to manage async operations allows - // operations to process one at a time - let operationQueue = Promise.resolve(); - async function addToQueue(func: (error: Error | null) => void) { - operationQueue = operationQueue.then(() => func(null)).catch(func); - await operationQueue; - } - - async function getPendingJobIds(): Promise { - let jobs: JobId[] = []; - await addToQueue(async () => { - // get the current jobs - const jobsData = localStorage.getItem(JOB_COMPLETION_NOTIFICATIONS_SESSION_KEY); - jobs = jobsData ? JSON.parse(jobsData) : []; - }); + function getPendingJobIds(): JobId[] { + const jobs: JobId[] = []; + // get all current jobs + for (const key in localStorage) { + // check if key belongs to us + if (key.indexOf(JOB_COMPLETION_NOTIFICATIONS_SESSION_KEY) === 0) { + // get jobId from key + const jobId = key.replace(`${JOB_COMPLETION_NOTIFICATIONS_SESSION_KEY}-`, ''); + jobs.push(jobId); + } + } return jobs; } - async function addPendingJobId(jobId: JobId) { - await addToQueue(async (error: Error | null) => { - return new Promise((resolve, reject) => { - if (error) { - window.console.error(error); - reject(error); - } - // get the current jobs synchronously - const jobsData = localStorage.getItem(JOB_COMPLETION_NOTIFICATIONS_SESSION_KEY); - const jobs: JobId[] = jobsData ? JSON.parse(jobsData) : []; - // add the new job - jobs.push(jobId); - // write back to local storage - localStorage.setItem(JOB_COMPLETION_NOTIFICATIONS_SESSION_KEY, JSON.stringify(jobs)); - resolve(); - }); - }); + function addPendingJobId(jobId: JobId) { + // write back to local storage, value doesn't matter + localStorage.setItem(`${JOB_COMPLETION_NOTIFICATIONS_SESSION_KEY}-${jobId}`, jobId); } - async function setPendingJobIds(jobIds: JobId[]) { - await addToQueue(async (error: Error | null) => { - return new Promise((resolve, reject) => { - if (error) { - reject(error); - } - // write update jobs back to local storage - localStorage.setItem(JOB_COMPLETION_NOTIFICATIONS_SESSION_KEY, JSON.stringify(jobIds)); - resolve(); - }); - }); + function setPendingJobIds(jobIds: JobId[]) { + // clear reporting jobIds + for (const key in localStorage) { + if (key.indexOf(JOB_COMPLETION_NOTIFICATIONS_SESSION_KEY) === 0) { + localStorage.removeItem(key); + } + } + + // write update jobs back to local storage + for (let j = 0; j < jobIds.length; j++) { + const jobId = jobIds[j]; + localStorage.setItem(`${JOB_COMPLETION_NOTIFICATIONS_SESSION_KEY}-${jobId}`, jobId); + } } return { diff --git a/packages/kbn-reporting/public/reporting_api_client.ts b/packages/kbn-reporting/public/reporting_api_client.ts index ccd1ab71143f..9086a013f1c7 100644 --- a/packages/kbn-reporting/public/reporting_api_client.ts +++ b/packages/kbn-reporting/public/reporting_api_client.ts @@ -184,7 +184,7 @@ export class ReportingAPIClient implements IReportingAPI { body: JSON.stringify({ jobParams: jobParamsRison }), } ); - await this.addPendingJobId(resp.job.id); + this.addPendingJobId(resp.job.id); return new Job(resp.job); } diff --git a/x-pack/plugins/reporting/public/lib/stream_handler.ts b/x-pack/plugins/reporting/public/lib/stream_handler.ts index 212e501dad9e..2a7124e1e5e3 100644 --- a/x-pack/plugins/reporting/public/lib/stream_handler.ts +++ b/x-pack/plugins/reporting/public/lib/stream_handler.ts @@ -6,7 +6,7 @@ */ import * as Rx from 'rxjs'; -import { catchError, filter, mergeMap, takeUntil } from 'rxjs/operators'; +import { catchError, filter, map, mergeMap, takeUntil } from 'rxjs/operators'; import { DocLinksStart, NotificationsSetup, ThemeServiceStart } from '@kbn/core/public'; import { i18n } from '@kbn/i18n'; @@ -74,7 +74,7 @@ export class ReportingNotifierStreamHandler { Rx.timer(0, interval) .pipe( takeUntil(stop$), // stop the interval when stop method is called - mergeMap(this.jobCompletionNotifications.getPendingJobIds), // read all pending job IDs from session storage + map(this.jobCompletionNotifications.getPendingJobIds), // read all pending job IDs from session storage filter((previousPending) => previousPending.length > 0), // stop the pipeline here if there are none pending mergeMap((previousPending) => this.findChangedStatusJobs(previousPending)), // look up the latest status of all pending jobs on the server mergeMap(({ completed, failed }) => this.showNotifications({ completed, failed })), @@ -105,7 +105,7 @@ export class ReportingNotifierStreamHandler { const completedOptions = { toastLifeTimeMs: COMPLETED_JOB_TOAST_TIMEOUT }; // notifications with download link - for (const job of completedJobs) { + for (const job of completedJobs ?? []) { if (job.csvContainsFormulas) { notifications.toasts.addWarning( getWarningFormulasToast(job, getManagementLink, getDownloadLink, theme), @@ -130,7 +130,7 @@ export class ReportingNotifierStreamHandler { } // no download link available - for (const job of failedJobs) { + for (const job of failedJobs ?? []) { const errorText = await apiClient.getError(job.id); this.notifications.toasts.addDanger( getFailureToast(errorText, job, getManagementLink, theme, docLinks) @@ -172,7 +172,7 @@ export class ReportingNotifierStreamHandler { // refresh the storage of pending job IDs, minus // the newly completed and failed jobs - await this.jobCompletionNotifications.setPendingJobIds(newPending); + this.jobCompletionNotifications.setPendingJobIds(newPending); return { completed: newCompleted, failed: newFailed }; }), @@ -186,9 +186,9 @@ export class ReportingNotifierStreamHandler { err, this.theme ) - ); // prettier-ignore + ); window.console.error(err); - return Rx.of({ completed: [], failed: [] }); // log the error and resume + return Rx.of({}); }) ); } diff --git a/x-pack/plugins/reporting/public/types.ts b/x-pack/plugins/reporting/public/types.ts index dcc887ef9a84..d5af032db617 100644 --- a/x-pack/plugins/reporting/public/types.ts +++ b/x-pack/plugins/reporting/public/types.ts @@ -27,6 +27,6 @@ export interface JobSummary { * @internal */ export interface JobSummarySet { - completed: JobSummary[]; - failed: JobSummary[]; + completed?: JobSummary[]; + failed?: JobSummary[]; }