Skip to content

Commit

Permalink
aggregated registry refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
Rafal Augustyniak committed Mar 18, 2021
1 parent ef9e9f7 commit c3a1be5
Show file tree
Hide file tree
Showing 6 changed files with 305 additions and 263 deletions.
2 changes: 1 addition & 1 deletion index.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@ exports.exponentialBuckets = require('./lib/bucketGenerators').exponentialBucket
exports.collectDefaultMetrics = require('./lib/defaultMetrics');

exports.aggregators = require('./lib/metricAggregators').aggregators;
exports.AggregatorRegistry = require('./lib/cluster');
exports.AggregatorRegistry = require('./lib/aggregatorRegistry');
95 changes: 95 additions & 0 deletions lib/aggregatorRegistry.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
'use strict';

const Registry = require('./registry');
const { Grouper } = require('./util');
const { aggregators } = require('./metricAggregators');
const WorkerThreads = require('./workerThreads');
const Cluster = require('./cluster');

class AggregatorRegistry extends Registry {
constructor() {
super();

this.registries = [Registry.globalRegistry];
this.workerThreads = new WorkerThreads({ registries: this.registries });
this.cluster = new Cluster({ registries: this.registries });
}

setWorkers(workers) {
this.workerThreads.set(workers);
}

clusterMetrics() {
return this.cluster.getMetrics();
}

workerThreadsMetrics() {
return this.workerThreads.getMetrics();
}

/**
* Sets the registry or registries to be aggregated. Call from workers to
* use a registry/registries other than the default global registry.
* @param {Array<Registry>|Registry} registries Registry or registries to be
* aggregated.
* @return {void}
*/
setRegistries(registries) {
if (!Array.isArray(registries)) registries = [registries];

registries.forEach(registry => {
if (!(registry instanceof Registry)) {
throw new TypeError(`Expected Registry, got ${typeof registry}`);
}
});

this.registries = registries;
}

/**
* Creates a new Registry instance from an array of metrics that were
* created by `registry.getMetricsAsJSON()`. Metrics are aggregated using
* the method specified by their `aggregator` property, or by summation if
* `aggregator` is undefined.
* @param {Array} metrics Array of metrics, each of which created by
* `registry.getMetricsAsJSON()`.
* @return {Registry} aggregated registry.
*/
static aggregate(metrics) {
const aggregatedRegistry = new Registry();
const metricsByName = new Grouper();

// Gather by name
metrics.forEach(m =>
m.forEach(metric => {
metricsByName.add(metric.name, metric);
}),
);

// Aggregate gathered metrics.
metricsByName.forEach(metric => {
const aggregatorName = metric[0].aggregator;
const aggregatorFn = aggregators[aggregatorName];

if (typeof aggregatorFn !== 'function') {
throw new Error(`'${aggregatorName}' is not a defined aggregator.`);
}

const aggregatedMetric = aggregatorFn(metric);
// NB: The 'omit' aggregator returns undefined.
if (aggregatedMetric) {
const aggregatedMetricWrapper = Object.assign(
{
get: () => aggregatedMetric,
},
aggregatedMetric,
);
aggregatedRegistry.registerMetric(aggregatedMetricWrapper);
}
});

return aggregatedRegistry;
}
}

module.exports = AggregatorRegistry;
Loading

0 comments on commit c3a1be5

Please sign in to comment.