diff --git a/.gitignore b/.gitignore index 6704566..cab290e 100644 --- a/.gitignore +++ b/.gitignore @@ -102,3 +102,5 @@ dist # TernJS port file .tern-port + +package-lock.json diff --git a/@types/task.queue.d.ts b/@types/task.queue.d.ts new file mode 100644 index 0000000..18ae882 --- /dev/null +++ b/@types/task.queue.d.ts @@ -0,0 +1,15 @@ +'use strict' + +import { QueueObject } from 'async' + +export class TaskQueue { + protected queues: Map> + + public hasQueue(key: string): boolean + + public initQueue(key: string, concurrency: number = 1): boolean + + public getQueue(key: string): QueueObject | undefined + + public pushTask(key: string, task: () => Promise): Promise +} diff --git a/README.md b/README.md index e11edbb..6ba2c4f 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,46 @@ # task-queue-js -nodejs task queue work running asynchronous functions + +NodeJS task queue work running asynchronous functions + +## Installing +```console +npm install --save-prod task-queue-js +``` + +## Testing +``` +npm test +``` + +## Usage +```javascript +const { TaskQueue } = require('task-queue-js') + +const main = async () => { + const tq = new TaskQueue() + tq.initQueue('foo') + + const calcs = [] + const someTask = async (ms, i) => { + await new Promise(resolve => setTimeout(resolve, ms)) + calcs.push(i) + return i + } + + const promises = [] + promises.push(tq.pushTask('foo', () => someTask(3000, 1))) + promises.push(tq.pushTask('foo', () => someTask(5000, 2))) + promises.push(tq.pushTask('foo', () => someTask(1000, 3))) + promises.push(tq.pushTask('foo', () => someTask(1000, 4))) + + const batchRes = await Promise.all(promises) + console.log('batch results', batchRes, calcs) +} + +main().catch(console.error) +``` + +More examples can be found under examples directory! + +## Authors +- vigan.abd diff --git a/examples/concurrent.js b/examples/concurrent.js new file mode 100644 index 0000000..6826445 --- /dev/null +++ b/examples/concurrent.js @@ -0,0 +1,31 @@ +'use strict' + +const { TaskQueue } = require('../index') + +const main = async () => { + const tq = new TaskQueue() + tq.initQueue('foo', 3) + + const taskRes = await tq.pushTask('foo', async () => { + return 0.1 + 0.2 + }) + console.log('single task result', taskRes) + + const calcs = [] + const someTask = async (ms, i) => { + await new Promise(resolve => setTimeout(resolve, ms)) + calcs.push(i) + return i + } + + const promises = [] + promises.push(tq.pushTask('foo', () => someTask(3000, 1))) + promises.push(tq.pushTask('foo', () => someTask(5000, 2))) + promises.push(tq.pushTask('foo', () => someTask(1000, 3))) + promises.push(tq.pushTask('foo', () => someTask(1000, 4))) + + const batchRes = await Promise.all(promises) + console.log('batch results', batchRes, calcs) +} + +main().catch(console.error) diff --git a/examples/sequential.js b/examples/sequential.js new file mode 100644 index 0000000..901bccf --- /dev/null +++ b/examples/sequential.js @@ -0,0 +1,26 @@ +'use strict' + +const { TaskQueue } = require('../index') + +const main = async () => { + const tq = new TaskQueue() + tq.initQueue('foo') + + const calcs = [] + const someTask = async (ms, i) => { + await new Promise(resolve => setTimeout(resolve, ms)) + calcs.push(i) + return i + } + + const promises = [] + promises.push(tq.pushTask('foo', () => someTask(3000, 1))) + promises.push(tq.pushTask('foo', () => someTask(5000, 2))) + promises.push(tq.pushTask('foo', () => someTask(1000, 3))) + promises.push(tq.pushTask('foo', () => someTask(1000, 4))) + + const batchRes = await Promise.all(promises) + console.log('batch results', batchRes, calcs) +} + +main().catch(console.error) diff --git a/index.d.ts b/index.d.ts new file mode 100644 index 0000000..36b8806 --- /dev/null +++ b/index.d.ts @@ -0,0 +1,3 @@ +'use strict' + +export { TaskQueue } from './@types/task.queue' diff --git a/index.js b/index.js new file mode 100644 index 0000000..830a19b --- /dev/null +++ b/index.js @@ -0,0 +1,5 @@ +const TaskQueue = require('./src/task.queue') + +module.exports = { + TaskQueue +} diff --git a/package.json b/package.json new file mode 100644 index 0000000..76e75a7 --- /dev/null +++ b/package.json @@ -0,0 +1,34 @@ +{ + "name": "task-queue-js", + "version": "1.0.0", + "description": "simple task queue for handling asynchronous operations", + "main": "index.js", + "scripts": { + "format": "standard --fix", + "lint": "standard", + "test": "npm run lint && npm run unit", + "unit": "NODE_ENV=test mocha ./test" + }, + "repository": { + "type": "git", + "url": "git+https://github.com/vigan-abd/task-queue-js.git" + }, + "keywords": [ + "task queue", + "async queue" + ], + "author": "vigan.abd", + "license": "GPL-3.0", + "bugs": { + "url": "https://github.com/vigan-abd/task-queue-js/issues" + }, + "homepage": "https://github.com/vigan-abd/task-queue-js#readme", + "dependencies": { + "async": "^3.2.4", + "flat-promise": "^1.0.3" + }, + "devDependencies": { + "mocha": "^10.0.0", + "standard": "^17.0.0" + } +} diff --git a/src/task.queue.js b/src/task.queue.js new file mode 100644 index 0000000..1f08125 --- /dev/null +++ b/src/task.queue.js @@ -0,0 +1,59 @@ +'use strict' + +const async = require('async') +const flatPromise = require('flat-promise') + +class TaskQueue { + constructor () { + /** @type {Map} */ + this.queues = new Map() + } + + /** + * @param {string} key + */ + hasQueue (key) { + return this.queues.has(key) + } + + /** + * @param {string} key + * @param {number} [concurrency] + */ + initQueue (key, concurrency = 1) { + if (this.queues.has(key)) return false + + this.queues.set(key, async.queue(async (job, cb) => { + try { + const res = await job.task() + job.resolve(res) + } catch (err) { + job.reject(err) + } finally { + cb() // task queue cb + } + }, concurrency)) + + return true + } + + /** + * @param {string} key + */ + getQueue (key) { + return this.queues.get(key) + } + + pushTask (key, task) { + const queue = this.queues.get(key) + if (!queue) return Promise.reject(new Error('ERR_TASK_QUEUE_NOT_FOUND')) + + const { promise, reject, resolve } = flatPromise() + const job = { task, resolve, reject } + queue.push(job) + + return promise + } +} + +module.exports = TaskQueue diff --git a/test/helper.js b/test/helper.js new file mode 100644 index 0000000..bd40d08 --- /dev/null +++ b/test/helper.js @@ -0,0 +1,7 @@ +'use strict' + +const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms)) + +module.exports = { + sleep +} diff --git a/test/task.queue.js b/test/task.queue.js new file mode 100644 index 0000000..481c429 --- /dev/null +++ b/test/task.queue.js @@ -0,0 +1,134 @@ +'use strict' + +/* eslint-env mocha */ + +const assert = require('assert') +const TaskQueue = require('../src/task.queue') +const { sleep } = require('./helper') + +describe('TaskQueue tests', () => { + describe('hasQueue tests', () => { + it('should return false when queue does not exist', () => { + const tq = new TaskQueue() + + assert.strictEqual(tq.hasQueue('foo'), false) + }) + + it('should return true when queue exists', () => { + const tq = new TaskQueue() + tq.initQueue('foo') + + assert.strictEqual(tq.hasQueue('foo'), true) + }) + }) + + describe('initQueue tests', () => { + it('should return true when queue does not exist', () => { + const tq = new TaskQueue() + const res = tq.initQueue('foo') + + assert.strictEqual(res, true) + assert.strictEqual(tq.hasQueue('foo'), true) + }) + + it('should return true when queue already exists', () => { + const tq = new TaskQueue() + tq.initQueue('foo') + const res = tq.initQueue('foo') + + assert.strictEqual(res, false) + assert.strictEqual(tq.hasQueue('foo'), true) + }) + }) + + describe('getQueue tests', () => { + it('should return undefined when queue does not exist', () => { + const tq = new TaskQueue() + const res = tq.getQueue('foo') + + assert.strictEqual(res, undefined) + }) + + it('should return instance of async queue when it exists', () => { + const tq = new TaskQueue() + tq.initQueue('foo') + const res = tq.getQueue('foo') + + assert.strictEqual(typeof res, 'object') + assert.strictEqual(res.concurrency, 1) + }) + }) + + describe('pushTask tests', () => { + const job = async (mts, i, arr) => { + await sleep(mts) + arr.push(i) + return i + } + + it('should reject when queue does not exist', async () => { + const tq = new TaskQueue() + + await assert.rejects( + tq.pushTask('foo', async () => 123), + (err) => { + assert.ok(err instanceof Error) + assert.strictEqual(err.message, 'ERR_TASK_QUEUE_NOT_FOUND') + return true + } + ) + }) + + it('should process promise and return result', async () => { + const tq = new TaskQueue() + tq.initQueue('foo') + const res = await tq.pushTask('foo', async () => { + await sleep(500) + return 123 + }) + + assert.strictEqual(res, 123) + }) + + it('should work with non async functions', async () => { + const tq = new TaskQueue() + tq.initQueue('foo') + const res = await tq.pushTask('foo', () => 123) + + assert.strictEqual(res, 123) + }) + + it('should work process items according to concurrency', async () => { + const tq = new TaskQueue() + tq.initQueue('foo', 1) + + const promises = [] + const process = [] + promises.push(tq.pushTask('foo', () => job(500, 1, process))) + promises.push(tq.pushTask('foo', () => job(200, 2, process))) + promises.push(tq.pushTask('foo', () => job(700, 3, process))) + promises.push(tq.pushTask('foo', () => job(300, 4, process))) + const res = await Promise.all(promises) + + assert.deepStrictEqual(res, [1, 2, 3, 4]) + assert.deepStrictEqual(process, [1, 2, 3, 4]) + }) + + it('should support parallel processing as well', async () => { + const tq = new TaskQueue() + tq.initQueue('foo', 3) + + const promises = [] + const process = [] + + promises.push(tq.pushTask('foo', () => job(3000, 1, process))) + promises.push(tq.pushTask('foo', () => job(5000, 2, process))) + promises.push(tq.pushTask('foo', () => job(4000, 3, process))) + promises.push(tq.pushTask('foo', () => job(1000, 4, process))) + const res = await Promise.all(promises) + + assert.deepStrictEqual(res, [1, 2, 3, 4]) + assert.deepStrictEqual(process, [1, 3, 4, 2]) + }).timeout(10000) + }) +}).timeout(7000)