diff --git a/golem-worker-service-base/src/api/common.rs b/golem-worker-service-base/src/api/common.rs index 6fea624f4..7b52730b7 100644 --- a/golem-worker-service-base/src/api/common.rs +++ b/golem-worker-service-base/src/api/common.rs @@ -134,6 +134,12 @@ mod conversion { ApiDeploymentError::DeploymentConflict(conflict) => { ApiEndpointError::already_exists(format!("Deployment conflict: {}", conflict)) } + ApiDeploymentError::ConflictingDefinitions(conflicts) => { + ApiEndpointError::already_exists(format!( + "Conflicting API definitions during deployment: {}", + conflicts.join(", ") + )) + } } } } diff --git a/golem-worker-service-base/src/api/custom_http_request_api.rs b/golem-worker-service-base/src/api/custom_http_request_api.rs index bf05d2afc..959ffa025 100644 --- a/golem-worker-service-base/src/api/custom_http_request_api.rs +++ b/golem-worker-service-base/src/api/custom_http_request_api.rs @@ -9,7 +9,7 @@ use poem::{Body, Endpoint, Request, Response}; use tracing::{error, info}; use crate::http::{ApiInputPath, InputHttpRequest}; -use crate::service::api_definition_lookup::ApiDefinitionLookup; +use crate::service::api_definition_lookup::ApiDefinitionsLookup; use crate::worker_binding::WorkerBindingResolver; use crate::worker_bridge_execution::WorkerRequestExecutor; @@ -21,7 +21,7 @@ pub struct CustomHttpRequestApi { pub evaluator: Arc, pub worker_metadata_fetcher: Arc, pub api_definition_lookup_service: - Arc + Sync + Send>, + Arc + Sync + Send>, } impl CustomHttpRequestApi { @@ -29,7 +29,7 @@ impl CustomHttpRequestApi { worker_request_executor_service: Arc, worker_metadata_fetcher: Arc, api_definition_lookup_service: Arc< - dyn ApiDefinitionLookup + Sync + Send, + dyn ApiDefinitionsLookup + Sync + Send, >, ) -> Self { let evaluator = Arc::new(DefaultEvaluator::from_worker_request_executor( @@ -83,7 +83,7 @@ impl CustomHttpRequestApi { req_body: json_request_body, }; - let api_definition = match self + let possible_api_definitions = match self .api_definition_lookup_service .get(api_request.clone()) .await @@ -97,7 +97,7 @@ impl CustomHttpRequestApi { } }; - match api_request.resolve(&api_definition).await { + match api_request.resolve(&possible_api_definitions).await { Ok(resolved_worker_request) => { resolved_worker_request .execute_with::(&self.evaluator, &self.worker_metadata_fetcher) @@ -105,10 +105,7 @@ impl CustomHttpRequestApi { } Err(msg) => { - error!( - "API request id: {} - request error: {}", - &api_definition.id, msg - ); + error!("Failed to resolve the API definition; error: {}", msg); Response::builder() .status(StatusCode::METHOD_NOT_ALLOWED) diff --git a/golem-worker-service-base/src/http/http_request.rs b/golem-worker-service-base/src/http/http_request.rs index a4f646d66..6d69e5186 100644 --- a/golem-worker-service-base/src/http/http_request.rs +++ b/golem-worker-service-base/src/http/http_request.rs @@ -313,7 +313,10 @@ mod tests { let evaluator = get_test_evaluator(); let worker_metadata_fetcher = get_test_metadata_fetcher("golem:it/api/get-cart-contents"); - let resolved_route = api_request.resolve(api_specification).await.unwrap(); + let resolved_route = api_request + .resolve(&vec![api_specification.clone()]) + .await + .unwrap(); resolved_route .execute_with(&evaluator, &worker_metadata_fetcher) @@ -951,7 +954,7 @@ mod tests { function_params, ); - let resolved_route = api_request.resolve(&api_specification).await; + let resolved_route = api_request.resolve(&vec![api_specification]).await; let result = resolved_route.map(|x| x.worker_detail); @@ -985,7 +988,7 @@ mod tests { expression, ); - let resolved_route = api_request.resolve(&api_specification).await.unwrap(); + let resolved_route = api_request.resolve(&vec![api_specification]).await.unwrap(); assert_eq!( resolved_route.worker_detail.idempotency_key, diff --git a/golem-worker-service-base/src/repo/api_deployment_repo.rs b/golem-worker-service-base/src/repo/api_deployment_repo.rs index 25bcd7b7a..227f1687a 100644 --- a/golem-worker-service-base/src/repo/api_deployment_repo.rs +++ b/golem-worker-service-base/src/repo/api_deployment_repo.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::fmt::Display; use std::sync::Mutex; use crate::api_definition::{ApiDefinitionId, ApiDeployment, ApiSite, ApiSiteString}; @@ -40,6 +41,15 @@ pub trait ApiDeploymentRepo { pub enum ApiDeploymentRepoError { Internal(anyhow::Error), } + +impl Display for ApiDeploymentRepoError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ApiDeploymentRepoError::Internal(e) => write!(f, "Internal error: {}", e), + } + } +} + impl From for ApiDeploymentRepoError { fn from(err: RedisError) -> Self { ApiDeploymentRepoError::Internal(anyhow::Error::new(err)) diff --git a/golem-worker-service-base/src/service/api_definition_lookup.rs b/golem-worker-service-base/src/service/api_definition_lookup.rs index f97d7c9f8..5b0995968 100644 --- a/golem-worker-service-base/src/service/api_definition_lookup.rs +++ b/golem-worker-service-base/src/service/api_definition_lookup.rs @@ -2,12 +2,14 @@ use std::fmt::Display; use async_trait::async_trait; +// TODO; We could optimise this further +// to pick the exact API Definition (instead of a vector), +// by doing route resolution at this stage rather than +// delegating that task to worker-binding resolver. +// However, requires lot more work. #[async_trait] -pub trait ApiDefinitionLookup { - async fn get( - &self, - input_http_request: Input, - ) -> Result; +pub trait ApiDefinitionsLookup { + async fn get(&self, input: Input) -> Result, ApiDefinitionLookupError>; } pub struct ApiDefinitionLookupError(pub String); diff --git a/golem-worker-service-base/src/service/api_deployment.rs b/golem-worker-service-base/src/service/api_deployment.rs index 510bbddb9..c90f40954 100644 --- a/golem-worker-service-base/src/service/api_deployment.rs +++ b/golem-worker-service-base/src/service/api_deployment.rs @@ -5,10 +5,13 @@ use crate::repo::api_namespace::ApiNamespace; use async_trait::async_trait; -use crate::service::api_definition::ApiDefinitionKey; use std::sync::Arc; use tracing::log::error; +use crate::api_definition::http::{AllPathPatterns, HttpApiDefinition, Route}; +use crate::http::router::{Router, RouterPattern}; +use std::fmt::Display; + #[async_trait] pub trait ApiDeploymentService { async fn deploy( @@ -41,6 +44,7 @@ pub enum ApiDeploymentError { ApiDeploymentNotFound(Namespace, ApiSiteString), InternalError(String), DeploymentConflict(ApiSiteString), + ConflictingDefinitions(Vec), } impl From for ApiDeploymentError { @@ -69,74 +73,58 @@ impl ApiDeploymentServiceDefault ApiDeploymentService - for ApiDeploymentServiceDefault +impl< + Namespace: ApiNamespace, + ApiDefinition: Clone + HasIsDraft + ConflictChecker + Send + Sync, + > ApiDeploymentService for ApiDeploymentServiceDefault { async fn deploy( &self, deployment: &ApiDeployment, ) -> Result<(), ApiDeploymentError> { - let api_definition_keys = deployment.api_definition_keys.clone(); - - let mut api_definitions = vec![]; - - for definition_key in api_definition_keys { - let api_definition_key = ApiDefinitionKey { - namespace: deployment.namespace.clone(), - id: definition_key.id.clone(), - version: definition_key.version.clone(), - }; - - let definition = self - .definition_repo - .get(&api_definition_key) - .await - .map_err(|err| { - ApiDeploymentError::::InternalError(format!( - "Error getting api definition: {}", - err - )) - })? - .ok_or(ApiDeploymentError::ApiDefinitionNotFound( - deployment.namespace.clone(), - definition_key.id.clone(), - ))?; - api_definitions.push((api_definition_key, definition)) - } + // New API definitions to be added to the deployment + let new_api_definitions = + internal::get_api_definitions_from_deployment(deployment, self.definition_repo.clone()) + .await?; + // Existing deployment let existing_deployment = self .deployment_repo .get(&ApiSiteString::from(&deployment.site)) .await?; match existing_deployment { - Some(existing_deployment) => { - let existing_namespace = existing_deployment.namespace; - - let new_deployment_namespace = deployment.namespace.clone(); - - if existing_namespace != new_deployment_namespace { - error!( + Some(existing_deployment) if existing_deployment.namespace != deployment.namespace => { + error!( "Failed to deploy api-definition of namespace {} with site: {} - site used by another API (under another namespace/API)", - &new_deployment_namespace, + &deployment.namespace, &deployment.site, ); - Err(ApiDeploymentError::DeploymentConflict(ApiSiteString::from( - &existing_deployment.site, - ))) - } else { - internal::deploy( - api_definitions, - deployment, - self.definition_repo.clone(), - self.deployment_repo.clone(), - ) - .await - } + Err(ApiDeploymentError::DeploymentConflict(ApiSiteString::from( + &existing_deployment.site, + ))) + } + + Some(existing_deployment) => { + let existing_api_definitions = internal::get_api_definitions_from_deployment( + &existing_deployment, + self.definition_repo.clone(), + ) + .await?; + + internal::deploy( + &existing_api_definitions, + &new_api_definitions, + deployment, + self.definition_repo.clone(), + self.deployment_repo.clone(), + ) + .await } None => { internal::deploy( - api_definitions, + &vec![], + &new_api_definitions, deployment, self.definition_repo.clone(), self.deployment_repo.clone(), @@ -198,35 +186,159 @@ impl ApiDeploymentSer } } +pub trait ConflictChecker { + type Entity: Display + Send; + fn find_conflicts(input: &Vec) -> Vec + where + Self: Sized; +} + +impl ConflictChecker for HttpApiDefinition { + type Entity = AllPathPatterns; + fn find_conflicts(definitions: &Vec) -> Vec { + let routes = definitions + .iter() + .flat_map(|def| def.routes.clone()) + .collect::>(); + + let mut router = Router::::new(); + + let mut conflicting_path_patterns = vec![]; + + for route in routes { + let method: hyper::Method = route.clone().method.into(); + let path = route + .clone() + .path + .path_patterns + .iter() + .map(|pattern| RouterPattern::from(pattern.clone())) + .collect::>(); + + if !router.add_route(method.clone(), path.clone(), route) { + let current_route = router.get_route(&method, &path).unwrap(); + + conflicting_path_patterns.push(current_route.path.clone()); + } + } + + conflicting_path_patterns + } +} + mod internal { use crate::api_definition::{ApiDeployment, HasIsDraft}; use crate::repo::api_definition_repo::ApiDefinitionRepo; use crate::repo::api_deployment_repo::ApiDeploymentRepo; use crate::repo::api_namespace::ApiNamespace; use crate::service::api_definition::ApiDefinitionKey; - use crate::service::api_deployment::ApiDeploymentError; + use crate::service::api_deployment::{ApiDeploymentError, ConflictChecker}; use std::sync::Arc; + use tracing::log::error; + + pub(crate) struct ApiDefinitionWithKey { + pub key: ApiDefinitionKey, + pub definition: ApiDefinition, + } - pub(crate) async fn deploy( - api_definitions: Vec<(ApiDefinitionKey, ApiDefinition)>, + pub(crate) async fn deploy< + Namespace: ApiNamespace, + ApiDefinition: Clone + ConflictChecker + HasIsDraft + Send, + >( + old_api_definitions: &Vec>, + new_api_definitions: &Vec>, deployment: &ApiDeployment, definition_repo: Arc + Sync + Send>, deployment_repo: Arc + Sync + Send>, ) -> Result<(), ApiDeploymentError> { - for (key, definition) in api_definitions { - if definition.is_draft() { - definition_repo.set_not_draft(&key).await.map_err(|err| { + let all_definitions = old_api_definitions + .iter() + .map(|def| def.definition.clone()) + .chain(new_api_definitions.iter().map(|def| def.definition.clone())) + .collect::>(); + + let conflicting_definitions = ApiDefinition::find_conflicts(&all_definitions); + + // If there are no conflicting definitions, make sure to tag the draft definitions to non-draft + // and send the deployment details to deployment repo + if conflicting_definitions.is_empty() { + for api_def in new_api_definitions { + if api_def.definition.is_draft() { + definition_repo + .set_not_draft(&api_def.key) + .await + .map_err(|err| { + ApiDeploymentError::::InternalError(format!( + "Error freezing api definition: {}", + err + )) + })?; + } + } + + deployment_repo + .deploy(deployment) + .await + .map_err(|err| err.into()) + } else { + let conflicting_definitions = conflicting_definitions + .iter() + .map(|def| format!("{}", def)) + .collect::>(); + + error!( + "Failed to deploy api-definition of namespace {} with site: {} - conflicting definitions: {:?}", + &deployment.namespace, + &deployment.site, + conflicting_definitions.join(", ") + ); + + Err(ApiDeploymentError::ConflictingDefinitions( + conflicting_definitions, + )) + } + } + + pub(crate) async fn get_api_definitions_from_deployment< + ApiDefinition: HasIsDraft + ConflictChecker + Send, + Namespace: ApiNamespace, + >( + deployment: &ApiDeployment, + definition_repo: Arc + Sync + Send>, + ) -> Result>, ApiDeploymentError> + { + let api_definition_keys = deployment.api_definition_keys.clone(); + + let mut api_definitions = vec![]; + + for definition_key in api_definition_keys { + let api_definition_key = ApiDefinitionKey { + namespace: deployment.namespace.clone(), + id: definition_key.id.clone(), + version: definition_key.version.clone(), + }; + + let definition = definition_repo + .get(&api_definition_key) + .await + .map_err(|err| { ApiDeploymentError::::InternalError(format!( - "Error freezing api definition: {}", + "Error getting api definition: {}", err )) - })?; - } + })? + .ok_or(ApiDeploymentError::ApiDefinitionNotFound( + deployment.namespace.clone(), + definition_key.id.clone(), + ))?; + let api_definition_with_key = ApiDefinitionWithKey { + key: api_definition_key, + definition, + }; + + api_definitions.push(api_definition_with_key) } - deployment_repo - .deploy(deployment) - .await - .map_err(|err| err.into()) + Ok(api_definitions) } } diff --git a/golem-worker-service-base/src/service/mod.rs b/golem-worker-service-base/src/service/mod.rs index ca50f0d37..65f718583 100644 --- a/golem-worker-service-base/src/service/mod.rs +++ b/golem-worker-service-base/src/service/mod.rs @@ -7,6 +7,7 @@ pub mod component; pub mod worker; pub mod http; + pub fn with_metadata(request: T, metadata: I) -> tonic::Request where I: IntoIterator, diff --git a/golem-worker-service-base/src/worker_binding/worker_binding_resolver.rs b/golem-worker-service-base/src/worker_binding/worker_binding_resolver.rs index cd4d9722b..b1a480098 100644 --- a/golem-worker-service-base/src/worker_binding/worker_binding_resolver.rs +++ b/golem-worker-service-base/src/worker_binding/worker_binding_resolver.rs @@ -21,11 +21,15 @@ use golem_service_base::model::{Id, WorkerId}; use crate::worker_binding::{RequestDetails, ResponseMapping}; use crate::worker_bridge_execution::to_response::ToResponse; +// TODO; It will be better if worker binding resolver +// able to deal with only one API definition +// as the first stage resolution can take place (based on host, input request (route resolution) +// up the stage #[async_trait] pub trait WorkerBindingResolver { async fn resolve( &self, - api_specification: &ApiDefinition, + api_specification: &Vec, ) -> Result; } @@ -152,12 +156,17 @@ impl ResolvedWorkerBinding { impl WorkerBindingResolver for InputHttpRequest { async fn resolve( &self, - api_definition: &HttpApiDefinition, + api_definition: &Vec, ) -> Result { let default_evaluator = DefaultEvaluator::noop(); + let routes = api_definition + .iter() + .flat_map(|x| x.routes.clone()) + .collect::>(); + let api_request = self; - let router = router::build(api_definition.routes.clone()); + let router = router::build(routes); let path: Vec<&str> = RouterPattern::split(&api_request.input_path.base_path).collect(); let request_query_variables = self.input_path.query_components().unwrap_or_default(); let request_body = &self.req_body; diff --git a/golem-worker-service/src/api/deploy_api_definition.rs b/golem-worker-service/src/api/deploy_api_definition.rs index 601f552c5..ea38cc3ec 100644 --- a/golem-worker-service/src/api/deploy_api_definition.rs +++ b/golem-worker-service/src/api/deploy_api_definition.rs @@ -12,7 +12,7 @@ use tracing::log::info; use golem_worker_service_base::api::ApiDeployment; use golem_worker_service_base::api_definition; -use golem_worker_service_base::service::api_definition::ApiDefinitionKey; +use golem_worker_service_base::service::api_definition::ApiDefinitionInfo; use golem_worker_service_base::service::api_deployment::ApiDeploymentService; pub struct ApiDeploymentApi { @@ -32,17 +32,20 @@ impl ApiDeploymentApi { &self, payload: Json, ) -> Result, ApiEndpointError> { - info!( - "Deploy API definition - id: {}, version: {}, site: {}", - payload.api_definitions, payload.version, payload.site - ); + let api_definition_infos = payload + .api_definitions + .iter() + .map(|k| ApiDefinitionInfo { + id: k.id.clone(), + version: k.version.clone(), + }) + .collect::>(); + + info!("Deploy API definitions at site: {}", payload.site); let api_deployment = api_definition::ApiDeployment { - api_definition_keys: vec![ApiDefinitionKey { - namespace: CommonNamespace::default(), - id: payload.api_definitions.clone(), - version: payload.version.clone(), - }], + namespace: CommonNamespace::default(), + api_definition_keys: api_definition_infos, site: payload.site.clone(), }; diff --git a/golem-worker-service/src/service/api_definition_lookup_impl.rs b/golem-worker-service/src/service/api_definition_lookup_impl.rs index 57e072d84..f09709bf5 100644 --- a/golem-worker-service/src/service/api_definition_lookup_impl.rs +++ b/golem-worker-service/src/service/api_definition_lookup_impl.rs @@ -5,10 +5,11 @@ use golem_worker_service_base::auth::CommonNamespace; use golem_worker_service_base::http::InputHttpRequest; use golem_worker_service_base::repo::api_definition_repo::ApiDefinitionRepo; use golem_worker_service_base::service::api_definition_lookup::{ - ApiDefinitionLookup, ApiDefinitionLookupError, + ApiDefinitionLookupError, ApiDefinitionsLookup, }; use golem_worker_service_base::repo::api_deployment_repo::ApiDeploymentRepo; +use golem_worker_service_base::service::api_definition::ApiDefinitionKey; use std::sync::Arc; use tracing::error; @@ -33,14 +34,14 @@ impl CustomRequestDefinitionLookupDefault { } #[async_trait] -impl ApiDefinitionLookup +impl ApiDefinitionsLookup for CustomRequestDefinitionLookupDefault { async fn get( &self, input_http_request: InputHttpRequest, - ) -> Result { - // HOST should exist in Http Reequest + ) -> Result, ApiDefinitionLookupError> { + // HOST should exist in Http Request let host = input_http_request .get_host() .ok_or(ApiDefinitionLookupError( @@ -63,23 +64,35 @@ impl ApiDefinitionLookup &host )))?; - let api_key = api_deployment.api_definition_keys; + let mut http_api_defs = vec![]; - let value = self - .register_api_definition_repo - .get(&api_key) - .await - .map_err(|err| { - error!("Error getting api definition from the repo: {}", err); - ApiDefinitionLookupError(format!( - "Error getting api definition from the repo: {}", - err - )) - })?; + for api_defs in api_deployment.api_definition_keys { + let api_key = ApiDefinitionKey { + namespace: api_deployment.namespace.clone(), + id: api_defs.id.clone(), + version: api_defs.version.clone(), + }; + + let value = self + .register_api_definition_repo + .get(&api_key) + .await + .map_err(|err| { + error!("Error getting api definition from the repo: {}", err); + ApiDefinitionLookupError(format!( + "Error getting api definition from the repo: {}", + err + )) + })?; + + let api_definition = value.ok_or(ApiDefinitionLookupError(format!( + "Api definition with id: {} and version: {} not found", + &api_key.id, &api_key.version + )))?; + + http_api_defs.push(api_definition); + } - value.ok_or(ApiDefinitionLookupError(format!( - "Api definition with id: {} and version: {} not found", - &api_key.id, &api_key.version - ))) + Ok(http_api_defs) } } diff --git a/golem-worker-service/src/service/mod.rs b/golem-worker-service/src/service/mod.rs index 05d46b36c..aefeac1cf 100644 --- a/golem-worker-service/src/service/mod.rs +++ b/golem-worker-service/src/service/mod.rs @@ -16,7 +16,7 @@ use golem_worker_service_base::repo::api_definition_repo::{ use golem_worker_service_base::service::api_definition::{ ApiDefinitionService, ApiDefinitionServiceDefault, }; -use golem_worker_service_base::service::api_definition_lookup::ApiDefinitionLookup; +use golem_worker_service_base::service::api_definition_lookup::ApiDefinitionsLookup; use golem_worker_service_base::service::api_definition_validator::ApiDefinitionValidatorNoop; use golem_worker_service_base::service::api_definition_validator::ApiDefinitionValidatorService; use golem_worker_service_base::service::component::{ComponentServiceNoop, RemoteComponentService}; @@ -54,7 +54,7 @@ pub struct Services { >, pub deployment_service: Arc + Sync + Send>, pub http_definition_lookup_service: - Arc + Sync + Send>, + Arc + Sync + Send>, pub worker_to_http_service: Arc, pub worker_metadata_fetcher: Arc, pub api_definition_validator_service: Arc< @@ -173,7 +173,7 @@ impl Services { Arc::new(InMemoryDeployment::default()); let definition_lookup_service: Arc< - dyn ApiDefinitionLookup + Sync + Send, + dyn ApiDefinitionsLookup + Sync + Send, > = Arc::new(CustomRequestDefinitionLookupDefault::new( definition_repo.clone(), deployment_repo.clone(),