
Added a queue to manage memory for wav file processing - the readstream pauses until the backlog of chunks to process is sufficiently small (under 100 chunks).

Added a file-not-found error toast.
The queue is not implemented for compressed files, since the ffmpeg conversion appears to be slow enough to prevent a runaway backlog.
Also fixed erroneous analysis-rate reporting caused by selection analyses.
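
The memory fix is standard Node.js stream backpressure: pause the read stream whenever a chunk arrives, and only resume once the number of chunks still awaiting predictions drops back under the threshold (100 chunks here, re-checked every 0.5 seconds). A minimal sketch of the idea, with generic names standing in for the app's predictionsRequested/predictionsReceived bookkeeping (enqueueForPrediction is a stub for the real predictQueue/worker round-trip):

const fs = require('fs');

const THRESHOLD = 100; // resume reading when fewer than this many chunks are outstanding
let requested = 0;     // chunks queued for prediction
let received = 0;      // predictions that have come back

function enqueueForPrediction(chunk) {
    requested++;
    setImmediate(() => received++); // pretend a prediction returns asynchronously
}

function checkBacklog(stream) {
    if (requested - received < THRESHOLD) {
        stream.resume();                             // safe to read more audio
    } else {
        setTimeout(() => checkBacklog(stream), 500); // poll again in 0.5 s
    }
}

const readStream = fs.createReadStream('example.wav'); // placeholder file name
readStream.on('data', chunk => {
    readStream.pause();       // stop reading until the backlog clears
    enqueueForPrediction(chunk);
    checkBacklog(readStream); // resumes now, or polls until the backlog shrinks
});

Compressed files skip the backlog check because ffmpeg decodes slowly enough to throttle the pipeline by itself; in getPredictBuffers below, the checkBacklog call is left commented out and the stream simply resumes after each chunk is queued.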
Mattk70 committed Mar 24, 2024
1 parent 5aa5007 commit d793caf
Showing 3 changed files with 114 additions and 95 deletions.
2 changes: 1 addition & 1 deletion js/tracking.js
@@ -1,5 +1,5 @@
 const DEBUG = false;
-const ID_SITE = 2;
+const ID_SITE = 3;
 
 
 function trackEvent(uuid, event, action, name, value){
19 changes: 11 additions & 8 deletions js/ui.js
@@ -862,6 +862,7 @@ function resetDiagnostics() {

 // Worker listeners
 function analyseReset() {
+    clearActive();
     DOM.fileNumber.textContent = '';
     if (STATE.mode === 'analyse') PREDICTING = true;
     resetDiagnostics();
@@ -2921,14 +2922,16 @@ function onChartData(args) {
     // DIAGNOSTICS:
     t1_analysis = Date.now();
     const analysisTime = ((t1_analysis - t0_analysis) / 1000).toFixed(2);
-    DIAGNOSTICS['Analysis Duration'] = analysisTime + ' seconds';
-    const rate = (DIAGNOSTICS['Audio Duration'] / analysisTime);
-    DIAGNOSTICS['Analysis Rate'] = rate.toFixed(0) + 'x faster than real time performance.';
-    trackEvent(config.UUID, `${config.model}-${config.backend}`, 'Audio Duration', config.backend, Math.round(DIAGNOSTICS['Audio Duration']));
-    trackEvent(config.UUID, `${config.model}-${config.backend}`, 'Analysis Duration', config.backend, parseInt(analysisTime));
-    trackEvent(config.UUID, `${config.model}-${config.backend}`, 'Analysis Rate', config.backend, parseInt(rate));
-    STATE.selection || generateToast({ message:'Analysis complete.'})
-    activateResultFilters();
+    if (! STATE.selection){
+        DIAGNOSTICS['Analysis Duration'] = analysisTime + ' seconds';
+        const rate = (DIAGNOSTICS['Audio Duration'] / analysisTime);
+        DIAGNOSTICS['Analysis Rate'] = rate.toFixed(0) + 'x faster than real time performance.';
+        trackEvent(config.UUID, `${config.model}-${config.backend}`, 'Audio Duration', config.backend, Math.round(DIAGNOSTICS['Audio Duration']));
+        trackEvent(config.UUID, `${config.model}-${config.backend}`, 'Analysis Duration', config.backend, parseInt(analysisTime));
+        trackEvent(config.UUID, `${config.model}-${config.backend}`, 'Analysis Rate', config.backend, parseInt(rate));
+        generateToast({ message:'Analysis complete.'})
+        activateResultFilters();
+    }
 }

/*
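The ui.js change above is what fixes the skewed rate figure: a selection analysis would rerun the diagnostics block, dividing the full file's audio duration by the selection's (tiny) analysis time. Wrapping the block in `if (! STATE.selection)` restricts the reported numbers to full analyses. A stripped-down illustration with hypothetical figures:

const DIAGNOSTICS = { 'Audio Duration': 3600 }; // a one-hour file (hypothetical)

function reportRate(analysisTime, isSelection) {
    if (isSelection) return; // the fix: selections no longer touch the diagnostics
    const rate = DIAGNOSTICS['Audio Duration'] / analysisTime;
    console.log(`Analysis Rate: ${rate.toFixed(0)}x faster than real time`);
}

reportRate(60, false); // full file analysed in 60 s -> "Analysis Rate: 60x faster than real time"
reportRate(2, true);   // a 2 s selection re-analysis no longer reports an inflated 1800x
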
188 changes: 102 additions & 86 deletions js/worker.js
@@ -1122,6 +1122,18 @@ async function setupCtx(audio, rate, destination) {

 };
 
+
+function checkBacklog(stream) {
+    const backlog = sumObjectValues(predictionsRequested) - sumObjectValues(predictionsReceived);
+    DEBUG && console.log('backlog:', backlog)
+    if (backlog < 100) {
+        stream.resume();
+    } else {
+        // If the backlog is still 100 chunks or more, wait and check again
+        setTimeout(() => checkBacklog(stream), 500); // Check every 0.5 seconds
+    }
+}
+
 /**
  *
  * @param file
@@ -1131,8 +1143,7 @@ async function setupCtx(audio, rate, destination) {
  */
 
 let predictQueue = [];
-let processing = false;
 
 const getWavePredictBuffers = async ({
     file = '', start = 0, end = undefined
 }) => {
@@ -1183,24 +1194,27 @@ const getWavePredictBuffers = async ({

     let chunkStart = start * sampleRate;
     // Changed to the on('data') handler because of: https://stackoverflow.com/questions/32978094/nodejs-streams-and-premature-end
-    readStream.on('readable', () => {
-        const chunk = readStream.read();
+    readStream.on('data', chunk => {
+        readStream.pause();
         if (chunk === null) return;
         // The stream seems to read one more byte than the end
         else if (chunk.byteLength <= 1 ) {
             predictionsReceived[file]++;
+            readStream.resume();
             return
         }
         if (aborted) {
-            readStream.destroy()
+            readStream.destroy();
             return
         }
         const audio = Buffer.concat([meta.header, chunk]);
         predictQueue.push([audio, file, end, chunkStart]);
-        chunkStart += WINDOW_SIZE * BATCH_SIZE * sampleRate
-        processPredictQueue();
+        chunkStart += WINDOW_SIZE * BATCH_SIZE * sampleRate;
+        processPredictQueue().then((resolve, reject) => checkBacklog(readStream) );
     })
 
+    readStream.on('end', () => readStream.destroy())
 
     readStream.on('error', err => {
         console.log(`readstream error: ${err}, start: ${start}, end: ${end}, duration: ${metadata[file].duration}`);
+        err.code === 'ENOENT' && notifyMissingFile(file);
@@ -1209,36 +1223,32 @@
 }
 
 async function processPredictQueue(){
-    if (processing || predictQueue.length === 0) return; // Exit if already processing or queue is empty
-    processing = true; // Set processing flag to true
-
-    const [audio, file, end, chunkStart] = predictQueue.shift(); // Dequeue chunk
-    await setupCtx(audio, undefined, 'model').then(offlineCtx => {
-        let worker;
-        if (offlineCtx) {
-            offlineCtx.startRendering().then((resampled) => {
-                const myArray = resampled.getChannelData(0);
-                workerInstance = ++workerInstance >= NUM_WORKERS ? 0 : workerInstance;
-                worker = workerInstance;
-                feedChunksToModel(myArray, chunkStart, file, end, worker);
-                //chunkStart += WINDOW_SIZE * BATCH_SIZE * sampleRate;
-                processing = false; // Reset processing flag
-                processPredictQueue(); // Process next chunk in the queue
-            }).catch((error) => {
-                console.error(`PredictBuffer rendering failed: ${error}, file ${file}`);
-                updateFilesBeingProcessed(file);
-                processing = false; // Reset processing flag
-                processPredictQueue(); // Process next chunk in the queue
-            });
-        } else {
-            console.log('Short chunk', audio.length, 'padding');
-            let chunkLength = STATE.model === 'birdnet' ? 144_000 : 72_000;
-            workerInstance = ++workerInstance >= NUM_WORKERS ? 0 : workerInstance;
-            worker = workerInstance;
-            const myArray = new Float32Array(Array.from({ length: chunkLength }).fill(0));
-            feedChunksToModel(myArray, chunkStart, file, end);
-        }}).catch(error => {
-        console.warn(file, error);
-    })
+    return new Promise((resolve, reject) => {
+        const [audio, file, end, chunkStart] = predictQueue.shift(); // Dequeue chunk
+        setupCtx(audio, undefined, 'model').then(offlineCtx => {
+            let worker;
+            if (offlineCtx) {
+                offlineCtx.startRendering().then((resampled) => {
+                    const myArray = resampled.getChannelData(0);
+                    workerInstance = ++workerInstance >= NUM_WORKERS ? 0 : workerInstance;
+                    worker = workerInstance;
+                    feedChunksToModel(myArray, chunkStart, file, end, worker);
+                    return resolve('done');
+                }).catch((error) => {
+                    console.error(`PredictBuffer rendering failed: ${error}, file ${file}`);
+                    updateFilesBeingProcessed(file);
+                    return reject(error)
+                });
+            } else {
+                console.log('Short chunk', audio.length, 'padding');
+                let chunkLength = STATE.model === 'birdnet' ? 144_000 : 72_000;
+                workerInstance = ++workerInstance >= NUM_WORKERS ? 0 : workerInstance;
+                worker = workerInstance;
+                const myArray = new Float32Array(Array.from({ length: chunkLength }).fill(0));
+                feedChunksToModel(myArray, chunkStart, file, end);
+            }}).catch(error => {
+            console.warn(file, error);
+        })
+    })
 }

@@ -1286,27 +1296,19 @@ const getPredictBuffers = async ({
         //STREAM.end();
     });
 
-    STREAM.on('readable', async () => {
-        let chunk = STREAM.read();
-
+    STREAM.on('data', chunk => {
+
         if (aborted) {
             command.kill()
             STREAM.destroy()
             return
         }
         if (chunk === null || chunk.byteLength <= 1) {
-            // EOF: deal with part-full buffers
-            if (concatenatedBuffer.length){
-                const audio = Buffer.concat([WAV_HEADER, concatenatedBuffer]);
-                predictQueue.push([audio, file, end, chunkStart]);
-                processPredictQueue();
-            }
-            DEBUG && console.log('All chunks sent for ', file);
-            //command.kill();
-            resolve('finished')
         }
         else {
             const bufferList = [concatenatedBuffer, chunk].filter(buf => buf.length > 0);
             // try/catch may no longer be necessary
             try {
                 concatenatedBuffer = Buffer.concat(bufferList);
             } catch (error) {
@@ -1315,16 +1317,29 @@

             // if we have a full buffer
             if (concatenatedBuffer.length >= highWaterMark) {
+                STREAM.pause();
                 chunk = concatenatedBuffer.subarray(0, highWaterMark);
                 concatenatedBuffer = concatenatedBuffer.subarray(highWaterMark);
                 const audio = Buffer.concat([WAV_HEADER, chunk])
                 predictQueue.push([audio, file, end, chunkStart]);
                 chunkStart += WINDOW_SIZE * BATCH_SIZE * sampleRate
-                processPredictQueue();
+                //processPredictQueue().then((resolve, reject) => checkBacklog(STREAM) );
+                processPredictQueue().then(() => STREAM.resume() );
             }
         }
     });
 
+    STREAM.on('end', () => {
+        // EOF: deal with part-full buffers
+        if (concatenatedBuffer.length){
+            const audio = Buffer.concat([WAV_HEADER, concatenatedBuffer]);
+            predictQueue.push([audio, file, end, chunkStart]);
+            processPredictQueue();
+        }
+        DEBUG && console.log('All chunks sent for ', file);
+        resolve('finished')
+    })
+
     STREAM.on('error', err => {
         console.log('stream error: ', err);
         err.code === 'ENOENT' && notifyMissingFile(file);
@@ -1358,7 +1373,7 @@ const fetchAudioBuffer = async ({
     if (STATE.audio.normalise) command = command.audioFilter("loudnorm=I=-16:LRA=11:TP=-1.5");
 
     command.on('error', error => {
-        updateFilesBeingProcessed(file)
+        UI.postMessage({event: 'generate-alert', message: error.message})
         reject(new Error('Error extracting audio segment:', error));
     });
     command.on('start', function (commandLine) {
@@ -1965,43 +1980,43 @@ const parsePredictions = async (response) => {
     if (! STATE.selection) await generateInsertQuery(latestResult, file).catch( (error) => console.log('Error generating insert query', error));
     let [keysArray, speciesIDBatch, confidenceBatch] = latestResult;
     for (let i = 0; i < keysArray.length; i++) {
-        let updateUI = false;
-        let key = parseFloat(keysArray[i]);
-        const timestamp = metadata[file].fileStart + key * 1000;
-        const confidenceArray = confidenceBatch[i];
-        const speciesIDArray = speciesIDBatch[i];
-        for (let j = 0; j < confidenceArray.length; j++) {
-            let confidence = confidenceArray[j];
-            if (confidence < 0.05) break;
-            confidence*=1000;
-            let speciesID = speciesIDArray[j];
-            updateUI = (confidence > STATE.detect.confidence && (! included.length || included.includes(speciesID)));
-            if (STATE.selection || updateUI) {
-                let end, confidenceRequired;
-                if (STATE.selection) {
-                    const duration = (STATE.selection.end - STATE.selection.start) / 1000;
-                    end = key + duration;
-                    confidenceRequired = STATE.userSettingsInSelection ?
-                    STATE.detect.confidence : 50;
-                } else {
-                    end = key + 3;
-                    confidenceRequired = STATE.detect.confidence;
-                }
+            let updateUI = false;
+            let key = parseFloat(keysArray[i]);
+            const timestamp = metadata[file].fileStart + key * 1000;
+            const confidenceArray = confidenceBatch[i];
+            const speciesIDArray = speciesIDBatch[i];
+            for (let j = 0; j < confidenceArray.length; j++) {
+                let confidence = confidenceArray[j];
+                if (confidence < 0.05) break;
+                confidence*=1000;
+                let speciesID = speciesIDArray[j];
+                updateUI = (confidence > STATE.detect.confidence && (! included.length || included.includes(speciesID)));
+                if (STATE.selection || updateUI) {
+                    let end, confidenceRequired;
+                    if (STATE.selection) {
+                        const duration = (STATE.selection.end - STATE.selection.start) / 1000;
+                        end = key + duration;
+                        confidenceRequired = STATE.userSettingsInSelection ?
+                        STATE.detect.confidence : 50;
+                    } else {
+                        end = key + 3;
+                        confidenceRequired = STATE.detect.confidence;
+                    }
                 if (confidence >= confidenceRequired) {
-                    const { cname, sname } = await memoryDB.getAsync(`SELECT cname, sname FROM species WHERE id = ${speciesID}`).catch( (error) => console.log('Error getting species name', error));
-                    const result = {
-                        timestamp: timestamp,
-                        position: key,
-                        end: end,
-                        file: file,
-                        cname: cname,
-                        sname: sname,
-                        score: confidence
-                    }
+                        const { cname, sname } = await memoryDB.getAsync(`SELECT cname, sname FROM species WHERE id = ${speciesID}`).catch( (error) => console.log('Error getting species name', error));
+                        const result = {
+                            timestamp: timestamp,
+                            position: key,
+                            end: end,
+                            file: file,
+                            cname: cname,
+                            sname: sname,
+                            score: confidence
+                        }
                     sendResult(++index, result, false);
-                    // Only show the highest confidence detection, unless it's a selection analysis
-                    if (! STATE.selection) break;
-                };
+                        // Only show the highest confidence detection, unless it's a selection analysis
+                        if (! STATE.selection) break;
+                    };
             }
         }
     }
@@ -2057,8 +2072,8 @@ async function parseMessage(e) {
     if ( !aborted) {
         predictWorkers[response.worker].isAvailable = true;
         let worker = await parsePredictions(response).catch( (error) => console.log('Error parsing predictions', error));
-        DEBUG && console.log('predictions left for', response.file, predictionsReceived[response.file] - predictionsRequested[response.file])
-        const remaining = predictionsReceived[response.file] - predictionsRequested[response.file]
+        DEBUG && console.log('predictions left for', response.file, predictionsReceived[response.file] - batchChunksToSend[response.file])
+        const remaining = predictionsReceived[response.file] - batchChunksToSend[response.file]
         if (remaining === 0) {
             if (filesBeingProcessed.length) {
                 processNextFile({ worker: worker });
@@ -2085,6 +2100,7 @@ function updateFilesBeingProcessed(file) {
     const fileIndex = filesBeingProcessed.indexOf(file);
     if (fileIndex !== -1) {
         filesBeingProcessed.splice(fileIndex, 1)
+        DEBUG && console.log('filesbeingprocessed updated length now :', filesBeingProcessed.length)
     }
     if (!filesBeingProcessed.length) {
         if (!STATE.selection) getSummary();
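One design note on the worker.js refactor above: the old processPredictQueue serialised itself with a `processing` flag and recursive calls, whereas the new version returns a Promise per dequeued chunk, which is what lets the callers chain `.then(...)` to re-check the backlog (wav path) or resume the stream (ffmpeg path). A generic sketch of that contract, not the app's actual API:

const queue = [];

// Process one queued item and report completion through a Promise,
// so the caller decides what happens between chunks.
function processOne() {
    return new Promise((resolve, reject) => {
        if (queue.length === 0) return reject(new Error('queue empty'));
        const item = queue.shift();
        setImmediate(() => resolve(item * 2)); // stand-in for the real async work
    });
}

queue.push(21);
processOne()
    .then(result => console.log('processed:', result)) // processed: 42
    .then(() => { /* e.g. stream.resume() or checkBacklog(stream) */ });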
