From 1a427fbce97b37025cf0a4de4857f6b8166d9eb3 Mon Sep 17 00:00:00 2001 From: Shepherd Date: Tue, 4 Jun 2024 12:47:23 -0400 Subject: [PATCH] Add delta queue to operation repo Added a delta queue to the operation repo to flush all executor deltas at the same time. The operation repo's delta queue enqueues all deltas and then will sort and pass the delta to its correct executor. Previously, each executor was in charge of enqueuing and flushing their own delta queue. This change also aligns the Web SDK more with the iOS SDK's operation repo implementation. --- __test__/support/constants.ts | 2 +- src/core/executors/ExecutorBase.ts | 7 ----- src/core/operationRepo/OperationRepo.ts | 36 ++++++++++++++++++++++--- 3 files changed, 34 insertions(+), 11 deletions(-) diff --git a/__test__/support/constants.ts b/__test__/support/constants.ts index 9532d1122..f06d280fe 100644 --- a/__test__/support/constants.ts +++ b/__test__/support/constants.ts @@ -14,7 +14,7 @@ * @constant {number} DELTA_QUEUE_TIME_ADVANCE - The time advance for the delta queue. */ export const OPERATION_QUEUE_TIME_ADVANCE = 5001; -export const DELTA_QUEUE_TIME_ADVANCE = 1001; +export const DELTA_QUEUE_TIME_ADVANCE = 5001; /* S T R I N G C O N S T A N T S */ export const APP_ID = '34fcbe85-278d-4fd2-a4ec-0f80e95072c5'; diff --git a/src/core/executors/ExecutorBase.ts b/src/core/executors/ExecutorBase.ts index 209fab0ba..83cdfa9cc 100644 --- a/src/core/executors/ExecutorBase.ts +++ b/src/core/executors/ExecutorBase.ts @@ -28,17 +28,10 @@ export default abstract class ExecutorBase { private onlineStatus = true; - static DELTAS_BATCH_PROCESSING_TIME = 1; static OPERATIONS_BATCH_PROCESSING_TIME = 5; static RETRY_COUNT = 5; constructor(executorConfig: ExecutorConfig) { - setInterval(() => { - if (this._deltaQueue.length > 0) { - this.processDeltaQueue.call(this); - } - }, ExecutorBase.DELTAS_BATCH_PROCESSING_TIME * 1_000); - setInterval(() => { Log.debug('OneSignal: checking for operations to process from cache'); const cachedOperations = this.getOperationsFromCache(); diff --git a/src/core/operationRepo/OperationRepo.ts b/src/core/operationRepo/OperationRepo.ts index 2fdf8a120..745db9052 100644 --- a/src/core/operationRepo/OperationRepo.ts +++ b/src/core/operationRepo/OperationRepo.ts @@ -7,6 +7,8 @@ import { logMethodCall } from '../../shared/utils/utils'; export class OperationRepo { public executorStore: ExecutorStore; private _unsubscribeFromModelRepo: () => void; + private _deltaQueue: CoreDelta[] = []; + static DELTAS_BATCH_PROCESSING_TIME = 5; constructor(private modelRepo: ModelRepo) { this.executorStore = new ExecutorStore(); @@ -16,6 +18,12 @@ export class OperationRepo { this._processDelta(delta); }, ); + + setInterval(() => { + if (this._deltaQueue.length > 0) { + this._processDeltaQueue(); + } + }, OperationRepo.DELTAS_BATCH_PROCESSING_TIME * 1_000); } setModelRepoAndResubscribe(modelRepo: ModelRepo) { @@ -33,9 +41,31 @@ export class OperationRepo { this.executorStore.forceDeltaQueueProcessingOnAllExecutors(); } + private _flushDeltas(): void { + logMethodCall('OperationRepo._flushDeltas'); + this._deltaQueue = []; + } + private _processDelta(delta: CoreDelta): void { - logMethodCall('processDelta', { delta }); - const { modelName } = delta.model; - this.executorStore.store[modelName]?.enqueueDelta(delta); + logMethodCall('OperationRepo._processDelta', { delta }); + const deltaCopy = JSON.parse(JSON.stringify(delta)); + this._deltaQueue.push(deltaCopy); + } + + private _processDeltaQueue(): void { + logMethodCall('OperationRepo._processDeltaQueue'); + + this._deltaQueue.forEach((delta) => { + const { modelName } = delta.model; + this.executorStore.store[modelName]?.enqueueDelta(delta); + }); + + // for each executor + // TODO: fires SubscriptionExecutor.processDeltaQueue and SubscriptionExecutor._flushDeltas 3 times + // ExecutorStore has 3 ModelName for Subscriptions: smsSubscription, emailSubscription, pushSubscription + this.forceDeltaQueueProcessingOnAllExecutors(); + // executors flush is in the above method + + this._flushDeltas(); } }