Skip to content

Commit

Permalink
WIP: still buggy, need to revisit
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelbromley committed Dec 2, 2024
1 parent 254b025 commit 0367a0a
Show file tree
Hide file tree
Showing 8 changed files with 133 additions and 52 deletions.
1 change: 1 addition & 0 deletions packages/core/src/config/default-config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ export const defaultConfig: RuntimeVendureConfig = {
jobQueueStrategy: new InMemoryJobQueueStrategy(),
jobBufferStorageStrategy: new InMemoryJobBufferStorageStrategy(),
activeQueues: [],
excludedQueues: [],
prefix: '',
},
customFields: {
Expand Down
16 changes: 15 additions & 1 deletion packages/core/src/config/vendure-config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -934,12 +934,26 @@ export interface JobQueueOptions {
* @description
* Defines the queues that will run in this process.
* This can be used to configure only certain queues to run in this process.
* If its empty all queues will be run. Note: this option is primarily intended
*
* If its empty all queues will be run, except for any queues
* listed in `excludedQueues`.
*
* Note: this option is primarily intended
* to apply to the Worker process. Jobs will _always_ get published to the queue
* regardless of this setting, but this setting determines whether they get
* _processed_ or not.
*/
activeQueues?: string[];
/**
* @description
* Defines the queues that will be excluded from running in this process.
* Any queue in this list will not be processed by the worker process.
*
* If a queue is in both `activeQueues` and `excludeQueues`, it will be excluded.
*
* @since 3.1.0
*/
excludedQueues?: string[];
/**
* @description
* Prefixes all job queue names with the passed string. This is useful with multiple deployments
Expand Down
5 changes: 5 additions & 0 deletions packages/core/src/job-queue/job-queue.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,11 @@ export class JobQueueService implements OnModuleDestroy {
}

private shouldStartQueue(queueName: string): boolean {
if (this.configService.jobQueueOptions.excludedQueues.length > 0) {
if (this.configService.jobQueueOptions.excludedQueues.includes(queueName)) {
return false;
}
}
if (this.configService.jobQueueOptions.activeQueues.length > 0) {
if (!this.configService.jobQueueOptions.activeQueues.includes(queueName)) {
return false;
Expand Down
6 changes: 4 additions & 2 deletions packages/dev-server/dev-config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
LogLevel,
VendureConfig,
} from '@vendure/core';
import { PluginWithJobQueue } from '@vendure/core/e2e/fixtures/test-plugins/with-job-queue';
import { ElasticsearchPlugin } from '@vendure/elasticsearch-plugin';
import { defaultEmailHandlers, EmailPlugin, FileBasedTemplateLoader } from '@vendure/email-plugin';
import { BullMQJobQueuePlugin } from '@vendure/job-queue-plugin/package/bullmq';
Expand Down Expand Up @@ -72,14 +73,15 @@ export const devConfig: VendureConfig = {
// platformFeePercent: 10,
// platformFeeSKU: 'FEE',
// }),
PluginWithJobQueue,
AssetServerPlugin.init({
route: 'assets',
assetUploadDir: path.join(__dirname, 'assets'),
}),
DefaultSearchPlugin.init({ bufferUpdates: false, indexStockStatus: false }),
// Enable if you need to debug the job queue
// BullMQJobQueuePlugin.init({}),
DefaultJobQueuePlugin.init({}),
BullMQJobQueuePlugin.init({}),
// DefaultJobQueuePlugin.init({}),
// JobQueueTestPlugin.init({ queueCount: 10 }),
// ElasticsearchPlugin.init({
// host: 'http://localhost',
Expand Down
10 changes: 8 additions & 2 deletions packages/dev-server/index-worker.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
import { bootstrapWorker } from '@vendure/core';
import { bootstrapWorker, mergeConfig } from '@vendure/core';

import { devConfig } from './dev-config';

bootstrapWorker(devConfig)
bootstrapWorker(
mergeConfig(devConfig, {
jobQueueOptions: {
// excludedQueues: ['update-search-index'],
},
}),
)
.then(worker => worker.startJobQueue())
// .then(worker => worker.startHealthCheckServer({ port: 3001 }))
.catch(err => {
Expand Down
64 changes: 17 additions & 47 deletions packages/job-queue-plugin/src/bullmq/bullmq-job-queue-strategy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,17 @@ import {
import Bull, { ConnectionOptions, JobType, Processor, Queue, Worker, WorkerOptions } from 'bullmq';
import { EventEmitter } from 'events';
import { Cluster, Redis, RedisOptions } from 'ioredis';
import { firstValueFrom, Subject, Subscription, lastValueFrom } from 'rxjs';
import { map, tap, filter, takeUntil, debounceTime } from 'rxjs/operators';
import { firstValueFrom, Subject, Subscription } from 'rxjs';
import { map, takeUntil } from 'rxjs/operators';

import { ALL_JOB_TYPES, BULLMQ_PLUGIN_OPTIONS, loggerCtx } from './constants';
import { getGlobalId, MAX_QUEUE_ID, parseGlobalId } from './global-id';
import { RedisHealthIndicator } from './redis-health-indicator';
import { BullMQPluginOptions } from './types';

const QUEUE_NAME_PREFIX = 'vendure-queue-';
const DEFAULT_CONCURRENCY = 3;

const QUEUE_ID_BITS = 12; // 12 bits for the queue ID (supports 4096 queues)
const JOB_ID_BITS = 41; // 41 bits for the job ID (supports ~2 trillion jobs per queue)
// eslint-disable-next-line no-bitwise
const MAX_QUEUE_ID = (1 << QUEUE_ID_BITS) - 1; // Max queue ID (65535)

export class GracefulShutdownTimeoutError extends Error {
constructor(message: string) {
super(message);
Expand Down Expand Up @@ -290,19 +286,6 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
}
}

getGlobalId(queueName: string, jobId: number) {
const queueID = this.queueIds.get(queueName);
if (queueID == null) {
throw new Error(`Queue "${queueName}" not found`);
}
// eslint-disable-next-line no-bitwise
if (jobId >= 1 << JOB_ID_BITS) {
throw new Error('Job ID exceeds maximum allowed value');
}
// eslint-disable-next-line no-bitwise
return (queueID << JOB_ID_BITS) | jobId;
}

async findMany(options?: JobListOptions): Promise<PaginatedList<Job>> {
const skip = options?.skip ?? 0;
const take = options?.take ?? 10;
Expand Down Expand Up @@ -346,22 +329,6 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
if (queueNameIsEqualFilter) {
const bullQueue = await this.getOrCreateBullQueue(queueNameIsEqualFilter);
items = (await bullQueue?.getJobs(jobTypes, skip, take)) ?? [];

// items = (
// await Promise.all(
// items
// .filter(job => notNullOrUndefined(job.id))
// .map(job => {
// return this.findOneBullJob(
// this.buildUniqueJobId(
// this.getBullQueueName(queueNameIsEqualFilter),
// Number(job.id),
// ),
// );
// }),
// )
// ).filter(notNullOrUndefined);

const jobCounts = (await bullQueue?.getJobCounts(...jobTypes)) ?? 0;
totalItems = Object.values(jobCounts).reduce((sum, num) => sum + num, 0);
}
Expand Down Expand Up @@ -398,10 +365,7 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
}

private async getBullJobFromGlobalId(globalId: number): Promise<Bull.Job | undefined> {
// eslint-disable-next-line no-bitwise
const queueId = (globalId >> JOB_ID_BITS) & MAX_QUEUE_ID;
// eslint-disable-next-line no-bitwise
const jobId = globalId & ((1 << JOB_ID_BITS) - 1);
const { queueId, jobId } = parseGlobalId(globalId);
const queueName = Array.from(this.queueIds.entries()).find(([_, index]) => index === queueId)?.[0];

if (!queueName) {
Expand Down Expand Up @@ -526,7 +490,6 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
Logger.info(`Closing worker`, loggerCtx);
const gracefulShutdownTimeout = this.options.gracefulShutdownTimeout ?? 1000 * 60 * 10;
const startTime = Date.now();
let timer: NodeJS.Timeout;
const checkActive = async (resolve: (value: boolean) => void) => {
let activeCount = 0;
const activeJobs: Bull.Job[] = [];
Expand All @@ -542,12 +505,16 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
Logger.info(
`Waiting on ${activeCount} active ${
activeCount > 1 ? 'jobs' : 'job'
} (${activeJobs.map(j => this.getGlobalId(j.queueName, Number(j.id))).join(', ')})...`,
} (${activeJobs
.map(j => {
const queueId = this.queueIds.get(j.queueName);
return queueId ? getGlobalId(queueId, Number(j.id)) : 'unknown queue';
})
.join(', ')})...`,
loggerCtx,
);
if (Date.now() - startTime > gracefulShutdownTimeout) {
// If we've waited too long, just close the worker
// timer = setTimeout(checkActive, 2000);
Logger.warn(
`The graceful shutdown timeout of ${gracefulShutdownTimeout}ms has been exceeded. ` +
`Setting ${activeCount} jobs as failed...`,
Expand All @@ -557,18 +524,17 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
Logger.warn('All active jobs set as failed', loggerCtx);
resolve(false);
} else {
timer = setTimeout(() => checkActive(resolve), 2000);
setTimeout(() => checkActive(resolve), 2000);
}
} else {
resolve(true);
}
};
const gracefullyStopped = await new Promise(resolve => checkActive(resolve));
await new Promise(resolve => checkActive(resolve));

await this.closeAllWorkers();
Logger.info(`Worker closed`, loggerCtx);
await this.closeAllQueues();
// clearTimeout(timer);
Logger.info(`Queue closed`, loggerCtx);
this.cancellationSub.off('message', this.subscribeToCancellationEvents);
} catch (e: any) {
Expand Down Expand Up @@ -599,9 +565,13 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {

private async createVendureJob(bullJob: Bull.Job): Promise<Job> {
const jobJson = bullJob.toJSON();
const queueId = this.queueIds.get(this.getBullQueueName(bullJob.queueName));
if (queueId == null) {
throw new InternalServerError(`Queue ID not found for queue ${bullJob.queueName}`);
}
return new Job({
queueName: bullJob.name,
id: this.getGlobalId(bullJob.queueName, Number(bullJob.id)),
id: getGlobalId(queueId, Number(bullJob.id)),
state: await this.getState(bullJob),
data: bullJob.data,
attempts: bullJob.attemptsMade,
Expand Down
18 changes: 18 additions & 0 deletions packages/job-queue-plugin/src/bullmq/global-id.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { describe, expect, it } from 'vitest';

import { getGlobalId, parseGlobalId } from './global-id';

describe('global id functions', () => {
it('works', () => {
// repeat the above for 10,000 randomly generated values
for (let i = 0; i < 10_000; i++) {
const queueId = Math.floor(Math.random() * 65535);
const jobId = Math.floor(Math.random() * 2_000_000_000);
const globalId = getGlobalId(queueId, jobId);
const parsed = parseGlobalId(globalId);

expect(parsed.queueId).toBe(queueId);
expect(parsed.jobId).toBe(jobId);
}
});
});
65 changes: 65 additions & 0 deletions packages/job-queue-plugin/src/bullmq/global-id.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// const QUEUE_ID_BITS = 12; // 12 bits for the queue ID (supports 4096 queues)
// const JOB_ID_BITS = 41; // 41 bits for the job ID (supports ~2 trillion jobs per queue)
// // eslint-disable-next-line no-bitwise
// export const MAX_QUEUE_ID = (1 << QUEUE_ID_BITS) - 1; // Max queue ID (65535)
//
// /**
// * Combines a queueId and jobId into a single number, which will be unique across
// * all queues.
// *
// * To generate globally unique integer IDs for jobs across multiple queues while being able
// * to derive the queue and job ID from the global ID, you can combine the queue ID and job ID
// * into a single number using bitwise operations.
// */
// export function getGlobalId(queueId: number, jobId: number) {
// if (queueId == null) {
// throw new Error(`Queue not found`);
// }
// // eslint-disable-next-line no-bitwise
// if (jobId >= 1 << JOB_ID_BITS) {
// throw new Error('Job ID exceeds maximum allowed value');
// }
// // eslint-disable-next-line no-bitwise
// return (queueId << JOB_ID_BITS) | jobId;
// }
//
// /**
// * Splits a global ID into its queueId and jobId components.
// */
// export function parseGlobalId(globalId: number) {
// // eslint-disable-next-line no-bitwise
// const queueId = (globalId >> JOB_ID_BITS) & MAX_QUEUE_ID;
// // eslint-disable-next-line no-bitwise
// const jobId = globalId & ((1 << JOB_ID_BITS) - 1);
// return { queueId, jobId };
// }

// import { ParsedGlobalId } from './types';
// import { validateId } from './validation';

export const MAX_QUEUE_ID = Math.pow(2, 21) - 1; // 2,097,151
const SHIFT_BITS = 32;

/**
* Combines queueId and jobId into a global identifier
* @param queueId - The queue identifier
* @param jobId - The job identifier
* @returns The combined global identifier as a number
*/
export function getGlobalId(queueId: number, jobId: number): number {
// Shift queueId left by 32 bits and combine with jobId
return queueId * Math.pow(2, SHIFT_BITS) + jobId;
}

/**
* Parses a global identifier back into its queueId and jobId components
* @param globalId - The global identifier to parse
* @returns The parsed queue and job IDs
*/
export function parseGlobalId(globalId: number) {
// Extract queueId and jobId using bit operations
const queueId = Math.floor(globalId / Math.pow(2, SHIFT_BITS));
const jobId = globalId % Math.pow(2, SHIFT_BITS);

return { queueId, jobId };
}

0 comments on commit 0367a0a

Please sign in to comment.