diff --git a/README.md b/README.md index 7784925..b7be6b6 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ ## ᝰ.ᐟ What am i? -i'm hoopoe, the social event platform allows your hoop get heard! +i'm hoopoe, a realtime social event platform that lets your hoop get heard! ## Execution flow & system design? @@ -19,7 +19,7 @@ i'm hoopoe, the social event platform allows your hoop get heard! - **step3)** instance of `NotifData` is cached on redis and stored in db. -- **step4)** client invokes `/notif/get/owner/` api to get its notification during the app execution in a short polling manner. +- **step4)** client invokes `/notif/get/owner/` api to get its notifications during the app execution in a short polling manner or through ws streaming. ``` ------------------ server1/node1 actor ----------------- ___________ diff --git a/src/apis/v1/ws/notif.rs b/src/apis/v1/ws/notif.rs index 525ed73..57b1cb4 100644 --- a/src/apis/v1/ws/notif.rs +++ b/src/apis/v1/ws/notif.rs @@ -21,7 +21,9 @@ use crate::*; the notif broker however stores data on redis and db and allows the client to fetch notifs for an owner in a short polling manner, this way is used to fetch all notifs for an owner in realtime as - they're receiving by the RMQ consumer. + they're being received by the RMQ consumer, there is a jobq mpsc channel + used to forward every notif received from RMQ into this ws handler, + where we read it off the rx side of that mpsc channel. addr: localhost:2344/v1/stream/notif/consume/?owner=100&room=notif_room owner is the notification owner which must be equal to the `receiver_info` field inside the notif_data instance received by the consumer. diff --git a/src/interfaces/crypter.rs b/src/interfaces/crypter.rs index 6718209..4d4980f 100644 --- a/src/interfaces/crypter.rs +++ b/src/interfaces/crypter.rs @@ -34,13 +34,13 @@ use crate::*; the async keywords. 
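
    a rough usage sketch of this trait (the construction of the SecureCellConfig
    with its key material is omitted here and assumed to be done elsewhere):

        let mut data = String::from("raw notif data"); // illustrative value
        data.encrypt(&mut secure_cell_config);         // self becomes the hex string of the ciphertext
        data.decrypt(&mut secure_cell_config);         // self becomes the raw string again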
*/ pub trait Crypter{ - fn encrypt(&self, secure_cell_config: &mut SecureCellConfig); - fn decrypt(&self, secure_cell_config: &mut SecureCellConfig); + fn encrypt(&mut self, secure_cell_config: &mut SecureCellConfig); + fn decrypt(&mut self, secure_cell_config: &mut SecureCellConfig); } // used for en(de)crypting image in form of Vec slice or &[u8] impl Crypter for &[u8]{ - fn encrypt(&self, secure_cell_config: &mut wallexerr::misc::SecureCellConfig){ + fn encrypt(&mut self, secure_cell_config: &mut wallexerr::misc::SecureCellConfig){ match wallexerr::misc::Wallet::secure_cell_encrypt(secure_cell_config){ // passing the redis secure_cell_config instance Ok(data) => { secure_cell_config.data = data @@ -64,7 +64,7 @@ impl Crypter for &[u8]{ }; } - fn decrypt(&self, secure_cell_config: &mut wallexerr::misc::SecureCellConfig){ + fn decrypt(&mut self, secure_cell_config: &mut wallexerr::misc::SecureCellConfig){ match wallexerr::misc::Wallet::secure_cell_decrypt(secure_cell_config){ Ok(encrypted) => { @@ -101,7 +101,7 @@ impl Crypter for &[u8]{ // used for en(de)crypting image in form of Vec impl Crypter for Vec{ - fn encrypt(&self, secure_cell_config: &mut wallexerr::misc::SecureCellConfig){ + fn encrypt(&mut self, secure_cell_config: &mut wallexerr::misc::SecureCellConfig){ match wallexerr::misc::Wallet::secure_cell_encrypt(secure_cell_config){ // passing the redis secure_cell_config instance Ok(data) => { secure_cell_config.data = data @@ -125,7 +125,7 @@ impl Crypter for Vec{ }; } - fn decrypt(&self, secure_cell_config: &mut wallexerr::misc::SecureCellConfig){ + fn decrypt(&mut self, secure_cell_config: &mut wallexerr::misc::SecureCellConfig){ match wallexerr::misc::Wallet::secure_cell_decrypt(secure_cell_config){ Ok(encrypted) => { @@ -161,9 +161,18 @@ impl Crypter for Vec{ // used for en(de)crypting data in form of string impl Crypter for String{ - fn decrypt(&self, secure_cell_config: &mut SecureCellConfig){ + fn decrypt(&mut self, secure_cell_config: &mut SecureCellConfig){ + + // encrypt convert the raw string into hex encrypted thus + // calling decrypt method on the hex string returns the + // raw string + secure_cell_config.data = hex::decode(&self).unwrap(); match Wallet::secure_cell_decrypt(secure_cell_config){ // passing the redis secure_cell_config instance Ok(data) => { + + // update the self by converting the data into string format from its utf8 + *self = std::str::from_utf8(&data).unwrap().to_string(); + secure_cell_config.data = data }, Err(e) => { @@ -185,11 +194,19 @@ impl Crypter for String{ }; } - fn encrypt(&self, secure_cell_config: &mut SecureCellConfig){ - match Wallet::secure_cell_encrypt(secure_cell_config){ + fn encrypt(&mut self, secure_cell_config: &mut SecureCellConfig){ + + // use the self as the input data to be encrypted + secure_cell_config.data = self.clone().as_bytes().to_vec(); + + match Wallet::secure_cell_encrypt(secure_cell_config){ Ok(encrypted) => { let stringified_data = hex::encode(&encrypted); + + // update the self or the string with the hex encrypted data + *self = stringified_data; + // update the data field with the encrypted content bytes secure_cell_config.data = encrypted; diff --git a/src/server/mod.rs b/src/server/mod.rs index 7fa6dd6..5e78f32 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -177,8 +177,9 @@ impl HoopoeServer{ let connection = sea_orm::Database::connect( db_url ).await.unwrap(); - let fresh = args.fresh; + // migration process at runtime + let fresh = args.fresh; // if fresh{ // log::info!("fresh db..."); // 
Migrator::fresh(connection).await.unwrap(); diff --git a/src/workers.rs b/src/workers.rs index d4f2b15..1ed004e 100644 --- a/src/workers.rs +++ b/src/workers.rs @@ -16,4 +16,5 @@ pub mod cqrs; // cqrs actor components pub mod notif; // broker actor component pub mod zerlog; // zerlog actor component -pub mod scheduler; // hoop scheduler actor component \ No newline at end of file +pub mod scheduler; // hoop scheduler actor component +pub mod actor; \ No newline at end of file diff --git a/src/workers/Kafka.md b/src/workers/Kafka.md new file mode 100644 index 0000000..406df92 --- /dev/null +++ b/src/workers/Kafka.md @@ -0,0 +1,112 @@ + +### **1. Kafka Topics** +A **topic** in Kafka is a logical channel to which producers send messages and from which consumers read messages. Think of a topic as a named category or feed, and each message published to Kafka is categorized under a topic. It's like an exchange it RMQ. Topic is where all the messages get collected in there. + +- **Example**: If you are working with a log aggregation system, you might have topics like `application_logs`, `error_logs`, and `event_logs`. + +- **Durability**: Kafka topics store data for a specified retention period, even after messages are consumed. This makes it possible to replay or reprocess the data. Unlike the RMQ which removes the messages from the queue once the cosumer receives them this enforces us to use a new queue per each consumer. + +### **2. Partitions** +A **topic** is divided into multiple **partitions** to enable parallelism and scalability. + +- **Partitions** allow Kafka to scale horizontally, meaning that more partitions allow more consumers to consume in parallel, leading to higher throughput. + +- **Message Order**: Within a single partition, Kafka guarantees the order of messages (i.e., messages are read in the order they are written). However, across multiple partitions, Kafka doesn't guarantee message ordering. + +- **Partition Key**: When producing messages, you can specify a **partition key** to control which partition the message is routed to. If no key is specified, Kafka will use a round-robin strategy to distribute messages across partitions. It's like routing key in RMQ. + +- **Example**: A topic `sensor_readings` could have 10 partitions. If you send messages with the same sensor ID as a partition key, all readings from that sensor would go to the same partition, maintaining their order. + +### **3. Messages** +A **message** in Kafka is the basic unit of data. It consists of: + - **Key** (optional): Helps to determine which partition the message is routed to. + - **Value**: The actual data (the payload) that is being sent (e.g., a JSON object, string, etc.). + - **Timestamp**: When the message was created. + + Kafka **messages** are stored in **topics** and are read by consumers. + +- **Message Structure**: Messages can be serialized in different formats (e.g., JSON, Avro, Protobuf), depending on how the data is intended to be consumed. + +### **4. Batch Sending** +To optimize performance, Kafka producers can send messages in **batches** rather than individually. + +- **Batching** reduces the number of network requests, as multiple messages are sent in a single request. This leads to higher throughput and better utilization of Kafka brokers. + +- Producers buffer messages in memory and send them as a batch when either: + - A certain batch size limit is reached. + - A certain time limit is exceeded. 
+ +- **Trade-off**: Sending in batches reduces network overhead but can increase latency, as messages are delayed in memory until the batch is full or the time limit is reached. + +### **5. Offset** +An **offset** is a unique identifier that Kafka assigns to each message within a partition. It indicates the position of a message within that partition. Allows consumers resume consuming where they've left. + +- **Message Offset**: Kafka keeps track of each message using its offset within the partition. Each partition has its own sequence of offsets starting from `0` and increasing as more messages are produced. + +- **Consumer Offset**: Consumers use offsets to track which messages have been read. When a consumer reads a message from a partition, it can store the offset to know where to resume if it needs to continue later (e.g., after a crash or restart). + +- **Offset Example**: In a partition, the first message might have offset `0`, the next one `1`, and so on. + +### **6. Single Consumers vs. Consumer Groups** +Kafka's flexibility comes from how it handles consumers, and it supports two main models: **single consumers** and **consumer groups**. + +#### **Single Consumer** +A **single consumer** consumes data from one or more partitions of a topic. Each partition is assigned to one consumer, and only one consumer processes messages from each partition. + +- **Example**: If a topic has 3 partitions and you have 1 consumer, that consumer will read from all 3 partitions sequentially. + +#### **Consumer Groups** +A **consumer group** is a group of consumers that work together to consume messages from a topic. Kafka ensures that each partition is consumed by **only one consumer within the group**. + +- **Parallel Processing**: Kafka distributes partitions among consumers in the group, allowing messages to be processed in parallel. + +- **Example**: If you have a topic with 4 partitions and 2 consumers in the same group, Kafka will assign 2 partitions to each consumer. If a consumer crashes, Kafka will rebalance and reassign the partitions to the remaining consumers. + +- **Multiple Consumer Groups**: Multiple consumer groups can consume the same topic independently. Each group maintains its own offsets, so they don't interfere with one another. + - **Example**: You could have two different consumer groups, one for real-time processing (group A) and another for batch processing (group B), both consuming from the same topic but handling the data in different ways. + +- Consumer groups are like multiple queues bounded to multiple exchanges each of which receives related messages from the exchanges. like consumer1 bind its queue to exchange1 and exchange2. + +### **7. Committing Messages** +Committing a message in Kafka means recording that the message has been successfully processed by a consumer like ack in RMQ, allowing Kafka to manage offsets properly. + +- **Auto-Commit**: By default, Kafka can automatically commit offsets periodically, meaning that the consumer keeps track of the last message it read. However, this can lead to issues if the consumer crashes after reading a message but before processing it. + +- **Manual Commit**: With **manual commit**, consumers can explicitly commit offsets after processing each message (or batch of messages). This gives consumers control over when to mark a message as processed, ensuring better fault tolerance. + + - **Example**: Suppose a consumer reads a message, processes it, and then manually commits the offset for that message. 
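
Below is a minimal Rust sketch of that read → process → manual-commit loop using the `rdkafka` crate; the crate choice, broker address, topic name, and group id here are illustrative assumptions rather than part of this repo:

```rust
use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer};
use rdkafka::{ClientConfig, Message};

#[tokio::main]
async fn main() {
    // build a consumer with auto commit disabled so offsets are only
    // committed after a message has actually been processed
    let consumer: StreamConsumer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("group.id", "group-A")
        .set("enable.auto.commit", "false")
        .create()
        .expect("consumer creation failed");

    consumer
        .subscribe(&["user_activity_logs"])
        .expect("can't subscribe to topic");

    loop {
        match consumer.recv().await {
            Ok(msg) => {
                // process the payload first ...
                if let Some(payload) = msg.payload() {
                    println!("got {} bytes at offset {}", payload.len(), msg.offset());
                }
                // ... then manually commit the offset of this message
                consumer
                    .commit_message(&msg, CommitMode::Async)
                    .expect("commit failed");
            }
            Err(e) => eprintln!("kafka error: {}", e),
        }
    }
}
```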
If the consumer crashes before committing the offset, it will re-read the message upon recovery. + +- **Offset Committing in Consumer Groups**: Each consumer in a group commits offsets independently. If a consumer fails, the new consumer taking over the partition will resume from the last committed offset. + +--- + +### **How These Components Work Together in Kafka**: + +1. **Topic Creation**: + - Let's say you have a Kafka topic `user_activity_logs` with 6 partitions. + +2. **Producer Sends Messages**: + - A producer sends log messages (user interactions) to Kafka. If the producer specifies a **key** (e.g., `userID`), Kafka uses the key to route messages to specific partitions. Messages are sent in **batches** to reduce network overhead. + +3. **Consumers in a Group**: + - You have 3 consumers (in **consumer group A**) reading from `user_activity_logs`. Kafka assigns each consumer 2 partitions to read from (since the topic has 6 partitions and there are 3 consumers). + +4. **Message Offset Tracking**: + - Each message in the partition is assigned an **offset** (e.g., 0, 1, 2...). Consumers track these offsets so they know where to resume in case of failure. + - The consumers can **commit offsets** manually or let Kafka handle it automatically, ensuring that they process each message only once. + +5. **Rebalancing and Scaling**: + - If a new consumer is added to the group, Kafka automatically **rebalances** the partitions among all consumers, ensuring that no two consumers in the group read from the same partition. Conversely, if a consumer crashes, Kafka redistributes its partitions to the remaining consumers. + +--- + +### **Summary**: +- **Kafka topics** are high-level channels for organizing data. +- **Partitions** allow parallelism by splitting a topic into substreams. +- **Messages** are the individual data units that are published and consumed. +- **Batch sending** improves producer performance by sending multiple messages in a single network call. +- **Offsets** track message positions within partitions, both for Kafka's storage and consumers' progress. +- **Single consumers** read data from specific partitions, while **consumer groups** allow partitioned, parallel processing. +- **Committing messages** is how Kafka ensures data processing reliability by keeping track of the last successfully processed message per consumer. + +This design enables Kafka to scale, handle large volumes of data, and maintain reliability in data processing systems. \ No newline at end of file diff --git a/src/workers/README.md b/src/workers/Rmq.md similarity index 100% rename from src/workers/README.md rename to src/workers/Rmq.md diff --git a/src/workers/actor.rs b/src/workers/actor.rs new file mode 100644 index 0000000..2e8990a --- /dev/null +++ b/src/workers/actor.rs @@ -0,0 +1,1238 @@ + + + +// ------------------------------------------------------------ +/* actor worker threadpool implementations from scratch + + https://ryhl.io/blog/actors-with-tokio/ + https://medium.com/@maturationofthe/leveraging-rusts-tokio-library-for-asynchronous-actor-model-cf6d477afb19 + https://www.reddit.com/r/rust/comments/xec77k/rayon_or_tokio_for_heavy_filesystem_io_workloads/ + + + What is an Actor (modern threadpool)? 
+ Actor is a threadpool or a single threaded structure which has its own mailbox and cron scheduler + to receive and execute tasks inside its thread of execution it can use tokio or os threads to execute + async io or cpu tasks, they talk through message sending patterns like mpsc in local and grpc remotely + the message or task execution can be happened by receiving the task from the actor eventloop which is + the receiver of the mailbox jobq mpsc channel in local or the grpc remotely then execute in a free + thread if it has threadpool or its own single thread of execution, the thread can be either a light + or os thread for executing async io or intensive cpu tasks. + + runtime takes the async task and put it inside the thread queue + then at an appropriate time the scheduler pop the task out of the + thread queue to execute it and if a thread is free it tries to steal + the task from other threads, once the task gets executed the runtime + waker of future object poll the task result out and other codes can + fill the placeholder to use the actual value of the solved task. + async tasks execution flow in user space level (not os threads): + >_ code : developer put a future on the stack + >_ code : hey runtime scheduler, developer did an await on the task in a lightweight thread of execution of an actor which has its own stack! + >_ code : if you don't mind me asking, can i have the result right now? + >_ runtime scheduler: allow me to poll the result out of the future stack,... oh no sorry the future is not ready yet! i'll go for other tasks and notify you once it gets ready + >_ code : alright sir, i'll keep executing other tasks, perhaps sending the result through an mpsc channel to.... + >_ runtime scheduler: hey i've just received the result of the future you wanted earlier from the waker, here is the result. + >_ code : as i was saying, ....to outside of this current thread! great other threads and scopes can now use the result as they're receiving the result from the channel that i've sent earlier. + + the threadpool of tokio is used for light io and rayon is used for cpu heavy io and is global and shared + across the program to speedup the execution with jobq mpsc channel like the one in actor object to execute + received message or future task/job in its own thread. however in those days that async wasn't there we had + been using the queue concepts to queue some tasks then execute them one by one but with the arrival of the + async tasks/jobs/processes we can just wait for multiple tasks while we're executing others simultaneously + and concurrently, here is the concurrency model for actor worker object (actix): + > example of executing async task: + - handle each socket in the background asyncly in a tokio spawn like writing bytes to a networking interface like tcp based protocols (ws, http, rmq, redis and rpc) + - mutating an object in a separate thread by locking on the mutex to acquire the lock to avoid blocking the current thread + - waiting for some io to get received while other tasks are being executed simultaneously + ::> in this case the runtime scheduler check for the task to poll out of the stack if + it has completed otherwise the waker fo the future object notify the scheduler as + soon as the task gets completed. 
+ > creating new actor object creates new lightweight thread of execution using tokio spawn to execute asyc tasks inside that + > thread management: each actor has its own lighweight thread of execution (an actor object is a lightweight worker thread) + > async task process execution in the background: use tokio::spawn over tokio runtime scheduler + > send async task resp across threads: use jobq based channels mailboxes, mpsc or rpc + > async task example: atomic syncing for mutating data in threads using arc mutex + > control over the execution flow: use tokio select to only join that thread which is completed sooner than others + > distributed clustering talking: use rpc for sending message and calling each other methdos + conclusion: + A threadpool has its own internal eventloop queue for popping out tasks. + actor is a simple structure that can be used to execute async tasks and jobs in the whole + actor system threadpool they can also communicate and send message to each other by using + their mailbox, mailbox gives each actor a unique address to send message togehter using + channels like mpsc and oneshot in local and rpc and rmq in a remote manner. + if there are two many variables to be stored on CPU registers they'll be stored on the stack + related to the current thread cause each thread gets a seprate stack. + tokio runtime will execute all async tasks in its lightweight threads when we put the + #[tokio::main] above the main function we can also spawn async task inside a separate + lightweight thread manually by using tokio::spawn which contains lightweight thread workers + can be used to build actor to execute tasks inside of it hence talk with mpsc channels, we + can build multithreaded web servers upon tokio runtime in which each socket will be handled + inside tokio spawn threads as well as executing each api + + ==================================== + HOW 2 USE THESE IMPLEMENTATIONS: + ==================================== + for worker in 0..10{ //// spawning tokio green threads for 10 workers + tokio::spawn(async move{ //// spawning tokio worker green threadpool to solve async task + + //// any heavy logic here that must be shared using tokio channels inside a threadpool + //// ... 
+ + }); + } + + let mut tokio_async_worker = AsyncWorker::new(); + let mut native_sync_worker = NativeSyncWorker::spawn(n_workers); + let mut rayon_sync_worker = RayonSyncWorker::new(); + let (sender, receiver) = std_mpsc::channel::(); + let cloned_sender = sender.clone(); + + native_sync_worker.execute(move ||{ + let async_heavy_method = || async move{ + // mining(); + let big_end_bytes = number.to_be_bytes(); + let index = 0; + let new_chunk = cloned_ops(big_end_bytes[index]); + cloned_sender.send(new_chunk).unwrap(); + } + block_on(async_heavy_method()); + }); + + rayon_sync_worker.spawn(move ||{ + block_on(async_heavy_method()); + }); + + tokio_async_worker.spawn(async move{ + async_heavy_method().await; + Ok(()) + }) + tokio_async_worker.execute().await // wait for all the workers of this worker to complete if there were any + + + let bytes: Vec = receiver.iter().take(n_workers).collect() // collecting data from all workers + +*/ + +use std::any::Any; +use std::ops::{Deref, DerefMut}; +use std::{default, fmt, thread}; +use std::sync::mpsc as std_mpsc; +use futures::future::select; +use sha2::digest::generic_array::functional; +use tokio::sync::mpsc; +use futures_util::Future; +use is_type::Is; +use uuid::Uuid; +use std::sync::{Arc, Mutex}; +use log::info; +use crate::*; + + +struct ActerMessage{ + pub to: String, + pub from: String, + pub body: String, +} + +#[derive(Clone)] +struct Acter{ + // std::sync::Mutex is not Send so we can't move it into tokio spawn + // we must use tokio Mutex + pub mailbox: Arc>>, + pub communicator: tokio::sync::mpsc::Sender, +} + +impl Acter{ + + pub async fn send(&mut self) -> std::pin::Pin>>{ + todo!() + } + + // execute tasks and messages in its threadpool (mainly tokio spawn) + pub async fn execute(&mut self){ + + // use custom worker threadpool + let this = self.clone(); + let mut pool = workerthreadpool::sync::RayonSyncWorker::new(); + pool.spawn(Box::new(||{ + // don't deref the arced_mutex_mailbox since Clone is not implemented for that + // and can't be move out of the type since derefing return the owned type it's + // kinda like clone the type + tokio::spawn(async move{ + // PROBLEM: can't move out self because it's behind a mutable pointer + // some how we should move it to tokio scope without losing ownership + // passing its ref to tokio scope is not ok since the reference won't + // be valid and must be static cause self is only valid inside the method + // body and by moving the &self.mailbox, the pointer will be in tokio scope + // and the `self` itself will be dropped out once the method gets executed + // so it escapes the method body, + // SOLUTION: use clone, Box, Arc, (Rc is for single thread) + // we've used Arc and Mutex to make it sendable, shareable and safe to share + let arced_mutex_mailbox = this.mailbox.clone(); + let mut mailbox = arced_mutex_mailbox.lock().await; + while let Some(task) = (*mailbox).recv().await{ + // ... + } + + }); + })); + + pool.execute().await; // it will receive the spawned task inside a free thread then it call it + + } + + pub async fn start(&mut self) -> Self { + // create mailbox and communicator + // ... 
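        // a hedged sketch of what this could look like once implemented
        // (the buffer size 100 is an arbitrary assumption):
        //     let (tx, rx) = tokio::sync::mpsc::channel::<ActerMessage>(100);
        //     self.communicator = tx;
        //     self.mailbox = Arc::new(tokio::sync::Mutex::new(rx));
        //     self.clone()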
+ todo!() + } + +} + + +pub mod workerthreadpool{ + + /* + --------------- GUIDE TO CREATE A MULTITHREADED WEB SERVER --------------- + every worker is a thread with an id along with the thread itself, a threadpool is a vector containing the number + of spawned workers then we'll send the async job to the sender channel by calling the execute method and while we're + streaming over the arced mutexed receiver inside each thread we receives the task in on of those free threads and + finally call the async task, the spawning threads in the background part are done inside the spawn() method also every + worker threadpool needs a channel and multiple spawned threads in the background so we could send the task to the + channel and receives it in one of the free spawned thread so the steps would be: + 1 - create channel and spawn threads in the background waiting to receive from the channel by calling new() method + 2 - pass the task to spawn() or execute() method then send it to channel + 3 - make sure receiver is of type Arc> + + how threadpool works? + once the execute method is called the task is sent to the jobq channel + spawned threads on the other hand are thrilling to receive the task + coming from the channel that's why we should put the receiver inside + mutex and make it arc to move it between threads, once a free thread + acquire the lock on the receiver then the task can be called inside + that thread + */ + pub use super::*; + + // ------------------------------------------------------------ + // ------------ aync worker threadpool using tokio ------------ + // ------------------------------------------------------------ + // async worker pool scheduler using tokio based on mpsc jobq channel protocol + // this scheduler is used for asynchronous IO by not blocking the thread using tokio green threads + pub mod _async{ + + use super::*; + + pub struct AsyncWorker{ + count: usize, // number of workers + sender: mpsc::UnboundedSender>, // sender async side with no byte limitation + receiver: mpsc::UnboundedReceiver>, // receiver async side with no byte limitation + } + + + impl AsyncWorker{ // E can be shared between threads + + pub fn new() -> Self{ + let (sender, + receiver) = mpsc::unbounded_channel(); // async mpsc jobq channel channel with no byte limitation to avoid deadlocks and race conditions + AsyncWorker{ + count: 0, // will be equaled to the number of workers by solving all the jobs which are comming to the downside of the mpsc jobq channel + sender, + receiver + } + } + + pub fn spawn(&mut self, task: T) + where + T: Future> + Send + 'static, // T can be shared between threads + T::Output: Is>, // T is a future and now we can access the Output type to make sure that is of type Result<(), E> - T::Output is the GAT of the Future trait + { + let sender = self.sender.clone(); + tokio::spawn(async move{ // spawn the task inside tokio green threads + let res = task.await; + match sender.send(res.into_val()){ + Ok(()) => (), + Err(_) => panic!("Impossible Panic for Sender"), + } + }); + self.count += 1; + } + + + pub async fn execute(mut self) -> Result<(), E>{ + + std::mem::drop(self.sender); // make sure that the sender is dead since we want to receive all the messages and avoid deadlocks and race condition + let mut index = 0; + + loop{ // we can use while let Some() syntax + match self.receiver.recv().await{ + Some(Ok(())) => { + assert!(index < self.count); + } + Some(Err(e)) => { + assert!(index < self.count); + return Err(e); + } + None => { + assert_eq!(index, 
self.count); + break Ok(()); // return this to the main + } + } + index+=1; + } + + } + + } + + type AsyncJob = Box F + Send + Sync + 'static>; + fn create_async_job(job: impl FnOnce(I) -> F + Send + Sync + 'static) -> AsyncJob // static dispatch for job + where F: std::future::Future, + O: Send + Sync + 'static + { + Box::new(job) + } + async fn run(){ + let async_job = create_async_job::(|status|async move{String::from("")}); + tokio::spawn(async move{ + async_job(0).await; + }); + } + + + } + + + // ---------------------------------------------------------------------------------- + // ------------ none async worker threadpool using rayon and std::thread ------------ + // ---------------------------------------------------------------------------------- + // a sync task scheduler (worker pool) with mpsc as the jobq channel protocol + // this scheduler is used for synchronous IO by blocking the thread using rust native std thread - alternative to this is rayon + pub mod sync{ + + + use super::*; + + type Job = Box; // a job is of type closure which must be Send and static across all threads inside a Box on the heap + + + //// there is no guaranteed order of execution for spawns, given that other threads + //// may steal tasks at any time, however, they are generally prioritized in a LIFO order + //// on the thread from which they were spawned, other threads always steal from the + //// other end of the deque, like FIFO order, the idea is that recent tasks are most + //// likely to be fresh in the local CPU's cache, while other threads can steal older stale tasks. + pub struct RayonSyncWorker{ + count: usize, // number of workers + sender: mpsc::UnboundedSender, // sender async side with no byte limitation + receiver: mpsc::UnboundedReceiver, // receiver async side with no byte limitation + } + + + impl RayonSyncWorker{ + + pub fn new() -> Self{ + let (sender, + receiver) = mpsc::unbounded_channel(); // async mpsc jobq channel channel with no byte limitation to avoid deadlocks and race conditions + RayonSyncWorker{ + count: 0, // will be equaled to the number of workers by solving all the jobs which are comming to the downside of the mpsc jobq channel + sender, + receiver + } + } + + pub fn spawn(&mut self, task: Job) + where + { + let sender = self.sender.clone(); + rayon::spawn(move || { // firing off a task into the rayon threadpool in the 'static or global scope + match sender.send(task){ + Ok(()) => (), + Err(_) => panic!("Impossible Panic for Sender"), + } + }); + self.count += 1; + } + + pub async fn execute(mut self) -> Result<(), Box>{ + + std::mem::drop(self.sender); // make sure that the sender is dead since we want to receive all the messages and avoid deadlocks and race condition + let mut index = 0; + + loop{ // we can use while let Some() syntax + match self.receiver.recv().await{ + Some(job) => { + job(); + assert!(index < self.count); + }, + None => { + assert_eq!(index, self.count); + break Ok(()); // return this to the main + } + } + index+=1; + } + + } + + } + + /* + NOTE: the process of heavy cpu io must not be blocking that's why rayon is not + going to be used for none blocking operations cause it moves the tasks into the cpu + core instead of using threads per cpu it uses one thread per each cpu core, tokio + however can be used for io blocking tasks cause it uses lightweight thread of + execution and it blocks a light thread. 
+ lightweight threads in tokio is user thread space and naitive threads in rayon is os + threads, the first ones have less overhead than the seconds ones. + spawning native threads are too slow since thread handling in rust is depends + on user base context switching means that based on the load of the IO in the + app rust might solve the data load inside another cpu core and use multiprocessing + approach, it's like rayon threadpool which are global threads and shared across the app + which causes the race with other threads and steal tasks + • https://www.reddit.com/r/rust/comments/az9ogy/help_am_i_doing_something_wrong_or_do_threads/ + • https://www.reddit.com/r/rust/comments/cz4bt8/is_there_a_simple_way_to_create_lightweight/ + */ + struct Worker{ + id: Uuid, + thread: Option>, //// thread is of type JoinHandld struct which return nothing or () + } + + pub struct NativeSyncWorker { + workers: Vec, + sender: std_mpsc::Sender, // all sends will be asynchronous and they never block + } + + enum Message { + NewJob(Job), + Terminate, + } + + impl NativeSyncWorker{ + + pub fn spawn(size: usize) -> NativeSyncWorker { + assert!(size > 0); + let (sender, receiver) = std_mpsc::channel(); + let receiver = Arc::new(Mutex::new(receiver)); // reading and writing from an IO must be mutable thus the receiver must be inside a Mutex cause data inside Arc can't be borrows as mutable since the receiver read operation is a mutable process + let mut workers = Vec::with_capacity(size); // capacity is not always equals to the length and the capacity of this vector is same as the maximum size based on the system arch, on 32 bits arch usize is 4 bytes and on 64 bits arch usize is 8 bytes + for _ in 0..size { // since the receiver is not bounded to trait Clone we must clone it using Arc in each iteration cause we want to share it between multiple threads to get what the sender has sent + workers.push(Worker::new(Uuid::new_v4(), Arc::clone(&receiver))); + } + NativeSyncWorker{workers, sender} + } + + pub fn execute(&self, f: F) where F: FnOnce() + Send + 'static { // calling this method means send the incoming task from the process through the mpsc sender to down side of the channel in order to block a free thread using the receiver on locking the mutex + let job = Box::new(f); + self.sender.send(Message::NewJob(job)).unwrap(); // by executing the task handler sender will send a job asynchronously and only one receiver at a time can get that job and solve it by locking on the mutex to block the choosen thread since thread safe programming is all about this pattern! 
+ } + } + + impl Drop for NativeSyncWorker{ // shutting down all threads on ctrl + c by dropping all of them + fn drop(&mut self) { // destructor for NativeSyncWorker struct + info!("Sending terminate message to all workers."); + for _ in &self.workers { + self.sender.send(Message::Terminate).unwrap(); + } + info!("Shutting down all workers."); + for worker in &mut self.workers { + info!("Shutting down worker {}", worker.id); + if let Some(thread) = worker.thread.take(){ // take() takes the value out of the option, leaving a None in its place + thread.join().unwrap(); // joining on thread will block the current thread to get the computation result and stop the thread from being processed in the background + } + } + } + } + + impl Worker{ + fn new(id: Uuid, receiver: Arc>>) -> Worker { + let thread = thread::spawn(move || loop { // spawning a thread inside the new() method and waiting for the receiver until a job becomes available to down side of the channel + // following is called the eventloop handler, handles and execute + // all coming tasks from the channel in a loop using while let some + // streaming and execute them in a separate thread as they're getting + // received. + while let Ok(message) = receiver.lock().unwrap().recv(){ // iterate through the receiver to get all incoming messages - since other thread shouldn't mutate this message while this thread is waiting for the job we must do a locking on the message received from the sender to acquire the mutex by blocking the current thread to avoid being in dead lock, shared state and race condition situation + match message { + Message::NewJob(job) => { + info!("Worker {} got a job; executing.", id); + job(); // this might be an async task or job spawned by the tokio spawner in the background + } + Message::Terminate => { + info!("Worker {} was told to terminate.", id); + break; // break the loop of this worker so we are not interested in solving task any more + } + } + } + }); + Worker { + id, + thread: Some(thread), + } + } + } + + + } +} + +// see actor.rs for a real world example of writing a threadpool actor object +// executor uses an event loop to execute tasks in their own thread. +pub async fn ExecutorEventLoop(){ + + // actor worker streaming with while let some, ws, atomic addr, mailbox, rpc, redis, rmq) + // actor worker task execution with io vs os threads (lazy static &mut rc arc mutex rwlock select spawn channels scheudler interval) + // thread queue and runtime scheduler to pop the task out of the queue, send sync static separate io threads vs os threads thread joining + + + // concurrency : (&mut <-> Mutex RwLock) + // threadpool : light or os threads, eventloop (threadpool channel queue to handle events in a loop see actor.rs with threadpool) + // atomic syncing : channels mutex rwlock arc select + // future objects : async io task, thread joining on main (caller) thread + // purchase locking : lock the object when someone is minting it using select mutex spawn + + /* + simple executor without threadpool but with eventloop: + task is a unit of work thread or groutine that must be executed + by runtime executor by poping it out of the queue, where each task + runs to completion before the next task starts. 
+ it's similar to how tokio event loop manages async io future object + using an eventloop in its lightweight thread of execution + */ + + use tokio::sync::{mpsc::channel, Mutex}; + enum EventData{ + Task(Task), + Quit + } + // traits as separate objects must be behind dyn keyword + // and boxed into the heap cause they're unsized + type Function = fn(); + struct Fucntions + Send + Sync + 'static>{ + pub func: Function, + pub cls: F, + pub boxed_cls: Box, + pub fut: A, + pub fut1: std::pin::Pin + Send + Sync + 'static>> + } + struct Task{ + name: String, + func: Box, // closures are traits, traits are not sized thus we use dyn and need to behind pointer hence Box + Send + Sync + 'static>>>, + } + impl Task{ + fn new(name: String, func: F) -> Self{ + Task { name, func: Box::new(func), fut: None } + } + fn run(self){ // don't use &self in here cause we can't move out of it since func is not cloned + (self.func)() // calling the the function + } + } + struct Executor{ + pub tx: mpsc::Sender, + pub rx: mpsc::Receiver + } + impl Executor{ + fn new(buffer_size: usize) -> Self{ + let (tx, rx) = tokio::sync::mpsc::channel::(buffer_size); + Executor{ + tx, + rx + } + } + fn spawn(&mut self, name: String, func: F){ + self.tx.send(EventData::Task(Task::new(name, func))); + } + // the event lopp + async fn run(&mut self){ + // await puases the execution and run the future until it completes + // futures don't block the thread of execution + while let Some(event_data) = self.rx.recv().await{ + match event_data{ + EventData::Task(task) => { + log::info!("executing the task with name: {}", task.name); + std::thread::spawn(move || task.run()); + }, + EventData::Quit => { + break; + }, + _ => { + panic!("invalid event data, event loop is panicked"); + } + } + } + } + } + +} + +pub fn StockPriceEvent(){ + + /* -------------------------------------------------------------------- + condvar is used to check that either a condition var is met + inside a mutex or not, this will block the mutex thread by + waiting on it until this cond var receives a notification in + somewhere else. + CondVar: block the thread such that it consumes no CPU time while + waiting for an event to occur, other threads can do their jobs + functions in this module will block the current thread of + execution, condvars are typically associated with a boolean + predicate (a condition) and a mutex, the predicate is always + verified inside of the mutex before determining that a + thread must block. + condvar blocks the mutex thread by waiting until new changes + is received which has notified in other threads. + what we're doing here is basically we're monitoring a stock price in + a safe manner in such a way that we're creating 10 threads, each of + them wants to mutate the price of the stock instance but the actual + instance is wrapped through a mutex and has a condvar, the updating + process is happened like we start by locking on the stock instance + then call update price method after that notify the condvar that the + value of the price of the stock instance has changed so the notification + process is happening inside each thread. 
+ then in the function we're calling the wait for release which lock the + stock again and checks its price against a limit causes to block the + main (caller) thread until the price of the stock is smaller than the limit, it + depends on the update price every time the update price function update + the price of the stock a notif gets triggered which will be checked + by the wait for release method to check the price agains the limit this + process continues constantly the main (caller) thread is blocked until the price + reaches a higher amount than the limit. + */ + + + use std::sync::{Arc, Mutex, Condvar}; + + struct Buffer{ + pub data: Arc>>, + pub size: usize + } + + #[derive(Debug, Clone)] + struct Stock{ + name: String, + price: f64 + } + impl Stock{ + fn new(name: &str, price: f64) -> Self{ + Self { name: name.to_string(), price } + } + fn getPrice(&self) -> f64{ + self.price + } + fn getName(&self) -> &str{ // ret pointer, use the lifetime of the self + &self.name + } + fn updatePrice(&mut self, new_price: f64){ + self.price = new_price; + } + } + + // worker, locker + struct Monitor{ + pub event: std::sync::Mutex, + pub events: Option>, + pub event_signal: Condvar, + pub std_worker: thread::JoinHandle<()>, + pub tokio_worker: tokio::task::JoinHandle<()>, + pub locker: std::sync::Mutex<()> + } + + impl Monitor{ + fn new(init_event: Stock) -> Self{ + Self { + events: None, + event: std::sync::Mutex::new(init_event), + event_signal: Condvar::new(), + std_worker: thread::spawn(move ||{}), + tokio_worker: tokio::spawn(async move{}), + locker: std::sync::Mutex::new(()) + } + } + + fn update_price(&self, new_price: f64){ + let mut get_stock = self.event.lock().unwrap(); + (*get_stock).updatePrice(new_price); + + // wakes up one blocked mutex thread on this condvar + // we notify the condvar that the stock price is changed + // the update_price method locks the Stock instance, updates + // its price, and then calls notify_one() on the Condvar. + // this notifies any thread waiting or blocking on the Condvar + // that the stock price has changed. + self.event_signal.notify_one() // notify the blocked thread that the value has changed, wake it up from the wait status + } + + /* + once the price of the locked stock reaches the limit we wait, wait blocks the current thread + until this condition variable receives a notification which will be triggered inside the + update_price method means that the price of the stock has changed and we need to block the + mutex thread again until we reaches the limit again for the stock price. + in the wait_for_release() method, we lock the Stock object. it then enters a loop where + it continually checks if the price of the Stock is less than a certain limit. if the price + is less than the limit, the method calls the self.event_signal.wait(get_stock) + method. this block the current (main) thread of the mutex, until another thread calls notify_one() + or notify_all() on the same Condvar + the consequence of this, is that if the price of the Stock is initially less than the limit, + this method will block the current (main) thread until the price increases to the limit or above. + this will allow other threads to update the price of the Stock while the current (main) thread is + blocked. once the prices reaches the limit, the wait() method will return. the method will + exit the loop and continue executing. + using a Condvar in this way, we can effectively manage access to the Stock. 
By using the + wait_for_release() method, the main (caller) thread waits for the price of the Stock to reach a certain + limit before proceeding. this is useful in scenarios where the order of operations matters, + for example when one operation depends on the result of another. example scenarios would be + things like managing stocks, purchasing a product, or a warehouse ledger system. + */ + fn wait_for_release(&self){ + let limit = 115.0; + let mut get_stock = self.event.lock().unwrap(); + while get_stock.getPrice() < limit{ // we block and wait as long as the the price is smaller than the limit + get_stock = self.event_signal.wait(get_stock).unwrap(); + } + + } + + } + + + /* + testing: + basically in here we're updating the price + in 10 threads and block the main (caller) thread if + the price is smaller than the limit until + we notify the blocked thread by the condvar + that the price value is changed, then there + would be no need to wait for the notif until + another thread tries to update the price. + we spawn the update_price() method inside 10 + threads then block the main (caller) thread if the price + is not met the limit finally we iterate through + all the threads to join them on the main (caller) thread + and wait for them to finish. + waiting in os threads means blocking the thread + until we get the result. + */ + + // arc monitor to move it between threads + let monitor = Arc::new(Monitor::new(Stock::new("DOGTOKEN", 100.0))); + let threads = (0..10) + .map(|counter|{ + let cloned_monitor = monitor.clone(); + // we'll update the price of the monitor instance in a separate 10 of threads + thread::spawn(move ||{ + cloned_monitor.update_price(110.0 + 2.0*(counter as f64)); + }) + }) + .collect::>(); // if you don't know the type use _ + + // we'll check the price of the stock against the limit + // if it was less than the limit then we'll block the main + // thread until the notifier notify the condvar in another + // thread with a new value of the price, then we'll wait and + // block the thread until the price reaches higher than the limit again. + // ------- this blocks the main (caller) thread ------- + monitor.wait_for_release(); + + // join on all threads in main (caller) thread to execute the stock price task + for thread in threads{ + thread.join().unwrap(); + } + + // finally get the final value of the stock event after all mutations + let final_value = monitor.event.lock().unwrap(); + println!("final value of the stock is {:?}", final_value); + + + // wait_for_release() method blocks the main (caller) thread until we reach + // the limit, or receives a notification from the condvar which might + // happens in another thread by updating the price of the stock. + + /* + product minting: + - condvar with tokio spawn threads: use a condvar with a mutex and lock the product id in a tokio + io thread then check while the product is still locked or its state is not minted yet, + we'll block the current thread until the notifier notifies the condvar once the + product gets minted successfully and its state changed to minted then we'll remove it + from the lock_ids. + - channels with tokio spawn thrads: use a mutex and lock the product id in a tokio io thread + then send a true flag to a channel if the product id is being locked then start minting + product in another tokio io thread finally use select to control the flow of execution + of each joinhandle task. 
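
        a rough sketch of the channel based flavour (the names product_id and
        mint() here are illustrative only, not real APIs of this crate):
            let (tx, mut rx) = tokio::sync::mpsc::channel::<String>(16);
            let locker = tokio::spawn(async move{
                // lock the product id then signal that minting can start
                tx.send(String::from("product_id")).await.unwrap();
            });
            let minter = tokio::spawn(async move{
                while let Some(product_id) = rx.recv().await{
                    // mint(product_id).await; // hypothetical minting call
                }
            });
            tokio::select!{
                _ = locker => {},
                _ = minter => {},
            };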
+ */ + + +} + +pub fn MutexCondvarPlayground(){ + + /* + in monolithic : use mutex, rwlock, arc, channels, spawn, select for atomic syncing and avoiding deadlocks + in microservice: use distributed lock tools like redlock, zookeeper, k8s + eventloop streaming with channels as publisher and subscriber of events like condvar + atomic syncing with mutex, condvar and channels in async and none async block + + none async version: std threadpool executor eventloop, channels, mutex will block the current thread and wait for the task to gets executed + async version : tokio threadpool executor eventloop, channel, mutex won't block the thread and execute tasks in the background thread + + NOTE: don't use Condvar in async environment cause it blocks the thread + NOTE: Mutex in both std and tokio will block the thread to make sure that only one thread can access data for writing + + use channel to share data between threads instead of Mutex or RwLock + tokio::spawn(), tokio::select! {}, channels, tokio::sync::Mutex + execute async io future obejcts in an io thread in a none blocking manner + there is no need to block threads cause we want to execute other tasks + without blocking any code flow or section in a concurrent manner. + + use mutex to ensure that only one thread can access the protected data. + mutex always block the current thread to ensure that only one thread can mutate the data. + do the locking in a separate thread if you care about executing rest of the codes in a none blocking manner. + use condvar to block the thread or wait for a notification on some value changes. + joining on thread block the thread till we get the result from that + in none async env we should use os threads which might get block during the execution when we need the result inside the thread. + + tokio runtime scheduler and executor can execute async io tasks in a none blocking manner + by awaiting on async tasks the executor pause the execution in there but don't block the thread + it goes to run and execute other tasks in other threads concurrently in a none blocking manner + it then notify the caller and fill the placeholder once the task completes. + */ + + + /* ---------------- BUCKET ---------------- + a bucket that contains a queue which is a pool of events + all the events are safe to be shared and mutated between threads. 
+ it has a locker, worker, condvar and channels, eventloop streamer + to receive the task using receiver, use condvar with mutex + to lock the thread and wait for the notifier then if we want + to fill the bucket we should use either channels, or condvar mutex + */ + pub struct Bucket + Send + Sync>{ + pub signal: std::sync::Condvar, + pub broadcaster: std::sync::mpsc::Sender, + pub receiver: std::sync::mpsc::Receiver, + pub fut: F, + pub dep_injection_fut: std::pin::Pin>>, + pub worker_handler: std::thread::JoinHandle<()>, + pub queue: BufferEvent // a thread safe queue to mutate it - while !self.queue.is_empty() { pop the card out } + } + pub struct BufferEvent{ + pub data: std::sync::Arc>>, // Mutex always block the thread for mutating avoid accessing the data by other threads at the same time + pub size: usize + } + impl Drop for BufferEvent{ + fn drop(&mut self) { + self.data.lock().unwrap().clear(); // clear the whole events + } + } + /* ---------------------------------------- */ + + + // example: safe atomic data syncing between threads using mutex and condvar + #[derive(Debug, Clone)] + pub enum Color{ + Yellow, + Red, + Blue + } + #[derive(Clone, Debug)] + pub struct ExpensiveCar{ + pub brand: String, + pub color: Color + } + + impl fmt::Display for ExpensiveCar{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let color_string = format!("{:#?}", self.color); + write!(f, "brand: {}, color: {}", self.brand, color_string) // write to the current buffer formatter + } + } + + impl ExpensiveCar { + fn new(brand: &str, color: Color) -> ExpensiveCar { + ExpensiveCar { + brand: brand.to_string(), + color, + } + } + } + + #[derive(Debug)] + pub struct Garage{ + queue: Buffer, + guard: std::sync::Condvar, // the condvar notifier notifies the blocked or waiting thread that there might be data to be consumed. + } + + #[derive(Debug)] + pub struct Buffer{ + // Arc allows us to share between multiple threads by cloning the data (each thread gets its own ref to it) + // Mutex ensures only one thread at a time are mutating the data + pub data: std::sync::Arc>>, + } + impl Buffer{ + pub fn new() -> Self{ + Self{data: std::sync::Arc::new(std::sync::Mutex::new(vec![]))} + } + + pub fn size(&self) -> usize{ + let get_data = self.data.lock().unwrap(); + get_data.len() + } + } + + #[derive(Debug, Clone)] + pub struct PooledObject{ + pub item: T, // used to push back the last item into the data buffer when the pooled object is being dropped + pub pool: std::sync::Arc>>, + } + + // if you want to park means the garage must get blocked + // by the mutex and tells other threads that i'm busy with + // parking a car wait (block) until i release the lock and + // the car gets parked in there. 
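    // a rough usage sketch of the flow described above (error handling omitted,
    // the values passed to ExpensiveCar::new are illustrative):
    //     let garage = Garage::new();
    //     garage.park(ExpensiveCar::new("BMW", Color::Red)); // producer side, notifies the condvar
    //     let car = garage.get();                            // consumer side, blocks while the queue is empty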
+ impl Garage{ + + pub fn new() -> Self{ + Self { queue: Buffer::new(), guard: std::sync::Condvar::new() } + } + + pub fn pool_size(&self) -> usize{ + self.queue.size() + } + + pub fn acquire(&self) -> Option>{ + let mut get_cars = self.queue.data.lock().unwrap(); + let pool_of_expensive_cars = (*get_cars).pop() + .map(|car| PooledObject{ // mapping the poped out element into a PooledObject + item: car, + pool: self.queue.data.clone(), + }); + pool_of_expensive_cars + } + + pub fn park(&self, car: ExpensiveCar){ // try to park an expensive car inside the garage + let mut get_queue = self.queue.data.lock().unwrap(); + (*get_queue).push(car); + + // notify the blocked thread that we have pushed a new car into the garage + // since the thread will be blocked if there would be no data in queue + // hence we should notify the thread as soon as a car comes into the garage + self.guard.notify_one(); + } + pub fn get(&self) -> ExpensiveCar{ + let mut get_queue = self.queue.data.lock().unwrap(); + while get_queue.is_empty(){ + get_queue = self.guard.wait(get_queue).unwrap(); + } + + // if the blocked thread gets woken up in here we remove the + // first item from the queue and return it since the get method + // method is about returning an expensive car + (*get_queue).remove(0) + } + + } + + + /* ---------------------------------------------------------------------- + impl ownership and borrowing traits for the PooledObject + deref mutable/immutable pointer will be called by the compiler at runtime + we can also use * to deref a type and get the value out of the pointer + deref any pointer (immutable or mutable) to the item of type T. + change the value of a pointer or &mut by dereferencing it like *v = 1; + + the drop, deref and derefmut will be called automatically at runtime + when the type wants to gets dropped out of the ram and dereferenced + by * or its inner value gets accessed by during the execution. + + note: pointers contain inner value of a type so accessing the inner + value requires to deref the pointer so when we use * it allows us to + access the inner value for mutating or reading, the Deref/DerefMut + traits however can be implemented for smart pointers like Mutex, Arc, Box + making them behave like regular references. 
+ reimplementing the Deref/DerefMut trait methods for a type allows us + to call their methods on the object when we try to access the inner value + of the type by * or + + when we use * operator the deref or derefmut trait methods will be called + by the compiler as well as for smart pointers cause smart pointers implement + the traits any type that implements the deref or derefmut traits, trait methods + will be invoked once it’s dereferenced by * + */ + impl Drop for PooledObject{ + fn drop(&mut self) { + // we must push back the item into the pool cause we've poped it out + // in acquire method + println!("dropping pooled object, pushing back the item into the pool"); + let mut get_items = self.pool.lock().unwrap(); + (*get_items).push(self.item.clone()); + } + } + impl Deref for PooledObject{ // dereference the immutable pointer of type T which is the type of the pooled object item + type Target = T; + // compiler calls the deref() method once the type gets dereferenced by * immutably + fn deref(&self) -> &Self::Target { + println!("dereferencing pooled object"); + &self.item + } + } + impl DerefMut for PooledObject{ // dereference the mutable pointer of type T which is the type of the pooled object item + // this is useful when we have a mutable pointer to the last item like `ref mut` + // compiler calls the deref() method once the type gets dereferenced by * mutably + fn deref_mut(&mut self) -> &mut Self::Target { + println!("dereferencing pooled object mutably"); + &mut self.item + } + } + + /* + actor workers have os or ligh thread of executions (see actor.rs) + they can communicate with other parts through message passing techniques + using jobq channels, they're basically an smart threadpool object + with an isolated state from each other and are safe to mutate data + in their states. there must also be a task executor or some runtime + scheduler to handle task execution process in each thread. + tools: threadpool, mutex, spawn, arc, channels, executor and eventloop + */ + // worker threads + pub struct Handlers{ + pub producer: std::thread::JoinHandle<()>, + pub consumer: std::thread::JoinHandle<()> + } + + impl Handlers{ + pub fn new() -> Self{ + Self { producer: std::thread::spawn(move||{}), consumer: std::thread::spawn(move ||{}) } + } + pub fn produce(&mut self, garage: std::sync::Arc){ + // create 10 new expensive cars in a thread + // the producer threads, produces 10 expensive cars to park them + let cloned_garage = garage.clone(); + self.producer = thread::spawn(move || { + for i in 0..10{ + let new_expensive_car = ExpensiveCar::new("BMW", Color::Red); + cloned_garage.park(new_expensive_car); + } + }); + } + pub fn consume(&mut self, garage: std::sync::Arc){ + // get all the 10 cars from the garage one by one + // the consumer thread, consumes or get 10 expensive car from the garage + // since the garage queue is a lock hence we should wait and block the + // thread per each consumed data + self.consumer = thread::spawn(move ||{ + for i in 0..10{ + let get_expensive_car_from_queue = garage.get(); + println!("got a car: {}", get_expensive_car_from_queue); + } + }); + } + + } + + // with mutex and condvar we can control the producer and consumer + // threads at the same time, the first one is used to produce the + // data and the second one is used to consume the data by locking + // the data one at a time. 
+
+    let garage = std::sync::Arc::new(Garage::new());
+
+
+    // ----------------------------------------------------------
+    // popping all the cars out of the garage using the consumer thread
+    // ----------------------------------------------------------
+    let mut handler = Handlers::new();
+    handler.produce(garage.clone()); // fill the garage
+    // handler.consume(garage.clone()); // pop out the cars one by one
+
+    // join both producer and consumer threads at the same time,
+    // the first one is responsible for creating and then parking cars
+    // inside the garage queue and the second one is responsible for
+    // getting cars one by one from the garage. since the garage
+    // queue sits behind a lock we should keep in mind that at the time of
+    // parking a new car we must notify the blocked thread, since
+    // the consumer thread stays blocked until a new car comes
+    // into the parking.
+    let Handlers { producer, consumer } = handler;
+    producer.join().unwrap();
+    // consumer.join().unwrap();
+
+    // ----------------------------------------------------------
+    // popping all the cars out of the garage using the pooled object
+    // ----------------------------------------------------------
+
+    // testing the PooledObject with mutable and immutable pointers to
+    // the last item of type T
+    let pool = garage.clone();
+
+    // scope1
+    { // define a new scope for fetching cars
+        let mut get_poped_out_item_with_pool = pool.acquire(); // acquiring the PooledObject
+        match get_poped_out_item_with_pool{
+            // poped_out_item_with_pool is a mutable pointer to the PooledObject,
+            // mutating through it calls DerefMut and dropping it calls Drop
+            Some(ref mut poped_out_item_with_pool) => {
+                println!("scope1 pooledobject : {:#?}", poped_out_item_with_pool.pool);
+                poped_out_item_with_pool.color = Color::Yellow;
+            },
+            None => println!("there is no item!"),
+        }
+
+        // after the scope has ended this object is dropped and the last item is automatically returned to the pool;
+        // since we're dereferencing mutably to mutate the color of the car, the DerefMut trait method will be called
+        // during this process and its log message will be printed out to the console.
+ } + + println!("-------------------------------------------"); + println!("pool size is : {:#?}", pool.pool_size()); + + // scope2 + let get_car = pool.acquire(); + match get_car{ + Some(ref car) => println!("scope2 pooledobject: {:?}", car), + None => println!("there is no item!"), + }; + + println!("-------------------------------------------"); + println!("pool size is : {:#?}", pool.pool_size()); + + // scope3 + let last_car = pool.acquire(); + match last_car{ + Some(ref car) => println!("scope3 pooledobject: {:?}", car), + None => println!("there is no item!"), + }; + + println!("-------------------------------------------"); + println!("pool size is : {:#?}", pool.pool_size()); + + +} + +pub async fn jobQChannelFromScratch(){ + + // use trait to pass different types to a function through a single interface + // use Any to try to cast any type that impls Any trait into an specific type + // pass different functions to a method using Fn closure + // dependency injection for observer field inside the Mutex (it can be other smart pointers like Arc and Box also) + + trait GetVal{ + fn getVal(&self) -> Self; + } + impl GetVal for T{ + fn getVal(&self) -> Self { + self.clone() + } + } + + struct Person{ + info: V, + // dependency injection with Mutex like Box smart pointers + subscribers: Vec>>, + } + + impl Person{ + pub fn new(info: V) -> Self{ + Self { info, subscribers: Vec::new() } + } + + // hanlding dynamic dispatch, supports any type through a single interface + pub fn set_info(&mut self, val: V){ + + // cast the value into the Any trait, we could use Box also + let any_val = &val as &dyn Any; + match any_val.downcast_ref::(){ + Some(string) => { + // ... + }, + None => { + println!("can't downcast it to string"); + } + }; + + self.info = val.getVal(); + self.publish(val); // name has changed + } + + // moving out of a type which is behind pointer takes + // the ownership of the type so we can't do this if the type is + // being used by the function scope and is behind a pointer, + // like self.name which returning it takes the ownership of + // the &self, Rust won't allow to return a pointer from the + // function since once the function gets executed all the inner + // types will be dropped out of the ram and having a pointer outside + // of the function would be meaningless unless we specify a valid + // lifetime for the pointer like using the self lifetime or pass + // directly to the function call addressing this issue would be either + // returning a pointer to the self.name or clone the self. 
+ pub fn get_info(&self) -> &V{ + &self.info + } + + // this method adds new subscriber closure function to the current ones + pub fn subscribe(&mut self, f: F){ + self.subscribers.push(std::sync::Arc::new(std::sync::Mutex::new(f))); + } + + // this method publish a new value to all subscribers by iterating through + // all of them and call each closure function of them by passing the value + // the closure function body will be executed for each of the subscriber + fn publish(&self, value: V){ // this method notify all subscribers by passing the new value to each subscriber function + for subscriber in &self.subscribers{ // use & or clone the vector + if let Ok(subscriber_function) = subscriber.lock(){ + subscriber_function(value.clone()) // notify the subscriber with a new name, call the closure function and pass the new name to see the log + } + } + } + + } + + + // since the subscribe method has &mut self, in order to call + // the method on the instance in another thread we should wrap + // the instance in Mutex + let person = std::sync::Arc::new( + std::sync::Mutex::new( + Person::new(String::from("wildonion")) + ) + ); + + let cloned_person = std::sync::Arc::clone(&person); + + // -------------------------------------------------------- + // working with person object completely in another thread. + // -------------------------------------------------------- + let thread1 = thread::spawn(move ||{ + let mut person = cloned_person.lock().unwrap(); + person.subscribe(move |info| { + // the subscription logic goes here + // for now we're just logging things! + println!("[subthread subscriber]"); + println!("subscribing > value is : {}", info); + }); + + // updating the info field, will notify all subscribers + // and call their functions, which the logic has written right above + person.set_info(String::from("new wildonion")); + }); + + // ------------------------------------------------------------- + // working with person object completely inside the main (caller) thread. + // ------------------------------------------------------------- + // block the main (caller) thread for subscription + // subscribe() method push a new subscriber to the vector only + person.lock().unwrap().subscribe(move |info|{ + println!("[main (caller) thread subscriber]"); + println!("subscribing > value is : {}", info) + }); + + // set_info() change the info field as well as notify subscribers + // with the updated value + // block the main (caller) thread for changing the ingo + person.lock().unwrap().set_info(String::from("28")); + + // block the main (caller) thread to wait for the thread to complete the task + // wait for the thread to finish, this method returns immediately if the + // thread has already finished, so joining on the thread can be important + // if we need a result coming from the thread otherwise the thread will + // be solved in the background like tokio spawn threads. + thread1.join().unwrap(); + + +} \ No newline at end of file diff --git a/src/workers/actorWorker.md b/src/workers/actorWorker.md new file mode 100644 index 0000000..530570b --- /dev/null +++ b/src/workers/actorWorker.md @@ -0,0 +1,264 @@ + +Actors are threads with isolated and no shared state with outside world, they have a single thread of execution which enables them to execute jobs received by their mailbox jobq channel one at a time, each job however can be scheduled to be executed periodically or at an specific time. + +### **1. 
Workers and Threads**: +A **worker** typically refers to an execution unit (or process) that handles jobs or tasks. Workers can be implemented using **threads** or **processes** depending on the underlying concurrency model. + +- **Threads** are lightweight units of execution within a process. They share the same memory space, allowing for efficient data sharing but requiring synchronization mechanisms (e.g., locks) to prevent race conditions. + +- **Workers as Threads**: In many systems, workers are **threads** of execution, meaning each worker runs in its own thread and executes jobs concurrently. The runtime (like a thread pool) schedules the jobs to be executed by the threads. For example, in a typical **multithreading model** (e.g., in Python's `ThreadPoolExecutor` or Java's `ExecutorService`), each worker thread picks up jobs and executes them. + +--- + +### **2. Workers and Actors**: +In some systems, workers might be implemented as **actors** in the **actor model** of concurrency, which is different from the traditional thread-based model. + +- **Actor Model**: In this model, an **actor** is an independent unit of computation that: + - Has its own state and does not share memory with other actors. + - Communicates with other actors by **sending messages**. + - Processes one message at a time in isolation (single-threaded), which eliminates the need for explicit locking or synchronization. + +- **Workers as Actors**: In actor-based systems (like **Akka** in Scala/Java or **Erlang**), workers are often considered **actors**. Each worker (actor) handles a job by processing a message it receives. The system schedules these actors, and they communicate via messages instead of shared memory, ensuring more scalable and fault-tolerant designs. + +--- + +### **3. Workers Scheduled by Runtime**: +- **Runtime scheduling** typically refers to how the execution environment (e.g., a thread pool, actor system, or task scheduler) manages and schedules the execution of tasks or jobs by workers. + + - **For threads**: The operating system (OS) or a thread pool is responsible for scheduling threads based on the available resources (CPU, etc.). The runtime (like the JVM, Python interpreter, or Go runtime) will schedule worker threads to pick up jobs from a job queue and execute them. + + - **For actors**: In an actor-based system, the actor runtime (like Akka or Erlang's BEAM virtual machine) handles scheduling by dispatching messages to actors. Each actor processes one message at a time, ensuring no shared state and easier management of concurrency. + +--- + +### **So, are workers threads or actors?** + +- **Workers as threads**: In many traditional systems (e.g., web servers, thread pools, etc.), workers are implemented as **threads**, which execute jobs concurrently and share memory. The scheduling of these jobs is handled by the OS or runtime environment (like the JVM or Python runtime). + +- **Workers as actors**: In an **actor-based concurrency model**, workers can be **actors**. Actors are single-threaded, independent units that process messages one at a time. They don't share memory and rely on message-passing for coordination. + +### **Conclusion**: +- Workers can be **threads** in systems that use traditional thread-based concurrency. +- Workers can also be **actors** in systems that implement the actor model. +- The **runtime** schedules and manages the execution of these workers, whether they are threads or actors. 
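+
+As a rough illustration of the thread-based side of this comparison (a standalone sketch, not code from this repo; the `Job` and `WorkerPool` names are made up), a worker pool in Rust is simply a set of OS threads draining a shared job queue:
+
+```rust
+use std::sync::{mpsc, Arc, Mutex};
+use std::thread;
+
+// a job is a boxed closure that a worker runs exactly once
+type Job = Box<dyn FnOnce() + Send + 'static>;
+
+struct WorkerPool{
+    workers: Vec<thread::JoinHandle<()>>,
+    sender: mpsc::Sender<Job>,
+}
+
+impl WorkerPool{
+    fn new(size: usize) -> Self{
+        let (sender, receiver) = mpsc::channel::<Job>();
+        // all workers share the receiving end of the job queue behind a lock
+        let receiver = Arc::new(Mutex::new(receiver));
+        let workers = (0..size).map(|id|{
+            let receiver = Arc::clone(&receiver);
+            thread::spawn(move ||{
+                loop{
+                    // hold the lock only long enough to pull one job off the queue
+                    let job = receiver.lock().unwrap().recv();
+                    match job{
+                        Ok(job) => { println!("worker {id} picked a job"); job(); },
+                        Err(_) => break, // queue closed, no more jobs
+                    }
+                }
+            })
+        }).collect();
+        Self{ workers, sender }
+    }
+    fn execute<F: FnOnce() + Send + 'static>(&self, f: F){
+        self.sender.send(Box::new(f)).unwrap();
+    }
+    fn join(self){
+        drop(self.sender); // close the queue so idle workers stop after draining it
+        for worker in self.workers{ worker.join().unwrap(); }
+    }
+}
+
+fn main(){
+    let pool = WorkerPool::new(4);
+    for i in 0..8{ pool.execute(move || println!("running job {i}")); }
+    pool.join();
+}
+```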
+ +Both models are valid, and the choice depends on the concurrency model you're working with. If you're using an actor-based framework, workers are actors. In thread-based models, workers are threads (or sometimes processes in multiprocessing systems). + +In the **actor model**, each actor handles **one message at a time** in its **single thread of execution**, which may seem counterintuitive in terms of speed, but it can still be **highly performant** due to several factors that make the actor model efficient for concurrent systems. + +### **Why Actors Handling One Message at a Time Can Be Fast**: + +#### 1. **No Locks or Synchronization Overhead** +- **Traditional multithreading** often requires shared state between threads, which leads to the need for locks, mutexes, or other synchronization mechanisms to prevent race conditions like channels. These synchronization mechanisms can cause bottlenecks, deadlocks, or contention, slowing down the system finally add some overheads. + +- **Actor model advantage**: Since each actor processes one message at a time in isolation (without shared state), there's **no need for locks or synchronization**. This eliminates one of the major performance penalties in traditional multithreading environments. Actors don't block or wait on shared resources because each has its own state. + +#### 2. **Message Passing and Asynchronous Execution** +- In the actor model, communication happens through **message passing**, and this is typically done **asynchronously**. When an actor sends a message to another actor, it doesn't wait for a response; it can continue processing other tasks or messages. + +- **Non-blocking**: The actor's non-blocking nature ensures that the system isn't slowed down by waiting for results. Instead, actors can keep working on their own queue of messages, processing each one independently. + +#### 3. **Efficient Use of System Resources (Event Loop)** +- In actor-based systems, the **actor runtime** (e.g., Akka, Erlang) often uses an **event loop** or a **lightweight threading model** (like **green threads** or **fibers**) to switch between actors efficiently. + +- **Lightweight threads**: These are far cheaper in terms of CPU and memory resources compared to OS-level threads. The runtime can manage thousands or even millions of actors on a few physical threads, which enables the system to handle massive amounts of concurrency. + +- **Example**: In Akka or Erlang, the runtime will multiplex actors over a small number of real threads (OS threads), but because actors don't block each other, the system remains highly efficient, processing many tasks concurrently even if each actor handles one message at a time. + +#### 4. **Scalability and Distribution** +- The actor model is designed to **scale horizontally**. You can distribute actors across multiple machines or cores without worrying about shared memory or locks. + +- **Distributed actors**: In a distributed system (e.g., with Akka or Erlang), actors can be placed on different nodes, allowing the system to handle a massive amount of work in parallel. The scalability allows the system to be highly performant even as the number of actors (and thus the number of tasks) grows. + +#### 5. **Minimized Context Switching** +- **Traditional threads** (managed by the OS) involve **context switching**—saving the state of one thread and loading the state of another—when switching between tasks. This can introduce significant overhead, especially with a large number of threads. 
+ +- **Actor model advantage**: The actor runtime can manage many actors with minimal overhead since the actors are lightweight and don't require expensive context switching between tasks. In some systems (like Erlang's BEAM VM), the runtime is optimized to handle lightweight actors (or processes) efficiently, leading to very low-cost task switching. + +#### 6. **Fine-Grained Concurrency** +- By processing one message at a time, each actor encapsulates its state, which simplifies reasoning about concurrency. In traditional systems, concurrency bugs are often hard to detect and fix because of shared state and race conditions. + +- **Actor model advantage**: Since each actor processes only one message at a time, there's no risk of inconsistent state due to concurrent access. The **granularity of concurrency** in the actor model makes it easy to scale and maintain, reducing the overhead of debugging and error handling. + +#### 7. **High Parallelism with Event-Driven Design** +- In event-driven systems, actors react to incoming messages as events, and they respond as quickly as they can. This makes the actor model **ideal for reactive, real-time systems** where tasks are relatively independent and can be broken down into smaller units of work. + +- **Example**: In a system with thousands of independent users interacting with a service (e.g., a chat application), each user can be modeled as an actor. Since they don't need to share state directly, each user (actor) can process messages in parallel without locking or contention, which keeps the system responsive and fast. + +### **Real-World Examples of Fast Actor-Based Systems**: + +#### 1. **Erlang** +- Erlang is a classic example of how the actor model can be extremely performant. Erlang's **BEAM virtual machine** can handle **millions of lightweight processes** (actors) with low memory overhead. Erlang was built for telecommunications systems, where high concurrency, reliability, and low-latency performance are essential. + +- **Telecom example**: Erlang has been used in systems that handle millions of simultaneous connections, such as phone switches or messaging systems, with actors processing each connection without blocking. + +#### 2. **Akka (JVM-based)** +- **Akka** is an actor-based framework for building concurrent, distributed, and fault-tolerant applications on the JVM. It's used in high-throughput systems like **real-time analytics**, **IoT systems**, and **financial services**. + +- **Parallel task execution**: Akka can spawn thousands or millions of actors that process independent tasks in parallel, making it an ideal choice for distributed systems that need high concurrency but want to avoid the complexities of traditional thread-based concurrency. + +--- + +### **To Summarize: How Actors Handle One Message at a Time Efficiently** +- **No locking or contention**: By isolating state within each actor, there's no need for locks, which removes major bottlenecks found in traditional multithreading. +- **Asynchronous message passing**: Actors handle tasks asynchronously, enabling non-blocking execution and high concurrency. +- **Event-driven processing**: The event loop and lightweight thread model used in actor runtimes reduce context-switching overhead. +- **Scalability**: Actors can scale across multiple cores or machines, allowing them to process many messages in parallel. +- **Fine-grained concurrency**: Each actor processes one message at a time, simplifying concurrency management and allowing for high parallelism. 
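+
+To make the "one message at a time" behaviour concrete, a hand-rolled (non-Actix) actor is essentially a task that owns its state and drains a mailbox channel. The sketch below assumes the `tokio` runtime and made-up names like `CounterActor`; it is an illustration rather than code from this repo:
+
+```rust
+use tokio::sync::{mpsc, oneshot};
+
+// messages the actor understands; Get carries a oneshot sender so the actor can reply
+enum Msg{
+    Increment,
+    Get(oneshot::Sender<u64>),
+}
+
+// the actor owns its state exclusively, so no Mutex or lock is needed
+struct CounterActor{
+    count: u64,
+    mailbox: mpsc::Receiver<Msg>,
+}
+
+impl CounterActor{
+    async fn run(mut self){
+        // the mailbox loop: messages are processed one at a time, in arrival order
+        while let Some(msg) = self.mailbox.recv().await{
+            match msg{
+                Msg::Increment => self.count += 1,
+                Msg::Get(reply) => { let _ = reply.send(self.count); },
+            }
+        }
+    }
+}
+
+#[tokio::main]
+async fn main(){
+    let (tx, rx) = mpsc::channel(64);
+    // spawn the actor as a lightweight task with an isolated state
+    tokio::spawn(CounterActor{ count: 0, mailbox: rx }.run());
+
+    // other tasks talk to the actor only through message passing
+    for _ in 0..10{
+        tx.send(Msg::Increment).await.unwrap();
+    }
+    let (reply_tx, reply_rx) = oneshot::channel();
+    tx.send(Msg::Get(reply_tx)).await.unwrap();
+    println!("count = {}", reply_rx.await.unwrap());
+}
+```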
+ +Even though each actor processes one message at a time, the overall system can handle a **massive number of concurrent tasks** efficiently, leveraging the lightweight nature of actors, message-passing concurrency, and non-blocking architecture. This makes actor-based systems **very fast and scalable**, particularly in distributed environments like blockchain smart contracts. + +**Actix** is a powerful actor-based framework in **Rust** designed for building highly concurrent and asynchronous systems. It's particularly known for its efficiency, low overhead, and the ability to handle large amounts of tasks or connections concurrently, making it one of the fastest actor-based systems in existence: + +--- + +### **1. Actix Actors** + +In Actix, an **actor** is a fundamental unit of computation, just like in the actor model. Actors are isolated entities that manage their own state and communicate with each other via **message passing**. + +- **Key Properties**: + - **Encapsulated state**: Each actor has its own internal state that is not shared with other actors. + - **Message handling**: Actors process incoming messages one at a time asynchronously. + - **Concurrency model**: Actors are executed in **separate lightweight tasks** (similar to green threads or fibers), which are scheduled by the Rust async runtime (e.g., using **Tokio** or **async-std**). + - **Lifecycle management**: Actix actors have lifecycle events, allowing you to handle initialization, suspension, and stopping logic in a structured way. + +--- + +### **2. Messages in Actix** + +Messages are the primary way actors communicate. In Actix, messages are **typed** and must implement the `Message` trait. Each message also has an associated response type, which allows actors to send replies to the sender. + +- **Defining Messages**: + ```rust + use actix::prelude::*; + + struct MyMessage(String); + + impl Message for MyMessage { + type Result = usize; // The result type of the message + } + ``` + +- **Sending Messages**: Actors communicate by sending messages to each other. In Actix, sending a message is **non-blocking** and happens asynchronously. + ```rust + actor_instance.do_send(MyMessage("Hello".into())); + ``` + +- **Handling Messages**: Actors implement the `Handler` trait for each message type they want to handle. + ```rust + impl Handler for MyActor { + type Result = usize; // The result returned when the message is processed + + fn handle(&mut self, msg: MyMessage, _: &mut Self::Context) -> Self::Result { + msg.0.len() // Return the length of the message string + } + } + ``` + +--- + +### **3. Actix Context** + +The **context** in Actix represents the actor's execution environment and lifecycle. The context is responsible for: +- **Managing the actor's lifecycle**: Starting, suspending, stopping, and restarting the actor. +- **Handling messages**: The context schedules messages for the actor to process, ensuring that messages are handled one at a time. +- **Timers and Intervals**: The context also allows actors to schedule timed events, such as delayed execution of a message or repeating intervals. + +Each actor in Actix runs within a specific context type, and the most common one is `Context`, which is used for non-supervised actors. + +--- + +### **4. Actor System in Actix** + +The **actor system** in Actix is responsible for managing actors, handling message delivery, and ensuring actors are scheduled for execution. 
When you start an application, you first create the actor system, which sets up the runtime environment for actors to operate in. + +- **Starting the Actor System**: + ```rust + #[actix_rt::main] + async fn main() { + let actor = MyActor.start(); // Starts the actor + actor.do_send(MyMessage("Hello".into())); // Sends a message to the actor + } + ``` + +- **Supervision**: Actix supports actor supervision, where one actor (a supervisor) can monitor and restart child actors if they fail. This helps in building fault-tolerant systems. + +--- + +### **5. Async and Concurrent Execution** + +One of Actix's core strengths is that it integrates seamlessly with Rust's **async/await** ecosystem. This means you can write **asynchronous actors** that can perform non-blocking I/O operations, such as handling HTTP requests, database queries, or file operations without blocking the thread. Cause I/O won't block the threads they must get executed inside a light thread of execution while the thread is executing other tasks. + +- **Async Message Handling**: Actix allows actors to perform asynchronous operations by using `async` functions inside message handlers. + ```rust + impl Handler for MyActor { + type Result = ResponseFuture; + + fn handle(&mut self, msg: MyMessage, _: &mut Self::Context) -> Self::Result { + Box::pin(async move { + // Perform async operations + msg.0.len() // Example result: length of the string + }) + } + } + ``` + +- **Concurrency**: Actix uses Rust's native async runtimes (e.g., **Tokio**) to provide **concurrency**. Multiple actors can execute concurrently, and since actors don't share memory and don't need locks, this avoids the complexity of traditional multithreaded programs. + +--- + +### **6. Lightweight Execution with Minimal Overhead** + +Actix actors are **lightweight**, similar to **green threads** or **fibers**. Each actor runs in its own lightweight task, which makes it possible to run **thousands or even millions** of actors concurrently on a single machine, depending on the available resources. + +- **Thread Pool**: Actix operates on a pool of OS threads managed by the async runtime. It uses cooperative multitasking to ensure the system remains responsive, allowing lightweight tasks (actors) to execute concurrently without the overhead of context switching between threads. + +- **No Global State**: Since actors don't share state (they pass messages to each other), there is no need for locking or synchronization mechanisms like mutexes or semaphores, reducing performance overhead. + +--- + +### **7. Supervision and Fault Tolerance** + +Actix follows the actor model's philosophy of **let it crash**. This means that if an actor encounters an unrecoverable error, it can be safely **restarted or stopped** by a supervising actor. + +- **Supervision Tree**: You can set up actors in a **supervision tree**, where each supervisor monitors child actors. If a child actor crashes, the supervisor can restart it, ensuring the system remains robust and fault-tolerant. + + ```rust + struct MySupervisor; + + impl Supervisor for MySupervisor { + fn restarting(&mut self, ctx: &mut Context) { + // Custom logic when the actor restarts + } + } + ``` + +- **Error Handling**: Supervisors can define what happens when an actor fails, such as restarting the actor, logging the error, or escalating the failure to a higher-level supervisor. + +--- + +### **8. 
Real-World Use Cases of Actix** + +Actix is widely used in building **high-performance, low-latency systems** such as: +- **Web servers**: Actix Web, built on Actix, is one of the fastest web frameworks in Rust. +- **Microservices**: The actor model naturally lends itself to microservices architectures, where services are isolated and communicate asynchronously. +- **IoT and Real-Time Systems**: Actix's ability to handle large numbers of concurrent tasks makes it ideal for IoT systems that deal with massive data streams or event-driven architectures. + +--- + +### **9. Comparison to Other Actor Models** +- **Erlang and Akka**: Both Erlang and Akka also implement actor models, but Actix is notable for being **extremely fast** due to Rust's zero-cost abstractions, memory safety guarantees, and native async support. + +- **Efficiency in Actix**: Actix benefits from Rust's low-level control over memory and concurrency, providing **speed and safety** without garbage collection (like in Akka/Scala) or runtime VM overhead (like Erlang). + +--- + +### **Summary of Key Concepts in Actix:** +1. **Actors**: Independent units that handle state and process messages asynchronously. +2. **Messages**: Typed communication between actors. +3. **Context**: Manages the actor's execution and message handling environment. +4. **Actor System**: Manages actors and their communication, scheduling, and lifecycle. +5. **Concurrency**: Non-blocking, asynchronous execution via Rust's async runtime. +6. **Lightweight Tasks**: Efficient use of system resources, enabling the creation of many actors with minimal overhead. +7. **Supervision**: Fault tolerance via supervisors that monitor and restart actors. +8. **Performance**: Actix is designed for high throughput and low latency, making it ideal for real-time and concurrent systems. \ No newline at end of file diff --git a/src/workers/notif/mod.rs b/src/workers/notif/mod.rs index b603d9c..5df2cb7 100644 --- a/src/workers/notif/mod.rs +++ b/src/workers/notif/mod.rs @@ -40,13 +40,27 @@ → actors have their own os or ligh thread of execution which uses to spawn tasks they've received via message passing channels or mailbox → to share a data between threads it must be Send Sync and live valid → initialize storage and actors data structures once and pack them in AppContext struct then share this between threads + → what are objects: are an isolated thread objects contains light thread for executing tasks, cron scheudling and jobq mailbox + → talk between two objects using job/task/msg queue with mpsc and rpc based channels like rmq, redis, kafka + → receive tasks from the channel by streaming over eventloop with while let Some() = rx.recv().await{} + → what eventloop does: executing received tasks inside a light thread of execution + → stream is an eventloop receiver channel of some jobq that can be iterated over to get data as they're coming from the channel ======================================================================================== - a sexy actor to produce/consume messages from different type of brokers - it uses RMQ, Redis and Kafka to produce and consume massive messages in - realtime, kindly it supports data AES256 encryption through producing - messages to the broker. 
we can send either producing or consuming message - to this actor to start producing or consuming in the background + NotifBrokerActor is the worker of handling the process of publishing and consuming + messages through rmq, redis and kafka, talking to the NotifBrokerActor can be done + by sending it a message contains the setup either to publish or consume something + to and from an specific broker, so generally it's a sexy actor to produce/consume + messages from different type of brokers it uses RMQ, Redis and Kafka to produce and + consume massive messages in realtime, kindly it supports data AES256 encryption + through producing messages to the broker. we can send either producing or consuming + message to this actor to start producing or consuming in the background. + + ************************************************************************************ + it's notable that for realtime push notif streaming we MUST start consuming from + the specified broker passed in to the message structure when talking with actor, in + a place where the application logic which is likely a server is being started. + ************************************************************************************ brokering is all about queueing, sending and receiving messages way more faster, safer and reliable than a simple eventloop or a tcp based channel. @@ -57,18 +71,18 @@ receive the old outputs and pass them through the brokers to receive them in some http service for responding the caller. In rmq producer sends message to exchange the a consumer can bind its queue to - the exchange to receive the messages, routing key determines the pattern of receive + the exchange to receive the messages, routing key determines the pattern of receiving messages inside the bounded queue from the exchange In kafka producer sends messages to topic the consumer can receives data from the topic, Rmq adds an extra layer on top of msg handling logic which is creating queue per each consumer. - Offset in kafka is an strategy which determines the way of tracking the sequential - order of receiving messages by kafka topics it’s like routing key in rmq + offset in kafka is an strategy which determines the way of tracking the sequential + order of receiving messages by kafka topics it's like routing key in rmq - BROKER TYPES: - ---> KAFKA - ---> REDIS PUBSUB - ---> RMQ + BROKER TYPES: (preferred stack: RMQ + RPC + WebSocket + ShortPollingJobId) + → REDIS PUBSUB => light task queue + → KAFKA => high latency hight throughput + → RMQ => low latency low throughput -ˋˏ✄┈┈┈┈ >_ the consuming task has been started by sending the ConsumeNotif message to this actor which will execute the streaming loop over the queue in @@ -84,12 +98,10 @@ notif producer -----payload-----> Exchange -ˋˏ✄┈┈┈┈ - >_ client can use a short polling technique to fetch notifications for an - specific owner from redis or db, this is the best solution to implement a + >_ client uses a short polling technique or websocket streaming to fetch notifications + for an specific owner from redis or db, this is the best solution to implement a realtiming strategy on the client side to fetch what's happening on the - server side in realtime. there is another expensive alternaitive to this - which is startiung a websocket server in backend side and send all notifications - to the client through the ws channels in realtime. + server side in realtime. 
_________ _________ | server1 | <------- RMQ notif broker -------> | server2 | @@ -98,6 +110,8 @@ ------------------- client ------------------ ======================================================================================== */ +use constants::STORAGE_IO_ERROR_CODE; +use redis_async::resp::FromResp; use tokio::spawn; use crate::*; use deadpool_lapin::lapin::protocol::channel; @@ -145,6 +159,7 @@ pub struct PublishNotifToKafka{ pub struct ConsumeNotifFromKafka{ // we must build a unique consumer per each consuming process pub topic: String, pub consumerId: String, + pub decryptionConfig: Option } #[derive(Message, Clone, Serialize, Deserialize, Debug, Default, ToSchema)] @@ -160,6 +175,8 @@ pub struct PublishNotifToRedis{ #[rtype(result = "()")] pub struct ConsumeNotifFromRedis{ pub channel: String, + pub redis_cache_exp: u64, + pub decryption_config: Option } #[derive(Message, Clone, Serialize, Deserialize, Debug, Default, ToSchema)] @@ -222,9 +239,9 @@ pub struct ConsumeNotif{ // we'll create a channel then start consuming by bindi | queue1 | <----- |consumer1| ------> routing_key1 <--------------------------- --------- | - producer1 ---------- -----------------> routing_key0 - |____ messages > routing_key1 ------> | exchange| - ____ messages > routing_key4 ------> -----------------> routing_key2 + producer1 ---------- -----------------> routing_key0 ......... + |____ messages > routing_key1 ------> | exchange1| + ____ messages > routing_key4 ------> -----------------> routing_key2 ......... | | -------- ----------- producer2 ----------- | | queue2 | <----| consumer2 | ------> routing_key4 <------------------ ----------- @@ -244,7 +261,7 @@ pub struct NotifBrokerActor{ pub notif_broker_sender: tokio::sync::mpsc::Sender, // use to send notif data to mpsc channel for ws pub app_storage: std::option::Option>, // REQUIRED: communicating with third party storage pub notif_mutator_actor: Addr, // REQUIRED: communicating with mutator actor to write into redis and db - pub zerlog_producer_actor: Addr // REQUIRED: send any error log to the zerlog queue + pub zerlog_producer_actor: Addr // REQUIRED: send any error log to the zerlog rmq queue } impl Actor for NotifBrokerActor{ @@ -281,22 +298,390 @@ impl NotifBrokerActor{ pub async fn publishToRedis(&self, channel: &str, notif_data: NotifData, encryptionConfig: Option){ let storage = self.app_storage.clone(); - let redis_pool = storage.as_ref().unwrap().get_redis_pool().await.unwrap(); + let rmq_pool = storage.as_ref().unwrap().get_lapin_pool().await.unwrap(); + let redis_pool = storage.unwrap().get_redis_pool().await.unwrap(); + let notif_mutator_actor = self.notif_mutator_actor.clone(); let zerlog_producer_actor = self.clone().zerlog_producer_actor; + // will be used to send received notif data from the broker to ws mpsc channel, + // later we can receive the notif in ws server setup and send it to the owner + let notif_data_sender = self.notif_broker_sender.clone(); + + match redis_pool.get().await{ + Ok(mut redis_conn) => { + + let mut dataString = serde_json::to_string(¬if_data).unwrap(); + let finalData = if encryptionConfig.is_some(){ + + let CryptoConfig{ secret, passphrase, unique_redis_id } = encryptionConfig.unwrap(); + let mut secure_cell_config = &mut wallexerr::misc::SecureCellConfig{ + secret_key: hex::encode(secret), + passphrase: hex::encode(passphrase), + data: vec![], + }; + + // after calling encrypt method dataString has changed and contains the hex encrypted data + dataString.encrypt(secure_cell_config); + + // make 
sure that we have redis unique id and encrypted data in secure cell + // then cache the condif on redis with expirable key + if + !unique_redis_id.is_empty() && + !secure_cell_config.secret_key.is_empty() && + !secure_cell_config.passphrase.is_empty(){ + + log::info!("[*] caching secure cell config on redis"); + + // cache the secure cell config on redis for 5 mins + // this is faster than storing it on disk or file + let str_secure_cell = serde_json::to_string_pretty(&secure_cell_config).unwrap(); + let redis_key = format!("Redis_notif_encryption_config_for_{}", unique_redis_id); + let _: () = redis_conn.set_ex(redis_key, str_secure_cell, 300).await.unwrap(); + } + + dataString + } else{ + dataString + }; - // publish easily to redis - // ... - + let _: () = redis_conn.publish(channel, finalData).await.unwrap(); + + }, + Err(e) => { + use crate::error::{ErrorKind, HoopoeErrorResponse}; + let error_content = &e.to_string(); + let error_content_ = error_content.as_bytes().to_vec(); + let mut error_instance = HoopoeErrorResponse::new( + *constants::STORAGE_IO_ERROR_CODE, // error code + error_content_, // error content + ErrorKind::Storage(crate::error::StorageError::RedisPool(e)), // error kind + "NotifBrokerActor.publishToRedis.redis_pool", // method + Some(&zerlog_producer_actor) + ).await; + } + } } /* ******************************************************************************** */ /* ***************************** CONSUMING FROM REDIS ***************************** */ /* ******************************************************************************** */ - pub async fn consumeFromRedis(&self, channel: &str){ + pub async fn consumeFromRedis(&self, channel: &str, decryption_config: Option, redis_cache_exp: u64){ + + let storage = self.app_storage.clone(); + let rmq_pool = storage.as_ref().unwrap().get_lapin_pool().await.unwrap(); + let redis_pubsubs_conn = storage.as_ref().unwrap().get_async_redis_pubsub_conn().await.unwrap(); + let redis_pool = storage.unwrap().get_redis_pool().await.unwrap(); + let notif_mutator_actor = self.notif_mutator_actor.clone(); + let zerlog_producer_actor = self.clone().zerlog_producer_actor; + // will be used to send received notif data from the broker to ws mpsc channel, + // later we can receive the notif in ws server setup and send it to the owner + let notif_data_sender = self.notif_broker_sender.clone(); + + // first thing first check the redis is up! + match redis_pool.get().await{ + Ok(mut redis_conn) => { + + // try to find the secure cell config on the redis and + // do passphrase and secret key validation logic before + // consuming messages + let mut secure_cell_config = if let Some(mut config) = decryption_config.clone(){ + + // get the secure cell config from redis cache + let redis_key = format!("Redis_notif_encryption_config_for_{}", config.unique_redis_id); + let is_key_there: bool = redis_conn.exists(&redis_key).await.unwrap(); + + let secure_cell_config = if is_key_there{ + let get_secure_cell: String = redis_conn.get(redis_key).await.unwrap(); + serde_json::from_str::(&get_secure_cell).unwrap() + } else{ + SecureCellConfig::default() + }; + + config.secret = hex::encode(config.secret); + config.passphrase = hex::encode(config.passphrase); + + // make sure that both passphrase and secret key are the same + // inside the stored secure cell config on redis + if config.passphrase != secure_cell_config.passphrase || + config.secret != secure_cell_config.secret_key{ + log::error!("[!] 
wrong passphrase or secret key"); + return; // terminate the caller and cancel consuming, api must gets called again + } + + // return the loaded instance from redis + secure_cell_config + + } else{ + SecureCellConfig::default() + }; + + // use redis async for handling realtime streaming of events + let get_streamer = redis_pubsubs_conn + .subscribe(channel) + .await; + + let Ok(mut pubsubstream) = get_streamer else{ + let e = get_streamer.unwrap_err(); + let source = &e.source().unwrap().to_string(); // we know every goddamn type implements Error trait, we've used it here which allows use to call the source method on the object + let err_instance = crate::error::HoopoeErrorResponse::new( + *STORAGE_IO_ERROR_CODE, // error hex (u16) code + source.as_bytes().to_vec(), // text of error source in form of utf8 bytes + crate::error::ErrorKind::Storage(crate::error::StorageError::RedisAsync(e)), // the actual source of the error caused at runtime + &String::from("NotifBrokerActor.consumeFromRedis.get_streamer"), // current method name + Some(&zerlog_producer_actor) + ).await; + return; // terminate the caller + }; + + let cloned_notif_data_sender_channel = notif_data_sender.clone(); + tokio::spawn(async move{ + // realtime streaming over redis channel to receive emitted notifs + while let Some(message) = pubsubstream.next().await{ + let resp_val = message.unwrap(); + let mut channelData = String::from_resp(resp_val).unwrap(); // this is the expired key + + // ===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>> + // ===>>>===>>>===>>>===>>>===>>> data decryption logic ===>>>===>>>===>>>===>>>===>>> + // ===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>> + // if we have a config means the data has been encrypted + let finalData = if + !secure_cell_config.clone().secret_key.is_empty() && + !secure_cell_config.clone().passphrase.is_empty(){ + + // after calling decrypt method has changed and now contains raw string + channelData.decrypt(&mut secure_cell_config); + + // channelData is now raw string which can be decoded into the NotifData structure + channelData + + } else{ + // no decryption config is needed, just return the raw data + // there would be no isse with decoding this into NotifData + log::error!("secure_cell_config is empty, data is not encrypted"); + channelData + }; + // ===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>> + // ===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>> + // ===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>> + + // either decrypted or the raw data as string + log::info!("[*] received data: {}", finalData); + + // decoding the string data into the NotifData structure (convention) + let get_notif_event = serde_json::from_str::(&finalData); + match get_notif_event{ + Ok(notif_event) => { + + log::info!("[*] deserialized data: {:?}", notif_event); + + // ================================================================= + /* -------------------------- send to mpsc channel for ws streaming + // ================================================================= + this is the most important part in here, we're slightly sending the data + to downside of a jobq mpsc channel, the receiver eventloop however will + receive data in websocket handler which enables us to send realtime data + received from RMQ to the browser through websocket server: RMQ over websocket + once we receive the data from the mpsc channel 
in websocket handler we'll + send it to the browser through websocket channel. + */ + if let Err(e) = cloned_notif_data_sender_channel.send(finalData).await{ + log::error!("can't send notif data to websocket channel due to: {}", e.to_string()); + } + + // ============================================================================= + // ------------- if the cache on redis flag was activated we then store on redis + // ============================================================================= + if redis_cache_exp != 0{ + match redis_pool.get().await{ + Ok(mut redis_conn) => { + + // key: String::from(notif_receiver.id) | value: Vec + let redis_notif_key = format!("notif_owner:{}", ¬if_event.receiver_info); + + // -ˋˏ✄┈┈┈┈ extend notifs + let get_events: RedisResult = redis_conn.get(&redis_notif_key).await; + let events = match get_events{ + Ok(events_string) => { + let get_messages = serde_json::from_str::>(&events_string); + match get_messages{ + Ok(mut messages) => { + messages.push(notif_event.clone()); + messages + }, + Err(e) => { + use crate::error::{ErrorKind, HoopoeErrorResponse}; + let error_content = &e.to_string(); + let error_content_ = error_content.as_bytes().to_vec(); + let mut error_instance = HoopoeErrorResponse::new( + *constants::CODEC_ERROR_CODE, // error code + error_content_, // error content + ErrorKind::Codec(crate::error::CodecError::Serde(e)), // error kind + "NotifBrokerActor.decode_serde_redis", // method + Some(&zerlog_producer_actor) + ).await; + return; // terminate the caller + } + } + + }, + Err(e) => { + // we can't get the key means this is the first time we're creating the key + // or the key is expired already, we'll create a new key either way and put + // the init message in there. + let init_message = vec![ + notif_event.clone() + ]; + + init_message + + } + }; + + // -ˋˏ✄┈┈┈┈ caching the notif event in redis with expirable key + // chaching in redis is an async task which will be executed + // in the background with an expirable key + tokio::spawn(async move{ + let events_string = serde_json::to_string(&events).unwrap(); + let is_key_there: bool = redis_conn.exists(&redis_notif_key.clone()).await.unwrap(); + if is_key_there{ // update only the value + let _: () = redis_conn.set(&redis_notif_key.clone(), &events_string).await.unwrap(); + } else{ // initializing a new expirable key containing the new notif data + /* + make sure you won't get the following error: + called `Result::unwrap()` on an `Err` value: MISCONF: Redis is configured to + save RDB snapshots, but it's currently unable to persist to disk. Commands that + may modify the data set are disabled, because this instance is configured to + report errors during writes if RDB snapshotting fails (stop-writes-on-bgsave-error option). + Please check the Redis logs for details about the RDB error. 
+ SOLUTION: restart redis :) + */ + let _: () = redis_conn.set_ex(&redis_notif_key.clone(), &events_string, redis_cache_exp).await.unwrap(); + } + }); + + }, + Err(e) => { + use crate::error::{ErrorKind, HoopoeErrorResponse}; + let error_content = &e.to_string(); + let error_content_ = error_content.as_bytes().to_vec(); + let mut error_instance = HoopoeErrorResponse::new( + *constants::STORAGE_IO_ERROR_CODE, // error code + error_content_, // error content + ErrorKind::Storage(crate::error::StorageError::RedisPool(e)), // error kind + "NotifBrokerActor.consumeFromRedis.redis_pool", // method + Some(&zerlog_producer_actor) + ).await; + return; // terminate the caller + } + } + } + + // ============================================================================= + // ------------- store in database + // ============================================================================= + // -ˋˏ✄┈┈┈┈ store notif in db by sending message to the notif mutator actor worker + // sending StoreNotifEvent message to the notif event mutator actor + // spawning the async task of storing data in db in the background + // worker of lightweight thread of execution using tokio threadpool + tokio::spawn( + { + let cloned_message = notif_event.clone(); + let cloned_mutator_actor = notif_mutator_actor.clone(); + let zerlog_producer_actor = zerlog_producer_actor.clone(); + async move{ + match cloned_mutator_actor + .send(StoreNotifEvent{ + message: cloned_message, + local_spawn: true + }) + .await + { + Ok(_) => { () }, + Err(e) => { + let source = &e.to_string(); // we know every goddamn type implements Error trait, we've used it here which allows use to call the source method on the object + let err_instance = crate::error::HoopoeErrorResponse::new( + *MAILBOX_CHANNEL_ERROR_CODE, // error hex (u16) code + source.as_bytes().to_vec(), // text of error source in form of utf8 bytes + crate::error::ErrorKind::Actor(crate::error::ActixMailBoxError::Mailbox(e)), // the actual source of the error caused at runtime + &String::from("NotifBrokerActor.consumeFromRedis.notif_mutator_actor.send"), // current method name + Some(&zerlog_producer_actor) + ).await; + return; + } + } + } + } + ); + + }, + Err(e) => { + log::error!("[!] 
can't deserialized into NotifData struct"); + use crate::error::{ErrorKind, HoopoeErrorResponse}; + let error_content = &e.to_string(); + let error_content_ = error_content.as_bytes().to_vec(); + let mut error_instance = HoopoeErrorResponse::new( + *constants::CODEC_ERROR_CODE, // error code + error_content_, // error content + ErrorKind::Codec(crate::error::CodecError::Serde(e)), // error kind + "NotifBrokerActor.consumeFromRedis.decode_serde", // method + Some(&zerlog_producer_actor) + ).await; + return; // terminate the caller + } + } + + } + }); + }, + Err(e) => { + use crate::error::{ErrorKind, HoopoeErrorResponse}; + let error_content = &e.to_string(); + let error_content_ = error_content.as_bytes().to_vec(); + let mut error_instance = HoopoeErrorResponse::new( + *constants::STORAGE_IO_ERROR_CODE, // error code + error_content_, // error content + ErrorKind::Storage(crate::error::StorageError::RedisPool(e)), // error kind + "NotifBrokerActor.consumeFromRedis.redis_pool", // method + Some(&zerlog_producer_actor) + ).await; + } + } + + } + + /* ******************************************************************************** */ + /* ***************************** PUBLISHING TO KAFKA ****************************** */ + /* ******************************************************************************** */ + pub async fn publishToKafka(&self, channel: &str, notif_data: NotifData, encryptionConfig: Option){ + + let storage = self.app_storage.clone(); + let rmq_pool = storage.as_ref().unwrap().get_lapin_pool().await.unwrap(); + let redis_pool = storage.unwrap().get_redis_pool().await.unwrap(); + let notif_mutator_actor = self.notif_mutator_actor.clone(); + let zerlog_producer_actor = self.clone().zerlog_producer_actor; + // will be used to send received notif data from the broker to ws mpsc channel, + // later we can receive the notif in ws server setup and send it to the owner + let notif_data_sender = self.notif_broker_sender.clone(); + + // ... + + } + + /* ******************************************************************************** */ + /* ***************************** CONSUMING FROM KAFKA ***************************** */ + /* ******************************************************************************** */ + pub async fn consumeFromKafka(&self, channel: &str, consumerId: &str, decryptionConfig: Option){ + + let storage = self.app_storage.clone(); + let rmq_pool = storage.as_ref().unwrap().get_lapin_pool().await.unwrap(); + let redis_pool = storage.unwrap().get_redis_pool().await.unwrap(); + let notif_mutator_actor = self.notif_mutator_actor.clone(); + let zerlog_producer_actor = self.clone().zerlog_producer_actor; + // will be used to send received notif data from the broker to ws mpsc channel, + // later we can receive the notif in ws server setup and send it to the owner + let notif_data_sender = self.notif_broker_sender.clone(); - // start consuming from redis // ... 
} @@ -307,7 +692,7 @@ impl NotifBrokerActor{ // if a consumer service wants to read notifs received from the rmq it // needs to either fetch from redis or db the method doesn't return // anything back to the caller - pub async fn consume(&self, exp_seconds: u64, + pub async fn consumeFromRmq(&self, exp_seconds: u64, consumer_tag: &str, queue: &str, binding_key: &str, exchange: &str, store_in_db: bool, @@ -408,7 +793,7 @@ impl NotifBrokerActor{ Ok(mut redis_conn) => { // get the secure cell config from redis cache - let redis_key = format!("notif_encryption_config_for_{}", config.unique_redis_id); + let redis_key = format!("Rmq_notif_encryption_config_for_{}", config.unique_redis_id); let is_key_there: bool = redis_conn.exists(&redis_key).await.unwrap(); let secure_cell_config = if is_key_there{ @@ -509,10 +894,11 @@ impl NotifBrokerActor{ ====-----====-----====-----====-----====----- */ let encrypted_data_buffer = secure_cell_config.data.clone(); // data is the encrypted buffer - let stringified_encrypted_data_buffer = serde_json::to_string_pretty(&encrypted_data_buffer).unwrap(); + let mut stringified_encrypted_data_buffer = serde_json::to_string_pretty(&encrypted_data_buffer).unwrap(); stringified_encrypted_data_buffer.decrypt(&mut secure_cell_config); - let raw_data = secure_cell_config.data.clone(); - let raw_data_str = std::str::from_utf8(&raw_data).unwrap().to_string(); + // stringified_encrypted_data_buffer now contains the raw string of data + // we can use it to decode it into structure + // ... // pass the secure_cell_config instance to decrypt the data, note // that the instance must contains the encrypted data in form of utf8 bytes @@ -552,6 +938,7 @@ impl NotifBrokerActor{ // either decrypted or the raw data as string log::info!("[*] received data: {}", data); + // decoding the string data into the NotifData structure (convention) let get_notif_event = serde_json::from_str::(&data); match get_notif_event{ Ok(notif_event) => { @@ -562,11 +949,11 @@ impl NotifBrokerActor{ /* -------------------------- send to mpsc channel for ws streaming // ================================================================= this is the most important part in here, we're slightly sending the data - to downside of a jobq mpsc channel, the receiver however will receive data in - websocket handler which enables us to send realtime data received from RMQ - to the client through websocket server: RMQ over websocket + to downside of a jobq mpsc channel, the receiver eventloop however will + receive data in websocket handler which enables us to send realtime data + received from RMQ to the browser through websocket server: RMQ over websocket once we receive the data from the mpsc channel in websocket handler we'll - send it to the client through websocket channel. + send it to the browser through websocket channel. 
*/ if let Err(e) = cloned_notif_data_sender_channel.send(data).await{ log::error!("can't send notif data to websocket channel due to: {}", e.to_string()); @@ -802,7 +1189,7 @@ impl NotifBrokerActor{ /* ********************************************************************* */ /* ***************************** PRODUCING ***************************** */ /* ********************************************************************* */ - pub async fn produce(&self, data: &str, exchange: &str, routing_key: &str, exchange_type: &str, unique_redis_id: &str, secure_cell_config: &mut SecureCellConfig){ + pub async fn publishToRmq(&self, data: &str, exchange: &str, routing_key: &str, exchange_type: &str, unique_redis_id: &str, secure_cell_config: &mut SecureCellConfig){ let zerlog_producer_actor = self.clone().zerlog_producer_actor; let this = self.clone(); @@ -836,7 +1223,7 @@ impl NotifBrokerActor{ // cache the secure cell config on redis for 5 mins // this is faster than storing it on disk or file let str_secure_cell = serde_json::to_string_pretty(&cloned_secure_cell_config).unwrap(); - let redis_key = format!("notif_encryption_config_for_{}", unique_redis_id); + let redis_key = format!("Rmq_notif_encryption_config_for_{}", unique_redis_id); let _: () = redis_conn.set_ex(redis_key, str_secure_cell, 300).await.unwrap(); }, Err(e) => { @@ -1044,10 +1431,10 @@ impl ActixMessageHandler for NotifBrokerActor{ easier way for encryption using Crypter trait ====-----====-----====-----====-----====----- */ - let notif_data_str = serde_json::to_string_pretty(¬if_data).unwrap(); + let mut notif_data_str = serde_json::to_string_pretty(¬if_data).unwrap(); notif_data_str.encrypt(secure_cell_config); - let encrypted_data = secure_cell_config.data.clone(); // it now contains the encrypted data - let encrypted_data_hex_string = hex::encode(&encrypted_data); + // notif_data_str now is the hex encrypted of the raw data + // ... let str_data = match Wallet::secure_cell_encrypt(secure_cell_config){ @@ -1064,7 +1451,7 @@ impl ActixMessageHandler for NotifBrokerActor{ }, Err(e) => { let zerlog_producer_actor = self.zerlog_producer_actor.clone(); - // log the error in the a lightweight thread of execution inside tokio threads + // log the error in the a background lightweight thread of execution inside tokio threads // since we don't need output or any result from the task inside the thread thus // there is no channel to send data to outside of tokio::spawn, the writing however // consists of file and rmq (network) operations which are none blocking async io @@ -1111,13 +1498,13 @@ impl ActixMessageHandler for NotifBrokerActor{ // every actor has its own thread of execution. 
if local_spawn{ async move{ - this.produce(&stringified_data, &exchange_name, &routing_key, &exchange_type, &unique_redis_id, &mut secure_cell_config).await; + this.publishToRmq(&stringified_data, &exchange_name, &routing_key, &exchange_type, &unique_redis_id, &mut secure_cell_config).await; } .into_actor(self) // convert the future into an actor future of type NotifBrokerActor .spawn(ctx); // spawn the future object into this actor context thread } else{ // spawn the future in the background into the tokio lightweight thread tokio::spawn(async move{ - this.produce(&stringified_data, &exchange_name, &routing_key, &exchange_type, &unique_redis_id, &mut secure_cell_config).await; + this.publishToRmq(&stringified_data, &exchange_name, &routing_key, &exchange_type, &unique_redis_id, &mut secure_cell_config).await; }); } @@ -1155,7 +1542,7 @@ impl ActixMessageHandler for NotifBrokerActor{ // every actor has its own thread of execution. if local_spawn{ async move{ - this.consume( + this.consumeFromRmq( redis_cache_exp, &tag, &queue, @@ -1169,7 +1556,7 @@ impl ActixMessageHandler for NotifBrokerActor{ .spawn(ctx); // spawn the future object into this actor context thread } else{ // spawn the future in the background into the tokio lightweight thread tokio::spawn(async move{ - this.consume( + this.consumeFromRmq( redis_cache_exp, &tag, &queue, @@ -1224,7 +1611,6 @@ impl ActixMessageHandler for NotifBrokerActor{ }; - // spawn the task in the background ligh thread or // the actor thread itself. // don't await on the spawn; let the task execute @@ -1253,15 +1639,17 @@ impl ActixMessageHandler for NotifBrokerActor{ fn handle(&mut self, msg: ConsumeNotifFromRedis, ctx: &mut Self::Context) -> Self::Result { - let ConsumeNotifFromRedis { channel } = msg.clone(); + let ConsumeNotifFromRedis { channel, decryption_config, redis_cache_exp } = msg.clone(); let this = self.clone(); let task = async move{ - // must store in db (mutator actor) - // cache on redis - // zerlog producer - // send to ws mpsc notif sender + // await on the consuming task, tells runtime we need the result + // of this task, if it's ready runtime return the result back to + // the caller otherwise suspend the this.publishToRedis() function + // until the task is ready to be polled, meanwhile it executes other + // tasks (won't block the thread) + this.consumeFromRedis(&channel, decryption_config, redis_cache_exp).await; }; @@ -1284,6 +1672,34 @@ impl ActixMessageHandler for NotifBrokerActor{ fn handle(&mut self, msg: PublishNotifToKafka, ctx: &mut Self::Context) -> Self::Result { + let PublishNotifToKafka{ topic, local_spawn, notif_data, encryptionConfig } = msg.clone(); + + let this = self.clone(); + let task = async move{ + + // await on the publishing task, tells runtime we need the result + // of this task, if it's ready runtime return the result back to + // the caller otherwise suspend the this.publishToRedis() function + // until the task is ready to be polled, meanwhile it executes other + // tasks (won't block the thread) + this.publishToKafka(&topic, notif_data, encryptionConfig).await; + + }; + + // spawn the task in the background ligh thread or + // the actor thread itself. 
+        // don't await on the spawn; let the task execute
+        // in the background unless you want to use select
+        // or tell the runtime someone needs the result right
+        // now but notify later once the task is completed
+        // and don't block the thread
+        if local_spawn{
+            task
+                .into_actor(self)
+                .spawn(ctx);
+        } else{
+            spawn(task);
+        }
     }
 }
@@ -1297,5 +1713,29 @@ impl ActixMessageHandler for NotifBrokerActor{
     fn handle(&mut self, msg: ConsumeNotifFromKafka, ctx: &mut Self::Context) -> Self::Result {
+        let ConsumeNotifFromKafka{ topic, consumerId, decryptionConfig } = msg.clone();
+
+        let this = self.clone();
+        let task = async move{
+
+            // await on the consuming task; this tells the runtime we need the result
+            // of this task, if it's ready the runtime returns the result back to
+            // the caller, otherwise it suspends the this.consumeFromKafka() call
+            // until the task is ready to be polled, meanwhile it executes other
+            // tasks (won't block the thread)
+            this.consumeFromKafka(&topic, &consumerId, decryptionConfig).await;
+
+        };
+
+        // spawn the task in the background light thread or
+        // the actor thread itself.
+        // don't await on the spawn; let the task execute
+        // in the background unless you want to use select
+        // or tell the runtime someone needs the result right
+        // now but notify later once the task is completed
+        // and don't block the thread
+        spawn(task);
+
     }
+
 }
diff --git a/src/workers/streaming.md b/src/workers/streaming.md
new file mode 100644
index 0000000..6494784
--- /dev/null
+++ b/src/workers/streaming.md
@@ -0,0 +1,223 @@
+
+> Comprehensive Guide to Choose the Most Appropriate Broker
+
+We'll walk through the comparison between RMQ and Kafka step by step. A few notable points first: if we can tolerate higher end-to-end latency but need high throughput, distributed streaming, and a reliable retention mechanism that keeps messages around even after consumers have read them, it's good to go with **Kafka**; if instead we need complex message routing patterns, low latency at moderate throughput, and fast deletion of messages from the queue once they are consumed, **RMQ** is the better option.
+
+# Design 1 (Celery + RMQ + ShortPolling)
+
+In this scenario we have a system that receives prediction requests over HTTP and forwards them to an AI service that uses **Celery** for task queuing; the service then returns a **job ID** that the client polls against. This architecture could benefit from some optimizations. Whether it's worth replacing the current system with **RabbitMQ (RMQ)** or **Kafka** depends on a few factors.
+
+### **Current System Summary**:
+
+1. **HTTP Requests**: The client sends a request to the AI service for predictions.
+2. **Celery**: The AI service queues tasks using Celery, which is typically backed by a message broker (RabbitMQ in this case).
+3. **Job ID + Polling**: The client is short-polling the system using the job ID to check if the prediction task is complete.
+
+### **Potential Problems with the Current Setup**:
+
+1. **Short Polling Overhead**: The client is repeatedly hitting the server to check if the result is ready (see the sketch after this list). This can add unnecessary load to the system and increase latency for the client if the response isn't ready immediately.
+2. **Task Queuing**: Celery typically uses RabbitMQ as its broker to handle task queues. However, if we are looking for higher throughput, more durability, or real-time stream processing, this might not be the most optimal for large-scale use cases.
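To make the short-polling overhead above concrete, here is a minimal sketch of what the Design 1 client loop typically looks like. It is illustrative only: the `/predict` and `/result/{job_id}` endpoints, the payload shape, and the use of the `reqwest`, `tokio` and `serde` crates are assumptions, not the actual API of this service.

```rust
// a minimal sketch of the client-side short-polling loop in Design 1,
// assuming hypothetical /predict and /result/{job_id} HTTP endpoints
use std::time::Duration;
use serde::Deserialize;

#[derive(Deserialize)]
struct JobAck { job_id: String }

#[derive(Deserialize)]
struct JobResult { status: String, prediction: Option<serde_json::Value> }

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let http = reqwest::Client::new();

    // 1) submit the prediction request and get a job id back
    let ack: JobAck = http.post("http://ai-service/predict")
        .json(&serde_json::json!({ "input": [1.0, 2.0, 3.0] }))
        .send().await?
        .json().await?;

    // 2) short polling: every check is a full HTTP round trip, even when the
    //    result isn't ready yet; this is exactly the overhead described above
    loop {
        let res: JobResult = http.get(format!("http://ai-service/result/{}", ack.job_id))
            .send().await?
            .json().await?;
        if res.status == "done" {
            println!("prediction: {:?}", res.prediction);
            break;
        }
        tokio::time::sleep(Duration::from_millis(500)).await;
    }
    Ok(())
}
```

Every iteration of that loop costs a full request/response round trip whether or not the result exists yet, which is the cost the broker-based designs below remove.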
+ +# Design 2 (Kafka Only): + +We **can replace the HTTP-based communication** (both the initial prediction request and the short polling) with **Kafka** to create a more streamlined, event-driven architecture. This will remove the need for HTTP, Celery, and RabbitMQ for queuing and task distribution. Here’s how this setup could work using Kafka: + +### **1. Client Sends Request via Kafka (Instead of HTTP)** + +- **Current State**: The client sends an HTTP request to the AI service and receives a `jobId` in return. +- **With Kafka**: Instead of sending an HTTP request, the client **produces a message** to a Kafka topic (e.g., `prediction_requests`). This message would contain all the necessary data for the prediction request. + +### **2. AI Service Subscribes to Kafka Topic** + +- The **AI service** (which replaces Celery and RabbitMQ in this model) would **consume messages** from the `prediction_requests` topic. It listens to the topic, retrieves the incoming prediction requests, and processes them. +- Once the AI service finishes processing the prediction (i.e., running the model inference), it **produces the result** to another Kafka topic, say `prediction_results`, which includes the `jobId` and the final prediction result. + +### **3. Client Polling is Replaced with Kafka Consumer** + +- **Current State**: The client short polls the AI service over HTTP to check if the prediction is ready. +- **With Kafka**: The client can now **subscribe** to the `prediction_results` topic as a Kafka consumer. Once the AI service finishes processing the prediction and publishes the result to `prediction_results`, the client receives it **in real-time**. + - The client no longer needs to send repeated HTTP requests for polling. Instead, it waits for Kafka to push the result to it automatically. + +### **Key Benefits of Using Kafka in This Setup:** + +1. **Elimination of HTTP Short Polling**: Kafka's consumer model removes the need for short polling. The client simply subscribes to a Kafka topic and gets notified as soon as the result is ready. +2. **Decoupling**: Kafka helps to decouple the client service and the AI service. Neither needs to know about the internal workings of the other—they just read from and write to Kafka topics. +3. **Scalability**: Kafka is designed to handle high-throughput data streams. It can manage a massive volume of prediction requests and results, making it ideal for scaling up if needed. +4. **Real-Time Processing**: Kafka’s model allows for real-time message consumption. The AI service processes the message and the client is notified immediately once the result is published. +5. **Durability and Message Retention**: Kafka retains messages for a configurable period, so even if the client service temporarily goes down, it can still retrieve the result later without losing the message. This is particularly useful if there’s any downtime in the client service. +6. **Simplified System Architecture**: we no longer need HTTP for both requests and polling. we also remove Celery and RabbitMQ as task queueing systems, simplifying our architecture. + +### **Proposed Kafka-Based Architecture**: + +1. **Kafka Topic: `prediction_requests`**: + - **Producer**: The client sends a prediction request (e.g., model inputs) to this topic. + - **Consumer**: The AI service consumes these requests from the topic, processes them, and produces a prediction result. +2. 
**Kafka Topic: `prediction_results`**: + - **Producer**: Once the AI service completes the prediction, it sends the result (including the `jobId`) to this topic. + - **Consumer**: The client subscribes to this topic and listens for the result, matching the `jobId` with its request. + +### **Detailed Workflow:** + +1. **Client sends request**: + - The client **produces a message** (with the model input and a `jobId`) to the Kafka topic `prediction_requests`. + - Kafka brokers store this message in the `prediction_requests` topic. +2. **AI Service processes request**: + - The AI service **consumes** the messages from the `prediction_requests` topic. + - It processes the request (performs the inference) and generates the prediction. + - After generating the prediction, it **produces a new message** with the `jobId` and the prediction result to the `prediction_results` topic. +3. **Client receives result**: + - The client is a **consumer** of the `prediction_results` topic. + - It listens to the topic and retrieves the prediction result once the AI service has published it. + +### **How to Handle Different Scenarios**: + +- **Multiple Consumers**: Kafka handles multiple consumers efficiently. If we have several client services (or AI services), Kafka’s partitioning model ensures messages are distributed properly. +- **Failure Scenarios**: If the client or AI service fails, Kafka's message retention ensures that messages are not lost. The AI service can pick up pending prediction requests from the `prediction_requests` topic when it recovers, and clients can retrieve prediction results that were published while they were down. +- **Message Ordering**: Kafka partitions can help maintain the order of messages within a partition. If message order is important (e.g., if prediction requests must be processed in sequence), we can configure Kafka to ensure the right order of processing. + +### **Advantages and Trade-offs**: + +### **Advantages**: + +- **Asynchronous and Decoupled**: Kafka allows the client and AI service to be decoupled, making it easier to scale both independently. +- **Real-Time, Event-Driven**: The client gets results as soon as they’re ready without the need for inefficient polling. +- **Scalable and Reliable**: Kafka is designed to scale, and it provides durability with message retention. It’s a good choice if we expect growth or high throughput. + +### **Potential Trade-offs**: + +- **Increased Complexity**: Kafka can introduce complexity, especially in terms of managing brokers, topics, partitions, and consumers. If our current system is small or simple, Kafka might feel like over-engineering. +- **Operational Overhead**: Kafka requires robust monitoring and maintenance. Running Kafka clusters requires experience with distributed systems. +- **Latency**: If our AI service is highly real-time and latency-sensitive, Kafka introduces slightly more overhead compared to HTTP for very small tasks, but this is generally minimal. + +### **Conclusion**: + +This change will: + +- Eliminate short polling. +- Allow us to implement an **event-driven, real-time architecture**. +- Scale more efficiently, handling high-throughput requests while maintaining fault tolerance and durability. + +However, Kafka might add complexity and overhead, so it’s a good fit if our system needs high throughput, scalability, and real-time processing, or if we want to decouple services effectively. 
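To make the two-topic workflow above concrete, here is a minimal client-side sketch, assuming the `rdkafka` crate with its `tokio` feature. The broker address, group id, key/payload layout and error handling are illustrative assumptions; only the `prediction_requests`/`prediction_results` topic names come from the design above.

```rust
// a minimal sketch of the Kafka-only flow (Design 2) from the client's side;
// broker address, group id and payload layout are illustrative assumptions
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::Message;
use std::time::Duration;

async fn client_side(job_id: &str, input: &str) -> Result<(), Box<dyn std::error::Error>> {
    // producer for the prediction_requests topic
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .create()?;

    // consumer subscribed to the prediction_results topic
    let consumer: StreamConsumer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("group.id", "prediction-clients")
        .set("auto.offset.reset", "earliest")
        .create()?;
    consumer.subscribe(&["prediction_results"])?;

    // 1) produce the prediction request, keyed by job_id
    producer
        .send(
            FutureRecord::to("prediction_requests").key(job_id).payload(input),
            Duration::from_secs(5),
        )
        .await
        .map_err(|(e, _msg)| e)?;

    // 2) instead of short polling, just wait for the AI service to publish
    //    the result to prediction_results and match it by key
    loop {
        let msg = consumer.recv().await?;
        let key = msg.key().map(|k| String::from_utf8_lossy(k).to_string());
        if key.as_deref() == Some(job_id) {
            if let Some(Ok(result)) = msg.payload_view::<str>() {
                println!("prediction result: {result}");
            }
            break;
        }
    }
    Ok(())
}
```

The AI service side is symmetric: it consumes from `prediction_requests`, runs inference, and produces the result (keyed by the same `jobId`) to `prediction_results`.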
If we're operating at a smaller scale or prefer simplicity, the current setup might still suffice with some optimizations like WebSockets for polling. + +# Design 3 (RMQ Only): + +**RabbitMQ (RMQ)** can be used to handle the entire prediction request and response flow, and it can provide real-time capabilities as an alternative to Kafka. However, RabbitMQ is designed more as a message broker with different characteristics than Kafka, so the approach is a bit different but still capable of achieving the same functionality for real-time processing. + +Here’s how we can structure the architecture using only **RabbitMQ**, and how it compares to Kafka in handling real-time interactions: + +### **Proposed Architecture Using RabbitMQ**: + +1. **Client Sends Request via RabbitMQ**: + - Instead of sending HTTP requests, the client **publishes a message** (containing the prediction request and a `jobId`) to a RabbitMQ **queue** (or exchange). + - This message would represent the prediction job that needs to be processed. +2. **AI Service Subscribes to RabbitMQ Queue**: + - The AI service **consumes** the prediction requests from the RabbitMQ queue. It picks up messages from the queue as tasks to be processed (similar to how Celery works with RabbitMQ behind the scenes). + - After processing the request (running the AI model), the AI service **publishes the result** to a **response queue** or directly back to the client. +3. **Client Receives Prediction Result via RabbitMQ**: + - Instead of short polling over HTTP, the client **subscribes to a queue** or an **exchange** in RabbitMQ, waiting for the AI service to send the result. + - Once the prediction result is ready, the AI service publishes the result to this queue, and the client consumes it as soon as it's available (real-time push-based communication). + +### **Steps to Achieve Real-Time Messaging with RabbitMQ:** + +### **1. Initial Request Submission (Prediction Request)** + +- The client publishes a message to RabbitMQ, typically to a **direct** or **fanout exchange**. The exchange then routes the message to a queue where the AI service listens for tasks. +- This replaces the HTTP POST request. The client is effectively pushing a job into a queue managed by RabbitMQ. + +### **2. AI Service Processes the Request** + +- The AI service is a **consumer** of the queue containing the prediction requests. It consumes messages from this queue, processes the prediction task, and generates a result. +- After processing, the AI service needs to send the result back to the client. This can be done by either publishing to a **response queue** or using **RPC (Remote Procedure Call)** patterns in RabbitMQ. + +### **3. Client Waits for the Response (Real-Time)** + +- The client subscribes to a **response queue** or listens for the AI service to push the prediction result back via RabbitMQ. This is done in real-time, meaning that as soon as the AI service publishes the result, the client will receive the message immediately. +- This completely eliminates short polling, as the client gets the result as soon as it's available, making it real-time. + +### **Patterns We Can Use with RabbitMQ for Real-Time Communication:** + +### **Pattern 1: Pub/Sub Model** + +- **Prediction Request**: Client publishes to an exchange (e.g., **direct** or **fanout**) that routes the message to a queue where the AI service listens for prediction jobs. 
+
+- **Prediction Result**: The AI service, after processing, publishes the result to another exchange/queue that the client is subscribed to.
+
+This is a simple **publish-subscribe (pub/sub)** model where clients can receive messages in real-time.
+
+### **Pattern 2: RPC (Remote Procedure Call) with RabbitMQ**
+
+- RabbitMQ has a built-in pattern for **RPC**, which allows for request-response messaging. Here's how it works:
+    - The client sends a request to a queue and specifies a **reply-to** queue and a **correlation ID**.
+    - The AI service processes the message and sends the result back to the **reply-to** queue with the same correlation ID.
+    - The client consumes the response from the **reply-to** queue, ensuring it matches the request based on the correlation ID.
+
+    **How RPC eliminates polling**:
+
+    - The client sends the request via RabbitMQ and waits for a response by subscribing to the reply-to queue. Once the AI service finishes the prediction, the response is pushed back to the client.
+    - No polling is necessary: the client only receives the result once it's ready, providing real-time responses (a minimal sketch of this pattern is included after the evaluation section below).
+
+### **Pattern 3: Direct Message Routing**
+
+- If we have multiple AI services or clients, RabbitMQ supports **routing keys** to send specific messages to specific queues or consumers. This way, we can implement fine-grained control over which AI service handles which requests, and the corresponding client receives the results.
+
+### **Advantages of RabbitMQ for This Use Case**:
+
+1. **Real-Time Communication**:
+    - RabbitMQ supports **push-based notifications**, so the client doesn't need to short-poll the AI service. The result is delivered in real-time as soon as the AI service processes the prediction.
+2. **Simplicity**:
+    - RabbitMQ is relatively easy to set up and manage for smaller-scale applications compared to Kafka. It doesn't require the same level of operational complexity and is lighter-weight in some scenarios.
+3. **Task Queuing + Messaging**:
+    - RabbitMQ handles task queuing effectively (similar to how Celery works). Since it's already widely used with Celery, we are leveraging its core strengths as a message broker for task distribution and messaging.
+4. **Routing Flexibility**:
+    - RabbitMQ has powerful message routing mechanisms (using exchanges, queues, and routing keys), allowing us to define how messages are distributed between services. This can be particularly useful if we have multiple AI models or different types of tasks that need to be handled.
+5. **Lower Latency for Smaller Systems**:
+    - For systems where low-latency, real-time messaging is critical but the scale is not extremely high (compared to Kafka's high-throughput systems), RabbitMQ is a great fit. It has lower latency in smaller environments.
+6. **Durability and Reliability**:
+    - RabbitMQ supports **message persistence** and **acknowledgements**, so even if the AI service or client service crashes, the messages won't be lost. They can be retrieved once the services come back online.
+
+### **Limitations and Considerations**:
+
+- **Not Built for High Throughput**:
+    - RabbitMQ is great for lower-latency real-time communication but isn't as scalable as Kafka for extremely high-throughput systems. If we're processing millions of messages per second, RabbitMQ may not handle this as efficiently as Kafka.
+- **Message Retention**:
+    - RabbitMQ doesn't retain messages like Kafka. Once a message is consumed, it's gone unless we configure it to be persistent and manually manage requeuing.
+      If we need to retain messages for auditing, retraining, or replaying predictions, Kafka would be more suited.
+- **Complexity with Fanout/Routing for Large Systems**:
+    - While RabbitMQ is great for routing messages in smaller systems, it can become more complex as the system grows. Managing exchanges, queues, and routing keys for large-scale systems with multiple consumers might introduce overhead.
+- **No Built-in Message Replay**:
+    - RabbitMQ doesn't support Kafka-style message replay natively. If the client misses a message, we will have to handle the failure and requeue the message manually, whereas Kafka retains the message until we explicitly remove it.
+
+### **Conclusion**:
+
+RabbitMQ can handle real-time communication between our client and AI service as well, eliminating the need for HTTP and Celery. By utilizing **RabbitMQ's pub/sub model or RPC pattern**, we can build an event-driven architecture that provides real-time, push-based notifications for prediction results. This setup is suitable for systems that don't require Kafka's high-throughput capabilities but still need reliable, low-latency communication.
+
+So, if our system doesn't need Kafka's scale and durability features but we want to avoid the overhead of HTTP and Celery, RabbitMQ can be a very effective alternative for real-time messaging.
+
+# **Evaluation of RabbitMQ vs. Kafka:**
+
+| **Aspect** | **RabbitMQ** | **Kafka** |
+| --- | --- | --- |
+| **Message Delivery** | Immediate delivery for smaller tasks | Streaming-based, better for large-scale delivery |
+| **Push Updates** | Can easily integrate with WebSocket or SSE for real-time updates | Requires a more complex consumer setup but is possible with Kafka consumers |
+| **Throughput** | Lower, not ideal for massive scaling | High throughput, designed for stream processing |
+| **Fault Tolerance** | Requires additional configuration (mirrored queues) | Built-in fault tolerance and replication |
+| **Message Replay** | Not built-in, messages are deleted after consumption unless manually persisted | Built-in message retention, can replay messages for retraining/audit |
+| **Client Polling Overhead** | Can avoid short polling with WebSockets or real-time push-based communication | Can use Kafka consumers with event-based architecture to avoid polling entirely |
+| **Operational Complexity** | Easier to manage, simpler to set up | More complex, requires management of brokers, topics, partitions, etc. |
+
+### Why RMQ would be a better option than Kafka:
+
+1. Maintenance of legacy applications that depend on RabbitMQ.
+2. Staff training cost and the steep learning curve required for implementing Kafka.
+3. Infrastructure cost for Kafka is higher than that for RabbitMQ.
+4. Troubleshooting problems in a Kafka implementation is harder than in a RabbitMQ implementation.
+    - A RabbitMQ developer can easily maintain and support applications that use RabbitMQ.
+    - The same is not true with Kafka. Experience with just Kafka development is not sufficient to maintain and support applications that use Kafka. The support personnel require other skills such as ZooKeeper, networking, and disk storage too.
+
+## Conclusion:
+
+Personally, I would prefer to go with option 3 due to its great message routing mechanism and low latency, as well as its built-in **RPC** support for the request-reply flow (see the sketch below). However, if the system is facing high load and needs to handle massive prediction-streaming throughput, **Kafka** would be a better approach indeed.
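As a rough illustration of that recommendation, below is a client-side sketch of the RPC-with-correlation-id flow, assuming the `lapin` and `futures-lite` crates; the connection URI, queue names and payload handling are illustrative assumptions rather than this repo's actual broker setup.

```rust
// a minimal sketch of the RMQ RPC (reply-to + correlation id) pattern using
// the lapin crate; URI, queue names and payloads are illustrative assumptions
use futures_lite::stream::StreamExt;
use lapin::{
    options::{BasicAckOptions, BasicConsumeOptions, BasicPublishOptions, QueueDeclareOptions},
    types::FieldTable,
    BasicProperties, Connection, ConnectionProperties,
};

async fn rpc_predict(job_id: &str, input: &[u8]) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
    let conn = Connection::connect("amqp://guest:guest@localhost:5672/%2f", ConnectionProperties::default()).await?;
    let channel = conn.create_channel().await?;

    // exclusive, server-named reply queue; the AI service publishes results here
    let reply_queue = channel
        .queue_declare("", QueueDeclareOptions { exclusive: true, ..Default::default() }, FieldTable::default())
        .await?;

    // publish the prediction request, tagging it with reply_to + correlation_id
    channel
        .basic_publish(
            "",                     // default exchange
            "prediction_requests",  // request queue the AI service consumes from
            BasicPublishOptions::default(),
            input,
            BasicProperties::default()
                .with_reply_to(reply_queue.name().clone())
                .with_correlation_id(job_id.into()),
        )
        .await?;

    // wait for the response pushed back by the AI service (no polling at all)
    let mut consumer = channel
        .basic_consume(reply_queue.name().as_str(), "rpc-client", BasicConsumeOptions::default(), FieldTable::default())
        .await?;

    while let Some(delivery) = consumer.next().await {
        let delivery = delivery?;
        delivery.ack(BasicAckOptions::default()).await?;
        // only accept the reply that matches our correlation id
        if delivery.properties.correlation_id().clone().map(|c| c.as_str() == job_id).unwrap_or(false) {
            return Ok(delivery.data.clone());
        }
    }
    Err("reply channel closed before a response arrived".into())
}
```

The AI service side would consume `prediction_requests`, run inference, and publish the result to the queue named in `reply_to`, echoing the same `correlation_id`.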
As a final summary, the following patterns are worth considering (assuming Celery is removed):
+
+- RMQ FanOut + WebSocket streaming (supports realtime browser communication, eliminates the short-polling overhead)
+- RMQ + RPC with Correlation Id (eliminates the short-polling overhead)
+- RMQ + RPC with Correlation Id + WebSocket streaming (supports realtime browser communication and eliminates the short-polling overhead) ✅
+- Kafka with WebSocket streaming (supports realtime browser communication, eliminates the short-polling overhead)
+- Kafka only (eliminates the short-polling overhead, high throughput but higher latency)
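For the checked pattern above, the piece that complements the RPC flow is streaming consumed results to the browser over WebSocket instead of having it poll. Below is a minimal, illustrative bridge between a consumer task and a WebSocket endpoint. It assumes `axum` 0.7 with the `ws` feature plus `tokio`, chosen only to keep the sketch short (the WebSocket layer in this repo is actix-based); the route, port and payload are made up for illustration.

```rust
// a minimal sketch: a broadcast channel bridges the broker consumer task to a
// WebSocket endpoint so connected browsers get results pushed in realtime
use axum::{
    extract::ws::{Message, WebSocket, WebSocketUpgrade},
    extract::State,
    response::IntoResponse,
    routing::get,
    Router,
};
use tokio::sync::broadcast;

#[derive(Clone)]
struct AppState { tx: broadcast::Sender<String> }

async fn ws_handler(ws: WebSocketUpgrade, State(state): State<AppState>) -> impl IntoResponse {
    ws.on_upgrade(move |socket| stream_results(socket, state.tx.subscribe()))
}

async fn stream_results(mut socket: WebSocket, mut rx: broadcast::Receiver<String>) {
    // push every result coming out of the consumer task to the browser;
    // a lagging or closed channel simply ends this session
    while let Ok(result) = rx.recv().await {
        if socket.send(Message::Text(result.into())).await.is_err() {
            break; // client disconnected
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, _) = broadcast::channel::<String>(64);
    let state = AppState { tx: tx.clone() };

    // in the real system this task would be the RMQ/Kafka consumer handing
    // results over; here we just simulate one result every few seconds
    tokio::spawn(async move {
        loop {
            let _ = tx.send(r#"{"jobId":"123","status":"done"}"#.to_string());
            tokio::time::sleep(std::time::Duration::from_secs(3)).await;
        }
    });

    let app = Router::new().route("/stream/results", get(ws_handler)).with_state(state);
    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}
```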