Skip to content

Commit

Permalink
Refactor: Reduce clone of context object
Browse files Browse the repository at this point in the history
  • Loading branch information
StoneDot committed May 24, 2024
1 parent 5dfed19 commit f7517cd
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 95 deletions.
10 changes: 5 additions & 5 deletions src/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,8 @@ pub fn build_batch_request_items_from_json(
/// > the failed operations are returned in the UnprocessedItems response parameter.
/// > You can investigate and optionally resend the requests. Typically, you would call BatchWriteItem in a loop. Each iteration would
/// > check for unprocessed items and submit a new BatchWriteItem request with those unprocessed items until all items have been processed.
async fn batch_write_item_api(
cx: app::Context,
async fn batch_write_item_api<'a>(
cx: &mut app::Context,
request_items: HashMap<String, Vec<WriteRequest>>,
) -> Result<
Option<HashMap<String, Vec<WriteRequest>>>,
Expand Down Expand Up @@ -251,12 +251,12 @@ async fn batch_write_item_api(
// Basically this function is intended to be defined as `pub async fn`.
// However, to recursively use async function, you have to return a future wrapped by pinned box. For more details: `rustc --explain E0733`.
pub fn batch_write_untill_processed(
cx: app::Context,
mut cx: app::Context,
request_items: HashMap<String, Vec<WriteRequest>>,
) -> Pin<Box<dyn Future<Output = Result<(), aws_sdk_dynamodb::error::SdkError<BatchWriteItemError>>>>>
{
Box::pin(async move {
match batch_write_item_api(cx.clone(), request_items).await {
match batch_write_item_api(&mut cx, request_items).await {
Ok(result) => {
let unprocessed_items: HashMap<String, Vec<WriteRequest>> =
result.expect("alwasy wrapped by Some");
Expand All @@ -279,7 +279,7 @@ pub fn batch_write_untill_processed(
/// It executes batch write operations based on the provided `puts`, `dels`, and `input_file` arguments.
/// At least one argument `puts`, `dels` or `input_file` is required, and all arguments can be specified simultaneously.
pub async fn batch_write_item(
cx: app::Context,
cx: &mut app::Context,
puts: Option<Vec<String>>,
dels: Option<Vec<String>>,
input_file: Option<String>,
Expand Down
14 changes: 7 additions & 7 deletions src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ pub async fn launch_sample(
Private functions
================================================= */

async fn launch_movie_sample(cx: app::Context) -> Result<(), DyneinBootstrapError> {
async fn launch_movie_sample(mut cx: app::Context) -> Result<(), DyneinBootstrapError> {
println!(
"\
Bootstrapping - dynein will create 'Movie' table with official 'Movie' sample data:
Expand All @@ -148,7 +148,7 @@ see https://github.com/awslabs/dynein#working-with-dynamodb-items for detail
);

// Step 1. create tables
prepare_table(&cx, "Movie", vec!["year,N", "title,S"].as_ref()).await;
prepare_table(&mut cx, "Movie", vec!["year,N", "title,S"].as_ref()).await;

// Step 2. wait tables to be created and in ACTIVE status
wait_table_creation(&cx, vec!["Movie"]).await;
Expand Down Expand Up @@ -218,12 +218,12 @@ see https://github.com/awslabs/dynein#working-with-dynamodb-items for detail
batch::batch_write_untill_processed(cx.clone(), request_items).await?;
} // 'whole loop
request_items.insert("Movie".to_string(), write_requests);
batch::batch_write_untill_processed(cx.clone(), request_items).await?;
batch::batch_write_untill_processed(cx, request_items).await?;

Ok(())
}

async fn launch_default_sample(cx: app::Context) -> Result<(), DyneinBootstrapError> {
async fn launch_default_sample(mut cx: app::Context) -> Result<(), DyneinBootstrapError> {
println!(
"\
Bootstrapping - dynein will create 4 sample tables defined here:
Expand Down Expand Up @@ -254,7 +254,7 @@ https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/AppendixSampleT

// Step 1. Create tables
for (table_name, keys) in &tables {
prepare_table(&cx, table_name, keys).await
prepare_table(&mut cx, table_name, keys).await
}

// Step 2. wait tables to be created and in ACTIVE status
Expand Down Expand Up @@ -303,9 +303,9 @@ https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/AppendixSampleT
Ok(())
}

async fn prepare_table(cx: &app::Context, table_name: &str, keys: &[&str]) {
async fn prepare_table(cx: &mut app::Context, table_name: &str, keys: &[&str]) {
match control::create_table_api(
cx.clone(),
cx,
table_name.to_string(),
keys.iter().map(|k| (*k).to_string()).collect(),
)
Expand Down
52 changes: 29 additions & 23 deletions src/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ use aws_sdk_ec2::Client as Ec2SdkClient;
use futures::future::join_all;
use log::{debug, error};
use std::{
borrow::Cow::{self, Owned},
io::{self, Error as IOError, Write},
time,
};
use std::borrow::Cow::Borrowed;

use dialoguer::{theme::ColorfulTheme, Confirm, Select};
use tabwriter::TabWriter;
Expand All @@ -41,7 +43,7 @@ use super::ddb::table;
Public functions
================================================= */

pub async fn list_tables_all_regions(cx: app::Context) {
pub async fn list_tables_all_regions(cx: &mut app::Context) {
// get all regions from us-east-1 regardless specified region
let config = cx
.clone()
Expand All @@ -59,19 +61,19 @@ pub async fn list_tables_all_regions(cx: app::Context) {
res.regions
.expect("regions should exist") // Vec<Region>
.iter()
.map(|r| list_tables(cx.clone().with_region(r.region_name.as_ref().unwrap()))),
.map(|r| list_tables(Owned(cx.clone().with_region(r.region_name.as_ref().unwrap())))),
)
.await;

if cx.is_local().await {
list_tables(cx.clone()).await;
list_tables(Borrowed(cx)).await;
}
}
};
}

pub async fn list_tables(cx: app::Context) {
let table_names = list_tables_api(cx.clone()).await;
pub async fn list_tables(mut cx: Cow<'_, app::Context>) {
let table_names = list_tables_api(cx.to_mut()).await;
let region = cx.effective_region().await.to_string();

println!("DynamoDB tables in region: {}", region);
Expand All @@ -97,25 +99,25 @@ pub async fn list_tables(cx: app::Context) {

/// Executed when you call `$ dy desc --all-tables`.
/// Note that `describe_table` function calls are executed in parallel (async + join_all).
pub async fn describe_all_tables(cx: app::Context) {
let table_names = list_tables_api(cx.clone()).await;
pub async fn describe_all_tables(cx: &mut app::Context) {
let table_names = list_tables_api(cx).await;
join_all(
table_names
.into_iter()
.map(|t| describe_table(cx.clone(), Some(t))),
.map(|t| describe_table(Owned(cx.clone()), Some(t))),
)
.await;
}

/// Executed when you call `$ dy desc (table)`. Retrieve TableDescription via describe_table_api function,
/// then print them in convenient way using table::print_table_description function (default/yaml).
pub async fn describe_table(cx: app::Context, target_table_to_desc: Option<String>) {
pub async fn describe_table(cx: Cow<'_, app::Context>, target_table_to_desc: Option<String>) {
debug!("context: {:#?}", &cx);
debug!("positional arg table name: {:?}", &target_table_to_desc);
let new_context = if let Some(t) = target_table_to_desc {
cx.with_table(t.as_str())
cx.as_ref().clone().with_table(&t)
} else {
cx
cx.into_owned()
};

let desc: TableDescription =
Expand Down Expand Up @@ -171,13 +173,13 @@ pub async fn describe_table_api(cx: &app::Context, table_name: String) -> TableD

/// This function is designed to be called from dynein command, mapped in main.rs.
/// Note that it simply ignores --table option if specified. Newly created table name should be given by the 1st argument "name".
pub async fn create_table(cx: app::Context, name: String, given_keys: Vec<String>) {
pub async fn create_table(cx: &mut app::Context, name: String, given_keys: Vec<String>) {
if given_keys.is_empty() || given_keys.len() >= 3 {
error!("You should pass one or two key definitions with --keys option");
std::process::exit(1);
};

match create_table_api(cx.clone(), name, given_keys).await {
match create_table_api(cx, name, given_keys).await {
Ok(desc) => table::print_table_description(cx.effective_region().await.as_ref(), desc),
Err(e) => {
debug!("CreateTable API call got an error -- {:#?}", e);
Expand All @@ -188,7 +190,7 @@ pub async fn create_table(cx: app::Context, name: String, given_keys: Vec<String
}

pub async fn create_table_api(
cx: app::Context,
cx: &mut app::Context,
name: String,
given_keys: Vec<String>,
) -> Result<
Expand Down Expand Up @@ -219,7 +221,7 @@ pub async fn create_table_api(
})
}

pub async fn create_index(cx: app::Context, index_name: String, given_keys: Vec<String>) {
pub async fn create_index(cx: &mut app::Context, index_name: String, given_keys: Vec<String>) {
if given_keys.is_empty() || given_keys.len() >= 3 {
error!("You should pass one or two key definitions with --keys option");
std::process::exit(1);
Expand Down Expand Up @@ -277,7 +279,7 @@ pub async fn create_index(cx: app::Context, index_name: String, given_keys: Vec<
}

pub async fn update_table(
cx: app::Context,
cx: &mut app::Context,
table_name_to_update: String,
mode_string: Option<String>,
wcu: Option<i64>,
Expand Down Expand Up @@ -410,7 +412,7 @@ async fn update_table_api(
})
}

pub async fn delete_table(cx: app::Context, name: String, skip_confirmation: bool) {
pub async fn delete_table(cx: &mut app::Context, name: String, skip_confirmation: bool) {
debug!("Trying to delete a table '{}'", &name);

let msg = format!("You're trying to delete a table '{}'. Are you OK?", &name);
Expand Down Expand Up @@ -442,7 +444,7 @@ pub async fn delete_table(cx: app::Context, name: String, skip_confirmation: boo
///
/// OnDemand backup is a type of backups that can be manually created. Another type is called PITR (Point-In-Time-Restore) but dynein doesn't support it for now.
/// For more information about DynamoDB on-demand backup: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/BackupRestore.html
pub async fn backup(cx: app::Context, all_tables: bool) {
pub async fn backup(cx: &mut app::Context, all_tables: bool) {
// this "backup" function is called only when --list is NOT given. So, --all-tables would be ignored.
if all_tables {
println!("NOTE: --all-tables option is ignored without --list option. Just trying to create a backup for the target table...")
Expand Down Expand Up @@ -487,8 +489,8 @@ pub async fn backup(cx: app::Context, all_tables: bool) {
}

/// List backups for a specified table. With --all-tables option all backups for all tables in the region are shown.
pub async fn list_backups(cx: app::Context, all_tables: bool) -> Result<(), IOError> {
let backups = list_backups_api(&cx, all_tables).await;
pub async fn list_backups(cx: &mut app::Context, all_tables: bool) -> Result<(), IOError> {
let backups = list_backups_api(cx, all_tables).await;
let mut tw = TabWriter::new(io::stdout());
// First defining header
tw.write_all(
Expand Down Expand Up @@ -524,9 +526,13 @@ pub async fn list_backups(cx: app::Context, all_tables: bool) -> Result<(), IOEr
/// This function restores DynamoDB table from specified backup data.
/// If you don't specify backup data (name) explicitly, dynein will list backups and you can select out of them.
/// Currently overwriting properties during rstore is not supported.
pub async fn restore(cx: app::Context, backup_name: Option<String>, restore_name: Option<String>) {
pub async fn restore(
cx: &mut app::Context,
backup_name: Option<String>,
restore_name: Option<String>,
) {
// let backups = list_backups_api(&cx, false).await;
let available_backups: Vec<BackupSummary> = list_backups_api(&cx, false)
let available_backups: Vec<BackupSummary> = list_backups_api(cx, false)
.await
.into_iter()
.filter(|b: &BackupSummary| b.to_owned().backup_status == Some(BackupStatus::Available))
Expand Down Expand Up @@ -607,7 +613,7 @@ Private functions

/// Basically called by list_tables function, which is called from `$ dy list`.
/// To make ListTables API result reusable, separated API logic into this standalone function.
async fn list_tables_api(cx: app::Context) -> Vec<String> {
async fn list_tables_api(cx: &mut app::Context) -> Vec<String> {
let config = cx.effective_sdk_config().await;
let ddb = DynamoDbSdkClient::new(&config);

Expand Down
42 changes: 26 additions & 16 deletions src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,17 +123,17 @@ Public functions
/// This function calls Scan API and return mutiple items. By default it uses 'table' output format.
/// Scan API retrieves all items in a given table, something like `SELECT * FROM mytable` in SQL world.
pub async fn scan(
cx: app::Context,
cx: &mut app::Context,
index: Option<String>,
consistent_read: bool,
attributes: &Option<String>,
keys_only: bool,
limit: i32,
) {
let ts: app::TableSchema = app::table_schema(&cx).await;
let ts: app::TableSchema = app::table_schema(cx).await;

let items = scan_api(
cx.clone(),
cx,
index,
consistent_read,
attributes,
Expand Down Expand Up @@ -162,7 +162,7 @@ pub async fn scan(
}

pub async fn scan_api(
cx: app::Context,
cx: &mut app::Context,
index: Option<String>,
consistent_read: bool,
attributes: &Option<String>,
Expand All @@ -171,7 +171,7 @@ pub async fn scan_api(
esk: Option<HashMap<String, AttributeValue>>,
) -> ScanOutput {
debug!("context: {:#?}", &cx);
let ts: app::TableSchema = app::table_schema(&cx).await;
let ts: app::TableSchema = app::table_schema(cx).await;

let scan_params: GeneratedScanParams = generate_scan_expressions(&ts, attributes, keys_only);

Expand Down Expand Up @@ -211,9 +211,9 @@ pub struct QueryParams {
/// References:
/// - https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Query.html#Query.KeyConditionExpressions
/// - https://aws.amazon.com/blogs/database/using-sort-keys-to-organize-data-in-amazon-dynamodb/
pub async fn query(cx: app::Context, params: QueryParams) {
pub async fn query(cx: &mut app::Context, params: QueryParams) {
debug!("context: {:#?}", &cx);
let ts: app::TableSchema = app::table_schema(&cx).await;
let ts: app::TableSchema = app::table_schema(cx).await;

debug!("For table '{}' (index '{:?}'), generating KeyConditionExpression using sort_key_expression: '{:?}'", &ts.name, &params.index, &params.sort_key_expression);
let query_params: GeneratedQueryParams = match generate_query_expressions(
Expand Down Expand Up @@ -281,10 +281,15 @@ pub async fn query(cx: app::Context, params: QueryParams) {
}

/// This function calls GetItem API - get an item with given primary key(s). By default it uses 'json' output format.
pub async fn get_item(cx: app::Context, pval: String, sval: Option<String>, consistent_read: bool) {
pub async fn get_item(
cx: &mut app::Context,
pval: String,
sval: Option<String>,
consistent_read: bool,
) {
debug!("context: {:#?}", &cx);
// Use table if explicitly specified by `--table/-t` option. Otherwise, load table name from config file.
let ts: app::TableSchema = app::table_schema(&cx).await;
let ts: app::TableSchema = app::table_schema(cx).await;
let primary_keys = identify_target(&ts, pval, sval);

debug!(
Expand Down Expand Up @@ -334,9 +339,14 @@ pub async fn get_item(cx: app::Context, pval: String, sval: Option<String>, cons

// put_item function saves an item with given primary key(s). You can pass other attributes with --item/-i option in JSON format.
// As per DynamoDB PutItem API behavior, if the item already exists it'd be replaced.
pub async fn put_item(cx: app::Context, pval: String, sval: Option<String>, item: Option<String>) {
pub async fn put_item(
cx: &mut app::Context,
pval: String,
sval: Option<String>,
item: Option<String>,
) {
debug!("context: {:#?}", &cx);
let ts: app::TableSchema = app::table_schema(&cx).await;
let ts: app::TableSchema = app::table_schema(cx).await;
let mut full_item_image = identify_target(&ts, pval, sval); // Firstly, ideitify primary key(s) to ideitnfy an item to put.

debug!(
Expand Down Expand Up @@ -386,9 +396,9 @@ pub async fn put_item(cx: app::Context, pval: String, sval: Option<String>, item
}

// delete_item functions calls DeleteItem API - delete an item with given primary key(s).
pub async fn delete_item(cx: app::Context, pval: String, sval: Option<String>) {
pub async fn delete_item(cx: &mut app::Context, pval: String, sval: Option<String>) {
debug!("context: {:#?}", &cx);
let ts: app::TableSchema = app::table_schema(&cx).await;
let ts: app::TableSchema = app::table_schema(cx).await;
let primary_keys = identify_target(&ts, pval, sval);

debug!(
Expand Down Expand Up @@ -423,7 +433,7 @@ pub async fn delete_item(cx: app::Context, pval: String, sval: Option<String>) {

// UpdateItem API https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_UpdateItem.html
pub async fn update_item(
cx: app::Context,
cx: &mut app::Context,
pval: String,
sval: Option<String>,
set_expression: Option<String>,
Expand All @@ -436,7 +446,7 @@ pub async fn update_item(
std::process::exit(1);
};

let ts: app::TableSchema = app::table_schema(&cx).await;
let ts: app::TableSchema = app::table_schema(cx).await;
let primary_keys = identify_target(&ts, pval.clone(), sval.clone());

debug!(
Expand Down Expand Up @@ -484,7 +494,7 @@ pub async fn update_item(

// https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/WorkingWithItems.html#WorkingWithItems.AtomicCounters
pub async fn atomic_counter(
cx: app::Context,
cx: &mut app::Context,
pval: String,
sval: Option<String>,
set_expression: Option<String>,
Expand Down
Loading

0 comments on commit f7517cd

Please sign in to comment.