-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathqyu.js
253 lines (233 loc) · 7.96 KB
/
qyu.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
const EventEmitter = require('events');
const RateLimiter = require('./RateLimiter');
// Numerically highest priority value, i.e. the least urgent one (1 is the most urgent).
const LOWEST_PRIO = 10;

// Queue-level defaults. Frozen because the object is shared by every queue instance;
// it is only ever used as a source for `Object.assign({}, DEFAULT_QUEUE_OPTIONS, opts)`.
const DEFAULT_QUEUE_OPTIONS = Object.freeze({
  log: { trace: () => {}, debug: () => {} }, // no-op logger; can be replaced by an instance of simple-node-logger
  rateLimit: null, // falsy => process in series. otherwise: max number of jobs to run within 1 second
  statsInterval: 500, // emit `stats` every 500 ms
});

// Job-level defaults, merged with the options given to push(). Frozen for the same reason as above.
const DEFAULT_JOB_OPTIONS = Object.freeze({
  priority: LOWEST_PRIO, // low job priority by default, when calling push()
});

// useful to get stack traces from UnhandledPromiseRejectionWarning errors
if (process.env.CATCH_UNHANDLED_REJECTIONS) {
  process.on('unhandledRejection', r => console.error(r));
}

// Module-wide job counter, used to generate unique ids (shared by all queues created from this module).
let nextJobId = 0;
/**
 * Holds, controls and reports on execution of a queue of asynchronous jobs.
 *
 * Jobs are picked by ascending `priority` value (first pushed wins among equal
 * priorities) whenever the queue is started and the rate limiter allows one more job.
 * @fires done
 * @fires error
 * @fires drain
 * @fires stats
 */
class Qyu extends EventEmitter {
  /**
   * Instantiates a job queue.
   * @param {Object} opts
   * @param {number} opts.rateLimit - Maximum number of jobs to be run per second. If `null`, jobs will be run sequentially.
   * @param {number} opts.statsInterval - interval for emitting `stats`, in ms
   * @param {SimpleNodeLogger} opts.log - instance of simple-node-logger (optional)
   * @param {boolean} opts.rejectErrorsOnPush - if true, push()'s promise will reject in case of job error
   */
  constructor(opts) {
    super(opts);
    this.opts = Object.assign({}, DEFAULT_QUEUE_OPTIONS, opts);
    this.log = this.opts.log;
    this.log.trace('Qyu:constructor() ', opts);
    this.jobs = []; // unsorted array of { id, job, opts, pushPromise } objects
    this.started = false; // turns to `true` when client called `start()`
    this.rateLimiter = new RateLimiter(this.opts);
    this.rateLimiter.on('stats', (stats) => {
      this.log.trace('Qyu ⚡️ stats ', stats);
      /**
       * Re-emitted every time the rate limiter emits `stats` (expected every
       * `opts.statsInterval` milliseconds), to tell how many jobs are processed per second.
       * @event stats
       * @memberof Qyu
       * @type {object}
       * @property {number} nbJobsPerSecond - number of jobs that are processed per second
       */
      this.emit('stats', stats);
      this._processJobs(); // will run a job if possible
    });
  }

  /**
   * emit an `error` event
   * @private
   * @param {Object} err
   * @param {number} err.jobId - identifier of the job that threw the error
   * @param {Error} err.error - error object thrown by the job
   */
  _error({ jobId, error }) {
    this.log.trace('Qyu ⚡️ error ', { jobId, error });
    // EventEmitter throws when `error` is emitted without any listener. When the failure
    // is already delivered through push()'s rejected promise, do not turn a missing
    // listener into a second, unrelated exception.
    if (this.opts.rejectErrorsOnPush && this.listenerCount('error') === 0) {
      return;
    }
    /**
     * Fired every time a job fails by throwing an error.
     * @event error
     * @memberof Qyu
     * @type {Object}
     * @property {number} jobId - identifier of the job that threw the error
     * @property {Error} error - error object thrown by the job
     */
    this.emit('error', { jobId, error });
  }

  /**
   * emit a `done` event
   * @private
   * @param {Object} res
   * @param {number} res.jobId - identifier of the job which execution ended
   * @param {*} res.jobResult - return value of the job function
   */
  _done(res) {
    this.log.trace('Qyu ⚡️ done ', res);
    /**
     * Fired every time a job's execution has ended successfully.
     * @event done
     * @memberof Qyu
     * @type {object}
     * @property {number} jobId - identifier of the job which execution ended
     * @property {*} jobResult - return value of the job function
     */
    this.emit('done', res);
  }

  /**
   * called by _processJob() when a job has ended (with or without error)
   * @private
   * @param {Object} job
   * @param {number} job.id - id of the job function that ended
   * @param {boolean} withError - true if the job ended because of an error
   * @param {*} jobResultOrError - return value of the job function that ended, or error
   */
  _jobEnded(job, withError, jobResultOrError) {
    this.log.trace('Qyu:_jobEnded() ', [job, withError, jobResultOrError]);
    this.rateLimiter.jobEnded();
    const jobObj = { jobId: job.id };
    // The push() promise is settled before listeners are notified so that a throwing
    // listener cannot leave it pending. Its handlers still run after the listeners,
    // because promise reactions are asynchronous.
    if (withError) {
      const failObj = Object.assign(jobObj, { error: jobResultOrError });
      if (this.opts.rejectErrorsOnPush) {
        job.pushPromise.reject(failObj);
      }
      // NOTE: without `rejectErrorsOnPush`, the promise returned by push() stays pending
      // for a failed job; the failure is only reported through the `error` event.
      this._error(failObj);
    } else {
      const doneObj = Object.assign(jobObj, { jobResult: jobResultOrError });
      job.pushPromise.resolve(doneObj);
      this._done(doneObj);
    }
    this._drainIfNoMore();
    this._processJobs();
  }

  /**
   * @private
   * @returns {boolean} true if a job can be processed right now.
   */
  _readyToRunJobs() {
    return Boolean(this.started && this.jobs.length > 0 && this.rateLimiter.canRunMore());
  }

  /**
   * runs the next job, if any, and if allowed by rate limiter.
   * @private
   */
  _processJob() {
    const readyToRunJobs = this._readyToRunJobs();
    this.log.trace('Qyu:_processJob() ', {
      started: this.started,
      running: this.rateLimiter.running,
      remaining: this.jobs.map(j => j.id),
      readyToRunJobs
    });
    if (!readyToRunJobs) {
      return;
    }

    // Single pass: first job holding the smallest priority value. The strict `<` keeps
    // insertion order among equal priorities and never yields "no job" when a priority
    // is not a number.
    let nextIndex = 0;
    for (let i = 1; i < this.jobs.length; i++) {
      if (this.jobs[i].opts.priority < this.jobs[nextIndex].opts.priority) {
        nextIndex = i;
      }
    }
    const job = this.jobs.splice(nextIndex, 1)[0]; // remove job from queue

    this.log.debug('Qyu starting job ', job);
    this.rateLimiter.jobStarted();

    // A job may throw synchronously or return a plain value instead of a promise:
    // normalise both, so that every started job is always reported as ended.
    let outcome;
    try {
      outcome = Promise.resolve(job.job());
    } catch (err) {
      outcome = Promise.reject(err);
    }

    // Two-argument then(): an exception raised while handling a success (e.g. by a
    // `done` listener) must not be mistaken for a failure of the job itself, which
    // would end the same job twice.
    outcome.then(
      jobResult => this._jobEnded(job, false, jobResult),
      jobError => this._jobEnded(job, true, jobError)
    );
  }

  /**
   * runs as many jobs as allowed by rate limiter.
   * @private
   */
  _processJobs() {
    do {
      this._processJob();
    } while (this._readyToRunJobs());
  }

  /**
   * emits drain and disables the rate limiter if there are no more jobs to process.
   * @private
   */
  _drainIfNoMore() {
    this.log.trace('Qyu:_drainIfNoMore() ', {
      started: this.started,
      running: this.rateLimiter.running,
      remaining: this.jobs.map(j => j.id)
    });
    if (!this.jobs.length && !this.rateLimiter.running) {
      this.log.trace('Qyu ⚡️ drain');
      /**
       * Fired when no more jobs are to be run.
       * @event drain
       * @memberof Qyu
       */
      this.emit('drain');
      this.rateLimiter.toggle(false);
    }
  }

  /**
   * Add a job to this queue, and runs it if queue was started.
   * @param {Function} job is a function returning a promise to indicate when the job is done
   * @param {Object} opts
   * @param {number} opts.priority from 1 to 10, 1 being the highest priority
   * @returns {Promise} A promise that resolves with {jobId, jobResult}. If the job fails, it
   *   rejects with {jobId, error} only when the queue was created with `rejectErrorsOnPush`.
   */
  push(job, opts) {
    return new Promise((resolve, reject) => {
      const id = nextJobId++;
      this.log.trace(`Qyu:push() id: ${id}, opts:`, opts);
      this.jobs.push({
        id,
        job,
        opts: Object.assign({}, DEFAULT_JOB_OPTIONS, opts),
        pushPromise: { resolve, reject }
      });
      if (this.started) {
        this.rateLimiter.toggle(true); // necessary for jobs pushed after drain
      }
      this._processJobs(); // useful for when jobs were pushed after Qyu was started
    });
  }

  /**
   * Stops picking new jobs from this queue; jobs that are already running are left to finish.
   * @returns {Promise} A promise that resolves when the queue has paused (no jobs being processed)
   */
  pause() {
    this.log.trace('Qyu:pause()');
    this.started = false; // prevent next jobs from being processed
    return this.rateLimiter.waitForDrain().then(() => {
      // start() may have been called again while the running jobs were finishing:
      // in that case the rate limiter must stay enabled.
      if (!this.started) {
        this.rateLimiter.toggle(false);
      }
    });
  }

  /**
   * Start running jobs of this queue.
   * @returns {Promise} A promise that resolves when the queue has started (first time) or unpaused
   */
  start() {
    this.log.trace('Qyu:start()');
    // The executor runs synchronously; anything thrown below rejects the returned promise.
    return new Promise((resolve) => {
      this.started = true;
      this.rateLimiter.toggle(true); // makes sure that the interval is started asap
      this._drainIfNoMore();
      this._processJobs();
      resolve();
    });
  }
}
/**
 * Factory exported by this module: builds a new job queue.
 * @param {Object} [opts] - queue options, handed over untouched to the `Qyu` constructor
 * @returns {Qyu} the newly created queue
 */
function qyu(opts) {
  const queue = new Qyu(opts);
  return queue;
}

module.exports = qyu;