Skip to content

Commit

Permalink
Merge pull request #1880 from fermyon/key-value-tweaks
Browse files Browse the repository at this point in the history
Key value interface changes
  • Loading branch information
rylev authored Oct 13, 2023
2 parents 1c37552 + c828c8e commit 6c54656
Show file tree
Hide file tree
Showing 32 changed files with 542 additions and 2,347 deletions.
18 changes: 7 additions & 11 deletions crates/key-value-azure/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ struct AzureCosmosStore {

#[async_trait]
impl Store for AzureCosmosStore {
async fn get(&self, key: &str) -> Result<Vec<u8>, Error> {
async fn get(&self, key: &str) -> Result<Option<Vec<u8>>, Error> {
let pair = self.get_pair(key).await?;
Ok(pair.value)
Ok(pair.map(|p| p.value))
}

async fn set(&self, key: &str, value: &[u8]) -> Result<(), Error> {
Expand All @@ -73,11 +73,7 @@ impl Store for AzureCosmosStore {
}

async fn exists(&self, key: &str) -> Result<bool, Error> {
match self.get_pair(key).await {
Ok(_) => Ok(true),
Err(Error::NoSuchKey) => Ok(false),
Err(e) => Err(e),
}
Ok(self.get_pair(key).await?.is_some())
}

async fn get_keys(&self) -> Result<Vec<String>, Error> {
Expand All @@ -86,7 +82,7 @@ impl Store for AzureCosmosStore {
}

impl AzureCosmosStore {
async fn get_pair(&self, key: &str) -> Result<Pair, Error> {
async fn get_pair(&self, key: &str) -> Result<Option<Pair>, Error> {
let query = self
.client
.query_documents(Query::new(format!("SELECT * FROM c WHERE c.id='{}'", key)))
Expand All @@ -100,11 +96,11 @@ impl AzureCosmosStore {
Some(r) => {
let r = r.map_err(log_error)?;
match r.results.first().cloned() {
Some((p, _)) => Ok(p),
None => Err(Error::NoSuchKey),
Some((p, _)) => Ok(Some(p)),
None => Ok(None),
}
}
None => Err(Error::NoSuchKey),
None => Ok(None),
}
}

Expand Down
10 changes: 2 additions & 8 deletions crates/key-value-redis/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,9 @@ struct RedisStore {

#[async_trait]
impl Store for RedisStore {
async fn get(&self, key: &str) -> Result<Vec<u8>, Error> {
async fn get(&self, key: &str) -> Result<Option<Vec<u8>>, Error> {
let mut conn = self.connection.lock().await;
let result: Vec<u8> = conn.get(key).await.map_err(log_error)?;

if result.is_empty() {
Err(Error::NoSuchKey)
} else {
Ok(result)
}
conn.get(key).await.map_err(log_error)
}

async fn set(&self, key: &str, value: &[u8]) -> Result<(), Error> {
Expand Down
36 changes: 13 additions & 23 deletions crates/key-value-sqlite/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ struct SqliteStore {

#[async_trait]
impl Store for SqliteStore {
async fn get(&self, key: &str) -> Result<Vec<u8>, Error> {
async fn get(&self, key: &str) -> Result<Option<Vec<u8>>, Error> {
task::block_in_place(|| {
self.connection
.lock()
Expand All @@ -84,7 +84,7 @@ impl Store for SqliteStore {
.query_map([&self.name, key], |row| row.get(0))
.map_err(log_error)?
.next()
.ok_or(Error::NoSuchKey)?
.transpose()
.map_err(log_error)
})
}
Expand Down Expand Up @@ -119,11 +119,7 @@ impl Store for SqliteStore {
}

async fn exists(&self, key: &str) -> Result<bool, Error> {
match self.get(key).await {
Ok(_) => Ok(true),
Err(Error::NoSuchKey) => Ok(false),
Err(e) => Err(e),
}
Ok(self.get(key).await?.is_some())
}

async fn get_keys(&self) -> Result<Vec<String>, Error> {
Expand Down Expand Up @@ -162,11 +158,6 @@ mod test {
)])),
);

assert!(matches!(
kv.exists(Resource::new_own(42), "bar".to_owned()).await?,
Err(Error::InvalidStore)
));

assert!(matches!(
kv.open("foo".to_owned()).await?,
Err(Error::NoSuchStore)
Expand All @@ -186,7 +177,7 @@ mod test {

assert!(matches!(
kv.get(Resource::new_own(rep), "bar".to_owned()).await?,
Err(Error::NoSuchKey)
Ok(None)
));

kv.set(Resource::new_own(rep), "bar".to_owned(), b"baz".to_vec())
Expand All @@ -198,16 +189,20 @@ mod test {
);

assert_eq!(
b"baz" as &[_],
&kv.get(Resource::new_own(rep), "bar".to_owned()).await??
Some(b"baz" as &[_]),
kv.get(Resource::new_own(rep), "bar".to_owned())
.await??
.as_deref()
);

kv.set(Resource::new_own(rep), "bar".to_owned(), b"wow".to_vec())
.await??;

assert_eq!(
b"wow" as &[_],
&kv.get(Resource::new_own(rep), "bar".to_owned()).await??
Some(b"wow" as &[_]),
kv.get(Resource::new_own(rep), "bar".to_owned())
.await??
.as_deref()
);

assert_eq!(
Expand All @@ -230,16 +225,11 @@ mod test {

assert!(matches!(
kv.get(Resource::new_own(rep), "bar".to_owned()).await?,
Err(Error::NoSuchKey)
Ok(None)
));

kv.drop(Resource::new_own(rep))?;

assert!(matches!(
kv.exists(Resource::new_own(rep), "bar".to_owned()).await?,
Err(Error::InvalidStore)
));

Ok(())
}
}
70 changes: 22 additions & 48 deletions crates/key-value/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use anyhow::Result;
use anyhow::{Context, Result};
use spin_app::MetadataKey;
use spin_core::{async_trait, wasmtime::component::Resource};
use spin_world::v2::key_value;
Expand All @@ -25,7 +25,7 @@ pub trait StoreManager: Sync + Send {

#[async_trait]
pub trait Store: Sync + Send {
async fn get(&self, key: &str) -> Result<Vec<u8>, Error>;
async fn get(&self, key: &str) -> Result<Option<Vec<u8>>, Error>;
async fn set(&self, key: &str, value: &[u8]) -> Result<(), Error>;
async fn delete(&self, key: &str) -> Result<(), Error>;
async fn exists(&self, key: &str) -> Result<bool, Error>;
Expand Down Expand Up @@ -55,6 +55,10 @@ impl KeyValueDispatch {
self.allowed_stores = allowed_stores;
self.manager = manager;
}

pub fn get_store(&self, store: Resource<key_value::Store>) -> anyhow::Result<&Arc<dyn Store>> {
self.stores.get(store.rep()).context("invalid store")
}
}

impl Default for KeyValueDispatch {
Expand Down Expand Up @@ -87,15 +91,9 @@ impl key_value::HostStore for KeyValueDispatch {
&mut self,
store: Resource<key_value::Store>,
key: String,
) -> Result<Result<Vec<u8>, Error>> {
Ok(async {
self.stores
.get(store.rep())
.ok_or(Error::InvalidStore)?
.get(&key)
.await
}
.await)
) -> Result<Result<Option<Vec<u8>>, Error>> {
let store = self.get_store(store)?;
Ok(store.get(&key).await)
}

async fn set(
Expand All @@ -104,58 +102,34 @@ impl key_value::HostStore for KeyValueDispatch {
key: String,
value: Vec<u8>,
) -> Result<Result<(), Error>> {
Ok(async {
self.stores
.get(store.rep())
.ok_or(Error::InvalidStore)?
.set(&key, &value)
.await
}
.await)
let store = self.get_store(store)?;
Ok(store.set(&key, &value).await)
}

async fn delete(
&mut self,
store: Resource<key_value::Store>,
key: String,
) -> Result<Result<(), Error>> {
Ok(async {
self.stores
.get(store.rep())
.ok_or(Error::InvalidStore)?
.delete(&key)
.await
}
.await)
let store = self.get_store(store)?;
Ok(store.delete(&key).await)
}

async fn exists(
&mut self,
store: Resource<key_value::Store>,
key: String,
) -> Result<Result<bool, Error>> {
Ok(async {
self.stores
.get(store.rep())
.ok_or(Error::InvalidStore)?
.exists(&key)
.await
}
.await)
let store = self.get_store(store)?;
Ok(store.exists(&key).await)
}

async fn get_keys(
&mut self,
store: Resource<key_value::Store>,
) -> Result<Result<Vec<String>, Error>> {
Ok(async {
self.stores
.get(store.rep())
.ok_or(Error::InvalidStore)?
.get_keys()
.await
}
.await)
let store = self.get_store(store)?;
Ok(store.get_keys().await)
}

fn drop(&mut self, store: Resource<key_value::Store>) -> Result<()> {
Expand All @@ -166,7 +140,7 @@ impl key_value::HostStore for KeyValueDispatch {

pub fn log_error(err: impl std::fmt::Debug) -> Error {
tracing::warn!("key-value error: {err:?}");
Error::Io(format!("{err:?}"))
Error::Other(format!("{err:?}"))
}

use spin_world::v1::key_value::Error as LegacyError;
Expand All @@ -176,9 +150,7 @@ fn to_legacy_error(value: key_value::Error) -> LegacyError {
Error::StoreTableFull => LegacyError::StoreTableFull,
Error::NoSuchStore => LegacyError::NoSuchStore,
Error::AccessDenied => LegacyError::AccessDenied,
Error::InvalidStore => LegacyError::InvalidStore,
Error::NoSuchKey => LegacyError::NoSuchKey,
Error::Io(s) => LegacyError::Io(s),
Error::Other(s) => LegacyError::Io(s),
}
}

Expand All @@ -192,7 +164,9 @@ impl spin_world::v1::key_value::Host for KeyValueDispatch {
async fn get(&mut self, store: u32, key: String) -> Result<Result<Vec<u8>, LegacyError>> {
let this = Resource::new_borrow(store);
let result = <Self as key_value::HostStore>::get(self, this, key).await?;
Ok(result.map_err(to_legacy_error))
Ok(result
.map_err(to_legacy_error)
.and_then(|v| v.ok_or(LegacyError::NoSuchKey)))
}

async fn set(
Expand Down
41 changes: 16 additions & 25 deletions crates/key-value/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ impl CachingStoreState {
if let Some(previous_task) = previous_task {
previous_task
.await
.map_err(|e| Error::Io(format!("{e:?}")))??
.map_err(|e| Error::Other(format!("{e:?}")))??
}

task.await
Expand All @@ -134,7 +134,7 @@ impl CachingStoreState {
if let Some(previous_task) = self.previous_task.take() {
previous_task
.await
.map_err(|e| Error::Io(format!("{e:?}")))??
.map_err(|e| Error::Other(format!("{e:?}")))??
}

Ok(())
Expand All @@ -148,30 +148,25 @@ struct CachingStore {

#[async_trait]
impl Store for CachingStore {
async fn get(&self, key: &str) -> Result<Vec<u8>, Error> {
async fn get(&self, key: &str) -> Result<Option<Vec<u8>>, Error> {
// Retrieve the specified value from the cache, lazily populating the cache as necessary.

let mut state = self.state.lock().await;

if let Some(value) = state.cache.get(key).cloned() {
value
} else {
// Flush any outstanding writes prior to reading from store. This is necessary because we need to
// guarantee the guest will read its own writes even if entries have been popped off the end of the LRU
// cache prior to their corresponding writes reaching the backing store.
state.flush().await?;

let value = match self.inner.get(key).await {
Ok(value) => Some(value),
Err(Error::NoSuchKey) => None,
e => return e,
};

state.cache.put(key.to_owned(), value.clone());

value
return Ok(value);
}
.ok_or(Error::NoSuchKey)

// Flush any outstanding writes prior to reading from store. This is necessary because we need to
// guarantee the guest will read its own writes even if entries have been popped off the end of the LRU
// cache prior to their corresponding writes reaching the backing store.
state.flush().await?;

let value = self.inner.get(key).await?;

state.cache.put(key.to_owned(), value.clone());

Ok(value)
}

async fn set(&self, key: &str, value: &[u8]) -> Result<(), Error> {
Expand Down Expand Up @@ -204,11 +199,7 @@ impl Store for CachingStore {
}

async fn exists(&self, key: &str) -> Result<bool, Error> {
match self.get(key).await {
Ok(_) => Ok(true),
Err(Error::NoSuchKey) => Ok(false),
Err(e) => Err(e),
}
Ok(self.get(key).await?.is_some())
}

async fn get_keys(&self) -> Result<Vec<String>, Error> {
Expand Down
Loading

0 comments on commit 6c54656

Please sign in to comment.