From b56b26a5a62ef672801a4f26225d4cd3dce1a149 Mon Sep 17 00:00:00 2001
From: scald <1215913+scald@users.noreply.github.com>
Date: Fri, 29 Nov 2024 17:02:16 +0200
Subject: [PATCH] reduce temperature, fix OnceLock bug

---
 src/main.rs             | 128 ++++++++++-------------
 src/processor.rs        | 223 ++++++++++++++++++++--------------------
 src/providers/openai.rs |  53 +++++-----
 3 files changed, 193 insertions(+), 211 deletions(-)

diff --git a/src/main.rs b/src/main.rs
index 1508100..39bf70c 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,6 +1,6 @@
 use anyhow::Result;
 use axum::{
-    extract::{Multipart, Query, State},
+    extract::{ Multipart, Query, State },
     http::StatusCode,
     response::Json,
     routing::post,
@@ -8,16 +8,12 @@ use axum::{
 };
 use bytes::Bytes;
 use eyeris::providers::TokenUsage;
-use eyeris::{processor::ImageProcessor, prompts::PromptFormat, providers::AIProvider};
-use serde::{Deserialize, Serialize};
+use eyeris::{ processor::ImageProcessor, prompts::PromptFormat, providers::AIProvider };
+use serde::{ Deserialize, Serialize };
 use std::sync::Arc;
-use std::sync::OnceLock;
 use std::time::Instant;
 use tokio::sync::Semaphore;
 
-// Create a static processor pool
-static PROCESSOR_POOL: OnceLock<Arc<ImageProcessor>> = OnceLock::new();
-
 #[derive(Clone)]
 struct AppState {
     semaphore: Arc<Semaphore>, // Rate limiting
@@ -47,11 +43,11 @@ struct ProcessOptions {
 }
 
 fn default_provider() -> String {
-    "ollama".to_string()
+    "openai".to_string()
 }
 
 fn default_model() -> String {
-    "moondream".to_string()
+    "gpt-4o".to_string()
 }
 
 #[tokio::main]
@@ -65,9 +61,7 @@ async fn main() -> Result<()> {
     };
 
     // Build router
-    let app = Router::new()
-        .route("/process", post(process_image))
-        .with_state(state);
+    let app = Router::new().route("/process", post(process_image)).with_state(state);
 
     // Start server
     let addr = "0.0.0.0:3000";
@@ -82,60 +76,49 @@ async fn main() -> Result<()> {
 async fn process_image(
     State(state): State<AppState>,
     Query(options): Query<ProcessOptions>,
-    mut multipart: Multipart,
+    mut multipart: Multipart
 ) -> Result<Json<ProcessResponse>, StatusCode> {
     let start = Instant::now();
 
-    // Get or initialize the processor pool
-    let processor = PROCESSOR_POOL.get_or_init(|| {
-        let init_start = Instant::now();
-        let processor = Arc::new(ImageProcessor::new(
-            match options.provider.to_lowercase().as_str() {
-                "openai" => AIProvider::OpenAI,
-                "ollama" => AIProvider::Ollama,
-                _ => AIProvider::OpenAI,
-            },
-            Some(options.model),
-            Some(options.format),
-        ));
-        tracing::info!(
-            duration_ms = init_start.elapsed().as_millis(),
-            "Processor pool initialized"
-        );
-        processor
-    });
+    // Create a new processor for each request instead of using the pool
+    let init_start = Instant::now();
+    let processor = ImageProcessor::new(
+        match options.provider.to_lowercase().as_str() {
+            "openai" => AIProvider::OpenAI,
+            "ollama" => AIProvider::Ollama,
+            _ => AIProvider::OpenAI,
+        },
+        Some(options.model),
+        Some(options.format)
+    );
+    tracing::info!(duration_ms = init_start.elapsed().as_millis(), "Processor initialized");
 
     let permit_start = Instant::now();
-    let _permit = state
-        .semaphore
-        .acquire()
-        .await
-        .map_err(|_| StatusCode::SERVICE_UNAVAILABLE)?;
-    tracing::info!(
-        duration_ms = permit_start.elapsed().as_millis(),
-        "Rate limit permit acquired"
-    );
+    let _permit = state.semaphore.acquire().await.map_err(|_| StatusCode::SERVICE_UNAVAILABLE)?;
+    tracing::info!(duration_ms = permit_start.elapsed().as_millis(), "Rate limit permit acquired");
 
     let multipart_start = Instant::now();
-    let image_data: Bytes = if let Some(field) = multipart
-        .next_field()
-        .await
-        .map_err(|_| StatusCode::BAD_REQUEST)?
+    let image_data: Bytes = if
+        let Some(field) = multipart.next_field().await.map_err(|_| StatusCode::BAD_REQUEST)?
     {
         if field.name().unwrap_or("") != "image" {
-            return Ok(Json(ProcessResponse {
-                success: false,
-                message: "No image field found".to_string(),
-                data: None,
-            }));
+            return Ok(
+                Json(ProcessResponse {
+                    success: false,
+                    message: "No image field found".to_string(),
+                    data: None,
+                })
+            );
         }
         field.bytes().await.map_err(|_| StatusCode::BAD_REQUEST)?
     } else {
-        return Ok(Json(ProcessResponse {
-            success: false,
-            message: "No image provided".to_string(),
-            data: None,
-        }));
+        return Ok(
+            Json(ProcessResponse {
+                success: false,
+                message: "No image provided".to_string(),
+                data: None,
+            })
+        );
    };
     tracing::info!(
         duration_ms = multipart_start.elapsed().as_millis(),
@@ -145,10 +128,7 @@ async fn process_image(
 
     let process_start = Instant::now();
     let result = processor.process(&image_data).await;
-    tracing::info!(
-        duration_ms = process_start.elapsed().as_millis(),
-        "Image processing completed"
-    );
+    tracing::info!(duration_ms = process_start.elapsed().as_millis(), "Image processing completed");
 
     tracing::info!(
         total_duration_ms = start.elapsed().as_millis(),
@@ -156,18 +136,24 @@ async fn process_image(
     );
 
     match result {
-        Ok((analysis, token_usage)) => Ok(Json(ProcessResponse {
-            success: true,
-            message: "Image processed successfully".to_string(),
-            data: Some(ProcessedData {
-                analysis,
-                token_usage,
-            }),
-        })),
-        Err(e) => Ok(Json(ProcessResponse {
-            success: false,
-            message: format!("Processing failed: {}", e),
-            data: None,
-        })),
+        Ok((analysis, token_usage)) =>
+            Ok(
+                Json(ProcessResponse {
+                    success: true,
+                    message: "Image processed successfully".to_string(),
+                    data: Some(ProcessedData {
+                        analysis,
+                        token_usage,
+                    }),
+                })
+            ),
+        Err(e) =>
+            Ok(
+                Json(ProcessResponse {
+                    success: false,
+                    message: format!("Processing failed: {}", e),
+                    data: None,
+                })
+            ),
     }
 }
diff --git a/src/processor.rs b/src/processor.rs
index 91e434d..d3d625a 100644
--- a/src/processor.rs
+++ b/src/processor.rs
@@ -1,11 +1,11 @@
 use crate::errors::ProcessorError;
-use crate::prompts::{ImagePrompt, PromptFormat};
+use crate::prompts::{ ImagePrompt, PromptFormat };
 use crate::providers::TokenUsage;
-use crate::providers::{AIProvider, OllamaProvider, OpenAIProvider, Provider};
+use crate::providers::{ AIProvider, OllamaProvider, OpenAIProvider, Provider };
 use base64::Engine;
 use image::codecs::jpeg::JpegEncoder;
 use image::imageops::FilterType;
-use image::{DynamicImage, ImageBuffer};
+use image::{ DynamicImage, ImageBuffer };
 use rayon::prelude::*;
 use std::time::Instant;
@@ -90,7 +90,7 @@ impl ImageProcessor {
 
     async fn analyze_image(
         &self,
-        base64_image: &str,
+        base64_image: &str
     ) -> Result<(String, TokenUsage), ProcessorError> {
         let start = Instant::now();
 
@@ -99,65 +99,60 @@ impl ImageProcessor {
         // Clone the string before moving into spawn_blocking
         let base64_image_owned = base64_image.to_string();
-        let optimized_image = tokio::task::spawn_blocking(move || {
-            // Use base64_image_owned instead of base64_image
-            let image_data = base64::engine::general_purpose::STANDARD
-                .decode(&base64_image_owned)
-                .map_err(|e| ProcessorError::Base64Error(e.to_string()))?;
-
-            // Load image
-            let img = image::load_from_memory(&image_data)?;
-
-            // Resize to smaller dimensions while maintaining aspect ratio
-            // Most vision models don't need images larger than 768px
-            let max_dim = 768;
-            let resized = if img.width() > max_dim || img.height() > max_dim {
-                let ratio = max_dim as f32 / img.width().max(img.height()) as f32;
-                let new_width = (img.width() as f32 * ratio) as u32;
-                let new_height = (img.height() as f32 * ratio) as u32;
-                img.resize(new_width, new_height, FilterType::Lanczos3)
-            } else {
-                img
-            };
-
-            // Convert to RGB and compress as JPEG with moderate quality
-            let mut buffer = Vec::new();
-            let mut encoder = JpegEncoder::new_with_quality(&mut buffer, 10);
-            encoder
-                .encode(
-                    resized.to_rgb8().as_raw(),
-                    resized.width(),
-                    resized.height(),
-                    image::ColorType::Rgb8,
-                )
-                .map_err(ProcessorError::ImageLoadError)?;
-
-            // Convert back to base64
-            Ok::<_, ProcessorError>(base64::engine::general_purpose::STANDARD.encode(&buffer))
-        })
-        .await
-        .map_err(|e| ProcessorError::ThumbnailError(e.to_string()))??;
+        let optimized_image = tokio::task
+            ::spawn_blocking(move || {
+                // Use base64_image_owned instead of base64_image
+                let image_data = base64::engine::general_purpose::STANDARD
+                    .decode(&base64_image_owned)
+                    .map_err(|e| ProcessorError::Base64Error(e.to_string()))?;
+
+                // Load image
+                let img = image::load_from_memory(&image_data)?;
+
+                // Resize to smaller dimensions while maintaining aspect ratio
+                // Most vision models don't need images larger than 768px
+                let max_dim = 768;
+                let resized = if img.width() > max_dim || img.height() > max_dim {
+                    let ratio = (max_dim as f32) / (img.width().max(img.height()) as f32);
+                    let new_width = ((img.width() as f32) * ratio) as u32;
+                    let new_height = ((img.height() as f32) * ratio) as u32;
+                    img.resize(new_width, new_height, FilterType::Lanczos3)
+                } else {
+                    img
+                };
+
+                // Convert to RGB and compress as JPEG with moderate quality
+                let mut buffer = Vec::new();
+                let mut encoder = JpegEncoder::new_with_quality(&mut buffer, 10);
+                encoder
+                    .encode(
+                        resized.to_rgb8().as_raw(),
+                        resized.width(),
+                        resized.height(),
+                        image::ColorType::Rgb8
+                    )
+                    .map_err(ProcessorError::ImageLoadError)?;
+
+                // Convert back to base64
+                Ok::<_, ProcessorError>(base64::engine::general_purpose::STANDARD.encode(&buffer))
+            }).await
+            .map_err(|e| ProcessorError::ThumbnailError(e.to_string()))??;
 
         tracing::info!(
             original_size = original_size,
             optimized_size = optimized_image.len(),
-            reduction_percent =
-                ((original_size - optimized_image.len()) as f32 / original_size as f32 * 100.0),
+            reduction_percent = (((original_size - optimized_image.len()) as f32) /
+                (original_size as f32)) *
+                100.0,
             "Image optimization completed"
         );
 
         // Create the prompt before calling the provider
         let prompt = ImagePrompt::new(self.format.clone());
 
-        let (analysis, token_usage) = self
-            .provider
-            .analyze(&optimized_image, &prompt.text)
-            .await?;
+        let (analysis, token_usage) = self.provider.analyze(&optimized_image, &prompt.text).await?;
 
-        tracing::info!(
-            total_duration_ms = start.elapsed().as_millis(),
-            "Total analysis completed"
-        );
+        tracing::info!(total_duration_ms = start.elapsed().as_millis(), "Total analysis completed");
 
         Ok((analysis, token_usage.unwrap_or_default()))
     }
@@ -165,68 +160,68 @@ impl ImageProcessor {
 
     async fn create_thumbnail(&self, image: DynamicImage) -> Result<Vec<u8>, ProcessorError> {
         let start = Instant::now();
 
-        let result = tokio::task::spawn_blocking(move || {
-            let resize_start = Instant::now();
-            let thumbnail = image.resize(300, 300, FilterType::Triangle);
-            let rgb_image = thumbnail.to_rgb8();
-            tracing::info!(
-                duration_ms = resize_start.elapsed().as_millis(),
-                "Image resize completed"
-            );
-
-            let enhance_start = Instant::now();
-            let width = rgb_image.width() as usize;
-            let height = rgb_image.height() as usize;
-
-            let enhanced_pixels: Vec<_> = rgb_image
-                .chunks_exact(width * 3)
-                .enumerate()
-                .par_bridge()
-                .flat_map(|(y, row)| {
-                    (0..width)
-                        .map(move |x| {
-                            let pixel = image::Rgb([
-                                (row[x * 3] as f32 * 1.1).min(255.0) as u8,
-                                (row[x * 3 + 1] as f32 * 1.1).min(255.0) as u8,
-                                (row[x * 3 + 2] as f32 * 1.1).min(255.0) as u8,
-                            ]);
-                            (x as u32, y as u32, pixel)
-                        })
-                        .collect::<Vec<_>>()
-                })
-                .collect();
-
-            tracing::info!(
-                duration_ms = enhance_start.elapsed().as_millis(),
-                "Parallel enhancement completed"
-            );
-
-            let buffer_start = Instant::now();
-            let mut enhanced = ImageBuffer::new(width as u32, height as u32);
-            for (x, y, pixel) in enhanced_pixels {
-                enhanced.put_pixel(x, y, pixel);
-            }
-
-            let mut output = Vec::with_capacity(width * height * 3);
-            let mut encoder = JpegEncoder::new_with_quality(&mut output, 85);
-            encoder
-                .encode(
-                    enhanced.as_raw(),
-                    enhanced.width(),
-                    enhanced.height(),
-                    image::ColorType::Rgb8,
-                )
-                .map_err(|e| ProcessorError::ThumbnailError(e.to_string()))?;
-
-            tracing::info!(
-                duration_ms = buffer_start.elapsed().as_millis(),
-                "Buffer creation and JPEG encoding completed"
-            );
-
-            Ok(output)
-        })
-        .await
-        .map_err(|e| ProcessorError::ThumbnailError(e.to_string()))?;
+        let result = tokio::task
+            ::spawn_blocking(move || {
+                let resize_start = Instant::now();
+                let thumbnail = image.resize(300, 300, FilterType::Triangle);
+                let rgb_image = thumbnail.to_rgb8();
+                tracing::info!(
+                    duration_ms = resize_start.elapsed().as_millis(),
+                    "Image resize completed"
+                );
+
+                let enhance_start = Instant::now();
+                let width = rgb_image.width() as usize;
+                let height = rgb_image.height() as usize;
+
+                let enhanced_pixels: Vec<_> = rgb_image
+                    .chunks_exact(width * 3)
+                    .enumerate()
+                    .par_bridge()
+                    .flat_map(|(y, row)| {
+                        (0..width)
+                            .map(move |x| {
+                                let pixel = image::Rgb([
+                                    ((row[x * 3] as f32) * 1.1).min(255.0) as u8,
+                                    ((row[x * 3 + 1] as f32) * 1.1).min(255.0) as u8,
+                                    ((row[x * 3 + 2] as f32) * 1.1).min(255.0) as u8,
+                                ]);
+                                (x as u32, y as u32, pixel)
+                            })
+                            .collect::<Vec<_>>()
+                    })
+                    .collect();
+
+                tracing::info!(
+                    duration_ms = enhance_start.elapsed().as_millis(),
+                    "Parallel enhancement completed"
+                );
+
+                let buffer_start = Instant::now();
+                let mut enhanced = ImageBuffer::new(width as u32, height as u32);
+                for (x, y, pixel) in enhanced_pixels {
+                    enhanced.put_pixel(x, y, pixel);
+                }
+
+                let mut output = Vec::with_capacity(width * height * 3);
+                let mut encoder = JpegEncoder::new_with_quality(&mut output, 85);
+                encoder
+                    .encode(
+                        enhanced.as_raw(),
+                        enhanced.width(),
+                        enhanced.height(),
+                        image::ColorType::Rgb8
+                    )
+                    .map_err(|e| ProcessorError::ThumbnailError(e.to_string()))?;
+
+                tracing::info!(
+                    duration_ms = buffer_start.elapsed().as_millis(),
+                    "Buffer creation and JPEG encoding completed"
+                );
+
+                Ok(output)
+            }).await
+            .map_err(|e| ProcessorError::ThumbnailError(e.to_string()))?;
 
     tracing::info!(
         total_duration_ms = start.elapsed().as_millis(),
diff --git a/src/providers/openai.rs b/src/providers/openai.rs
index be974f1..1350c7d 100644
--- a/src/providers/openai.rs
+++ b/src/providers/openai.rs
@@ -1,4 +1,4 @@
-use super::{Provider, TokenUsage};
+use super::{ Provider, TokenUsage };
 use crate::errors::ProcessorError;
 use async_trait::async_trait;
 use reqwest::Client;
@@ -39,7 +39,7 @@ impl OpenAIProvider {
         Self {
             client: Client::new(),
             model: model.unwrap_or_else(|| "gpt-4-vision-preview".to_string()),
-            temperature: 0.8,
+            temperature: 0.0,
         }
     }
 }
 
@@ -49,11 +49,12 @@ impl Provider for OpenAIProvider {
     async fn analyze(
         &self,
         base64_image: &str,
-        prompt: &str,
+        prompt: &str
     ) -> Result<(String, Option<TokenUsage>), ProcessorError> {
         let api_key = std::env::var("OPENAI_API_KEY").map_err(ProcessorError::EnvError)?;
 
-        let request_body = json!({
+        let request_body =
+            json!({
             "model": self.model,
             "temperature": self.temperature,
             "messages": [{
@@ -71,46 +72,46 @@ impl Provider for OpenAIProvider {
                 }
             ]
         }],
-        "max_tokens": 10000
+        "max_completion_tokens": 10000
     });
 
-        let response = self
-            .client
+        let response = self.client
             .post("https://api.openai.com/v1/chat/completions")
             .header("Authorization", format!("Bearer {}", api_key))
             .json(&request_body)
-            .send()
-            .await?;
+            .send().await?;
 
         let status = response.status();
         if !status.is_success() {
             let error_text = response
-                .text()
-                .await
+                .text().await
                 .unwrap_or_else(|_| "Failed to get error message".to_string());
-            return Err(ProcessorError::AIProviderError(format!(
-                "OpenAI API request failed with status {}: {}",
-                status, error_text
-            )));
+            return Err(
+                ProcessorError::AIProviderError(
+                    format!("OpenAI API request failed with status {}: {}", status, error_text)
+                )
+            );
         }
 
         let response_text = response.text().await?;
-        let response: OpenAIResponse = serde_json::from_str(&response_text).map_err(|e| {
-            ProcessorError::ResponseParseError(format!(
-                "Failed to parse OpenAI response: {}. Response text: {}",
-                e, response_text
-            ))
-        })?;
+        let response: OpenAIResponse = serde_json
+            ::from_str(&response_text)
+            .map_err(|e| {
+                ProcessorError::ResponseParseError(
+                    format!(
+                        "Failed to parse OpenAI response: {}. Response text: {}",
+                        e,
+                        response_text
+                    )
+                )
+            })?;
 
-        let analysis = response
-            .choices
+        let analysis = response.choices
             .first()
             .ok_or_else(|| {
                 ProcessorError::ResponseParseError("No choices in response".to_string())
             })?
-            .message
-            .content
-            .clone();
+            .message.content.clone();
 
         let token_usage = response.usage.map(|usage| TokenUsage {
             prompt_tokens: usage.prompt_tokens,
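
Note on the removed OnceLock pool (an editorial sketch, not part of the patch above): OnceLock::get_or_init runs its closure only for the first caller, so the old PROCESSOR_POOL captured the provider, model, and format from whichever request arrived first and then reused that processor for every later request, silently ignoring their query options. A minimal, self-contained illustration of the pitfall; the handle function below is hypothetical, not from this codebase:

    use std::sync::OnceLock;

    static CONFIG: OnceLock<String> = OnceLock::new();

    // Stands in for the old process_image handler: the closure passed to
    // get_or_init runs only on the first call, so the argument of every
    // later call is silently ignored.
    fn handle(provider: &str) -> &str {
        CONFIG.get_or_init(|| provider.to_string())
    }

    fn main() {
        assert_eq!(handle("ollama"), "ollama"); // first request initializes the value
        assert_eq!(handle("openai"), "ollama"); // "openai" is ignored here
    }

Constructing the processor per request, as the patch does, trades a small per-request allocation for configuration that actually follows each request's options.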