Skip to content

Commit

Permalink
Handle Output Create Queue
Browse files Browse the repository at this point in the history
  • Loading branch information
judegiordano committed Nov 22, 2023
1 parent 8d94277 commit a5203a8
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 37 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ name = "migrate-indexes"
path = "src/bin/scripts/migrate-indexes.rs"

[[bin]]
name = "clone-voice"
path = "src/bin/handlers/queues/clone-voice.rs"
name = "create-output"
path = "src/bin/handlers/queues/create-output.rs"

[[bin]]
name = "sample-uploaded"
Expand Down
25 changes: 0 additions & 25 deletions src/bin/handlers/queues/clone-voice.rs

This file was deleted.

49 changes: 49 additions & 0 deletions src/bin/handlers/queues/create-output.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use anyhow::Result;
use mongoose::{bson::doc, Model};
use parrot_api::{
aws::{s3::Client, sqs::FifoQueue},
eleven_labs::ElevenLabs,
env::Config,
logger,
models::output::Output,
models::{output::OutputStatus, voice::Voice},
types::CreateOutputFifoMessage,
};

#[allow(unused_variables)]
#[tokio::main]
pub async fn main() -> Result<()> {
logger::init()?;
let config = Config::new()?;
let voice_api = ElevenLabs::new()?;
let sqs = FifoQueue::new(config.create_output_queue_url).await;
let outputs_bucket = Client::new(&config.outputs_bucket_name).await;
let messages = sqs
.receive_fifo_message::<CreateOutputFifoMessage>()
.await?;
for message in messages {
let output = Output::read_by_id(&message.output_id).await?;
let voice = Voice::read_by_id(&output.voice).await?;
if voice.eleven_labs_id.is_none() {
anyhow::bail!("no eleven labs id supplied");
};
let bytes = voice_api
.text_to_speech(&voice.eleven_labs_id.unwrap(), &output.text)
.await?;
let updated = Output::update(
doc! {
"_id": output.id
},
doc! {
"status": OutputStatus::Done.to_string()
},
)
.await?;
outputs_bucket
.put_object(&updated.id, bytes.to_vec())
.await?;
// TODO: send server side event of process complete
tracing::info!("OUTPUT: {:?}", updated);
}
Ok(())
}
19 changes: 16 additions & 3 deletions src/controllers/outputs/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,26 @@ use serde::{Deserialize, Serialize};
use serde_json::json;

use crate::{
aws::sqs::{FifoMessage, FifoQueue},
env::Config,
errors::ApiResponse,
helpers::authenticate,
models::{
output::Output,
voice::{Voice, VoiceStatus},
},
types::CreateOutputFifoMessage,
};

#[derive(Deserialize, Serialize)]
pub struct OutputPayload {
voice_name: String,
voice_id: String,
text: String,
}

pub async fn create_output(req: HttpRequest, body: web::Json<OutputPayload>) -> ApiResponse {
authenticate(req).await?;
let name = slug::slugify(body.voice_name.to_string());
let voice = match Voice::read(doc! { "name": name }).await {
let voice = match Voice::read_by_id(&body.voice_id).await {
Ok(voice) => voice,
Err(_) => return Ok(HttpResponse::NotFound().json(json!({ "error": "no voice found" }))),
};
Expand All @@ -34,5 +36,16 @@ pub async fn create_output(req: HttpRequest, body: web::Json<OutputPayload>) ->
..Default::default()
};
let output = output.save().await?;
let config = Config::new()?;
let sqs = FifoQueue::new(config.create_output_queue_url).await;
// push to FIFO
sqs.send_fifo_message::<CreateOutputFifoMessage>(FifoMessage {
body: CreateOutputFifoMessage {
output_id: output.id.to_string(),
},
group: output.voice.to_string(),
deduplication_id: output.id.to_string(),
})
.await?;
Ok(HttpResponse::Created().json(output))
}
8 changes: 4 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub mod env {
pub log_level: Level,
pub eleven_labs_api_key: String,
pub authentication_token: String,
pub clone_voice_queue_url: String,
pub create_output_queue_url: String,
pub samples_bucket_name: String,
pub outputs_bucket_name: String,
}
Expand All @@ -50,7 +50,7 @@ pub mod env {
},
eleven_labs_api_key: std::env::var("ELEVEN_LABS_API_KEY")?,
authentication_token: std::env::var("AUTHENTICATION_TOKEN")?,
clone_voice_queue_url: std::env::var("CLONE_VOICE_QUEUE_URL")?,
create_output_queue_url: std::env::var("CREATE_OUTPUT_QUEUE_URL")?,
samples_bucket_name: std::env::var("SAMPLES_BUCKET_NAME")?,
outputs_bucket_name: std::env::var("OUTPUTS_BUCKET_NAME")?,
})
Expand Down Expand Up @@ -165,7 +165,7 @@ pub mod types {
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize)]
pub struct CloneVoiceFifoMessage {
pub voice_id: String,
pub struct CreateOutputFifoMessage {
pub output_id: String,
}
}
6 changes: 3 additions & 3 deletions sst.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import { type SSTConfig } from 'sst'
import { Bucket, Function, Queue, type StackContext } from 'sst/constructs'

function ApiStack({ stack }: StackContext) {
const cloneVoiceQueue = new Queue(stack, 'clone-voice-fifo', {
consumer: 'src/bin/handlers/queues/clone-voice.rs',
const createOutputQueue = new Queue(stack, 'create-output-fifo', {
consumer: 'src/bin/handlers/queues/create-output.rs',
cdk: { queue: { fifo: true } }
})
const api = new Function(stack, 'api', {
Expand Down Expand Up @@ -32,7 +32,7 @@ function ApiStack({ stack }: StackContext) {

const functions = stack.getAllFunctions()
functions.forEach((fn) => {
fn.addEnvironment('CLONE_VOICE_QUEUE_URL', cloneVoiceQueue.cdk.queue.queueUrl)
fn.addEnvironment('CREATE_OUTPUT_QUEUE_URL', createOutputQueue.cdk.queue.queueUrl)
fn.addEnvironment('SAMPLES_BUCKET_NAME', sampleBucket.bucketName)
fn.addEnvironment('OUTPUTS_BUCKET_NAME', outputBucket.bucketName)
fn.attachPermissions(['s3', 'sqs'])
Expand Down

0 comments on commit a5203a8

Please sign in to comment.