-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js
103 lines (87 loc) · 2.65 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
var fs = require('fs'),
cp = require('child_process'),
mkdirp = require('mkdirp'),
path = require('path');
module.exports = exports = function(data, mapper, reducer, cb) {
var time = Date.now(),
dir = '/tmp/js_hadoop_' + time;
mkdirp(dir, function(err) {
if (err) throw new Error(err);
var mfile = path.join(dir, 'mapper'),
rfile = path.join(dir, 'reducer'),
dfile = path.join(dir, 'data'),
mh = fs.openSync(mfile, 'w'),
rh = fs.openSync(rfile, 'w'),
dh = fs.openSync(dfile, 'w');
// Write mapper/reducer and make executable
fs.writeSync(mh, createMapper(mapper));
fs.chmodSync(mfile, 0755);
fs.writeSync(rh, toString(reducer));
fs.chmodSync(rfile, 0755);
// Write data to file
fs.writeSync(dh, makeDataFile(data));
// Call hadoop
var call = cp.spawn(path.join(__dirname, 'run.sh'), [
mfile,
rfile,
dfile,
path.join(dir, 'output')
], { cwd: __dirname });
// Ignoring stdout currently
call.stdout.on('data', function(data) {
//process.stdout.write(data);
});
call.stderr.on('data', function(data) {
//process.stdout.write(data);
});
call.on('error', function(e) {
throw new Error(e);
});
call.on('close', function(code) {
if (code !== 0) throw new Error('Nonzero exit code: %s', code);
// Must get data back now
fs.readdir(path.join(dir, 'output'), function(err, files) {
if (err) throw new Error(err);
var results = [];
for (var i in files) {
var f = files[i];
if (f.match(/^part-/)) {
var data = fs.readFileSync(path.join(dir, 'output', f), 'utf8')
.replace(/\r\n/g, '\n'),
lines = data.split('\n');
for (var j in lines) {
var l = lines[j];
if (l == '') continue;
var sp = l.split('\t'),
d = {
key: sp.shift(),
value: sp.join('\t')
}
results.push(d);
}
}
}
cb(results);
});
});
});
}
/**
 * Render a function as the source text of a CommonJS module that exports it.
 *
 * @param {Function} fn  Function whose `toString()` output becomes the export.
 * @returns {string} `module.exports = exports = <source>;`
 */
function toString(fn) {
  var prefix = 'module.exports = exports = ';
  var source = fn.toString();
  return prefix + source + ';';
}
/**
 * Build the source text of the mapper module.
 *
 * The generated module exports `function(line, out)`, which JSON-parses
 * `line`, returns silently when parsing fails, and otherwise calls the
 * user's mapper (embedded via its `toString()` source) with the parsed
 * value and `out`.
 *
 * @param {Function} mapper  User mapper, invoked as `mapper(record, out)`.
 * @returns {string} JavaScript source for the mapper module.
 */
function createMapper(mapper) {
  var sourceLines = [
    'module.exports = exports = function(line, out) {',
    '\ttry{ var d = JSON.parse(line);}',
    '\tcatch(e) { return; }',
    '\tvar m = ' + mapper.toString() + ';',
    '\tm(d, out);',
    '};'
  ];
  return sourceLines.join('\n');
}
/**
 * Serialize a collection into newline-delimited JSON.
 *
 * Every enumerable entry of `data` is passed through `JSON.stringify` and
 * terminated with a newline.
 *
 * @param {Array|Object} data  Collection of values to serialize.
 * @returns {string} One JSON document per line; empty string when `data`
 *                   has no enumerable entries.
 */
function makeDataFile(data) {
  var chunks = [];
  // for...in is intentional: `data` may be an array or a plain object.
  for (var key in data) {
    chunks.push(JSON.stringify(data[key]) + '\n');
  }
  return chunks.join('');
}