diff --git a/golem-worker-executor-base/src/durable_host/blobstore/container.rs b/golem-worker-executor-base/src/durable_host/blobstore/container.rs
index e596addb8..4479fe4b5 100644
--- a/golem-worker-executor-base/src/durable_host/blobstore/container.rs
+++ b/golem-worker-executor-base/src/durable_host/blobstore/container.rs
@@ -15,13 +15,14 @@
 use async_trait::async_trait;
 use golem_common::model::oplog::WrappedFunctionType;
 use wasmtime::component::Resource;
+use wasmtime_wasi::bindings::filesystem::types::HostDescriptor;
 use wasmtime_wasi::WasiView;
 
 use crate::durable_host::blobstore::types::{
     ContainerEntry, IncomingValueEntry, OutgoingValueEntry, StreamObjectNamesEntry,
 };
 use crate::durable_host::serialized::SerializableError;
-use crate::durable_host::{Durability, DurableWorkerCtx};
+use crate::durable_host::{Durability2, DurableWorkerCtx};
 use crate::metrics::wasm::record_host_function_call;
 use crate::preview2::wasi::blobstore::container::{
     Container, ContainerMetadata, Error, Host, HostContainer, HostStreamObjectNames, IncomingValue,
@@ -69,32 +70,33 @@ impl<Ctx: WorkerCtx> HostContainer for DurableWorkerCtx<Ctx> {
         start: u64,
         end: u64,
     ) -> anyhow::Result<Result<Resource<IncomingValue>, Error>> {
-        let _permit = self.begin_async_host_function().await?;
-        record_host_function_call("blobstore::container::container", "get_data");
-        let account_id = self.state.owned_worker_id.account_id();
+        let durability = Durability2::<Ctx, Vec<u8>, SerializableError>::new(
+            self,
+            "golem blobstore::container",
+            "get_data",
+            WrappedFunctionType::ReadRemote,
+        )
+        .await?;
+
+        let account_id = self.state.owned_worker_id.account_id();
         let container_name = self
             .as_wasi_view()
             .table()
             .get::<ContainerEntry>(&container)
             .map(|container_entry| container_entry.name.clone())?;
-        let result =
-            Durability::<Ctx, (String, String, u64, u64), Vec<u8>, SerializableError>::wrap(
-                self,
-                WrappedFunctionType::ReadRemote,
-                "golem blobstore::container::get_data",
-                (container_name.clone(), name.clone(), start, end),
-                |ctx| {
-                    ctx.state.blob_store_service.get_data(
-                        account_id,
-                        container_name,
-                        name,
-                        start,
-                        end,
-                    )
-                },
-            )
-            .await;
+
+        let result = if durability.is_live() {
+            let result = self
+                .state
+                .blob_store_service
+                .get_data(account_id, container_name.clone(), name.clone(), start, end)
+                .await;
+            durability
+                .persist(self, (container_name, name, start, end), result)
+                .await
+        } else {
+            durability.replay(self).await
+        };
+
         match result {
             Ok(get_data) => {
                 let incoming_value = self
@@ -113,10 +115,15 @@
         name: ObjectName,
         data: Resource<OutgoingValue>,
     ) -> anyhow::Result<Result<(), Error>> {
-        let _permit = self.begin_async_host_function().await?;
-        record_host_function_call("blobstore::container::container", "write_data");
-        let account_id = self.state.owned_worker_id.account_id();
+        let durability = Durability2::<Ctx, (), SerializableError>::new(
+            self,
+            "golem blobstore::container",
+            "write_data",
+            WrappedFunctionType::WriteRemote,
+        )
+        .await?;
+
+        let account_id = self.state.owned_worker_id.account_id();
         let container_name = self
             .as_wasi_view()
             .table()
@@ -127,18 +134,21 @@
             .get::<OutgoingValueEntry>(&data)
             .map(|outgoing_value_entry| outgoing_value_entry.body.read().unwrap().clone())?;
-        let result = Durability::<Ctx, (String, String, u64), (), SerializableError>::wrap(
-            self,
-            WrappedFunctionType::WriteRemote,
-            "golem blobstore::container::write_data",
-            (container_name.clone(), name.clone(), data.len() as u64),
-            |ctx| {
-                ctx.state
-                    .blob_store_service
-                    .write_data(account_id, container_name, name, data)
-            },
-        )
-        .await;
+
+        let result = if durability.is_live() {
+            let len = data.len() as u64;
+            let result = self
+                .state
+                .blob_store_service
+                .write_data(account_id, container_name.clone(), name.clone(), data)
+                .await;
+            durability
+                .persist(self, (container_name, name, len), result)
+                .await
+        } else {
+            durability.replay(self).await
+        };
+
         match result {
             Ok(_) => Ok(Ok(())),
             Err(e) => Ok(Err(format!("{:?}", e))),
@@ -149,27 +159,32 @@
         &mut self,
         container: Resource<Container>,
     ) -> anyhow::Result<Result<Resource<StreamObjectNames>, Error>> {
-        let _permit = self.begin_async_host_function().await?;
-        record_host_function_call("blobstore::container::container", "list_objects");
-        let account_id = self.state.owned_worker_id.account_id();
+        let durability = Durability2::<Ctx, Vec<String>, SerializableError>::new(
+            self,
+            "golem blobstore::container",
+            "list_objects",
+            WrappedFunctionType::ReadRemote,
+        )
+        .await?;
+
+        let account_id = self.state.owned_worker_id.account_id();
         let container_name = self
             .as_wasi_view()
             .table()
             .get::<ContainerEntry>(&container)
             .map(|container_entry| container_entry.name.clone())?;
-        let result = Durability::<Ctx, String, Vec<String>, SerializableError>::wrap(
-            self,
-            WrappedFunctionType::ReadRemote,
-            "golem blobstore::container::list_objects",
-            container_name.clone(),
-            |ctx| {
-                ctx.state
-                    .blob_store_service
-                    .list_objects(account_id, container_name)
-            },
-        )
-        .await;
+
+        let result = if durability.is_live() {
+            let result = self
+                .state
+                .blob_store_service
+                .list_objects(account_id, container_name.clone())
+                .await;
+            durability.persist(self, container_name, result).await
+        } else {
+            durability.replay(self).await
+        };
+
         match result {
             Ok(list_objects) => {
                 let stream_object_names = self
@@ -187,27 +202,34 @@
         container: Resource<Container>,
         name: ObjectName,
     ) -> anyhow::Result<Result<(), Error>> {
-        let _permit = self.begin_async_host_function().await?;
-        record_host_function_call("blobstore::container::container", "delete_object");
-        let account_id = self.state.owned_worker_id.account_id();
+        let durability = Durability2::<Ctx, (), SerializableError>::new(
+            self,
+            "golem blobstore::container",
+            "delete_object",
+            WrappedFunctionType::WriteRemote,
+        )
+        .await?;
+
+        let account_id = self.state.owned_worker_id.account_id();
         let container_name = self
             .as_wasi_view()
             .table()
             .get::<ContainerEntry>(&container)
             .map(|container_entry| container_entry.name.clone())?;
-        let result = Durability::<Ctx, (String, String), (), SerializableError>::wrap(
-            self,
-            WrappedFunctionType::WriteRemote,
-            "golem blobstore::container::delete_object",
-            (container_name.clone(), name.clone()),
-            |ctx| {
-                ctx.state
-                    .blob_store_service
-                    .delete_object(account_id, container_name, name)
-            },
-        )
-        .await;
+
+        let result = if durability.is_live() {
+            let result = self
+                .state
+                .blob_store_service
+                .delete_object(account_id, container_name.clone(), name.clone())
+                .await;
+            durability
+                .persist(self, (container_name, name), result)
+                .await
+        } else {
+            durability.replay(self).await
+        };
+
         match result {
             Ok(_) => Ok(Ok(())),
             Err(e) => Ok(Err(format!("{:?}", e))),
@@ -219,27 +241,34 @@
         container: Resource<Container>,
         names: Vec<ObjectName>,
     ) -> anyhow::Result<Result<(), Error>> {
-        let _permit = self.begin_async_host_function().await?;
-        record_host_function_call("blobstore::container::container", "delete_objects");
-        let account_id = self.state.owned_worker_id.account_id();
+        let durability = Durability2::<Ctx, (), SerializableError>::new(
+            self,
+            "golem blobstore::container",
+            "delete_objects",
+            WrappedFunctionType::WriteRemote,
+        )
+        .await?;
+
+        let account_id = self.state.owned_worker_id.account_id();
         let container_name = self
             .as_wasi_view()
             .table()
             .get::<ContainerEntry>(&container)
             .map(|container_entry| container_entry.name.clone())?;
-        let result = Durability::<Ctx, (String, Vec<String>), (), SerializableError>::wrap(
-            self,
-            WrappedFunctionType::WriteRemote,
-            "golem blobstore::container::delete_objects",
-            (container_name.clone(), names.clone()),
-            |ctx| {
-                ctx.state
-                    .blob_store_service
-                    .delete_objects(account_id, container_name, names)
-            },
-        )
-        .await;
+
+        let result = if durability.is_live() {
+            let result = self
+                .state
+                .blob_store_service
+                .delete_objects(account_id, container_name.clone(), names.clone())
+                .await;
+            durability
+                .persist(self, (container_name, names), result)
+                .await
+        } else {
+            durability.replay(self).await
+        };
+
         match result {
             Ok(_) => Ok(Ok(())),
             Err(e) => Ok(Err(format!("{:?}", e))),
@@ -251,27 +280,34 @@
         container: Resource<Container>,
         name: ObjectName,
     ) -> anyhow::Result<Result<bool, Error>> {
-        let _permit = self.begin_async_host_function().await?;
-        record_host_function_call("blobstore::container::container", "has_object");
-        let account_id = self.state.owned_worker_id.account_id();
+        let durability = Durability2::<Ctx, bool, SerializableError>::new(
+            self,
+            "golem blobstore::container",
+            "has_object",
+            WrappedFunctionType::ReadRemote,
+        )
+        .await?;
+
+        let account_id = self.state.owned_worker_id.account_id();
         let container_name = self
             .as_wasi_view()
             .table()
             .get::<ContainerEntry>(&container)
             .map(|container_entry| container_entry.name.clone())?;
-        let result = Durability::<Ctx, (String, String), bool, SerializableError>::wrap(
-            self,
-            WrappedFunctionType::ReadRemote,
-            "golem blobstore::container::has_object",
-            (container_name.clone(), name.clone()),
-            |ctx| {
-                ctx.state
-                    .blob_store_service
-                    .has_object(account_id, container_name, name)
-            },
-        )
-        .await;
+
+        let result = if durability.is_live() {
+            let result = self
+                .state
+                .blob_store_service
+                .has_object(account_id, container_name.clone(), name.clone())
+                .await;
+            durability
+                .persist(self, (container_name, name), result)
+                .await
+        } else {
+            durability.replay(self).await
+        };
+
         match result {
             Ok(has_object) => Ok(Ok(has_object)),
             Err(e) => Ok(Err(format!("{:?}", e))),
@@ -283,32 +319,38 @@
         container: Resource<Container>,
         name: ObjectName,
     ) -> anyhow::Result<Result<ObjectMetadata, Error>> {
-        let _permit = self.begin_async_host_function().await?;
-        record_host_function_call("blobstore::container::container", "object_info");
-        let account_id = self.state.owned_worker_id.account_id();
-
-        let container_name = self
-            .as_wasi_view()
-            .table()
-            .get::<ContainerEntry>(&container)
-            .map(|container_entry| container_entry.name.clone())?;
-        let result = Durability::<
+        let durability = Durability2::<
             Ctx,
-            (String, String),
             crate::services::blob_store::ObjectMetadata,
             SerializableError,
-        >::wrap(
+        >::new(
             self,
+            "golem blobstore::container",
+            "object_info",
             WrappedFunctionType::ReadRemote,
-            "golem blobstore::container::object_info",
-            (container_name.clone(), name.clone()),
-            |ctx| {
-                ctx.state
-                    .blob_store_service
-                    .object_info(account_id, container_name, name)
-            },
         )
-        .await;
+        .await?;
+
+        let account_id = self.state.owned_worker_id.account_id();
+        let container_name = self
+            .as_wasi_view()
+            .table()
+            .get::<ContainerEntry>(&container)
+            .map(|container_entry| container_entry.name.clone())?;
+
+        let result = if durability.is_live() {
+            let result = self
+                .state
+                .blob_store_service
+                .object_info(account_id, container_name.clone(), name.clone())
+                .await;
+            durability
+                .persist(self, (container_name, name), result)
+                .await
+        } else {
+            durability.replay(self).await
+        };
+
         match result {
             Ok(object_info) => {
                 let object_info = ObjectMetadata {
@@ -324,27 +366,32 @@
     }
 
     async fn clear(&mut self, container: Resource<Container>) -> anyhow::Result<Result<(), Error>> {
-        let _permit = self.begin_async_host_function().await?;
-        record_host_function_call("blobstore::container::container", "clear");
-        let account_id = self.state.owned_worker_id.account_id();
+        let durability = Durability2::<Ctx, (), SerializableError>::new(
+            self,
+            "golem blobstore::container",
+            "clear",
+            WrappedFunctionType::WriteRemote,
+        )
+        .await?;
+
+        let account_id = self.state.owned_worker_id.account_id();
         let container_name = self
             .as_wasi_view()
             .table()
             .get::<ContainerEntry>(&container)
             .map(|container_entry| container_entry.name.clone())?;
-        Durability::<Ctx, String, (), SerializableError>::wrap(
-            self,
-            WrappedFunctionType::WriteRemote,
-            "golem blobstore::container::clear",
-            container_name.clone(),
-            |ctx| {
-                ctx.state
-                    .blob_store_service
-                    .clear(account_id, container_name)
-            },
-        )
-        .await?;
+
+        if durability.is_live() {
+            let result = self
+                .state
+                .blob_store_service
+                .clear(account_id, container_name.clone())
+                .await;
+            durability.persist(self, container_name, result).await
+        } else {
+            durability.replay(self).await
+        }?;
+
         Ok(Ok(()))
     }
diff --git a/golem-worker-executor-base/src/durable_host/cli/environment.rs b/golem-worker-executor-base/src/durable_host/cli/environment.rs
index d0a2a4bd9..14c1864e0 100644
--- a/golem-worker-executor-base/src/durable_host/cli/environment.rs
+++ b/golem-worker-executor-base/src/durable_host/cli/environment.rs
@@ -15,8 +15,7 @@
 use async_trait::async_trait;
 
 use crate::durable_host::serialized::SerializableError;
-use crate::durable_host::{Durability, DurableWorkerCtx};
-use crate::metrics::wasm::record_host_function_call;
+use crate::durable_host::{Durability2, DurableWorkerCtx};
 use crate::workerctx::WorkerCtx;
 use golem_common::model::oplog::WrappedFunctionType;
 use wasmtime_wasi::bindings::cli::environment::Host;
@@ -24,42 +23,51 @@ use wasmtime_wasi::bindings::cli::environment::Host;
 #[async_trait]
 impl<Ctx: WorkerCtx> Host for DurableWorkerCtx<Ctx> {
     async fn get_environment(&mut self) -> anyhow::Result<Vec<(String, String)>> {
-        let _permit = self.begin_async_host_function().await?;
-        record_host_function_call("cli::environment", "get_environment");
-        Durability::<Ctx, (), Vec<(String, String)>, SerializableError>::wrap(
+        let durability = Durability2::<Ctx, Vec<(String, String)>, SerializableError>::new(
             self,
-            WrappedFunctionType::ReadLocal,
-            "golem_environment::get_environment",
-            (),
-            |ctx| Box::pin(async { Host::get_environment(&mut ctx.as_wasi_view()).await }),
-        )
-        .await
+            "golem_environment",
+            "get_environment",
+            WrappedFunctionType::ReadLocal,
+        )
+        .await?;
+
+        if durability.is_live() {
+            let result = Host::get_environment(&mut self.as_wasi_view()).await;
+            durability.persist(self, (), result).await
+        } else {
+            durability.replay(self).await
+        }
     }
 
     async fn get_arguments(&mut self) -> anyhow::Result<Vec<String>> {
-        let _permit = self.begin_async_host_function().await?;
-        record_host_function_call("cli::environment", "get_arguments");
-        Durability::<Ctx, (), Vec<String>, SerializableError>::wrap(
+        let durability = Durability2::<Ctx, Vec<String>, SerializableError>::new(
             self,
-            WrappedFunctionType::ReadLocal,
-            "golem_environment::get_arguments",
-            (),
-            |ctx| Box::pin(async { Host::get_arguments(&mut ctx.as_wasi_view()).await }),
-        )
-        .await
+            "golem_environment",
+            "get_arguments",
+            WrappedFunctionType::ReadLocal,
+        )
+        .await?;
+
+        if durability.is_live() {
+            let result = Host::get_arguments(&mut self.as_wasi_view()).await;
+            durability.persist(self, (), result).await
+        } else {
+            durability.replay(self).await
+        }
    }
 
     async fn initial_cwd(&mut self) -> anyhow::Result<Option<String>> {
-        let _permit = self.begin_async_host_function().await?;
-        record_host_function_call("cli::environment", "initial_cwd");
-        Durability::<Ctx, (), Option<String>, SerializableError>::wrap(
+        let durability = Durability2::<Ctx, Option<String>, SerializableError>::new(
             self,
-            WrappedFunctionType::ReadLocal,
-            "golem_environment::get_arguments", // NOTE: for backward compatibility with Golem 1.0
-            (),
-            |ctx| Box::pin(async { Host::initial_cwd(&mut ctx.as_wasi_view()).await }),
-        )
-        .await
+            "golem_environment",
+            "get_arguments", // TODO: fix in 2.0 - for backward compatibility with Golem 1.0
+            WrappedFunctionType::ReadLocal,
+        )
+        .await?;
+
+        if durability.is_live() {
+            let result = Host::initial_cwd(&mut self.as_wasi_view()).await;
+            durability.persist(self, (), result).await
+        } else {
+            durability.replay(self).await
+        }
     }
 }
diff --git a/golem-worker-executor-base/src/durable_host/clocks/monotonic_clock.rs b/golem-worker-executor-base/src/durable_host/clocks/monotonic_clock.rs
index 58cc87584..79cf09f6b 100644
--- a/golem-worker-executor-base/src/durable_host/clocks/monotonic_clock.rs
+++ b/golem-worker-executor-base/src/durable_host/clocks/monotonic_clock.rs
@@ -16,7 +16,7 @@
 use async_trait::async_trait;
 use wasmtime::component::Resource;
 
 use crate::durable_host::serialized::SerializableError;
-use crate::durable_host::{Durability, DurableWorkerCtx};
+use crate::durable_host::{Durability2, DurableWorkerCtx};
 use crate::metrics::wasm::record_host_function_call;
 use crate::services::oplog::CommitLevel;
 use crate::workerctx::WorkerCtx;
@@ -26,29 +26,37 @@ use wasmtime_wasi::bindings::clocks::monotonic_clock::{Duration, Host, Instant,
 #[async_trait]
 impl<Ctx: WorkerCtx> Host for DurableWorkerCtx<Ctx> {
     async fn now(&mut self) -> anyhow::Result<Instant> {
-        let _permit = self.begin_async_host_function().await?;
-        record_host_function_call("clocks::monotonic_clock", "now");
-        Durability::<Ctx, (), Instant, SerializableError>::wrap(
+        let durability = Durability2::<Ctx, Instant, SerializableError>::new(
             self,
+            "monotonic_clock",
+            "now",
             WrappedFunctionType::ReadLocal,
-            "monotonic_clock::now",
-            (),
-            |ctx| Box::pin(async { Host::now(&mut ctx.as_wasi_view()).await }),
         )
-        .await
+        .await?;
+
+        if durability.is_live() {
+            let result = Host::now(&mut self.as_wasi_view()).await;
+            durability.persist(self, (), result).await
+        } else {
+            durability.replay(self).await
+        }
     }
 
     async fn resolution(&mut self) -> anyhow::Result<Duration> {
-        let _permit = self.begin_async_host_function().await?;
-        record_host_function_call("clocks::monotonic_clock", "resolution");
-        Durability::<Ctx, (), Duration, SerializableError>::wrap(
+        let durability = Durability2::<Ctx, Duration, SerializableError>::new(
             self,
+            "monotonic_clock",
+            "resolution",
             WrappedFunctionType::ReadLocal,
-            "monotonic_clock::resolution",
-            (),
-            |ctx| Box::pin(async { Host::resolution(&mut ctx.as_wasi_view()).await }),
         )
-        .await
+        .await?;
+
+        if durability.is_live() {
+            let result = Host::resolution(&mut self.as_wasi_view()).await;
+            durability.persist(self, (), result).await
+        } else {
+            durability.replay(self).await
+        }
     }
 
     async fn subscribe_instant(&mut self, when: Instant) -> anyhow::Result<Resource<Pollable>> {
@@ -58,16 +66,23 @@
     }
 
     async fn subscribe_duration(&mut self, when: Duration) -> anyhow::Result<Resource<Pollable>> {
-        let _permit = self.begin_async_host_function().await?;
-        record_host_function_call("clocks::monotonic_clock", "subscribe_duration");
-        let now = Durability::<Ctx, (), Instant, SerializableError>::wrap(
+        let durability = Durability2::<Ctx, Instant, SerializableError>::new(
             self,
+            "monotonic_clock",
+            "now", // TODO: fix in 2.0 - should be 'subscribe_duration' but have to keep for backward compatibility with Golem 1.0
             WrappedFunctionType::ReadLocal,
-            "monotonic_clock::now", // should be 'subscribe_duration' but have to keep for backward compatibility with Golem 1.0
-            (),
-            |ctx| Box::pin(async { Host::now(&mut ctx.as_wasi_view()).await }),
         )
         .await?;
+
+        let now = {
+            if durability.is_live() {
+                let result = Host::now(&mut self.as_wasi_view()).await;
+                durability.persist(self, (), result).await
+            } else {
+                durability.replay(self).await
+            }
+        }?;
+
         self.state.oplog.commit(CommitLevel::DurableOnly).await;
         let when = now.saturating_add(when);
         Host::subscribe_instant(&mut self.as_wasi_view(), when).await
diff --git a/golem-worker-executor-base/src/durable_host/clocks/wall_clock.rs b/golem-worker-executor-base/src/durable_host/clocks/wall_clock.rs
index 1c426f93f..47d5de55f 100644
--- a/golem-worker-executor-base/src/durable_host/clocks/wall_clock.rs
+++ b/golem-worker-executor-base/src/durable_host/clocks/wall_clock.rs
@@ -15,8 +15,7 @@
 use async_trait::async_trait;
 
 use crate::durable_host::serialized::{SerializableDateTime, SerializableError};
-use crate::durable_host::{Durability, DurableWorkerCtx};
-use crate::metrics::wasm::record_host_function_call;
+use crate::durable_host::{Durability2, DurableWorkerCtx};
 use crate::workerctx::WorkerCtx;
 use golem_common::model::oplog::WrappedFunctionType;
 use wasmtime_wasi::bindings::clocks::wall_clock::{Datetime, Host};
@@ -24,29 +23,37 @@ use wasmtime_wasi::bindings::clocks::wall_clock::{Datetime, Host};
 #[async_trait]
 impl<Ctx: WorkerCtx> Host for DurableWorkerCtx<Ctx> {
     async fn now(&mut self) -> anyhow::Result<Datetime> {
-        let _permit = self.begin_async_host_function().await?;
-        record_host_function_call("clocks::wall_clock", "now");
-        Durability::<Ctx, (), SerializableDateTime, SerializableError>::wrap(
+        let durability = Durability2::<Ctx, SerializableDateTime, SerializableError>::new(
             self,
+            "wall_clock",
+            "now",
             WrappedFunctionType::ReadLocal,
-            "wall_clock::now",
-            (),
-            |ctx| Box::pin(async { Host::now(&mut ctx.as_wasi_view()).await }),
         )
-        .await
+        .await?;
+
+        if durability.is_live() {
+            let result = Host::now(&mut self.as_wasi_view()).await;
+            durability.persist(self, (), result).await
+        } else {
+            durability.replay(self).await
+        }
     }
 
     async fn resolution(&mut self) -> anyhow::Result<Datetime> {
-        let _permit = self.begin_async_host_function().await?;
-        record_host_function_call("clocks::wall_clock", "resolution");
-        Durability::<Ctx, (), SerializableDateTime, SerializableError>::wrap(
+        let durability = Durability2::<Ctx, SerializableDateTime, SerializableError>::new(
             self,
+            "wall_clock",
+            "resolution",
             WrappedFunctionType::ReadLocal,
-            "wall_clock::resolution",
-            (),
-            |ctx| Box::pin(async { Host::resolution(&mut ctx.as_wasi_view()).await }),
         )
-        .await
+        .await?;
+
+        if durability.is_live() {
+            let result = Host::resolution(&mut self.as_wasi_view()).await;
+            durability.persist(self, (), result).await
+        } else {
+            durability.replay(self).await
+        }
     }
 }
diff --git a/golem-worker-executor-base/src/durable_host/durability.rs b/golem-worker-executor-base/src/durable_host/durability.rs
index d73246c9d..60f0ea8b5 100644
--- a/golem-worker-executor-base/src/durable_host/durability.rs
+++ b/golem-worker-executor-base/src/durable_host/durability.rs
@@ -12,8 +12,10 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use crate::durable_host::sync_helper::SyncHelperPermit;
 use crate::durable_host::DurableWorkerCtx;
 use crate::error::GolemError;
+use crate::metrics::wasm::record_host_function_call;
 use crate::model::PersistenceLevel;
 use crate::services::oplog::{CommitLevel, Oplog, OplogOps};
 use crate::workerctx::WorkerCtx;
@@ -22,10 +24,123 @@
 use bincode::{Decode, Encode};
 use golem_common::model::oplog::{OplogEntry, OplogIndex, WrappedFunctionType};
 use std::fmt::Debug;
 use std::future::Future;
+use std::marker::PhantomData;
 use std::pin::Pin;
 use std::sync::Arc;
 use tracing::error;
 
+// TODO: is_live and replay can be merged
+// TODO: is SErr always SerializableError? - sometimes SerializableStreamError
+pub struct Durability2<Ctx, SOk, SErr> {
+    package: &'static str,
+    function: &'static str,
+    function_type: WrappedFunctionType,
+    _permit: SyncHelperPermit,
+    begin_index: OplogIndex,
+    is_live: bool,
+    persistence_level: PersistenceLevel,
+    _ctx: PhantomData<Ctx>,
+    _sok: PhantomData<SOk>,
+    _serr: PhantomData<SErr>,
+}
+
+impl<Ctx: WorkerCtx, SOk, SErr> Durability2<Ctx, SOk, SErr> {
+    pub async fn new(
+        ctx: &mut DurableWorkerCtx<Ctx>,
+        package: &'static str,
+        function: &'static str,
+        function_type: WrappedFunctionType,
+    ) -> Result<Self, GolemError> {
+        let permit = ctx.begin_async_host_function().await?;
+        record_host_function_call(package, function);
+
+        let begin_index = ctx.state.begin_function(&function_type).await?;
+
+        Ok(Self {
+            package,
+            function,
+            function_type,
+            _permit: permit,
+            begin_index,
+            is_live: ctx.state.is_live(),
+            persistence_level: ctx.state.persistence_level.clone(),
+            _ctx: PhantomData,
+            _sok: PhantomData,
+            _serr: PhantomData,
+        })
+    }
+
+    pub fn is_live(&self) -> bool {
+        self.is_live || self.persistence_level == PersistenceLevel::PersistNothing
+    }
+
+    pub async fn persist<SIn, Ok, Err>(
+        &self,
+        ctx: &mut DurableWorkerCtx<Ctx>,
+        input: SIn,
+        result: Result<Ok, Err>,
+    ) -> Result<Ok, Err>
+    where
+        Ok: Clone,
+        Err: From<SErr> + From<GolemError> + Send + Sync,
+        SIn: Debug + Encode + Send + Sync,
+        SErr: Debug + Encode + for<'a> From<&'a Err> + From<GolemError> + Send + Sync,
+        SOk: Debug + Encode + From<Ok> + Send + Sync,
+    {
+        let serializable_result: Result<SOk, SErr> = result
+            .as_ref()
+            .map(|result| result.clone().into())
+            .map_err(|err| err.into());
+
+        let function_name = self.function_name();
+        ctx.write_to_oplog::<SIn, SOk, SErr>(
+            &self.function_type,
+            &function_name,
+            self.begin_index,
+            &input,
+            &serializable_result,
+        )
+        .await?;
+
+        result
+    }
+
+    pub async fn replay<Ok, Err>(&self, ctx: &mut DurableWorkerCtx<Ctx>) -> Result<Ok, Err>
+    where
+        Ok: From<SOk>,
+        Err: From<SErr> + From<GolemError>,
+        SErr: Debug + Encode + Decode + From<GolemError> + Send + Sync,
+        SOk: Debug + Encode + Decode + Send + Sync,
+    {
+        let (_, oplog_entry) = crate::get_oplog_entry!(
+            ctx.state.replay_state,
+            OplogEntry::ImportedFunctionInvoked,
+            OplogEntry::ImportedFunctionInvokedV1
+        )?;
+
+        let function_name = self.function_name();
+        DurableWorkerCtx::<Ctx>::validate_oplog_entry(&oplog_entry, &function_name)?;
+        let response: Result<SOk, SErr> =
+            DurableWorkerCtx::<Ctx>::default_load(ctx.state.oplog.clone(), &oplog_entry).await;
+
+        ctx.state
+            .end_function(&self.function_type, self.begin_index)
+            .await?;
+
+        response.map(|sok| sok.into()).map_err(|serr| serr.into())
+    }
+
+    fn function_name(&self) -> String {
+        if self.package.is_empty() {
+            // For backward compatibility - some of the recorded function names were not following the pattern
+            self.function.to_string()
+        } else {
+            format!("{}::{}", self.package, self.function)
+        }
+    }
+}
+
 #[async_trait]
 pub trait Durability<Ctx: WorkerCtx, SIn, SOk, SErr> {
     /// A version of `wrap` allowing conversion between the success value and the serialized value within the mutable worker context.
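Review note on the new API above: every call site converted in this PR follows the same shape — construct a `Durability2`, branch on `is_live()`, run the side effect and `persist` on the live path, `replay` otherwise. A minimal sketch of that pattern in isolation; `my_host_function`, `"my_package"`/`"my_function"` and `fetch_value` are hypothetical placeholders, and the `SerializableError: for<'a> From<&'a anyhow::Error>`, `anyhow::Error: From<SerializableError>` and `anyhow::Error: From<GolemError>` conversions are assumed to exist in the crate, since the converted clock and environment call sites already rely on them:

    use crate::durable_host::serialized::SerializableError;
    use crate::durable_host::{Durability2, DurableWorkerCtx};
    use crate::workerctx::WorkerCtx;
    use golem_common::model::oplog::WrappedFunctionType;

    // Hypothetical call site illustrating the live/replay pattern introduced above
    async fn my_host_function<Ctx: WorkerCtx>(
        ctx: &mut DurableWorkerCtx<Ctx>,
    ) -> anyhow::Result<u64> {
        let durability = Durability2::<Ctx, u64, SerializableError>::new(
            ctx,
            "my_package",
            "my_function",
            WrappedFunctionType::ReadRemote,
        )
        .await?;

        if durability.is_live() {
            // Live execution: perform the side effect, then record input and result in the oplog
            let result = fetch_value().await;
            durability.persist(ctx, (), result).await
        } else {
            // Replay: read the recorded result back from the oplog instead of re-executing
            durability.replay(ctx).await
        }
    }

    // Placeholder standing in for a real remote side effect
    async fn fetch_value() -> anyhow::Result<u64> {
        Ok(42)
    }

Note that `new` takes the package and function names separately and joins them as `package::function` (or just `function` when the package is empty), which is why call sites that must stay compatible with names recorded by Golem 1.0 pass legacy strings such as `""`/`"golem_complete_promise"` or `"monotonic_clock"`/`"now"`.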
diff --git a/golem-worker-executor-base/src/durable_host/filesystem/preopens.rs b/golem-worker-executor-base/src/durable_host/filesystem/preopens.rs
index 2b714f98b..60941e4d2 100644
--- a/golem-worker-executor-base/src/durable_host/filesystem/preopens.rs
+++ b/golem-worker-executor-base/src/durable_host/filesystem/preopens.rs
@@ -17,8 +17,7 @@
 use async_trait::async_trait;
 use wasmtime::component::Resource;
 
 use crate::durable_host::serialized::SerializableError;
-use crate::durable_host::{Durability, DurableWorkerCtx};
-use crate::metrics::wasm::record_host_function_call;
+use crate::durable_host::{Durability2, DurableWorkerCtx};
 use crate::workerctx::WorkerCtx;
 use golem_common::model::oplog::WrappedFunctionType;
 use wasmtime_wasi::bindings::filesystem::preopens::{Descriptor, Host};
@@ -26,44 +25,42 @@ use wasmtime_wasi::bindings::filesystem::preopens::{Descriptor, Host};
 #[async_trait]
 impl<Ctx: WorkerCtx> Host for DurableWorkerCtx<Ctx> {
     async fn get_directories(&mut self) -> anyhow::Result<Vec<(Resource<Descriptor>, String)>> {
-        let _permit = self.begin_async_host_function().await?;
-        record_host_function_call("cli_base::preopens", "get_directories");
-
-        let current_dirs1 = Host::get_directories(&mut self.as_wasi_view()).await?;
-        let current_dirs2 = Host::get_directories(&mut self.as_wasi_view()).await?;
-        Durability::<Ctx, (), Vec<String>, SerializableError>::custom_wrap(
+        let durability = Durability2::<Ctx, Vec<String>, SerializableError>::new(
             self,
+            "cli::preopens",
+            "get_directories",
             WrappedFunctionType::ReadLocal,
-            "cli::preopens::get_directories",
-            (),
-            |_ctx| Box::pin(async move { Ok(current_dirs1) }),
-            |_ctx, dirs| {
-                // We can only serialize the names
-                Ok(dirs
-                    .iter()
-                    .map(|(_, name)| name.clone())
-                    .collect::<Vec<_>>())
-            },
-            move |_ctx, names| {
-                Box::pin(async move {
-                    // Filtering the current set of pre-opened directories by the serialized names
-                    let filtered = current_dirs2
-                        .into_iter()
-                        .filter(|(_, name)| names.contains(name))
-                        .collect::<Vec<_>>();
-
-                    if filtered.len() == names.len() {
-                        // All directories were found
-                        Ok(filtered)
-                    } else {
-                        Err(anyhow!(
-                            "Not all previously available pre-opened directories were found"
-                        ))
-                    }
-                })
-            },
-        )
-        .await
+        )
+        .await?;
+
+        let current_dirs = Host::get_directories(&mut self.as_wasi_view()).await?;
+
+        let names = {
+            if durability.is_live() {
+                // We can only serialize the names
+                let result = Ok(current_dirs
+                    .iter()
+                    .map(|(_, name)| name.clone())
+                    .collect::<Vec<_>>());
+                durability.persist(self, (), result).await
+            } else {
+                durability.replay(self).await
+            }
+        }?;
+
+        // Filtering the current set of pre-opened directories by the serialized names
+        let filtered = current_dirs
+            .into_iter()
+            .filter(|(_, name)| names.contains(name))
+            .collect::<Vec<_>>();
+
+        if filtered.len() == names.len() {
+            // All directories were found
+            Ok(filtered)
+        } else {
+            Err(anyhow!(
+                "Not all previously available pre-opened directories were found"
+            ))
+        }
     }
 }
diff --git a/golem-worker-executor-base/src/durable_host/filesystem/types.rs b/golem-worker-executor-base/src/durable_host/filesystem/types.rs
index 67031cae4..37efe3981 100644
--- a/golem-worker-executor-base/src/durable_host/filesystem/types.rs
+++ b/golem-worker-executor-base/src/durable_host/filesystem/types.rs
@@ -35,7 +35,7 @@ use golem_common::model::oplog::WrappedFunctionType;
 use crate::durable_host::serialized::{
     SerializableDateTime, SerializableError, SerializableFileTimes,
 };
-use crate::durable_host::{Durability, DurableWorkerCtx};
+use crate::durable_host::{Durability2, DurableWorkerCtx};
 use crate::metrics::wasm::record_host_function_call;
 use crate::workerctx::WorkerCtx;
@@ -230,11 +230,14 @@ impl<Ctx: WorkerCtx> HostDescriptor for DurableWorkerCtx<Ctx> {
     }
 
     async fn stat(&mut self, self_: Resource<Descriptor>) -> Result<DescriptorStat, FsError> {
-        let _permit = self
-            .begin_async_host_function()
-            .await
-            .map_err(FsError::trap)?;
-        record_host_function_call("filesystem::types::descriptor", "stat");
+        let durability = Durability2::<Ctx, SerializableFileTimes, SerializableError>::new(
+            self,
+            "filesystem::types::descriptor",
+            "stat",
+            WrappedFunctionType::ReadLocal,
+        )
+        .await
+        .map_err(FsError::trap)?;
 
         let path = match self.table().get(&self_)? {
             Descriptor::File(f) => f.path.clone(),
@@ -244,43 +247,34 @@ impl<Ctx: WorkerCtx> HostDescriptor for DurableWorkerCtx<Ctx> {
         let mut stat = HostDescriptor::stat(&mut self.as_wasi_view(), self_).await?;
         stat.status_change_timestamp = None; // We cannot guarantee this to be the same during replays, so we rather not support it
 
-        let stat_clone1 = stat;
-        Durability::<Ctx, String, SerializableFileTimes, SerializableError>::custom_wrap(
-            self,
-            WrappedFunctionType::ReadLocal,
-            "filesystem::types::descriptor::stat",
-            path.to_string_lossy().to_string(),
-            |_ctx| {
-                Box::pin(async move { Ok(stat_clone1) as Result<DescriptorStat, FsError> })
-            },
-            |_ctx, stat| {
-                Ok(SerializableFileTimes {
-                    data_access_timestamp: stat.data_access_timestamp.map(|t| t.into()),
-                    data_modification_timestamp: stat.data_modification_timestamp.map(|t| t.into()),
-                })
-            },
-            move |_ctx, times| {
-                Box::pin(async move {
-                    let accessed = times.data_access_timestamp.as_ref().map(|t| {
-                        SystemTimeSpec::from(<SerializableDateTime as Into<SystemTime>>::into(
-                            t.clone(),
-                        ))
-                    });
-                    let modified = times.data_modification_timestamp.as_ref().map(|t| {
-                        SystemTimeSpec::from(<SerializableDateTime as Into<SystemTime>>::into(
-                            t.clone(),
-                        ))
-                    });
-                    spawn_blocking(|| set_symlink_times(path, accessed, modified)).await?;
-                    stat.data_access_timestamp = times.data_access_timestamp.map(|t| t.into());
-                    stat.data_modification_timestamp =
-                        times.data_modification_timestamp.map(|t| t.into());
-                    Ok(stat)
-                })
-            },
-        )
-        .await
-        .map_err(FsError::trap)
+        let times = if durability.is_live() {
+            durability
+                .persist(
+                    self,
+                    path.to_string_lossy().to_string(),
+                    Ok(SerializableFileTimes {
+                        data_access_timestamp: stat.data_access_timestamp.map(|t| t.into()),
+                        data_modification_timestamp: stat
+                            .data_modification_timestamp
+                            .map(|t| t.into()),
+                    }),
+                )
+                .await
+        } else {
+            durability.replay(self).await
+        }
+        .map_err(FsError::trap)?;
+
+        let accessed = times.data_access_timestamp.as_ref().map(|t| {
+            SystemTimeSpec::from(<SerializableDateTime as Into<SystemTime>>::into(t.clone()))
+        });
+        let modified = times.data_modification_timestamp.as_ref().map(|t| {
+            SystemTimeSpec::from(<SerializableDateTime as Into<SystemTime>>::into(t.clone()))
+        });
+        spawn_blocking(|| set_symlink_times(path, accessed, modified)).await?;
+        stat.data_access_timestamp = times.data_access_timestamp.map(|t| t.into());
+        stat.data_modification_timestamp = times.data_modification_timestamp.map(|t| t.into());
+        Ok(stat)
     }
 
     async fn stat_at(
@@ -289,11 +283,15 @@
         &mut self,
         self_: Resource<Descriptor>,
         path_flags: PathFlags,
         path: String,
     ) -> Result<DescriptorStat, FsError> {
-        let _permit = self
-            .begin_async_host_function()
-            .await
-            .map_err(FsError::trap)?;
-        record_host_function_call("filesystem::types::descriptor", "stat_at");
+        let durability = Durability2::<Ctx, SerializableFileTimes, SerializableError>::new(
+            self,
+            "filesystem::types::descriptor",
+            "stat_at",
+            WrappedFunctionType::ReadLocal,
+        )
+        .await
+        .map_err(FsError::trap)?;
+
         let full_path = match self.table().get(&self_)? {
             Descriptor::File(f) => f.path.join(path.clone()),
             Descriptor::Dir(d) => d.path.join(path.clone()),
@@ -303,43 +301,34 @@
         let mut stat =
             HostDescriptor::stat_at(&mut self.as_wasi_view(), self_, path_flags, path).await?;
         stat.status_change_timestamp = None; // We cannot guarantee this to be the same during replays, so we rather not support it
 
-        let stat_clone1 = stat;
-        Durability::<Ctx, String, SerializableFileTimes, SerializableError>::custom_wrap(
-            self,
-            WrappedFunctionType::ReadLocal,
-            "filesystem::types::descriptor::stat_at",
-            full_path.to_string_lossy().to_string(),
-            |_ctx| {
-                Box::pin(async move { Ok(stat_clone1) as Result<DescriptorStat, FsError> })
-            },
-            |_ctx, stat| {
-                Ok(SerializableFileTimes {
-                    data_access_timestamp: stat.data_access_timestamp.map(|t| t.into()),
-                    data_modification_timestamp: stat.data_modification_timestamp.map(|t| t.into()),
-                })
-            },
-            move |_ctx, times| {
-                Box::pin(async move {
-                    let accessed = times.data_access_timestamp.as_ref().map(|t| {
-                        SystemTimeSpec::from(<SerializableDateTime as Into<SystemTime>>::into(
-                            t.clone(),
-                        ))
-                    });
-                    let modified = times.data_modification_timestamp.as_ref().map(|t| {
-                        SystemTimeSpec::from(<SerializableDateTime as Into<SystemTime>>::into(
-                            t.clone(),
-                        ))
-                    });
-                    spawn_blocking(|| set_symlink_times(full_path, accessed, modified)).await?;
-                    stat.data_access_timestamp = times.data_access_timestamp.map(|t| t.into());
-                    stat.data_modification_timestamp =
-                        times.data_modification_timestamp.map(|t| t.into());
-                    Ok(stat)
-                })
-            },
-        )
-        .await
-        .map_err(FsError::trap)
+        let times = if durability.is_live() {
+            durability
+                .persist(
+                    self,
+                    full_path.to_string_lossy().to_string(),
+                    Ok(SerializableFileTimes {
+                        data_access_timestamp: stat.data_access_timestamp.map(|t| t.into()),
+                        data_modification_timestamp: stat
+                            .data_modification_timestamp
+                            .map(|t| t.into()),
+                    }),
+                )
+                .await
+        } else {
+            durability.replay(self).await
+        }
+        .map_err(FsError::trap)?;
+
+        let accessed = times.data_access_timestamp.as_ref().map(|t| {
+            SystemTimeSpec::from(<SerializableDateTime as Into<SystemTime>>::into(t.clone()))
+        });
+        let modified = times.data_modification_timestamp.as_ref().map(|t| {
+            SystemTimeSpec::from(<SerializableDateTime as Into<SystemTime>>::into(t.clone()))
+        });
+        spawn_blocking(|| set_symlink_times(full_path, accessed, modified)).await?;
+        stat.data_access_timestamp = times.data_access_timestamp.map(|t| t.into());
+        stat.data_modification_timestamp = times.data_modification_timestamp.map(|t| t.into());
+        Ok(stat)
     }
 
     async fn set_times_at(
diff --git a/golem-worker-executor-base/src/durable_host/golem/mod.rs b/golem-worker-executor-base/src/durable_host/golem/mod.rs
index 6fd126450..f40921580 100644
--- a/golem-worker-executor-base/src/durable_host/golem/mod.rs
+++ b/golem-worker-executor-base/src/durable_host/golem/mod.rs
@@ -25,7 +25,7 @@ use wasmtime_wasi::WasiView;
 use crate::durable_host::serialized::SerializableError;
 use crate::durable_host::wasm_rpc::UrnExtensions;
-use crate::durable_host::{Durability, DurableWorkerCtx};
+use crate::durable_host::{Durability2, DurableWorkerCtx};
 use crate::error::GolemError;
 use crate::get_oplog_entry;
 use crate::metrics::wasm::record_host_function_call;
@@ -175,47 +175,52 @@ impl<Ctx: WorkerCtx> golem::api0_2_0::host::Host for DurableWorkerCtx<Ctx> {
         promise_id: golem::api0_2_0::host::PromiseId,
         data: Vec<u8>,
     ) -> Result<bool, anyhow::Error> {
-        let _permit = self.begin_async_host_function().await?;
-        record_host_function_call("golem::api", "complete_promise");
-        let promise_id: PromiseId = promise_id.into();
-        Durability::<Ctx, PromiseId, bool, SerializableError>::wrap(
+        let durability = Durability2::<Ctx, bool, SerializableError>::new(
             self,
-            WrappedFunctionType::WriteLocal,
+            "", // TODO: fix in 2.0
             "golem_complete_promise",
-            promise_id.clone(),
-            |ctx| {
-                Box::pin(async move {
-                    Ok(ctx
-                        .public_state
-                        .promise_service
-                        .complete(promise_id, data)
-                        .await?)
-                })
-            },
+            WrappedFunctionType::WriteLocal,
         )
-        .await
+        .await?;
+
+        let promise_id: PromiseId = promise_id.into();
+        let result = if durability.is_live() {
+            let result = self
+                .public_state
+                .promise_service
+                .complete(promise_id.clone(), data)
+                .await;
+
+            durability.persist(self, promise_id, result).await
+        } else {
+            durability.replay(self).await
+        }?;
+
+        Ok(result)
     }
 
     async fn delete_promise(
         &mut self,
         promise_id: golem::api0_2_0::host::PromiseId,
     ) -> Result<(), anyhow::Error> {
-        let _permit = self.begin_async_host_function().await?;
-        record_host_function_call("golem::api", "delete_promise");
-        let promise_id: PromiseId = promise_id.into();
-        Durability::<Ctx, PromiseId, (), SerializableError>::wrap(
+        let durability = Durability2::<Ctx, (), SerializableError>::new(
             self,
-            WrappedFunctionType::WriteLocal,
+            "", // TODO: fix in 2.0
             "golem_delete_promise",
-            promise_id.clone(),
-            |ctx| {
-                Box::pin(async move {
-                    ctx.public_state.promise_service.delete(promise_id).await;
-                    Ok(())
-                })
-            },
+            WrappedFunctionType::WriteLocal,
         )
-        .await
+        .await?;
+
+        let promise_id: PromiseId = promise_id.into();
+        if durability.is_live() {
+            let result = Ok(self
+                .public_state
+                .promise_service
+                .delete(promise_id.clone())
+                .await);
+            durability.persist(self, promise_id, result).await
+        } else {
+            durability.replay(self).await
+        }
     }
 
     async fn get_self_uri(
@@ -450,8 +455,13 @@
     }
 
     async fn generate_idempotency_key(&mut self) -> anyhow::Result<golem::api0_2_0::host::Uuid> {
-        let _permit = self.begin_async_host_function().await?;
-        record_host_function_call("golem::api", "generate_idempotency_key");
+        let durability = Durability2::<Ctx, (u64, u64), SerializableError>::new(
+            self,
+            "golem api",
+            "generate_idempotency_key",
+            WrappedFunctionType::WriteRemote,
+        )
+        .await?;
 
         let current_idempotency_key = self
             .get_current_idempotency_key()
             .await
             .unwrap_or(IdempotencyKey::fresh());
         let oplog_index = self.state.current_oplog_index().await;
 
-        // NOTE: Now that IdempotencyKey::derived is used, we no longer need to persist this, but we do to avoid breaking existing oplogs
-        let uuid = Durability::<Ctx, (), (u64, u64), SerializableError>::custom_wrap(
-            self,
-            WrappedFunctionType::WriteRemote,
-            "golem api::generate_idempotency_key",
-            (),
-            |_ctx| {
-                Box::pin(async move {
-                    let key = IdempotencyKey::derived(&current_idempotency_key, oplog_index);
-                    let uuid = Uuid::parse_str(&key.value.to_string()).unwrap(); // this is guaranteed to be a uuid
-                    Ok::<Uuid, anyhow::Error>(uuid)
-                })
-            },
-            |_ctx, uuid: &Uuid| Ok(uuid.as_u64_pair()),
-            |_ctx, (high_bits, low_bits)| {
-                Box::pin(async move { Ok(Uuid::from_u64_pair(high_bits, low_bits)) })
-            },
-        )
-        .await?;
+        // TODO: Fix in 2.0 - Now that IdempotencyKey::derived is used, we no longer need to persist this, but we do to avoid breaking existing oplogs
+        let (hi, lo) = if durability.is_live() {
+            let key = IdempotencyKey::derived(&current_idempotency_key, oplog_index);
+            let uuid = Uuid::parse_str(&key.value.to_string()).unwrap(); // this is guaranteed to be a uuid
+            let result = Ok(uuid.as_u64_pair());
+            durability.persist(self, (), result).await
+        } else {
+            durability.replay(self).await
+        }?;
+        let uuid = Uuid::from_u64_pair(hi, lo);
 
         Ok(uuid.into())
     }
@@ -487,8 +488,13 @@
         target_version: ComponentVersion,
         mode: UpdateMode,
     ) -> anyhow::Result<()> {
-        let _permit = self.begin_async_host_function().await?;
-        record_host_function_call("golem::api", "update_worker");
+        let durability = Durability2::<Ctx, (), SerializableError>::new(
+            self,
+            "golem::api",
+            "update-worker",
+            WrappedFunctionType::WriteRemote,
+        )
+        .await?;
 
         let worker_id: WorkerId = worker_id.into();
         let owned_worker_id = OwnedWorkerId::new(&self.owned_worker_id.account_id, &worker_id);
@@ -497,30 +503,19 @@
             UpdateMode::Automatic => golem_api_grpc::proto::golem::worker::UpdateMode::Automatic,
             UpdateMode::SnapshotBased => golem_api_grpc::proto::golem::worker::UpdateMode::Manual,
         };
-        Durability::<
-            Ctx,
-            (
-                WorkerId,
-                u64,
-                golem_api_grpc::proto::golem::worker::UpdateMode,
-            ),
-            (),
-            SerializableError,
-        >::wrap(
-            self,
-            WrappedFunctionType::WriteRemote,
-            "golem::api::update-worker",
-            (worker_id, target_version, mode),
-            |ctx| {
-                Box::pin(async move {
-                    ctx.state
-                        .worker_proxy
-                        .update(&owned_worker_id, target_version, mode)
-                        .await
-                })
-            },
-        )
-        .await?;
+
+        if durability.is_live() {
+            let result = self
+                .state
+                .worker_proxy
+                .update(&owned_worker_id, target_version, mode)
+                .await;
+            durability
+                .persist(self, (worker_id, target_version, mode), result)
+                .await
+        } else {
+            durability.replay(self).await
+        }?;
 
         Ok(())
     }
diff --git a/golem-worker-executor-base/src/durable_host/io/poll.rs b/golem-worker-executor-base/src/durable_host/io/poll.rs
index 0a0b38d80..5cb2ab932 100644
--- a/golem-worker-executor-base/src/durable_host/io/poll.rs
+++ b/golem-worker-executor-base/src/durable_host/io/poll.rs
@@ -20,7 +20,7 @@
 use wasmtime::component::Resource;
 use wasmtime_wasi::bindings::io::poll::{Host, HostPollable, Pollable};
 
 use crate::durable_host::serialized::SerializableError;
-use crate::durable_host::{Durability, DurableWorkerCtx, SuspendForSleep};
+use crate::durable_host::{Durability2, DurableWorkerCtx, SuspendForSleep};
 use crate::metrics::wasm::record_host_function_call;
 use crate::workerctx::WorkerCtx;
@@ -49,18 +49,23 @@ impl<Ctx: WorkerCtx> HostPollable for DurableWorkerCtx<Ctx> {
 #[async_trait]
 impl<Ctx: WorkerCtx> Host for DurableWorkerCtx<Ctx> {
     async fn poll(&mut self, in_: Vec<Resource<Pollable>>) -> anyhow::Result<Vec<u32>> {
-        let _permit = self.begin_async_host_function().await?;
-        record_host_function_call("io::poll", "poll");
-
-        let result = Durability::<Ctx, (), Vec<u32>, SerializableError>::wrap_conditionally(
+        let durability = Durability2::<Ctx, Vec<u32>, SerializableError>::new(
             self,
-            WrappedFunctionType::ReadLocal,
-            "golem io::poll::poll",
-            (),
-            |ctx| Box::pin(async move { Host::poll(&mut ctx.as_wasi_view(), in_).await }),
-            |result| is_suspend_for_sleep(result).is_none(), // We must not persist the suspend signal
-        )
-        .await;
+            "golem io::poll",
+            "poll",
+            WrappedFunctionType::ReadLocal,
+        )
+        .await?;
+
+        let result = if durability.is_live() {
+            let result = Host::poll(&mut self.as_wasi_view(), in_).await;
+            if is_suspend_for_sleep(&result).is_none() {
+                durability.persist(self, (), result).await
+            } else {
+                // We must not persist the suspend signal
+                result
+            }
+        } else {
+            durability.replay(self).await
+        };
 
         match is_suspend_for_sleep(&result) {
             Some(duration) => {
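Review note on the `poll` conversion above: it hand-rolls what `wrap_conditionally` used to express — the suspend signal must never be written to the oplog. If more call sites need this, a conditional variant could live on `Durability2` itself. A hypothetical sketch (the `persist_if` method is not part of this diff; its bounds are copied from `persist`):

    use crate::durable_host::{Durability2, DurableWorkerCtx};
    use crate::error::GolemError;
    use crate::workerctx::WorkerCtx;
    use bincode::Encode;
    use std::fmt::Debug;

    impl<Ctx: WorkerCtx, SOk, SErr> Durability2<Ctx, SOk, SErr> {
        /// Hypothetical helper: persist the result only when `persistable`
        /// approves it; otherwise hand it back unpersisted.
        pub async fn persist_if<SIn, Ok, Err>(
            &self,
            ctx: &mut DurableWorkerCtx<Ctx>,
            input: SIn,
            result: Result<Ok, Err>,
            persistable: impl FnOnce(&Result<Ok, Err>) -> bool,
        ) -> Result<Ok, Err>
        where
            Ok: Clone,
            Err: From<SErr> + From<GolemError> + Send + Sync,
            SIn: Debug + Encode + Send + Sync,
            SErr: Debug + Encode + for<'a> From<&'a Err> + From<GolemError> + Send + Sync,
            SOk: Debug + Encode + From<Ok> + Send + Sync,
        {
            if persistable(&result) {
                // Record input and result in the oplog as usual
                self.persist(ctx, input, result).await
            } else {
                // Return the transient result (e.g. a suspend signal) unrecorded
                result
            }
        }
    }

With such a helper the live branch of `poll` would reduce to `durability.persist_if(self, (), result, |r| is_suspend_for_sleep(r).is_none()).await`.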
diff --git a/golem-worker-executor-base/src/durable_host/io/streams.rs b/golem-worker-executor-base/src/durable_host/io/streams.rs
index 052694d7c..4f378a9e0 100644
--- a/golem-worker-executor-base/src/durable_host/io/streams.rs
+++ b/golem-worker-executor-base/src/durable_host/io/streams.rs
@@ -21,7 +21,7 @@
 use crate::durable_host::http::serialized::SerializableHttpRequest;
 use crate::durable_host::http::{end_http_request, end_http_request_sync};
 use crate::durable_host::io::{ManagedStdErr, ManagedStdOut};
 use crate::durable_host::serialized::SerializableStreamError;
-use crate::durable_host::{Durability, DurableWorkerCtx, HttpRequestCloseOwner};
+use crate::durable_host::{Durability, Durability2, DurableWorkerCtx, HttpRequestCloseOwner};
 use crate::error::GolemError;
 use crate::metrics::wasm::record_host_function_call;
 use crate::workerctx::WorkerCtx;
@@ -39,29 +39,31 @@ impl<Ctx: WorkerCtx> HostInputStream for DurableWorkerCtx<Ctx> {
         self_: Resource<InputStream>,
         len: u64,
     ) -> Result<Vec<u8>, StreamError> {
-        let _permit = self.begin_async_host_function().await?;
-        record_host_function_call("io::streams::input_stream", "read");
         if is_incoming_http_body_stream(self.table(), &self_) {
             let handle = self_.rep();
             let begin_idx = get_http_request_begin_idx(self, handle)?;
 
-            let request = get_http_stream_request(self, handle)?;
-            let result =
-                Durability::<Ctx, SerializableHttpRequest, Vec<u8>, SerializableStreamError>::wrap(
-                    self,
-                    WrappedFunctionType::WriteRemoteBatched(Some(begin_idx)),
-                    "http::types::incoming_body_stream::read",
-                    request,
-                    |ctx| {
-                        Box::pin(async move {
-                            HostInputStream::read(&mut ctx.as_wasi_view(), self_, len).await
-                        })
-                    },
-                )
-                .await;
+            let durability = Durability2::<Ctx, Vec<u8>, SerializableStreamError>::new(
+                self,
+                "http::types::incoming_body_stream",
+                "read",
+                WrappedFunctionType::WriteRemoteBatched(Some(begin_idx)),
+            )
+            .await?;
+
+            let result = if durability.is_live() {
+                let request = get_http_stream_request(self, handle)?;
+                let result = HostInputStream::read(&mut self.as_wasi_view(), self_, len).await;
+                durability.persist(self, request, result).await
+            } else {
+                durability.replay(self).await
+            };
+
             end_http_request_if_closed(self, handle, &result).await?;
             result
         } else {
+            let _permit = self.begin_async_host_function().await?;
+            record_host_function_call("io::streams::input_stream", "read");
             HostInputStream::read(&mut self.as_wasi_view(), self_, len).await
         }
     }