Skip to content

Commit

Permalink
Add event listeners synchronously, use legacy Logstash mapping types …
Browse files Browse the repository at this point in the history
…with ES <= 5.x
  • Loading branch information
kherock committed Sep 22, 2018
1 parent f9d8f57 commit f8dac9c
Showing 1 changed file with 79 additions and 72 deletions.
151 changes: 79 additions & 72 deletions lib/appmetrics-elk.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,52 +32,42 @@ module.exports = function connector(emitter, opts) {

let bulkarray = [];

const startMonitoring = () => {
const preConfigured = ['memory'];
const preconfiguredEvents = ['memory'];

/*
* Memory is a special case as we change the structure of the data
*/
emitter.on('memory', memory => {
const data = {
process: {
private: memory.private,
physical: memory.physical,
virtual: memory.virtual,
},
system: {
physical: memory.physical_used,
total: memory.physical_total,
},
};
publishData('memory', memory.time, data);
});

const registerCallback = (value, type) => {
if (preConfigured.indexOf(type) === -1) {
emitter.on(type, eventdata => {
const data = {};
const { properties } = value.body.properties[type];
for (const prop of Object.keys(properties)) {
data[prop] = eventdata[prop];
}
publishData(type, eventdata.time, data);
});
}
/*
* Memory is a special case as we change the structure of the data
*/
emitter.on('memory', memory => {
const data = {
process: {
private: memory.private,
physical: memory.physical,
virtual: memory.virtual,
},
system: {
physical: memory.physical_used,
total: memory.physical_total,
},
};
publishData('memory', memory.time, data);
});

/*
* Register a callback for every event type we have a mapping for
*/
getJSONfromDir('mappings', registerCallback);

setInterval(() => {
if (bulkarray.length > 0) {
publishBulk(bulkarray);
bulkarray = [];
/*
* Register a callback for every event type we have a mapping for
*/
mapJsonInDir('mappings', (value, type) => {
if (preconfiguredEvents.indexOf(type) !== -1) {
return;
}
emitter.on(type, eventData => {
const data = {};
const { properties } = value.body.properties[type];
for (const prop of Object.keys(properties)) {
data[prop] = eventData[prop];
}
}, 5000);
};
publishData(type, eventData.time, data);
});
});

/*
* Publishing data to Elasticsearch (ES)
Expand All @@ -97,17 +87,6 @@ module.exports = function connector(emitter, opts) {
bulkarray.push(action, doc);
};

const publishBulk = () => {
esClient
.bulk({
index: opts.index,
type: 'doc',
body: bulkarray,
})
// .then(res => console.log('Published bulk update ' + res)
.catch(err => console.log('Error doing bulk update ' + err));
};

esClient
.search({
index: '.kibana',
Expand Down Expand Up @@ -140,47 +119,55 @@ module.exports = function connector(emitter, opts) {
}

/*
* Check to see if the appmetrics index exists in ElasticSearch. if
* Check to see if the appmetrics index exists in Elasticsearch. if
* is doesn't, then create the index to store the data into and
* upload the data type mappings
*/
return esClient.indices
.exists({ index: opts.index })
.then(exists => !exists && esClient.indices.create({ index: opts.index }))
.then(() => putMappings(esClient, esVersion, opts.index));
.then(() => putMappings(esClient, esVersion, opts.index))
.then(() => esVersion);
})
.catch(err => console.error('Failed to create index', err.stack))
)
.then(startMonitoring);
.then(esVersion => {
publishBulk(esClient, esVersion, opts.index, bulkarray);
setInterval(publishBulk, 5000, null, esClient, esVersion, opts.index, bulkarray);
});

return emitter;
};

function getJSONfromDir(directory, callback) {
function mapJsonInDir(directory, callback) {
const dirPath = path.join(__dirname, '..', directory);
fs.readdir(dirPath, (err, files) => {
if (err) {
console.log('Failed to read from ' + dirPath);
} else {
for (const filename of files) {
const file = path.join(dirPath, filename);
const basename = path.basename(filename, '.json');
callback.call(this, JSON.parse(fs.readFileSync(file, 'utf8')), basename);
}
try {
const files = fs.readdirSync(dirPath);
for (const filename of files) {
const file = path.join(dirPath, filename);
const basename = path.basename(filename, '.json');
callback(JSON.parse(fs.readFileSync(file, 'utf8')), basename);
}
});
} catch (err) {
console.error('Failed to read from ' + dirPath);
console.error(err);
}
}

/*
* Put the mappings for the data we create into the index. It
* shouldn't matter if we replace existing records as they should be the same...
*/
function putMappings(esClient, esVersion, index) {
getJSONfromDir('mappings', mapping => {
mapJsonInDir('mappings', (mapping, type) => {
mapping.index = index;
if (esVersion <= 2) {
backportFieldTypes(mapping);
}
if (esVersion <= 5) {
mapping.type = type;
delete mapping.body.properties.type;
}
esClient.indices
.putMapping(mapping)
// .then(res => console.log('Put mapping for ' + fileName))
Expand All @@ -189,7 +176,7 @@ function putMappings(esClient, esVersion, index) {
}

function putIndexes(esClient, esVersion, index) {
getJSONfromDir('indexes', indexPattern => {
mapJsonInDir('indexes', indexPattern => {
indexPattern.id = 'index-pattern:' + index;
indexPattern.body[indexPattern.body.type].title = index;
if (esVersion <= 5) {
Expand All @@ -203,7 +190,7 @@ function putIndexes(esClient, esVersion, index) {
}

function putDashboards(esClient, esVersion) {
getJSONfromDir('dashboards', dashboard => {
mapJsonInDir('dashboards', dashboard => {
if (esVersion <= 5) {
backportKibanaDoc(dashboard);
}
Expand All @@ -215,10 +202,11 @@ function putDashboards(esClient, esVersion) {
}

function putCharts(esClient, esVersion, index) {
getJSONfromDir('charts', chart => {
mapJsonInDir('charts', chart => {
const { kibanaSavedObjectMeta } = chart.body[chart.body.type];
const searchSourceJSON = JSON.parse(kibanaSavedObjectMeta.searchSourceJSON);
kibanaSavedObjectMeta.searchSourceJSON = JSON.stringify(Object.assign(searchSourceJSON, { index }));
searchSourceJSON.index = index;
kibanaSavedObjectMeta.searchSourceJSON = JSON.stringify(searchSourceJSON);
if (esVersion <= 5) {
backportKibanaDoc(chart);
}
Expand All @@ -229,6 +217,25 @@ function putCharts(esClient, esVersion, index) {
});
}

function publishBulk(esClient, esVersion, index, actions) {
if (!actions.length) return;
if (esVersion <= 5) {
for (let i = 0; i < actions.length; i += 2) {
actions[i].index._type = actions[i + 1].type;
delete actions[i + 1].type;
}
}
esClient
.bulk({
index,
type: 'doc',
body: actions,
})
// .then(res => console.log('Published bulk update ' + res)
.catch(err => console.log('Error doing bulk update ' + err));
actions.length = 0;
}

/*
* Converts Kibana 6.x documents back to 2.x/5.x
*/
Expand Down

0 comments on commit f8dac9c

Please sign in to comment.