Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add delete_async hostcall for kv #332

Merged
merged 1 commit into from
Dec 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions lib/compute-at-edge-abi/compute-at-edge.witx
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,18 @@
(param $pending_objstr_handle $pending_kv_insert_handle)
(result $err (expected (error $fastly_status)))
)

(@interface func (export "delete_async")
(param $store $object_store_handle)
(param $key string)
(param $pending_handle_out (@witx pointer $pending_kv_delete_handle))
(result $err (expected (error $fastly_status)))
)

(@interface func (export "pending_delete_wait")
(param $pending_handle $pending_kv_delete_handle)
(result $err (expected (error $fastly_status)))
)
)

(module $fastly_secret_store
Expand Down
2 changes: 2 additions & 0 deletions lib/compute-at-edge-abi/typenames.witx
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@
(typename $pending_kv_lookup_handle (handle))
;;; A handle to a pending KV insert request.
(typename $pending_kv_insert_handle (handle))
;;; A handle to a pending KV delete request.
(typename $pending_kv_delete_handle (handle))
;;; A handle to a Secret Store.
(typename $secret_store_handle (handle))
;;; A handle to an individual secret.
Expand Down
4 changes: 4 additions & 0 deletions lib/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,10 @@ pub enum HandleError {
#[error("Invalid pending KV insert handle: {0}")]
InvalidPendingKvInsertHandle(crate::wiggle_abi::types::PendingKvInsertHandle),

/// A delete handle was not valid.
#[error("Invalid pending KV delete handle: {0}")]
InvalidPendingKvDeleteHandle(crate::wiggle_abi::types::PendingKvDeleteHandle),

/// A dictionary handle was not valid.
#[error("Invalid dictionary handle: {0}")]
InvalidDictionaryHandle(crate::wiggle_abi::types::DictionaryHandle),
Expand Down
16 changes: 16 additions & 0 deletions lib/src/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,22 @@ impl ObjectStores {

Ok(())
}

pub fn delete(
&self,
obj_store_key: ObjectStoreKey,
obj_key: ObjectKey,
) -> Result<(), ObjectStoreError> {
self.stores
.write()
.map_err(|_| ObjectStoreError::PoisonedLock)?
.entry(obj_store_key)
.and_modify(|store| {
store.remove(&obj_key);
});

Ok(())
}
}

#[derive(Debug, PartialOrd, Ord, PartialEq, Eq, Clone, Default)]
Expand Down
74 changes: 71 additions & 3 deletions lib/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
mod async_item;
mod downstream;

pub use async_item::{AsyncItem, PeekableTask, PendingKvInsertTask, PendingKvLookupTask};
pub use async_item::{
AsyncItem, PeekableTask, PendingKvDeleteTask, PendingKvInsertTask, PendingKvLookupTask,
};

use {
self::downstream::DownstreamResponse,
Expand All @@ -21,8 +23,8 @@ use {
upstream::{SelectTarget, TlsConfig},
wiggle_abi::types::{
self, BodyHandle, ContentEncodings, DictionaryHandle, EndpointHandle,
ObjectStoreHandle, PendingKvInsertHandle, PendingKvLookupHandle, PendingRequestHandle,
RequestHandle, ResponseHandle, SecretHandle, SecretStoreHandle,
ObjectStoreHandle, PendingKvDeleteHandle, PendingKvInsertHandle, PendingKvLookupHandle,
PendingRequestHandle, RequestHandle, ResponseHandle, SecretHandle, SecretStoreHandle,
},
},
cranelift_entity::{entity_impl, PrimaryMap},
Expand Down Expand Up @@ -695,6 +697,60 @@ impl Session {
.ok_or(HandleError::InvalidPendingKvInsertHandle(handle))
}

pub fn obj_delete(
&self,
obj_store_key: ObjectStoreKey,
obj_key: ObjectKey,
) -> Result<(), ObjectStoreError> {
self.object_store.delete(obj_store_key, obj_key)
}

/// Insert a [`PendingKvDelete`] into the session.
///
/// This method returns a new [`PendingKvDeleteHandle`], which can then be used to access
/// and mutate the pending delete.
pub fn insert_pending_kv_delete(
&mut self,
pending: PendingKvDeleteTask,
) -> PendingKvDeleteHandle {
self.async_items
.push(Some(AsyncItem::PendingKvDelete(pending)))
.into()
}

/// Take ownership of a [`PendingKvDelete`], given its [`PendingKvDeleteHandle`].
///
/// Returns a [`HandleError`] if the handle is not associated with a pending delete in the
/// session.
pub fn take_pending_kv_delete(
&mut self,
handle: PendingKvDeleteHandle,
) -> Result<PendingKvDeleteTask, HandleError> {
// check that this is a pending request before removing it
let _ = self.pending_kv_delete(handle)?;

self.async_items
.get_mut(handle.into())
.and_then(Option::take)
.and_then(AsyncItem::into_pending_kv_delete)
.ok_or(HandleError::InvalidPendingKvDeleteHandle(handle))
}

/// Get a reference to a [`PendingDelete`], given its [`PendingKvDeleteHandle`].
///
/// Returns a [`HandleError`] if the handle is not associated with a delete in the
/// session.
pub fn pending_kv_delete(
&self,
handle: PendingKvDeleteHandle,
) -> Result<&PendingKvDeleteTask, HandleError> {
self.async_items
.get(handle.into())
.and_then(Option::as_ref)
.and_then(AsyncItem::as_pending_kv_delete)
.ok_or(HandleError::InvalidPendingKvDeleteHandle(handle))
}

pub fn obj_lookup(
&self,
obj_store_key: &ObjectStoreKey,
Expand Down Expand Up @@ -1051,3 +1107,15 @@ impl From<AsyncItemHandle> for PendingKvInsertHandle {
PendingKvInsertHandle::from(h.as_u32())
}
}

impl From<PendingKvDeleteHandle> for AsyncItemHandle {
fn from(h: PendingKvDeleteHandle) -> AsyncItemHandle {
AsyncItemHandle::from_u32(h.into())
}
}

impl From<AsyncItemHandle> for PendingKvDeleteHandle {
fn from(h: AsyncItemHandle) -> PendingKvDeleteHandle {
PendingKvDeleteHandle::from(h.as_u32())
}
}
60 changes: 56 additions & 4 deletions lib/src/session/async_item.rs
Copy link
Contributor Author

@computermouth computermouth Nov 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not 100% sure about this change. I needed to differentiate the

impl From<PendingKvInsertTask> for AsyncItem

and

impl From<PendingKvDeleteTask> for AsyncItem

... traits, which couldn't be done with the type alias, so I went for the struct newtypes. Hopefully the new/task isn't too silly. It was either that or sprinkle some pub everywhere, figured this was more idiomatic.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, this is exactly the right thing to do. Not the prettiest, but things can get a bit messy when you go far enough with the type system :)

Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,38 @@ use futures::FutureExt;
use http::Response;
use tokio::sync::oneshot;

pub type PendingKvLookupTask = PeekableTask<Result<Vec<u8>, ObjectStoreError>>;
pub type PendingKvInsertTask = PeekableTask<Result<(), ObjectStoreError>>;
#[derive(Debug)]
pub struct PendingKvLookupTask(PeekableTask<Result<Vec<u8>, ObjectStoreError>>);
impl PendingKvLookupTask {
pub fn new(t: PeekableTask<Result<Vec<u8>, ObjectStoreError>>) -> PendingKvLookupTask {
PendingKvLookupTask(t)
}
pub fn task(self) -> PeekableTask<Result<Vec<u8>, ObjectStoreError>> {
self.0
}
}

#[derive(Debug)]
pub struct PendingKvInsertTask(PeekableTask<Result<(), ObjectStoreError>>);
impl PendingKvInsertTask {
pub fn new(t: PeekableTask<Result<(), ObjectStoreError>>) -> PendingKvInsertTask {
PendingKvInsertTask(t)
}
pub fn task(self) -> PeekableTask<Result<(), ObjectStoreError>> {
self.0
}
}

#[derive(Debug)]
pub struct PendingKvDeleteTask(PeekableTask<Result<(), ObjectStoreError>>);
impl PendingKvDeleteTask {
pub fn new(t: PeekableTask<Result<(), ObjectStoreError>>) -> PendingKvDeleteTask {
PendingKvDeleteTask(t)
}
pub fn task(self) -> PeekableTask<Result<(), ObjectStoreError>> {
self.0
}
}

/// Represents either a full body, or the write end of a streaming body.
///
Expand All @@ -20,6 +50,7 @@ pub enum AsyncItem {
PendingReq(PeekableTask<Response<Body>>),
PendingKvLookup(PendingKvLookupTask),
PendingKvInsert(PendingKvInsertTask),
PendingKvDelete(PendingKvDeleteTask),
}

impl AsyncItem {
Expand Down Expand Up @@ -104,6 +135,20 @@ impl AsyncItem {
}
}

pub fn as_pending_kv_delete(&self) -> Option<&PendingKvDeleteTask> {
match self {
Self::PendingKvDelete(req) => Some(req),
_ => None,
}
}

pub fn into_pending_kv_delete(self) -> Option<PendingKvDeleteTask> {
match self {
Self::PendingKvDelete(req) => Some(req),
_ => None,
}
}

pub fn as_pending_req(&self) -> Option<&PeekableTask<Response<Body>>> {
match self {
Self::PendingReq(req) => Some(req),
Expand All @@ -130,8 +175,9 @@ impl AsyncItem {
Self::StreamingBody(body) => body.await_ready().await,
Self::Body(body) => body.await_ready().await,
Self::PendingReq(req) => req.await_ready().await,
Self::PendingKvLookup(obj) => obj.await_ready().await,
Self::PendingKvInsert(obj) => obj.await_ready().await,
Self::PendingKvLookup(req) => req.0.await_ready().await,
Self::PendingKvInsert(req) => req.0.await_ready().await,
Self::PendingKvDelete(req) => req.0.await_ready().await,
}
}

Expand All @@ -158,6 +204,12 @@ impl From<PendingKvInsertTask> for AsyncItem {
}
}

impl From<PendingKvDeleteTask> for AsyncItem {
fn from(task: PendingKvDeleteTask) -> Self {
Self::PendingKvDelete(task)
}
}

#[derive(Debug)]
pub enum PeekableTask<T> {
Waiting(oneshot::Receiver<Result<T, Error>>),
Expand Down
2 changes: 1 addition & 1 deletion lib/src/wiggle_abi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ wiggle::from_witx!({
errors: { fastly_status => Error },
async: {
fastly_async_io::{select},
fastly_object_store::{insert, insert_async, pending_insert_wait, lookup_async, pending_lookup_wait},
fastly_object_store::{delete_async, pending_delete_wait, insert, insert_async, pending_insert_wait, lookup_async, pending_lookup_wait},
fastly_http_body::{append, read, write},
fastly_http_req::{
pending_req_select, pending_req_select_v2, pending_req_poll, pending_req_poll_v2,
Expand Down
37 changes: 34 additions & 3 deletions lib/src/wiggle_abi/obj_store_impl.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
//! fastly_obj_store` hostcall implementations.

use super::types::{PendingKvInsertHandle, PendingKvLookupHandle};
use super::types::{PendingKvDeleteHandle, PendingKvInsertHandle, PendingKvLookupHandle};
use crate::session::PeekableTask;
use crate::session::{PendingKvDeleteTask, PendingKvInsertTask, PendingKvLookupTask};

use {
crate::{
Expand Down Expand Up @@ -63,7 +64,8 @@ impl FastlyObjectStore for Session {
// just create a future that's already ready
let fut = futures::future::ok(self.obj_lookup(store, &key));
let task = PeekableTask::spawn(fut).await;
opt_pending_body_handle_out.write(self.insert_pending_kv_lookup(task))?;
opt_pending_body_handle_out
.write(self.insert_pending_kv_lookup(PendingKvLookupTask::new(task)))?;
Ok(())
}

Expand All @@ -74,6 +76,7 @@ impl FastlyObjectStore for Session {
) -> Result<(), Error> {
let pending_obj = self
.take_pending_kv_lookup(pending_body_handle)?
.task()
.recv()
.await?;
// proceed with the normal match from lookup()
Expand Down Expand Up @@ -114,7 +117,8 @@ impl FastlyObjectStore for Session {
let bytes = self.take_body(body_handle)?.read_into_vec().await?;
let fut = futures::future::ok(self.obj_insert(store, key, bytes));
let task = PeekableTask::spawn(fut).await;
opt_pending_body_handle_out.write(self.insert_pending_kv_insert(task))?;
opt_pending_body_handle_out
.write(self.insert_pending_kv_insert(PendingKvInsertTask::new(task)))?;
Ok(())
}

Expand All @@ -124,6 +128,33 @@ impl FastlyObjectStore for Session {
) -> Result<(), Error> {
Ok((self
.take_pending_kv_insert(pending_insert_handle)?
.task()
.recv()
.await?)?)
}

async fn delete_async<'a>(
&mut self,
store: ObjectStoreHandle,
key: &GuestPtr<str>,
opt_pending_delete_handle_out: &GuestPtr<PendingKvDeleteHandle>,
) -> Result<(), Error> {
let store = self.get_obj_store_key(store).unwrap().clone();
let key = ObjectKey::new(&*key.as_str()?.ok_or(Error::SharedMemory)?)?;
let fut = futures::future::ok(self.obj_delete(store, key));
let task = PeekableTask::spawn(fut).await;
opt_pending_delete_handle_out
.write(self.insert_pending_kv_delete(PendingKvDeleteTask::new(task)))?;
Ok(())
}

async fn pending_delete_wait(
&mut self,
pending_delete_handle: PendingKvDeleteHandle,
) -> Result<(), Error> {
Ok((self
.take_pending_kv_delete(pending_delete_handle)?
.task()
.recv()
.await?)?)
}
Expand Down