From a5203a8bc2a496140678209fa605bde3afc6634a Mon Sep 17 00:00:00 2001 From: judegiordano Date: Wed, 22 Nov 2023 17:31:33 -0500 Subject: [PATCH] Handle Output Create Queue --- Cargo.toml | 4 +- src/bin/handlers/queues/clone-voice.rs | 25 ------------ src/bin/handlers/queues/create-output.rs | 49 ++++++++++++++++++++++++ src/controllers/outputs/controller.rs | 19 +++++++-- src/lib.rs | 8 ++-- sst.config.ts | 6 +-- 6 files changed, 74 insertions(+), 37 deletions(-) delete mode 100644 src/bin/handlers/queues/clone-voice.rs create mode 100644 src/bin/handlers/queues/create-output.rs diff --git a/Cargo.toml b/Cargo.toml index 2b7aac0..d5ff024 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,8 +44,8 @@ name = "migrate-indexes" path = "src/bin/scripts/migrate-indexes.rs" [[bin]] -name = "clone-voice" -path = "src/bin/handlers/queues/clone-voice.rs" +name = "create-output" +path = "src/bin/handlers/queues/create-output.rs" [[bin]] name = "sample-uploaded" diff --git a/src/bin/handlers/queues/clone-voice.rs b/src/bin/handlers/queues/clone-voice.rs deleted file mode 100644 index ed0c650..0000000 --- a/src/bin/handlers/queues/clone-voice.rs +++ /dev/null @@ -1,25 +0,0 @@ -use anyhow::Result; -use mongoose::Model; -use parrot_api::{ - aws::{s3::Client, sqs::FifoQueue}, - eleven_labs::ElevenLabs, - env::Config, - logger, - models::voice::Voice, - types::CloneVoiceFifoMessage, -}; - -#[allow(unused_variables)] -#[tokio::main] -pub async fn main() -> Result<()> { - logger::init()?; - let config = Config::new()?; - let voice_api = ElevenLabs::new()?; - let sqs = FifoQueue::new(config.clone_voice_queue_url).await; - let s3 = Client::new("parrot-api").await; - let messages = sqs.receive_fifo_message::().await?; - for message in messages { - let voice = Voice::read_by_id(&message.voice_id).await?; - } - Ok(()) -} diff --git a/src/bin/handlers/queues/create-output.rs b/src/bin/handlers/queues/create-output.rs new file mode 100644 index 0000000..81f0dd3 --- /dev/null +++ b/src/bin/handlers/queues/create-output.rs @@ -0,0 +1,49 @@ +use anyhow::Result; +use mongoose::{bson::doc, Model}; +use parrot_api::{ + aws::{s3::Client, sqs::FifoQueue}, + eleven_labs::ElevenLabs, + env::Config, + logger, + models::output::Output, + models::{output::OutputStatus, voice::Voice}, + types::CreateOutputFifoMessage, +}; + +#[allow(unused_variables)] +#[tokio::main] +pub async fn main() -> Result<()> { + logger::init()?; + let config = Config::new()?; + let voice_api = ElevenLabs::new()?; + let sqs = FifoQueue::new(config.create_output_queue_url).await; + let outputs_bucket = Client::new(&config.outputs_bucket_name).await; + let messages = sqs + .receive_fifo_message::() + .await?; + for message in messages { + let output = Output::read_by_id(&message.output_id).await?; + let voice = Voice::read_by_id(&output.voice).await?; + if voice.eleven_labs_id.is_none() { + anyhow::bail!("no eleven labs id supplied"); + }; + let bytes = voice_api + .text_to_speech(&voice.eleven_labs_id.unwrap(), &output.text) + .await?; + let updated = Output::update( + doc! { + "_id": output.id + }, + doc! { + "status": OutputStatus::Done.to_string() + }, + ) + .await?; + outputs_bucket + .put_object(&updated.id, bytes.to_vec()) + .await?; + // TODO: send server side event of process complete + tracing::info!("OUTPUT: {:?}", updated); + } + Ok(()) +} diff --git a/src/controllers/outputs/controller.rs b/src/controllers/outputs/controller.rs index 15826c3..6e1649d 100644 --- a/src/controllers/outputs/controller.rs +++ b/src/controllers/outputs/controller.rs @@ -4,24 +4,26 @@ use serde::{Deserialize, Serialize}; use serde_json::json; use crate::{ + aws::sqs::{FifoMessage, FifoQueue}, + env::Config, errors::ApiResponse, helpers::authenticate, models::{ output::Output, voice::{Voice, VoiceStatus}, }, + types::CreateOutputFifoMessage, }; #[derive(Deserialize, Serialize)] pub struct OutputPayload { - voice_name: String, + voice_id: String, text: String, } pub async fn create_output(req: HttpRequest, body: web::Json) -> ApiResponse { authenticate(req).await?; - let name = slug::slugify(body.voice_name.to_string()); - let voice = match Voice::read(doc! { "name": name }).await { + let voice = match Voice::read_by_id(&body.voice_id).await { Ok(voice) => voice, Err(_) => return Ok(HttpResponse::NotFound().json(json!({ "error": "no voice found" }))), }; @@ -34,5 +36,16 @@ pub async fn create_output(req: HttpRequest, body: web::Json) -> ..Default::default() }; let output = output.save().await?; + let config = Config::new()?; + let sqs = FifoQueue::new(config.create_output_queue_url).await; + // push to FIFO + sqs.send_fifo_message::(FifoMessage { + body: CreateOutputFifoMessage { + output_id: output.id.to_string(), + }, + group: output.voice.to_string(), + deduplication_id: output.id.to_string(), + }) + .await?; Ok(HttpResponse::Created().json(output)) } diff --git a/src/lib.rs b/src/lib.rs index bf729aa..56d5d55 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -24,7 +24,7 @@ pub mod env { pub log_level: Level, pub eleven_labs_api_key: String, pub authentication_token: String, - pub clone_voice_queue_url: String, + pub create_output_queue_url: String, pub samples_bucket_name: String, pub outputs_bucket_name: String, } @@ -50,7 +50,7 @@ pub mod env { }, eleven_labs_api_key: std::env::var("ELEVEN_LABS_API_KEY")?, authentication_token: std::env::var("AUTHENTICATION_TOKEN")?, - clone_voice_queue_url: std::env::var("CLONE_VOICE_QUEUE_URL")?, + create_output_queue_url: std::env::var("CREATE_OUTPUT_QUEUE_URL")?, samples_bucket_name: std::env::var("SAMPLES_BUCKET_NAME")?, outputs_bucket_name: std::env::var("OUTPUTS_BUCKET_NAME")?, }) @@ -165,7 +165,7 @@ pub mod types { use serde::{Deserialize, Serialize}; #[derive(Deserialize, Serialize)] - pub struct CloneVoiceFifoMessage { - pub voice_id: String, + pub struct CreateOutputFifoMessage { + pub output_id: String, } } diff --git a/sst.config.ts b/sst.config.ts index ad15fa8..b307650 100644 --- a/sst.config.ts +++ b/sst.config.ts @@ -2,8 +2,8 @@ import { type SSTConfig } from 'sst' import { Bucket, Function, Queue, type StackContext } from 'sst/constructs' function ApiStack({ stack }: StackContext) { - const cloneVoiceQueue = new Queue(stack, 'clone-voice-fifo', { - consumer: 'src/bin/handlers/queues/clone-voice.rs', + const createOutputQueue = new Queue(stack, 'create-output-fifo', { + consumer: 'src/bin/handlers/queues/create-output.rs', cdk: { queue: { fifo: true } } }) const api = new Function(stack, 'api', { @@ -32,7 +32,7 @@ function ApiStack({ stack }: StackContext) { const functions = stack.getAllFunctions() functions.forEach((fn) => { - fn.addEnvironment('CLONE_VOICE_QUEUE_URL', cloneVoiceQueue.cdk.queue.queueUrl) + fn.addEnvironment('CREATE_OUTPUT_QUEUE_URL', createOutputQueue.cdk.queue.queueUrl) fn.addEnvironment('SAMPLES_BUCKET_NAME', sampleBucket.bucketName) fn.addEnvironment('OUTPUTS_BUCKET_NAME', outputBucket.bucketName) fn.attachPermissions(['s3', 'sqs'])