Skip to content

Commit

Permalink
Update to DynamoKeyValueStore to make other AWS KV store implementati…
Browse files Browse the repository at this point in the history
…ons easier

Signed-off-by: Darwin Boersma <[email protected]>
  • Loading branch information
ogghead committed Nov 5, 2024
1 parent e7ba606 commit 407182d
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 34 deletions.
6 changes: 3 additions & 3 deletions crates/key-value-aws/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ use store::{

/// A key-value store that uses AWS Dynamo as the backend.
#[derive(Default)]
pub struct AwsKeyValueStore {
pub struct AwsDynamoKeyValueStore {
_priv: (),
}

impl AwsKeyValueStore {
impl AwsDynamoKeyValueStore {
/// Creates a new `AwsKeyValueStore`.
pub fn new() -> Self {
Self::default()
Expand All @@ -34,7 +34,7 @@ pub struct AwsDynamoKeyValueRuntimeConfig {
table: String,
}

impl MakeKeyValueStore for AwsKeyValueStore {
impl MakeKeyValueStore for AwsDynamoKeyValueStore {
const RUNTIME_CONFIG_TYPE: &'static str = "aws_dynamo";

type RuntimeConfig = AwsDynamoKeyValueRuntimeConfig;
Expand Down
62 changes: 32 additions & 30 deletions crates/key-value-aws/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,27 +77,24 @@ impl KeyValueAwsDynamo {
auth_options: KeyValueAwsDynamoAuthOptions,
) -> Result<Self> {
let region_clone = region.clone();
let client_fut: std::pin::Pin<Box<dyn std::future::Future<Output = Client> + Send>> =
Box::pin(async move {
let config = match auth_options {
KeyValueAwsDynamoAuthOptions::RuntimeConfigValues(config) => {
SdkConfig::builder()
.credentials_provider(SharedCredentialsProvider::new(config))
.region(Region::new(region_clone))
.behavior_version(BehaviorVersion::latest())
.build()
}
KeyValueAwsDynamoAuthOptions::Environmental => {
aws_config::load_defaults(BehaviorVersion::latest()).await
}
};
Client::new(&config)
});
let client_fut = Box::pin(async move {
let sdk_config = match auth_options {
KeyValueAwsDynamoAuthOptions::RuntimeConfigValues(config) => SdkConfig::builder()
.credentials_provider(SharedCredentialsProvider::new(config))
.region(Region::new(region_clone))
.behavior_version(BehaviorVersion::latest())
.build(),
KeyValueAwsDynamoAuthOptions::Environmental => {
aws_config::load_defaults(BehaviorVersion::latest()).await
}
};
Client::new(&sdk_config)
});

Ok(Self {
client: async_once_cell::Lazy::from_future(client_fut),
table,
region,
client: async_once_cell::Lazy::from_future(client_fut),
})
}
}
Expand Down Expand Up @@ -143,7 +140,7 @@ impl Store for AwsDynamoStore {
async fn set(&self, key: &str, value: &[u8]) -> Result<(), Error> {
self.client
.put_item()
.table_name(self.table.clone())
.table_name(&self.table)
.item(PK, AttributeValue::S(key.to_string()))
.item(VAL, AttributeValue::B(Blob::new(value)))
.send()
Expand All @@ -156,7 +153,7 @@ impl Store for AwsDynamoStore {
if self.exists(key).await? {
self.client
.delete_item()
.table_name(self.table.clone())
.table_name(&self.table)
.key(PK, AttributeValue::S(key.to_string()))
.send()
.await
Expand All @@ -176,22 +173,27 @@ impl Store for AwsDynamoStore {

impl AwsDynamoStore {
async fn get_item(&self, key: &str) -> Result<Option<Vec<u8>>, Error> {
let query = self
let response = self
.client
.get_item()
.table_name(self.table.clone())
.key(PK, aws_sdk_dynamodb::types::AttributeValue::S(key.into()))
.table_name(&self.table)
.key(
PK,
aws_sdk_dynamodb::types::AttributeValue::S(key.to_string()),
)
.send()
.await
.map_err(log_error)?;

Ok(query.item.and_then(|item| {
if let Some(AttributeValue::B(val)) = item.get(VAL) {
Some(val.clone().into_inner())
let val = response.item.and_then(|mut item| {
if let Some(AttributeValue::B(val)) = item.remove(VAL) {
Some(val.into_inner())
} else {
None
}
}))
});

Ok(val)
}

async fn get_keys(&self) -> Result<Vec<String>, Error> {
Expand All @@ -202,7 +204,7 @@ impl AwsDynamoStore {
let mut scan_builder = self
.client
.scan()
.table_name(self.table.clone())
.table_name(&self.table)
.projection_expression(PK);

if let Some(keys) = last_evaluated_key {
Expand All @@ -214,9 +216,9 @@ impl AwsDynamoStore {
let scan_output = scan_builder.send().await.map_err(log_error)?;

if let Some(items) = scan_output.items {
for item in items {
if let Some(AttributeValue::S(pk)) = item.get(PK) {
primary_keys.push(pk.clone());
for mut item in items {
if let Some(AttributeValue::S(pk)) = item.remove(PK) {
primary_keys.push(pk);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/runtime-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ pub fn key_value_config_resolver(
.register_store_type(spin_key_value_azure::AzureKeyValueStore::new())
.unwrap();
key_value
.register_store_type(spin_key_value_aws::AwsKeyValueStore::new())
.register_store_type(spin_key_value_aws::AwsDynamoKeyValueStore::new())
.unwrap();

// Add handling of "default" store.
Expand Down

0 comments on commit 407182d

Please sign in to comment.