diff --git a/crates/adapter/src/fastly/core.rs b/crates/adapter/src/fastly/core.rs index 0d8820ff..e2430b45 100644 --- a/crates/adapter/src/fastly/core.rs +++ b/crates/adapter/src/fastly/core.rs @@ -89,6 +89,7 @@ pub enum HttpKeepaliveMode { pub type PendingObjectStoreLookupHandle = u32; pub type PendingObjectStoreInsertHandle = u32; pub type PendingObjectStoreDeleteHandle = u32; +pub type PendingObjectStoreListHandle = u32; pub type BodyHandle = u32; pub type PendingRequestHandle = u32; pub type RequestHandle = u32; @@ -2310,9 +2311,9 @@ pub mod fastly_erl { } } -pub mod fastly_kv_store { +pub mod fastly_object_store { use super::*; - use crate::bindings::fastly::api::kv_store; + use crate::bindings::fastly::api::object_store; use core::slice; #[export_name = "fastly_object_store#open"] @@ -2322,7 +2323,7 @@ pub mod fastly_kv_store { kv_store_handle_out: *mut KVStoreHandle, ) -> FastlyStatus { let name = unsafe { slice::from_raw_parts(name_ptr, name_len) }; - match kv_store::open(name) { + match object_store::open(name) { Ok(None) => { unsafe { *kv_store_handle_out = INVALID_HANDLE; @@ -2348,7 +2349,7 @@ pub mod fastly_kv_store { body_handle_out: *mut BodyHandle, ) -> FastlyStatus { let key = unsafe { slice::from_raw_parts(key_ptr, key_len) }; - match kv_store::lookup(kv_store_handle, key) { + match object_store::lookup(kv_store_handle, key) { Ok(res) => { unsafe { *body_handle_out = res.unwrap_or(INVALID_HANDLE); @@ -2367,7 +2368,7 @@ pub mod fastly_kv_store { pending_body_handle_out: *mut PendingObjectStoreLookupHandle, ) -> FastlyStatus { let key = unsafe { slice::from_raw_parts(key_ptr, key_len) }; - match kv_store::lookup_async(kv_store_handle, key) { + match object_store::lookup_async(kv_store_handle, key) { Ok(res) => { unsafe { *pending_body_handle_out = res; @@ -2383,7 +2384,7 @@ pub mod fastly_kv_store { pending_body_handle: PendingObjectStoreLookupHandle, body_handle_out: *mut BodyHandle, ) -> FastlyStatus { - match kv_store::pending_lookup_wait(pending_body_handle) { + match object_store::pending_lookup_wait(pending_body_handle) { Ok(res) => { unsafe { *body_handle_out = res.unwrap_or(INVALID_HANDLE); @@ -2402,7 +2403,7 @@ pub mod fastly_kv_store { body_handle: BodyHandle, ) -> FastlyStatus { let key = unsafe { slice::from_raw_parts(key_ptr, key_len) }; - convert_result(kv_store::insert(kv_store_handle, key, body_handle)) + convert_result(object_store::insert(kv_store_handle, key, body_handle)) } #[export_name = "fastly_object_store#insert_async"] @@ -2414,7 +2415,7 @@ pub mod fastly_kv_store { pending_body_handle_out: *mut PendingObjectStoreInsertHandle, ) -> FastlyStatus { let key = unsafe { slice::from_raw_parts(key_ptr, key_len) }; - match kv_store::insert_async(kv_store_handle, key, body_handle) { + match object_store::insert_async(kv_store_handle, key, body_handle) { Ok(res) => { unsafe { *pending_body_handle_out = res; @@ -2429,7 +2430,7 @@ pub mod fastly_kv_store { pub fn pending_insert_wait( pending_body_handle: PendingObjectStoreInsertHandle, ) -> FastlyStatus { - convert_result(kv_store::pending_insert_wait(pending_body_handle)) + convert_result(object_store::pending_insert_wait(pending_body_handle)) } #[export_name = "fastly_object_store#delete_async"] @@ -2440,7 +2441,7 @@ pub mod fastly_kv_store { pending_body_handle_out: *mut PendingObjectStoreDeleteHandle, ) -> FastlyStatus { let key = unsafe { slice::from_raw_parts(key_ptr, key_len) }; - match kv_store::delete_async(kv_store_handle, key) { + match object_store::delete_async(kv_store_handle, key) { Ok(res) => { unsafe { *pending_body_handle_out = res; @@ -2455,7 +2456,510 @@ pub mod fastly_kv_store { pub fn pending_delete_wait( pending_body_handle: PendingObjectStoreDeleteHandle, ) -> FastlyStatus { - convert_result(kv_store::pending_delete_wait(pending_body_handle)) + convert_result(object_store::pending_delete_wait(pending_body_handle)) + } +} + +pub mod fastly_kv_store { + use super::*; + use crate::bindings::fastly::api::kv_store; + use core::slice; + + /// Modes of KV Store insertion. + /// + /// This type serves to facilitate alternative methods of key insertion. + #[repr(C)] + #[derive(Default, Clone, Copy)] + pub enum InsertMode { + /// The default method of insertion. Create a key, or overwrite an existing one + #[default] + Overwrite, + /// Only insert if the key does not currently exist + Add, + /// Append this insertion's body onto a key's value if it exists (or create a new key if there is none) + Append, + /// Prepend this insertion's body onto a key's value if it exists (or create a new key if there is none) + Prepend, + } + + impl From for kv_store::InsertMode { + fn from(value: InsertMode) -> Self { + match value { + InsertMode::Overwrite => Self::Overwrite, + InsertMode::Add => Self::Add, + InsertMode::Append => Self::Append, + InsertMode::Prepend => Self::Prepend, + } + } + } + + #[repr(C)] + pub struct InsertConfig { + pub mode: InsertMode, + pub if_generation_match: u32, + pub metadata: *const u8, + pub metadata_len: u32, + pub time_to_live_sec: u32, + } + + impl Default for InsertConfig { + fn default() -> Self { + InsertConfig { + mode: InsertMode::Overwrite, + if_generation_match: 0, + metadata: std::ptr::null(), + metadata_len: 0, + time_to_live_sec: 0, + } + } + } + + #[repr(C)] + #[derive(Default, Copy, Clone)] + pub enum ListModeInternal { + #[default] + Strong, + Eventual, + } + + impl From for kv_store::ListMode { + fn from(value: ListModeInternal) -> Self { + match value { + ListModeInternal::Strong => Self::Strong, + ListModeInternal::Eventual => Self::Eventual, + } + } + } + + #[repr(C)] + pub struct ListConfig { + pub mode: ListModeInternal, + pub cursor: *const u8, + pub cursor_len: u32, + pub limit: u32, + pub prefix: *const u8, + pub prefix_len: u32, + } + + impl Default for ListConfig { + fn default() -> Self { + ListConfig { + mode: ListModeInternal::Strong, + cursor: std::ptr::null(), + cursor_len: 0, + limit: 0, + prefix: std::ptr::null(), + prefix_len: 0, + } + } + } + + #[repr(C)] + pub struct LookupConfig { + // reserved is just a placeholder, + // can be removed when somethin real is added + reserved: u32, + } + + impl Default for LookupConfig { + fn default() -> Self { + LookupConfig { reserved: 0 } + } + } + + #[repr(C)] + pub struct DeleteConfig { + // reserved is just a placeholder, + // can be removed when somethin real is added + reserved: u32, + } + + impl Default for DeleteConfig { + fn default() -> Self { + DeleteConfig { reserved: 0 } + } + } + + bitflags::bitflags! { + /// `InsertConfigOptions` codings. + #[derive(Default)] + #[repr(transparent)] + pub struct InsertConfigOptions: u32 { + const RESERVED = 1 << 0; + const BACKGROUND_FETCH = 1 << 1; + const IF_GENERATION_MATCH = 1 << 2; + const METADATA = 1 << 3; + const TIME_TO_LIVE_SEC = 1 << 4; + } + /// `ListConfigOptions` codings. + #[derive(Default)] + #[repr(transparent)] + pub struct ListConfigOptions: u32 { + const RESERVED = 1 << 0; + const CURSOR = 1 << 1; + const LIMIT = 1 << 2; + const PREFIX = 1 << 3; + } + /// `LookupConfigOptions` codings. + #[derive(Default)] + #[repr(transparent)] + pub struct LookupConfigOptions: u32 { + const RESERVED = 1 << 0; + } + /// `DeleteConfigOptions` codings. + #[derive(Default)] + #[repr(transparent)] + pub struct DeleteConfigOptions: u32 { + const RESERVED = 1 << 0; + } + } + + impl From for kv_store::InsertConfigOptions { + fn from(value: InsertConfigOptions) -> Self { + let mut res = Self::empty(); + res.set( + Self::RESERVED, + value.contains(InsertConfigOptions::RESERVED), + ); + res.set( + Self::BACKGROUND_FETCH, + value.contains(InsertConfigOptions::BACKGROUND_FETCH), + ); + res.set( + Self::IF_GENERATION_MATCH, + value.contains(InsertConfigOptions::IF_GENERATION_MATCH), + ); + res.set( + Self::METADATA, + value.contains(InsertConfigOptions::METADATA), + ); + res.set( + Self::TIME_TO_LIVE_SEC, + value.contains(InsertConfigOptions::TIME_TO_LIVE_SEC), + ); + res + } + } + + impl From for kv_store::ListConfigOptions { + fn from(value: ListConfigOptions) -> Self { + let mut res = Self::empty(); + res.set(Self::RESERVED, value.contains(ListConfigOptions::RESERVED)); + res.set(Self::CURSOR, value.contains(ListConfigOptions::CURSOR)); + res.set(Self::LIMIT, value.contains(ListConfigOptions::LIMIT)); + res.set(Self::PREFIX, value.contains(ListConfigOptions::PREFIX)); + res + } + } + + #[repr(u32)] + #[derive(Clone, Copy, Debug, PartialEq, Eq)] + pub enum KvError { + Uninitialized, + Ok, + BadRequest, + NotFound, + PreconditionFailed, + PayloadTooLarge, + InternalError, + } + + #[export_name = "fastly_kv_store#open"] + pub fn open_v2( + name_ptr: *const u8, + name_len: usize, + kv_store_handle_out: *mut KVStoreHandle, + ) -> FastlyStatus { + let name = unsafe { slice::from_raw_parts(name_ptr, name_len) }; + match kv_store::open(name) { + Ok(None) => { + unsafe { + *kv_store_handle_out = INVALID_HANDLE; + } + + FastlyStatus::INVALID_ARGUMENT + } + + Ok(Some(res)) => { + unsafe { + *kv_store_handle_out = res; + } + + FastlyStatus::OK + } + + Err(e) => e.into(), + } + } + + #[export_name = "fastly_kv_store#lookup"] + pub fn lookup_v2( + kv_store_handle: KVStoreHandle, + key_ptr: *const u8, + key_len: usize, + // NOTE: mask and config are ignored in the wit definition while they're empty + _lookup_config_mask: LookupConfigOptions, + _lookup_config: *const LookupConfig, + pending_body_handle_out: *mut PendingObjectStoreLookupHandle, + ) -> FastlyStatus { + let key = unsafe { slice::from_raw_parts(key_ptr, key_len) }; + match kv_store::lookup(kv_store_handle, key) { + Ok(res) => { + unsafe { + *pending_body_handle_out = res; + } + + FastlyStatus::OK + } + Err(e) => e.into(), + } + } + + #[export_name = "fastly_kv_store#lookup_wait"] + pub fn pending_lookup_wait_v2( + pending_handle: PendingObjectStoreLookupHandle, + body_handle_out: *mut BodyHandle, + metadata_out: *mut u8, + metadata_len: *mut usize, + generation_out: *mut u32, + kv_error_out: *mut KvError, + ) -> FastlyStatus { + let res = match kv_store::lookup_wait(pending_handle) { + Ok(Some(res)) => res, + Ok(None) => { + unsafe { + *kv_error_out = KvError::NotFound; + } + + return FastlyStatus::OK; + } + Err(e) => { + unsafe { + // TODO: the wit interface doesn't return any KvError values + *kv_error_out = KvError::Uninitialized; + } + + return e.into(); + } + }; + + let max_len = unsafe { *metadata_len }; + + with_buffer!( + metadata_out, + max_len, + { res.metadata(u64::try_from(max_len).trapping_unwrap()) }, + |res| { + let buf = handle_buffer_len!(res, metadata_len); + + unsafe { + *metadata_len = buf.as_ref().map(Vec::len).unwrap_or(0); + } + + std::mem::forget(buf); + } + ); + + unsafe { + *body_handle_out = res.body(); + *generation_out = res.generation(); + *kv_error_out = KvError::Ok; + } + + FastlyStatus::OK + } + + #[export_name = "fastly_kv_store#insert"] + pub fn insert_v2( + kv_store_handle: KVStoreHandle, + key_ptr: *const u8, + key_len: usize, + body_handle: BodyHandle, + insert_config_mask: InsertConfigOptions, + insert_config: *const InsertConfig, + pending_body_handle_out: *mut PendingObjectStoreInsertHandle, + ) -> FastlyStatus { + let key = unsafe { slice::from_raw_parts(key_ptr, key_len) }; + + let insert_config_mask = insert_config_mask.into(); + let insert_config = unsafe { + kv_store::InsertConfig { + mode: (*insert_config).mode.into(), + if_generation_match: (*insert_config).if_generation_match, + metadata: { + let len = usize::try_from((*insert_config).metadata_len).trapping_unwrap(); + Vec::from_raw_parts((*insert_config).metadata as *mut _, len, len) + }, + time_to_live_sec: (*insert_config).time_to_live_sec, + } + }; + + let res = kv_store::insert( + kv_store_handle, + key, + body_handle, + insert_config_mask, + &insert_config, + ); + + // We don't own the memory in metadata, so forget the vector that the insert config holds. + std::mem::forget(insert_config); + + match res { + Ok(res) => { + unsafe { + *pending_body_handle_out = res; + } + + FastlyStatus::OK + } + + Err(e) => e.into(), + } + } + + #[export_name = "fastly_kv_store#insert_wait"] + pub fn pending_insert_wait_v2( + pending_body_handle: PendingObjectStoreInsertHandle, + kv_error_out: *mut KvError, + ) -> FastlyStatus { + match kv_store::insert_wait(pending_body_handle) { + Ok(_) => { + unsafe { + *kv_error_out = KvError::Ok; + } + + FastlyStatus::OK + } + + // TODO: the wit interface doesn't return any KvError values + Err(e) => { + unsafe { + // TODO: the wit interface doesn't return any KvError values + *kv_error_out = KvError::Uninitialized; + } + + e.into() + } + } + } + + #[export_name = "fastly_kv_store#delete"] + pub fn delete_v2( + kv_store_handle: KVStoreHandle, + key_ptr: *const u8, + key_len: usize, + // These are ignored in the wit interface for the time being, as they don't pass any + // meaningful values. + _delete_config_mask: DeleteConfigOptions, + _delete_config: *const DeleteConfig, + pending_body_handle_out: *mut PendingObjectStoreDeleteHandle, + ) -> FastlyStatus { + let key = unsafe { slice::from_raw_parts(key_ptr, key_len) }; + match kv_store::delete(kv_store_handle, key) { + Ok(res) => { + unsafe { + *pending_body_handle_out = res; + } + + FastlyStatus::OK + } + + Err(e) => e.into(), + } + } + + #[export_name = "fastly_kv_store#delete_wait"] + pub fn pending_delete_wait_v2( + pending_body_handle: PendingObjectStoreDeleteHandle, + kv_error_out: *mut KvError, + ) -> FastlyStatus { + match kv_store::delete_wait(pending_body_handle) { + Ok(_) => { + unsafe { + *kv_error_out = KvError::Ok; + } + + FastlyStatus::OK + } + + Err(e) => { + unsafe { + // TODO: the wit interface doesn't return any KvError values + *kv_error_out = KvError::Uninitialized; + } + + e.into() + } + } + } + + #[export_name = "fastly_kv_store#list"] + pub fn list_v2( + kv_store_handle: KVStoreHandle, + list_config_mask: ListConfigOptions, + list_config: *const ListConfig, + pending_body_handle_out: *mut PendingObjectStoreListHandle, + ) -> FastlyStatus { + let mask = list_config_mask.into(); + + let config = unsafe { + kv_store::ListConfig { + mode: (*list_config).mode.into(), + cursor: { + let len = usize::try_from((*list_config).cursor_len).trapping_unwrap(); + Vec::from_raw_parts((*list_config).cursor as *mut _, len, len) + }, + limit: (*list_config).limit, + prefix: { + let len = usize::try_from((*list_config).prefix_len).trapping_unwrap(); + Vec::from_raw_parts((*list_config).cursor as *mut _, len, len) + }, + } + }; + + let res = kv_store::list(kv_store_handle, mask, &config); + + std::mem::forget(config); + + match res { + Ok(res) => { + unsafe { + *pending_body_handle_out = res; + } + + FastlyStatus::OK + } + + Err(e) => e.into(), + } + } + + #[export_name = "fastly_kv_store#list_wait"] + pub fn pending_list_wait_v2( + pending_body_handle: PendingObjectStoreListHandle, + body_handle_out: *mut BodyHandle, + kv_error_out: *mut KvError, + ) -> FastlyStatus { + match kv_store::list_wait(pending_body_handle) { + Ok(res) => { + unsafe { + *kv_error_out = KvError::Ok; + *body_handle_out = res; + } + + FastlyStatus::OK + } + + Err(e) => { + unsafe { + // TODO: the wit interface doesn't return any KvError values + *kv_error_out = KvError::Uninitialized; + } + + e.into() + } + } } } diff --git a/lib/data/viceroy-component-adapter.wasm b/lib/data/viceroy-component-adapter.wasm index 84e57249..de117bbe 100755 Binary files a/lib/data/viceroy-component-adapter.wasm and b/lib/data/viceroy-component-adapter.wasm differ diff --git a/lib/src/component/kv_store.rs b/lib/src/component/kv_store.rs index 2ebc4172..69d886f5 100644 --- a/lib/src/component/kv_store.rs +++ b/lib/src/component/kv_store.rs @@ -1,140 +1,102 @@ use { - super::fastly::api::{http_types, kv_store, types}, - crate::{ - body::Body, - object_store::{ObjectKey, ObjectStoreError}, - session::{ - PeekableTask, PendingKvDeleteTask, PendingKvInsertTask, PendingKvLookupTask, Session, - }, - }, + super::fastly::api::{http_body, kv_store, types}, + crate::session::Session, }; +pub struct LookupResult; + #[async_trait::async_trait] -impl kv_store::Host for Session { - async fn open(&mut self, name: String) -> Result, types::Error> { - if self.object_store.store_exists(&name)? { - let handle = self.obj_store_handle(&name)?; - Ok(Some(handle.into())) - } else { - Ok(None) - } +impl kv_store::HostLookupResult for Session { + async fn body( + &mut self, + _self_: wasmtime::component::Resource, + ) -> http_body::BodyHandle { + todo!() } - async fn lookup( + async fn metadata( &mut self, - store: kv_store::Handle, - key: String, - ) -> Result, types::Error> { - let store = self.get_obj_store_key(store.into()).unwrap(); - let key = ObjectKey::new(&key)?; - match self.obj_lookup(store, &key) { - Ok(obj) => { - let new_handle = self.insert_body(Body::from(obj)); - Ok(Some(new_handle.into())) - } - // Don't write to the invalid handle as the SDK will return Ok(None) - // if the object does not exist. We need to return `Ok(())` here to - // make sure Viceroy does not crash - Err(ObjectStoreError::MissingObject) => Ok(None), - Err(err) => Err(err.into()), - } + _self_: wasmtime::component::Resource, + ) -> Option> { + todo!() } - async fn lookup_async( + async fn generation( &mut self, - store: kv_store::Handle, - key: String, - ) -> Result { - let store = self.get_obj_store_key(store.into()).unwrap(); - let key = ObjectKey::new(key)?; - // just create a future that's already ready - let fut = futures::future::ok(self.obj_lookup(store, &key)); - let task = PendingKvLookupTask::new(PeekableTask::spawn(fut).await); - Ok(self.insert_pending_kv_lookup(task).into()) + _self_: wasmtime::component::Resource, + ) -> u32 { + todo!() } - async fn pending_lookup_wait( + fn drop( &mut self, - pending: kv_store::PendingLookupHandle, - ) -> Result, types::Error> { - let pending_obj = self - .take_pending_kv_lookup(pending.into())? - .task() - .recv() - .await?; - // proceed with the normal match from lookup() - match pending_obj { - Ok(obj) => Ok(Some(self.insert_body(Body::from(obj)).into())), - Err(ObjectStoreError::MissingObject) => Ok(None), - Err(err) => Err(err.into()), - } + _rep: wasmtime::component::Resource, + ) -> wasmtime::Result<()> { + todo!() } +} - async fn insert( +#[async_trait::async_trait] +impl kv_store::Host for Session { + async fn open(&mut self, _name: String) -> Result, types::Error> { + todo!() + } + + async fn lookup( &mut self, - store: kv_store::Handle, - key: String, - body_handle: http_types::BodyHandle, - ) -> Result<(), types::Error> { - let store = self.get_obj_store_key(store.into()).unwrap().clone(); - let key = ObjectKey::new(&key)?; - let bytes = self.take_body(body_handle.into())?.read_into_vec().await?; - self.obj_insert(store, key, bytes)?; + _store: kv_store::Handle, + _key: String, + ) -> Result, types::Error> { + todo!() + } - Ok(()) + async fn lookup_wait( + &mut self, + _handle: kv_store::LookupHandle, + ) -> Result>, types::Error> { + todo!() } - async fn insert_async( + async fn insert( &mut self, - store: kv_store::Handle, - key: String, - body_handle: http_types::BodyHandle, - ) -> Result { - let store = self.get_obj_store_key(store.into()).unwrap().clone(); - let key = ObjectKey::new(&key)?; - let bytes = self.take_body(body_handle.into())?.read_into_vec().await?; - let fut = futures::future::ok(self.obj_insert(store, key, bytes)); - let task = PeekableTask::spawn(fut).await; + _store: kv_store::Handle, + _key: String, + _body_handle: kv_store::BodyHandle, + _mask: kv_store::InsertConfigOptions, + _config: kv_store::InsertConfig, + ) -> Result { + todo!() + } - Ok(self - .insert_pending_kv_insert(PendingKvInsertTask::new(task)) - .into()) + async fn insert_wait(&mut self, _handle: kv_store::InsertHandle) -> Result<(), types::Error> { + todo!() } - async fn pending_insert_wait( + async fn delete( &mut self, - handle: kv_store::PendingInsertHandle, - ) -> Result<(), types::Error> { - Ok((self - .take_pending_kv_insert(handle.into())? - .task() - .recv() - .await?)?) + _store: kv_store::Handle, + _key: String, + ) -> Result { + todo!() } - async fn delete_async( - &mut self, - store: kv_store::Handle, - key: String, - ) -> Result { - let store = self.get_obj_store_key(store.into()).unwrap().clone(); - let key = ObjectKey::new(&key)?; - let fut = futures::future::ok(self.obj_delete(store, key)); - let task = PeekableTask::spawn(fut).await; + async fn delete_wait(&mut self, _handle: kv_store::DeleteHandle) -> Result<(), types::Error> { + todo!() + } - Ok(self - .insert_pending_kv_delete(PendingKvDeleteTask::new(task)) - .into()) + async fn list( + &mut self, + _store: kv_store::Handle, + _mask: kv_store::ListConfigOptions, + _options: kv_store::ListConfig, + ) -> Result { + todo!() } - async fn pending_delete_wait( + async fn list_wait( &mut self, - handle: kv_store::PendingDeleteHandle, - ) -> Result<(), types::Error> { - Ok((self - .take_pending_kv_delete(handle.into())? - .task() - .recv() - .await?)?) + _handle: kv_store::ListHandle, + ) -> Result { + todo!() } } diff --git a/lib/src/component/mod.rs b/lib/src/component/mod.rs index c667056c..f8c7e62e 100644 --- a/lib/src/component/mod.rs +++ b/lib/src/component/mod.rs @@ -53,6 +53,7 @@ pub fn link_host_functions(linker: &mut component::Linker) -> anyh fastly::api::http_resp::add_to_linker(linker, |x| x.session())?; fastly::api::http_types::add_to_linker(linker, |x| x.session())?; fastly::api::log::add_to_linker(linker, |x| x.session())?; + fastly::api::object_store::add_to_linker(linker, |x| x.session())?; fastly::api::kv_store::add_to_linker(linker, |x| x.session())?; fastly::api::purge::add_to_linker(linker, |x| x.session())?; fastly::api::secret_store::add_to_linker(linker, |x| x.session())?; @@ -81,6 +82,7 @@ pub mod http_resp; pub mod http_types; pub mod kv_store; pub mod log; +pub mod object_store; pub mod purge; pub mod secret_store; pub mod types; diff --git a/lib/src/component/object_store.rs b/lib/src/component/object_store.rs new file mode 100644 index 00000000..49a423d2 --- /dev/null +++ b/lib/src/component/object_store.rs @@ -0,0 +1,140 @@ +use { + super::fastly::api::{http_types, object_store, types}, + crate::{ + body::Body, + object_store::{ObjectKey, ObjectStoreError}, + session::{ + PeekableTask, PendingKvDeleteTask, PendingKvInsertTask, PendingKvLookupTask, Session, + }, + }, +}; + +#[async_trait::async_trait] +impl object_store::Host for Session { + async fn open(&mut self, name: String) -> Result, types::Error> { + if self.object_store.store_exists(&name)? { + let handle = self.obj_store_handle(&name)?; + Ok(Some(handle.into())) + } else { + Ok(None) + } + } + + async fn lookup( + &mut self, + store: object_store::Handle, + key: String, + ) -> Result, types::Error> { + let store = self.get_obj_store_key(store.into()).unwrap(); + let key = ObjectKey::new(&key)?; + match self.obj_lookup(store, &key) { + Ok(obj) => { + let new_handle = self.insert_body(Body::from(obj)); + Ok(Some(new_handle.into())) + } + // Don't write to the invalid handle as the SDK will return Ok(None) + // if the object does not exist. We need to return `Ok(())` here to + // make sure Viceroy does not crash + Err(ObjectStoreError::MissingObject) => Ok(None), + Err(err) => Err(err.into()), + } + } + + async fn lookup_async( + &mut self, + store: object_store::Handle, + key: String, + ) -> Result { + let store = self.get_obj_store_key(store.into()).unwrap(); + let key = ObjectKey::new(key)?; + // just create a future that's already ready + let fut = futures::future::ok(self.obj_lookup(store, &key)); + let task = PendingKvLookupTask::new(PeekableTask::spawn(fut).await); + Ok(self.insert_pending_kv_lookup(task).into()) + } + + async fn pending_lookup_wait( + &mut self, + pending: object_store::PendingLookupHandle, + ) -> Result, types::Error> { + let pending_obj = self + .take_pending_kv_lookup(pending.into())? + .task() + .recv() + .await?; + // proceed with the normal match from lookup() + match pending_obj { + Ok(obj) => Ok(Some(self.insert_body(Body::from(obj)).into())), + Err(ObjectStoreError::MissingObject) => Ok(None), + Err(err) => Err(err.into()), + } + } + + async fn insert( + &mut self, + store: object_store::Handle, + key: String, + body_handle: http_types::BodyHandle, + ) -> Result<(), types::Error> { + let store = self.get_obj_store_key(store.into()).unwrap().clone(); + let key = ObjectKey::new(&key)?; + let bytes = self.take_body(body_handle.into())?.read_into_vec().await?; + self.obj_insert(store, key, bytes)?; + + Ok(()) + } + + async fn insert_async( + &mut self, + store: object_store::Handle, + key: String, + body_handle: http_types::BodyHandle, + ) -> Result { + let store = self.get_obj_store_key(store.into()).unwrap().clone(); + let key = ObjectKey::new(&key)?; + let bytes = self.take_body(body_handle.into())?.read_into_vec().await?; + let fut = futures::future::ok(self.obj_insert(store, key, bytes)); + let task = PeekableTask::spawn(fut).await; + + Ok(self + .insert_pending_kv_insert(PendingKvInsertTask::new(task)) + .into()) + } + + async fn pending_insert_wait( + &mut self, + handle: object_store::PendingInsertHandle, + ) -> Result<(), types::Error> { + Ok((self + .take_pending_kv_insert(handle.into())? + .task() + .recv() + .await?)?) + } + + async fn delete_async( + &mut self, + store: object_store::Handle, + key: String, + ) -> Result { + let store = self.get_obj_store_key(store.into()).unwrap().clone(); + let key = ObjectKey::new(&key)?; + let fut = futures::future::ok(self.obj_delete(store, key)); + let task = PeekableTask::spawn(fut).await; + + Ok(self + .insert_pending_kv_delete(PendingKvDeleteTask::new(task)) + .into()) + } + + async fn pending_delete_wait( + &mut self, + handle: object_store::PendingDeleteHandle, + ) -> Result<(), types::Error> { + Ok((self + .take_pending_kv_delete(handle.into())? + .task() + .recv() + .await?)?) + } +} diff --git a/lib/wit/deps/fastly/compute.wit b/lib/wit/deps/fastly/compute.wit index efcad638..bde3e5ec 100644 --- a/lib/wit/deps/fastly/compute.wit +++ b/lib/wit/deps/fastly/compute.wit @@ -657,9 +657,9 @@ interface erl { } /* - * Fastly KV Store + * Fastly Object Store */ -interface kv-store { +interface object-store { use types.{error}; use http-types.{body-handle}; @@ -711,6 +711,110 @@ interface kv-store { ) -> result<_, error>; } +/* + * Fastly KV Store + */ +interface kv-store { + + use types.{error}; + use http-types.{body-handle}; + + type handle = u32; + type lookup-handle = u32; + type insert-handle = u32; + type delete-handle = u32; + type list-handle = u32; + + open: func(name: string) -> result, error>; + + lookup: func( + store: handle, + key: string, + ) -> result; + + resource lookup-result { + body: func() -> body-handle; + metadata: func(max-len: u64) -> result>, error>; + generation: func() -> u32; + } + + lookup-wait: func( + handle: lookup-handle, + ) -> result, error>; + + enum insert-mode { + overwrite, + add, + append, + prepend, + } + + flags insert-config-options { + reserved, + background-fetch, + if-generation-match, + metadata, + time-to-live-sec, + } + + record insert-config { + mode: insert-mode, + if-generation-match: u32, + metadata: list, + time-to-live-sec: u32, + } + + insert: func( + store: handle, + key: string, + body-handle: body-handle, + mask: insert-config-options, + config: insert-config, + ) -> result; + + insert-wait: func( + handle: insert-handle, + ) -> result<_, error>; + + delete: func( + store: handle, + key: string, + ) -> result; + + delete-wait: func( + handle: delete-handle, + ) -> result<_, error>; + + enum list-mode { + strong, + eventual, + } + + flags list-config-options { + reserved, + cursor, + limit, + prefix, + } + + record list-config { + mode: list-mode, + cursor: list, + limit: u32, + prefix: list, + } + + %list: func( + store: handle, + mask: list-config-options, + options: list-config, + ) -> result; + + list-wait: func( + handle: list-handle, + ) -> result; +} + /* * Fastly Secret Store */ @@ -1144,6 +1248,7 @@ world compute { import http-resp; import log; import kv-store; + import object-store; import purge; import secret-store; import config-store;