Skip to content

Commit

Permalink
Add delta queue to operation repo
Browse files Browse the repository at this point in the history
Added a delta queue to the operation repo to flush all executor deltas at the same time. The operation repo's delta queue enqueues all deltas and then will sort and pass the delta to its correct executor. Previously, each executor was in charge of enqueuing and flushing their own delta queue. This change also aligns the Web SDK more with the iOS SDK's operation repo implementation.
  • Loading branch information
shepherd-l committed Jun 5, 2024
1 parent e652544 commit 1a427fb
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 11 deletions.
2 changes: 1 addition & 1 deletion __test__/support/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* @constant {number} DELTA_QUEUE_TIME_ADVANCE - The time advance for the delta queue.
*/
export const OPERATION_QUEUE_TIME_ADVANCE = 5001;
export const DELTA_QUEUE_TIME_ADVANCE = 1001;
export const DELTA_QUEUE_TIME_ADVANCE = 5001;

/* S T R I N G C O N S T A N T S */
export const APP_ID = '34fcbe85-278d-4fd2-a4ec-0f80e95072c5';
Expand Down
7 changes: 0 additions & 7 deletions src/core/executors/ExecutorBase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,10 @@ export default abstract class ExecutorBase {

private onlineStatus = true;

static DELTAS_BATCH_PROCESSING_TIME = 1;
static OPERATIONS_BATCH_PROCESSING_TIME = 5;
static RETRY_COUNT = 5;

constructor(executorConfig: ExecutorConfig<SupportedModel>) {
setInterval(() => {
if (this._deltaQueue.length > 0) {
this.processDeltaQueue.call(this);
}
}, ExecutorBase.DELTAS_BATCH_PROCESSING_TIME * 1_000);

setInterval(() => {
Log.debug('OneSignal: checking for operations to process from cache');
const cachedOperations = this.getOperationsFromCache();
Expand Down
36 changes: 33 additions & 3 deletions src/core/operationRepo/OperationRepo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import { logMethodCall } from '../../shared/utils/utils';
export class OperationRepo {
public executorStore: ExecutorStore;
private _unsubscribeFromModelRepo: () => void;
private _deltaQueue: CoreDelta<SupportedModel>[] = [];
static DELTAS_BATCH_PROCESSING_TIME = 5;

constructor(private modelRepo: ModelRepo) {
this.executorStore = new ExecutorStore();
Expand All @@ -16,6 +18,12 @@ export class OperationRepo {
this._processDelta(delta);
},
);

setInterval(() => {
if (this._deltaQueue.length > 0) {
this._processDeltaQueue();
}
}, OperationRepo.DELTAS_BATCH_PROCESSING_TIME * 1_000);
}

setModelRepoAndResubscribe(modelRepo: ModelRepo) {
Expand All @@ -33,9 +41,31 @@ export class OperationRepo {
this.executorStore.forceDeltaQueueProcessingOnAllExecutors();
}

private _flushDeltas(): void {
logMethodCall('OperationRepo._flushDeltas');
this._deltaQueue = [];
}

private _processDelta(delta: CoreDelta<SupportedModel>): void {
logMethodCall('processDelta', { delta });
const { modelName } = delta.model;
this.executorStore.store[modelName]?.enqueueDelta(delta);
logMethodCall('OperationRepo._processDelta', { delta });
const deltaCopy = JSON.parse(JSON.stringify(delta));
this._deltaQueue.push(deltaCopy);
}

private _processDeltaQueue(): void {
logMethodCall('OperationRepo._processDeltaQueue');

this._deltaQueue.forEach((delta) => {
const { modelName } = delta.model;
this.executorStore.store[modelName]?.enqueueDelta(delta);
});

// for each executor
// TODO: fires SubscriptionExecutor.processDeltaQueue and SubscriptionExecutor._flushDeltas 3 times
// ExecutorStore has 3 ModelName for Subscriptions: smsSubscription, emailSubscription, pushSubscription
this.forceDeltaQueueProcessingOnAllExecutors();
// executors flush is in the above method

this._flushDeltas();
}
}

0 comments on commit 1a427fb

Please sign in to comment.