From 54e568a418f7eea3fa7173b5d08f663a7eca8277 Mon Sep 17 00:00:00 2001 From: Mattk70 Date: Fri, 29 Mar 2024 12:05:58 +0000 Subject: [PATCH] Changed all sreatems to operate in paused mode --- js/ui.js | 2 +- js/worker.js | 232 ++++++++++++++++++++------------------------------- 2 files changed, 91 insertions(+), 143 deletions(-) diff --git a/js/ui.js b/js/ui.js index a7ecca5b..62b253ce 100644 --- a/js/ui.js +++ b/js/ui.js @@ -72,7 +72,7 @@ const STATE = { } // Batch size map for slider -const BATCH_SIZE_LIST = [4, 8, 16, 32, 48, 64, 128]; +const BATCH_SIZE_LIST = [16, 32, 48, 64, 128]; // Get the modules loaded in preload.js const fs = window.module.fs; diff --git a/js/worker.js b/js/worker.js index d193af10..8916112c 100644 --- a/js/worker.js +++ b/js/worker.js @@ -1027,14 +1027,15 @@ const setMetadata = async ({ file, proxy = file, source_file = file }) => { }) } -async function setupCtx(audio, rate, destination) { +function setupCtx(audio, rate, destination) { rate ??= sampleRate; // Deal with detached arraybuffer issue const useFilters = (STATE.filters.sendToModel && STATE.filters.active) || destination === 'UI'; return audioCtx.decodeAudioData(audio.buffer) - .then( audioBufferChunk => { + .then( audioBuffer => { const audioCtxSource = audioCtx.createBufferSource(); - audioCtxSource.buffer = audioBufferChunk; + audioCtxSource.buffer = audioBuffer; + audioBuffer = null; // release memory const duration = audioCtxSource.buffer.duration; const buffer = audioCtxSource.buffer; @@ -1062,19 +1063,8 @@ async function setupCtx(audio, rate, destination) { previousFilter ? previousFilter.connect(lowshelfFilter) : offlineSource.connect(lowshelfFilter); previousFilter = lowshelfFilter; } - // const from = 2000; - // const to = 8000; - // const geometricMean = Math.sqrt(from * to); - - // const bandpassFilter = offlineCtx.createBiquadFilter(); - // bandpassFilter.type = 'bandpass'; - // bandpassFilter.frequency.value = geometricMean; - // bandpassFilter.Q.value = geometricMean / (to - from); - // bandpassFilter.channelCount = 1; - // previousFilter ? previousFilter.connect(bandpassFilter) : offlineSource.connect(bandpassFilter); - // previousFilter = bandpassFilter; } -} + } if (STATE.audio.gain){ var gainNode = offlineCtx.createGain(); gainNode.gain.value = Math.pow(10, STATE.audio.gain / 20); @@ -1083,58 +1073,32 @@ async function setupCtx(audio, rate, destination) { } else { previousFilter ? previousFilter.connect(offlineCtx.destination) : offlineSource.connect(offlineCtx.destination); } - offlineSource.start(); + offlineSource.start(); return offlineCtx; - } ) - .catch( (error) => console.warn(error)); - - - // // Create a compressor node - // const compressor = new DynamicsCompressorNode(offlineCtx, { - // threshold: -30, - // knee: 6, - // ratio: 6, - // attack: 0, - // release: 0, - // }); - // previousFilter = offlineSource.connect(compressor) ; - - - // previousFilter ? previousFilter.connect(offlineCtx.destination) : offlineSource.connect(offlineCtx.destination); - - - // // Create a highshelf filter to boost or attenuate high-frequency content - // const highshelfFilter = offlineCtx.createBiquadFilter(); - // highshelfFilter.type = 'highshelf'; - // highshelfFilter.frequency.value = STATE.highPassFrequency || 0; // This sets the cutoff frequency of the highshelf filter to 3000 Hz - // highshelfFilter.gain.value = 0; // This sets the boost or attenuation in decibels (dB) - - - // Add audio normalizer as an Audio Worklet - // if (!normalizerNode){ - // await offlineCtx.audioWorklet.addModule('js/audio_normalizer_processor.js'); - // normalizerNode = new AudioWorkletNode(offlineCtx, 'audio-normalizer-processor'); - // } - // // Connect the nodes - // previousFilter ? previousFilter.connect(normalizerNode) : offlineSource.connect(normalizerNode); - // previousFilter = normalizerNode; - - // // Create a gain node to adjust the audio level - + }) + .catch(error => console.warn(error)); }; function checkBacklog(stream) { - const backlog = sumObjectValues(predictionsRequested) - sumObjectValues(predictionsReceived); - DEBUG && console.log('backlog:', backlog) - if (backlog < 100) { - stream.resume(); - } else { - // If queued value is above 100, wait and check again - setTimeout(() => checkBacklog(stream), 500); // Check every 0.5 seconds - } + return new Promise((resolve, reject) => { + const backlog = sumObjectValues(predictionsRequested) - sumObjectValues(predictionsReceived); + DEBUG && console.log('backlog:', backlog); + + if (backlog > 100) { + // If queued value is above 100, wait and check again + setTimeout(() => { + checkBacklog(stream) + .then(resolve) // Resolve the promise when backlog is within limit + .catch(reject); + }, 500); // Check every 0.5 seconds + } else { + resolve(stream.read()); // backlog ok then read the stream data + } + }); } + /** * * @param file @@ -1162,7 +1126,8 @@ const getWavePredictBuffers = async ({ // extract the header const headerStream = fs.createReadStream(file, {start: 0, end: 4096}); - headerStream.on('data', function (chunk) { + headerStream.on('readable', () => { + let chunk = headerStream.read(); let wav = new wavefileReader.WaveFileReader(); try { wav.fromBuffer(chunk); @@ -1195,27 +1160,25 @@ const getWavePredictBuffers = async ({ let chunkStart = start * sampleRate; // Changed on.('data') handler because of: https://stackoverflow.com/questions/32978094/nodejs-streams-and-premature-end - readStream.on('data', chunk => { - readStream.pause(); - if (chunk === null) return; - // The stream seems to read one more byte than the end - else if (chunk.byteLength <= 1 ) { - predictionsReceived[file]++; - readStream.resume(); - return - } + readStream.on('readable', () => { if (aborted) { readStream.destroy(); return } - const audio = Buffer.concat([meta.header, chunk]); - predictQueue.push([audio, file, end, chunkStart]); - chunkStart += WINDOW_SIZE * BATCH_SIZE * sampleRate; - processPredictQueue().then((resolve, reject) => checkBacklog(readStream) ); - }) - - readStream.on('end', () => readStream.destroy()) + checkBacklog(readStream).then(chunk => { + if (chunk === null || chunk.byteLength <= 1 ) { + // EOF + chunk?.byteLength && predictionsReceived[file]++; + readStream.destroy(); + } else { + const audio = Buffer.concat([meta.header, chunk]); + predictQueue.push([audio, file, end, chunkStart]); + chunkStart += WINDOW_SIZE * BATCH_SIZE * sampleRate; + processPredictQueue(); + } + }) + }) readStream.on('error', err => { console.log(`readstream error: ${err}, start: ${start}, , end: ${end}, duration: ${metadata[file].duration}`); err.code === 'ENOENT' && notifyMissingFile(file); @@ -1298,62 +1261,46 @@ const getPredictBuffers = async ({ //STREAM.end(); }); - STREAM.on('data', chunk => { - + STREAM.on('readable', () => { if (aborted) { - STREAM.destroy() - return - } - if (chunk.byteLength <= 1) { - STREAM.end(); STREAM.destroy(); - console.log('STREAM ended, destroyed') + return } - else { - try { - concatenatedBuffer = concatenatedBuffer.length ? Buffer.concat([concatenatedBuffer, chunk]) : chunk; - } catch (error) { - console.warn(error) + checkBacklog(STREAM).then(chunk => { + if (chunk === null || chunk.byteLength <= 1) { + // EOF: deal with part-full buffers + if (concatenatedBuffer.length){ + const audio = Buffer.concat([WAV_HEADER, concatenatedBuffer]); + predictQueue.push([audio, file, end, chunkStart]); + processPredictQueue(); + } + DEBUG && console.log('All chunks sent for ', file); + STREAM.destroy(); + resolve('finished') } - - // if we have a full buffer - if (concatenatedBuffer.length >= highWaterMark) { - STREAM.pause(); - chunk = concatenatedBuffer.subarray(0, highWaterMark); - concatenatedBuffer = concatenatedBuffer.subarray(highWaterMark); - const audio = Buffer.concat([WAV_HEADER, chunk]) - predictQueue.push([audio, file, end, chunkStart]); - chunkStart += WINDOW_SIZE * BATCH_SIZE * sampleRate - processPredictQueue().then((resolve, reject) => checkBacklog(STREAM) ); + else { + try { + concatenatedBuffer = concatenatedBuffer.length ? Buffer.concat([concatenatedBuffer, chunk]) : chunk; + } catch (e) { + console.log('Detached buffer?', e.message); + } + // if we have a full buffer + if (concatenatedBuffer.length >= highWaterMark) { + const chunk = concatenatedBuffer.subarray(0, highWaterMark); + concatenatedBuffer = concatenatedBuffer.subarray(highWaterMark); + const audio = Buffer.concat([WAV_HEADER, chunk]) + predictQueue.push([audio, file, end, chunkStart]); + chunkStart += WINDOW_SIZE * BATCH_SIZE * sampleRate + processPredictQueue(); + } } - } + }); }); - - // STREAM.on('close', () => console.log('STREAM closed')) - // STREAM.on('pipe', () => console.log('STREAM piped')) - // STREAM.on('unpipe', () => console.log('STREAM unpiped')) - // STREAM.on('drain', () => console.log('STREAM drained')) - // STREAM.on('finish', () => console.log('STREAM resumed')) - // STREAM.on('resume', () => console.log('STREAM resumed')) - - STREAM.on('end', () => { - // EOF: deal with part-full buffers - if (concatenatedBuffer.length){ - const audio = Buffer.concat([WAV_HEADER, concatenatedBuffer]); - predictQueue.push([audio, file, end, chunkStart]); - processPredictQueue(); - } - DEBUG && console.log('All chunks sent for ', file); - STREAM.destroy(); - resolve('finished') - }) - STREAM.on('error', err => { console.log('stream error: ', err); err.code === 'ENOENT' && notifyMissingFile(file); }) - // command.run(); }).catch(error => console.log(error)); } @@ -1377,7 +1324,6 @@ const fetchAudioBuffer = async ({ .format('s16le') .audioChannels(1) // Set to mono .audioFrequency(24_000) // Set sample rate to 24000 Hz (always - this is for wavesurfer) - .output(stream, { end:true }); if (STATE.filters.active) { if (STATE.filters.lowShelfAttenuation && STATE.filters.lowShelfFrequency){ command.audioFilters({ @@ -1400,6 +1346,7 @@ const fetchAudioBuffer = async ({ } ) } + command.writeToStream(stream); command.on('error', error => { UI.postMessage({event: 'generate-alert', message: error.message}) @@ -1409,28 +1356,29 @@ const fetchAudioBuffer = async ({ DEBUG && console.log('FFmpeg command: ' + commandLine); }) - stream.on('data', chunk => { - try { - concatenatedBuffer = concatenatedBuffer.length ? Buffer.concat([concatenatedBuffer, chunk]) : chunk; - } catch (error) { - console.warn(error) - } - }); - - stream.on('end', async () => { - if (concatenatedBuffer.length <= 1) return; - //Add the audio header for the UI - const audio = Buffer.concat([CHIRPITY_HEADER, concatenatedBuffer]); - const offlineCtx = await setupCtx(audio, sampleRate, 'UI').catch( (error) => {console.error(error.message)}); - if (offlineCtx){ - offlineCtx.startRendering().then(resampled => { - resolve(resampled); - }).catch((error) => { - console.error(`FetchAudio rendering failed: ${error}`); + stream.on('readable', () => { + const chunk = stream.read(); + if (chunk === null || chunk.byteLength <= 1) { + // Last chunk + const audio = Buffer.concat([CHIRPITY_HEADER, concatenatedBuffer]); + setupCtx(audio, sampleRate, 'UI').then(offlineCtx => { + offlineCtx.startRendering().then(resampled => { + resolve(resampled); + stream.end(); + stream.destroy(); + }).catch((error) => { + console.error(`FetchAudio rendering failed: ${error}`); + }); + }).catch( (error) => { + reject(error.message) + stream.destroy(); }); + + } else { + // other chunks + concatenatedBuffer = concatenatedBuffer.length ? Buffer.concat([concatenatedBuffer, chunk]) : chunk; } }); - command.run(); }); }