From d793caff5f91f2e1c6fb3b17754f5e915c0f2f62 Mon Sep 17 00:00:00 2001 From: Mattk70 Date: Sun, 24 Mar 2024 10:40:55 +0000 Subject: [PATCH] Added a queue to manage memory for wav file processing - readstream pauses until backlog of chunks to process is sufficiently small (100 chunks) File not found error toast added. Not implemented for compressed files, since the ffmpeg conversion is sufficiently slow to prevent a runaway backlog, it seems. Also fixed erroneous analysis rate reporting due to selection analyses. --- js/tracking.js | 2 +- js/ui.js | 19 ++--- js/worker.js | 188 +++++++++++++++++++++++++++---------------------- 3 files changed, 114 insertions(+), 95 deletions(-) diff --git a/js/tracking.js b/js/tracking.js index 01b2dd08..c67ea88f 100644 --- a/js/tracking.js +++ b/js/tracking.js @@ -1,5 +1,5 @@ const DEBUG = false; -const ID_SITE = 2; +const ID_SITE = 3; function trackEvent(uuid, event, action, name, value){ diff --git a/js/ui.js b/js/ui.js index 2871daa4..a79a6dde 100644 --- a/js/ui.js +++ b/js/ui.js @@ -862,6 +862,7 @@ function resetDiagnostics() { // Worker listeners function analyseReset() { + clearActive(); DOM.fileNumber.textContent = ''; if (STATE.mode === 'analyse') PREDICTING = true; resetDiagnostics(); @@ -2921,14 +2922,16 @@ function onChartData(args) { // DIAGNOSTICS: t1_analysis = Date.now(); const analysisTime = ((t1_analysis - t0_analysis) / 1000).toFixed(2); - DIAGNOSTICS['Analysis Duration'] = analysisTime + ' seconds'; - const rate = (DIAGNOSTICS['Audio Duration'] / analysisTime); - DIAGNOSTICS['Analysis Rate'] = rate.toFixed(0) + 'x faster than real time performance.'; - trackEvent(config.UUID, `${config.model}-${config.backend}`, 'Audio Duration', config.backend, Math.round(DIAGNOSTICS['Audio Duration'])); - trackEvent(config.UUID, `${config.model}-${config.backend}`, 'Analysis Duration', config.backend, parseInt(analysisTime)); - trackEvent(config.UUID, `${config.model}-${config.backend}`, 'Analysis Rate', config.backend, parseInt(rate)); - STATE.selection || generateToast({ message:'Analysis complete.'}) - activateResultFilters(); + if (! STATE.selection){ + DIAGNOSTICS['Analysis Duration'] = analysisTime + ' seconds'; + const rate = (DIAGNOSTICS['Audio Duration'] / analysisTime); + DIAGNOSTICS['Analysis Rate'] = rate.toFixed(0) + 'x faster than real time performance.'; + trackEvent(config.UUID, `${config.model}-${config.backend}`, 'Audio Duration', config.backend, Math.round(DIAGNOSTICS['Audio Duration'])); + trackEvent(config.UUID, `${config.model}-${config.backend}`, 'Analysis Duration', config.backend, parseInt(analysisTime)); + trackEvent(config.UUID, `${config.model}-${config.backend}`, 'Analysis Rate', config.backend, parseInt(rate)); + generateToast({ message:'Analysis complete.'}) + activateResultFilters(); + } } /* diff --git a/js/worker.js b/js/worker.js index c0ef06ab..12f0fd8e 100644 --- a/js/worker.js +++ b/js/worker.js @@ -1122,6 +1122,18 @@ async function setupCtx(audio, rate, destination) { }; + +function checkBacklog(stream) { + const backlog = sumObjectValues(predictionsRequested) - sumObjectValues(predictionsReceived); + DEBUG && console.log('backlog:', backlog) + if (backlog < 100) { + stream.resume(); + } else { + // If queued value is above 100, wait and check again + setTimeout(() => checkBacklog(stream), 500); // Check every 0.5 seconds + } +} + /** * * @param file @@ -1131,8 +1143,7 @@ async function setupCtx(audio, rate, destination) { */ let predictQueue = []; -let processing = false; - + const getWavePredictBuffers = async ({ file = '', start = 0, end = undefined }) => { @@ -1183,24 +1194,27 @@ const getWavePredictBuffers = async ({ let chunkStart = start * sampleRate; // Changed on.('data') handler because of: https://stackoverflow.com/questions/32978094/nodejs-streams-and-premature-end - readStream.on('readable', () => { - const chunk = readStream.read(); + readStream.on('data', chunk => { + readStream.pause(); if (chunk === null) return; // The stream seems to read one more byte than the end else if (chunk.byteLength <= 1 ) { predictionsReceived[file]++; + readStream.resume(); return } if (aborted) { - readStream.destroy() + readStream.destroy(); return } const audio = Buffer.concat([meta.header, chunk]); predictQueue.push([audio, file, end, chunkStart]); - chunkStart += WINDOW_SIZE * BATCH_SIZE * sampleRate - processPredictQueue(); + chunkStart += WINDOW_SIZE * BATCH_SIZE * sampleRate; + processPredictQueue().then((resolve, reject) => checkBacklog(readStream) ); }) + readStream.on('end', () => readStream.destroy()) + readStream.on('error', err => { console.log(`readstream error: ${err}, start: ${start}, , end: ${end}, duration: ${metadata[file].duration}`); err.code === 'ENOENT' && notifyMissingFile(file); @@ -1209,36 +1223,32 @@ const getWavePredictBuffers = async ({ } async function processPredictQueue(){ - if (processing || predictQueue.length === 0) return; // Exit if already processing or queue is empty - processing = true; // Set processing flag to true - - const [audio, file, end, chunkStart] = predictQueue.shift(); // Dequeue chunk - await setupCtx(audio, undefined, 'model').then(offlineCtx => { - let worker; - if (offlineCtx) { - offlineCtx.startRendering().then((resampled) => { - const myArray = resampled.getChannelData(0); + return new Promise((resolve, reject) => { + const [audio, file, end, chunkStart] = predictQueue.shift(); // Dequeue chunk + setupCtx(audio, undefined, 'model').then(offlineCtx => { + let worker; + if (offlineCtx) { + offlineCtx.startRendering().then((resampled) => { + const myArray = resampled.getChannelData(0); + workerInstance = ++workerInstance >= NUM_WORKERS ? 0 : workerInstance; + worker = workerInstance; + feedChunksToModel(myArray, chunkStart, file, end, worker); + return resolve('done'); + }).catch((error) => { + console.error(`PredictBuffer rendering failed: ${error}, file ${file}`); + updateFilesBeingProcessed(file); + return reject(error) + }); + } else { + console.log('Short chunk', audio.length, 'padding'); + let chunkLength = STATE.model === 'birdnet' ? 144_000 : 72_000; workerInstance = ++workerInstance >= NUM_WORKERS ? 0 : workerInstance; worker = workerInstance; - feedChunksToModel(myArray, chunkStart, file, end, worker); - //chunkStart += WINDOW_SIZE * BATCH_SIZE * sampleRate; - processing = false; // Reset processing flag - processPredictQueue(); // Process next chunk in the queue - }).catch((error) => { - console.error(`PredictBuffer rendering failed: ${error}, file ${file}`); - updateFilesBeingProcessed(file); - processing = false; // Reset processing flag - processPredictQueue(); // Process next chunk in the queue - }); - } else { - console.log('Short chunk', audio.length, 'padding'); - let chunkLength = STATE.model === 'birdnet' ? 144_000 : 72_000; - workerInstance = ++workerInstance >= NUM_WORKERS ? 0 : workerInstance; - worker = workerInstance; - const myArray = new Float32Array(Array.from({ length: chunkLength }).fill(0)); - feedChunksToModel(myArray, chunkStart, file, end); - }}).catch(error => { - console.warn(file, error); + const myArray = new Float32Array(Array.from({ length: chunkLength }).fill(0)); + feedChunksToModel(myArray, chunkStart, file, end); + }}).catch(error => { + console.warn(file, error); + }) }) } @@ -1286,27 +1296,19 @@ const getPredictBuffers = async ({ //STREAM.end(); }); - STREAM.on('readable', async () => { - let chunk = STREAM.read(); - + STREAM.on('data', chunk => { + if (aborted) { command.kill() STREAM.destroy() return } if (chunk === null || chunk.byteLength <= 1) { - // EOF: deal with part-full buffers - if (concatenatedBuffer.length){ - const audio = Buffer.concat([WAV_HEADER, concatenatedBuffer]); - predictQueue.push([audio, file, end, chunkStart]); - processPredictQueue(); - } - DEBUG && console.log('All chunks sent for ', file); - //command.kill(); - resolve('finished') + } else { const bufferList = [concatenatedBuffer, chunk].filter(buf => buf.length > 0); + // try/catch may no longer be necessary try { concatenatedBuffer = Buffer.concat(bufferList); } catch (error) { @@ -1315,16 +1317,29 @@ const getPredictBuffers = async ({ // if we have a full buffer if (concatenatedBuffer.length >= highWaterMark) { + STREAM.pause(); chunk = concatenatedBuffer.subarray(0, highWaterMark); concatenatedBuffer = concatenatedBuffer.subarray(highWaterMark); const audio = Buffer.concat([WAV_HEADER, chunk]) predictQueue.push([audio, file, end, chunkStart]); chunkStart += WINDOW_SIZE * BATCH_SIZE * sampleRate - processPredictQueue(); + //processPredictQueue().then((resolve, reject) => checkBacklog(STREAM) ); + processPredictQueue().then(() => STREAM.resume() ); } } }); + STREAM.on('end', () => { + // EOF: deal with part-full buffers + if (concatenatedBuffer.length){ + const audio = Buffer.concat([WAV_HEADER, concatenatedBuffer]); + predictQueue.push([audio, file, end, chunkStart]); + processPredictQueue(); + } + DEBUG && console.log('All chunks sent for ', file); + resolve('finished') + }) + STREAM.on('error', err => { console.log('stream error: ', err); err.code === 'ENOENT' && notifyMissingFile(file); @@ -1358,7 +1373,7 @@ const fetchAudioBuffer = async ({ if (STATE.audio.normalise) command = command.audioFilter("loudnorm=I=-16:LRA=11:TP=-1.5"); command.on('error', error => { - updateFilesBeingProcessed(file) + UI.postMessage({event: 'generate-alert', message: error.message}) reject(new Error('Error extracting audio segment:', error)); }); command.on('start', function (commandLine) { @@ -1965,43 +1980,43 @@ const parsePredictions = async (response) => { if (! STATE.selection) await generateInsertQuery(latestResult, file).catch( (error) => console.log('Error generating insert query', error)); let [keysArray, speciesIDBatch, confidenceBatch] = latestResult; for (let i = 0; i < keysArray.length; i++) { - let updateUI = false; - let key = parseFloat(keysArray[i]); - const timestamp = metadata[file].fileStart + key * 1000; - const confidenceArray = confidenceBatch[i]; - const speciesIDArray = speciesIDBatch[i]; - for (let j = 0; j < confidenceArray.length; j++) { - let confidence = confidenceArray[j]; - if (confidence < 0.05) break; - confidence*=1000; - let speciesID = speciesIDArray[j]; - updateUI = (confidence > STATE.detect.confidence && (! included.length || included.includes(speciesID))); - if (STATE.selection || updateUI) { - let end, confidenceRequired; - if (STATE.selection) { - const duration = (STATE.selection.end - STATE.selection.start) / 1000; - end = key + duration; - confidenceRequired = STATE.userSettingsInSelection ? - STATE.detect.confidence : 50; - } else { - end = key + 3; - confidenceRequired = STATE.detect.confidence; - } + let updateUI = false; + let key = parseFloat(keysArray[i]); + const timestamp = metadata[file].fileStart + key * 1000; + const confidenceArray = confidenceBatch[i]; + const speciesIDArray = speciesIDBatch[i]; + for (let j = 0; j < confidenceArray.length; j++) { + let confidence = confidenceArray[j]; + if (confidence < 0.05) break; + confidence*=1000; + let speciesID = speciesIDArray[j]; + updateUI = (confidence > STATE.detect.confidence && (! included.length || included.includes(speciesID))); + if (STATE.selection || updateUI) { + let end, confidenceRequired; + if (STATE.selection) { + const duration = (STATE.selection.end - STATE.selection.start) / 1000; + end = key + duration; + confidenceRequired = STATE.userSettingsInSelection ? + STATE.detect.confidence : 50; + } else { + end = key + 3; + confidenceRequired = STATE.detect.confidence; + } if (confidence >= confidenceRequired) { - const { cname, sname } = await memoryDB.getAsync(`SELECT cname, sname FROM species WHERE id = ${speciesID}`).catch( (error) => console.log('Error getting species name', error)); - const result = { - timestamp: timestamp, - position: key, - end: end, - file: file, - cname: cname, - sname: sname, - score: confidence - } + const { cname, sname } = await memoryDB.getAsync(`SELECT cname, sname FROM species WHERE id = ${speciesID}`).catch( (error) => console.log('Error getting species name', error)); + const result = { + timestamp: timestamp, + position: key, + end: end, + file: file, + cname: cname, + sname: sname, + score: confidence + } sendResult(++index, result, false); - // Only show the highest confidence detection, unless it's a selection analysis - if (! STATE.selection) break; - }; + // Only show the highest confidence detection, unless it's a selection analysis + if (! STATE.selection) break; + }; } } } @@ -2057,8 +2072,8 @@ async function parseMessage(e) { if ( !aborted) { predictWorkers[response.worker].isAvailable = true; let worker = await parsePredictions(response).catch( (error) => console.log('Error parsing predictions', error)); - DEBUG && console.log('predictions left for', response.file, predictionsReceived[response.file] - predictionsRequested[response.file]) - const remaining = predictionsReceived[response.file] - predictionsRequested[response.file] + DEBUG && console.log('predictions left for', response.file, predictionsReceived[response.file] - batchChunksToSend[response.file]) + const remaining = predictionsReceived[response.file] - batchChunksToSend[response.file] if (remaining === 0) { if (filesBeingProcessed.length) { processNextFile({ worker: worker }); @@ -2085,6 +2100,7 @@ function updateFilesBeingProcessed(file) { const fileIndex = filesBeingProcessed.indexOf(file); if (fileIndex !== -1) { filesBeingProcessed.splice(fileIndex, 1) + DEBUG && console.log('filesbeingprocessed updated length now :', filesBeingProcessed.length) } if (!filesBeingProcessed.length) { if (!STATE.selection) getSummary();