Skip to content

Commit

Permalink
reduce temperature, fix OnceLock bug
Browse files (browse the repository at this point in the history)
  • Loading branch information
scald committed Nov 29, 2024
1 parent e1ecb4d commit b56b26a
Show file tree
Hide file tree
Showing 3 changed files with 193 additions and 211 deletions.
128 changes: 57 additions & 71 deletions src/main.rs
Original file line number | Diff line number | Diff line change
@@ -1,23 +1,19 @@
use anyhow::Result;
use axum::{
extract::{Multipart, Query, State},
extract::{ Multipart, Query, State },
http::StatusCode,
response::Json,
routing::post,
Router,
};
use bytes::Bytes;
use eyeris::providers::TokenUsage;
use eyeris::{processor::ImageProcessor, prompts::PromptFormat, providers::AIProvider};
use serde::{Deserialize, Serialize};
use eyeris::{ processor::ImageProcessor, prompts::PromptFormat, providers::AIProvider };
use serde::{ Deserialize, Serialize };
use std::sync::Arc;
use std::sync::OnceLock;
use std::time::Instant;
use tokio::sync::Semaphore;

// Create a static processor pool
static PROCESSOR_POOL: OnceLock<Arc<ImageProcessor>> = OnceLock::new();

#[derive(Clone)]
struct AppState {
semaphore: Arc<Semaphore>, // Rate limiting
Expand Down Expand Up @@ -47,11 +43,11 @@ struct ProcessOptions {
}

// NOTE(review): this is a rendered diff without +/- markers, so the body shows two
// versions of the same line. Elsewhere on this page the removed line precedes its
// replacement, so presumably "ollama" is the pre-commit value and "openai" the
// post-commit value — confirm against the repository before relying on this.
/// Returns the provider name as an owned `String`.
/// Presumably the fallback for `ProcessOptions` (its usage is not visible in this view).
fn default_provider() -> String {
"ollama".to_string()
"openai".to_string()
}

// NOTE(review): this is a rendered diff without +/- markers, so the body shows two
// versions of the same line. Elsewhere on this page the removed line precedes its
// replacement, so presumably "moondream" is the pre-commit value and "gpt-4o" the
// post-commit value — confirm against the repository before relying on this.
/// Returns the model name as an owned `String`.
/// Presumably the fallback for `ProcessOptions` (its usage is not visible in this view).
fn default_model() -> String {
"moondream".to_string()
"gpt-4o".to_string()
}

#[tokio::main]
Expand All @@ -65,9 +61,7 @@ async fn main() -> Result<()> {
};

// Build router
let app = Router::new()
.route("/process", post(process_image))
.with_state(state);
let app = Router::new().route("/process", post(process_image)).with_state(state);

// Start server
let addr = "0.0.0.0:3000";
Expand All @@ -82,60 +76,49 @@ async fn main() -> Result<()> {
async fn process_image(
State(state): State<AppState>,
Query(options): Query<ProcessOptions>,
mut multipart: Multipart,
mut multipart: Multipart
) -> Result<Json<ProcessResponse>, StatusCode> {
let start = Instant::now();

// Get or initialize the processor pool
let processor = PROCESSOR_POOL.get_or_init(|| {
let init_start = Instant::now();
let processor = Arc::new(ImageProcessor::new(
match options.provider.to_lowercase().as_str() {
"openai" => AIProvider::OpenAI,
"ollama" => AIProvider::Ollama,
_ => AIProvider::OpenAI,
},
Some(options.model),
Some(options.format),
));
tracing::info!(
duration_ms = init_start.elapsed().as_millis(),
"Processor pool initialized"
);
processor
});
// Create a new processor for each request instead of using the pool
let init_start = Instant::now();
let processor = ImageProcessor::new(
match options.provider.to_lowercase().as_str() {
"openai" => AIProvider::OpenAI,
"ollama" => AIProvider::Ollama,
_ => AIProvider::OpenAI,
},
Some(options.model),
Some(options.format)
);
tracing::info!(duration_ms = init_start.elapsed().as_millis(), "Processor initialized");

let permit_start = Instant::now();
let _permit = state
.semaphore
.acquire()
.await
.map_err(|_| StatusCode::SERVICE_UNAVAILABLE)?;
tracing::info!(
duration_ms = permit_start.elapsed().as_millis(),
"Rate limit permit acquired"
);
let _permit = state.semaphore.acquire().await.map_err(|_| StatusCode::SERVICE_UNAVAILABLE)?;
tracing::info!(duration_ms = permit_start.elapsed().as_millis(), "Rate limit permit acquired");

let multipart_start = Instant::now();
let image_data: Bytes = if let Some(field) = multipart
.next_field()
.await
.map_err(|_| StatusCode::BAD_REQUEST)?
let image_data: Bytes = if
let Some(field) = multipart.next_field().await.map_err(|_| StatusCode::BAD_REQUEST)?
{
if field.name().unwrap_or("") != "image" {
return Ok(Json(ProcessResponse {
success: false,
message: "No image field found".to_string(),
data: None,
}));
return Ok(
Json(ProcessResponse {
success: false,
message: "No image field found".to_string(),
data: None,
})
);
}
field.bytes().await.map_err(|_| StatusCode::BAD_REQUEST)?
} else {
return Ok(Json(ProcessResponse {
success: false,
message: "No image provided".to_string(),
data: None,
}));
return Ok(
Json(ProcessResponse {
success: false,
message: "No image provided".to_string(),
data: None,
})
);
};
tracing::info!(
duration_ms = multipart_start.elapsed().as_millis(),
Expand All @@ -145,29 +128,32 @@ async fn process_image(

let process_start = Instant::now();
let result = processor.process(&image_data).await;
tracing::info!(
duration_ms = process_start.elapsed().as_millis(),
"Image processing completed"
);
tracing::info!(duration_ms = process_start.elapsed().as_millis(), "Image processing completed");

tracing::info!(
total_duration_ms = start.elapsed().as_millis(),
"Total request handling completed"
);

match result {
Ok((analysis, token_usage)) => Ok(Json(ProcessResponse {
success: true,
message: "Image processed successfully".to_string(),
data: Some(ProcessedData {
analysis,
token_usage,
}),
})),
Err(e) => Ok(Json(ProcessResponse {
success: false,
message: format!("Processing failed: {}", e),
data: None,
})),
Ok((analysis, token_usage)) =>
Ok(
Json(ProcessResponse {
success: true,
message: "Image processed successfully".to_string(),
data: Some(ProcessedData {
analysis,
token_usage,
}),
})
),
Err(e) =>
Ok(
Json(ProcessResponse {
success: false,
message: format!("Processing failed: {}", e),
data: None,
})
),
}
}
Loading

0 comments on commit b56b26a

Please sign in to comment.