Skip to content

Commit

Permalink
chore: added description as in-line comments
Browse files Browse the repository at this point in the history
  • Loading branch information
sifnoc committed Dec 6, 2023
1 parent 4b4e10e commit f227c1f
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 54 deletions.
10 changes: 10 additions & 0 deletions src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,16 @@ use tokio::time::{sleep, Duration};
use crate::json_mst::{JsonEntry, JsonMerkleSumTree};
use summa_backend::merkle_sum_tree::MerkleSumTree;

/// Executor role and functionality.
/// Acts as an intermediary between the Orchestrator and Workers, facilitating the data processing workflow.
/// Each Executor operates in a one-to-one relationship with a Worker, processing entry data into `mini-tree`.
///
/// Key aspects of the Executor's role include:
/// - **Spawning and Connection**: Executors connect with Workers to execute tasks, enhancing the system's scalability.
/// - **Data Handling and Task Distribution**: Executors manage and distribute data entries, ensuring smooth workflow
/// - **Communication Bridge**: They facilitate communication within the data pipeline, relaying 'mini-tree' from Workers to the Orchestrator.
///
/// Executors are dynamically spawned and connected to Workers for task execution.
#[derive(Clone)]
pub struct Executor {
client: Client,
Expand Down
105 changes: 58 additions & 47 deletions src/executor/spawner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,58 +2,69 @@ use std::{future::Future, pin::Pin};

use crate::executor::Executor;

/// ExecutorSpawner responsibility and types.
///
/// Responsible for initializing and terminating Executors, serving as a management point for creating Executor instances and Workers.
///
/// Types include:
/// - MockSpawner: For testing, runs `mini-tree-server` locally.
/// - LocalSpawner: Initializes Executors and Workers in local Docker environments.
/// - CloudSpawner: Optimized for cloud resources and Docker Swarm, manages containers as services for scalability.
///
/// Note: ExecutorSpawner is a trait with key methods `spawn_executor` and `terminate_executor`.
///
pub trait ExecutorSpawner {
// Spawns an executor asynchronously.
//
// This method initializes an Executor and returns a Future that resolves to the Executor.
//
// To achieve this asynchronously (outside of an async trait function), we use a one-time channel ('oneshot`) to deliver the variables to the Future.
//
// Internally, it performs the following codelines:
//
// 1. Uses a 'oneshot'channel for sending the variables from the spawned async task.
// ```
// let (tx, rx) = oneshot::channel();
// ```
// 2. Clones necessary variables (url, name and so on) to move them into the async block.
// ```
// let url = self.url.clone();
// ```
// 3. Spawns an asynchronous task (`tokio::spawn`) that asynchronously creates a worker and sends back its information.
// ```
// tokio::spawn(async move {
// if let Ok(worker_info) =
// Spawner::create_worker(url).await
// {
// let _ = tx.send(worker_info);
// }
// });
// Note that, the "create_worker" is typically declared in the "Spawner" struct that has "ExecutorSpawner"trait.
// 4. Returns a Future that, upon completion, provides an Executor connected to the newly spawned worker.
// ```
// Box::pin(async move {
// let url = rx.await.expect("Failed to receive worker URL");
// Executor::new(url, None);
// });
// ```
/// Spawns an executor asynchronously.
//
/// This method initializes an Executor and returns a Future that resolves to the Executor.
///
/// To achieve this asynchronously (outside of an async trait function), we use a one-time channel ('oneshot`) to deliver the variables to the Future.
///
/// Internally, it performs the following codelines:
///
/// 1. Uses a 'oneshot'channel for sending the variables from the spawned async task.
/// ```ignore
/// let (tx, rx) = oneshot::channel();
/// ```
/// 2. Clones necessary variables (url, name and so on) to move them into the async block.
/// ```ignore
/// let url = self.url.clone();
/// ```
/// 3. Spawns an asynchronous task (`tokio::spawn`) that asynchronously creates a worker and sends back its information.
/// ```ignore
/// tokio::spawn(async move {
/// if let Ok(worker_info) =
/// Spawner::create_worker(url).await
/// {
/// let _ = tx.send(worker_info);
/// }
/// });
/// Note that, the "create_worker" is typically declared in the "Spawner" struct that has "ExecutorSpawner"trait.
/// 4. Returns a Future that, upon completion, provides an Executor connected to the newly spawned worker.
/// ```ignore
/// Box::pin(async move {
/// let url = rx.await.expect("Failed to receive worker URL");
/// Executor::new(url, None);
/// });
/// ```
///
///
// Returns:
// - `Pin<Box<dyn Future<Output = Executor> + Send>>`: A Future that, when awaited, yields an Executor instance and spawns a worker.
//
// - "Pin<Box<dyn Future<Output = ()> + Send>>": A Future that, when awaited, yields an Executor instance and spawns a worker.
fn spawn_executor(&self) -> Pin<Box<dyn Future<Output = Executor> + Send>>;

// Terminates all spawned executors (and/or workers) asynchronously.
//
// This method is responsible for gracefully shutting down all active executors (and/or workers) by calling
// To do this, the "Spawner"may needs some fields for storing some accessing point to the workers, which are spawned with the executors.
// For deliver variables to Future results, use a channel like the pattern at 'spawn_executor'.
//
// The termination process typically involves:
// - Iterating through all active Executors and workers.
// - Invoking kind of 'shutdown'on each executors and workers to initiate their shutdown.
// - Awaiting the completion of all shutdown operations.
//
/// Terminates all spawned executors (and/or workers) asynchronously.
///
/// This method is responsible for gracefully shutting down all active executors (and/or workers) by calling
/// To do this, the "Spawner"may needs some fields for storing some accessing point to the workers, which are spawned with the executors.
/// For deliver variables to Future results, use a channel like the pattern at 'spawn_executor'.
///
/// The termination process typically involves:
/// - Iterating through all active Executors and workers.
/// - Invoking kind of 'shutdown'on each executors and workers to initiate their shutdown.
/// - Awaiting the completion of all shutdown operations.
///
// Returns:
// - `Pin<Box<dyn Future<Output = ()> + Send>>`: A Future that, when awaited, indicates that all executors (and/or workers) have been terminated.
// - "Pin<Box<dyn Future<Output = ()> + Send>>": A Future that, when awaited, indicates that all executors (and/or workers) have been terminated.
fn terminate_executors(&self) -> Pin<Box<dyn Future<Output = ()> + Send>>;
}
27 changes: 20 additions & 7 deletions src/orchestrator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@ use crate::aggregation_merkle_sum_tree::AggregationMerkleSumTree;
use crate::executor::ExecutorSpawner;
use crate::json_mst::JsonEntry;

/// The Orchestrator in Summa Aggregation
///
/// It serves as the central management component, coordinating data processing activities
/// between Executors and Workers, thereby improving the efficiency of building the Merkle sum tree.
///
/// Functions include dynamically spawning Executors, managing task distribution,
/// handling errors and pipeline control, and building the `AggregationMerkleSumTree`
/// by aggregating mini-trees constructed by the Workers.
pub struct Orchestrator<const N_CURRENCIES: usize, const N_BYTES: usize> {
executor_spawner: Box<dyn ExecutorSpawner>,
entry_csvs: Vec<String>,
Expand All @@ -23,12 +31,13 @@ impl<const N_CURRENCIES: usize, const N_BYTES: usize> Orchestrator<N_CURRENCIES,
}
}

// Calculate the range of tasks to be assigned to a executor.
//
// * `executor_index` - The index of the executor.
// * `total_executors` - The total number of executor.
//
// A tuple representing the start and end indices of the tasks assigned to the executor
/// Calculate the range of tasks to be assigned to a executor.
///
/// Parameters:
/// * `executor_index` - The index of the executor.
/// * `total_executors` - The total number of executors.
///
/// A tuple representing the start and end indices of the tasks assigned to the executor
fn calculate_task_range(
&self,
executor_index: usize,
Expand All @@ -46,8 +55,12 @@ impl<const N_CURRENCIES: usize, const N_BYTES: usize> Orchestrator<N_CURRENCIES,
}

/// Processes a list of CSV files concurrently using executors and aggregates the results.
/// This involves splitting the CSV files based on available executors, distributing tasks,
/// and aggregating the results into an `AggregationMerkleSumTree`.
///
/// * `executor_count` - The number of executors to use.\
///
/// * `executor_count` - The number of executors to use.
/// Note: After processing, executors are terminated to release resources.
///
/// Data flow
///
Expand Down

0 comments on commit f227c1f

Please sign in to comment.