Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring the Durability construct, WIP #1097

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
309 changes: 178 additions & 131 deletions golem-worker-executor-base/src/durable_host/blobstore/container.rs

Large diffs are not rendered by default.

66 changes: 37 additions & 29 deletions golem-worker-executor-base/src/durable_host/cli/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,51 +15,59 @@
use async_trait::async_trait;

use crate::durable_host::serialized::SerializableError;
use crate::durable_host::{Durability, DurableWorkerCtx};
use crate::metrics::wasm::record_host_function_call;
use crate::durable_host::{Durability2, DurableWorkerCtx};
use crate::workerctx::WorkerCtx;
use golem_common::model::oplog::WrappedFunctionType;
use wasmtime_wasi::bindings::cli::environment::Host;

#[async_trait]
impl<Ctx: WorkerCtx> Host for DurableWorkerCtx<Ctx> {
async fn get_environment(&mut self) -> anyhow::Result<Vec<(String, String)>> {
let _permit = self.begin_async_host_function().await?;
record_host_function_call("cli::environment", "get_environment");
Durability::<Ctx, (), Vec<(String, String)>, SerializableError>::wrap(
let durability = Durability2::<Ctx, Vec<(String, String)>, SerializableError>::new(
self,
WrappedFunctionType::ReadLocal,
"golem_environment::get_environment",
(),
|ctx| Box::pin(async { Host::get_environment(&mut ctx.as_wasi_view()).await }),
)
.await
"golem environment",
"get_environment",
WrappedFunctionType::ReadLocal
).await?;

if durability.is_live() {
let result = Host::get_environment(&mut self.as_wasi_view()).await;
durability.persist(self, (), result).await
} else {
durability.replay(self).await
}
}

async fn get_arguments(&mut self) -> anyhow::Result<Vec<String>> {
let _permit = self.begin_async_host_function().await?;
record_host_function_call("cli::environment", "get_arguments");
Durability::<Ctx, (), Vec<String>, SerializableError>::wrap(
let durability = Durability2::<Ctx, Vec<String>, SerializableError>::new(
self,
WrappedFunctionType::ReadLocal,
"golem_environment::get_arguments",
(),
|ctx| Box::pin(async { Host::get_arguments(&mut ctx.as_wasi_view()).await }),
)
.await
"golem environment",
"get_arguments",
WrappedFunctionType::ReadLocal
).await?;

if durability.is_live() {
let result = Host::get_arguments(&mut self.as_wasi_view()).await;
durability.persist(self, (), result).await
} else {
durability.replay(self).await
}
}

async fn initial_cwd(&mut self) -> anyhow::Result<Option<String>> {
let _permit = self.begin_async_host_function().await?;
record_host_function_call("cli::environment", "initial_cwd");
Durability::<Ctx, (), Option<String>, SerializableError>::wrap(
let durability = Durability2::<Ctx, Option<String>, SerializableError>::new(
self,
WrappedFunctionType::ReadLocal,
"golem_environment::get_arguments", // NOTE: for backward compatibility with Golem 1.0
(),
|ctx| Box::pin(async { Host::initial_cwd(&mut ctx.as_wasi_view()).await }),
)
.await
"golem environment",
"get_arguments", // TODO: fix in 2.0 - for backward compatibility with Golem 1.0
WrappedFunctionType::ReadLocal
).await?;

if durability.is_live() {
let result = Host::initial_cwd(&mut self.as_wasi_view()).await;
durability.persist(self, (), result).await
} else {
durability.replay(self).await
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use async_trait::async_trait;
use wasmtime::component::Resource;

use crate::durable_host::serialized::SerializableError;
use crate::durable_host::{Durability, DurableWorkerCtx};
use crate::durable_host::{Durability2, DurableWorkerCtx};
use crate::metrics::wasm::record_host_function_call;
use crate::services::oplog::CommitLevel;
use crate::workerctx::WorkerCtx;
Expand All @@ -26,29 +26,37 @@ use wasmtime_wasi::bindings::clocks::monotonic_clock::{Duration, Host, Instant,
#[async_trait]
impl<Ctx: WorkerCtx> Host for DurableWorkerCtx<Ctx> {
async fn now(&mut self) -> anyhow::Result<Instant> {
let _permit = self.begin_async_host_function().await?;
record_host_function_call("clocks::monotonic_clock", "now");
Durability::<Ctx, (), Instant, SerializableError>::wrap(
let durability = Durability2::<Ctx, Instant, SerializableError>::new(
self,
"monotonic_clock",
"now",
WrappedFunctionType::ReadLocal,
"monotonic_clock::now",
(),
|ctx| Box::pin(async { Host::now(&mut ctx.as_wasi_view()).await }),
)
.await
.await?;

if durability.is_live() {
let result = Host::now(&mut self.as_wasi_view()).await;
durability.persist(self, (), result).await
} else {
durability.replay(self).await
}
}

async fn resolution(&mut self) -> anyhow::Result<Instant> {
let _permit = self.begin_async_host_function().await?;
record_host_function_call("clocks::monotonic_clock", "resolution");
Durability::<Ctx, (), Instant, SerializableError>::wrap(
let durability = Durability2::<Ctx, Instant, SerializableError>::new(
self,
"monotonic_clock",
"resolution",
WrappedFunctionType::ReadLocal,
"monotonic_clock::resolution",
(),
|ctx| Box::pin(async { Host::resolution(&mut ctx.as_wasi_view()).await }),
)
.await
.await?;

if durability.is_live() {
let result = Host::resolution(&mut self.as_wasi_view()).await;
durability.persist(self, (), result).await
} else {
durability.replay(self)
}
}

async fn subscribe_instant(&mut self, when: Instant) -> anyhow::Result<Resource<Pollable>> {
Expand All @@ -58,16 +66,23 @@ impl<Ctx: WorkerCtx> Host for DurableWorkerCtx<Ctx> {
}

async fn subscribe_duration(&mut self, when: Duration) -> anyhow::Result<Resource<Pollable>> {
let _permit = self.begin_async_host_function().await?;
record_host_function_call("clocks::monotonic_clock", "subscribe_duration");
let now = Durability::<Ctx, (), Instant, SerializableError>::wrap(
let durability = Durability2::<Ctx, Instant, SerializableError>::new(
self,
"monotonic_clock",
"now", // TODO: fix in 2.0 - should be 'subscribe_duration' but have to keep for backward compatibility with Golem 1.0
WrappedFunctionType::ReadLocal,
"monotonic_clock::now", // should be 'subscribe_duration' but have to keep for backward compatibility with Golem 1.0
(),
|ctx| Box::pin(async { Host::now(&mut ctx.as_wasi_view()).await }),
)
.await?;

let now = {
if durability.is_live() {
let result = Host::now(&mut self.as_wasi_view()).await;
durability.persist(self, (), result).await
} else {
durability.replay(self).await
}
}?;

self.state.oplog.commit(CommitLevel::DurableOnly).await;
let when = now.saturating_add(when);
Host::subscribe_instant(&mut self.as_wasi_view(), when).await
Expand Down
39 changes: 23 additions & 16 deletions golem-worker-executor-base/src/durable_host/clocks/wall_clock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,38 +15,45 @@
use async_trait::async_trait;

use crate::durable_host::serialized::{SerializableDateTime, SerializableError};
use crate::durable_host::{Durability, DurableWorkerCtx};
use crate::metrics::wasm::record_host_function_call;
use crate::durable_host::{Durability2, DurableWorkerCtx};
use crate::workerctx::WorkerCtx;
use golem_common::model::oplog::WrappedFunctionType;
use wasmtime_wasi::bindings::clocks::wall_clock::{Datetime, Host};

#[async_trait]
impl<Ctx: WorkerCtx> Host for DurableWorkerCtx<Ctx> {
async fn now(&mut self) -> anyhow::Result<Datetime> {
let _permit = self.begin_async_host_function().await?;
record_host_function_call("clocks::wall_clock", "now");
Durability::<Ctx, (), SerializableDateTime, SerializableError>::wrap(
let durability = Durability2::<Ctx, SerializableDateTime, SerializableError>::new(
self,
"wall_clock",
"now",
WrappedFunctionType::ReadLocal,
"wall_clock::now",
(),
|ctx| Box::pin(async { Host::now(&mut ctx.as_wasi_view()).await }),
)
.await
.await?;

if durability.is_live() {
let result = Host::now(&mut self.as_wasi_view()).await;
durability.persist(self, (), result).await
} else {
durability.replay(self).await
}
}

async fn resolution(&mut self) -> anyhow::Result<Datetime> {
let _permit = self.begin_async_host_function().await?;
record_host_function_call("clocks::wall_clock", "resolution");
Durability::<Ctx, (), SerializableDateTime, SerializableError>::wrap(
let durability = Durability2::<Ctx, SerializableDateTime, SerializableError>::new(
self,
"wall_clock",
"resolution",
WrappedFunctionType::ReadLocal,
"wall_clock::resolution",
(),
|ctx| Box::pin(async { Host::resolution(&mut ctx.as_wasi_view()).await }),
)
.await
.await?;

if durability.is_live() {
let result = Host::resolution(&mut self.as_wasi_view()).await;
durability.persist(self, (), result).await
} else {
durability.replay(self).await
}
}
}

Expand Down
115 changes: 115 additions & 0 deletions golem-worker-executor-base/src/durable_host/durability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::durable_host::sync_helper::SyncHelperPermit;
use crate::durable_host::DurableWorkerCtx;
use crate::error::GolemError;
use crate::metrics::wasm::record_host_function_call;
use crate::model::PersistenceLevel;
use crate::services::oplog::{CommitLevel, Oplog, OplogOps};
use crate::workerctx::WorkerCtx;
Expand All @@ -22,10 +24,123 @@ use bincode::{Decode, Encode};
use golem_common::model::oplog::{OplogEntry, OplogIndex, WrappedFunctionType};
use std::fmt::Debug;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::Arc;
use tracing::error;

// TODO: is_live and replay can be merged
// TODO: is SErr always SerializableError? - sometimes SerializableStreamError
pub struct Durability2<Ctx, SOk, SErr> {
package: &'static str,
function: &'static str,
function_type: WrappedFunctionType,
_permit: SyncHelperPermit,
begin_index: OplogIndex,
is_live: bool,
persistence_level: PersistenceLevel,
_ctx: PhantomData<Ctx>,
_sok: PhantomData<SOk>,
_serr: PhantomData<SErr>,
}

impl<Ctx: WorkerCtx, SOk, SErr> Durability2<Ctx, SOk, SErr> {
pub async fn new(
ctx: &mut DurableWorkerCtx<Ctx>,
package: &'static str,
function: &'static str,
function_type: WrappedFunctionType,
) -> Result<Self, GolemError>
{
let permit = ctx.begin_async_host_function().await?;
record_host_function_call(package, function);

let begin_index = ctx.state.begin_function(&function_type).await?;

Ok(Self {
package,
function,
function_type,
_permit: permit,
begin_index,
is_live: ctx.state.is_live(),
persistence_level: ctx.state.persistence_level.clone(),
_ctx: PhantomData,
_sok: PhantomData,
_serr: PhantomData,
})
}

pub fn is_live(&self) -> bool {
self.is_live || self.persistence_level == PersistenceLevel::PersistNothing
}

pub async fn persist<SIn, Ok, Err>(
&self,
ctx: &mut DurableWorkerCtx<Ctx>,
input: SIn,
result: Result<Ok, Err>,
) -> Result<Ok, Err>
where
Ok: Clone,
Err: From<SErr> + From<GolemError> + Send + Sync,
SIn: Debug + Encode + Send + Sync,
SErr: Debug + Encode + for<'a> From<&'a Err> + From<GolemError> + Send + Sync,
SOk: Debug + Encode + From<Ok> + Encode + Send + Sync,
{
let serializable_result: Result<SOk, SErr> = result
.as_ref()
.map(|result| result.clone().into())
.map_err(|err| err.into());

let function_name = self.function_name();
ctx.write_to_oplog::<SIn, SOk, Err, SErr>(
&self.function_type,
&function_name,
self.begin_index,
&input,
&serializable_result,
)
.await?;

result
}

pub async fn replay<Ok, Err>(&self, ctx: &mut DurableWorkerCtx<Ctx>) -> Result<Ok, Err>
where
Ok: From<SOk>,
Err: From<SErr> + From<GolemError>,
SErr: Debug + Encode + Decode + From<GolemError> + Send + Sync,
SOk: Debug + Encode + Decode + Send + Sync,
{
let (_, oplog_entry) = crate::get_oplog_entry!(
ctx.state.replay_state,
OplogEntry::ImportedFunctionInvoked,
OplogEntry::ImportedFunctionInvokedV1
)?;

let function_name = self.function_name();
DurableWorkerCtx::<Ctx>::validate_oplog_entry(&oplog_entry, &function_name)?;
let response: Result<SOk, SErr> =
DurableWorkerCtx::<Ctx>::default_load(ctx.state.oplog.clone(), &oplog_entry).await;

ctx.state
.end_function(&self.function_type, self.begin_index)
.await?;

response.map(|sok| sok.into()).map_err(|serr| serr.into())
}

fn function_name(&self) -> String {
if self.package.is_empty() {
// For backward compatibility - some of the recorded function names were not following the pattern
self.function.to_string()
} else {
format!("{}::{}", self.package, self.function)
}
}
}

#[async_trait]
pub trait Durability<Ctx: WorkerCtx, SerializableInput, SerializableSuccess, SerializableErr> {
/// A version of `wrap` allowing conversion between the success value and the serialized value within the mutable worker context.
Expand Down
Loading
Loading