diff --git a/golem-cli/src/oss/clients/worker.rs b/golem-cli/src/oss/clients/worker.rs index 1d4de4f2a1..1322de0ab0 100644 --- a/golem-cli/src/oss/clients/worker.rs +++ b/golem-cli/src/oss/clients/worker.rs @@ -15,13 +15,18 @@ use std::time::Duration; use crate::clients::worker::WorkerClient; +use crate::model::{ + GolemError, IdempotencyKey, WorkerMetadata, WorkerName, WorkerUpdateMode, + WorkersMetadataResponse, +}; use async_trait::async_trait; use futures_util::{future, pin_mut, SinkExt, StreamExt}; +use golem_client::api::WorkerError; use golem_client::model::{ InvokeParameters, InvokeResult, ScanCursor, UpdateWorkerRequest, WorkerCreationRequest, WorkerFilter, WorkerId, WorkersMetadataRequest, }; -use golem_client::Context; +use golem_client::{Context, Error}; use golem_common::uri::oss::urn::{ComponentUrn, WorkerUrn}; use native_tls::TlsConnector; use serde::Deserialize; @@ -31,11 +36,6 @@ use tokio_tungstenite::tungstenite::protocol::Message; use tokio_tungstenite::{connect_async_tls_with_config, Connector}; use tracing::{debug, info}; -use crate::model::{ - GolemError, IdempotencyKey, WorkerMetadata, WorkerName, WorkerUpdateMode, - WorkersMetadataResponse, -}; - #[derive(Clone)] pub struct WorkerClientLive { pub client: C, @@ -265,16 +265,11 @@ impl WorkerClient for WorkerCl .await .map_err(|e| match e { tungstenite::error::Error::Http(http_error_response) => { + let status = http_error_response.status().as_u16(); + match http_error_response.body().clone() { - Some(body) => GolemError(format!( - "Failed Websocket. Http error: {}, {}", - http_error_response.status(), - String::from_utf8_lossy(&body) - )), - None => GolemError(format!( - "Failed Websocket. Http error: {}", - http_error_response.status() - )), + Some(body) => get_worker_golem_error(status, body), + None => GolemError(format!("Failed Websocket. Http error: {}", status)), } } _ => GolemError(format!("Failed Websocket. Error: {}", e)), @@ -429,3 +424,16 @@ struct Log { pub context: String, pub message: String, } + +fn get_worker_golem_error(status: u16, body: Vec) -> GolemError { + let error: Result, serde_json::Error> = match status { + 400 => serde_json::from_slice(&body).map(|body| Error::Item(WorkerError::Error400(body))), + 401 => serde_json::from_slice(&body).map(|body| Error::Item(WorkerError::Error401(body))), + 403 => serde_json::from_slice(&body).map(|body| Error::Item(WorkerError::Error403(body))), + 404 => serde_json::from_slice(&body).map(|body| Error::Item(WorkerError::Error404(body))), + 409 => serde_json::from_slice(&body).map(|body| Error::Item(WorkerError::Error409(body))), + 500 => serde_json::from_slice(&body).map(|body| Error::Item(WorkerError::Error500(body))), + _ => Ok(Error::unexpected(status, body.into())), + }; + error.unwrap_or_else(Error::from).into() +} diff --git a/golem-service-base/src/model.rs b/golem-service-base/src/model.rs index 79cd39520b..b68cac7f19 100644 --- a/golem-service-base/src/model.rs +++ b/golem-service-base/src/model.rs @@ -1436,117 +1436,81 @@ impl TryFrom for } impl From for golem_api_grpc::proto::golem::worker::v1::WorkerExecutionError { + fn from(error: GolemError) -> Self { + golem_api_grpc::proto::golem::worker::v1::WorkerExecutionError { + error: Some(error.into()), + } + } +} + +impl From for golem_api_grpc::proto::golem::worker::v1::worker_execution_error::Error { fn from(error: GolemError) -> Self { match error { GolemError::InvalidRequest(err) => { - golem_api_grpc::proto::golem::worker::v1::WorkerExecutionError { - error: Some(golem_api_grpc::proto::golem::worker::v1::worker_execution_error::Error::InvalidRequest(err.into())), - } + golem_api_grpc::proto::golem::worker::v1::worker_execution_error::Error::InvalidRequest(err.into()) } GolemError::WorkerAlreadyExists(err) => { - golem_api_grpc::proto::golem::worker::v1::WorkerExecutionError { - error: Some(golem_api_grpc::proto::golem::worker::v1::worker_execution_error::Error::WorkerAlreadyExists(err.into())), - } + golem_api_grpc::proto::golem::worker::v1::worker_execution_error::Error::WorkerAlreadyExists(err.into()) } GolemError::WorkerNotFound(err) => { - golem_api_grpc::proto::golem::worker::v1::WorkerExecutionError { - error: Some(golem_api_grpc::proto::golem::worker::v1::worker_execution_error::Error::WorkerNotFound(err.into())), - } + golem_api_grpc::proto::golem::worker::v1::worker_execution_error::Error::WorkerNotFound(err.into()) } GolemError::WorkerCreationFailed(err) => { - golem_api_grpc::proto::golem::worker::v1::WorkerExecutionError { - error: Some(golem_api_grpc::proto::golem::worker::v1::worker_execution_error::Error::WorkerCreationFailed(err.into())), - } + golem_api_grpc::proto::golem::worker::v1::worker_execution_error::Error::WorkerCreationFailed(err.into()) } GolemError::FailedToResumeWorker(err) => { - golem_api_grpc::proto::golem::worker::v1::WorkerExecutionError { - error: Some(golem_api_grpc::proto::golem::worker::v1::worker_execution_error::Error::FailedToResumeWorker(Box::new(err.into()))), - } + golem_api_grpc::proto::golem::worker::v1::worker_execution_error::Error::FailedToResumeWorker(Box::new(err.into())) } GolemError::ComponentDownloadFailed(err) => { - golem_api_grpc::proto::golem::worker::v1::WorkerExecutionError { - error: Some(golem_api_grpc::proto::golem::worker::v1::worker_execution_error::Error::ComponentDownloadFailed(err.into())), - } + golem_api_grpc::proto::golem::worker::v1::worker_execution_error::Error::ComponentDownloadFailed(err.into()) } GolemError::ComponentParseFailed(err) => { - golem_api_grpc::proto::golem::worker::v1::WorkerExecutionError { - error: Some(golem_api_grpc::proto::golem::worker::v1::worker_execution_error::Error::ComponentParseFailed(err.into())), - } + golem_api_grpc::proto::golem::worker::v1::worker_execution_error::Error::ComponentParseFailed(err.into()) } GolemError::GetLatestVersionOfComponentFailed(err) => { - golem_api_grpc::proto::golem::worker::v1::WorkerExecutionError { - error: Some(golem_api_grpc::proto::golem::worker::v1::worker_execution_error::Error::GetLatestVersionOfComponentFailed(err.into())), - } + golem_api_grpc::proto::golem::worker::v1::worker_execution_error::Error::GetLatestVersionOfComponentFailed(err.into()) } GolemError::PromiseNotFound(err) => { - golem_api_grpc::proto::golem::worker::v1::WorkerExecutionError { - error: Some(golem_api_grpc::proto::golem::worker::v1::worker_execution_error::Error::PromiseNotFound(err.into())), - } + golem_api_grpc::proto::golem::worker::v1::worker_execution_error::Error::PromiseNotFound(err.into()) } GolemError::PromiseDropped(err) => { - golem_api_grpc::proto::golem::worker::v1::WorkerExecutionError { - error: Some(golem_api_grpc::proto::golem::worker::v1::worker_execution_error::Error::PromiseDropped(err.into())), - } + golem_api_grpc::proto::golem::worker::v1::worker_execution_error::Error::PromiseDropped(err.into()) } GolemError::PromiseAlreadyCompleted(err) => { - golem_api_grpc::proto::golem::worker::v1::WorkerExecutionError { - error: Some(golem_api_grpc::proto::golem::worker::v1::worker_execution_error::Error::PromiseAlreadyCompleted(err.into())), - } + golem_api_grpc::proto::golem::worker::v1::worker_execution_error::Error::PromiseAlreadyCompleted(err.into()) } GolemError::Interrupted(err) => { - golem_api_grpc::proto::golem::worker::v1::WorkerExecutionError { - error: Some(golem_api_grpc::proto::golem::worker::v1::worker_execution_error::Error::Interrupted(err.into())), - } + golem_api_grpc::proto::golem::worker::v1::worker_execution_error::Error::Interrupted(err.into()) } GolemError::ParamTypeMismatch(err) => { - golem_api_grpc::proto::golem::worker::v1::WorkerExecutionError { - error: Some(golem_api_grpc::proto::golem::worker::v1::worker_execution_error::Error::ParamTypeMismatch(err.into())), - } + golem_api_grpc::proto::golem::worker::v1::worker_execution_error::Error::ParamTypeMismatch(err.into()) } GolemError::NoValueInMessage(err) => { - golem_api_grpc::proto::golem::worker::v1::WorkerExecutionError { - error: Some(golem_api_grpc::proto::golem::worker::v1::worker_execution_error::Error::NoValueInMessage(err.into())), - } + golem_api_grpc::proto::golem::worker::v1::worker_execution_error::Error::NoValueInMessage(err.into()) } GolemError::ValueMismatch(err) => { - golem_api_grpc::proto::golem::worker::v1::WorkerExecutionError { - error: Some(golem_api_grpc::proto::golem::worker::v1::worker_execution_error::Error::ValueMismatch(err.into())), - } + golem_api_grpc::proto::golem::worker::v1::worker_execution_error::Error::ValueMismatch(err.into()) } GolemError::UnexpectedOplogEntry(err) => { - golem_api_grpc::proto::golem::worker::v1::WorkerExecutionError { - error: Some(golem_api_grpc::proto::golem::worker::v1::worker_execution_error::Error::UnexpectedOplogEntry(err.into())), - } + golem_api_grpc::proto::golem::worker::v1::worker_execution_error::Error::UnexpectedOplogEntry(err.into()) } GolemError::RuntimeError(err) => { - golem_api_grpc::proto::golem::worker::v1::WorkerExecutionError { - error: Some(golem_api_grpc::proto::golem::worker::v1::worker_execution_error::Error::RuntimeError(err.into())), - } + golem_api_grpc::proto::golem::worker::v1::worker_execution_error::Error::RuntimeError(err.into()) } GolemError::InvalidShardId(err) => { - golem_api_grpc::proto::golem::worker::v1::WorkerExecutionError { - error: Some(golem_api_grpc::proto::golem::worker::v1::worker_execution_error::Error::InvalidShardId(err.into())), - } + golem_api_grpc::proto::golem::worker::v1::worker_execution_error::Error::InvalidShardId(err.into()) } GolemError::PreviousInvocationFailed(err) => { - golem_api_grpc::proto::golem::worker::v1::WorkerExecutionError { - error: Some(golem_api_grpc::proto::golem::worker::v1::worker_execution_error::Error::PreviousInvocationFailed(err.into())), - } + golem_api_grpc::proto::golem::worker::v1::worker_execution_error::Error::PreviousInvocationFailed(err.into()) } GolemError::PreviousInvocationExited(err) => { - golem_api_grpc::proto::golem::worker::v1::WorkerExecutionError { - error: Some(golem_api_grpc::proto::golem::worker::v1::worker_execution_error::Error::PreviousInvocationExited(err.into())), - } + golem_api_grpc::proto::golem::worker::v1::worker_execution_error::Error::PreviousInvocationExited(err.into()) } GolemError::Unknown(err) => { - golem_api_grpc::proto::golem::worker::v1::WorkerExecutionError { - error: Some(golem_api_grpc::proto::golem::worker::v1::worker_execution_error::Error::Unknown(err.into())), - } + golem_api_grpc::proto::golem::worker::v1::worker_execution_error::Error::Unknown(err.into()) } GolemError::InvalidAccount(err) => { - golem_api_grpc::proto::golem::worker::v1::WorkerExecutionError { - error: Some(golem_api_grpc::proto::golem::worker::v1::worker_execution_error::Error::InvalidAccount(err.into())), - } + golem_api_grpc::proto::golem::worker::v1::worker_execution_error::Error::InvalidAccount(err.into()) } } } diff --git a/golem-worker-service-base/src/service/worker/default.rs b/golem-worker-service-base/src/service/worker/default.rs index 97694a5d9b..0580f23112 100644 --- a/golem-worker-service-base/src/service/worker/default.rs +++ b/golem-worker-service-base/src/service/worker/default.rs @@ -20,6 +20,7 @@ use golem_wasm_rpc::protobuf::type_annotated_value::TypeAnnotatedValue; use golem_wasm_rpc::protobuf::{TypedTuple, Val as ProtoVal}; use poem_openapi::types::ToJSON; use tonic::transport::Channel; +use tonic::Code; use tracing::{error, info}; use golem_api_grpc::proto::golem::worker::UpdateMode; @@ -50,8 +51,8 @@ use golem_service_base::{ use crate::service::component::ComponentService; use super::{ - AllExecutors, ConnectWorkerStream, HasWorkerExecutorClients, RandomExecutor, ResponseMapResult, - RoutingLogic, WorkerServiceError, + AllExecutors, CallWorkerExecutorError, ConnectWorkerStream, HasWorkerExecutorClients, + RandomExecutor, ResponseMapResult, RoutingLogic, WorkerServiceError, }; pub type WorkerResult = Result; @@ -277,6 +278,7 @@ where } => Err(err.into()), workerexecutor::v1::CreateWorkerResponse { .. } => Err("Empty response".into()), }, + WorkerServiceError::internal, ) .await?; @@ -290,6 +292,7 @@ where _auth_ctx: &AuthCtx, ) -> WorkerResult { let worker_id = worker_id.clone(); + let worker_id_err: golem_common::model::WorkerId = worker_id.clone().into(); let stream = self .call_worker_executor( worker_id.clone(), @@ -298,10 +301,19 @@ where Box::pin(worker_executor_client.connect_worker(ConnectWorkerRequest { worker_id: Some(worker_id.clone().into()), account_id: metadata.account_id.clone().map(|id| id.into()), + account_limits: metadata.limits.clone().map(|id| id.into()), })) }, |response| Ok(ConnectWorkerStream::new(response.into_inner())), + |error| match error { + CallWorkerExecutorError::FailedToConnectToPod(status) + if status.code() == Code::NotFound => + { + WorkerServiceError::WorkerNotFound(worker_id_err.clone()) + } + _ => WorkerServiceError::internal(error), + }, ) .await?; @@ -338,6 +350,7 @@ where } => Err(err.into()), workerexecutor::v1::DeleteWorkerResponse { .. } => Err("Empty response".into()), }, + WorkerServiceError::internal, ) .await?; @@ -407,6 +420,7 @@ where } } }, + WorkerServiceError::internal ).await?; Ok(invoke_response) @@ -466,6 +480,7 @@ where } } }, + WorkerServiceError::internal ).await?; Ok(invoke_response) @@ -513,6 +528,7 @@ where } => Err(err.into()), workerexecutor::v1::InvokeWorkerResponse { .. } => Err("Empty response".into()), }, + WorkerServiceError::internal, ) .await?; Ok(()) @@ -557,6 +573,7 @@ where } workerexecutor::v1::InvokeWorkerResponse { .. } => Err("Empty response".into()), }, + WorkerServiceError::internal, ) .await?; Ok(()) @@ -610,6 +627,7 @@ where } } }, + WorkerServiceError::internal ) .await?; Ok(result) @@ -646,6 +664,7 @@ where } => Err(err.into()), workerexecutor::v1::InterruptWorkerResponse { .. } => Err("Empty response".into()), }, + WorkerServiceError::internal, ) .await?; @@ -691,6 +710,7 @@ where } } }, + WorkerServiceError::internal ).await?; Ok(metadata) @@ -752,6 +772,7 @@ where } => Err(err.into()), workerexecutor::v1::ResumeWorkerResponse { .. } => Err("Empty response".into()), }, + WorkerServiceError::internal, ) .await?; Ok(()) @@ -787,6 +808,7 @@ where } => Err(err.into()), workerexecutor::v1::UpdateWorkerResponse { .. } => Err("Empty response".into()), }, + WorkerServiceError::internal, ) .await?; Ok(()) @@ -884,6 +906,7 @@ where } }).collect::, ResponseMapResult>>() }, + WorkerServiceError::internal ).await?; Ok(result.into_iter().flatten().collect()) @@ -908,25 +931,25 @@ where component_id.clone().into(); let account_id = metadata.account_id.clone().map(|id| id.into()); Box::pin(worker_executor_client.get_workers_metadata( - golem_api_grpc::proto::golem::workerexecutor::v1::GetWorkersMetadataRequest { - component_id: Some(component_id), - filter: filter.clone().map(|f| f.into()), - cursor: Some(cursor.clone().into()), - count, - precise, - account_id, - } - )) + golem_api_grpc::proto::golem::workerexecutor::v1::GetWorkersMetadataRequest { + component_id: Some(component_id), + filter: filter.clone().map(|f| f.into()), + cursor: Some(cursor.clone().into()), + count, + precise, + account_id, + } + )) }, |response| match response.into_inner() { workerexecutor::v1::GetWorkersMetadataResponse { result: - Some(workerexecutor::v1::get_workers_metadata_response::Result::Success( - workerexecutor::v1::GetWorkersMetadataSuccessResponse { - workers, - cursor, - }, - )), + Some(workerexecutor::v1::get_workers_metadata_response::Result::Success( + workerexecutor::v1::GetWorkersMetadataSuccessResponse { + workers, + cursor, + }, + )), } => { let workers = workers .into_iter() @@ -941,14 +964,15 @@ where } workerexecutor::v1::GetWorkersMetadataResponse { result: - Some(workerexecutor::v1::get_workers_metadata_response::Result::Failure( - err, - )), + Some(workerexecutor::v1::get_workers_metadata_response::Result::Failure( + err, + )), } => Err(err.into()), workerexecutor::v1::GetWorkersMetadataResponse { .. } => { Err("Empty response".into()) } }, + WorkerServiceError::internal, ) .await?; diff --git a/golem-worker-service-base/src/service/worker/routing_logic.rs b/golem-worker-service-base/src/service/worker/routing_logic.rs index 4e5cc84aef..39178f2b99 100644 --- a/golem-worker-service-base/src/service/worker/routing_logic.rs +++ b/golem-worker-service-base/src/service/worker/routing_logic.rs @@ -41,11 +41,12 @@ use crate::service::worker::WorkerServiceError; #[async_trait] pub trait RoutingLogic { - async fn call_worker_executor( + async fn call_worker_executor( &self, target: Target, remote_call: F, response_map: G, + error_map: H, ) -> Result where Out: Send + 'static, @@ -59,7 +60,8 @@ pub trait RoutingLogic { + Sync + Clone + 'static, - G: Fn(Target::ResultOut) -> Result + Send + Sync; + G: Fn(Target::ResultOut) -> Result + Send + Sync, + H: Fn(CallWorkerExecutorError) -> WorkerServiceError + Send + Sync; } #[async_trait] @@ -338,11 +340,12 @@ impl From for ResponseMapResult { #[async_trait] impl RoutingLogic for T { - async fn call_worker_executor( + async fn call_worker_executor( &self, target: Target, remote_call: F, response_map: G, + error_map: H, ) -> Result where Out: Send + 'static, @@ -357,6 +360,7 @@ impl Routing + Clone + 'static, G: Fn(Target::ResultOut) -> Result + Send + Sync, + H: Fn(CallWorkerExecutorError) -> WorkerServiceError + Send + Sync, { let mut retry = RetryState::new(self.worker_executor_retry_config()); loop { @@ -387,7 +391,7 @@ impl Routing if error.is_retriable() { retry.retry(self, &error, &pod).await } else { - retry.non_retryable_error(WorkerServiceError::internal(error), &pod) + retry.non_retryable_error(error_map(error), &pod) } } } @@ -413,7 +417,7 @@ pub enum CallWorkerExecutorError { // TODO: Change to display #[error("Failed to get routing table: {0:?}")] FailedToGetRoutingTable(RoutingTableError), - #[error("Failed to connect to pod: {0}")] + #[error("Failed to connect to pod: {} {}", .0.code(), .0.message())] FailedToConnectToPod(Status), } diff --git a/golem-worker-service/src/api/worker_connect.rs b/golem-worker-service/src/api/worker_connect.rs index b388a4d2bc..44f4a1c61b 100644 --- a/golem-worker-service/src/api/worker_connect.rs +++ b/golem-worker-service/src/api/worker_connect.rs @@ -20,11 +20,13 @@ use futures::StreamExt; use golem_common::model::ComponentId; use golem_common::recorded_http_api_request; use golem_service_base::auth::EmptyAuthCtx; -use golem_service_base::model::WorkerId; +use golem_service_base::model::{ErrorsBody, WorkerId}; +use golem_worker_service_base::api::WorkerApiBaseError; use golem_worker_service_base::service::worker::{proxy_worker_connection, ConnectWorkerStream}; use poem::web::websocket::WebSocket; -use poem::web::Data; +use poem::web::{Data, Path}; use poem::*; +use poem_openapi::payload::Json; use tracing::Instrument; #[derive(Clone)] @@ -40,13 +42,11 @@ impl ConnectService { #[handler] pub async fn ws( - req: &Request, + Path((component_id, worker_name)): Path<(ComponentId, String)>, websocket: WebSocket, Data(service): Data<&ConnectService>, ) -> Response { - tracing::info!("Connect request: {:?} {:?}", req.uri(), req); - - get_worker_stream(service, req) + get_worker_stream(service, component_id, worker_name) .await .map(|(worker_id, worker_stream)| { websocket @@ -74,12 +74,15 @@ const PING_TIMEOUT: Duration = Duration::from_secs(15); async fn get_worker_stream( service: &ConnectService, - req: &Request, + component_id: ComponentId, + worker_name: String, ) -> Result<(WorkerId, ConnectWorkerStream), Response> { - let worker_id = match get_worker_id(req) { - Ok(worker_id) => worker_id, - Err(err) => return Err((http::StatusCode::BAD_REQUEST, err).into_response()), - }; + let worker_id = WorkerId::new(component_id, worker_name).map_err(|e| { + let error = WorkerApiBaseError::BadRequest(Json(ErrorsBody { + errors: vec![format!("Invalid worker id: {e}")], + })); + error.into_response() + })?; let record = recorded_http_api_request!("connect_worker", worker_id = worker_id.to_string()); @@ -96,23 +99,10 @@ async fn get_worker_stream( match result { Ok(worker_stream) => record.succeed(Ok((worker_id, worker_stream))), Err(error) => { - let error = error.to_string(); - record.fail(error.clone(), &("InternalError")); - Err((http::StatusCode::INTERNAL_SERVER_ERROR, error).into_response()) + tracing::error!("Error connecting to worker: {error}"); + let error = WorkerApiBaseError::from(error); + let error = record.fail(error.clone(), &error); + Err(error.into_response()) } } } - -fn get_worker_id(req: &Request) -> Result { - let (component_id, worker_name) = req.path_params::<(String, String)>().map_err(|_| { - "Valid path parameters (component_id and worker_name) are required ".to_string() - })?; - - let component_id = ComponentId::try_from(component_id.as_str()) - .map_err(|error| format!("Invalid component id: {error}"))?; - - let worker_id = WorkerId::new(component_id, worker_name) - .map_err(|error| format!("Invalid worker name: {error}"))?; - - Ok(worker_id) -}