-
Notifications
You must be signed in to change notification settings - Fork 75
/
completed-task-poller.js
323 lines (300 loc) · 11.6 KB
/
completed-task-poller.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
// Copyright 2015, EMC, Inc.
'use strict';
var di = require('di');
module.exports = completedTaskPollerFactory;

// Register the factory with the DI container under its provider token.
di.annotate(
    completedTaskPollerFactory,
    new di.Provide('TaskGraph.CompletedTaskPoller')
);

// Dependency tokens for the factory. The order of this list mirrors the order
// of completedTaskPollerFactory's parameters (store, graphProgressService,
// eventsProtocol, Logger, assert, Constants, Rx, Promise, _), so keep the two
// in sync when adding or removing a dependency.
di.annotate(completedTaskPollerFactory, new di.Inject(
    'TaskGraph.Store',
    'Services.GraphProgress',
    'Protocol.Events',
    'Logger',
    'Assert',
    'Constants',
    'Rx',
    'Promise',
    '_'
));
function completedTaskPollerFactory(
    store,
    graphProgressService,
    eventsProtocol,
    Logger,
    assert,
    Constants,
    Rx,
    Promise,
    _
) {
    var logger = Logger.initialize(completedTaskPollerFactory);

    /**
     * The CompletedTaskPoller polls the store for any tasks that have been
     * finished or marked as unreachable (in the case of multiple branches of
     * execution within a graph). It also evaluates graph states for
     * completed tasks, and finally deletes them from the store so that it
     * doesn't grow too large.
     *
     * @param {String} [domain] - Task domain; defaults to
     *     Constants.Task.DefaultDomain when falsy.
     * @param {Object} [options]
     * @param {Number} [options.pollInterval=1000] - Milliseconds between polls.
     * @param {Number} [options.completedTaskBatchSize=200] - Maximum number of
     *     completed tasks requested from the store per poll.
     * @param {Boolean} [options.debug=false] - When true, successful pipeline
     *     runs are logged at debug level.
     * @constructor CompletedTaskPoller
     */
    function CompletedTaskPoller(domain, options) {
        options = options || {};
        this.running = false;
        this.pollInterval = options.pollInterval || 1000;
        // Passed to the project's mergeLossy operator in pollTasks(); presumably
        // caps concurrent processCompletedTasks runs at `max` (mergeLossy is
        // defined outside this file — confirm).
        this.concurrentCounter = { count: 0, max: 1 };
        this.completedTaskBatchSize = options.completedTaskBatchSize || 200;
        assert.number(this.completedTaskBatchSize, 'completedTaskBatchSize');
        this.domain = domain || Constants.Task.DefaultDomain;
        assert.string(this.domain, 'domain');
        this.debug = _.has(options, 'debug') ? options.debug : false;
    }

    /**
     * Poll the store for completed tasks every `pollInterval` ms until
     * `running` becomes false.
     *
     * @throws if the poller has not been marked as running first.
     * @memberOf CompletedTaskPoller
     */
    CompletedTaskPoller.prototype.pollTasks = function() {
        var self = this;

        /*
         * Before setting up the stream, make sure it is running, otherwise
         * this will create a stream that will never run and immediately complete.
         * This is defensive programming to prevent accidents where the
         * startup code is executed in the wrong order (e.g. pollTasks() then
         * this.running = true, which would be buggy because pollTasks() would
         * stop execution before this.running = true was set).
         */
        assert.ok(self.running, 'completed task poller is running');

        /*
         * For those unfamiliar with Rx.js:
         *
         * This Rx.Observable.interval call produces a continuously running
         * pipeline of the chained calls below that triggers on every
         * pollInterval period. The .takeWhile call at the top implements an
         * auto-disposal mechanism: before doing any of the subsequent logic,
         * first check if we're still running and shut down the whole stream
         * if we're not. That way we don't have to do any asynchronous
         * disposal when stopping the service; we just set
         * this.running = false.
         *
         * The .map call is where the actual work is done, and the rest of the
         * calls are just coordination about when to do it.
         */
        Rx.Observable.interval(self.pollInterval)
            .takeWhile(self.isRunning.bind(self))
            .map(self.processCompletedTasks.bind(self, self.completedTaskBatchSize))
            .mergeLossy(self.concurrentCounter)
            // Don't let processCompletedTasks return a waterline object to the logger,
            // otherwise it will exceed the call stack trying to traverse a circular object.
            .map(function() { return null; })
            .subscribe(
                // Success handler callback. Only logs in debug mode.
                self.handleStreamDebug.bind(self, 'CompletedTaskPoller stream pipeline success'),
                // Error handler callback. This is considered catastrophic; most errors
                // _should_ be caught at a lower level and not bubbled up.
                self.handleStreamError.bind(self, 'Error with completed task deletion stream.')
            );
    };

    /**
     * Used with Rx.Observable.prototype.takeWhile in the Observable created
     * by CompletedTaskPoller.prototype.pollTasks. When isRunning() returns
     * false, the polling stream completes and its subscriptions dispose.
     *
     * @returns {Boolean}
     * @memberOf CompletedTaskPoller
     */
    CompletedTaskPoller.prototype.isRunning = function() {
        return this.running;
    };

    /**
     * Find up to <limit> completed tasks in the store and process them:
     * determine whether any graph state needs updating, then delete the task
     * documents from the store.
     *
     * Errors anywhere in the pipeline are logged and swallowed
     * (handleStreamError returns an empty Observable), so one bad batch does
     * not terminate the outer polling stream.
     *
     * @param {Number} limit
     * @returns {Observable}
     * @memberOf CompletedTaskPoller
     */
    CompletedTaskPoller.prototype.processCompletedTasks = function(limit) {
        return Rx.Observable.just()
            .flatMap(store.findCompletedTasks.bind(store, limit))
            .filter(function(tasks) { return !_.isEmpty(tasks); })
            .flatMap(this.deleteCompletedGraphs.bind(this))
            .flatMap(this.deleteTasks.bind(this))
            .catch(this.handleStreamError.bind(this, 'Error processing completed tasks'));
    };

    /**
     * Determine whether the graph owning a terminal task is finished. If so,
     * mark its finished state in the store and publish finished events.
     *
     * A task in one of Constants.Task.FailedStates finishes its graph as
     * Failed immediately; otherwise store.checkGraphSucceeded decides whether
     * the graph is done, and if so it is marked Succeeded.
     *
     * @param {Object} data - The terminal task document.
     * @returns {Observable} Emits one value once the check (and any
     *     resulting store update/publishing) has completed; the value itself
     *     is not used by callers in this file.
     * @memberOf CompletedTaskPoller
     */
    CompletedTaskPoller.prototype.handlePotentialFinishedGraph = function(data) {
        var self = this;
        assert.object(data, 'data');
        assert.string(data.state, 'data.state');

        // Whether the graph failed is decided solely by the state of the task
        // that triggered this check; compute it once instead of re-reading a
        // flag off whichever object the store happens to hand back.
        var failed = _.contains(Constants.Task.FailedStates, data.state);

        return Rx.Observable.just(data)
            .flatMap(function(task) {
                if (failed) {
                    // A failed terminal task finishes the graph; no store
                    // lookup is needed. The flags are kept on the task object
                    // because it is what gets passed to store.setGraphDone.
                    task.failed = true;
                    task.done = true;
                    return Rx.Observable.just(task);
                }
                return store.checkGraphSucceeded(task);
            })
            .flatMap(function(result) {
                if (!result.done) {
                    return Rx.Observable.just();
                }

                var graphState = failed ?
                    Constants.Task.States.Failed :
                    Constants.Task.States.Succeeded;

                return store.setGraphDone(graphState, result)
                    .then(function(graph) {
                        // Don't publish duplicate events if we've already set the graph as done
                        // prior, but DO continue with the outer stream so that we delete
                        // the task document whose existence triggered this check.
                        if (_.isEmpty(graph)) {
                            return;
                        }
                        return Promise.resolve()
                            .tap(function() {
                                return graphProgressService.publishGraphFinished(
                                    graph,
                                    graphState,
                                    {swallowError: true}
                                );
                            })
                            .then(function() {
                                return self._publishGraphFinished(graph);
                            });
                    });
            });
    };

    /**
     * Evaluate an array of finished tasks, and check whether a graph is
     * finished for each task that is marked as being potentially terminal.
     *
     * @param {Array} tasks
     * @returns {Observable} Emits the original `tasks` array once all graph
     *     checks are done, or nothing if a check errored (the error is logged).
     * @memberOf CompletedTaskPoller
     */
    CompletedTaskPoller.prototype.deleteCompletedGraphs = function(tasks) {
        assert.arrayOfObject(tasks, 'tasks array');

        // Collect only terminal tasks (tasks that we know are the last or
        // one of the last tasks to run in a graph) for graph completion checks.
        // This handles cases where all tasks in a graph are completed,
        // but the graph completion event was dropped by the scheduler, either
        // due to high load or a process failure. Hooking onto task deletion
        // lets us avoid full collection scans against graphobjects
        // to find potentially unfinished graphs.
        var terminalTasks = _.filter(tasks, function(task) {
            return _.contains(task.terminalOnStates, task.state);
        });

        if (_.isEmpty(terminalTasks)) {
            return Rx.Observable.just(tasks);
        }

        return Rx.Observable.from(terminalTasks)
            .flatMap(this.handlePotentialFinishedGraph.bind(this))
            // Each check emits one value; wait for all of them, then hand the
            // whole original batch on for deletion.
            .bufferWithCount(terminalTasks.length)
            .map(function() { return tasks; })
            .catch(this.handleStreamError.bind(this, 'Error handling potential finished graphs'));
    };

    /**
     * Delete task documents from the store by their `_id`.
     *
     * @param {Array} tasks
     * @returns {Observable} Emits the result of store.deleteTasks, or nothing
     *     if deletion failed (the error is logged).
     * @memberOf CompletedTaskPoller
     */
    CompletedTaskPoller.prototype.deleteTasks = function(tasks) {
        assert.arrayOfObject(tasks, 'tasks array');
        var objectIds = _.map(tasks, function(task) {
            return task._id;
        });

        return Rx.Observable.just(objectIds)
            .flatMap(store.deleteTasks.bind(store))
            .catch(this.handleStreamError.bind(this, 'Error deleting completed tasks'));
    };

    /**
     * Log handler for observable onError failure events. Returning an empty
     * Observable lets it be used directly as a .catch handler to swallow the
     * error after logging.
     *
     * @param {String} msg
     * @param {*} err - Usually an Error, but may be null/undefined (e.g. a
     *     promise rejected without a reason).
     * @returns {Observable} An empty Observable.
     * @memberOf CompletedTaskPoller
     */
    CompletedTaskPoller.prototype.handleStreamError = function(msg, err) {
        logger.error(msg, {
            // Stacks on some error objects (particularly from the assert library)
            // don't get printed as part of the error object, so separate them out here.
            error: _.omit(err, 'stack'),
            // Guard so that the error handler itself never throws on a
            // null/undefined error value.
            stack: err ? err.stack : undefined
        });
        return Rx.Observable.empty();
    };

    /**
     * Log handler for debug messaging during development/debugging. Only
     * logs when this.debug is true.
     *
     * @param {String} msg
     * @param {Object} data
     * @memberOf CompletedTaskPoller
     */
    CompletedTaskPoller.prototype.handleStreamDebug = function(msg, data) {
        if (this.debug) {
            logger.debug(msg, data);
        }
    };

    /**
     * Mark the poller as running and start the polling stream. `running`
     * must be set before pollTasks(), which asserts on it.
     *
     * @memberOf CompletedTaskPoller
     */
    CompletedTaskPoller.prototype.start = function() {
        this.running = true;
        this.pollTasks();
    };

    /**
     * Stop polling. The stream created by pollTasks() shuts itself down via
     * takeWhile(isRunning) on its next interval tick.
     *
     * @memberOf CompletedTaskPoller
     */
    CompletedTaskPoller.prototype.stop = function() {
        this.running = false;
    };

    /**
     * @param {String} [domain]
     * @param {Object} [options]
     * @returns {Object} CompletedTaskPoller instance
     * @memberOf CompletedTaskPoller
     */
    CompletedTaskPoller.create = function(domain, options) {
        return new CompletedTaskPoller(domain, options);
    };

    /**
     * Publish a graph finished event via eventsProtocol. Publishing failures
     * are logged and not propagated.
     *
     * @param {Object} graph
     * @returns {Promise}
     * @memberOf CompletedTaskPoller
     */
    CompletedTaskPoller.prototype._publishGraphFinished = function(graph) {
        return eventsProtocol.publishGraphFinished(graph.instanceId, {
            graphId: graph.instanceId,
            graphName: graph.name,
            status: graph._status
        }, graph.node)
        .catch(function(error) {
            logger.error('Error publishing graph finished event', {
                graphId: graph.instanceId,
                _status: graph._status,
                error: error
            });
        });
    };

    return CompletedTaskPoller;
}