Skip to content

Commit

Permalink
Implemented first draft batch operations
Browse files Browse the repository at this point in the history
Signed-off-by: Darwin Boersma <[email protected]>
  • Loading branch information
ogghead committed Nov 5, 2024
1 parent 917f811 commit 6753fb5
Showing 1 changed file with 118 additions and 4 deletions.
122 changes: 118 additions & 4 deletions crates/key-value-aws/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ use aws_config::{BehaviorVersion, Region, SdkConfig};
use aws_credential_types::Credentials;
use aws_sdk_dynamodb::{
config::{ProvideCredentials, SharedCredentialsProvider},
operation::batch_get_item::BatchGetItemOutput,
primitives::Blob,
types::{AttributeValue, KeysAndAttributes},
types::{AttributeValue, DeleteRequest, KeysAndAttributes, PutRequest, WriteRequest},
Client,
};
use spin_core::async_trait;
Expand Down Expand Up @@ -178,15 +179,128 @@ impl Store for AwsDynamoStore {
}

async fn get_many(&self, keys: Vec<String>) -> Result<Vec<(String, Option<Vec<u8>>)>, Error> {
todo!()
let mut results = Vec::with_capacity(keys.len());

let mut keys_and_attributes_builder = KeysAndAttributes::builder();
for key in keys {
keys_and_attributes_builder = keys_and_attributes_builder.keys(HashMap::from_iter([(
PK.to_owned(),
AttributeValue::S(key),
)]))
}
let mut request_items = Some(HashMap::from_iter([(
self.table.clone(),
keys_and_attributes_builder.build().map_err(log_error)?,
)]));

loop {
let BatchGetItemOutput {
responses: Some(mut responses),
unprocessed_keys,
..
} = self
.client
.batch_get_item()
.set_request_items(request_items)
.send()
.await
.map_err(log_error)?
else {
return Err(Error::Other("No results".into()));
};

if let Some(items) = responses.remove(&self.table) {
for mut item in items {
let Some(AttributeValue::S(pk)) = item.remove(PK) else {
return Err(Error::Other(
"Could not find 'PK' key on DynamoDB item".into(),
));
};
let Some(AttributeValue::B(val)) = item.remove(VAL) else {
return Err(Error::Other(
"Could not find 'val' key on DynamoDB item".into(),
));
};

results.push((pk, Some(val.into_inner())));
}
}

match unprocessed_keys {
None => return Ok(results),
// TODO: break out if we have retried 10+ times?
remaining_keys => request_items = remaining_keys,
}
}
}

async fn set_many(&self, key_values: Vec<(String, Vec<u8>)>) -> Result<(), Error> {
todo!()
let mut data = Vec::with_capacity(key_values.len());
for (key, val) in key_values {
data.push(
WriteRequest::builder()
.put_request(
PutRequest::builder()
.item(PK, AttributeValue::S(key))
.item(VAL, AttributeValue::B(Blob::new(val)))
.build()
.map_err(log_error)?,
)
.build(),
)
}

let mut request_items = Some(HashMap::from_iter([(self.table.clone(), data)]));

loop {
let results = self
.client
.batch_write_item()
.set_request_items(request_items)
.send()
.await
.map_err(log_error)?;

match results.unprocessed_items {
None => return Ok(()),
// TODO: break out if we have retried 10+ times?
remaining_items => request_items = remaining_items,
}
}
}

async fn delete_many(&self, keys: Vec<String>) -> Result<(), Error> {
todo!()
let mut data = Vec::with_capacity(keys.len());
for key in keys {
data.push(
WriteRequest::builder()
.delete_request(
DeleteRequest::builder()
.key(PK, AttributeValue::S(key))
.build()
.map_err(log_error)?,
)
.build(),
)
}

let mut input = Some(HashMap::from_iter([(self.table.clone(), data)]));

loop {
let results = self
.client
.batch_write_item()
.set_request_items(input)
.send()
.await
.map_err(log_error)?;

match results.unprocessed_items {
None => return Ok(()),
// TODO: break out if we have retried 10+ times?
remaining_items => input = remaining_items,
}
}
}

async fn increment(&self, key: String, delta: i64) -> Result<i64, Error> {
Expand Down

0 comments on commit 6753fb5

Please sign in to comment.