Skip to content

Commit

Permalink
initial lib
Browse files Browse the repository at this point in the history
  • Loading branch information
vigan-abd committed Jun 16, 2022
1 parent 392c1f5 commit bb102f6
Show file tree
Hide file tree
Showing 11 changed files with 361 additions and 1 deletion.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,5 @@ dist

# TernJS port file
.tern-port

package-lock.json
15 changes: 15 additions & 0 deletions @types/task.queue.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
'use strict'

import { QueueObject } from 'async'

export class TaskQueue {
protected queues: Map<string, QueueObject<any>>

public hasQueue(key: string): boolean

public initQueue(key: string, concurrency: number = 1): boolean

public getQueue(key: string): QueueObject<any> | undefined

public pushTask(key: string, task: () => Promise<any>): Promise<any>
}
46 changes: 45 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,46 @@
# task-queue-js
nodejs task queue work running asynchronous functions

NodeJS task queue work running asynchronous functions

## Installing
```console
npm install --save-prod task-queue-js
```

## Testing
```
npm test
```

## Usage
```javascript
const { TaskQueue } = require('task-queue-js')

const main = async () => {
const tq = new TaskQueue()
tq.initQueue('foo')

const calcs = []
const someTask = async (ms, i) => {
await new Promise(resolve => setTimeout(resolve, ms))
calcs.push(i)
return i
}

const promises = []
promises.push(tq.pushTask('foo', () => someTask(3000, 1)))
promises.push(tq.pushTask('foo', () => someTask(5000, 2)))
promises.push(tq.pushTask('foo', () => someTask(1000, 3)))
promises.push(tq.pushTask('foo', () => someTask(1000, 4)))

const batchRes = await Promise.all(promises)
console.log('batch results', batchRes, calcs)
}

main().catch(console.error)
```

More examples can be found under examples directory!

## Authors
- vigan.abd
31 changes: 31 additions & 0 deletions examples/concurrent.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
'use strict'

const { TaskQueue } = require('../index')

const main = async () => {
const tq = new TaskQueue()
tq.initQueue('foo', 3)

const taskRes = await tq.pushTask('foo', async () => {
return 0.1 + 0.2
})
console.log('single task result', taskRes)

const calcs = []
const someTask = async (ms, i) => {
await new Promise(resolve => setTimeout(resolve, ms))
calcs.push(i)
return i
}

const promises = []
promises.push(tq.pushTask('foo', () => someTask(3000, 1)))
promises.push(tq.pushTask('foo', () => someTask(5000, 2)))
promises.push(tq.pushTask('foo', () => someTask(1000, 3)))
promises.push(tq.pushTask('foo', () => someTask(1000, 4)))

const batchRes = await Promise.all(promises)
console.log('batch results', batchRes, calcs)
}

main().catch(console.error)
26 changes: 26 additions & 0 deletions examples/sequential.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
'use strict'

const { TaskQueue } = require('../index')

const main = async () => {
const tq = new TaskQueue()
tq.initQueue('foo')

const calcs = []
const someTask = async (ms, i) => {
await new Promise(resolve => setTimeout(resolve, ms))
calcs.push(i)
return i
}

const promises = []
promises.push(tq.pushTask('foo', () => someTask(3000, 1)))
promises.push(tq.pushTask('foo', () => someTask(5000, 2)))
promises.push(tq.pushTask('foo', () => someTask(1000, 3)))
promises.push(tq.pushTask('foo', () => someTask(1000, 4)))

const batchRes = await Promise.all(promises)
console.log('batch results', batchRes, calcs)
}

main().catch(console.error)
3 changes: 3 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
'use strict'

export { TaskQueue } from './@types/task.queue'
5 changes: 5 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
const TaskQueue = require('./src/task.queue')

module.exports = {
TaskQueue
}
34 changes: 34 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
{
"name": "task-queue-js",
"version": "1.0.0",
"description": "simple task queue for handling asynchronous operations",
"main": "index.js",
"scripts": {
"format": "standard --fix",
"lint": "standard",
"test": "npm run lint && npm run unit",
"unit": "NODE_ENV=test mocha ./test"
},
"repository": {
"type": "git",
"url": "git+https://github.com/vigan-abd/task-queue-js.git"
},
"keywords": [
"task queue",
"async queue"
],
"author": "vigan.abd",
"license": "GPL-3.0",
"bugs": {
"url": "https://github.com/vigan-abd/task-queue-js/issues"
},
"homepage": "https://github.com/vigan-abd/task-queue-js#readme",
"dependencies": {
"async": "^3.2.4",
"flat-promise": "^1.0.3"
},
"devDependencies": {
"mocha": "^10.0.0",
"standard": "^17.0.0"
}
}
59 changes: 59 additions & 0 deletions src/task.queue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
'use strict'

const async = require('async')
const flatPromise = require('flat-promise')

class TaskQueue {
constructor () {
/** @type {Map<string, async.QueueObject>} */
this.queues = new Map()
}

/**
* @param {string} key
*/
hasQueue (key) {
return this.queues.has(key)
}

/**
* @param {string} key
* @param {number} [concurrency]
*/
initQueue (key, concurrency = 1) {
if (this.queues.has(key)) return false

this.queues.set(key, async.queue(async (job, cb) => {
try {
const res = await job.task()
job.resolve(res)
} catch (err) {
job.reject(err)
} finally {
cb() // task queue cb
}
}, concurrency))

return true
}

/**
* @param {string} key
*/
getQueue (key) {
return this.queues.get(key)
}

pushTask (key, task) {
const queue = this.queues.get(key)
if (!queue) return Promise.reject(new Error('ERR_TASK_QUEUE_NOT_FOUND'))

const { promise, reject, resolve } = flatPromise()
const job = { task, resolve, reject }
queue.push(job)

return promise
}
}

module.exports = TaskQueue
7 changes: 7 additions & 0 deletions test/helper.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
'use strict'

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms))

module.exports = {
sleep
}
134 changes: 134 additions & 0 deletions test/task.queue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
'use strict'

/* eslint-env mocha */

const assert = require('assert')
const TaskQueue = require('../src/task.queue')
const { sleep } = require('./helper')

describe('TaskQueue tests', () => {
describe('hasQueue tests', () => {
it('should return false when queue does not exist', () => {
const tq = new TaskQueue()

assert.strictEqual(tq.hasQueue('foo'), false)
})

it('should return true when queue exists', () => {
const tq = new TaskQueue()
tq.initQueue('foo')

assert.strictEqual(tq.hasQueue('foo'), true)
})
})

describe('initQueue tests', () => {
it('should return true when queue does not exist', () => {
const tq = new TaskQueue()
const res = tq.initQueue('foo')

assert.strictEqual(res, true)
assert.strictEqual(tq.hasQueue('foo'), true)
})

it('should return true when queue already exists', () => {
const tq = new TaskQueue()
tq.initQueue('foo')
const res = tq.initQueue('foo')

assert.strictEqual(res, false)
assert.strictEqual(tq.hasQueue('foo'), true)
})
})

describe('getQueue tests', () => {
it('should return undefined when queue does not exist', () => {
const tq = new TaskQueue()
const res = tq.getQueue('foo')

assert.strictEqual(res, undefined)
})

it('should return instance of async queue when it exists', () => {
const tq = new TaskQueue()
tq.initQueue('foo')
const res = tq.getQueue('foo')

assert.strictEqual(typeof res, 'object')
assert.strictEqual(res.concurrency, 1)
})
})

describe('pushTask tests', () => {
const job = async (mts, i, arr) => {
await sleep(mts)
arr.push(i)
return i
}

it('should reject when queue does not exist', async () => {
const tq = new TaskQueue()

await assert.rejects(
tq.pushTask('foo', async () => 123),
(err) => {
assert.ok(err instanceof Error)
assert.strictEqual(err.message, 'ERR_TASK_QUEUE_NOT_FOUND')
return true
}
)
})

it('should process promise and return result', async () => {
const tq = new TaskQueue()
tq.initQueue('foo')
const res = await tq.pushTask('foo', async () => {
await sleep(500)
return 123
})

assert.strictEqual(res, 123)
})

it('should work with non async functions', async () => {
const tq = new TaskQueue()
tq.initQueue('foo')
const res = await tq.pushTask('foo', () => 123)

assert.strictEqual(res, 123)
})

it('should work process items according to concurrency', async () => {
const tq = new TaskQueue()
tq.initQueue('foo', 1)

const promises = []
const process = []
promises.push(tq.pushTask('foo', () => job(500, 1, process)))
promises.push(tq.pushTask('foo', () => job(200, 2, process)))
promises.push(tq.pushTask('foo', () => job(700, 3, process)))
promises.push(tq.pushTask('foo', () => job(300, 4, process)))
const res = await Promise.all(promises)

assert.deepStrictEqual(res, [1, 2, 3, 4])
assert.deepStrictEqual(process, [1, 2, 3, 4])
})

it('should support parallel processing as well', async () => {
const tq = new TaskQueue()
tq.initQueue('foo', 3)

const promises = []
const process = []

promises.push(tq.pushTask('foo', () => job(3000, 1, process)))
promises.push(tq.pushTask('foo', () => job(5000, 2, process)))
promises.push(tq.pushTask('foo', () => job(4000, 3, process)))
promises.push(tq.pushTask('foo', () => job(1000, 4, process)))
const res = await Promise.all(promises)

assert.deepStrictEqual(res, [1, 2, 3, 4])
assert.deepStrictEqual(process, [1, 3, 4, 2])
}).timeout(10000)
})
}).timeout(7000)

0 comments on commit bb102f6

Please sign in to comment.