From 75202be342e0618d09bcb4b2ce6d8f6c41d5db2f Mon Sep 17 00:00:00 2001 From: Daniel Vigovszky Date: Thu, 12 Dec 2024 16:06:58 +0100 Subject: [PATCH] Follow changes in all crates --- golem-api-grpc/Cargo.toml | 4 +- golem-cli/src/model/invoke_result_view.rs | 8 +- golem-cli/src/service/worker.rs | 2 +- golem-common/src/model/mod.rs | 13 ++-- golem-common/src/model/oplog.rs | 17 +--- golem-common/src/model/protobuf.rs | 12 ++- golem-common/src/model/public_oplog.rs | 25 ++++-- golem-common/src/newtype.rs | 8 +- golem-common/src/retries.rs | 2 +- .../src/config.rs | 3 +- .../src/service/compile_worker.rs | 2 +- .../src/service/component.rs | 2 +- golem-rib/src/compiler/ir.rs | 7 +- golem-rib/src/expr.rs | 5 +- golem-service-base/src/config.rs | 3 +- golem-shard-manager/src/healthcheck.rs | 4 +- .../src/shard_manager_config.rs | 5 +- .../src/durable_host/golem/mod.rs | 2 +- .../src/durable_host/golem/v11.rs | 2 +- .../src/durable_host/mod.rs | 2 +- golem-worker-executor-base/src/invocation.rs | 11 ++- .../src/services/component.rs | 2 +- .../src/services/golem_config.rs | 3 +- .../src/services/plugins.rs | 2 +- golem-worker-executor-base/src/worker.rs | 2 +- .../tests/compatibility/v1.rs | 20 ++--- .../tests/compatibility/v1_1.rs | 2 +- golem-worker-service-base/src/app_config.rs | 3 +- .../gateway_binding_compiled.rs | 47 +++++------ .../file_server_binding_handler.rs | 64 ++++++++------- .../rib_input_value_resolver.rs | 10 +++ .../src/gateway_execution/to_response.rs | 58 +++++++------- .../src/gateway_middleware/http/cors.rs | 16 +--- .../src/gateway_rib_interpreter/mod.rs | 19 ++++- golem-worker-service-base/src/getter.rs | 77 +++++++++++++++---- golem-worker-service-base/src/headers.rs | 28 +++---- .../src/service/component/default.rs | 2 +- .../src/service/worker/default.rs | 2 +- .../src/service/worker/routing_logic.rs | 2 +- .../tests/services_tests.rs | 4 +- golem-worker-service/src/service/mod.rs | 2 +- wasm-rpc/src/text.rs | 12 +-- wasm-rpc/src/value_and_type.rs | 11 +++ 43 files changed, 307 insertions(+), 220 deletions(-) diff --git a/golem-api-grpc/Cargo.toml b/golem-api-grpc/Cargo.toml index 56be0dd13..72e0fd349 100644 --- a/golem-api-grpc/Cargo.toml +++ b/golem-api-grpc/Cargo.toml @@ -11,8 +11,8 @@ description = "GRPC API for Golem services" harness = false [dependencies] -golem-wasm-ast = { path = "../wasm-ast", version = "0.0.0" } -golem-wasm-rpc = { path = "../wasm-rpc", version = "0.0.0", default-features = false, features = ["host"] } +golem-wasm-ast = { path = "../wasm-ast", version = "0.0.0", default-features = false, features = ["protobuf"] } +golem-wasm-rpc = { path = "../wasm-rpc", version = "0.0.0", default-features = false, features = ["protobuf"] } async-trait = { workspace = true } bincode = { workspace = true } diff --git a/golem-cli/src/model/invoke_result_view.rs b/golem-cli/src/model/invoke_result_view.rs index 15c87b6c7..2bc8e4894 100644 --- a/golem-cli/src/model/invoke_result_view.rs +++ b/golem-cli/src/model/invoke_result_view.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use golem_wasm_rpc::{protobuf, print_type_annotated_value}; +use golem_wasm_rpc::{print_type_annotated_value, protobuf}; use serde::{Deserialize, Serialize}; use serde_json::value::Value; use tracing::{debug, info}; @@ -122,7 +122,7 @@ mod tests { use golem_wasm_rpc::protobuf::type_annotated_value::TypeAnnotatedValue; use golem_wasm_rpc::protobuf::TypeAnnotatedValue as RootTypeAnnotatedValue; use golem_wasm_rpc::protobuf::TypedTuple; - use golem_wasm_rpc::{TypeAnnotatedValueConstructors, Uri}; + use golem_wasm_rpc::TypeAnnotatedValueConstructors; use uuid::Uuid; use golem_client::model::{ @@ -195,9 +195,7 @@ mod tests { fn fallback_to_json() { let res = parse( vec![golem_wasm_rpc::Value::Handle { - uri: Uri { - value: "".to_string(), - }, + uri: "".to_string(), resource_id: 1, }], vec![handle(AnalysedResourceId(1), AnalysedResourceMode::Owned)], diff --git a/golem-cli/src/service/worker.rs b/golem-cli/src/service/worker.rs index a4ff17f07..472bf5774 100644 --- a/golem-cli/src/service/worker.rs +++ b/golem-cli/src/service/worker.rs @@ -33,8 +33,8 @@ use golem_common::uri::oss::url::{ComponentUrl, WorkerUrl}; use golem_common::uri::oss::urn::{ComponentUrn, WorkerUrn}; use golem_wasm_ast::analysis::{AnalysedExport, AnalysedFunction, AnalysedInstance}; use golem_wasm_rpc::json::TypeAnnotatedValueJsonExtensions; -use golem_wasm_rpc::protobuf::type_annotated_value::TypeAnnotatedValue; use golem_wasm_rpc::parse_type_annotated_value; +use golem_wasm_rpc::protobuf::type_annotated_value::TypeAnnotatedValue; use itertools::Itertools; use serde_json::Value; use std::sync::Arc; diff --git a/golem-common/src/model/mod.rs b/golem-common/src/model/mod.rs index e310360ef..cdba257b5 100644 --- a/golem-common/src/model/mod.rs +++ b/golem-common/src/model/mod.rs @@ -30,6 +30,7 @@ use golem_wasm_ast::analysis::analysed_type::{ }; use golem_wasm_ast::analysis::{analysed_type, AnalysedType}; use golem_wasm_rpc::IntoValue; +use http::Uri; use rand::prelude::IteratorRandom; use serde::de::Unexpected; use serde::{de, Deserialize, Deserializer, Serialize, Serializer}; @@ -39,7 +40,6 @@ use std::fmt::{Display, Formatter}; use std::ops::Add; use std::str::FromStr; use std::time::{Duration, SystemTime}; -use http::Uri; use typed_path::Utf8UnixPathBuf; use uuid::{uuid, Uuid}; @@ -70,8 +70,11 @@ pub trait PoemTypeRequirements: pub trait PoemTypeRequirements {} #[cfg(feature = "poem")] -impl - PoemTypeRequirements for T +impl< + T: poem_openapi::types::Type + + poem_openapi::types::ParseFromJSON + + poem_openapi::types::ToJSON, + > PoemTypeRequirements for T { } @@ -2242,8 +2245,7 @@ mod tests { use crate::model::oplog::OplogIndex; use crate::model::{ - AccountId, ComponentFilePath, ComponentFilePermissions, ComponentId, Empty, - FilterComparator, IdempotencyKey, InitialComponentFile, InitialComponentFileKey, ShardId, + AccountId, ComponentFilePath, ComponentId, FilterComparator, IdempotencyKey, ShardId, StringFilterComparator, TargetWorkerId, Timestamp, WorkerFilter, WorkerId, WorkerMetadata, WorkerStatus, WorkerStatusRecord, }; @@ -2579,5 +2581,4 @@ mod tests { let path = ComponentFilePath::from_abs_str("a/b/c"); assert!(path.is_err()); } - } diff --git a/golem-common/src/model/oplog.rs b/golem-common/src/model/oplog.rs index 8e4b5348f..2be2f0d89 100644 --- a/golem-common/src/model/oplog.rs +++ b/golem-common/src/model/oplog.rs @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::model::RetryConfig; use crate::model::regions::OplogRegion; +use crate::model::RetryConfig; use crate::model::{ AccountId, ComponentVersion, IdempotencyKey, PluginInstallationId, Timestamp, WorkerId, WorkerInvocation, @@ -199,18 +199,7 @@ impl<'de> BorrowDecode<'de> for PayloadId { } #[derive( - Debug, - Clone, - Copy, - PartialOrd, - Ord, - PartialEq, - Eq, - Hash, - Encode, - Decode, - Serialize, - Deserialize, + Debug, Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash, Encode, Decode, Serialize, Deserialize, )] #[cfg_attr(feature = "poem", derive(poem_openapi::NewType))] pub struct WorkerResourceId(pub u64); @@ -856,4 +845,4 @@ mod protobuf { } } } -} \ No newline at end of file +} diff --git a/golem-common/src/model/protobuf.rs b/golem-common/src/model/protobuf.rs index b9f2c7cd6..417b42a77 100644 --- a/golem-common/src/model/protobuf.rs +++ b/golem-common/src/model/protobuf.rs @@ -12,14 +12,22 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::ops::Add; use crate::model::oplog::OplogIndex; -use crate::model::{AccountId, ComponentFilePath, ComponentFilePermissions, ComponentFileSystemNode, ComponentFileSystemNodeDetails, ComponentType, FilterComparator, GatewayBindingType, IdempotencyKey, InitialComponentFile, InitialComponentFileKey, LogLevel, NumberOfShards, Pod, PromiseId, RoutingTable, RoutingTableEntry, ScanCursor, ShardId, StringFilterComparator, TargetWorkerId, Timestamp, WorkerCreatedAtFilter, WorkerEnvFilter, WorkerEvent, WorkerFilter, WorkerId, WorkerNameFilter, WorkerNotFilter, WorkerStatus, WorkerStatusFilter, WorkerVersionFilter}; +use crate::model::{ + AccountId, ComponentFilePath, ComponentFilePermissions, ComponentFileSystemNode, + ComponentFileSystemNodeDetails, ComponentType, FilterComparator, GatewayBindingType, + IdempotencyKey, InitialComponentFile, InitialComponentFileKey, LogLevel, NumberOfShards, Pod, + PromiseId, RoutingTable, RoutingTableEntry, ScanCursor, ShardId, StringFilterComparator, + TargetWorkerId, Timestamp, WorkerCreatedAtFilter, WorkerEnvFilter, WorkerEvent, WorkerFilter, + WorkerId, WorkerNameFilter, WorkerNotFilter, WorkerStatus, WorkerStatusFilter, + WorkerVersionFilter, +}; use golem_api_grpc::proto::golem; use golem_api_grpc::proto::golem::shardmanager::{ Pod as GrpcPod, RoutingTable as GrpcRoutingTable, RoutingTableEntry as GrpcRoutingTableEntry, }; use golem_api_grpc::proto::golem::worker::Cursor; +use std::ops::Add; use std::time::{Duration, SystemTime}; impl From for prost_types::Timestamp { diff --git a/golem-common/src/model/public_oplog.rs b/golem-common/src/model/public_oplog.rs index cbc392d57..8448c272b 100644 --- a/golem-common/src/model/public_oplog.rs +++ b/golem-common/src/model/public_oplog.rs @@ -12,11 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::model::RetryConfig; use crate::model::lucene::{LeafQuery, Query}; use crate::model::oplog::{LogLevel, OplogIndex, WorkerResourceId, WrappedFunctionType}; use crate::model::plugin::PluginInstallation; use crate::model::regions::OplogRegion; +use crate::model::RetryConfig; use crate::model::{ AccountId, ComponentVersion, Empty, IdempotencyKey, PluginInstallationId, Timestamp, WorkerId, }; @@ -2449,17 +2449,28 @@ mod protobuf { mod tests { use test_r::test; + use crate::model::public_oplog::{ + ChangeRetryPolicyParameters, CreateParameters, DescribeResourceParameters, + EndRegionParameters, ErrorParameters, ExportedFunctionCompletedParameters, + ExportedFunctionInvokedParameters, ExportedFunctionParameters, FailedUpdateParameters, + GrowMemoryParameters, ImportedFunctionInvokedParameters, JumpParameters, LogParameters, + PendingUpdateParameters, PendingWorkerInvocationParameters, PluginInstallationDescription, + PublicOplogEntry, PublicRetryConfig, PublicUpdateDescription, PublicWorkerInvocation, + PublicWrappedFunctionType, ResourceParameters, SnapshotBasedUpdateParameters, + SuccessfulUpdateParameters, TimestampParameter, + }; + use crate::model::{ + AccountId, ComponentId, Empty, IdempotencyKey, PluginInstallationId, Timestamp, WorkerId, + }; use std::collections::{BTreeMap, BTreeSet}; - use crate::model::{AccountId, ComponentId, Empty, IdempotencyKey, PluginInstallationId, Timestamp, WorkerId}; use uuid::Uuid; - use crate::model::public_oplog::{ChangeRetryPolicyParameters, CreateParameters, DescribeResourceParameters, EndRegionParameters, ErrorParameters, ExportedFunctionCompletedParameters, ExportedFunctionInvokedParameters, ExportedFunctionParameters, FailedUpdateParameters, GrowMemoryParameters, ImportedFunctionInvokedParameters, JumpParameters, LogParameters, PendingUpdateParameters, PendingWorkerInvocationParameters, PluginInstallationDescription, PublicOplogEntry, PublicRetryConfig, PublicUpdateDescription, PublicWorkerInvocation, PublicWrappedFunctionType, ResourceParameters, SnapshotBasedUpdateParameters, SuccessfulUpdateParameters, TimestampParameter}; - #[cfg(feature = "poem")] - use poem_openapi::types::ToJSON; - use golem_wasm_ast::analysis::analysed_type::{field, list, r#enum, record, s16, str, u64}; - use golem_wasm_rpc::{Value, ValueAndType}; use crate::model::oplog::{LogLevel, OplogIndex, WorkerResourceId}; use crate::model::regions::OplogRegion; + use golem_wasm_ast::analysis::analysed_type::{field, list, r#enum, record, s16, str, u64}; + use golem_wasm_rpc::{Value, ValueAndType}; + #[cfg(feature = "poem")] + use poem_openapi::types::ToJSON; fn rounded_ts(ts: Timestamp) -> Timestamp { Timestamp::from(ts.to_millis()) diff --git a/golem-common/src/newtype.rs b/golem-common/src/newtype.rs index 4e51ef091..7962a0937 100644 --- a/golem-common/src/newtype.rs +++ b/golem-common/src/newtype.rs @@ -102,7 +102,9 @@ macro_rules! newtype_uuid { } fn schema_ref() -> poem_openapi::registry::MetaSchemaRef { - poem_openapi::registry::MetaSchemaRef::Inline(Box::new(poem_openapi::registry::MetaSchema::new_with_format("string", "uuid"))) + poem_openapi::registry::MetaSchemaRef::Inline(Box::new( + poem_openapi::registry::MetaSchema::new_with_format("string", "uuid"), + )) } fn as_raw_value(&self) -> Option<&Self::RawValueType> { @@ -125,7 +127,9 @@ macro_rules! newtype_uuid { #[cfg(feature = "poem")] impl poem_openapi::types::ParseFromJSON for $name { - fn parse_from_json(value: Option) -> poem_openapi::types::ParseResult { + fn parse_from_json( + value: Option, + ) -> poem_openapi::types::ParseResult { match value { Some(serde_json::Value::String(s)) => Ok(Self(Uuid::from_str(&s)?)), _ => Err(poem_openapi::types::ParseError::<$name>::custom(format!( diff --git a/golem-common/src/retries.rs b/golem-common/src/retries.rs index 14e579a0c..fc610af2f 100644 --- a/golem-common/src/retries.rs +++ b/golem-common/src/retries.rs @@ -18,10 +18,10 @@ use std::pin::Pin; use std::time::{Duration, Instant}; use tracing::{error, info, warn, Level}; -use crate::model::RetryConfig; use crate::metrics::external_calls::{ record_external_call_failure, record_external_call_retry, record_external_call_success, }; +use crate::model::RetryConfig; use crate::retriable_error::IsRetriableError; /// Returns the delay to be waited before the next retry attempt. diff --git a/golem-component-compilation-service/src/config.rs b/golem-component-compilation-service/src/config.rs index e90a22813..202ed5b92 100644 --- a/golem-component-compilation-service/src/config.rs +++ b/golem-component-compilation-service/src/config.rs @@ -19,7 +19,8 @@ use std::net::{Ipv4Addr, SocketAddrV4}; use std::path::Path; use uuid::Uuid; -use golem_common::config::{ConfigExample, ConfigLoader, HasConfigExamples, RetryConfig}; +use golem_common::config::{ConfigExample, ConfigLoader, HasConfigExamples}; +use golem_common::model::RetryConfig; use golem_common::tracing::TracingConfig; use golem_service_base::config::BlobStorageConfig; use golem_worker_executor_base::services::golem_config::CompiledComponentServiceConfig; diff --git a/golem-component-compilation-service/src/service/compile_worker.rs b/golem-component-compilation-service/src/service/compile_worker.rs index 2a08d6754..2cc7c6b6b 100644 --- a/golem-component-compilation-service/src/service/compile_worker.rs +++ b/golem-component-compilation-service/src/service/compile_worker.rs @@ -20,9 +20,9 @@ use golem_api_grpc::proto::golem::component::v1::download_component_response; use golem_api_grpc::proto::golem::component::v1::ComponentError; use golem_api_grpc::proto::golem::component::v1::DownloadComponentRequest; use golem_common::client::{GrpcClient, GrpcClientConfig}; -use golem_common::config::RetryConfig; use golem_common::metrics::external_calls::record_external_call_response_size_bytes; use golem_common::model::ComponentId; +use golem_common::model::RetryConfig; use golem_common::retries::with_retries; use golem_worker_executor_base::grpc::authorised_grpc_request; use golem_worker_executor_base::grpc::is_grpc_retriable; diff --git a/golem-component-service-base/src/service/component.rs b/golem-component-service-base/src/service/component.rs index 2fbea25ee..ac324e06d 100644 --- a/golem-component-service-base/src/service/component.rs +++ b/golem-component-service-base/src/service/component.rs @@ -26,7 +26,6 @@ use bytes::Bytes; use futures::TryStreamExt; use golem_api_grpc::proto::golem::common::{ErrorBody, ErrorsBody}; use golem_api_grpc::proto::golem::component::v1::component_error; -use golem_common::config::RetryConfig; use golem_common::model::component::ComponentOwner; use golem_common::model::component_constraint::FunctionConstraintCollection; use golem_common::model::component_metadata::{ComponentMetadata, ComponentProcessingError}; @@ -35,6 +34,7 @@ use golem_common::model::plugin::{ PluginInstallationUpdate, PluginScope, PluginTypeSpecificDefinition, }; use golem_common::model::ComponentVersion; +use golem_common::model::RetryConfig; use golem_common::model::{AccountId, PluginInstallationId}; use golem_common::model::{ ComponentFilePath, ComponentFilePermissions, ComponentId, ComponentType, InitialComponentFile, diff --git a/golem-rib/src/compiler/ir.rs b/golem-rib/src/compiler/ir.rs index 564e42a49..f8ae17106 100644 --- a/golem-rib/src/compiler/ir.rs +++ b/golem-rib/src/compiler/ir.rs @@ -488,7 +488,9 @@ mod protobuf { RibIR::PushLit(value) => { Instruction::PushLit(golem_wasm_rpc::protobuf::TypeAnnotatedValue { type_annotated_value: Some( - value.try_into().map_err(|errs: Vec| errs.join(", "))?, + value + .try_into() + .map_err(|errs: Vec| errs.join(", "))?, ), }) } @@ -578,7 +580,8 @@ mod protobuf { RibIR::PushFlag(flag) => { Instruction::PushFlag(golem_wasm_rpc::protobuf::TypeAnnotatedValue { type_annotated_value: Some( - flag.try_into().map_err(|errs: Vec| errs.join(", "))?, + flag.try_into() + .map_err(|errs: Vec| errs.join(", "))?, ), }) } diff --git a/golem-rib/src/expr.rs b/golem-rib/src/expr.rs index 9620a6955..e7eb6d73d 100644 --- a/golem-rib/src/expr.rs +++ b/golem-rib/src/expr.rs @@ -16,7 +16,10 @@ use crate::call_type::CallType; use crate::parser::block::block; use crate::parser::type_name::TypeName; use crate::type_registry::FunctionTypeRegistry; -use crate::{from_string, text, type_checker, type_inference, DynamicParsedFunctionName, InferredType, ParsedFunctionName, VariableId}; +use crate::{ + from_string, text, type_checker, type_inference, DynamicParsedFunctionName, InferredType, + ParsedFunctionName, VariableId, +}; use bigdecimal::{BigDecimal, FromPrimitive, ToPrimitive}; use combine::parser::char::spaces; use combine::stream::position; diff --git a/golem-service-base/src/config.rs b/golem-service-base/src/config.rs index 71cd6193b..a5a4fda03 100644 --- a/golem-service-base/src/config.rs +++ b/golem-service-base/src/config.rs @@ -12,7 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use golem_common::config::{DbSqliteConfig, RetryConfig}; +use golem_common::config::DbSqliteConfig; +use golem_common::model::RetryConfig; use serde::{Deserialize, Serialize}; use std::{path::PathBuf, time::Duration}; diff --git a/golem-shard-manager/src/healthcheck.rs b/golem-shard-manager/src/healthcheck.rs index e152f511c..07c827922 100644 --- a/golem-shard-manager/src/healthcheck.rs +++ b/golem-shard-manager/src/healthcheck.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use async_trait::async_trait; -use golem_common::config::RetryConfig; +use golem_common::model::RetryConfig; use golem_common::retries::with_retriable_errors; use crate::error::HealthCheckError; @@ -104,7 +104,7 @@ pub mod kubernetes { use k8s_openapi::api::core::v1::{Pod, PodStatus}; use kube::{Api, Client}; - use golem_common::config::RetryConfig; + use golem_common::model::RetryConfig; use crate::healthcheck::{health_check_with_retries, HealthCheck, HealthCheckError}; diff --git a/golem-shard-manager/src/shard_manager_config.rs b/golem-shard-manager/src/shard_manager_config.rs index 446505492..2260cd6c0 100644 --- a/golem-shard-manager/src/shard_manager_config.rs +++ b/golem-shard-manager/src/shard_manager_config.rs @@ -17,9 +17,8 @@ use std::time::Duration; use serde::{Deserialize, Serialize}; -use golem_common::config::{ - ConfigExample, ConfigLoader, HasConfigExamples, RedisConfig, RetryConfig, -}; +use golem_common::config::{ConfigExample, ConfigLoader, HasConfigExamples, RedisConfig}; +use golem_common::model::RetryConfig; use golem_common::tracing::TracingConfig; use crate::model::Empty; diff --git a/golem-worker-executor-base/src/durable_host/golem/mod.rs b/golem-worker-executor-base/src/durable_host/golem/mod.rs index 6fd126450..a386645df 100644 --- a/golem-worker-executor-base/src/durable_host/golem/mod.rs +++ b/golem-worker-executor-base/src/durable_host/golem/mod.rs @@ -16,7 +16,7 @@ pub mod v11; use anyhow::anyhow; use async_trait::async_trait; -use golem_common::config::RetryConfig; +use golem_common::model::RetryConfig; use std::time::Duration; use tracing::debug; use uuid::Uuid; diff --git a/golem-worker-executor-base/src/durable_host/golem/v11.rs b/golem-worker-executor-base/src/durable_host/golem/v11.rs index 15d5685eb..fa6b4e7bf 100644 --- a/golem-worker-executor-base/src/durable_host/golem/v11.rs +++ b/golem-worker-executor-base/src/durable_host/golem/v11.rs @@ -34,8 +34,8 @@ use crate::services::{HasOplogService, HasPlugins}; use crate::workerctx::WorkerCtx; use anyhow::anyhow; use async_trait::async_trait; -use golem_common::config::RetryConfig; use golem_common::model::OwnedWorkerId; +use golem_common::model::RetryConfig; use std::time::Duration; use wasmtime::component::Resource; use wasmtime_wasi::WasiView; diff --git a/golem-worker-executor-base/src/durable_host/mod.rs b/golem-worker-executor-base/src/durable_host/mod.rs index 382439f13..b42fdd74b 100644 --- a/golem-worker-executor-base/src/durable_host/mod.rs +++ b/golem-worker-executor-base/src/durable_host/mod.rs @@ -58,7 +58,6 @@ pub use durability::*; use futures::future::try_join_all; use futures_util::TryFutureExt; use futures_util::TryStreamExt; -use golem_common::config::RetryConfig; use golem_common::model::component::ComponentOwner; use golem_common::model::oplog::{ IndexedResourceKey, LogLevel, OplogEntry, OplogIndex, UpdateDescription, WorkerError, @@ -66,6 +65,7 @@ use golem_common::model::oplog::{ }; use golem_common::model::plugin::{PluginOwner, PluginScope}; use golem_common::model::regions::{DeletedRegions, OplogRegion}; +use golem_common::model::RetryConfig; use golem_common::model::{exports, PluginInstallationId}; use golem_common::model::{ AccountId, ComponentFilePath, ComponentFilePermissions, ComponentFileSystemNode, diff --git a/golem-worker-executor-base/src/invocation.rs b/golem-worker-executor-base/src/invocation.rs index 4f5156b9c..efe9e3045 100644 --- a/golem-worker-executor-base/src/invocation.rs +++ b/golem-worker-executor-base/src/invocation.rs @@ -285,7 +285,7 @@ async fn get_or_create_indexed_resource<'a, Ctx: WorkerCtx>( Ok(InvokeResult::from_success( 0, vec![Value::Handle { - uri: store.data().self_uri(), + uri: store.data().self_uri().value, resource_id: resource_id.0, }], )) @@ -303,6 +303,11 @@ async fn get_or_create_indexed_resource<'a, Ctx: WorkerCtx>( "Could not extract resource constructor parameters from function name", ))?; + let constructor_params: Vec = constructor_params + .into_iter() + .map(|vnt| vnt.value) + .collect(); + debug!("Creating new indexed resource with parameters {constructor_params:?}"); let constructor_result = invoke( @@ -407,13 +412,13 @@ async fn drop_resource( let resource_id = match function_input.first() { Some(Value::Handle { uri, resource_id }) => { - if uri == &self_uri { + if uri == &self_uri.value { Ok(*resource_id) } else { Err(GolemError::ValueMismatch { details: format!( "trying to drop handle for on wrong worker ({} vs {}) {}", - uri.value, self_uri.value, raw_function_name + uri, self_uri.value, raw_function_name ), }) } diff --git a/golem-worker-executor-base/src/services/component.rs b/golem-worker-executor-base/src/services/component.rs index fba3a95fd..0b9ae6377 100644 --- a/golem-worker-executor-base/src/services/component.rs +++ b/golem-worker-executor-base/src/services/component.rs @@ -34,10 +34,10 @@ use golem_api_grpc::proto::golem::component::v1::{ }; use golem_common::cache::{BackgroundEvictionMode, Cache, FullCacheEvictionMode, SimpleCache}; use golem_common::client::{GrpcClient, GrpcClientConfig}; -use golem_common::config::RetryConfig; use golem_common::metrics::external_calls::record_external_call_response_size_bytes; use golem_common::model::component_metadata::LinearMemory; use golem_common::model::plugin::PluginInstallation; +use golem_common::model::RetryConfig; use golem_common::model::{ AccountId, ComponentId, ComponentType, ComponentVersion, InitialComponentFile, }; diff --git a/golem-worker-executor-base/src/services/golem_config.rs b/golem-worker-executor-base/src/services/golem_config.rs index 35dc1ab6c..44899e75c 100644 --- a/golem-worker-executor-base/src/services/golem_config.rs +++ b/golem-worker-executor-base/src/services/golem_config.rs @@ -25,8 +25,9 @@ use serde::{Deserialize, Serialize}; use url::Url; use golem_common::config::{ - ConfigExample, ConfigLoader, DbSqliteConfig, HasConfigExamples, RedisConfig, RetryConfig, + ConfigExample, ConfigLoader, DbSqliteConfig, HasConfigExamples, RedisConfig, }; +use golem_common::model::RetryConfig; use golem_common::tracing::TracingConfig; /// The shared global Golem configuration diff --git a/golem-worker-executor-base/src/services/plugins.rs b/golem-worker-executor-base/src/services/plugins.rs index 3ae838f93..d5ffd4431 100644 --- a/golem-worker-executor-base/src/services/plugins.rs +++ b/golem-worker-executor-base/src/services/plugins.rs @@ -24,11 +24,11 @@ use golem_api_grpc::proto::golem::component::v1::{ }; use golem_common::cache::{BackgroundEvictionMode, Cache, FullCacheEvictionMode, SimpleCache}; use golem_common::client::{GrpcClient, GrpcClientConfig}; -use golem_common::config::RetryConfig; use golem_common::model::plugin::{ DefaultPluginOwner, DefaultPluginScope, PluginDefinition, PluginInstallation, PluginOwner, PluginScope, }; +use golem_common::model::RetryConfig; use golem_common::model::{AccountId, ComponentId, ComponentVersion, PluginInstallationId}; use http::Uri; use std::sync::Arc; diff --git a/golem-worker-executor-base/src/worker.rs b/golem-worker-executor-base/src/worker.rs index 8d2e66e0e..a7de8b183 100644 --- a/golem-worker-executor-base/src/worker.rs +++ b/golem-worker-executor-base/src/worker.rs @@ -41,12 +41,12 @@ use crate::workerctx::{PublicWorkerIo, WorkerCtx}; use anyhow::anyhow; use drop_stream::DropStream; use futures::channel::oneshot; -use golem_common::config::RetryConfig; use golem_common::model::oplog::{ OplogEntry, OplogIndex, TimestampedUpdateDescription, UpdateDescription, WorkerError, WorkerResourceId, }; use golem_common::model::regions::{DeletedRegions, DeletedRegionsBuilder, OplogRegion}; +use golem_common::model::RetryConfig; use golem_common::model::{ exports, ComponentFilePath, ComponentType, PluginInstallationId, WorkerStatusRecordExtensions, }; diff --git a/golem-worker-executor-base/tests/compatibility/v1.rs b/golem-worker-executor-base/tests/compatibility/v1.rs index bc784cc67..177fa699e 100644 --- a/golem-worker-executor-base/tests/compatibility/v1.rs +++ b/golem-worker-executor-base/tests/compatibility/v1.rs @@ -23,13 +23,13 @@ use test_r::test; use bincode::{Decode, Encode}; use goldenfile::Mint; -use golem_common::config::RetryConfig; use golem_common::model::oplog::{ IndexedResourceKey, LogLevel, OplogEntry, OplogIndex, OplogPayload, PayloadId, TimestampedUpdateDescription, UpdateDescription, WorkerError, WorkerResourceId, WrappedFunctionType, }; use golem_common::model::regions::{DeletedRegions, OplogRegion}; +use golem_common::model::RetryConfig; use golem_common::model::{ AccountId, ComponentId, FailedUpdateRecord, IdempotencyKey, OwnedWorkerId, PromiseId, ScheduledAction, ShardId, SuccessfulUpdateRecord, Timestamp, TimestampedWorkerInvocation, @@ -44,7 +44,7 @@ use golem_wasm_ast::analysis::{ TypeU32, TypeU64, TypeU8, TypeVariant, }; use golem_wasm_rpc::protobuf::type_annotated_value::TypeAnnotatedValue; -use golem_wasm_rpc::{TypeAnnotatedValueConstructors, Uri, Value, WitValue}; +use golem_wasm_rpc::{TypeAnnotatedValueConstructors, Value, WitValue}; use golem_worker_executor_base::durable_host::http::serialized::{ SerializableDnsErrorPayload, SerializableErrorCode, SerializableFieldSizePayload, SerializableResponse, SerializableResponseHeaders, SerializableTlsAlertReceivedPayload, @@ -226,9 +226,7 @@ pub fn wasm_rpc_value() { let v21c = Value::Result(Ok(None)); let v21d = Value::Result(Err(None)); let v22 = Value::Handle { - uri: Uri { - value: "uri".to_string(), - }, + uri: "uri".to_string(), resource_id: 123, }; @@ -1179,9 +1177,7 @@ pub fn wit_value() { let wv21c: WitValue = Value::Result(Ok(None)).into(); let wv21d: WitValue = Value::Result(Err(None)).into(); let wv22: WitValue = Value::Handle { - uri: Uri { - value: "uri".to_string(), - }, + uri: "uri".to_string(), resource_id: 123, } .into(); @@ -1648,9 +1644,7 @@ pub fn proto_val() { let pv21c: golem_wasm_rpc::protobuf::Val = Value::Result(Ok(None)).into(); let pv21d: golem_wasm_rpc::protobuf::Val = Value::Result(Err(None)).into(); let pv22: golem_wasm_rpc::protobuf::Val = Value::Handle { - uri: Uri { - value: "uri".to_string(), - }, + uri: "uri".to_string(), resource_id: 123, } .into(); @@ -1856,9 +1850,7 @@ pub fn type_annotated_value() { .unwrap(); let tav22 = TypeAnnotatedValue::create( &Value::Handle { - uri: Uri { - value: "uri".to_string(), - }, + uri: "uri".to_string(), resource_id: 123, }, &AnalysedType::Handle(TypeHandle { diff --git a/golem-worker-executor-base/tests/compatibility/v1_1.rs b/golem-worker-executor-base/tests/compatibility/v1_1.rs index 475c10c1c..7f7bc91d1 100644 --- a/golem-worker-executor-base/tests/compatibility/v1_1.rs +++ b/golem-worker-executor-base/tests/compatibility/v1_1.rs @@ -17,10 +17,10 @@ use test_r::test; use crate::compatibility::v1::backward_compatible; use goldenfile::Mint; -use golem_common::config::RetryConfig; use golem_common::model::oplog::{ IndexedResourceKey, OplogEntry, OplogIndex, OplogPayload, WorkerResourceId, WrappedFunctionType, }; +use golem_common::model::RetryConfig; use golem_common::model::{ AccountId, ComponentId, IdempotencyKey, PluginInstallationId, Timestamp, TimestampedWorkerInvocation, WorkerId, WorkerInvocation, WorkerResourceDescription, diff --git a/golem-worker-service-base/src/app_config.rs b/golem-worker-service-base/src/app_config.rs index 9a0754482..c5096a080 100644 --- a/golem-worker-service-base/src/app_config.rs +++ b/golem-worker-service-base/src/app_config.rs @@ -21,8 +21,9 @@ use serde::{Deserialize, Serialize}; use url::Url; use uuid::Uuid; -use golem_common::config::{ConfigExample, HasConfigExamples, RedisConfig, RetryConfig}; +use golem_common::config::{ConfigExample, HasConfigExamples, RedisConfig}; use golem_common::config::{DbConfig, DbSqliteConfig}; +use golem_common::model::RetryConfig; use golem_common::tracing::TracingConfig; use golem_service_base::service::routing_table::RoutingTableConfig; diff --git a/golem-worker-service-base/src/gateway_binding/gateway_binding_compiled.rs b/golem-worker-service-base/src/gateway_binding/gateway_binding_compiled.rs index f954365d9..10eb03617 100644 --- a/golem-worker-service-base/src/gateway_binding/gateway_binding_compiled.rs +++ b/golem-worker-service-base/src/gateway_binding/gateway_binding_compiled.rs @@ -78,14 +78,14 @@ impl TryFrom Ok(internal::to_gateway_binding_compiled_proto( worker_binding, GatewayBindingType::Default, - )) + )?) } GatewayBindingCompiled::FileServer(worker_binding) => { Ok(internal::to_gateway_binding_compiled_proto( worker_binding, GatewayBindingType::FileServer, - )) + )?) } GatewayBindingCompiled::Static(static_binding) => { @@ -239,7 +239,7 @@ mod internal { pub(crate) fn to_gateway_binding_compiled_proto( worker_binding: WorkerBindingCompiled, binding_type: GatewayBindingType, - ) -> golem_api_grpc::proto::golem::apidefinition::CompiledGatewayBinding { + ) -> Result { let component = Some(worker_binding.component_id.into()); let worker_name = worker_binding .worker_name_compiled @@ -248,7 +248,8 @@ mod internal { let compiled_worker_name_expr = worker_binding .worker_name_compiled .clone() - .map(|w| w.compiled_worker_name.into()); + .map(|w| w.compiled_worker_name.try_into()) + .transpose()?; let worker_name_rib_input = worker_binding .worker_name_compiled .map(|w| w.rib_input_type_info.into()); @@ -256,7 +257,7 @@ mod internal { match worker_binding.idempotency_key_compiled { Some(x) => ( Some(x.idempotency_key.into()), - Some(x.compiled_idempotency_key.into()), + Some(x.compiled_idempotency_key.try_into()?), Some(x.rib_input.into()), ), None => (None, None, None), @@ -272,7 +273,7 @@ mod internal { worker_binding .response_compiled .response_mapping_compiled - .into(), + .try_into()?, ); let response_rib_input = Some(worker_binding.response_compiled.rib_input.into()); let response_rib_output = worker_binding @@ -291,21 +292,23 @@ mod internal { GatewayBindingType::CorsPreflight => 2, }; - golem_api_grpc::proto::golem::apidefinition::CompiledGatewayBinding { - component, - worker_name, - compiled_worker_name_expr, - worker_name_rib_input, - idempotency_key, - compiled_idempotency_key_expr, - idempotency_key_rib_input, - response, - compiled_response_expr, - response_rib_input, - worker_functions_in_response, - binding_type: Some(binding_type), - static_binding: None, - response_rib_output, - } + Ok( + golem_api_grpc::proto::golem::apidefinition::CompiledGatewayBinding { + component, + worker_name, + compiled_worker_name_expr, + worker_name_rib_input, + idempotency_key, + compiled_idempotency_key_expr, + idempotency_key_rib_input, + response, + compiled_response_expr, + response_rib_input, + worker_functions_in_response, + binding_type: Some(binding_type), + static_binding: None, + response_rib_output, + }, + ) } } diff --git a/golem-worker-service-base/src/gateway_execution/file_server_binding_handler.rs b/golem-worker-service-base/src/gateway_execution/file_server_binding_handler.rs index 7171e6a96..570851db1 100644 --- a/golem-worker-service-base/src/gateway_execution/file_server_binding_handler.rs +++ b/golem-worker-service-base/src/gateway_execution/file_server_binding_handler.rs @@ -14,8 +14,7 @@ use crate::empty_worker_metadata; use crate::gateway_binding::WorkerDetail; -use crate::getter::{get_response_headers_or_default, get_status_code, GetterExt}; -use crate::path::Path; +use crate::getter::{get_response_headers_or_default, get_status_code}; use crate::service::component::{ComponentService, ComponentServiceError}; use crate::service::worker::{WorkerService, WorkerServiceError}; use async_trait::async_trait; @@ -26,9 +25,8 @@ use golem_common::model::{ComponentFilePath, HasAccountId, TargetWorkerId}; use golem_service_base::auth::EmptyAuthCtx; use golem_service_base::model::validate_worker_name; use golem_service_base::service::initial_component_files::InitialComponentFilesService; -use golem_wasm_rpc::json::TypeAnnotatedValueJsonExtensions; -use golem_wasm_rpc::protobuf::type_annotated_value::TypeAnnotatedValue; -use golem_wasm_rpc::protobuf::typed_result::ResultValue; +use golem_wasm_ast::analysis::AnalysedType; +use golem_wasm_rpc::{Value, ValueAndType}; use http::StatusCode; use poem::web::headers::ContentType; use rib::RibResult; @@ -188,20 +186,22 @@ impl FileServerBindingDetails { // 3. A result of either of the above, with the same rules applied. match result { RibResult::Val(value) => match value { - TypeAnnotatedValue::Result(inner) => { - let value = inner - .result_value - .ok_or("Expected a result value".to_string())?; - match value { - ResultValue::OkValue(ok) => Self::from_rib_happy( - ok.type_annotated_value.ok_or("ok unset".to_string())?, - ), - ResultValue::ErrorValue(err) => { - let value = err.type_annotated_value.ok_or("err unset".to_string())?; - Err(format!("Error result: {}", value.to_json_value())) - } + ValueAndType { + value: Value::Result(value), + typ: AnalysedType::Result(typ), + } => match value { + Ok(ok) => { + let ok = ValueAndType::new( + *ok.ok_or("ok unset".to_string())?, + (*typ.ok.ok_or("Missing 'ok' type")?).clone(), + ); + Self::from_rib_happy(ok) } - } + Err(err) => { + let value = err.ok_or("err unset".to_string())?; + Err(format!("Error result: {value:?}")) + } + }, other => Self::from_rib_happy(other), }, RibResult::Unit => Err("Expected a value".to_string()), @@ -209,25 +209,33 @@ impl FileServerBindingDetails { } /// Like the above, just without the result case. - fn from_rib_happy(value: TypeAnnotatedValue) -> Result { - match value { - TypeAnnotatedValue::Str(raw_path) => Self::make_from(raw_path, None, None), - record @ TypeAnnotatedValue::Record(_) => { - let path = record - .get_optional(&Path::from_key("file-path")) + fn from_rib_happy(value: ValueAndType) -> Result { + match &value { + ValueAndType { + value: Value::String(raw_path), + .. + } => Self::make_from(raw_path.clone(), None, None), + ValueAndType { + value: Value::Record(field_values), + typ: AnalysedType::Record(record), + } => { + let path_position = record + .fields + .iter() + .position(|pair| &pair.name == "file-path") .ok_or("Record must contain 'file-path' field")?; - let path = if let TypeAnnotatedValue::Str(path) = path { + let path = if let Value::String(path) = &field_values[path_position] { path } else { return Err("file-path must be a string".to_string()); }; - let status = get_status_code(&record)?; - let headers = get_response_headers_or_default(&record)?; + let status = get_status_code(field_values, record)?; + let headers = get_response_headers_or_default(&value)?; let content_type = headers.get_content_type(); - Self::make_from(path, content_type, status) + Self::make_from(path.to_string(), content_type, status) } _ => Err("Response value expected".to_string()), } diff --git a/golem-worker-service-base/src/gateway_execution/rib_input_value_resolver.rs b/golem-worker-service-base/src/gateway_execution/rib_input_value_resolver.rs index 09d7fafa3..5f2fcca94 100644 --- a/golem-worker-service-base/src/gateway_execution/rib_input_value_resolver.rs +++ b/golem-worker-service-base/src/gateway_execution/rib_input_value_resolver.rs @@ -61,6 +61,11 @@ impl RibInputValueResolver for HttpRequestDetails { warn!("received: {:?}", rib_input_with_request_content); let input = TypeAnnotatedValue::parse_with_type(rib_input_with_request_content, request_type) .map_err(|err| RibInputTypeMismatch(format!("Input request details don't match the requirements for rib expression to execute: {}. Requirements. {:?}", err.join(", "), request_type)))?; + let input = input.try_into().map_err(|err| { + RibInputTypeMismatch(format!( + "Internal error converting between value representations: {err}" + )) + })?; let mut rib_input_map = HashMap::new(); rib_input_map.insert("request".to_string(), input); @@ -86,6 +91,11 @@ impl RibInputValueResolver for WorkerDetail { let request_value = TypeAnnotatedValue::parse_with_type(rib_input_with_request_content, worker_details_type) .map_err(|err| RibInputTypeMismatch(format!("Worker details don't match the requirements for rib expression to execute: {}. Requirements. {:?}", err.join(", "), worker_details_type)))?; + let request_value = request_value.try_into().map_err(|err| { + RibInputTypeMismatch(format!( + "Internal error converting between value representations: {err}" + )) + })?; let mut rib_input_map = HashMap::new(); rib_input_map.insert("worker".to_string(), request_value); diff --git a/golem-worker-service-base/src/gateway_execution/to_response.rs b/golem-worker-service-base/src/gateway_execution/to_response.rs index 12749388a..c8b3c544e 100644 --- a/golem-worker-service-base/src/gateway_execution/to_response.rs +++ b/golem-worker-service-base/src/gateway_execution/to_response.rs @@ -215,9 +215,14 @@ mod internal { let headers = get_response_headers_or_default(rib_result).map_err(EvaluationError)?; - let body = rib_result + let tav: TypeAnnotatedValue = rib_result + .clone() + .try_into() + .map_err(|errs: Vec| EvaluationError(errs.join(", ")))?; + + let body = tav .get_optional(&Path::from_key("body")) - .unwrap_or(rib_result.clone()); + .unwrap_or(tav.clone()); Ok(IntermediateHttpResponse { body: Some(body), @@ -282,9 +287,6 @@ mod internal { #[cfg(test)] mod test { use async_trait::async_trait; - use golem_wasm_rpc::protobuf::type_annotated_value::TypeAnnotatedValue; - use golem_wasm_rpc::protobuf::Type; - use golem_wasm_rpc::protobuf::{NameTypePair, NameValuePair, TypedRecord}; use std::sync::Arc; use test_r::test; @@ -293,50 +295,43 @@ mod test { DataKey, DataValue, GatewaySession, GatewaySessionError, SessionId, }; use crate::gateway_execution::to_response::ToHttpResponse; + use golem_wasm_ast::analysis::analysed_type::record; + use golem_wasm_ast::analysis::NameTypePair; + use golem_wasm_rpc::{IntoValueAndType, Value, ValueAndType}; use http::header::CONTENT_TYPE; use http::StatusCode; use rib::RibResult; - fn create_record(values: Vec<(String, TypeAnnotatedValue)>) -> TypeAnnotatedValue { - let mut name_type_pairs = vec![]; - let mut name_value_pairs = vec![]; - - for (key, value) in values.iter() { - let typ = Type::try_from(value).unwrap(); - name_type_pairs.push(NameTypePair { - name: key.to_string(), - typ: Some(typ), - }); + fn create_record(values: Vec<(String, ValueAndType)>) -> ValueAndType { + let mut fields = vec![]; + let mut field_values = vec![]; - name_value_pairs.push(NameValuePair { - name: key.to_string(), - value: Some(golem_wasm_rpc::protobuf::TypeAnnotatedValue { - type_annotated_value: Some(value.clone()), - }), + for (key, vnt) in values { + fields.push(NameTypePair { + name: key, + typ: vnt.typ, }); + field_values.push(vnt.value); } - TypeAnnotatedValue::Record(TypedRecord { - typ: name_type_pairs, - value: name_value_pairs, - }) + ValueAndType { + value: Value::Record(field_values), + typ: record(fields), + } } #[test] async fn test_evaluation_result_to_response_with_http_specifics() { let record = create_record(vec![ - ("status".to_string(), TypeAnnotatedValue::U16(400)), + ("status".to_string(), 400u16.into_value_and_type()), ( "headers".to_string(), create_record(vec![( "Content-Type".to_string(), - TypeAnnotatedValue::Str("application/json".to_string()), + "application/json".into_value_and_type(), )]), ), - ( - "body".to_string(), - TypeAnnotatedValue::Str("Hello".to_string()), - ), + ("body".to_string(), "Hello".into_value_and_type()), ]); let evaluation_result: RibResult = RibResult::Val(record); @@ -367,8 +362,7 @@ mod test { #[test] async fn test_evaluation_result_to_response_with_no_http_specifics() { - let evaluation_result: RibResult = - RibResult::Val(TypeAnnotatedValue::Str("Healthy".to_string())); + let evaluation_result: RibResult = RibResult::Val("Healthy".into_value_and_type()); let session_store: Arc = Arc::new(TestSessionStore); diff --git a/golem-worker-service-base/src/gateway_middleware/http/cors.rs b/golem-worker-service-base/src/gateway_middleware/http/cors.rs index e56061abe..576d6523b 100644 --- a/golem-worker-service-base/src/gateway_middleware/http/cors.rs +++ b/golem-worker-service-base/src/gateway_middleware/http/cors.rs @@ -142,18 +142,8 @@ impl HttpCors { let mut cors = HttpCors::default(); - for name_value in record { - let key = &name_value.name; - let value = name_value - .value - .as_ref() - .ok_or("Missing value in type_annotated_value record")? - .type_annotated_value - .as_ref() - .ok_or(format!( - "Unable to fetch value for key {} in type_annotated_value", - key - ))? + for (key, value) in record { + let value = value .get_literal() .ok_or(format!( "Invalid value for key {} in CORS preflight response", @@ -161,7 +151,7 @@ impl HttpCors { ))? .as_string(); - internal::set_cors_field(&mut cors, key, &value)?; + internal::set_cors_field(&mut cors, &key, &value)?; } Ok(cors) diff --git a/golem-worker-service-base/src/gateway_rib_interpreter/mod.rs b/golem-worker-service-base/src/gateway_rib_interpreter/mod.rs index 43d95135d..0e415855d 100644 --- a/golem-worker-service-base/src/gateway_rib_interpreter/mod.rs +++ b/golem-worker-service-base/src/gateway_rib_interpreter/mod.rs @@ -21,6 +21,7 @@ use golem_wasm_rpc::protobuf::type_annotated_value::TypeAnnotatedValue; use golem_common::model::{ComponentId, IdempotencyKey}; use golem_common::SafeDisplay; +use golem_wasm_rpc::ValueAndType; use rib::{RibByteCode, RibFunctionInvoke, RibInput, RibResult}; use crate::gateway_execution::{GatewayResolvedWorkerRequest, GatewayWorkerRequestExecutor}; @@ -97,7 +98,7 @@ impl WorkerServiceRibInterpreter| { + move |function_name: String, parameters: Vec| { let component_id = component_id.clone(); let worker_name = worker_name.clone(); let idempotency_key = idempotency_key.clone(); @@ -105,20 +106,30 @@ impl WorkerServiceRibInterpreter TypeAnnotatedValue + let function_params: Vec = parameters + .into_iter() + .map(|p| p.try_into()) + .collect::, _>>() + .map_err(|errs: Vec| errs.join(", "))?; + let worker_request = GatewayResolvedWorkerRequest { component_id, worker_name, function_name, - function_params: parameters, + function_params, idempotency_key, namespace, }; - executor + let tav = executor .execute(worker_request) .await .map(|v| v.result) - .map_err(|e| e.to_string()) + .map_err(|e| e.to_string())?; + + // Output TypeAnnotatedValue => ValueAndType + tav.try_into() } .boxed() } diff --git a/golem-worker-service-base/src/getter.rs b/golem-worker-service-base/src/getter.rs index 815d56ca1..0d3adf965 100644 --- a/golem-worker-service-base/src/getter.rs +++ b/golem-worker-service-base/src/getter.rs @@ -14,9 +14,11 @@ use crate::headers::ResolvedResponseHeaders; use crate::path::{Path, PathComponent}; +use golem_wasm_ast::analysis::{AnalysedType, TypeRecord}; use golem_wasm_rpc::json::TypeAnnotatedValueJsonExtensions; use golem_wasm_rpc::protobuf::type_annotated_value::TypeAnnotatedValue; use golem_wasm_rpc::protobuf::{TypedList, TypedRecord, TypedTuple}; +use golem_wasm_rpc::{Value, ValueAndType}; use http::StatusCode; use rib::GetLiteralValue; use rib::LiteralValue; @@ -35,6 +37,8 @@ pub enum GetError { NotRecord { key_name: String, found: String }, #[error("Not an array: index: {index}, original_value: {found}")] NotArray { index: usize, found: String }, + #[error("Internal error: {0}")] + Internal(String), } // To deal with fields in a TypeAnnotatedValue (that's returned from golem-rib) @@ -90,6 +94,17 @@ impl Getter for TypeAnnotatedValue { } } +impl Getter for ValueAndType { + fn get(&self, key: &Path) -> Result { + let tav: TypeAnnotatedValue = self + .clone() + .try_into() + .map_err(|errs: Vec| GetError::Internal(errs.join(", ")))?; + let result = tav.get(key)?; + result.try_into().map_err(GetError::Internal) + } +} + fn get_array(value: &TypeAnnotatedValue) -> Option> { match value { TypeAnnotatedValue::List(TypedList { values, .. }) => { @@ -124,32 +139,64 @@ impl> GetterExt for T { } pub fn get_response_headers( - typed_value: &TypeAnnotatedValue, + field_values: &[Value], + record: &TypeRecord, ) -> Result, String> { - match typed_value.get_optional(&Path::from_key("headers")) { + match record + .fields + .iter() + .position(|pair| &pair.name == "headers") + { None => Ok(None), - Some(header) => Ok(Some(ResolvedResponseHeaders::from_typed_value(&header)?)), + Some(field_position) => Ok(Some(ResolvedResponseHeaders::from_typed_value( + ValueAndType::new( + field_values[field_position].clone(), + record.fields[field_position].typ.clone(), + ), + )?)), } } pub fn get_response_headers_or_default( - typed_value: &TypeAnnotatedValue, + value: &ValueAndType, ) -> Result { - get_response_headers(typed_value).map(|headers| headers.unwrap_or_default()) + match value { + ValueAndType { + value: Value::Record(field_values), + typ: AnalysedType::Record(record), + } => get_response_headers(field_values, record).map(|headers| headers.unwrap_or_default()), + _ => Ok(ResolvedResponseHeaders::default()), + } } -pub fn get_status_code(typed_value: &TypeAnnotatedValue) -> Result, String> { - match typed_value.get_optional(&Path::from_key("status")) { +pub fn get_status_code( + field_values: &[Value], + record: &TypeRecord, +) -> Result, String> { + match record + .fields + .iter() + .position(|field| &field.name == "status") + { None => Ok(None), - Some(typed_value) => Ok(Some(get_status_code_inner(&typed_value)?)), + Some(field_position) => Ok(Some(get_status_code_inner(ValueAndType::new( + field_values[field_position].clone(), + record.fields[field_position].typ.clone(), + ))?)), } } -pub fn get_status_code_or_ok(typed_value: &TypeAnnotatedValue) -> Result { - get_status_code(typed_value).map(|status| status.unwrap_or(StatusCode::OK)) +pub fn get_status_code_or_ok(value: &ValueAndType) -> Result { + match value { + ValueAndType { + value: Value::Record(field_values), + typ: AnalysedType::Record(record), + } => get_status_code(field_values, record).map(|status| status.unwrap_or(StatusCode::OK)), + _ => Ok(StatusCode::OK), + } } -fn get_status_code_inner(status_code: &TypeAnnotatedValue) -> Result { +fn get_status_code_inner(status_code: ValueAndType) -> Result { let status_res: Result = match status_code.get_literal() { Some(LiteralValue::String(status_str)) => status_str.parse().map_err(|e| { @@ -166,7 +213,7 @@ fn get_status_code_inner(status_code: &TypeAnnotatedValue) -> Result Err(format!( "Status Code Expression is evaluated to a complex value. It is resolved to {:?}", - status_code.to_json_value() + status_code.value )) }; @@ -174,7 +221,7 @@ fn get_status_code_inner(status_code: &TypeAnnotatedValue) -> Result Result { + pub fn from_typed_value(header_map: ValueAndType) -> Result { match header_map { - TypeAnnotatedValue::Record(TypedRecord { value, .. }) => { + ValueAndType { + value: Value::Record(field_values), + typ: AnalysedType::Record(record), + } => { let mut resolved_headers: HashMap = HashMap::new(); - for name_value_pair in value { - let value_str = name_value_pair - .value - .as_ref() - .and_then(|v| v.type_annotated_value.clone()) - .ok_or("Unable to resolve header value".to_string())? + for (value, field_def) in field_values.into_iter().zip(record.fields) { + let value = ValueAndType::new(value, field_def.typ); + let value_str = value .get_literal() .map(|primitive| primitive.to_string()) .unwrap_or_else(|| "Unable to resolve header".to_string()); - resolved_headers.insert(name_value_pair.name.clone(), value_str); + resolved_headers.insert(field_def.name, value_str); } let headers = (&resolved_headers) @@ -56,8 +53,7 @@ impl ResolvedResponseHeaders { } _ => Err(format!( - "Header expression is not a record. It is resolved to {}", - header_map.to_json_value() + "Header expression is not a record. It is resolved to {header_map}", )), } } diff --git a/golem-worker-service-base/src/service/component/default.rs b/golem-worker-service-base/src/service/component/default.rs index 67fbbc47e..bd1598926 100644 --- a/golem-worker-service-base/src/service/component/default.rs +++ b/golem-worker-service-base/src/service/component/default.rs @@ -24,9 +24,9 @@ use golem_api_grpc::proto::golem::component::v1::{ use golem_api_grpc::proto::golem::component::ComponentConstraints; use golem_api_grpc::proto::golem::component::FunctionConstraintCollection as FunctionConstraintCollectionProto; use golem_common::client::{GrpcClient, GrpcClientConfig}; -use golem_common::config::RetryConfig; use golem_common::model::component_constraint::FunctionConstraintCollection; use golem_common::model::ComponentId; +use golem_common::model::RetryConfig; use golem_common::retries::with_retries; use golem_service_base::model::Component; use http::Uri; diff --git a/golem-worker-service-base/src/service/worker/default.rs b/golem-worker-service-base/src/service/worker/default.rs index 3a0d751ae..6f19b9d0d 100644 --- a/golem-worker-service-base/src/service/worker/default.rs +++ b/golem-worker-service-base/src/service/worker/default.rs @@ -31,9 +31,9 @@ use golem_api_grpc::proto::golem::workerexecutor::v1::{ ResumeWorkerRequest, SearchOplogResponse, UpdateWorkerRequest, }; use golem_common::client::MultiTargetGrpcClient; -use golem_common::config::RetryConfig; use golem_common::model::oplog::OplogIndex; use golem_common::model::public_oplog::{OplogCursor, PublicOplogEntry}; +use golem_common::model::RetryConfig; use golem_common::model::{ AccountId, ComponentFilePath, ComponentFileSystemNode, ComponentId, ComponentVersion, FilterComparator, IdempotencyKey, PluginInstallationId, PromiseId, ScanCursor, TargetWorkerId, diff --git a/golem-worker-service-base/src/service/worker/routing_logic.rs b/golem-worker-service-base/src/service/worker/routing_logic.rs index 007e80f81..56b026156 100644 --- a/golem-worker-service-base/src/service/worker/routing_logic.rs +++ b/golem-worker-service-base/src/service/worker/routing_logic.rs @@ -27,7 +27,7 @@ use tracing::{debug, error, info, warn, Instrument}; use golem_api_grpc::proto::golem::worker::v1::WorkerExecutionError; use golem_api_grpc::proto::golem::workerexecutor::v1::worker_executor_client::WorkerExecutorClient; use golem_common::client::MultiTargetGrpcClient; -use golem_common::config::RetryConfig; +use golem_common::model::RetryConfig; use golem_common::model::{Pod, ShardId, TargetWorkerId, WorkerId}; use golem_common::retriable_error::IsRetriableError; use golem_common::retries::get_delay; diff --git a/golem-worker-service-base/tests/services_tests.rs b/golem-worker-service-base/tests/services_tests.rs index 55951c1c3..0c8f19dba 100644 --- a/golem-worker-service-base/tests/services_tests.rs +++ b/golem-worker-service-base/tests/services_tests.rs @@ -16,8 +16,8 @@ use golem_service_base::migration::{Migrations, MigrationsDir}; use test_r::test; use async_trait::async_trait; -use golem_common::config::{DbPostgresConfig, DbSqliteConfig, RedisConfig, RetryConfig}; -use golem_common::model::ComponentId; +use golem_common::config::{DbPostgresConfig, DbSqliteConfig, RedisConfig}; +use golem_common::model::{ComponentId, RetryConfig}; use golem_service_base::auth::{DefaultNamespace, EmptyAuthCtx}; use golem_service_base::db; use golem_service_base::model::Component; diff --git a/golem-worker-service/src/service/mod.rs b/golem-worker-service/src/service/mod.rs index f85a3c502..53b3b2781 100644 --- a/golem-worker-service/src/service/mod.rs +++ b/golem-worker-service/src/service/mod.rs @@ -47,7 +47,7 @@ use golem_worker_service_base::service::worker::WorkerServiceDefault; use golem_api_grpc::proto::golem::workerexecutor::v1::worker_executor_client::WorkerExecutorClient; use golem_common::client::{GrpcClientConfig, MultiTargetGrpcClient}; -use golem_common::config::RetryConfig; +use golem_common::model::RetryConfig; use golem_common::config::DbConfig; use golem_common::redis::RedisPool; diff --git a/wasm-rpc/src/text.rs b/wasm-rpc/src/text.rs index 243b93d0d..cd52aefe2 100644 --- a/wasm-rpc/src/text.rs +++ b/wasm-rpc/src/text.rs @@ -249,14 +249,14 @@ impl WasmValue for ValueAndType { fn unwrap_u8(&self) -> u8 { match self.value { - Value::U8(val) => val as u8, + Value::U8(val) => val, _ => panic!("Expected u8, found {:?}", self), } } fn unwrap_u16(&self) -> u16 { match self.value { - Value::U16(val) => val as u16, + Value::U16(val) => val, _ => panic!("Expected u16, found {:?}", self), } } @@ -362,9 +362,9 @@ impl WasmValue for ValueAndType { value: *v.clone(), typ: typ .as_ref() - .expect(&format!( - "No type information for non-unit variant case {case_name}" - )) + .unwrap_or_else(|| { + panic!("No type information for non-unit variant case {case_name}") + }) .clone(), }) }); @@ -460,7 +460,7 @@ mod tests { let s = print_value_and_type(&typed_value).unwrap(); let round_trip_value: ValueAndType = parse_value_and_type(&typ, &s).unwrap(); - let result: Value = Value::try_from(round_trip_value).unwrap(); + let result: Value = Value::from(round_trip_value); assert_eq!(value, result); } diff --git a/wasm-rpc/src/value_and_type.rs b/wasm-rpc/src/value_and_type.rs index bb34cbf32..9c4365494 100644 --- a/wasm-rpc/src/value_and_type.rs +++ b/wasm-rpc/src/value_and_type.rs @@ -26,6 +26,17 @@ pub struct ValueAndType { pub typ: AnalysedType, } +#[cfg(feature = "text")] +impl std::fmt::Display for ValueAndType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}", + crate::text::print_value_and_type(self).unwrap_or("".to_string()) + ) + } +} + impl ValueAndType { pub fn new(value: Value, typ: AnalysedType) -> Self { Self { value, typ }