Skip to content

Commit

Permalink
Merge pull request #2843 from fermyon/key-value-re-factoring
Browse files Browse the repository at this point in the history
Key Value re-*factor*ing
  • Loading branch information
rylev authored Sep 19, 2024
2 parents 1e3d971 + 5db8565 commit 31aa7ec
Show file tree
Hide file tree
Showing 14 changed files with 306 additions and 369 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/factor-key-value/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ tracing = { workspace = true }

[dev-dependencies]
spin-factors-test = { path = "../factors-test" }
spin-key-value-redis = { path = "../key-value-redis" }
spin-key-value-spin = { path = "../key-value-spin" }
spin-key-value-redis = { path = "../key-value-redis" }
tempfile = "3.12.0"
tokio = { version = "1", features = ["macros", "rt"] }

Expand Down
26 changes: 9 additions & 17 deletions crates/factor-key-value/src/host.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::util::EmptyStoreManager;
use anyhow::{Context, Result};
use spin_core::{async_trait, wasmtime::component::Resource};
use spin_world::v2::key_value;
Expand Down Expand Up @@ -40,23 +39,22 @@ pub struct KeyValueDispatch {
}

impl KeyValueDispatch {
pub fn new() -> Self {
Self::new_with_capacity(DEFAULT_STORE_TABLE_CAPACITY)
pub fn new(allowed_stores: HashSet<String>, manager: Arc<dyn StoreManager>) -> Self {
Self::new_with_capacity(allowed_stores, manager, DEFAULT_STORE_TABLE_CAPACITY)
}

pub fn new_with_capacity(capacity: u32) -> Self {
pub fn new_with_capacity(
allowed_stores: HashSet<String>,
manager: Arc<dyn StoreManager>,
capacity: u32,
) -> Self {
Self {
allowed_stores: HashSet::new(),
manager: Arc::new(EmptyStoreManager),
allowed_stores,
manager,
stores: Table::new(capacity),
}
}

pub fn init(&mut self, allowed_stores: HashSet<String>, manager: Arc<dyn StoreManager>) {
self.allowed_stores = allowed_stores;
self.manager = manager;
}

pub fn get_store(&self, store: Resource<key_value::Store>) -> anyhow::Result<&Arc<dyn Store>> {
self.stores.get(store.rep()).context("invalid store")
}
Expand All @@ -66,12 +64,6 @@ impl KeyValueDispatch {
}
}

impl Default for KeyValueDispatch {
fn default() -> Self {
Self::new()
}
}

#[async_trait]
impl key_value::Host for KeyValueDispatch {}

Expand Down
47 changes: 13 additions & 34 deletions crates/factor-key-value/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use spin_factors::{
ConfigureAppContext, Factor, FactorInstanceBuilder, InitContext, PrepareContext, RuntimeFactors,
};
use spin_locked_app::MetadataKey;
use util::DefaultManagerGetter;

/// Metadata key for key-value stores.
pub const KEY_VALUE_STORES_KEY: MetadataKey<Vec<String>> = MetadataKey::new("key_value_stores");
Expand All @@ -21,19 +20,15 @@ pub use runtime_config::RuntimeConfig;
pub use util::{CachingStoreManager, DelegatingStoreManager};

/// A factor that provides key-value storage.
#[derive(Default)]
pub struct KeyValueFactor {
default_label_resolver: Arc<dyn DefaultLabelResolver>,
_priv: (),
}

impl KeyValueFactor {
/// Create a new KeyValueFactor.
///
/// The `default_label_resolver` is used to resolve store managers for labels that
/// are not defined in the runtime configuration.
pub fn new(default_label_resolver: impl DefaultLabelResolver + 'static) -> Self {
Self {
default_label_resolver: Arc::new(default_label_resolver),
}
pub fn new() -> Self {
Self { _priv: () }
}
}

Expand All @@ -53,13 +48,10 @@ impl Factor for KeyValueFactor {
mut ctx: ConfigureAppContext<T, Self>,
) -> anyhow::Result<Self::AppState> {
let store_managers = ctx.take_runtime_config().unwrap_or_default();
let default_label_resolver = self.default_label_resolver.clone();
let default_fn: DefaultManagerGetter =
Arc::new(move |label| default_label_resolver.default(label));

let delegating_manager = DelegatingStoreManager::new(store_managers, default_fn);
let delegating_manager = DelegatingStoreManager::new(store_managers);
let caching_manager = CachingStoreManager::new(delegating_manager);
let store_manager_manager = Arc::new(caching_manager);
let store_manager = Arc::new(caching_manager);

// Build component -> allowed stores map
let mut component_allowed_stores = HashMap::new();
Expand All @@ -73,8 +65,7 @@ impl Factor for KeyValueFactor {
for label in &key_value_stores {
// TODO: port nicer errors from KeyValueComponent (via error type?)
ensure!(
store_manager_manager.is_defined(label)
|| self.default_label_resolver.default(label).is_some(),
store_manager.is_defined(label),
"unknown key_value_stores label {label:?} for component {component_id:?}"
);
}
Expand All @@ -83,7 +74,7 @@ impl Factor for KeyValueFactor {
}

Ok(AppState {
store_manager: store_manager_manager,
store_manager,
component_allowed_stores,
})
}
Expand Down Expand Up @@ -159,22 +150,10 @@ impl FactorInstanceBuilder for InstanceBuilder {
store_manager,
allowed_stores,
} = self;
let mut dispatch = KeyValueDispatch::new_with_capacity(u32::MAX);
dispatch.init(allowed_stores, store_manager);
Ok(dispatch)
}
}

/// Resolves a label to a default [`StoreManager`].
pub trait DefaultLabelResolver: Send + Sync {
/// If there is no runtime configuration for a given store label, return a default store manager.
///
/// If `Option::None` is returned, the store is not allowed.
fn default(&self, label: &str) -> Option<Arc<dyn StoreManager>>;
}

impl<T: DefaultLabelResolver> DefaultLabelResolver for Arc<T> {
fn default(&self, label: &str) -> Option<Arc<dyn StoreManager>> {
self.as_ref().default(label)
Ok(KeyValueDispatch::new_with_capacity(
allowed_stores,
store_manager,
u32::MAX,
))
}
}
10 changes: 10 additions & 0 deletions crates/factor-key-value/src/runtime_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,16 @@ impl RuntimeConfig {
pub fn add_store_manager(&mut self, label: String, store_manager: Arc<dyn StoreManager>) {
self.store_managers.insert(label, store_manager);
}

/// Returns whether a store manager exists for the store with the given label.
pub fn has_store_manager(&self, label: &str) -> bool {
self.store_managers.contains_key(label)
}

/// Returns the store manager for the store with the given label.
pub fn get_store_manager(&self, label: &str) -> Option<Arc<dyn StoreManager>> {
self.store_managers.get(label).cloned()
}
}

impl IntoIterator for RuntimeConfig {
Expand Down
33 changes: 21 additions & 12 deletions crates/factor-key-value/src/runtime_config/spin.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
//! Runtime configuration implementation used by Spin CLI.

use crate::StoreManager;
use crate::{DefaultLabelResolver, RuntimeConfig};
use crate::{RuntimeConfig, StoreManager};
use anyhow::Context as _;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -99,7 +98,25 @@ impl RuntimeConfigResolver {
}

/// Resolves a toml table into a runtime config.
pub fn resolve_from_toml(
///
/// The default stores are also added to the runtime config.
pub fn resolve(&self, table: Option<&impl GetTomlValue>) -> anyhow::Result<RuntimeConfig> {
let mut runtime_config = self.resolve_from_toml(table)?.unwrap_or_default();

for (&label, config) in &self.defaults {
if !runtime_config.store_managers.contains_key(label) {
let store_manager = self
.store_manager_from_config(config.clone())
.with_context(|| {
format!("could not configure key-value store with label '{label}'")
})?;
runtime_config.add_store_manager(label.to_owned(), store_manager);
}
}
Ok(runtime_config)
}

fn resolve_from_toml(
&self,
table: Option<&impl GetTomlValue>,
) -> anyhow::Result<Option<RuntimeConfig>> {
Expand All @@ -115,6 +132,7 @@ impl RuntimeConfigResolver {
})?;
runtime_config.add_store_manager(label.clone(), store_manager);
}

Ok(Some(runtime_config))
}

Expand All @@ -134,15 +152,6 @@ impl RuntimeConfigResolver {
}
}

impl DefaultLabelResolver for RuntimeConfigResolver {
fn default(&self, label: &str) -> Option<Arc<dyn StoreManager>> {
let config = self.defaults.get(label)?;
// TODO(rylev): The unwrap here is not ideal. We should return a Result instead.
// Piping that through `DefaultLabelResolver` is a bit awkward, though.
Some(self.store_manager_from_config(config.clone()).unwrap())
}
}

#[derive(Deserialize, Clone)]
pub struct StoreConfig {
#[serde(rename = "type")]
Expand Down
41 changes: 7 additions & 34 deletions crates/factor-key-value/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,39 +13,15 @@ use tokio::{
};
use tracing::Instrument;

const DEFAULT_CACHE_SIZE: usize = 256;

pub struct EmptyStoreManager;

#[async_trait]
impl StoreManager for EmptyStoreManager {
async fn get(&self, _name: &str) -> Result<Arc<dyn Store>, Error> {
Err(Error::NoSuchStore)
}

fn is_defined(&self, _store_name: &str) -> bool {
false
}
}

/// A function that takes a store label and returns the default store manager, if one exists.
pub type DefaultManagerGetter = Arc<dyn Fn(&str) -> Option<Arc<dyn StoreManager>> + Send + Sync>;

/// A [`StoreManager`] which delegates to other `StoreManager`s based on the store label.
pub struct DelegatingStoreManager {
delegates: HashMap<String, Arc<dyn StoreManager>>,
default_manager: DefaultManagerGetter,
}

impl DelegatingStoreManager {
pub fn new(
delegates: impl IntoIterator<Item = (String, Arc<dyn StoreManager>)>,
default_manager: DefaultManagerGetter,
) -> Self {
pub fn new(delegates: impl IntoIterator<Item = (String, Arc<dyn StoreManager>)>) -> Self {
let delegates = delegates.into_iter().collect();
Self {
delegates,
default_manager,
}
Self { delegates }
}
}

Expand All @@ -54,12 +30,7 @@ impl StoreManager for DelegatingStoreManager {
async fn get(&self, name: &str) -> Result<Arc<dyn Store>, Error> {
match self.delegates.get(name) {
Some(store) => store.get(name).await,
None => {
(self.default_manager)(name)
.ok_or(Error::NoSuchStore)?
.get(name)
.await
}
None => Err(Error::NoSuchStore),
}
}

Expand All @@ -71,7 +42,7 @@ impl StoreManager for DelegatingStoreManager {
if let Some(store) = self.delegates.get(store_name) {
return store.summary(store_name);
}
(self.default_manager)(store_name)?.summary(store_name)
None
}
}

Expand Down Expand Up @@ -104,6 +75,8 @@ pub struct CachingStoreManager<T> {
inner: T,
}

const DEFAULT_CACHE_SIZE: usize = 256;

impl<T> CachingStoreManager<T> {
pub fn new(inner: T) -> Self {
Self::new_with_capacity(NonZeroUsize::new(DEFAULT_CACHE_SIZE).unwrap(), inner)
Expand Down
Loading

0 comments on commit 31aa7ec

Please sign in to comment.