Skip to content

Commit

Permalink
Changed all sreatems to operate in paused mode
Browse files Browse the repository at this point in the history
  • Loading branch information
Mattk70 committed Mar 29, 2024
1 parent 5de16fe commit 54e568a
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 143 deletions.
2 changes: 1 addition & 1 deletion js/ui.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ const STATE = {
}

// Batch size map for slider
const BATCH_SIZE_LIST = [4, 8, 16, 32, 48, 64, 128];
const BATCH_SIZE_LIST = [16, 32, 48, 64, 128];

// Get the modules loaded in preload.js
const fs = window.module.fs;
Expand Down
232 changes: 90 additions & 142 deletions js/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -1027,14 +1027,15 @@ const setMetadata = async ({ file, proxy = file, source_file = file }) => {
})
}

async function setupCtx(audio, rate, destination) {
function setupCtx(audio, rate, destination) {
rate ??= sampleRate;
// Deal with detached arraybuffer issue
const useFilters = (STATE.filters.sendToModel && STATE.filters.active) || destination === 'UI';
return audioCtx.decodeAudioData(audio.buffer)
.then( audioBufferChunk => {
.then( audioBuffer => {
const audioCtxSource = audioCtx.createBufferSource();
audioCtxSource.buffer = audioBufferChunk;
audioCtxSource.buffer = audioBuffer;
audioBuffer = null; // release memory
const duration = audioCtxSource.buffer.duration;
const buffer = audioCtxSource.buffer;

Expand Down Expand Up @@ -1062,19 +1063,8 @@ async function setupCtx(audio, rate, destination) {
previousFilter ? previousFilter.connect(lowshelfFilter) : offlineSource.connect(lowshelfFilter);
previousFilter = lowshelfFilter;
}
// const from = 2000;
// const to = 8000;
// const geometricMean = Math.sqrt(from * to);

// const bandpassFilter = offlineCtx.createBiquadFilter();
// bandpassFilter.type = 'bandpass';
// bandpassFilter.frequency.value = geometricMean;
// bandpassFilter.Q.value = geometricMean / (to - from);
// bandpassFilter.channelCount = 1;
// previousFilter ? previousFilter.connect(bandpassFilter) : offlineSource.connect(bandpassFilter);
// previousFilter = bandpassFilter;
}
}
}
if (STATE.audio.gain){
var gainNode = offlineCtx.createGain();
gainNode.gain.value = Math.pow(10, STATE.audio.gain / 20);
Expand All @@ -1083,58 +1073,32 @@ async function setupCtx(audio, rate, destination) {
} else {
previousFilter ? previousFilter.connect(offlineCtx.destination) : offlineSource.connect(offlineCtx.destination);
}
offlineSource.start();
offlineSource.start();
return offlineCtx;
} )
.catch( (error) => console.warn(error));


// // Create a compressor node
// const compressor = new DynamicsCompressorNode(offlineCtx, {
// threshold: -30,
// knee: 6,
// ratio: 6,
// attack: 0,
// release: 0,
// });
// previousFilter = offlineSource.connect(compressor) ;


// previousFilter ? previousFilter.connect(offlineCtx.destination) : offlineSource.connect(offlineCtx.destination);


// // Create a highshelf filter to boost or attenuate high-frequency content
// const highshelfFilter = offlineCtx.createBiquadFilter();
// highshelfFilter.type = 'highshelf';
// highshelfFilter.frequency.value = STATE.highPassFrequency || 0; // This sets the cutoff frequency of the highshelf filter to 3000 Hz
// highshelfFilter.gain.value = 0; // This sets the boost or attenuation in decibels (dB)


// Add audio normalizer as an Audio Worklet
// if (!normalizerNode){
// await offlineCtx.audioWorklet.addModule('js/audio_normalizer_processor.js');
// normalizerNode = new AudioWorkletNode(offlineCtx, 'audio-normalizer-processor');
// }
// // Connect the nodes
// previousFilter ? previousFilter.connect(normalizerNode) : offlineSource.connect(normalizerNode);
// previousFilter = normalizerNode;

// // Create a gain node to adjust the audio level

})
.catch(error => console.warn(error));
};


function checkBacklog(stream) {
const backlog = sumObjectValues(predictionsRequested) - sumObjectValues(predictionsReceived);
DEBUG && console.log('backlog:', backlog)
if (backlog < 100) {
stream.resume();
} else {
// If queued value is above 100, wait and check again
setTimeout(() => checkBacklog(stream), 500); // Check every 0.5 seconds
}
return new Promise((resolve, reject) => {
const backlog = sumObjectValues(predictionsRequested) - sumObjectValues(predictionsReceived);
DEBUG && console.log('backlog:', backlog);

if (backlog > 100) {
// If queued value is above 100, wait and check again
setTimeout(() => {
checkBacklog(stream)
.then(resolve) // Resolve the promise when backlog is within limit
.catch(reject);
}, 500); // Check every 0.5 seconds
} else {
resolve(stream.read()); // backlog ok then read the stream data
}
});
}


/**
*
* @param file
Expand Down Expand Up @@ -1162,7 +1126,8 @@ const getWavePredictBuffers = async ({

// extract the header
const headerStream = fs.createReadStream(file, {start: 0, end: 4096});
headerStream.on('data', function (chunk) {
headerStream.on('readable', () => {
let chunk = headerStream.read();
let wav = new wavefileReader.WaveFileReader();
try {
wav.fromBuffer(chunk);
Expand Down Expand Up @@ -1195,27 +1160,25 @@ const getWavePredictBuffers = async ({

let chunkStart = start * sampleRate;
// Changed on.('data') handler because of: https://stackoverflow.com/questions/32978094/nodejs-streams-and-premature-end
readStream.on('data', chunk => {
readStream.pause();
if (chunk === null) return;
// The stream seems to read one more byte than the end
else if (chunk.byteLength <= 1 ) {
predictionsReceived[file]++;
readStream.resume();
return
}
readStream.on('readable', () => {
if (aborted) {
readStream.destroy();
return
}
const audio = Buffer.concat([meta.header, chunk]);
predictQueue.push([audio, file, end, chunkStart]);
chunkStart += WINDOW_SIZE * BATCH_SIZE * sampleRate;
processPredictQueue().then((resolve, reject) => checkBacklog(readStream) );
})

readStream.on('end', () => readStream.destroy())

checkBacklog(readStream).then(chunk => {
if (chunk === null || chunk.byteLength <= 1 ) {
// EOF
chunk?.byteLength && predictionsReceived[file]++;
readStream.destroy();
} else {
const audio = Buffer.concat([meta.header, chunk]);
predictQueue.push([audio, file, end, chunkStart]);
chunkStart += WINDOW_SIZE * BATCH_SIZE * sampleRate;
processPredictQueue();
}
})
})
readStream.on('error', err => {
console.log(`readstream error: ${err}, start: ${start}, , end: ${end}, duration: ${metadata[file].duration}`);
err.code === 'ENOENT' && notifyMissingFile(file);
Expand Down Expand Up @@ -1298,62 +1261,46 @@ const getPredictBuffers = async ({
//STREAM.end();
});

STREAM.on('data', chunk => {

STREAM.on('readable', () => {
if (aborted) {
STREAM.destroy()
return
}
if (chunk.byteLength <= 1) {
STREAM.end();
STREAM.destroy();
console.log('STREAM ended, destroyed')
return
}
else {
try {
concatenatedBuffer = concatenatedBuffer.length ? Buffer.concat([concatenatedBuffer, chunk]) : chunk;
} catch (error) {
console.warn(error)
checkBacklog(STREAM).then(chunk => {
if (chunk === null || chunk.byteLength <= 1) {
// EOF: deal with part-full buffers
if (concatenatedBuffer.length){
const audio = Buffer.concat([WAV_HEADER, concatenatedBuffer]);
predictQueue.push([audio, file, end, chunkStart]);
processPredictQueue();
}
DEBUG && console.log('All chunks sent for ', file);
STREAM.destroy();
resolve('finished')
}

// if we have a full buffer
if (concatenatedBuffer.length >= highWaterMark) {
STREAM.pause();
chunk = concatenatedBuffer.subarray(0, highWaterMark);
concatenatedBuffer = concatenatedBuffer.subarray(highWaterMark);
const audio = Buffer.concat([WAV_HEADER, chunk])
predictQueue.push([audio, file, end, chunkStart]);
chunkStart += WINDOW_SIZE * BATCH_SIZE * sampleRate
processPredictQueue().then((resolve, reject) => checkBacklog(STREAM) );
else {
try {
concatenatedBuffer = concatenatedBuffer.length ? Buffer.concat([concatenatedBuffer, chunk]) : chunk;
} catch (e) {
console.log('Detached buffer?', e.message);
}
// if we have a full buffer
if (concatenatedBuffer.length >= highWaterMark) {
const chunk = concatenatedBuffer.subarray(0, highWaterMark);
concatenatedBuffer = concatenatedBuffer.subarray(highWaterMark);
const audio = Buffer.concat([WAV_HEADER, chunk])
predictQueue.push([audio, file, end, chunkStart]);
chunkStart += WINDOW_SIZE * BATCH_SIZE * sampleRate
processPredictQueue();
}
}
}
});
});

// STREAM.on('close', () => console.log('STREAM closed'))
// STREAM.on('pipe', () => console.log('STREAM piped'))
// STREAM.on('unpipe', () => console.log('STREAM unpiped'))
// STREAM.on('drain', () => console.log('STREAM drained'))
// STREAM.on('finish', () => console.log('STREAM resumed'))
// STREAM.on('resume', () => console.log('STREAM resumed'))

STREAM.on('end', () => {
// EOF: deal with part-full buffers
if (concatenatedBuffer.length){
const audio = Buffer.concat([WAV_HEADER, concatenatedBuffer]);
predictQueue.push([audio, file, end, chunkStart]);
processPredictQueue();
}
DEBUG && console.log('All chunks sent for ', file);
STREAM.destroy();
resolve('finished')
})

STREAM.on('error', err => {
console.log('stream error: ', err);
err.code === 'ENOENT' && notifyMissingFile(file);
})

// command.run();
}).catch(error => console.log(error));
}

Expand All @@ -1377,7 +1324,6 @@ const fetchAudioBuffer = async ({
.format('s16le')
.audioChannels(1) // Set to mono
.audioFrequency(24_000) // Set sample rate to 24000 Hz (always - this is for wavesurfer)
.output(stream, { end:true });
if (STATE.filters.active) {
if (STATE.filters.lowShelfAttenuation && STATE.filters.lowShelfFrequency){
command.audioFilters({
Expand All @@ -1400,6 +1346,7 @@ const fetchAudioBuffer = async ({
}
)
}
command.writeToStream(stream);

command.on('error', error => {
UI.postMessage({event: 'generate-alert', message: error.message})
Expand All @@ -1409,28 +1356,29 @@ const fetchAudioBuffer = async ({
DEBUG && console.log('FFmpeg command: ' + commandLine);
})

stream.on('data', chunk => {
try {
concatenatedBuffer = concatenatedBuffer.length ? Buffer.concat([concatenatedBuffer, chunk]) : chunk;
} catch (error) {
console.warn(error)
}
});

stream.on('end', async () => {
if (concatenatedBuffer.length <= 1) return;
//Add the audio header for the UI
const audio = Buffer.concat([CHIRPITY_HEADER, concatenatedBuffer]);
const offlineCtx = await setupCtx(audio, sampleRate, 'UI').catch( (error) => {console.error(error.message)});
if (offlineCtx){
offlineCtx.startRendering().then(resampled => {
resolve(resampled);
}).catch((error) => {
console.error(`FetchAudio rendering failed: ${error}`);
stream.on('readable', () => {
const chunk = stream.read();
if (chunk === null || chunk.byteLength <= 1) {
// Last chunk
const audio = Buffer.concat([CHIRPITY_HEADER, concatenatedBuffer]);
setupCtx(audio, sampleRate, 'UI').then(offlineCtx => {
offlineCtx.startRendering().then(resampled => {
resolve(resampled);
stream.end();
stream.destroy();
}).catch((error) => {
console.error(`FetchAudio rendering failed: ${error}`);
});
}).catch( (error) => {
reject(error.message)
stream.destroy();
});

} else {
// other chunks
concatenatedBuffer = concatenatedBuffer.length ? Buffer.concat([concatenatedBuffer, chunk]) : chunk;
}
});
command.run();
});
}

Expand Down

0 comments on commit 54e568a

Please sign in to comment.