Skip to content

Commit

Permalink
fix(bug): missing concurrency parameters
Browse files Browse the repository at this point in the history
  • Loading branch information
rishikanthc committed Oct 21, 2024
1 parent 8b9e789 commit ff9681c
Show file tree
Hide file tree
Showing 4 changed files with 226 additions and 227 deletions.
1 change: 1 addition & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ services:
- REDIS_PORT=6379
- SCRIBO_FILES=/scriberr
- DEV_MODE=false
- CONCURRENCY=1
volumes:
- ./scriberr/pb_data:/app/db
- ./scriberr:/scriberr
Expand Down
276 changes: 139 additions & 137 deletions src/lib/queue.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Queue, Worker } from 'bullmq';
import { exec } from 'child_process';
import { wizardQueue } from './wizardQueue';
import { wizardQueue } from './wizardQueue';
import fs from 'fs';
import path from 'path';
import PocketBase from 'pocketbase';
Expand All @@ -18,6 +18,7 @@ export const transcriptionQueue = new Queue('transcriptionQueue', {
const pb = new PocketBase(env.POCKETBASE_URL);
pb.autoCancellation(false);
await pb.admins.authWithPassword(env.POCKETBASE_ADMIN_EMAIL, env.POCKETBASE_ADMIN_PASSWORD);
const concur = Number(env.CONCURRENCY);

// Create an express app
const app = express();
Expand Down Expand Up @@ -111,8 +112,8 @@ const execCommandWithLogging = (cmd: string, job: Job, progress: number) => {
// return;
// }

const _remaining = 95 - progress
const _prog = _remaining * tprogress / 100
const _remaining = 95 - progress;
const _prog = (_remaining * tprogress) / 100;

await job.updateProgress(_prog);
}
Expand All @@ -127,7 +128,6 @@ const execCommandWithLogging = (cmd: string, job: Job, progress: number) => {
}
});


process.on('error', (err) => {
reject(new Error(`Failed to start process: ${err.message}`));
});
Expand Down Expand Up @@ -173,33 +173,37 @@ const worker = new Worker(
const audiowaveformCmd = `audiowaveform -i ${ffmpegPath} -o ${audioPath}.json`;
await execCommandWithLogging(audiowaveformCmd, job);
await job.log(`Audiowaveform for ${recordId} generated`);

const settingsRecords = await pb.collection('settings').getList(1, 1);
const settings = settingsRecords.items[0];

// Execute whisper.cpp command and log output
const transcriptdir = path.resolve(env.SCRIBO_FILES, 'transcripts', `${recordId}`);
const transcriptPath= path.resolve(env.SCRIBO_FILES, 'transcripts', `${recordId}`, `${recordId}`);
const transcriptPath = path.resolve(
env.SCRIBO_FILES,
'transcripts',
`${recordId}`,
`${recordId}`
);
fs.mkdir(transcriptdir, { recursive: true }, (err) => {
if (err) throw err;
});

let whisperCmd;
console.log(env.DEV_MODE)
job.log(env.DEV_MODE)
console.log(env.DEV_MODE);
job.log(env.DEV_MODE);

const isDevMode = env.DEV_MODE === 'true' || env.DEV_MODE === true;

console.log("DEV MODE ----->", isDevMode)
job.log(`DEV MODE -----> ${isDevMode}`)
console.log('DEV MODE ----->', isDevMode);
job.log(`DEV MODE -----> ${isDevMode}`);

if (isDevMode) {
whisperCmd = `./whisper.cpp/main -m ./whisper.cpp/models/ggml-${settings.model}.en.bin -f ${ffmpegPath} -oj -of ${transcriptPath} -t ${settings.threads} -p ${settings.processors} -pp`;
} else {
whisperCmd = `whisper -m /models/ggml-${settings.model}.en.bin -f ${ffmpegPath} -oj -of ${transcriptPath} -t ${settings.threads} -p ${settings.processors} -pp`;
}


let rttmContent;
let segments;

Expand Down Expand Up @@ -235,30 +239,28 @@ const worker = new Worker(
let upd;

if (settings.diarize) {
const diarizedTranscript = generateTranscript(transcriptJson.transcription, rttmContent)
const diarizedJson = {transcription: diarizedTranscript};
const diarizedTranscript = generateTranscript(transcriptJson.transcription, rttmContent);
const diarizedJson = { transcription: diarizedTranscript };

upd = await pb.collection('scribo').update(recordId, {
// transcript: '{ "test": "hi" }',
transcript: transcriptJson,
diarizedtranscript: diarizedJson,
rttm: rttmContent,
processed: true,
diarized: true,
peaks: JSON.parse(audioPeaks)
// transcript: '{ "test": "hi" }',
transcript: transcriptJson,
diarizedtranscript: diarizedJson,
rttm: rttmContent,
processed: true,
diarized: true,
peaks: JSON.parse(audioPeaks)
});

} else {

upd = await pb.collection('scribo').update(recordId, {
// transcript: '{ "test": "hi" }',
transcript: transcriptJson,
processed: true,
diarized: false,
peaks: JSON.parse(audioPeaks)
// transcript: '{ "test": "hi" }',
transcript: transcriptJson,
processed: true,
diarized: false,
peaks: JSON.parse(audioPeaks)
});
}

await job.log(`Updated PocketBase record for ${recordId}`);
console.log('UPDATED +++++ ', upd);

Expand All @@ -281,7 +283,7 @@ const worker = new Worker(
},
{
connection: { host: env.REDIS_HOST, port: env.REDIS_PORT }, // Redis connection
concurrency: env.CONCURRENCY // Allows multiple jobs to run concurrently
concurrency: concur || 1 // Allows multiple jobs to run concurrently
}
);

Expand Down Expand Up @@ -327,118 +329,118 @@ async function splitAudioIntoSegments(audioPath, segments, outputDir, job) {
}

/**
 * Cleans a list of word-level transcript tokens before speaker assignment.
 *
 * Rules, applied to each token's trimmed text in this order:
 *  1. A lone "." or a token starting with an apostrophe (e.g. "'s") is glued
 *     onto the previously kept word, and that word's `timestamps.to` is
 *     extended to the glued token's `timestamps.to`. With no previous word the
 *     token is dropped.
 *  2. Any other single-character token except "a", "i" and "I" is dropped.
 *     NOTE(review): this also drops single digits such as "5" — confirm that
 *     is intended.
 *  3. Everything else is kept.
 *
 * Kept words are returned as shallow copies (with their own `timestamps`
 * object), so the caller's array and word objects are never modified. This
 * matters because the caller stores the raw transcription separately from the
 * diarized one built from this function's output.
 *
 * @param wordTimestamps Tokens exposing `text` and `timestamps.{from,to}`.
 * @returns A new array of cleaned word objects in their original order.
 */
function preprocessWordTimestamps(wordTimestamps) {
  // Single-character tokens that are real words and must survive cleaning.
  const keptSingleChars = new Set(['a', 'i', 'I']);

  const cleanedTimestamps = [];
  let previousWord = null;

  for (const word of wordTimestamps) {
    const text = word.text.trim();

    // Sentence-ending periods and contraction suffixes belong to the previous word.
    if (text === '.' || text.startsWith("'")) {
      if (previousWord) {
        previousWord.text += text;
        previousWord.timestamps.to = word.timestamps.to;
      }
      continue;
    }

    // Stray single characters (commas, brackets, lone letters, ...) are discarded.
    if (text.length === 1 && !keptSingleChars.has(text)) {
      console.log('deleting char');
      continue;
    }

    // Copy before keeping: later merges write to `text` and `timestamps.to`,
    // and those writes must not leak back into the caller's objects.
    const keptWord = { ...word, timestamps: { ...word.timestamps } };
    cleanedTimestamps.push(keptWord);
    previousWord = keptWord;
  }

  return cleanedTimestamps;
}

/**
 * Builds a speaker-attributed transcript from word tokens and diarization data.
 *
 * The RTTM string is parsed with `parseRttm` and the words are cleaned with
 * `preprocessWordTimestamps`. Each word is then assigned to the first speaker
 * segment whose interval contains the word's end offset; consecutive words
 * with the same speaker are concatenated into one transcript segment.
 *
 * A word that matches no speaker segment inherits the speaker of the segment
 * currently being built (which is `null` until a first speaker is found).
 *
 * @param wordys Word tokens exposing `text`, `timestamps.{from,to}` and `offsets.to`.
 * @param rttmString Raw RTTM diarization output, passed to `parseRttm`.
 * @returns An array of `{ text, timestamps: { from, to }, speaker }` segments.
 */
function generateTranscript(wordys, rttmString) {
  const speakerSegments = parseRttm(rttmString);
  const wordTimestamps = preprocessWordTimestamps(wordys);

  const finalTranscript = [];
  // Placeholder segment; it only reaches the output if words are appended to it.
  let currentSegment = {
    text: '',
    timestamps: { from: null, to: null },
    speaker: null
  };

  for (const word of wordTimestamps) {
    const wordEnd = word.offsets.to;

    // startTime/duration are scaled by 1000 before being compared with the
    // word offset — presumably seconds vs. milliseconds; confirm against parseRttm.
    const matchingSpeakerSegment = speakerSegments.find((speakerSegment) => {
      const speakerStart = speakerSegment.startTime * 1000;
      const speakerEnd = speakerStart + speakerSegment.duration * 1000;
      return wordEnd >= speakerStart && wordEnd <= speakerEnd;
    });

    if (!matchingSpeakerSegment) {
      console.log('---------> Speaker unknown');
    }

    const assignedSpeaker = matchingSpeakerSegment
      ? matchingSpeakerSegment.speaker
      : currentSegment.speaker;

    if (currentSegment.speaker === assignedSpeaker) {
      // Same speaker: extend the segment being built. The placeholder segment
      // has no start time yet, so take it from the first word appended to it.
      if (currentSegment.timestamps.from === null) {
        currentSegment.timestamps.from = word.timestamps.from;
      }
      currentSegment.text += word.text;
      currentSegment.timestamps.to = word.timestamps.to;
      continue;
    }

    // Speaker changed: flush the finished segment (unless it is still the
    // empty placeholder) and start a new one from this word.
    if (currentSegment.text.length > 0) {
      finalTranscript.push({ ...currentSegment });
    }

    currentSegment = {
      text: word.text,
      timestamps: { from: word.timestamps.from, to: word.timestamps.to },
      speaker: assignedSpeaker
    };
  }

  // Flush the trailing segment, if it holds any text.
  if (currentSegment.text.length > 0) {
    finalTranscript.push(currentSegment);
  }

  return finalTranscript;
}

/**
 * Converts an `HH:MM:SS,mmm` timestamp into a number of seconds.
 *
 * The part after the comma is read as milliseconds. It is optional: a
 * timestamp without it (e.g. `00:00:05`) is treated as having zero
 * milliseconds instead of producing `NaN`.
 *
 * @param timestamp Timestamp string with three colon-separated fields.
 * @returns The timestamp expressed in seconds; `NaN` if a field is not numeric.
 * @throws TypeError if the string has fewer than three colon-separated fields.
 */
function timestampToSeconds(timestamp: string): number {
  const [hours, minutes, seconds] = timestamp.split(':');
  // Default the millisecond field so parseFloat never receives undefined.
  const [sec, ms = '0'] = seconds.split(',');
  return (
    parseFloat(hours) * 3600 + parseFloat(minutes) * 60 + parseFloat(sec) + parseFloat(ms) / 1000
  );
}
Loading

0 comments on commit ff9681c

Please sign in to comment.