diff --git a/Cargo.lock b/Cargo.lock index 0e9cf73e0..5a438c0eb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -330,6 +330,12 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "async-once-cell" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4288f83726785267c6f2ef073a3d83dc3f9b81464e9f99898240cced85fce35a" + [[package]] name = "async-priority-channel" version = "0.1.0" @@ -513,6 +519,329 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0" +[[package]] +name = "aws-config" +version = "1.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7198e6f03240fdceba36656d8be440297b6b82270325908c7381f37d826a74f6" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-sdk-sso", + "aws-sdk-ssooidc", + "aws-sdk-sts", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand 2.1.1", + "hex", + "http 0.2.12", + "ring", + "time", + "tokio", + "tracing", + "url", + "zeroize", +] + +[[package]] +name = "aws-credential-types" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60e8f6b615cb5fc60a98132268508ad104310f0cfb25a1c22eee76efdf9154da" +dependencies = [ + "aws-smithy-async", + "aws-smithy-runtime-api", + "aws-smithy-types", + "zeroize", +] + +[[package]] +name = "aws-runtime" +version = "1.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a10d5c055aa540164d9561a0e2e74ad30f0dcf7393c3a92f6733ddf9c5762468" +dependencies = [ + "aws-credential-types", + "aws-sigv4", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand 2.1.1", + "http 0.2.12", + "http-body 0.4.6", + "once_cell", + "percent-encoding", + "pin-project-lite", + "tracing", + "uuid", +] + +[[package]] +name = "aws-sdk-dynamodb" +version = "1.49.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab0ade000608877169533a54326badd6b5a707d2faf876cfc3976a7f9d7e5329" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand 2.1.1", + "http 0.2.12", + "once_cell", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-sso" +version = "1.45.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e33ae899566f3d395cbf42858e433930682cc9c1889fa89318896082fef45efb" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "http 0.2.12", + "once_cell", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-ssooidc" +version = "1.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f39c09e199ebd96b9f860b0fce4b6625f211e064ad7c8693b72ecf7ef03881e0" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "http 0.2.12", + "once_cell", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-sts" +version = "1.45.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d95f93a98130389eb6233b9d615249e543f6c24a68ca1f109af9ca5164a8765" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-query", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-smithy-xml", + "aws-types", + "http 0.2.12", + "once_cell", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sigv4" +version = "1.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc8db6904450bafe7473c6ca9123f88cc11089e41a025408f992db4e22d3be68" +dependencies = [ + "aws-credential-types", + "aws-smithy-http", + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "form_urlencoded", + "hex", + "hmac", + "http 0.2.12", + "http 1.1.0", + "once_cell", + "percent-encoding", + "sha2", + "time", + "tracing", +] + +[[package]] +name = "aws-smithy-async" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62220bc6e97f946ddd51b5f1361f78996e704677afc518a4ff66b7a72ea1378c" +dependencies = [ + "futures-util", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "aws-smithy-http" +version = "0.60.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c8bc3e8fdc6b8d07d976e301c02fe553f72a39b7a9fea820e023268467d7ab6" +dependencies = [ + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "bytes-utils", + "futures-core", + "http 0.2.12", + "http-body 0.4.6", + "once_cell", + "percent-encoding", + "pin-project-lite", + "pin-utils", + "tracing", +] + +[[package]] +name = "aws-smithy-json" +version = "0.60.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4683df9469ef09468dad3473d129960119a0d3593617542b7d52086c8486f2d6" +dependencies = [ + "aws-smithy-types", +] + +[[package]] +name = "aws-smithy-query" +version = "0.60.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2fbd61ceb3fe8a1cb7352e42689cec5335833cd9f94103a61e98f9bb61c64bb" +dependencies = [ + "aws-smithy-types", + "urlencoding", +] + +[[package]] +name = "aws-smithy-runtime" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1ce695746394772e7000b39fe073095db6d45a862d0767dd5ad0ac0d7f8eb87" +dependencies = [ + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "fastrand 2.1.1", + "h2 0.3.26", + "http 0.2.12", + "http-body 0.4.6", + "http-body 1.0.1", + "httparse", + "hyper 0.14.30", + "hyper-rustls 0.24.2", + "once_cell", + "pin-project-lite", + "pin-utils", + "rustls 0.21.12", + "tokio", + "tracing", +] + +[[package]] +name = "aws-smithy-runtime-api" +version = "1.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e086682a53d3aa241192aa110fa8dfce98f2f5ac2ead0de84d41582c7e8fdb96" +dependencies = [ + "aws-smithy-async", + "aws-smithy-types", + "bytes", + "http 0.2.12", + "http 1.1.0", + "pin-project-lite", + "tokio", + "tracing", + "zeroize", +] + +[[package]] +name = "aws-smithy-types" +version = "1.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "147100a7bea70fa20ef224a6bad700358305f5dc0f84649c53769761395b355b" +dependencies = [ + "base64-simd", + "bytes", + "bytes-utils", + "futures-core", + "http 0.2.12", + "http 1.1.0", + "http-body 0.4.6", + "http-body 1.0.1", + "http-body-util", + "itoa", + "num-integer", + "pin-project-lite", + "pin-utils", + "ryu", + "serde 1.0.210", + "time", + "tokio", + "tokio-util", +] + +[[package]] +name = "aws-smithy-xml" +version = "0.60.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab0b0166827aa700d3dc519f72f8b3a91c35d0b8d042dc5d643a91e6f80648fc" +dependencies = [ + "xmlparser", +] + +[[package]] +name = "aws-types" +version = "1.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5221b91b3e441e6675310829fd8984801b772cb1546ef6c0e54dec9f1ac13fef" +dependencies = [ + "aws-credential-types", + "aws-smithy-async", + "aws-smithy-runtime-api", + "aws-smithy-types", + "rustc_version", + "tracing", +] + [[package]] name = "axum" version = "0.7.6" @@ -678,6 +1007,16 @@ version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" +[[package]] +name = "base64-simd" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "339abbe78e73178762e23bea9dfd08e697eb3f3301cd4be981c0f78ba5859195" +dependencies = [ + "outref", + "vsimd", +] + [[package]] name = "base64ct" version = "1.6.0" @@ -862,6 +1201,16 @@ dependencies = [ "serde 1.0.210", ] +[[package]] +name = "bytes-utils" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7dafe3a8757b027e2be6e4e5601ed563c55989fcf1546e933c66c8eb3a058d35" +dependencies = [ + "bytes", + "either", +] + [[package]] name = "camino" version = "1.1.9" @@ -3347,7 +3696,9 @@ dependencies = [ "futures-util", "http 0.2.12", "hyper 0.14.30", + "log", "rustls 0.21.12", + "rustls-native-certs 0.6.3", "tokio", "tokio-rustls 0.24.1", ] @@ -3363,7 +3714,7 @@ dependencies = [ "hyper 0.14.30", "log", "rustls 0.22.4", - "rustls-native-certs", + "rustls-native-certs 0.7.3", "rustls-pki-types", "tokio", "tokio-rustls 0.25.0", @@ -5022,6 +5373,12 @@ version = "6.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2355d85b9a3786f481747ced0e0ff2ba35213a1f9bd406ed906554d7af805a1" +[[package]] +name = "outref" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4030760ffd992bef45b0ae3f10ce1aba99e33464c90d14dd7c039884963ddc7a" + [[package]] name = "overload" version = "0.1.1" @@ -5922,6 +6279,12 @@ dependencies = [ "regex-syntax 0.8.4", ] +[[package]] +name = "regex-lite" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53a49587ad06b26609c52e423de037e7f57f20d53535d66e08c695f347df952a" + [[package]] name = "regex-syntax" version = "0.6.29" @@ -6100,7 +6463,7 @@ dependencies = [ "flume", "futures-util", "log", - "rustls-native-certs", + "rustls-native-certs 0.7.3", "rustls-pemfile 2.1.3", "rustls-webpki 0.102.8", "thiserror", @@ -6272,6 +6635,18 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls-native-certs" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" +dependencies = [ + "openssl-probe", + "rustls-pemfile 1.0.4", + "schannel", + "security-framework", +] + [[package]] name = "rustls-native-certs" version = "0.7.3" @@ -7351,6 +7726,20 @@ dependencies = [ "wasmtime-wasi-http", ] +[[package]] +name = "spin-key-value-aws" +version = "2.8.0-pre0" +dependencies = [ + "anyhow", + "async-once-cell", + "aws-config", + "aws-credential-types", + "aws-sdk-dynamodb", + "serde 1.0.210", + "spin-core", + "spin-factor-key-value", +] + [[package]] name = "spin-key-value-azure" version = "2.8.0-pre0" @@ -7558,6 +7947,7 @@ dependencies = [ "spin-factor-wasi", "spin-factors", "spin-factors-test", + "spin-key-value-aws", "spin-key-value-azure", "spin-key-value-redis", "spin-key-value-spin", @@ -8898,6 +9288,12 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" +[[package]] +name = "vsimd" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c3082ca00d5a5ef149bb8b555a72ae84c9c59f7250f013ac822ac2e49b19c64" + [[package]] name = "wac-graph" version = "0.6.0" @@ -10417,6 +10813,12 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "xmlparser" +version = "0.13.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66fee0b777b0f5ac1c69bb06d361268faafa61cd4682ae064a171c16c433e9e4" + [[package]] name = "yaml-rust" version = "0.4.5" diff --git a/crates/key-value-aws/Cargo.toml b/crates/key-value-aws/Cargo.toml new file mode 100644 index 000000000..26f636cbc --- /dev/null +++ b/crates/key-value-aws/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "spin-key-value-aws" +version.workspace = true +authors.workspace = true +edition.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true +rust-version.workspace = true + +[dependencies] +anyhow = { workspace = true } +async-once-cell = "0.5.4" +aws-config = "1.1.7" +aws-credential-types = "1.1.7" +aws-sdk-dynamodb = "1.49.0" +serde = { workspace = true } +spin-core = { path = "../core" } +spin-factor-key-value = { path = "../factor-key-value" } + +[lints] +workspace = true diff --git a/crates/key-value-aws/src/lib.rs b/crates/key-value-aws/src/lib.rs new file mode 100644 index 000000000..a0305be66 --- /dev/null +++ b/crates/key-value-aws/src/lib.rs @@ -0,0 +1,65 @@ +mod store; + +use serde::Deserialize; +use spin_factor_key_value::runtime_config::spin::MakeKeyValueStore; +use store::{ + KeyValueAwsDynamo, KeyValueAwsDynamoAuthOptions, KeyValueAwsDynamoRuntimeConfigOptions, +}; + +/// A key-value store that uses AWS Dynamo as the backend. +#[derive(Default)] +pub struct AwsDynamoKeyValueStore { + _priv: (), +} + +impl AwsDynamoKeyValueStore { + /// Creates a new `AwsKeyValueStore`. + pub fn new() -> Self { + Self::default() + } +} + +/// Runtime configuration for the AWS Dynamo key-value store. +#[derive(Deserialize)] +pub struct AwsDynamoKeyValueRuntimeConfig { + /// The access key for the AWS Dynamo DB account role. + access_key: Option, + /// The secret key for authorization on the AWS Dynamo DB account. + secret_key: Option, + /// The token for authorization on the AWS Dynamo DB account. + token: Option, + /// The AWS region where the database is located + region: String, + /// The AWS Dynamo DB table. + table: String, +} + +impl MakeKeyValueStore for AwsDynamoKeyValueStore { + const RUNTIME_CONFIG_TYPE: &'static str = "aws_dynamo"; + + type RuntimeConfig = AwsDynamoKeyValueRuntimeConfig; + + type StoreManager = KeyValueAwsDynamo; + + fn make_store( + &self, + runtime_config: Self::RuntimeConfig, + ) -> anyhow::Result { + let AwsDynamoKeyValueRuntimeConfig { + access_key, + secret_key, + token, + region, + table, + } = runtime_config; + let auth_options = match (access_key, secret_key) { + (Some(access_key), Some(secret_key)) => { + KeyValueAwsDynamoAuthOptions::RuntimeConfigValues( + KeyValueAwsDynamoRuntimeConfigOptions::new(access_key, secret_key, token), + ) + } + _ => KeyValueAwsDynamoAuthOptions::Environmental, + }; + KeyValueAwsDynamo::new(region, table, auth_options) + } +} diff --git a/crates/key-value-aws/src/store.rs b/crates/key-value-aws/src/store.rs new file mode 100644 index 000000000..4171eb0cf --- /dev/null +++ b/crates/key-value-aws/src/store.rs @@ -0,0 +1,491 @@ +use std::{ + collections::HashMap, + sync::{Arc, Mutex}, +}; + +use anyhow::Result; +use aws_config::{BehaviorVersion, Region, SdkConfig}; +use aws_credential_types::Credentials; +use aws_sdk_dynamodb::{ + config::{ProvideCredentials, SharedCredentialsProvider}, + operation::{batch_get_item::BatchGetItemOutput, get_item::GetItemOutput}, + primitives::Blob, + types::{AttributeValue, DeleteRequest, KeysAndAttributes, PutRequest, WriteRequest}, + Client, +}; +use spin_core::async_trait; +use spin_factor_key_value::{log_error, Cas, Error, Store, StoreManager, SwapError}; + +pub struct KeyValueAwsDynamo { + table: Arc, + region: Arc, + client: async_once_cell::Lazy< + Client, + std::pin::Pin + Send>>, + >, +} + +/// AWS Dynamo Key / Value runtime config literal options for authentication +#[derive(Clone, Debug)] +pub struct KeyValueAwsDynamoRuntimeConfigOptions { + access_key: String, + secret_key: String, + token: Option, +} + +impl KeyValueAwsDynamoRuntimeConfigOptions { + pub fn new(access_key: String, secret_key: String, token: Option) -> Self { + Self { + access_key, + secret_key, + token, + } + } +} + +impl ProvideCredentials for KeyValueAwsDynamoRuntimeConfigOptions { + fn provide_credentials<'a>( + &'a self, + ) -> aws_credential_types::provider::future::ProvideCredentials<'a> + where + Self: 'a, + { + aws_credential_types::provider::future::ProvideCredentials::ready(Ok(Credentials::new( + self.access_key.clone(), + self.secret_key.clone(), + self.token.clone(), + None, // Optional expiration time + "spin_custom_aws_provider", + ))) + } +} + +/// AWS Dynamo Key / Value enumeration for the possible authentication options +#[derive(Clone, Debug)] +pub enum KeyValueAwsDynamoAuthOptions { + /// Runtime Config values indicates credentials have been specified directly + RuntimeConfigValues(KeyValueAwsDynamoRuntimeConfigOptions), + /// Environmental indicates that the environment variables of the process should be used to + /// create the SDK Config for the Dynamo client. This will use the AWS Rust SDK's + /// aws_config::load_defaults to derive credentials based on what environment variables + /// have been set. + /// + /// See https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-authentication.html for options. + Environmental, +} + +impl KeyValueAwsDynamo { + pub fn new( + region: String, + table: String, + auth_options: KeyValueAwsDynamoAuthOptions, + ) -> Result { + let region_clone = region.clone(); + let client_fut = Box::pin(async move { + let sdk_config = match auth_options { + KeyValueAwsDynamoAuthOptions::RuntimeConfigValues(config) => SdkConfig::builder() + .credentials_provider(SharedCredentialsProvider::new(config)) + .region(Region::new(region_clone)) + .behavior_version(BehaviorVersion::latest()) + .build(), + KeyValueAwsDynamoAuthOptions::Environmental => { + aws_config::load_defaults(BehaviorVersion::latest()).await + } + }; + Client::new(&sdk_config) + }); + + Ok(Self { + table: Arc::new(table), + region: Arc::new(region), + client: async_once_cell::Lazy::from_future(client_fut), + }) + } +} + +#[async_trait] +impl StoreManager for KeyValueAwsDynamo { + async fn get(&self, name: &str) -> Result, Error> { + Ok(Arc::new(AwsDynamoStore { + _name: name.to_owned(), + client: self.client.get_unpin().await.clone(), + table: self.table.clone(), + })) + } + + fn is_defined(&self, _store_name: &str) -> bool { + true + } + + fn summary(&self, _store_name: &str) -> Option { + Some(format!( + "AWS DynamoDB region: {:?}, table: {}", + self.region, self.table + )) + } +} + +struct AwsDynamoStore { + _name: String, + client: Client, + table: Arc, +} + +struct CompareAndSwap { + key: String, + client: Client, + table: Arc, + bucket_rep: u32, + etag: Mutex>, +} + +/// Primary key in DynamoDB items used for querying items +const PK: &str = "PK"; +/// Value key in DynamoDB items storing item value as binary +const VAL: &str = "val"; +/// Version key in DynamoDB items used for optimistic locking +const VER: &str = "ver"; + +#[async_trait] +impl Store for AwsDynamoStore { + async fn get(&self, key: &str) -> Result>, Error> { + let item = self.get_item(key).await?; + Ok(item) + } + + async fn set(&self, key: &str, value: &[u8]) -> Result<(), Error> { + self.client + .put_item() + .table_name(self.table.as_str()) + .item(PK, AttributeValue::S(key.to_string())) + .item(VAL, AttributeValue::B(Blob::new(value))) + .send() + .await + .map_err(log_error)?; + Ok(()) + } + + async fn delete(&self, key: &str) -> Result<(), Error> { + if self.exists(key).await? { + self.client + .delete_item() + .table_name(self.table.as_str()) + .key(PK, AttributeValue::S(key.to_string())) + .send() + .await + .map_err(log_error)?; + } + Ok(()) + } + + async fn exists(&self, key: &str) -> Result { + Ok(self.get_item(key).await?.is_some()) + } + + async fn get_keys(&self) -> Result, Error> { + self.get_keys().await + } + + async fn get_many(&self, keys: Vec) -> Result>)>, Error> { + let mut results = Vec::with_capacity(keys.len()); + + let mut keys_and_attributes_builder = KeysAndAttributes::builder(); + for key in keys { + keys_and_attributes_builder = keys_and_attributes_builder.keys(HashMap::from_iter([( + PK.to_owned(), + AttributeValue::S(key), + )])) + } + let mut request_items = Some(HashMap::from_iter([( + self.table.to_string(), + keys_and_attributes_builder.build().map_err(log_error)?, + )])); + + while request_items.is_some() { + let BatchGetItemOutput { + responses: Some(mut responses), + unprocessed_keys, + .. + } = self + .client + .batch_get_item() + .set_request_items(request_items) + .send() + .await + .map_err(log_error)? + else { + return Err(Error::Other("No results".into())); + }; + + if let Some(items) = responses.remove(self.table.as_str()) { + for mut item in items { + let Some(AttributeValue::S(pk)) = item.remove(PK) else { + return Err(Error::Other( + "Could not find 'PK' key on DynamoDB item".into(), + )); + }; + let Some(AttributeValue::B(val)) = item.remove(VAL) else { + return Err(Error::Other( + "Could not find 'val' key on DynamoDB item".into(), + )); + }; + + results.push((pk, Some(val.into_inner()))); + } + } + + request_items = unprocessed_keys; + } + + Ok(results) + } + + async fn set_many(&self, key_values: Vec<(String, Vec)>) -> Result<(), Error> { + let mut data = Vec::with_capacity(key_values.len()); + for (key, val) in key_values { + data.push( + WriteRequest::builder() + .put_request( + PutRequest::builder() + .item(PK, AttributeValue::S(key)) + .item(VAL, AttributeValue::B(Blob::new(val))) + .build() + .map_err(log_error)?, + ) + .build(), + ) + } + + let mut request_items = Some(HashMap::from_iter([(self.table.to_string(), data)])); + + while request_items.is_some() { + let results = self + .client + .batch_write_item() + .set_request_items(request_items) + .send() + .await + .map_err(log_error)?; + + request_items = results.unprocessed_items; + } + + Ok(()) + } + + async fn delete_many(&self, keys: Vec) -> Result<(), Error> { + let mut data = Vec::with_capacity(keys.len()); + for key in keys { + data.push( + WriteRequest::builder() + .delete_request( + DeleteRequest::builder() + .key(PK, AttributeValue::S(key)) + .build() + .map_err(log_error)?, + ) + .build(), + ) + } + + let mut request_items = Some(HashMap::from_iter([(self.table.to_string(), data)])); + + while request_items.is_some() { + let results = self + .client + .batch_write_item() + .set_request_items(request_items) + .send() + .await + .map_err(log_error)?; + + request_items = results.unprocessed_items; + } + + Ok(()) + } + + async fn increment(&self, key: String, delta: i64) -> Result { + let result = self + .client + .update_item() + .table_name(self.table.as_str()) + .key(PK, AttributeValue::S(key)) + .update_expression("ADD #val :delta") + .expression_attribute_names("#val", VAL) + .expression_attribute_values(":delta", AttributeValue::N(delta.to_string())) + .return_values(aws_sdk_dynamodb::types::ReturnValue::UpdatedNew) + .send() + .await + .map_err(log_error)?; + + if let Some(updated_attributes) = result.attributes { + if let Some(AttributeValue::N(new_value)) = updated_attributes.get(VAL) { + return Ok(new_value.parse::().map_err(log_error))?; + } + } + + Err(Error::Other("Failed to increment value".into())) + } + + async fn new_compare_and_swap( + &self, + bucket_rep: u32, + key: &str, + ) -> Result, Error> { + Ok(Arc::new(CompareAndSwap { + key: key.to_string(), + client: self.client.clone(), + table: self.table.clone(), + etag: Mutex::new(None), + bucket_rep, + })) + } +} + +#[async_trait] +impl Cas for CompareAndSwap { + async fn current(&self) -> Result>, Error> { + let GetItemOutput { + item: Some(mut current_item), + .. + } = self + .client + .get_item() + .table_name(self.table.as_str()) + .key( + PK, + aws_sdk_dynamodb::types::AttributeValue::S(self.key.clone()), + ) + .send() + .await + .map_err(log_error)? + else { + return Ok(None); + }; + + if let Some(AttributeValue::B(val)) = current_item.remove(VAL) { + let version = if let Some(AttributeValue::N(ver)) = current_item.remove(VER) { + Some(ver) + } else { + Some(String::from("0")) + }; + self.etag.lock().unwrap().clone_from(&version); + Ok(Some(val.into_inner())) + } else { + Ok(None) + } + } + + /// `swap` updates the value for the key using the etag saved in the `current` function for + /// optimistic concurrency. + async fn swap(&self, value: Vec) -> Result<(), SwapError> { + let mut update_item = self + .client + .update_item() + .table_name(self.table.as_str()) + .key(PK, AttributeValue::S(self.key.clone())) + .update_expression("SET #val=:val, ADD #ver :increment") + .expression_attribute_names("#val", VAL) + .expression_attribute_names("#ver", VER) + .expression_attribute_values(":val", AttributeValue::B(Blob::new(value))) + .expression_attribute_values(":increment", AttributeValue::N("1".to_owned())) + .return_values(aws_sdk_dynamodb::types::ReturnValue::None); + + let current_version = self.etag.lock().unwrap().clone(); + match current_version { + // Existing item with no version key, update under condition that version key still does not exist in Dynamo when operation is executed + Some(version) if version == "0" => { + update_item = update_item.condition_expression("attribute_not_exists(#ver)"); + } + // Existing item with version key, update under condition that version in Dynamo matches stored version + Some(version) => { + update_item = update_item + .condition_expression("#ver = :ver") + .expression_attribute_values(":ver", AttributeValue::N(version)); + } + // Assume new item, insert under condition that item does not already exist + None => { + update_item = update_item + .condition_expression("attribute_not_exists(#pk)") + .expression_attribute_names("#pk", PK); + } + } + + update_item + .send() + .await + .map(|_| ()) + .map_err(|e| SwapError::CasFailed(format!("{e:?}"))) + } + + async fn bucket_rep(&self) -> u32 { + self.bucket_rep + } + + async fn key(&self) -> String { + self.key.clone() + } +} + +impl AwsDynamoStore { + async fn get_item(&self, key: &str) -> Result>, Error> { + let response = self + .client + .get_item() + .table_name(self.table.as_str()) + .key( + PK, + aws_sdk_dynamodb::types::AttributeValue::S(key.to_string()), + ) + .send() + .await + .map_err(log_error)?; + + let val = response.item.and_then(|mut item| { + if let Some(AttributeValue::B(val)) = item.remove(VAL) { + Some(val.into_inner()) + } else { + None + } + }); + + Ok(val) + } + + async fn get_keys(&self) -> Result, Error> { + let mut primary_keys = Vec::new(); + let mut last_evaluated_key = None; + + loop { + let mut scan_builder = self + .client + .scan() + .table_name(self.table.as_str()) + .projection_expression(PK); + + if let Some(keys) = last_evaluated_key { + for (key, val) in keys { + scan_builder = scan_builder.exclusive_start_key(key, val); + } + } + + let scan_output = scan_builder.send().await.map_err(log_error)?; + + if let Some(items) = scan_output.items { + for mut item in items { + if let Some(AttributeValue::S(pk)) = item.remove(PK) { + primary_keys.push(pk); + } + } + } + + last_evaluated_key = scan_output.last_evaluated_key; + if last_evaluated_key.is_none() { + break; + } + } + + Ok(primary_keys) + } +} diff --git a/crates/runtime-config/Cargo.toml b/crates/runtime-config/Cargo.toml index 569026129..75839f47a 100644 --- a/crates/runtime-config/Cargo.toml +++ b/crates/runtime-config/Cargo.toml @@ -23,6 +23,7 @@ spin-factor-sqlite = { path = "../factor-sqlite" } spin-factor-variables = { path = "../factor-variables" } spin-factor-wasi = { path = "../factor-wasi" } spin-factors = { path = "../factors" } +spin-key-value-aws = { path = "../key-value-aws" } spin-key-value-azure = { path = "../key-value-azure" } spin-key-value-redis = { path = "../key-value-redis" } spin-key-value-spin = { path = "../key-value-spin" } diff --git a/crates/runtime-config/src/lib.rs b/crates/runtime-config/src/lib.rs index fd0687361..963924363 100644 --- a/crates/runtime-config/src/lib.rs +++ b/crates/runtime-config/src/lib.rs @@ -400,6 +400,9 @@ pub fn key_value_config_resolver( key_value .register_store_type(spin_key_value_azure::AzureKeyValueStore::new()) .unwrap(); + key_value + .register_store_type(spin_key_value_aws::AwsDynamoKeyValueStore::new()) + .unwrap(); // Add handling of "default" store. let default_store_path = default_store_base_path.map(|p| p.join(DEFAULT_SPIN_STORE_FILENAME));