diff --git a/crates/adapter/src/fastly/core.rs b/crates/adapter/src/fastly/core.rs index 5f6bc58e..ffbd1114 100644 --- a/crates/adapter/src/fastly/core.rs +++ b/crates/adapter/src/fastly/core.rs @@ -2677,6 +2677,21 @@ pub mod fastly_kv_store { PreconditionFailed, PayloadTooLarge, InternalError, + TooManyRequests, + } + + impl From for KvError { + fn from(value: kv_store::KvStatus) -> Self { + match value { + kv_store::KvStatus::Ok => Self::Ok, + kv_store::KvStatus::BadRequest => Self::BadRequest, + kv_store::KvStatus::NotFound => Self::NotFound, + kv_store::KvStatus::PreconditionFailed => Self::PreconditionFailed, + kv_store::KvStatus::PayloadTooLarge => Self::PayloadTooLarge, + kv_store::KvStatus::InternalError => Self::InternalError, + kv_store::KvStatus::TooManyRequests => Self::TooManyRequests, + } + } } #[export_name = "fastly_kv_store#open"] @@ -2735,22 +2750,25 @@ pub mod fastly_kv_store { pending_handle: PendingObjectStoreLookupHandle, body_handle_out: *mut BodyHandle, metadata_out: *mut u8, - metadata_len: *mut usize, + metadata_len: usize, + nwritten_out: *mut usize, generation_out: *mut u32, kv_error_out: *mut KvError, ) -> FastlyStatus { let res = match kv_store::lookup_wait(pending_handle) { - Ok(Some(res)) => res, - Ok(None) => { + Ok((res, status)) => { unsafe { - *kv_error_out = KvError::NotFound; + *kv_error_out = status.into(); } - return FastlyStatus::OK; + let Some(res) = res else { + return FastlyStatus::OK; + }; + + res } Err(e) => { unsafe { - // TODO: the wit interface doesn't return any KvError values *kv_error_out = KvError::Uninitialized; } @@ -2758,27 +2776,27 @@ pub mod fastly_kv_store { } }; - let max_len = unsafe { *metadata_len }; - with_buffer!( metadata_out, - max_len, - { res.metadata(u64::try_from(max_len).trapping_unwrap()) }, + metadata_len, + { res.metadata(u64::try_from(metadata_len).trapping_unwrap()) }, |res| { - let buf = handle_buffer_len!(res, metadata_len); + let buf = handle_buffer_len!(res, nwritten_out); unsafe { - *metadata_len = buf.as_ref().map(Vec::len).unwrap_or(0); + *nwritten_out = buf.as_ref().map(Vec::len).unwrap_or(0); } std::mem::forget(buf); } ); + let body = res.body(); + let generation = res.generation(); + unsafe { - *body_handle_out = res.body(); - *generation_out = res.generation(); - *kv_error_out = KvError::Ok; + *body_handle_out = body; + *generation_out = generation; } FastlyStatus::OK @@ -2839,18 +2857,16 @@ pub mod fastly_kv_store { kv_error_out: *mut KvError, ) -> FastlyStatus { match kv_store::insert_wait(pending_body_handle) { - Ok(_) => { + Ok(status) => { unsafe { - *kv_error_out = KvError::Ok; + *kv_error_out = status.into(); } FastlyStatus::OK } - // TODO: the wit interface doesn't return any KvError values Err(e) => { unsafe { - // TODO: the wit interface doesn't return any KvError values *kv_error_out = KvError::Uninitialized; } @@ -2890,9 +2906,9 @@ pub mod fastly_kv_store { kv_error_out: *mut KvError, ) -> FastlyStatus { match kv_store::delete_wait(pending_body_handle) { - Ok(_) => { + Ok(status) => { unsafe { - *kv_error_out = KvError::Ok; + *kv_error_out = status.into(); } FastlyStatus::OK @@ -2900,7 +2916,6 @@ pub mod fastly_kv_store { Err(e) => { unsafe { - // TODO: the wit interface doesn't return any KvError values *kv_error_out = KvError::Uninitialized; } @@ -2916,19 +2931,27 @@ pub mod fastly_kv_store { list_config: *const ListConfig, pending_body_handle_out: *mut PendingObjectStoreListHandle, ) -> FastlyStatus { - let mask = list_config_mask.into(); + let mask = kv_store::ListConfigOptions::from(list_config_mask); let config = unsafe { kv_store::ListConfig { mode: (*list_config).mode.into(), - cursor: { + cursor: if mask.contains(kv_store::ListConfigOptions::CURSOR) { let len = usize::try_from((*list_config).cursor_len).trapping_unwrap(); Vec::from_raw_parts((*list_config).cursor as *mut _, len, len) + } else { + Vec::new() + }, + limit: if mask.contains(kv_store::ListConfigOptions::LIMIT) { + (*list_config).limit + } else { + 0 }, - limit: (*list_config).limit, - prefix: { + prefix: if mask.contains(kv_store::ListConfigOptions::PREFIX) { let len = usize::try_from((*list_config).prefix_len).trapping_unwrap(); - Vec::from_raw_parts((*list_config).cursor as *mut _, len, len) + Vec::from_raw_parts((*list_config).prefix as *mut _, len, len) + } else { + Vec::new() }, } }; @@ -2957,10 +2980,10 @@ pub mod fastly_kv_store { kv_error_out: *mut KvError, ) -> FastlyStatus { match kv_store::list_wait(pending_body_handle) { - Ok(res) => { + Ok((res, status)) => { unsafe { - *kv_error_out = KvError::Ok; - *body_handle_out = res; + *kv_error_out = status.into(); + *body_handle_out = res.unwrap_or(INVALID_HANDLE); } FastlyStatus::OK @@ -2968,8 +2991,8 @@ pub mod fastly_kv_store { Err(e) => { unsafe { - // TODO: the wit interface doesn't return any KvError values *kv_error_out = KvError::Uninitialized; + *body_handle_out = INVALID_HANDLE; } e.into() diff --git a/lib/src/component/error.rs b/lib/src/component/error.rs index 3f9bb4ef..64b7f4b9 100644 --- a/lib/src/component/error.rs +++ b/lib/src/component/error.rs @@ -1,5 +1,5 @@ use { - super::fastly::api::{http_req, types}, + super::fastly::api::{http_req, kv_store::KvStatus, types}, crate::{ config::ClientCertError, error::{self, HandleError}, @@ -12,6 +12,7 @@ use { status::InvalidStatusCode, uri::InvalidUri, }, + wasmtime_wasi::ResourceTableError, }; impl types::Error { @@ -95,6 +96,12 @@ impl From for types::Error { } } +impl From for types::Error { + fn from(_: std::string::FromUtf8Error) -> Self { + types::Error::InvalidArgument + } +} + impl From for types::Error { fn from(err: wiggle::GuestError) -> Self { use wiggle::GuestError::*; @@ -146,6 +153,30 @@ impl From for types::Error { } } +impl From for types::Error { + fn from(err: ResourceTableError) -> Self { + match err { + _ => panic!("{}", err), + } + } +} + +impl From for KvStatus { + fn from(err: KvStoreError) -> Self { + use KvStoreError::*; + match err { + Uninitialized => panic!("{}", err), + Ok => KvStatus::Ok, + BadRequest => KvStatus::BadRequest, + NotFound => KvStatus::NotFound, + PreconditionFailed => KvStatus::PreconditionFailed, + PayloadTooLarge => KvStatus::PayloadTooLarge, + InternalError => KvStatus::InternalError, + TooManyRequests => KvStatus::TooManyRequests, + } + } +} + impl From for types::Error { fn from(_: KeyValidationError) -> Self { types::Error::GenericError diff --git a/lib/src/component/kv_store.rs b/lib/src/component/kv_store.rs index e479c9b9..ba961c0c 100644 --- a/lib/src/component/kv_store.rs +++ b/lib/src/component/kv_store.rs @@ -1,7 +1,21 @@ use { - super::fastly::api::{http_body, kv_store, types}, - super::types::TrappableError, - crate::linking::ComponentCtx, + super::{ + fastly::api::{ + http_body, + kv_store::{self, InsertMode}, + types, + }, + types::TrappableError, + }, + crate::{ + linking::ComponentCtx, + object_store::{ObjectKey, ObjectStoreError}, + session::{ + PeekableTask, PendingKvDeleteTask, PendingKvInsertTask, PendingKvListTask, + PendingKvLookupTask, + }, + wiggle_abi::types::KvInsertMode, + }, wasmtime_wasi::WasiView, }; @@ -55,65 +69,229 @@ impl kv_store::HostLookupResult for ComponentCtx { #[async_trait::async_trait] impl kv_store::Host for ComponentCtx { - async fn open(&mut self, _name: String) -> Result, types::Error> { - todo!() + async fn open(&mut self, name: Vec) -> Result, types::Error> { + let name = String::from_utf8(name)?; + if self.session.kv_store.store_exists(&name)? { + // todo (byoung), handle optional/none/error case + let h = self.session.kv_store_handle(&name)?; + Ok(Some(h.into())) + } else { + Err(ObjectStoreError::UnknownObjectStore(name.to_owned()).into()) + } } async fn lookup( &mut self, - _store: kv_store::Handle, - _key: String, - ) -> Result { - todo!() + store: kv_store::Handle, + key: Vec, + ) -> Result { + let store = self.session.get_kv_store_key(store.into()).unwrap(); + let key = String::from_utf8(key)?; + // just create a future that's already ready + let fut = futures::future::ok(self.session.obj_lookup(store.clone(), ObjectKey::new(key)?)); + let task = PeekableTask::spawn(fut).await; + let lh = self + .session + .insert_pending_kv_lookup(PendingKvLookupTask::new(task)); + Ok(lh.into()) } async fn lookup_wait( &mut self, - _handle: kv_store::LookupHandle, - ) -> Result>, types::Error> { - todo!() + handle: kv_store::LookupHandle, + ) -> Result< + ( + Option>, + kv_store::KvStatus, + ), + types::Error, + > { + let resp = self + .session + .take_pending_kv_lookup(handle.into())? + .task() + .recv() + .await?; + + match resp { + Ok(value) => { + let lr = kv_store::LookupResult { + body: self.session.insert_body(value.body.into()).into(), + metadata: match value.metadata_len { + 0 => None, + _ => Some(value.metadata), + }, + generation: value.generation, + }; + + let res = self.table().push(lr)?; + + Ok((Some(res), kv_store::KvStatus::Ok)) + } + Err(e) => Ok((None, e.into())), + } } async fn insert( &mut self, - _store: kv_store::Handle, - _key: String, - _body_handle: kv_store::BodyHandle, - _mask: kv_store::InsertConfigOptions, - _config: kv_store::InsertConfig, + store: kv_store::Handle, + key: Vec, + body_handle: kv_store::BodyHandle, + mask: kv_store::InsertConfigOptions, + config: kv_store::InsertConfig, ) -> Result { - todo!() + let body = self + .session + .take_body(body_handle.into())? + .read_into_vec() + .await?; + let store = self.session.get_kv_store_key(store.into()).unwrap(); + let key = String::from_utf8(key)?; + + let mode = match config.mode { + InsertMode::Overwrite => KvInsertMode::Overwrite, + InsertMode::Add => KvInsertMode::Add, + InsertMode::Append => KvInsertMode::Append, + InsertMode::Prepend => KvInsertMode::Prepend, + }; + + let meta = if mask.contains(kv_store::InsertConfigOptions::METADATA) { + Some(config.metadata) + } else { + None + }; + + let igm = if mask.contains(kv_store::InsertConfigOptions::IF_GENERATION_MATCH) { + Some(config.if_generation_match) + } else { + None + }; + + let ttl = if mask.contains(kv_store::InsertConfigOptions::TIME_TO_LIVE_SEC) { + Some(std::time::Duration::from_secs( + config.time_to_live_sec as u64, + )) + } else { + None + }; + + let fut = futures::future::ok(self.session.kv_insert( + store.clone(), + ObjectKey::new(key)?, + body, + Some(mode), + igm, + meta, + ttl, + )); + let task = PeekableTask::spawn(fut).await; + let handle = self + .session + .insert_pending_kv_insert(PendingKvInsertTask::new(task)); + Ok(handle.into()) } - async fn insert_wait(&mut self, _handle: kv_store::InsertHandle) -> Result<(), types::Error> { - todo!() + async fn insert_wait( + &mut self, + handle: kv_store::InsertHandle, + ) -> Result { + let resp = self + .session + .take_pending_kv_insert(handle.into())? + .task() + .recv() + .await?; + + match resp { + Ok(()) => Ok(kv_store::KvStatus::Ok), + Err(e) => Ok(e.into()), + } } async fn delete( &mut self, - _store: kv_store::Handle, - _key: String, + store: kv_store::Handle, + key: Vec, ) -> Result { - todo!() + let store = self.session.get_kv_store_key(store.into()).unwrap(); + let key = String::from_utf8(key)?; + // just create a future that's already ready + let fut = futures::future::ok(self.session.kv_delete(store.clone(), ObjectKey::new(key)?)); + let task = PeekableTask::spawn(fut).await; + let lh = self + .session + .insert_pending_kv_delete(PendingKvDeleteTask::new(task)); + Ok(lh.into()) } - async fn delete_wait(&mut self, _handle: kv_store::DeleteHandle) -> Result<(), types::Error> { - todo!() + async fn delete_wait( + &mut self, + handle: kv_store::DeleteHandle, + ) -> Result { + let resp = self + .session + .take_pending_kv_delete(handle.into())? + .task() + .recv() + .await?; + + match resp { + Ok(()) => Ok(kv_store::KvStatus::Ok), + Err(e) => Ok(e.into()), + } } async fn list( &mut self, - _store: kv_store::Handle, - _mask: kv_store::ListConfigOptions, - _options: kv_store::ListConfig, + store: kv_store::Handle, + mask: kv_store::ListConfigOptions, + options: kv_store::ListConfig, ) -> Result { - todo!() + let store = self.session.get_kv_store_key(store.into()).unwrap(); + + let cursor = if mask.contains(kv_store::ListConfigOptions::CURSOR) { + Some(String::from_utf8(options.cursor)?) + } else { + None + }; + + let prefix = if mask.contains(kv_store::ListConfigOptions::PREFIX) { + Some(String::from_utf8(options.prefix)?) + } else { + None + }; + + let limit = if mask.contains(kv_store::ListConfigOptions::LIMIT) { + Some(options.limit) + } else { + None + }; + + let fut = futures::future::ok(self.session.kv_list(store.clone(), cursor, prefix, limit)); + let task = PeekableTask::spawn(fut).await; + let handle = self + .session + .insert_pending_kv_list(PendingKvListTask::new(task)); + Ok(handle.into()) } async fn list_wait( &mut self, - _handle: kv_store::ListHandle, - ) -> Result { - todo!() + handle: kv_store::ListHandle, + ) -> Result<(Option, kv_store::KvStatus), types::Error> { + let resp = self + .session + .take_pending_kv_list(handle.into())? + .task() + .recv() + .await?; + + match resp { + Ok(value) => Ok(( + Some(self.session.insert_body(value.into()).into()), + kv_store::KvStatus::Ok, + )), + Err(e) => Ok((None, e.into())), + } } } diff --git a/lib/wit/deps/fastly/compute.wit b/lib/wit/deps/fastly/compute.wit index 1006d781..cc1076dc 100644 --- a/lib/wit/deps/fastly/compute.wit +++ b/lib/wit/deps/fastly/compute.wit @@ -726,12 +726,22 @@ interface kv-store { type delete-handle = u32; type list-handle = u32; - open: func(name: string) -> result, error>; + enum kv-status { + ok, + bad-request, + not-found, + precondition-failed, + payload-too-large, + internal-error, + too-many-requests, + } + + open: func(name: list) -> result, error>; lookup: func( store: handle, - key: string, - ) -> result; + key: list, + ) -> result; resource lookup-result { body: func() -> body-handle; @@ -741,7 +751,7 @@ interface kv-store { lookup-wait: func( handle: lookup-handle, - ) -> result, error>; + ) -> result, kv-status>, error>; enum insert-mode { overwrite, @@ -767,7 +777,7 @@ interface kv-store { insert: func( store: handle, - key: string, + key: list, body-handle: body-handle, mask: insert-config-options, config: insert-config, @@ -775,16 +785,16 @@ interface kv-store { insert-wait: func( handle: insert-handle, - ) -> result<_, error>; + ) -> result; delete: func( store: handle, - key: string, + key: list, ) -> result; delete-wait: func( handle: delete-handle, - ) -> result<_, error>; + ) -> result; enum list-mode { strong, @@ -813,7 +823,7 @@ interface kv-store { list-wait: func( handle: list-handle, - ) -> result; + ) -> result, kv-status>, error>; } /*