-
Notifications
You must be signed in to change notification settings - Fork 0
/
zo-function.js
120 lines (110 loc) · 3.8 KB
/
zo-function.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
function (items, pipeline) {
pipeline = (pipeline || []);
/**
 * Runs `items` through the stage functions in `funcs`, one after another.
 *
 * Each stage is called as `stage(items, next)`; whatever the stage passes to
 * `next` becomes the input of the following stage. Nothing happens when
 * `funcs` is empty (or its first entry is falsy).
 *
 * @param {*} items - input handed to the first stage.
 * @param {Array<Function>} funcs - remaining stages, in execution order.
 */
var runPipeline = function (items, funcs) {
    // Declared with `var`: a bare assignment would leak a global `first`
    // and throws a ReferenceError in strict mode / ES modules.
    var first = _(funcs).head();
    if (first) {
        first(items, function (it) { runPipeline(it, _(funcs).tail()); });
    }
};
/**
 * Appends one asynchronous per-item stage to the pipeline and returns
 * `zo(items, pipeline)` so that calls can be chained.
 *
 * When the stage runs, `processItem` is started for the stage's input items
 * (at most `options.limit` at a time) and `next` is called with the collected
 * output once every item has reported back. `addItem` is invoked as each item
 * finishes, so it sees items in completion order, not input order.
 *
 * @param {Function} processItem - called as `processItem(item, done)`; must
 *   call `done(processedItem)` once for the item.
 * @param {Function} addItem - called as
 *   `addItem(processedItems, item, processedItem)`; decides what, if anything,
 *   is added to the stage's output array.
 * @param {Object} [options]
 * @param {number} [options.limit] - maximum number of items being processed
 *   at the same time. Any value that is not greater than zero (missing, 0,
 *   negative, NaN) means "no limit".
 * @returns {*} the result of `zo(items, pipeline)`.
 */
var pipelineElement = function (processItem, addItem, options) {
    options = (options || {});
    var maxOutstandingProcesses = options.limit;
    pipeline.push(function (items, next) {
        var n = items.length; // items that have not reported back yet
        var processedItems = [];
        var pendingProcesses = []; // starters waiting for a free slot (FIFO)
        var outstandingProcesses = 0;
        var canStartAnotherProcess = function () {
            // `!(limit > 0)` instead of `!limit`: with a negative limit the
            // comparison below can never be true, so no item would ever start
            // and `next` would never be called.
            return !(maxOutstandingProcesses > 0) || outstandingProcesses < maxOutstandingProcesses;
        };
        if (n > 0) {
            _(items).each(function (item) {
                var startProcessing = function () {
                    outstandingProcesses++;
                    processItem(item, function (processedItem) {
                        addItem(processedItems, item, processedItem);
                        n--;
                        outstandingProcesses--;
                        // Dequeue only after confirming a slot is free, so a
                        // waiting starter can never be removed without running.
                        if (pendingProcesses.length > 0 && canStartAnotherProcess()) {
                            pendingProcesses.shift()();
                        }
                        if (n === 0) {
                            next(processedItems);
                        }
                    });
                };
                if (canStartAnotherProcess()) {
                    startProcessing();
                } else {
                    pendingProcesses.push(startProcessing);
                }
            });
        } else {
            // Nothing to process: hand the empty result straight on.
            next(processedItems);
        }
    });
    return zo(items, pipeline);
};
/**
 * Appends a left-to-right fold stage to the pipeline and returns
 * `zo(items, pipeline)` for chaining.
 *
 * When the stage runs it walks its input from index 0 upwards, calling
 * `folder(accumulated, item, done)` for each item; the value passed to `done`
 * becomes the accumulated value for the following item. After the last item
 * the accumulated value is handed to the next stage.
 *
 * @param {*} first - initial accumulated value (also the result for empty input).
 * @param {Function} folder - called as `folder(accumulated, item, done)`.
 * @returns {*} the result of `zo(items, pipeline)`.
 */
var foldl = function (first, folder) {
    var foldStage = function (stageItems, done) {
        var step = function (accumulated, position) {
            if (position >= stageItems.length) {
                done(accumulated);
                return;
            }
            folder(accumulated, stageItems[position], function (nextAccumulated) {
                // Each step is deferred with process.nextTick -- presumably so
                // that synchronous folders do not deepen the call stack.
                process.nextTick(function () {
                    step(nextAccumulated, position + 1);
                });
            });
        };
        step(first, 0);
    };
    pipeline.push(foldStage);
    return zo(items, pipeline);
};
/**
 * Appends a right-to-left fold stage to the pipeline and returns
 * `zo(items, pipeline)` for chaining.
 *
 * When the stage runs it walks its input from the last index down to 0,
 * calling `folder(accumulated, item, done)` for each item; the value passed to
 * `done` becomes the accumulated value for the preceding item. Once index 0
 * has been folded the accumulated value is handed to the next stage.
 *
 * @param {*} first - initial accumulated value (also the result for empty input).
 * @param {Function} folder - called as `folder(accumulated, item, done)`.
 * @returns {*} the result of `zo(items, pipeline)`.
 */
var foldr = function (first, folder) {
    var foldStage = function (stageItems, done) {
        var step = function (accumulated, position) {
            if (position < 0) {
                done(accumulated);
                return;
            }
            folder(accumulated, stageItems[position], function (nextAccumulated) {
                // Each step is deferred with process.nextTick -- presumably so
                // that synchronous folders do not deepen the call stack.
                process.nextTick(function () {
                    step(nextAccumulated, position - 1);
                });
            });
        };
        step(first, stageItems.length - 1);
    };
    pipeline.push(foldStage);
    return zo(items, pipeline);
};
return {
results: function (f) {
pipeline.push(function (items, next) {
f(items);
});
runPipeline(items, pipeline);
},
map: function (mapper, options) {
return pipelineElement(mapper, function (mappedItems, item, mappedItem) {
mappedItems.push(mappedItem);
}, options);
},
foldr: foldr,
foldl: foldl,
reduce: foldl,
reduceRight: foldr,
select: function (selector, options) {
return pipelineElement(selector, function (selectedItems, item, itemSelected) {
if (itemSelected) selectedItems.push(item);
}, options);
},
each: function (doForEach, options) {
return pipelineElement(doForEach, function (selectedItems, item, itemSelected) {
selectedItems.push(item);
}, options);
},
};
};