From 6753fb5e87ff1c252fe66339bb2c89405f56ea6f Mon Sep 17 00:00:00 2001 From: Darwin Boersma Date: Tue, 5 Nov 2024 08:36:35 -0700 Subject: [PATCH] Implemented first draft batch operations Signed-off-by: Darwin Boersma --- crates/key-value-aws/src/store.rs | 122 +++++++++++++++++++++++++++++- 1 file changed, 118 insertions(+), 4 deletions(-) diff --git a/crates/key-value-aws/src/store.rs b/crates/key-value-aws/src/store.rs index 86bc8cb27..e20ca08fa 100644 --- a/crates/key-value-aws/src/store.rs +++ b/crates/key-value-aws/src/store.rs @@ -5,8 +5,9 @@ use aws_config::{BehaviorVersion, Region, SdkConfig}; use aws_credential_types::Credentials; use aws_sdk_dynamodb::{ config::{ProvideCredentials, SharedCredentialsProvider}, + operation::batch_get_item::BatchGetItemOutput, primitives::Blob, - types::{AttributeValue, KeysAndAttributes}, + types::{AttributeValue, DeleteRequest, KeysAndAttributes, PutRequest, WriteRequest}, Client, }; use spin_core::async_trait; @@ -178,15 +179,128 @@ impl Store for AwsDynamoStore { } async fn get_many(&self, keys: Vec) -> Result>)>, Error> { - todo!() + let mut results = Vec::with_capacity(keys.len()); + + let mut keys_and_attributes_builder = KeysAndAttributes::builder(); + for key in keys { + keys_and_attributes_builder = keys_and_attributes_builder.keys(HashMap::from_iter([( + PK.to_owned(), + AttributeValue::S(key), + )])) + } + let mut request_items = Some(HashMap::from_iter([( + self.table.clone(), + keys_and_attributes_builder.build().map_err(log_error)?, + )])); + + loop { + let BatchGetItemOutput { + responses: Some(mut responses), + unprocessed_keys, + .. + } = self + .client + .batch_get_item() + .set_request_items(request_items) + .send() + .await + .map_err(log_error)? + else { + return Err(Error::Other("No results".into())); + }; + + if let Some(items) = responses.remove(&self.table) { + for mut item in items { + let Some(AttributeValue::S(pk)) = item.remove(PK) else { + return Err(Error::Other( + "Could not find 'PK' key on DynamoDB item".into(), + )); + }; + let Some(AttributeValue::B(val)) = item.remove(VAL) else { + return Err(Error::Other( + "Could not find 'val' key on DynamoDB item".into(), + )); + }; + + results.push((pk, Some(val.into_inner()))); + } + } + + match unprocessed_keys { + None => return Ok(results), + // TODO: break out if we have retried 10+ times? + remaining_keys => request_items = remaining_keys, + } + } } async fn set_many(&self, key_values: Vec<(String, Vec)>) -> Result<(), Error> { - todo!() + let mut data = Vec::with_capacity(key_values.len()); + for (key, val) in key_values { + data.push( + WriteRequest::builder() + .put_request( + PutRequest::builder() + .item(PK, AttributeValue::S(key)) + .item(VAL, AttributeValue::B(Blob::new(val))) + .build() + .map_err(log_error)?, + ) + .build(), + ) + } + + let mut request_items = Some(HashMap::from_iter([(self.table.clone(), data)])); + + loop { + let results = self + .client + .batch_write_item() + .set_request_items(request_items) + .send() + .await + .map_err(log_error)?; + + match results.unprocessed_items { + None => return Ok(()), + // TODO: break out if we have retried 10+ times? + remaining_items => request_items = remaining_items, + } + } } async fn delete_many(&self, keys: Vec) -> Result<(), Error> { - todo!() + let mut data = Vec::with_capacity(keys.len()); + for key in keys { + data.push( + WriteRequest::builder() + .delete_request( + DeleteRequest::builder() + .key(PK, AttributeValue::S(key)) + .build() + .map_err(log_error)?, + ) + .build(), + ) + } + + let mut input = Some(HashMap::from_iter([(self.table.clone(), data)])); + + loop { + let results = self + .client + .batch_write_item() + .set_request_items(input) + .send() + .await + .map_err(log_error)?; + + match results.unprocessed_items { + None => return Ok(()), + // TODO: break out if we have retried 10+ times? + remaining_items => input = remaining_items, + } + } } async fn increment(&self, key: String, delta: i64) -> Result {