
Commit
* add GetHistory logId to be able to better trace calls
* sync aggregate.js
Apollon77 committed May 9, 2022
1 parent acb4a65 commit 6d52e9c
Showing 3 changed files with 30 additions and 28 deletions.
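
The gist of the change: every getHistory request now builds a per-call logId and prefixes it to all debug output, so interleaved lines from concurrent calls can be told apart. A minimal sketch of the scheme (the msg object and the printed values are illustrative; the logId construction itself is copied from the main.js hunks below):

    // Hypothetical incoming message; in the adapter this is the sendTo payload.
    const msg = { message: { id: 'influxdb.0.myDatapoint', options: {} } };

    // One id per call: datapoint id (or 'all') + timestamp + random suffix.
    const logId = (msg.message.id ? msg.message.id : 'all') + Date.now() + Math.random();

    // Every log line of that call carries the prefix:
    console.log(`${logId} getHistory (InfluxDB1) call: ${JSON.stringify(msg.message.options)}`);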
2 changes: 1 addition & 1 deletion README.md
@@ -172,7 +172,7 @@ sendTo('influxdb.0', 'getHistory', {
Possible options:
- **start** - (optional) time in ms - *Date.now()*
- **end** - (optional) time in ms - *Date.now()*, defaults to (now + 5000 seconds)
- - **step** - (optional) used in aggregate (m4, max, min, average, total) step in ms of intervals
+ - **step** - (optional) used in aggregate (max, min, average, total, ...) step in ms of intervals
- **count** - number of values if aggregate is 'onchange', otherwise the number of intervals; ignored when step is set, defaults to 500
- **from** - whether the *from* field should be included in the answer
- **ack** - whether the *ack* field should be included in the answer
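
A usage sketch with the options above (the datapoint id and values are illustrative; the callback shape follows the ioBroker sendTo convention):

    sendTo('influxdb.0', 'getHistory', {
        id: 'system.adapter.admin.0.memRss',   // hypothetical datapoint id
        options: {
            start: Date.now() - 3600000,       // last hour
            end: Date.now(),
            aggregate: 'average',
            step: 60000,                       // one interval per minute; count is ignored when step is set
            from: false,
            ack: false
        }
    }, result => {
        console.log(`Received ${result.result.length} values`);
    });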
32 changes: 16 additions & 16 deletions lib/aggregate.js
@@ -46,7 +46,7 @@ function initAggregate(options) {
options.integralUnit *= 1000; // Convert to milliseconds
}

- log(`Initialize: maxIndex = ${options.maxIndex}, step = ${options.step}, start = ${options.start}, end = ${options.end}`);
+ log(`${options.logId} Initialize: maxIndex = ${options.maxIndex}, step = ${options.step}, start = ${options.start}, end = ${options.end}`);
// pre-fill the result with timestamps (add one before start and one after end)
for (let i = 0; i <= options.maxIndex + 2; i++) {
options.result[i] = {
@@ -194,10 +194,10 @@ function aggregationLogic(data, index, options) {
}
} else if (options.aggregate === 'percentile' || options.aggregate === 'quantile') {
options.quantileDatapoints[index].push(data.val);
- log(`Quantile ${index}: Add ts= ${data.ts} val=${data.val}`);
+ log(`${options.logId} Quantile ${index}: Add ts= ${data.ts} val=${data.val}`);
} else if (options.aggregate === 'integral') {
options.integralDatapoints[index].push(data);
- log(`Integral ${index}: Add ts= ${data.ts} val=${data.val}`);
+ log(`${options.logId} Integral ${index}: Add ts= ${data.ts} val=${data.val}`);
}
}

@@ -401,13 +401,13 @@ function finishAggregation(options) {
ts: indexStartTs,
val: options.integralDatapoints[k - 1][options.integralDatapoints[k - 1].length - 1].val
});
- log(`Integral: ${k}: Added start entry for interval with ts=${indexStartTs}, val=${options.integralDatapoints[k][0].val}`);
+ log(`${options.logId} Integral: ${k}: Added start entry for interval with ts=${indexStartTs}, val=${options.integralDatapoints[k][0].val}`);
} else if (options.integralDatapoints[k].length && options.integralDatapoints[k][0].ts > indexStartTs) {
options.integralDatapoints[k].unshift({
ts: indexStartTs,
val: options.integralDatapoints[k][0].val
});
- log(`Integral: ${k}: Added start entry for interval with ts=${indexStartTs}, val=${options.integralDatapoints[k][0].val} with same value as first point in interval because no former datapoint was found`);
+ log(`${options.logId} Integral: ${k}: Added start entry for interval with ts=${indexStartTs}, val=${options.integralDatapoints[k][0].val} with same value as first point in interval because no former datapoint was found`);
} else if (options.integralDatapoints[k].length && options.integralDatapoints[k][0].ts < indexStartTs) {
// if the first entry of this interval started before the start of the interval, search for the last value before the start of the interval, add as start entry
let preFirstIndex = null;
@@ -422,12 +422,12 @@
ts: indexStartTs,
val: options.integralDatapoints[k][preFirstIndex].val
});
- log(`Integral: ${k}: Remove ${preFirstIndex + 1} entries and add start entry for interval with ts=${indexStartTs}, val=${options.integralDatapoints[k][0].val}`);
+ log(`${options.logId} Integral: ${k}: Remove ${preFirstIndex + 1} entries and add start entry for interval with ts=${indexStartTs}, val=${options.integralDatapoints[k][0].val}`);
}
}

const vals = options.integralDatapoints[k].map(dp => `[${dp.ts}, ${dp.val}]`);
- log(`Integral: ${k}: ${options.integralDatapoints[k].length} datapoints for interval for ${indexStartTs} - ${indexEndTs}: ${vals.join(',')}`);
+ log(`${options.logId} Integral: ${k}: ${options.integralDatapoints[k].length} datapoints for interval for ${indexStartTs} - ${indexEndTs}: ${vals.join(',')}`);

if (!options.result[k].val.ts) {
options.result[k].val.ts = Math.round(options.start + (((k - 1) + 0.5) * options.step));
@@ -437,11 +437,11 @@
const valEndTs = options.integralDatapoints[k][kk + 1] ? Math.min(options.integralDatapoints[k][kk + 1].ts, indexEndTs) : indexEndTs;
let valDuration = valEndTs - options.integralDatapoints[k][kk].ts;
if (valDuration < 0) {
- log(`Integral: ${k}[${kk}] data do not belong to this interval, ignore ${JSON.stringify(options.integralDatapoints[k][kk])} (vs. ${valEndTs})`)
+ log(`${options.logId} Integral: ${k}[${kk}] data do not belong to this interval, ignore ${JSON.stringify(options.integralDatapoints[k][kk])} (vs. ${valEndTs})`)
break;
}
if (valDuration === 0) {
- log(`Integral: ${k}[${kk}] valDuration zero, ignore ${JSON.stringify(options.integralDatapoints[k][kk])}`)
+ log(`${options.logId} Integral: ${k}[${kk}] valDuration zero, ignore ${JSON.stringify(options.integralDatapoints[k][kk])}`)
continue;
}
let valStart = parseFloat(options.integralDatapoints[k][kk].val) || 0;
@@ -450,7 +450,7 @@
if (options.integralInterpolation !== 'linear' || valStart === valEnd) {
const integralAdd = valStart * valDuration / options.integralUnit;
// simple rectangle linear interpolation
- log(`Integral: ${k}[${kk}] : Add ${integralAdd} from val=${valStart} for ${valDuration}`);
+ log(`${options.logId} Integral: ${k}[${kk}] : Add ${integralAdd} from val=${valStart} for ${valDuration}`);
options.result[k].val.val += integralAdd;
} else if ((valStart >= 0 && valEnd >= 0) || (valStart <= 0 && valEnd <= 0)) {
// start and end are both positive or both negative, or one is 0
@@ -465,7 +465,7 @@
const rectPart = minVal * valDuration / options.integralUnit;
const trianglePart = (maxVal - minVal) * valDuration * 0.5 / options.integralUnit;
const integralAdd = (rectPart + trianglePart) * multiplier;
- log(`Integral: ${k}[${kk}] : Add R${rectPart} + T${trianglePart} => ${integralAdd} from val=${valStart} to ${valEnd} for ${valDuration}`);
+ log(`${options.logId} Integral: ${k}[${kk}] : Add R${rectPart} + T${trianglePart} => ${integralAdd} from val=${valStart} to ${valEnd} for ${valDuration}`);
options.result[k].val.val += integralAdd;
} else {
// Values are on different sides of 0, so we need to find the 0 crossing
@@ -474,7 +474,7 @@
const trianglePart1 = valStart * zeroCrossing * 0.5 / options.integralUnit;
const trianglePart2 = valEnd * (valDuration - zeroCrossing) * 0.5 / options.integralUnit;
const integralAdd = trianglePart1 + trianglePart2;
- log(`Integral: ${k}[${kk}] : Add T${trianglePart1} + T${trianglePart2} => ${integralAdd} from val=${valStart} to ${valEnd} for ${valDuration} (zero crossing ${zeroCrossing})`);
+ log(`${options.logId} Integral: ${k}[${kk}] : Add T${trianglePart1} + T${trianglePart2} => ${integralAdd} from val=${valStart} to ${valEnd} for ${valDuration} (zero crossing ${zeroCrossing})`);
options.result[k].val.val += integralAdd;
}
}
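
A condensed sketch of the three interpolation branches above (variable names as in the hunks; multiplier, minVal/maxVal, and the zeroCrossing computation live in context lines this diff does not show, so they are reconstructed here as labeled assumptions):

    function integralPart(valStart, valEnd, valDuration, integralUnit, interpolation) {
        if (interpolation !== 'linear' || valStart === valEnd) {
            // rectangle: hold the start value for the whole duration
            return valStart * valDuration / integralUnit;
        }
        if ((valStart >= 0 && valEnd >= 0) || (valStart <= 0 && valEnd <= 0)) {
            // one-sided trapezoid: rectangle below plus triangle on top
            const multiplier = (valStart < 0 || valEnd < 0) ? -1 : 1;       // assumption: restores the sign
            const minVal = Math.min(Math.abs(valStart), Math.abs(valEnd));  // assumption
            const maxVal = Math.max(Math.abs(valStart), Math.abs(valEnd));  // assumption
            const rectPart = minVal * valDuration / integralUnit;
            const trianglePart = (maxVal - minVal) * valDuration * 0.5 / integralUnit;
            return (rectPart + trianglePart) * multiplier;
        }
        // sign change: split into two triangles at the zero crossing
        const zeroCrossing = valDuration * Math.abs(valStart) / (Math.abs(valStart) + Math.abs(valEnd)); // assumption: linear zero intercept
        return (valStart * zeroCrossing * 0.5 + valEnd * (valDuration - zeroCrossing) * 0.5) / integralUnit;
    }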
@@ -506,7 +506,7 @@ function finishAggregation(options) {
ts: options.result[k].val.ts,
val: quantile(options.quantile, options.quantileDatapoints[k])
};
- log(`Quantile ${k} ${options.result[k].ts}: ${options.quantileDatapoints[k].join(', ')} -> ${options.result[k].val}`);
+ log(`${options.logId} Quantile ${k} ${options.result[k].ts}: ${options.quantileDatapoints[k].join(', ')} -> ${options.result[k].val}`);
} else {
// no value in this interval
options.result.splice(k, 1);
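
The quantile() helper itself is defined outside this diff; a hypothetical sketch of what such a function does (linear interpolation between ranks; the adapter's actual implementation may differ):

    function quantile(q, values) {
        const sorted = [...values].sort((a, b) => a - b);
        const pos = (sorted.length - 1) * q;   // fractional rank for quantile q in [0, 1]
        const base = Math.floor(pos);
        const next = sorted[base + 1] !== undefined ? sorted[base + 1] : sorted[base];
        return sorted[base] + (pos - base) * (next - sorted[base]);
    }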
@@ -540,7 +540,7 @@ function beautify(options) {
log = options.log || console.log;
}

- log(`Beautify: ${options.result.length} results`);
+ log(`${options.logId} Beautify: ${options.result.length} results`);
let preFirstValue = null;
let postLastValue = null;

@@ -613,7 +613,7 @@
// interpolate
const y = preFirstValue.val + (firstY - preFirstValue.val) * (options.start - preFirstValue.ts) / (firstTS - preFirstValue.ts);
options.result.unshift({ts: options.start, val: y, i: true});
- log(`interpolate ${y} from ${preFirstValue.val} to ${firstY} as first return value`);
+ log(`${options.logId} interpolate ${y} from ${preFirstValue.val} to ${firstY} as first return value`);
} else {
options.result.unshift({ts: options.start, val: null});
}
@@ -652,7 +652,7 @@
// make interpolation
const _y = lastY + (postLastValue.val - lastY) * (options.end - lastTS) / (postLastValue.ts - lastTS);
options.result.push({ts: options.end, val: _y, i: true});
- log(`interpolate ${_y} from ${lastY} to ${postLastValue.val} as last return value`);
+ log(`${options.logId} interpolate ${_y} from ${lastY} to ${postLastValue.val} as last return value`);
} else {
options.result.push({ts: options.end, val: null});
}
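The border handling in beautify() above is plain linear interpolation between the last stored point before the window and the first point inside it; a worked example with illustrative numbers:

    const preFirstValue = { ts: 0, val: 10 };   // last value before options.start
    const first = { ts: 100, val: 20 };         // first value inside the window
    const start = 50;                           // options.start

    const y = preFirstValue.val
        + (first.val - preFirstValue.val) * (start - preFirstValue.ts) / (first.ts - preFirstValue.ts);
    console.log(y); // 15, inserted as {ts: start, val: y, i: true}; i marks interpolated points
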
24 changes: 13 additions & 11 deletions main.js
@@ -1861,7 +1861,8 @@ function getHistory(adapter, msg) {
quantile: msg.message.options.aggregate === 'quantile' ? parseFloat(msg.message.options.quantile) || 0.5 : null,
integralUnit: msg.message.options.aggregate === 'integral' ? parseInt(msg.message.options.integralUnit, 10) || 60 : null,
integralInterpolation: msg.message.options.aggregate === 'integral' ? msg.message.options.integralInterpolation || 'none' : null,
- removeBorderValues: msg.message.options.removeBorderValues || false
+ removeBorderValues: msg.message.options.removeBorderValues || false,
+ logId: (msg.message.id ? msg.message.id : 'all') + Date.now() + Math.random()
};

if (!options.start && options.count) {
@@ -1881,7 +1882,7 @@

const debugLog = options.debugLog = !!(adapter._influxDPs[options.id] && adapter._influxDPs[options.id][adapter.namespace] && adapter._influxDPs[options.id][adapter.namespace].enableDebugLogs);

- debugLog && adapter.log.debug(`getHistory (InfluxDB1) call: ${JSON.stringify(options)}`);
+ debugLog && adapter.log.debug(`${options.logId} getHistory (InfluxDB1) call: ${JSON.stringify(options)}`);

if (options.id && adapter._aliasMap[options.id]) {
options.id = adapter._aliasMap[options.id];
@@ -2034,7 +2035,7 @@ function getHistory(adapter, msg) {
query = query + addQuery;
}

- debugLog && adapter.log.debug(query);
+ debugLog && adapter.log.debug(`${options.logId} History-Queries to execute: ${query}`);

storeBufferedSeries(adapter, options.id, (err, storedCount) => {
if (err) {
@@ -2051,7 +2052,7 @@
setConnected(adapter, true);
}

- debugLog && adapter.log.debug(`Response rows: ${JSON.stringify(rows)}`);
+ debugLog && adapter.log.debug(`${options.logId} Response rows: ${JSON.stringify(rows)}`);

let result = [];

@@ -2114,7 +2115,8 @@ function getHistoryIflx2(adapter, msg) {
quantile: msg.message.options.aggregate === 'quantile' ? parseFloat(msg.message.options.quantile) || 0.5 : null,
integralUnit: msg.message.options.aggregate === 'integral' ? parseInt(msg.message.options.integralUnit, 10) || 60 : null,
integralInterpolation: msg.message.options.aggregate === 'integral' ? msg.message.options.integralInterpolation || 'none' : null,
- removeBorderValues: msg.message.options.removeBorderValues || false
+ removeBorderValues: msg.message.options.removeBorderValues || false,
+ logId: (msg.message.id ? msg.message.id : 'all') + Date.now() + Math.random()
};

if (!options.start && options.count) {
@@ -2134,7 +2136,7 @@

const debugLog = options.debugLog = !!(adapter._influxDPs[options.id] && adapter._influxDPs[options.id][adapter.namespace] && adapter._influxDPs[options.id][adapter.namespace].enableDebugLogs);

- debugLog && adapter.log.debug(`getHistory (InfluxDB2) call: ${JSON.stringify(options)}`);
+ debugLog && adapter.log.debug(`${options.logId} getHistory (InfluxDB2) call: ${JSON.stringify(options)}`);

if (options.id && adapter._aliasMap[options.id]) {
options.id = adapter._aliasMap[options.id];
@@ -2212,7 +2214,7 @@ function getHistoryIflx2(adapter, msg) {
supportsAggregates = false;
} else if (error) {
if (error.message.includes('type conflict: bool')) {
- debugLog && adapter.log.debug(`Bool check error: ${error.message}`);
+ debugLog && adapter.log.debug(`${options.logId} Bool check error: ${error.message}`);
supportsAggregates = true;
error = null;
} else {
@@ -2223,7 +2225,7 @@
}, msg.callback);
}
} else {
- debugLog && adapter.log.debug(`Bool check result: ${JSON.stringify(rslt)}`);
+ debugLog && adapter.log.debug(`${options.logId} Bool check result: ${JSON.stringify(rslt)}`);
if (rslt.find(r => r.error && r.error.includes('type conflict: bool'))) {
supportsAggregates = true;
} else {
@@ -2238,7 +2240,7 @@
}
}
if (!supportsAggregates) {
- debugLog && adapter.log.debug(`Measurement ${options.id} seems to be no number - skipping aggregation options`);
+ debugLog && adapter.log.debug(`${options.logId} Measurement ${options.id} seems to be no number - skipping aggregation options`);
}

const fluxQueries = [];
@@ -2340,7 +2342,7 @@
fluxQueries.push(addFluxQuery);
}

- debugLog && adapter.log.debug(`History-queries to execute: ${fluxQueries}`);
+ debugLog && adapter.log.debug(`${options.logId} History-queries to execute: ${fluxQueries}`);

// if specific id requested
adapter._client.queries(fluxQueries, (err, rows) => {
@@ -2353,7 +2355,7 @@
setConnected(adapter, true);
}

- debugLog && adapter.log.debug(`Parsing retrieved rows:${JSON.stringify(rows)}`);
+ debugLog && adapter.log.debug(`${options.logId} Parsing retrieved rows:${JSON.stringify(rows)}`);

let result = [];

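What the logId buys in practice: with several getHistory calls running concurrently, the adapter's debug log interleaves, and filtering on one call's prefix isolates its trace. A hypothetical helper (the message formats are the ones added in this commit; the id values and lines are made up):

    const logLines = [
        'dpA1652090000000.11 getHistory (InfluxDB1) call: {...}',
        'dpB1652090000004.72 getHistory (InfluxDB1) call: {...}',
        'dpA1652090000000.11 History-Queries to execute: ...',
        'dpA1652090000000.11 Response rows: [...]'
    ];
    const traceOf = id => logLines.filter(line => line.startsWith(id));
    console.log(traceOf('dpA1652090000000.11')); // only call A's lines, in order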
