-
Notifications
You must be signed in to change notification settings - Fork 1
/
worker.js
107 lines (97 loc) · 3.25 KB
/
worker.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
const uuid = require('uuid');
const esClient = require('./es');
const config = require('./config');
// Number of tasks handled since the last throughput report. The worker loop
// adds to it; the reporter below prints it and resets it to zero.
let counter = 0;

// How often the throughput line is printed, in milliseconds.
const REPORT_INTERVAL_MS = 5000;

/** Prints the tasks handled during the last interval and starts a new count. */
function reportThroughput() {
  console.log(`I have processed ${counter} tasks in the past 5s`);
  counter = 0;
}

setInterval(reportThroughput, REPORT_INTERVAL_MS);
/**
 * Main worker loop. Every iteration:
 *   1. claims up to `config.workerConcurrency` idle tasks whose `runAt` is due,
 *      by stamping them with a fresh owner id and `status: 'running'` through
 *      an update-by-query request;
 *   2. searches for the documents that carry that owner id;
 *   3. marks them finished by writing them back as `idle`, with a new `runAt`
 *      and no owner, in a single bulk request.
 *
 * The loop never returns on its own. If any request rejects, the error is
 * logged and the process exits with a non-zero code.
 */
(async () => {
  while (true) {
    // A new id per iteration, so the search below only matches documents
    // stamped by this iteration's claim.
    const uniqueId = uuid.v4();

    // Claim tasks
    const updateByQueryResult = await esClient.updateByQuery({
      index: config.esIndex,
      // Refresh so the search that follows can see the claimed documents.
      refresh: true,
      max_docs: config.workerConcurrency,
      // Keep going when another writer changed a document first instead of
      // aborting the whole request.
      conflicts: 'proceed',
      body: {
        query: {
          bool: {
            filter: {
              bool: {
                must: [
                  {
                    term: {
                      status: 'idle',
                    }
                  },
                  {
                    range: {
                      runAt: { lte: 'now' }
                    }
                  }
                ]
              }
            }
          }
        },
        script: {
          source: `ctx._source.owner=params.uniqueId; ctx._source.status=params.status;`,
          lang: 'painless',
          params: {
            uniqueId,
            status: 'running',
          }
        },
        sort: [
          { runAt: { order: 'asc' } }
        ]
      }
    });

    // `updated` is the number of documents this request actually modified.
    // With `conflicts: 'proceed'`, documents skipped on a version conflict are
    // reported separately and were not claimed here, so `total` is not a
    // reliable count of claimed tasks.
    const claimedCount = updateByQueryResult.updated;
    if (claimedCount < config.workerConcurrency) {
      console.log(`Did not claim full amount of tasks, only ${claimedCount} but should of claimed ${config.workerConcurrency}`);
    }

    // Get claimed tasks
    const result = await esClient.search({
      index: config.esIndex,
      size: config.workerConcurrency,
      body: {
        query: {
          bool: {
            filter: {
              term: {
                owner: uniqueId,
              }
            }
          }
        }
      }
    });
    if (result.hits.total.value < claimedCount) {
      console.log(`Did not fetch all claimed tasks, only ${result.hits.total.value}`);
    }

    const claimedTasks = result.hits.hits;
    if (claimedTasks.length === 0) {
      // Nothing to finish. A bulk request with an empty body is rejected by
      // Elasticsearch, so skip it rather than crash while the queue is idle.
      // NOTE(review): the loop retries immediately; consider a short delay
      // here if idle polling load on the cluster matters.
      continue;
    }

    // Finished tasks
    const params = [];
    for (const hit of claimedTasks) {
      // The bulk body alternates an action line with its partial document.
      params.push({ update: { _id: hit._id } });
      params.push({
        doc: {
          ...hit._source,
          status: 'idle',
          runAt: new Date(),
          owner: null
        }
      });
    }
    const bulkResult = await esClient.bulk({
      refresh: false,
      index: config.esIndex,
      body: params,
    });
    if (bulkResult.errors === true) {
      console.log('Encountered errors in bulk request');
    }
    counter += result.hits.total.value;
  }
})().catch((err) => {
  // Do not leave the loop's promise floating: report the failure and stop.
  // An explicit exit is needed because the reporting timer would otherwise
  // keep the process alive with no worker loop running.
  console.error('Worker loop failed', err);
  process.exit(1);
});