refactor: reduce clone usage (#48)
Signed-off-by: Hiroaki Goto <[email protected]>
StoneDot authored May 27, 2024
1 parent 59d5cf5 commit 30c11d2
Showing 9 changed files with 220 additions and 219 deletions.
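The pattern running through the hunks below is the same throughout: functions that took owned values (forcing callers to `.clone()`) now borrow, and where an owned value really is needed, only that one field is copied out via `Option::clone`, `as_ref()`, `to_owned()`, or `cloned()`. A minimal, self-contained sketch of the before and after shape, with illustrative names rather than dynein's own types:

#[derive(Clone)]
struct Config {
    name: Option<String>,
}

// Before: takes ownership, so every caller must clone the whole struct.
fn describe_owned(cfg: Config) -> String {
    cfg.name.unwrap_or_default()
}

// After: borrows, and clones only the single field that must be owned.
fn describe_borrowed(cfg: &Config) -> String {
    cfg.name.clone().unwrap_or_default()
}

fn main() {
    let cfg = Config { name: Some("movies".to_string()) };
    let a = describe_owned(cfg.clone()); // clone forced by the by-value signature
    let b = describe_borrowed(&cfg);     // no struct clone needed
    assert_eq!(a, b);
    println!("{a}");
}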
38 changes: 19 additions & 19 deletions src/app.rs
@@ -545,9 +545,9 @@ pub async fn use_table(
/// Inserts specified table description into cache file.
pub async fn insert_to_table_cache(
cx: &Context,
- desc: TableDescription,
+ desc: &TableDescription,
) -> Result<(), DyneinConfigError> {
- let table_name: String = desc
+ let table_name = desc
.table_name
.clone()
.expect("desc should have table name");
@@ -560,7 +560,7 @@ pub async fn insert_to_table_cache(

// retrieve current cache from Context and update target table desc.
// key to save the table desc is "<RegionName>/<TableName>" -- e.g. "us-west-2/app_data"
- let mut cache: Cache = cx.clone().cache.expect("cx should have cache");
+ let mut cache: Cache = cx.cache.clone().expect("cx should have cache");
let cache_key = format!("{}/{}", region.as_ref(), table_name);

let mut table_schema_hashmap: HashMap<String, TableSchema> = match cache.tables {
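The move from `cx.clone().cache` to `cx.cache.clone()` looks cosmetic but is not: the former clones the entire `Context` and then moves the `cache` field out of the temporary, while the latter clones only the `Option<Cache>` field. A small standalone sketch with stand-in types, not dynein's actual `Context`:

#[derive(Clone)]
struct Cache { tables: Vec<String> }

#[derive(Clone)]
struct Context { cache: Option<Cache>, region: String }

fn main() {
    let cx = Context {
        cache: Some(Cache { tables: vec!["Movie".to_string()] }),
        region: "us-west-2".to_string(),
    };

    // Before: clones the whole Context (region and all) just to take its cache.
    let cache_a: Cache = cx.clone().cache.expect("cx should have cache");

    // After: clones only the Option<Cache> field.
    let cache_b: Cache = cx.cache.clone().expect("cx should have cache");

    println!("{}: {} vs {} cached tables", cx.region, cache_a.tables.len(), cache_b.tables.len());
}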
@@ -577,9 +577,9 @@
TableSchema {
region: String::from(region.as_ref()),
name: table_name,
pk: key::typed_key("HASH", &desc).expect("pk should exist"),
sk: key::typed_key("RANGE", &desc),
indexes: index_schemas(&desc),
pk: key::typed_key("HASH", desc).expect("pk should exist"),
sk: key::typed_key("RANGE", desc),
indexes: index_schemas(desc),
mode: table::extract_mode(&desc.billing_mode_summary),
},
);
@@ -618,7 +618,7 @@ pub async fn table_schema(cx: &Context) -> TableSchema {

TableSchema {
region: String::from(cx.effective_region().await.as_ref()),
- name: desc.clone().table_name.unwrap(),
+ name: desc.table_name.to_owned().unwrap(),
pk: key::typed_key("HASH", &desc).expect("pk should exist"),
sk: key::typed_key("RANGE", &desc),
indexes: index_schemas(&desc),
@@ -628,8 +628,8 @@ pub async fn table_schema(cx: &Context) -> TableSchema {
None => {
// simply maps config data into TableSchema struct.
debug!("current context {:#?}", cx);
- let cache: Cache = cx.clone().cache.expect("Cache should exist in context"); // can refactor here using and_then
- let cached_tables: HashMap<String, TableSchema> = cache.tables.unwrap_or_else(|| {
+ let cache = cx.cache.as_ref().expect("Cache should exist in context"); // can refactor here using and_then
+ let cached_tables = cache.tables.as_ref().unwrap_or_else(|| {
error!("{}", Messages::NoEffectiveTable);
std::process::exit(1)
});
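The new lines lean on `Option::as_ref`, which turns `&Option<T>` into `Option<&T>` so the value can be inspected, unwrapped, or `expect`-ed without cloning or taking ownership. A minimal sketch of the pattern with made-up types:

use std::collections::HashMap;

struct Cache { tables: Option<HashMap<String, String>> }

fn lookup<'a>(cache: &'a Cache, key: &str) -> Option<&'a String> {
    // as_ref(): &Option<HashMap<..>> -> Option<&HashMap<..>>, no clone of the map.
    let tables = cache.tables.as_ref()?;
    tables.get(key)
}

fn main() {
    let mut tables = HashMap::new();
    tables.insert("us-west-2/Movie".to_string(), "schema".to_string());
    let cache = Cache { tables: Some(tables) };
    assert_eq!(lookup(&cache, "us-west-2/Movie").map(String::as_str), Some("schema"));
}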
@@ -645,30 +645,30 @@ pub async fn table_schema(cx: &Context) -> TableSchema {
}

pub fn index_schemas(desc: &TableDescription) -> Option<Vec<IndexSchema>> {
- let attr_defs: &Vec<AttributeDefinition> = &desc.clone().attribute_definitions.unwrap();
+ let attr_defs: &Vec<AttributeDefinition> = desc.attribute_definitions.as_ref().unwrap();

let mut indexes: Vec<IndexSchema> = vec![];

- if let Some(gsis) = desc.clone().global_secondary_indexes {
+ if let Some(gsis) = desc.global_secondary_indexes.as_ref() {
for gsi in gsis {
indexes.push(IndexSchema {
- name: gsi.index_name.unwrap(),
+ name: gsi.index_name.to_owned().unwrap(),
kind: IndexType::Gsi,
pk: key::typed_key_for_schema("HASH", &gsi.key_schema.clone().unwrap(), attr_defs)
pk: key::typed_key_for_schema("HASH", gsi.key_schema.as_ref().unwrap(), attr_defs)
.expect("pk should exist"),
sk: key::typed_key_for_schema("RANGE", &gsi.key_schema.unwrap(), attr_defs),
sk: key::typed_key_for_schema("RANGE", gsi.key_schema.as_ref().unwrap(), attr_defs),
});
}
};

- if let Some(lsis) = desc.clone().local_secondary_indexes {
+ if let Some(lsis) = desc.local_secondary_indexes.as_ref() {
for lsi in lsis {
indexes.push(IndexSchema {
- name: lsi.index_name.unwrap(),
+ name: lsi.index_name.to_owned().unwrap(),
kind: IndexType::Lsi,
pk: key::typed_key_for_schema("HASH", &lsi.key_schema.clone().unwrap(), attr_defs)
pk: key::typed_key_for_schema("HASH", lsi.key_schema.as_ref().unwrap(), attr_defs)
.expect("pk should exist"),
sk: key::typed_key_for_schema("RANGE", &lsi.key_schema.unwrap(), attr_defs),
sk: key::typed_key_for_schema("RANGE", lsi.key_schema.as_ref().unwrap(), attr_defs),
});
}
};
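Iterating `desc.global_secondary_indexes.as_ref()` (and the LSI equivalent) yields references to each index description, so fields can no longer be moved out of them; `.to_owned()` on the `Option<String>` name, which is equivalent to `.clone()` here, copies just that one string. A compact illustration with stand-in types rather than the SDK's:

struct IndexDescription { index_name: Option<String> }

struct Description { global_secondary_indexes: Option<Vec<IndexDescription>> }

fn index_names(desc: &Description) -> Vec<String> {
    let mut names = Vec::new();
    if let Some(gsis) = desc.global_secondary_indexes.as_ref() {
        for gsi in gsis {
            // gsi is a &IndexDescription, so copy only the name out of it.
            names.push(gsi.index_name.to_owned().expect("index should have a name"));
        }
    }
    names
}

fn main() {
    let desc = Description {
        global_secondary_indexes: Some(vec![IndexDescription { index_name: Some("byYear".into()) }]),
    };
    assert_eq!(index_names(&desc), vec!["byYear".to_string()]);
}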
@@ -743,7 +743,7 @@ async fn save_using_target(
write_dynein_file(DyneinFileType::ConfigFile, config_yaml_string)?;

// save target table info into cache.
- insert_to_table_cache(cx, desc).await?;
+ insert_to_table_cache(cx, &desc).await?;

Ok(())
}
43 changes: 21 additions & 22 deletions src/batch.rs
@@ -24,7 +24,7 @@ use base64::{engine::general_purpose, Engine as _};
use bytes::Bytes;
use log::{debug, error};
use serde_json::Value as JsonValue;
- use std::{collections::HashMap, error, fmt, fs, future::Future, io::Error as IOError, pin::Pin};
+ use std::{collections::HashMap, error, fmt, fs, io::Error as IOError};

use super::app;
use super::data;
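The `future::Future` and `pin::Pin` imports drop out because their only evident consumer, the recursive batch-write helper later in this file, is rewritten in this commit as a plain `async fn` with a loop (see the sketch after that hunk).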
@@ -219,7 +219,7 @@ pub fn build_batch_request_items_from_json(
/// > You can investigate and optionally resend the requests. Typically, you would call BatchWriteItem in a loop. Each iteration would
/// > check for unprocessed items and submit a new BatchWriteItem request with those unprocessed items until all items have been processed.
async fn batch_write_item_api(
- cx: app::Context,
+ cx: &app::Context,
request_items: HashMap<String, Vec<WriteRequest>>,
) -> Result<
Option<HashMap<String, Vec<WriteRequest>>>,
@@ -232,9 +232,11 @@ async fn batch_write_item_api(

let retry_config = cx
.retry
- .clone()
- .map(|v| v.batch_write_item.to_owned().unwrap_or(v.default));
- let config = cx.effective_sdk_config_with_retry(retry_config).await;
+ .as_ref()
+ .map(|v| v.batch_write_item.as_ref().unwrap_or(&v.default));
+ let config = cx
+ .effective_sdk_config_with_retry(retry_config.cloned())
+ .await;
let ddb = DynamoDbSdkClient::new(&config);

match ddb
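The retry lookup now borrows all the way down: `cx.retry.as_ref()` yields an `Option` of a reference, the `map` chooses either the per-operation setting or the default by reference, and a single `.cloned()` at the end produces the owned value handed to the SDK-config helper. A sketch of that `Option` chain; the types and field names here are illustrative, not dynein's actual config structs:

#[derive(Clone, Debug, PartialEq)]
struct RetrySetting { max_attempts: u32 }

struct Retry {
    default: RetrySetting,
    batch_write_item: Option<RetrySetting>,
}

struct Context { retry: Option<Retry> }

fn effective_retry(cx: &Context) -> Option<RetrySetting> {
    cx.retry
        .as_ref()                                                   // Option<&Retry>
        .map(|v| v.batch_write_item.as_ref().unwrap_or(&v.default)) // Option<&RetrySetting>
        .cloned()                                                   // clone only the chosen setting
}

fn main() {
    let cx = Context {
        retry: Some(Retry { default: RetrySetting { max_attempts: 3 }, batch_write_item: None }),
    };
    assert_eq!(effective_retry(&cx), Some(RetrySetting { max_attempts: 3 }));
}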
@@ -250,36 +252,33 @@

// Basically this function is intended to be defined as `pub async fn`.
// However, to recursively use async function, you have to return a future wrapped by pinned box. For more details: `rustc --explain E0733`.
- pub fn batch_write_untill_processed(
- cx: app::Context,
- request_items: HashMap<String, Vec<WriteRequest>>,
- ) -> Pin<Box<dyn Future<Output = Result<(), aws_sdk_dynamodb::error::SdkError<BatchWriteItemError>>>>>
- {
- Box::pin(async move {
- match batch_write_item_api(cx.clone(), request_items).await {
+ pub async fn batch_write_until_processed(
+ cx: &app::Context,
+ mut request_items: HashMap<String, Vec<WriteRequest>>,
+ ) -> Result<(), aws_sdk_dynamodb::error::SdkError<BatchWriteItemError>> {
+ loop {
+ request_items = match batch_write_item_api(cx, request_items).await {
Ok(result) => {
let unprocessed_items: HashMap<String, Vec<WriteRequest>> =
result.expect("alwasy wrapped by Some");
- // if there's any unprocessed items, recursively call this function itself.
if !unprocessed_items.is_empty() {
+ // if there are any unprocessed items, retry rest items
debug!("UnprocessedItems: {:?}", &unprocessed_items);
- batch_write_untill_processed(cx, unprocessed_items).await
- }
- // untill it processes items completely.
- else {
- Ok(())
+ unprocessed_items
+ } else {
+ return Ok(());
}
}
- Err(e) => Err(e),
+ Err(e) => return Err(e),
}
- })
+ }
}
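This is the one change in the commit that goes beyond swapping owned parameters for borrows: the recursive helper, which had to return `Pin<Box<dyn Future<...>>>` (the workaround `rustc --explain E0733` describes for async recursion), becomes an ordinary `async fn` that loops, rebinding `request_items` to whatever the API reports as unprocessed. A self-contained sketch of the same retry-until-empty shape; the batch type, the fake API call, and the `futures` executor in `main` are placeholders, not DynamoDB's API:

// Placeholder "API call": pretends to process the first half of the batch
// and returns the rest as unprocessed.
async fn send_batch(mut batch: Vec<u32>) -> Result<Vec<u32>, String> {
    let processed = (batch.len() + 1) / 2;
    Ok(batch.split_off(processed))
}

// Loop instead of recursion: no Pin<Box<dyn Future>> is needed, and `batch`
// is simply rebound to the unprocessed remainder on each iteration.
async fn send_until_processed(mut batch: Vec<u32>) -> Result<(), String> {
    loop {
        batch = match send_batch(batch).await {
            Ok(unprocessed) => {
                if unprocessed.is_empty() {
                    return Ok(());
                }
                unprocessed
            }
            Err(e) => return Err(e),
        };
    }
}

fn main() {
    // Any executor works; the `futures` crate is assumed here only for block_on.
    let result = futures::executor::block_on(send_until_processed(vec![1, 2, 3, 4, 5]));
    assert!(result.is_ok());
}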

/// This function is intended to be called from main.rs, as a destination of bwrite command.
/// It executes batch write operations based on the provided `puts`, `dels`, and `input_file` arguments.
/// At least one argument `puts`, `dels` or `input_file` is required, and all arguments can be specified simultaneously.
pub async fn batch_write_item(
- cx: app::Context,
+ cx: &app::Context,
puts: Option<Vec<String>>,
dels: Option<Vec<String>>,
input_file: Option<String>,
@@ -297,7 +296,7 @@ pub async fn batch_write_item(
if puts.is_some() || dels.is_some() {
let mut write_requests = Vec::<WriteRequest>::new();
let parser = DyneinParser::new();
let ts: app::TableSchema = app::table_schema(&cx).await;
let ts: app::TableSchema = app::table_schema(cx).await;

if let Some(items) = puts {
for item in items.iter() {
24 changes: 12 additions & 12 deletions src/bootstrap.rs
@@ -95,7 +95,7 @@ pub fn list_samples() {
}

pub async fn launch_sample(
- cx: app::Context,
+ cx: &app::Context,
sample: Option<String>,
) -> Result<(), DyneinBootstrapError> {
match sample {
@@ -118,7 +118,7 @@
Private functions
================================================= */

- async fn launch_movie_sample(cx: app::Context) -> Result<(), DyneinBootstrapError> {
+ async fn launch_movie_sample(cx: &app::Context) -> Result<(), DyneinBootstrapError> {
println!(
"\
Bootstrapping - dynein will create 'Movie' table with official 'Movie' sample data:
@@ -132,10 +132,10 @@ see https://github.com/awslabs/dynein#working-with-dynamodb-items for detail
);

// Step 1. create tables
prepare_table(&cx, "Movie", vec!["year,N", "title,S"].as_ref()).await;
prepare_table(cx, "Movie", vec!["year,N", "title,S"].as_ref()).await;

// Step 2. wait tables to be created and in ACTIVE status
wait_table_creation(&cx, vec!["Movie"]).await;
wait_table_creation(cx, vec!["Movie"]).await;

// Step 3. decompress data
let compressed_data = include_bytes!("./resources/bootstrap/moviedata.json.br");
@@ -199,15 +199,15 @@ see https://github.com/awslabs/dynein#working-with-dynamodb-items for detail
}
} // 'batch loop
request_items.insert("Movie".to_string(), write_requests);
- batch::batch_write_untill_processed(cx.clone(), request_items).await?;
+ batch::batch_write_until_processed(cx, request_items).await?;
} // 'whole loop
request_items.insert("Movie".to_string(), write_requests);
- batch::batch_write_untill_processed(cx.clone(), request_items).await?;
+ batch::batch_write_until_processed(cx, request_items).await?;

Ok(())
}

- async fn launch_default_sample(cx: app::Context) -> Result<(), DyneinBootstrapError> {
+ async fn launch_default_sample(cx: &app::Context) -> Result<(), DyneinBootstrapError> {
println!(
"\
Bootstrapping - dynein will create 4 sample tables defined here:
@@ -238,12 +238,12 @@ https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/AppendixSampleT

// Step 1. Create tables
for (table_name, keys) in &tables {
prepare_table(&cx, table_name, keys).await
prepare_table(cx, table_name, keys).await
}

// Step 2. wait tables to be created and in ACTIVE status
let creating_table_names: Vec<&str> = tables.clone().iter().map(|pair| pair.0).collect();
wait_table_creation(&cx, creating_table_names).await;
let creating_table_names: Vec<&str> = tables.iter().map(|pair| pair.0).collect();
wait_table_creation(cx, creating_table_names).await;

println!("Tables are ready and retrieved sample data locally. Now start writing data into samle tables...");
for (table_name, _) in &tables {
@@ -261,7 +261,7 @@ https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/AppendixSampleT
decompressor.read_to_string(&mut content)?;
// Step 4. load data into tables
let request_items = batch::build_batch_request_items_from_json(content.to_string())?;
- batch::batch_write_untill_processed(cx.clone(), request_items).await?;
+ batch::batch_write_until_processed(cx, request_items).await?;
}

let region = cx.effective_region().await.to_string();
@@ -289,7 +289,7 @@ https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/AppendixSampleT

async fn prepare_table(cx: &app::Context, table_name: &str, keys: &[&str]) {
match control::create_table_api(
- cx.clone(),
+ cx,
table_name.to_string(),
keys.iter().map(|k| (*k).to_string()).collect(),
)
(Diffs for the remaining changed files are not loaded in this view.)
