diff --git a/libs/app-error/src/lib.rs b/libs/app-error/src/lib.rs index 4df946f1d..0ba39eefc 100644 --- a/libs/app-error/src/lib.rs +++ b/libs/app-error/src/lib.rs @@ -3,6 +3,7 @@ pub mod gotrue; #[cfg(feature = "gotrue_error")] use crate::gotrue::GoTrueError; +use std::error::Error; use std::string::FromUtf8Error; #[cfg(feature = "appflowy_ai_error")] @@ -277,13 +278,32 @@ impl From for AppError { return AppError::RequestTimeout(error.to_string()); } - if error.is_request() { - return if error.status() == Some(StatusCode::PAYLOAD_TOO_LARGE) { - AppError::PayloadTooLarge(error.to_string()) - } else { - AppError::InvalidRequest(error.to_string()) - }; + if let Some(cause) = error.source() { + if cause + .to_string() + .contains("connection closed before message completed") + { + return AppError::ServiceTemporaryUnavailable(error.to_string()); + } } + + // Handle request-related errors + if let Some(status_code) = error.status() { + if error.is_request() { + match status_code { + StatusCode::PAYLOAD_TOO_LARGE => { + return AppError::PayloadTooLarge(error.to_string()); + }, + status_code if status_code.is_server_error() => { + return AppError::ServiceTemporaryUnavailable(error.to_string()); + }, + _ => { + return AppError::InvalidRequest(error.to_string()); + }, + } + } + } + AppError::Unhandled(error.to_string()) } } @@ -447,6 +467,7 @@ impl From for AppError { AIError::PayloadTooLarge(err) => AppError::PayloadTooLarge(err), AIError::InvalidRequest(err) => AppError::InvalidRequest(err), AIError::SerdeError(err) => AppError::SerdeError(err), + AIError::ServiceUnavailable(err) => AppError::AIServiceUnavailable(err), } } } diff --git a/libs/appflowy-ai-client/src/client.rs b/libs/appflowy-ai-client/src/client.rs index 49457173e..8b9af236d 100644 --- a/libs/appflowy-ai-client/src/client.rs +++ b/libs/appflowy-ai-client/src/client.rs @@ -373,6 +373,11 @@ where resp: reqwest::Response, ) -> Result>, AIError> { let status_code = resp.status(); + if status_code.is_server_error() { + let body = resp.text().await?; + return Err(AIError::ServiceUnavailable(body)); + } + if !status_code.is_success() { let body = resp.text().await?; return Err(AIError::InvalidRequest(body)); @@ -385,16 +390,29 @@ where } impl From for AIError { fn from(error: reqwest::Error) -> Self { + if error.is_connect() { + return AIError::ServiceUnavailable(error.to_string()); + } + if error.is_timeout() { return AIError::RequestTimeout(error.to_string()); } - if error.is_request() { - return if error.status() == Some(StatusCode::PAYLOAD_TOO_LARGE) { - AIError::PayloadTooLarge(error.to_string()) - } else { - AIError::InvalidRequest(format!("{:?}", error)) - }; + // Handle request-related errors + if let Some(status_code) = error.status() { + if error.is_request() { + match status_code { + StatusCode::PAYLOAD_TOO_LARGE => { + return AIError::PayloadTooLarge(error.to_string()); + }, + status_code if status_code.is_server_error() => { + return AIError::ServiceUnavailable(error.to_string()); + }, + _ => { + return AIError::InvalidRequest(format!("{:?}", error)); + }, + } + } } AIError::Internal(error.into()) } diff --git a/libs/appflowy-ai-client/src/error.rs b/libs/appflowy-ai-client/src/error.rs index d82520c62..c2f80a856 100644 --- a/libs/appflowy-ai-client/src/error.rs +++ b/libs/appflowy-ai-client/src/error.rs @@ -14,4 +14,7 @@ pub enum AIError { #[error(transparent)] SerdeError(#[from] serde_json::Error), + + #[error("Service unavailable:{0}")] + ServiceUnavailable(String), } diff --git a/libs/client-api/src/http_chat.rs b/libs/client-api/src/http_chat.rs index dc5401217..9c09aa2c4 100644 --- a/libs/client-api/src/http_chat.rs +++ b/libs/client-api/src/http_chat.rs @@ -1,6 +1,7 @@ use crate::http::log_request_id; use crate::Client; +use app_error::AppError; use client_api_entity::chat_dto::{ ChatMessage, CreateAnswerMessageParams, CreateChatMessageParams, CreateChatParams, MessageCursor, RepeatedChatMessage, UpdateChatMessageContentParams, @@ -154,7 +155,17 @@ impl Client { .await? .timeout(Duration::from_secs(30)) .send() - .await?; + .await + .map_err(|err| { + let app_err = AppError::from(err); + if matches!(app_err, AppError::ServiceTemporaryUnavailable(_)) { + AppError::AIServiceUnavailable( + "AI service temporarily unavailable, please try again later".to_string(), + ) + } else { + app_err + } + })?; log_request_id(&resp); let stream = AppResponse::::json_response_stream(resp).await?; Ok(QuestionStream::new(stream)) diff --git a/libs/shared-entity/src/response_stream.rs b/libs/shared-entity/src/response_stream.rs index 034c4160b..aea8f6fa4 100644 --- a/libs/shared-entity/src/response_stream.rs +++ b/libs/shared-entity/src/response_stream.rs @@ -1,5 +1,5 @@ use crate::response::{AppResponse, AppResponseError}; -use app_error::ErrorCode; +use app_error::{AppError, ErrorCode}; use bytes::{Buf, Bytes, BytesMut}; use futures::{ready, Stream, TryStreamExt}; @@ -22,6 +22,11 @@ where resp: reqwest::Response, ) -> Result>, AppResponseError> { let status_code = resp.status(); + if status_code.is_server_error() { + let body = resp.text().await?; + return Err(AppError::AIServiceUnavailable(body).into()); + } + if !status_code.is_success() { let body = resp.text().await?; return Err(AppResponseError::new(ErrorCode::Internal, body)); diff --git a/src/api/chat.rs b/src/api/chat.rs index 67dc1fabc..62e465dba 100644 --- a/src/api/chat.rs +++ b/src/api/chat.rs @@ -316,7 +316,7 @@ async fn answer_stream_v2_handler( ) }, Err(err) => Ok( - HttpResponse::Ok() + HttpResponse::ServiceUnavailable() .content_type("text/event-stream") .streaming(stream::once(async move { Err(AppError::AIServiceUnavailable(err.to_string())) diff --git a/src/biz/chat/ops.rs b/src/biz/chat/ops.rs index ff6e2541a..d542e42f2 100644 --- a/src/biz/chat/ops.rs +++ b/src/biz/chat/ops.rs @@ -97,7 +97,8 @@ pub async fn generate_chat_message_answer( &ai_model, Some(metadata), ) - .await?; + .await + .map_err(|err| AppError::AIServiceUnavailable(err.to_string()))?; info!("new_answer: {:?}", new_answer); // Save the answer to the database