diff --git a/docker-compose.yaml b/docker-compose.yaml index 77145c2..6ea1b36 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -26,6 +26,7 @@ services: - REDIS_PORT=6379 - SCRIBO_FILES=/scriberr - DEV_MODE=false + - CONCURRENCY=1 volumes: - ./scriberr/pb_data:/app/db - ./scriberr:/scriberr diff --git a/src/lib/queue.ts b/src/lib/queue.ts index a00edec..69768e3 100644 --- a/src/lib/queue.ts +++ b/src/lib/queue.ts @@ -1,6 +1,6 @@ import { Queue, Worker } from 'bullmq'; import { exec } from 'child_process'; -import { wizardQueue } from './wizardQueue'; +import { wizardQueue } from './wizardQueue'; import fs from 'fs'; import path from 'path'; import PocketBase from 'pocketbase'; @@ -18,6 +18,7 @@ export const transcriptionQueue = new Queue('transcriptionQueue', { const pb = new PocketBase(env.POCKETBASE_URL); pb.autoCancellation(false); await pb.admins.authWithPassword(env.POCKETBASE_ADMIN_EMAIL, env.POCKETBASE_ADMIN_PASSWORD); +const concur = Number(env.CONCURRENCY); // Create an express app const app = express(); @@ -111,8 +112,8 @@ const execCommandWithLogging = (cmd: string, job: Job, progress: number) => { // return; // } - const _remaining = 95 - progress - const _prog = _remaining * tprogress / 100 + const _remaining = 95 - progress; + const _prog = (_remaining * tprogress) / 100; await job.updateProgress(_prog); } @@ -127,7 +128,6 @@ const execCommandWithLogging = (cmd: string, job: Job, progress: number) => { } }); - process.on('error', (err) => { reject(new Error(`Failed to start process: ${err.message}`)); }); @@ -173,25 +173,30 @@ const worker = new Worker( const audiowaveformCmd = `audiowaveform -i ${ffmpegPath} -o ${audioPath}.json`; await execCommandWithLogging(audiowaveformCmd, job); await job.log(`Audiowaveform for ${recordId} generated`); - + const settingsRecords = await pb.collection('settings').getList(1, 1); const settings = settingsRecords.items[0]; // Execute whisper.cpp command and log output const transcriptdir = path.resolve(env.SCRIBO_FILES, 'transcripts', `${recordId}`); - const transcriptPath= path.resolve(env.SCRIBO_FILES, 'transcripts', `${recordId}`, `${recordId}`); + const transcriptPath = path.resolve( + env.SCRIBO_FILES, + 'transcripts', + `${recordId}`, + `${recordId}` + ); fs.mkdir(transcriptdir, { recursive: true }, (err) => { if (err) throw err; }); let whisperCmd; - console.log(env.DEV_MODE) - job.log(env.DEV_MODE) + console.log(env.DEV_MODE); + job.log(env.DEV_MODE); const isDevMode = env.DEV_MODE === 'true' || env.DEV_MODE === true; - console.log("DEV MODE ----->", isDevMode) - job.log(`DEV MODE -----> ${isDevMode}`) + console.log('DEV MODE ----->', isDevMode); + job.log(`DEV MODE -----> ${isDevMode}`); if (isDevMode) { whisperCmd = `./whisper.cpp/main -m ./whisper.cpp/models/ggml-${settings.model}.en.bin -f ${ffmpegPath} -oj -of ${transcriptPath} -t ${settings.threads} -p ${settings.processors} -pp`; @@ -199,7 +204,6 @@ const worker = new Worker( whisperCmd = `whisper -m /models/ggml-${settings.model}.en.bin -f ${ffmpegPath} -oj -of ${transcriptPath} -t ${settings.threads} -p ${settings.processors} -pp`; } - let rttmContent; let segments; @@ -235,30 +239,28 @@ const worker = new Worker( let upd; if (settings.diarize) { - const diarizedTranscript = generateTranscript(transcriptJson.transcription, rttmContent) - const diarizedJson = {transcription: diarizedTranscript}; + const diarizedTranscript = generateTranscript(transcriptJson.transcription, rttmContent); + const diarizedJson = { transcription: diarizedTranscript }; upd = await pb.collection('scribo').update(recordId, { - // transcript: '{ "test": "hi" }', - transcript: transcriptJson, - diarizedtranscript: diarizedJson, - rttm: rttmContent, - processed: true, - diarized: true, - peaks: JSON.parse(audioPeaks) + // transcript: '{ "test": "hi" }', + transcript: transcriptJson, + diarizedtranscript: diarizedJson, + rttm: rttmContent, + processed: true, + diarized: true, + peaks: JSON.parse(audioPeaks) }); - } else { - upd = await pb.collection('scribo').update(recordId, { - // transcript: '{ "test": "hi" }', - transcript: transcriptJson, - processed: true, - diarized: false, - peaks: JSON.parse(audioPeaks) + // transcript: '{ "test": "hi" }', + transcript: transcriptJson, + processed: true, + diarized: false, + peaks: JSON.parse(audioPeaks) }); } - + await job.log(`Updated PocketBase record for ${recordId}`); console.log('UPDATED +++++ ', upd); @@ -281,7 +283,7 @@ const worker = new Worker( }, { connection: { host: env.REDIS_HOST, port: env.REDIS_PORT }, // Redis connection - concurrency: env.CONCURRENCY // Allows multiple jobs to run concurrently + concurrency: concur || 1 // Allows multiple jobs to run concurrently } ); @@ -327,118 +329,118 @@ async function splitAudioIntoSegments(audioPath, segments, outputDir, job) { } function preprocessWordTimestamps(wordTimestamps) { - const cleanedTimestamps = []; - let previousWord = null; - - wordTimestamps.forEach((word, index) => { - const text = word.text.trim(); - - // Handle periods and other punctuation - if (text === '.') { - if (previousWord) { - // Append the period to the previous word - previousWord.text += text; - previousWord.timestamps.to = word.timestamps.to; - } - } else if (text.startsWith("'")) { - // Append apostrophe-starting words to the previous word - if (previousWord) { - previousWord.text += text; - previousWord.timestamps.to = word.timestamps.to; - } - } else if (text.length === 1 && text !== 'a' && text !== 'i' && text !== 'I') { - // Handle single character words (except "a") - // if (previousWord) { - // // Append single character to the previous word - // previousWord.text += ` ${text}`; - // previousWord.timestamps.to = word.timestamps.to; - // } else if (index + 1 < wordTimestamps.length) { - // // If no previous word, prepend to the next word - // const nextWord = wordTimestamps[index + 1]; - // nextWord.text = `${text} ${nextWord.text}`; - // nextWord.timestamps.from = word.timestamps.from; - // } - console.log('deleting char') - } else if (text.length === 1 && (text === 'a' || text === 'I' || text === 'i')) { - // Keep "a" as a separate word - cleanedTimestamps.push(word); - previousWord = word; - } else { - // Remove other single-character symbols (e.g., parentheses, commas) - if (!/^[\.,!?;:()\[\]]$/.test(text)) { - cleanedTimestamps.push(word); - previousWord = word; - } - } - }); - - return cleanedTimestamps; + const cleanedTimestamps = []; + let previousWord = null; + + wordTimestamps.forEach((word, index) => { + const text = word.text.trim(); + + // Handle periods and other punctuation + if (text === '.') { + if (previousWord) { + // Append the period to the previous word + previousWord.text += text; + previousWord.timestamps.to = word.timestamps.to; + } + } else if (text.startsWith("'")) { + // Append apostrophe-starting words to the previous word + if (previousWord) { + previousWord.text += text; + previousWord.timestamps.to = word.timestamps.to; + } + } else if (text.length === 1 && text !== 'a' && text !== 'i' && text !== 'I') { + // Handle single character words (except "a") + // if (previousWord) { + // // Append single character to the previous word + // previousWord.text += ` ${text}`; + // previousWord.timestamps.to = word.timestamps.to; + // } else if (index + 1 < wordTimestamps.length) { + // // If no previous word, prepend to the next word + // const nextWord = wordTimestamps[index + 1]; + // nextWord.text = `${text} ${nextWord.text}`; + // nextWord.timestamps.from = word.timestamps.from; + // } + console.log('deleting char'); + } else if (text.length === 1 && (text === 'a' || text === 'I' || text === 'i')) { + // Keep "a" as a separate word + cleanedTimestamps.push(word); + previousWord = word; + } else { + // Remove other single-character symbols (e.g., parentheses, commas) + if (!/^[\.,!?;:()\[\]]$/.test(text)) { + cleanedTimestamps.push(word); + previousWord = word; + } + } + }); + + return cleanedTimestamps; } function generateTranscript(wordys, rttmString) { - const speakerSegments = parseRttm(rttmString); - const wordTimestamps = preprocessWordTimestamps(wordys); - - const finalTranscript = []; - let currentSegment = { - text: "", - timestamps: { from: null, to: null }, - speaker: null - }; - - wordTimestamps.forEach(word => { - const wordStart = word.offsets.from; - const wordEnd = word.offsets.to; - - const matchingSpeakerSegment = speakerSegments.find(speakerSegment => { - const speakerStart = speakerSegment.startTime * 1000; - const speakerEnd = speakerStart + (speakerSegment.duration * 1000); - return wordEnd >= speakerStart && wordEnd <= speakerEnd; - }); - - const assignedSpeaker = matchingSpeakerSegment ? matchingSpeakerSegment.speaker : currentSegment.speaker; - - if (!matchingSpeakerSegment) { - console.log('---------> Speaker unknown') - } - - // If the current segment is for the same speaker, append the word - if (currentSegment.speaker === assignedSpeaker) { - currentSegment.text += word.text; - currentSegment.timestamps.to = word.timestamps.to; // Update end time - } else if (currentSegment === null) { - currentSegment.speaker = assignedSpeaker; - currentSegment.text += word.text - currentSegment.timestamps.to = word.timestamps.to; // Update end time - - } else { - // Push the current segment if it has text - if (currentSegment.text.length > 0) { - finalTranscript.push({ ...currentSegment }); - } - - // Start a new segment for the new speaker - currentSegment = { - text: word.text, - timestamps: { from: word.timestamps.from, to: word.timestamps.to }, - speaker: assignedSpeaker - }; - } - }); - - // Push the last segment if any - if (currentSegment.text.length > 0) { - finalTranscript.push(currentSegment); - } - - return finalTranscript; -} + const speakerSegments = parseRttm(rttmString); + const wordTimestamps = preprocessWordTimestamps(wordys); + + const finalTranscript = []; + let currentSegment = { + text: '', + timestamps: { from: null, to: null }, + speaker: null + }; + + wordTimestamps.forEach((word) => { + const wordStart = word.offsets.from; + const wordEnd = word.offsets.to; + + const matchingSpeakerSegment = speakerSegments.find((speakerSegment) => { + const speakerStart = speakerSegment.startTime * 1000; + const speakerEnd = speakerStart + speakerSegment.duration * 1000; + return wordEnd >= speakerStart && wordEnd <= speakerEnd; + }); -function timestampToSeconds(timestamp) { - const [hours, minutes, seconds] = timestamp.split(":"); - const [sec, ms] = seconds.split(","); - return parseFloat(hours) * 3600 + parseFloat(minutes) * 60 + parseFloat(sec) + parseFloat(ms) / 1000; -} + const assignedSpeaker = matchingSpeakerSegment + ? matchingSpeakerSegment.speaker + : currentSegment.speaker; + + if (!matchingSpeakerSegment) { + console.log('---------> Speaker unknown'); + } + + // If the current segment is for the same speaker, append the word + if (currentSegment.speaker === assignedSpeaker) { + currentSegment.text += word.text; + currentSegment.timestamps.to = word.timestamps.to; // Update end time + } else if (currentSegment === null) { + currentSegment.speaker = assignedSpeaker; + currentSegment.text += word.text; + currentSegment.timestamps.to = word.timestamps.to; // Update end time + } else { + // Push the current segment if it has text + if (currentSegment.text.length > 0) { + finalTranscript.push({ ...currentSegment }); + } + + // Start a new segment for the new speaker + currentSegment = { + text: word.text, + timestamps: { from: word.timestamps.from, to: word.timestamps.to }, + speaker: assignedSpeaker + }; + } + }); + // Push the last segment if any + if (currentSegment.text.length > 0) { + finalTranscript.push(currentSegment); + } + return finalTranscript; +} +function timestampToSeconds(timestamp) { + const [hours, minutes, seconds] = timestamp.split(':'); + const [sec, ms] = seconds.split(','); + return ( + parseFloat(hours) * 3600 + parseFloat(minutes) * 60 + parseFloat(sec) + parseFloat(ms) / 1000 + ); +} diff --git a/src/lib/wizardQueue.ts b/src/lib/wizardQueue.ts index 67547ff..325ea6e 100644 --- a/src/lib/wizardQueue.ts +++ b/src/lib/wizardQueue.ts @@ -15,7 +15,6 @@ const pb = new PocketBase(env.POCKETBASE_URL); pb.autoCancellation(false); await pb.admins.authWithPassword(env.POCKETBASE_ADMIN_EMAIL, env.POCKETBASE_ADMIN_PASSWORD); - // Remove all jobs from the queue async function clearQueue() { await wizardQueue.drain(); @@ -93,58 +92,56 @@ const execCommandWithLogging = (cmd, job) => { }); }); }; -; - -export const execCommandWithLoggingSync = (cmd: string, job: any): Promise => { - return new Promise((resolve, reject) => { - const process = exec(cmd, { shell: true, maxBuffer: 1024 * 1024 * 10 }); // Max buffer for larger outputs - - process.stdout.on('data', async (data) => { - console.log(`stdout: ${data}`); - await job.log(`stdout: ${data}`); - }); - - process.stderr.on('data', async (data) => { - console.error(`stderr: ${data}`); - await job.log(`stderr: ${data}`); - }); - - process.on('close', (code) => { - if (code === 0) { - resolve(true); - } else { - reject(new Error(`Command failed with exit code ${code !== null ? code : 'unknown'}`)); - } - }); - - process.on('error', (err) => { - reject(new Error(`Failed to start process: ${err.message}`)); - }); - }); +export const execCommandWithLoggingSync = (cmd: string, job: any): Promise => { + return new Promise((resolve, reject) => { + const process = exec(cmd, { shell: true, maxBuffer: 1024 * 1024 * 10 }); // Max buffer for larger outputs + + process.stdout.on('data', async (data) => { + console.log(`stdout: ${data}`); + await job.log(`stdout: ${data}`); + }); + + process.stderr.on('data', async (data) => { + console.error(`stderr: ${data}`); + await job.log(`stderr: ${data}`); + }); + + process.on('close', (code) => { + if (code === 0) { + resolve(true); + } else { + reject(new Error(`Command failed with exit code ${code !== null ? code : 'unknown'}`)); + } + }); + + process.on('error', (err) => { + reject(new Error(`Failed to start process: ${err.message}`)); + }); + }); }; // Set up the worker to process jobs automatically const worker = new Worker( 'wizardQueue', async (job) => { - console.log("hello world from wizard") + console.log('hello world from wizard'); ensureCollectionExists(pb); let modelPath; let cmd; const isDevMode = env.DEV_MODE === 'true' || env.DEV_MODE === true; if (isDevMode) { - modelPath = path.resolve(env.SCRIBO_FILES, 'models/whisper.cpp') + modelPath = path.resolve(env.SCRIBO_FILES, 'models/whisper.cpp'); } else { - modelPath = path.resolve('/models/whisper.cpp') + modelPath = path.resolve('/models/whisper.cpp'); } try { - const {settings }= job.data; - await job.log('starting job') - cmd = `make clean -C ${modelPath}` + const { settings } = job.data; + await job.log('starting job'); + cmd = `make clean -C ${modelPath}`; await execCommandWithLogging(cmd, job); - const isNvidia= env.NVIDIA === 'true' || env.NVIDIA === true; + const isNvidia = env.NVIDIA === 'true' || env.NVIDIA === true; if (isNvidia) { cmd = `GGML_CUDA=1 make -j -C ${modelPath}`; @@ -152,17 +149,17 @@ const worker = new Worker( cmd = `make -C ${modelPath}`; } await execCommandWithLogging(cmd, job); - await job.log('finished making whisper') - job.updateProgress(50) + await job.log('finished making whisper'); + job.updateProgress(50); - cmd = `python3 -m pip install --no-cache-dir pyannote.audio` + cmd = `python3 -m pip install --no-cache-dir pyannote.audio`; await execCommandWithLogging(cmd, job); - await job.log('finished installing pyannote') - job.updateProgress(75) + await job.log('finished installing pyannote'); + job.updateProgress(75); const modToDownload = modelsToDownload(settings); - console.log(modToDownload) - await job.log(modToDownload) + console.log(modToDownload); + await job.log(modToDownload); const isDevMode = env.DEV_MODE === 'true' || env.DEV_MODE === true; @@ -170,83 +167,81 @@ const worker = new Worker( let cmd2; if (isDevMode) { - cmd2 = `sh ${modelPath}/models/download-ggml-model.sh ${m}.en`; + cmd2 = `sh ${modelPath}/models/download-ggml-model.sh ${m}.en`; } else { - cmd2 = `sh ${modelPath}/models/download-ggml-model.sh ${m}.en /models`; + cmd2 = `sh ${modelPath}/models/download-ggml-model.sh ${m}.en /models`; } - await job.log(`Executing command: ${cmd2}`); - execCommandWithLoggingSync(cmd2, job); - const prg = 75 + (25 * (idx + 1) / modelsToDownload.length); // idx + 1 ensures progress increments - await job.updateProgress(prg); + await job.log(`Executing command: ${cmd2}`); + execCommandWithLoggingSync(cmd2, job); + const prg = 75 + (25 * (idx + 1)) / modelsToDownload.length; // idx + 1 ensures progress increments + await job.updateProgress(prg); }); - await job.log('finished job') - } catch(error) { + await job.log('finished job'); + } catch (error) { console.log(error); - job.log(error) + job.log(error); } - - console.log(`DEVMODE ------>>>>>> ${isDevMode}`) - job.log(`DEVMODE ------>>>>>> ${isDevMode}`) + console.log(`DEVMODE ------>>>>>> ${isDevMode}`); + job.log(`DEVMODE ------>>>>>> ${isDevMode}`); if (!isDevMode) { - console.log("eecuting copy") - job.log("eecuting copy") + console.log('eecuting copy'); + job.log('eecuting copy'); cmd = `cp ${modelPath}/main /usr/local/bin/whisper`; - console.log(cmd) - job.log(cmd) - execCommandWithLoggingSync(cmd, job); - job.log("COPIED WHISPER BINARY") - console.log("COPIED WHISPER BINARY") + console.log(cmd); + job.log(cmd); + execCommandWithLoggingSync(cmd, job); + job.log('COPIED WHISPER BINARY'); + console.log('COPIED WHISPER BINARY'); } - const settt = await pb.collection('settings').getList(1,1); + const settt = await pb.collection('settings').getList(1, 1); if (settt && settt.items.length > 0) { - const record = settt.items[0]; // Get the first record (assuming one record is returned) + const record = settt.items[0]; // Get the first record (assuming one record is returned) - // Update the 'wizard' field to true - const updatedRecord = await pb.collection('settings').update(record.id, { - wizard: true - }); + // Update the 'wizard' field to true + const updatedRecord = await pb.collection('settings').update(record.id, { + wizard: true + }); - console.log('Updated record:', updatedRecord); + console.log('Updated record:', updatedRecord); } else { - console.log('No records found in settings collection'); + console.log('No records found in settings collection'); } - job.updateProgress(100) + job.updateProgress(100); }, { connection: { host: env.REDIS_HOST, port: env.REDIS_PORT }, // Redis connection concurrency: 1, // Allows multiple jobs to run concurrently lockDuration: 500000, // Lock duration (in milliseconds), e.g., 5 minutes - lockRenewTime: 500000 + lockRenewTime: 500000 } ); function modelsToDownload(settings) { - let m = []; + let m = []; - const set = JSON.parse(settings) - console.log('Settings:', settings); + const set = JSON.parse(settings); + console.log('Settings:', settings); console.log('Models:', set.models); - - if (set.models.small) { - m.push('small'); - } - if (set.models.tiny) { - m.push('tiny'); - } - if (set.models.medium) { - m.push('medium'); - } - if (set.models.largev1) { - m.push('large-v1'); - } - - return m; + if (set.models.small) { + m.push('small'); + } + if (set.models.tiny) { + m.push('tiny'); } + if (set.models.medium) { + m.push('medium'); + } + if (set.models.largev1) { + m.push('large-v1'); + } + + return m; +} diff --git a/start_services.sh b/start_services.sh index d16c350..52264f4 100644 --- a/start_services.sh +++ b/start_services.sh @@ -4,6 +4,7 @@ echo "Creating admin with email: ${POCKETBASE_ADMIN_EMAIL}" echo "PocketBase URL: ${POCKETBASE_URL}" +cp -r /app/whisper.cpp /models/ # Start PocketBase in the background # pocketbase serve --http=0.0.0.0:8080 --dir /app/db &