diff --git a/js/tracking.js b/js/tracking.js
index 01b2dd08..c67ea88f 100644
--- a/js/tracking.js
+++ b/js/tracking.js
@@ -1,5 +1,5 @@
 const DEBUG = false;
-const ID_SITE = 2;
+const ID_SITE = 3;
 
 function trackEvent(uuid, event, action, name, value){
diff --git a/js/ui.js b/js/ui.js
index 2871daa4..a79a6dde 100644
--- a/js/ui.js
+++ b/js/ui.js
@@ -862,6 +862,7 @@ function resetDiagnostics() {
 
 // Worker listeners
 function analyseReset() {
+    clearActive();
     DOM.fileNumber.textContent = '';
     if (STATE.mode === 'analyse') PREDICTING = true;
     resetDiagnostics();
@@ -2921,14 +2922,16 @@ function onChartData(args) {
     // DIAGNOSTICS:
     t1_analysis = Date.now();
     const analysisTime = ((t1_analysis - t0_analysis) / 1000).toFixed(2);
-    DIAGNOSTICS['Analysis Duration'] = analysisTime + ' seconds';
-    const rate = (DIAGNOSTICS['Audio Duration'] / analysisTime);
-    DIAGNOSTICS['Analysis Rate'] = rate.toFixed(0) + 'x faster than real time performance.';
-    trackEvent(config.UUID, `${config.model}-${config.backend}`, 'Audio Duration', config.backend, Math.round(DIAGNOSTICS['Audio Duration']));
-    trackEvent(config.UUID, `${config.model}-${config.backend}`, 'Analysis Duration', config.backend, parseInt(analysisTime));
-    trackEvent(config.UUID, `${config.model}-${config.backend}`, 'Analysis Rate', config.backend, parseInt(rate));
-    STATE.selection || generateToast({ message:'Analysis complete.'})
-    activateResultFilters();
+    if (! STATE.selection){
+        DIAGNOSTICS['Analysis Duration'] = analysisTime + ' seconds';
+        const rate = (DIAGNOSTICS['Audio Duration'] / analysisTime);
+        DIAGNOSTICS['Analysis Rate'] = rate.toFixed(0) + 'x faster than real time performance.';
+        trackEvent(config.UUID, `${config.model}-${config.backend}`, 'Audio Duration', config.backend, Math.round(DIAGNOSTICS['Audio Duration']));
+        trackEvent(config.UUID, `${config.model}-${config.backend}`, 'Analysis Duration', config.backend, parseInt(analysisTime));
+        trackEvent(config.UUID, `${config.model}-${config.backend}`, 'Analysis Rate', config.backend, parseInt(rate));
+        generateToast({ message:'Analysis complete.'})
+        activateResultFilters();
+    }
 }
 
 /*
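The ui.js change above both clears the active row on reset (clearActive()) and gates the diagnostics, the trackEvent calls and the completion toast behind ! STATE.selection, so re-analysing a selection no longer fires them. As a worked illustration of the guarded arithmetic, a minimal sketch; the helper name and inputs are invented for this example and are not part of the codebase:

    // Illustrative only: mirrors the rate calculation inside the guarded block.
    // Assumes audioDuration is in seconds and t0/t1 are Date.now() timestamps.
    function analysisRate(audioDuration, t0, t1) {
        const analysisTime = ((t1 - t0) / 1000).toFixed(2); // string, e.g. "12.34"
        const rate = audioDuration / analysisTime;          // division coerces the string
        return `${rate.toFixed(0)}x faster than real time performance.`;
    }

    // e.g. analysisRate(600, 0, 12_340) => "49x faster than real time performance."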
diff --git a/js/worker.js b/js/worker.js
index c0ef06ab..12f0fd8e 100644
--- a/js/worker.js
+++ b/js/worker.js
@@ -1122,6 +1122,18 @@ async function setupCtx(audio, rate, destination) {
 };
 
+
+function checkBacklog(stream) {
+    const backlog = sumObjectValues(predictionsRequested) - sumObjectValues(predictionsReceived);
+    DEBUG && console.log('backlog:', backlog)
+    if (backlog < 100) {
+        stream.resume();
+    } else {
+        // If the queued value is above 100, wait and check again
+        setTimeout(() => checkBacklog(stream), 500); // Check every 0.5 seconds
+    }
+}
+
 /**
  *
  * @param file
@@ -1131,8 +1143,7 @@ async function setupCtx(audio, rate, destination) {
  */
 let predictQueue = [];
 
-let processing = false;
-
+
 const getWavePredictBuffers = async ({
     file = '', start = 0, end = undefined
 }) => {
@@ -1183,24 +1194,27 @@ const getWavePredictBuffers = async ({
     let chunkStart = start * sampleRate;
     // Changed on('data') handler because of: https://stackoverflow.com/questions/32978094/nodejs-streams-and-premature-end
-    readStream.on('readable', () => {
-        const chunk = readStream.read();
+    readStream.on('data', chunk => {
+        readStream.pause();
         if (chunk === null) return;
         // The stream seems to read one more byte than the end
         else if (chunk.byteLength <= 1 ) {
             predictionsReceived[file]++;
+            readStream.resume();
             return
         }
         if (aborted) {
-            readStream.destroy()
+            readStream.destroy();
             return
         }
         const audio = Buffer.concat([meta.header, chunk]);
         predictQueue.push([audio, file, end, chunkStart]);
-        chunkStart += WINDOW_SIZE * BATCH_SIZE * sampleRate
-        processPredictQueue();
+        chunkStart += WINDOW_SIZE * BATCH_SIZE * sampleRate;
+        processPredictQueue().then(() => checkBacklog(readStream));
     })
+    readStream.on('end', () => readStream.destroy())
+
     readStream.on('error', err => {
         console.log(`readstream error: ${err}, start: ${start}, end: ${end}, duration: ${metadata[file].duration}`);
         err.code === 'ENOENT' && notifyMissingFile(file);
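The checkBacklog helper added above is the heart of the new flow control: the read stream pauses on every chunk and only resumes once the number of outstanding predictions (requested minus received, totalled by sumObjectValues) drops below 100, re-checking every 500 ms. A self-contained sketch of the pattern, with plain counters standing in for the worker's per-file tallies:

    // Minimal sketch of the polling backpressure used by checkBacklog.
    // `requested`/`received` are hypothetical stand-ins for the worker's
    // predictionsRequested/predictionsReceived maps.
    let requested = 0;
    let received = 0;

    function checkBacklogSketch(stream, limit = 100, intervalMs = 500) {
        const backlog = requested - received;
        if (backlog < limit) {
            stream.resume(); // the model has caught up: let more audio flow
        } else {
            // too many chunks in flight: poll again instead of buffering more
            setTimeout(() => checkBacklogSketch(stream, limit, intervalMs), intervalMs);
        }
    }

    // Pattern at the call site: pause on every chunk, let the poller decide when to resume.
    // readable.on('data', chunk => {
    //     readable.pause();
    //     requested++;
    //     enqueue(chunk); // hand off to the prediction queue
    //     checkBacklogSketch(readable);
    // });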
@@ -1209,36 +1223,33 @@
 }
 
 async function processPredictQueue(){
-    if (processing || predictQueue.length === 0) return; // Exit if already processing or queue is empty
-    processing = true; // Set processing flag to true
-
-    const [audio, file, end, chunkStart] = predictQueue.shift(); // Dequeue chunk
-    await setupCtx(audio, undefined, 'model').then(offlineCtx => {
-        let worker;
-        if (offlineCtx) {
-            offlineCtx.startRendering().then((resampled) => {
-                const myArray = resampled.getChannelData(0);
+    return new Promise((resolve, reject) => {
+        const [audio, file, end, chunkStart] = predictQueue.shift(); // Dequeue chunk
+        setupCtx(audio, undefined, 'model').then(offlineCtx => {
+            let worker;
+            if (offlineCtx) {
+                offlineCtx.startRendering().then((resampled) => {
+                    const myArray = resampled.getChannelData(0);
+                    workerInstance = ++workerInstance >= NUM_WORKERS ? 0 : workerInstance;
+                    worker = workerInstance;
+                    feedChunksToModel(myArray, chunkStart, file, end, worker);
+                    return resolve('done');
+                }).catch((error) => {
+                    console.error(`PredictBuffer rendering failed: ${error}, file ${file}`);
+                    updateFilesBeingProcessed(file);
+                    return reject(error)
+                });
+            } else {
+                console.log('Short chunk', audio.length, 'padding');
+                let chunkLength = STATE.model === 'birdnet' ? 144_000 : 72_000;
                 workerInstance = ++workerInstance >= NUM_WORKERS ? 0 : workerInstance;
                 worker = workerInstance;
-                feedChunksToModel(myArray, chunkStart, file, end, worker);
-                //chunkStart += WINDOW_SIZE * BATCH_SIZE * sampleRate;
-                processing = false; // Reset processing flag
-                processPredictQueue(); // Process next chunk in the queue
-            }).catch((error) => {
-                console.error(`PredictBuffer rendering failed: ${error}, file ${file}`);
-                updateFilesBeingProcessed(file);
-                processing = false; // Reset processing flag
-                processPredictQueue(); // Process next chunk in the queue
-            });
-        } else {
-            console.log('Short chunk', audio.length, 'padding');
-            let chunkLength = STATE.model === 'birdnet' ? 144_000 : 72_000;
-            workerInstance = ++workerInstance >= NUM_WORKERS ? 0 : workerInstance;
-            worker = workerInstance;
-            const myArray = new Float32Array(Array.from({ length: chunkLength }).fill(0));
-            feedChunksToModel(myArray, chunkStart, file, end);
-        }}).catch(error => {
-            console.warn(file, error);
+                const myArray = new Float32Array(Array.from({ length: chunkLength }).fill(0));
+                feedChunksToModel(myArray, chunkStart, file, end);
+                resolve('done'); // Settle for short chunks too, so callers waiting to resume a stream are not left hanging
+            }}).catch(error => {
+                console.warn(file, error);
+            })
     })
 }
@@ -1286,27 +1296,19 @@ const getPredictBuffers = async ({
         //STREAM.end();
     });
 
-    STREAM.on('readable', async () => {
-        let chunk = STREAM.read();
-
+    STREAM.on('data', chunk => {
+
         if (aborted) {
             command.kill()
             STREAM.destroy()
             return
         }
         if (chunk === null || chunk.byteLength <= 1) {
-            // EOF: deal with part-full buffers
-            if (concatenatedBuffer.length){
-                const audio = Buffer.concat([WAV_HEADER, concatenatedBuffer]);
-                predictQueue.push([audio, file, end, chunkStart]);
-                processPredictQueue();
-            }
-            DEBUG && console.log('All chunks sent for ', file);
-            //command.kill();
-            resolve('finished')
+
         } else {
             const bufferList = [concatenatedBuffer, chunk].filter(buf => buf.length > 0);
+            // try/catch may no longer be necessary
             try {
                 concatenatedBuffer = Buffer.concat(bufferList);
             } catch (error) {
@@ -1315,16 +1317,29 @@ const getPredictBuffers = async ({
             // if we have a full buffer
             if (concatenatedBuffer.length >= highWaterMark) {
+                STREAM.pause();
                 chunk = concatenatedBuffer.subarray(0, highWaterMark);
                 concatenatedBuffer = concatenatedBuffer.subarray(highWaterMark);
                 const audio = Buffer.concat([WAV_HEADER, chunk])
                 predictQueue.push([audio, file, end, chunkStart]);
                 chunkStart += WINDOW_SIZE * BATCH_SIZE * sampleRate
-                processPredictQueue();
+                //processPredictQueue().then(() => checkBacklog(STREAM) );
+                processPredictQueue().then(() => STREAM.resume() );
             }
         }
     });
 
+    STREAM.on('end', () => {
+        // EOF: deal with part-full buffers
+        if (concatenatedBuffer.length){
+            const audio = Buffer.concat([WAV_HEADER, concatenatedBuffer]);
+            predictQueue.push([audio, file, end, chunkStart]);
+            processPredictQueue();
+        }
+        DEBUG && console.log('All chunks sent for ', file);
+        resolve('finished')
+    })
+
     STREAM.on('error', err => {
         console.log('stream error: ', err);
         err.code === 'ENOENT' && notifyMissingFile(file);
@@ -1358,7 +1373,7 @@ const fetchAudioBuffer = async ({
     if (STATE.audio.normalise) command = command.audioFilter("loudnorm=I=-16:LRA=11:TP=-1.5");
     command.on('error', error => {
-        updateFilesBeingProcessed(file)
+        UI.postMessage({event: 'generate-alert', message: error.message})
         reject(new Error('Error extracting audio segment:', error));
     });
     command.on('start', function (commandLine) {
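getPredictBuffers now applies the same backpressure idea: it pauses the ffmpeg stream while each highWaterMark-sized slice is queued, resumes once processPredictQueue settles, and flushes any part-full remainder from a dedicated 'end' listener rather than inside the data handler. A condensed sketch of that accumulate-and-slice pattern; the while loop is a slight generalisation of the hunk's single-slice if, and the byte size and enqueue callback are illustrative:

    // Sketch: slice a growing Buffer into fixed-size, model-ready chunks.
    let concatenated = Buffer.alloc(0);
    const CHUNK_BYTES = 144_000; // assumed stand-in for highWaterMark

    function onData(chunk, enqueue) {
        concatenated = Buffer.concat([concatenated, chunk].filter(b => b.length > 0));
        while (concatenated.length >= CHUNK_BYTES) {
            enqueue(concatenated.subarray(0, CHUNK_BYTES));    // full-sized piece for the queue
            concatenated = concatenated.subarray(CHUNK_BYTES); // keep the remainder
        }
    }

    function onEnd(enqueue) {
        // EOF: flush the part-full remainder, as the new STREAM.on('end') handler does
        if (concatenated.length) enqueue(concatenated);
    }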
@@ -1965,43 +1980,43 @@ const parsePredictions = async (response) => {
     if (! STATE.selection) await generateInsertQuery(latestResult, file).catch( (error) => console.log('Error generating insert query', error));
     let [keysArray, speciesIDBatch, confidenceBatch] = latestResult;
     for (let i = 0; i < keysArray.length; i++) {
-      let updateUI = false;
-      let key = parseFloat(keysArray[i]);
-      const timestamp = metadata[file].fileStart + key * 1000;
-      const confidenceArray = confidenceBatch[i];
-      const speciesIDArray = speciesIDBatch[i];
-      for (let j = 0; j < confidenceArray.length; j++) {
-        let confidence = confidenceArray[j];
-        if (confidence < 0.05) break;
-        confidence*=1000;
-        let speciesID = speciesIDArray[j];
-        updateUI = (confidence > STATE.detect.confidence && (! included.length || included.includes(speciesID)));
-        if (STATE.selection || updateUI) {
-          let end, confidenceRequired;
-          if (STATE.selection) {
-            const duration = (STATE.selection.end - STATE.selection.start) / 1000;
-            end = key + duration;
-            confidenceRequired = STATE.userSettingsInSelection ?
-            STATE.detect.confidence : 50;
-          } else {
-            end = key + 3;
-            confidenceRequired = STATE.detect.confidence;
-          }
+        let updateUI = false;
+        let key = parseFloat(keysArray[i]);
+        const timestamp = metadata[file].fileStart + key * 1000;
+        const confidenceArray = confidenceBatch[i];
+        const speciesIDArray = speciesIDBatch[i];
+        for (let j = 0; j < confidenceArray.length; j++) {
+            let confidence = confidenceArray[j];
+            if (confidence < 0.05) break;
+            confidence*=1000;
+            let speciesID = speciesIDArray[j];
+            updateUI = (confidence > STATE.detect.confidence && (! included.length || included.includes(speciesID)));
+            if (STATE.selection || updateUI) {
+                let end, confidenceRequired;
+                if (STATE.selection) {
+                    const duration = (STATE.selection.end - STATE.selection.start) / 1000;
+                    end = key + duration;
+                    confidenceRequired = STATE.userSettingsInSelection ?
+                    STATE.detect.confidence : 50;
+                } else {
+                    end = key + 3;
+                    confidenceRequired = STATE.detect.confidence;
+                }
                 if (confidence >= confidenceRequired) {
-                  const { cname, sname } = await memoryDB.getAsync(`SELECT cname, sname FROM species WHERE id = ${speciesID}`).catch( (error) => console.log('Error getting species name', error));
-                  const result = {
-                    timestamp: timestamp,
-                    position: key,
-                    end: end,
-                    file: file,
-                    cname: cname,
-                    sname: sname,
-                    score: confidence
-                  }
+                    const { cname, sname } = await memoryDB.getAsync(`SELECT cname, sname FROM species WHERE id = ${speciesID}`).catch( (error) => console.log('Error getting species name', error));
+                    const result = {
+                        timestamp: timestamp,
+                        position: key,
+                        end: end,
+                        file: file,
+                        cname: cname,
+                        sname: sname,
+                        score: confidence
+                    }
                     sendResult(++index, result, false);
-                  // Only show the highest confidence detection, unless it's a selection analysis
-                  if (! STATE.selection) break;
-                  };
+                    // Only show the highest confidence detection, unless it's a selection analysis
+                    if (! STATE.selection) break;
+                };
             }
         }
     }
@@ -2057,8 +2072,8 @@ async function parseMessage(e) {
         if ( !aborted) {
             predictWorkers[response.worker].isAvailable = true;
             let worker = await parsePredictions(response).catch( (error) => console.log('Error parsing predictions', error));
-            DEBUG && console.log('predictions left for', response.file, predictionsReceived[response.file] - predictionsRequested[response.file])
-            const remaining = predictionsReceived[response.file] - predictionsRequested[response.file]
+            DEBUG && console.log('predictions left for', response.file, predictionsReceived[response.file] - batchChunksToSend[response.file])
+            const remaining = predictionsReceived[response.file] - batchChunksToSend[response.file]
             if (remaining === 0) {
                 if (filesBeingProcessed.length) {
                     processNextFile({ worker: worker });
@@ -2085,6 +2100,7 @@ function updateFilesBeingProcessed(file) {
     const fileIndex = filesBeingProcessed.indexOf(file);
     if (fileIndex !== -1) {
         filesBeingProcessed.splice(fileIndex, 1)
+        DEBUG && console.log('filesBeingProcessed updated, length now:', filesBeingProcessed.length)
     }
     if (!filesBeingProcessed.length) {
         if (!STATE.selection) getSummary();
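Aside from the re-indentation, the parsePredictions loop is easier to read in isolation: confidence arrays appear to arrive sorted in descending order (the loop breaks at the first value below 0.05), scores are scaled to the 0-1000 range the configured threshold uses, results are filtered against the species include-list, and only the top qualifying hit is kept outside selection mode. A condensed, hypothetical re-statement of that filter; names and inputs are illustrative:

    // Sketch of the confidence filter in parsePredictions. Confidences are
    // assumed sorted descending, as the early `break` in the hunk implies.
    function filterDetections(confidences, speciesIDs, threshold, included, selectionMode) {
        const results = [];
        for (let j = 0; j < confidences.length; j++) {
            if (confidences[j] < 0.05) break;         // below the floor: nothing further qualifies
            const confidence = confidences[j] * 1000; // scale to the 0-1000 range the UI uses
            const speciesID = speciesIDs[j];
            const wanted = confidence > threshold &&
                (!included.length || included.includes(speciesID)); // empty list means all species
            if (selectionMode || wanted) {
                results.push({ speciesID, score: confidence });
                if (!selectionMode) break;            // keep only the top hit outside selections
            }
        }
        return results;
    }

    // e.g. filterDetections([0.9, 0.4, 0.02], [12, 34, 56], 450, [], false)
    //      => [{ speciesID: 12, score: 900 }]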