From db73cddb4b0f1e79d7626f2d71e9527fae1cca8c Mon Sep 17 00:00:00 2001 From: wildonion Date: Thu, 19 Sep 2024 22:41:19 +0330 Subject: [PATCH] feat(streaming): finish kafka streaming --- .env | 5 + .github/workflows/cicd.yml | 4 +- .github/workflows/ui.yml | 27 + Cargo.lock | 95 ++- Cargo.toml | 1 + docker-compose.yml | 31 + {src/workers => docs}/Kafka.md | 0 {src/workers => docs}/Rmq.md | 0 {src/workers => docs}/actorWorker.md | 101 ++- {src/tests => docs}/govstokio.threads.md | 0 {src/tests => docs}/lightvscpu.thread.md | 0 {src/tests => docs}/mutexcondvar.md | 0 docs/pin.md | 24 + docs/pointer.md | 47 ++ {src/workers => docs}/streaming.md | 0 docs/trait.md | 191 +++++ docs/vm.md | 326 ++++++++ infra/api.http.json | 149 +++- logs/error-kind/zerlog.log | 8 + src/apis/v1/http/auth.rs | 4 +- src/apis/v1/http/health.rs | 19 +- src/apis/v1/http/notif.rs | 368 ++++++--- src/config/mod.rs | 8 + src/error/mod.rs | 4 +- src/interfaces/product.rs | 4 +- src/lockers/llm.rs | 38 +- src/models/event.rs | 16 +- src/server/mod.rs | 20 +- src/tests/task.rs | 2 + src/tests/tx.rs | 4 +- src/workers/notif/mod.rs | 901 +++++++++++++++++------ src/workers/scheduler/mod.rs | 7 +- 32 files changed, 2034 insertions(+), 370 deletions(-) create mode 100644 .github/workflows/ui.yml rename {src/workers => docs}/Kafka.md (100%) rename {src/workers => docs}/Rmq.md (100%) rename {src/workers => docs}/actorWorker.md (72%) rename {src/tests => docs}/govstokio.threads.md (100%) rename {src/tests => docs}/lightvscpu.thread.md (100%) rename {src/tests => docs}/mutexcondvar.md (100%) create mode 100644 docs/pin.md create mode 100644 docs/pointer.md rename {src/workers => docs}/streaming.md (100%) create mode 100644 docs/trait.md create mode 100644 docs/vm.md diff --git a/.env b/.env index 1c94015..0a350a2 100644 --- a/.env +++ b/.env @@ -21,6 +21,11 @@ AMQP_PORT=5672 AMQP_USERNAME=hoopoe AMQP_PASSWORD=geDteDd0Ltg2135FJYQ6rjNYHYkGQa70 +KAFKA_HOST=localhost # kafka +KAFKA_PORT=9092 +KAFKA_USERNAME=hoopoe +KAFKA_PASSWORD=geDteDd0Ltg2135FJYQ6rjNYHYkGQa70 + IO_BUFFER_SIZE=1204 FILE_SIZE=209715200 ENVIRONMENT=prod diff --git a/.github/workflows/cicd.yml b/.github/workflows/cicd.yml index 31918d5..5aff6c7 100644 --- a/.github/workflows/cicd.yml +++ b/.github/workflows/cicd.yml @@ -32,7 +32,7 @@ jobs: username: ${{ secrets.SERVER_USER }} password: ${{ secrets.SERVER_PASSWORD }} port: 22 - script: | # 1 - pull services from the custom registry (each image in dockercompose file prefixed with registry name) | 2 - up all the services - cd /root/hoopoe + script: | # cd hoopoe directly cause we're already in root + cd hoopoe sudo docker compose -f "docker-compose.yml" pull sudo docker compose -f "docker-compose.yml" up -d \ No newline at end of file diff --git a/.github/workflows/ui.yml b/.github/workflows/ui.yml new file mode 100644 index 0000000..9c78525 --- /dev/null +++ b/.github/workflows/ui.yml @@ -0,0 +1,27 @@ +name: CI/CD Pipeline + +on: + push: + branches: [main] + pull_request: + branches: [main] + +jobs: + build: + runs-on: ubuntu-latest + steps: + - name: Deploy & Build + uses: appleboy/ssh-action@master + with: + host: ${{ secrets.SERVER_HOST }} + username: ${{ secrets.SERVER_USER }} + password: ${{ secrets.SERVER_PASSWORD }} + port: 22 + script: | # cd SomeUiRepo directly cause we're already in root + if [ ! 
-d "/root/ui" ]; then + git clone https://wildonion:${{ secrets.ACCESS_TOKEN }}@github.com/wildonion/SomeUiRepo.git + fi + cd SomeUiRepo + git pull origin main + npm install + npm run build \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 5368c1c..d3757dd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1021,7 +1021,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3ef8005764f53cd4dca619f5bf64cafd4664dada50ece25e4d81de54c80cc0b" dependencies = [ "once_cell", - "proc-macro-crate", + "proc-macro-crate 3.1.0", "proc-macro2", "quote", "syn 2.0.75", @@ -2408,6 +2408,7 @@ dependencies = [ "rand", "rand_chacha", "rayon", + "rdkafka", "redis-async", "reqwest 0.12.7", "rslock", @@ -2978,6 +2979,18 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "libz-sys" +version = "1.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2d16453e800a8cf6dd2fc3eb4bc99b786a9b90c663b8559a5b1a041bf89e472" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "linux-raw-sys" version = "0.3.8" @@ -3382,6 +3395,27 @@ dependencies = [ "libc", ] +[[package]] +name = "num_enum" +version = "0.5.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f646caf906c20226733ed5b1374287eb97e3c2a5c227ce668c1f2ce20ae57c9" +dependencies = [ + "num_enum_derive", +] + +[[package]] +name = "num_enum_derive" +version = "0.5.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dcbff9bc912032c62bf65ef1d5aea88983b420f4f839db1e9b0c281a25c9c799" +dependencies = [ + "proc-macro-crate 1.3.1", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "object" version = "0.36.3" @@ -3556,7 +3590,7 @@ version = "3.6.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d830939c76d294956402033aee57a6da7b438f2294eb94864c37b0569053a42c" dependencies = [ - "proc-macro-crate", + "proc-macro-crate 3.1.0", "proc-macro2", "quote", "syn 1.0.109", @@ -3840,13 +3874,23 @@ dependencies = [ "uint", ] +[[package]] +name = "proc-macro-crate" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f4c021e1093a56626774e81216a4ce732a735e5bad4868a03f3ed65ca0c3919" +dependencies = [ + "once_cell", + "toml_edit 0.19.15", +] + [[package]] name = "proc-macro-crate" version = "3.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6d37c51ca738a55da99dc0c4a34860fd675453b8b36209178c2249bb13651284" dependencies = [ - "toml_edit", + "toml_edit 0.21.1", ] [[package]] @@ -4151,6 +4195,36 @@ dependencies = [ "yasna", ] +[[package]] +name = "rdkafka" +version = "0.36.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1beea247b9a7600a81d4cc33f659ce1a77e1988323d7d2809c7ed1c21f4c316d" +dependencies = [ + "futures-channel", + "futures-util", + "libc", + "log", + "rdkafka-sys", + "serde", + "serde_derive", + "serde_json", + "slab", + "tokio", +] + +[[package]] +name = "rdkafka-sys" +version = "4.7.0+2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55e0d2f9ba6253f6ec72385e453294f8618e9e15c2c6aba2a5c01ccf9622d615" +dependencies = [ + "libc", + "libz-sys", + "num_enum", + "pkg-config", +] + [[package]] name = "reactor-trait" version = "1.1.0" @@ -4874,7 +4948,7 @@ version = "0.71.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b3e39d4fdceefc3d9876a529e8b1824c200d89d60fc0d6b100451861757235e" 
dependencies = [ - "proc-macro-crate", + "proc-macro-crate 3.1.0", "proc-macro2", "proc-macro2-diagnostics", "quote", @@ -5061,7 +5135,7 @@ version = "0.71.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acaac9df10ca8df373381a6a46faa9ae7cadd53f07307c7c208920ba02dc4a78" dependencies = [ - "proc-macro-crate", + "proc-macro-crate 3.1.0", "proc-macro2", "quote", "regex", @@ -6324,6 +6398,17 @@ version = "0.6.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0dd7358ecb8fc2f8d014bf86f6f638ce72ba252a2c3a2572f2a795f1d23efb41" +[[package]] +name = "toml_edit" +version = "0.19.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421" +dependencies = [ + "indexmap 2.4.0", + "toml_datetime", + "winnow", +] + [[package]] name = "toml_edit" version = "0.21.1" diff --git a/Cargo.toml b/Cargo.toml index df4a4f7..f463cf5 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,7 @@ tracing-subscriber = "0.3" serde = {version = "1", features = ["derive"] } serde_json = { version = "1", features = ["preserve_order"] } redis-async = "0.17" +rdkafka = "0.36.2" sha2 = "0.10" is_type = "0.2.1" rayon = "1.10.0" diff --git a/docker-compose.yml b/docker-compose.yml index d014dca..0658be1 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -119,6 +119,37 @@ services: timeout: 10s retries: 5 start_period: 10s +##### =================================================================================================== + kafka: + image: apache/kafka:latest + container_name: kafka + restart: unless-stopped + ports: + - "9092:9092" + networks: + - hoopoe + - hoopoe_is_there1 + - hoopoe_is_there2 + hostname: kafka + healthcheck: + test: rabbitmq-diagnostics -q ping + interval: 30s + timeout: 10s + retries: 5 + start_period: 10s + environment: + KAFKA_NODE_ID: 1 + KAFKA_PROCESS_ROLES: broker,controller + KAFKA_LISTENERS: PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 + KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT + KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + KAFKA_NUM_PARTITIONS: 3 ##### =================================================================================================== adminer: container_name: adminer diff --git a/src/workers/Kafka.md b/docs/Kafka.md similarity index 100% rename from src/workers/Kafka.md rename to docs/Kafka.md diff --git a/src/workers/Rmq.md b/docs/Rmq.md similarity index 100% rename from src/workers/Rmq.md rename to docs/Rmq.md diff --git a/src/workers/actorWorker.md b/docs/actorWorker.md similarity index 72% rename from src/workers/actorWorker.md rename to docs/actorWorker.md index 530570b..1cd45a5 100644 --- a/src/workers/actorWorker.md +++ b/docs/actorWorker.md @@ -1,5 +1,5 @@ -Actors are threads with isolated and no shared state with outside world, they have a single thread of execution which enables them to execute jobs received by their mailbox jobq channel one at a time, each job however can be scheduled to be executed periodically or at an specific time. 
+Tasks can be represented as light threads, more of essence ligth threads can be more smarter objects like with isolated and no shared state mechanism with outside world cause they handle one message at a time and ensures internal state remains consistent while processing the message, they have a single thread of execution which enables them to execute jobs received by their jobq mailbox channel one at a time, each job however can be scheduled to be executed periodically or at an specific time. Putting all together creating a package that can execute tasks in ligh thread of executions or light threadpool without sacrificing resources, internal state sharing and syncing with mutex brings us something called Actor which can execute tasks one at a time in the actor runtime light threadpool by sending message to it. ### **1. Workers and Threads**: A **worker** typically refers to an execution unit (or process) that handles jobs or tasks. Workers can be implemented using **threads** or **processes** depending on the underlying concurrency model. @@ -261,4 +261,101 @@ Actix is widely used in building **high-performance, low-latency systems** such 5. **Concurrency**: Non-blocking, asynchronous execution via Rust's async runtime. 6. **Lightweight Tasks**: Efficient use of system resources, enabling the creation of many actors with minimal overhead. 7. **Supervision**: Fault tolerance via supervisors that monitor and restart actors. -8. **Performance**: Actix is designed for high throughput and low latency, making it ideal for real-time and concurrent systems. \ No newline at end of file +8. **Performance**: Actix is designed for high throughput and low latency, making it ideal for real-time and concurrent systems. + +In the context of **actor-based systems** like **Actix** (or other actor frameworks), **actors** are lightweight entities that handle tasks by receiving and processing messages asynchronously. Here's a clearer explanation, tying together the concept of **lightweight threads** (or tasks) and how the **actor runtime** manages their scheduling: + +### **Actors as Lightweight Tasks (or Threads)** +- **Actors** can be thought of as **lightweight tasks** or **green threads** that are scheduled by the **actor runtime**. They are **not full-fledged OS threads** (which can be more resource-heavy and involve expensive context switches), but instead are smaller, more efficient units of execution. + +- These **lightweight tasks** (sometimes called **fibers** or **coroutines**) are more resource-efficient than OS threads because they run within a single or few OS threads, multiplexing many actors over a small number of system threads. This allows a large number of actors to run concurrently. + +- **Actix** uses **async/await** in Rust, allowing it to leverage Rust's async runtime (e.g., **Tokio** or **async-std**) for scheduling these lightweight tasks. This means that many actors can be created and run concurrently, making it possible to efficiently scale the system while keeping the resource footprint low. + +### **Actor Runtime and Task Scheduling** +- The **actor runtime** (like the Actix system) is responsible for **scheduling and managing the execution of actors**. The actor runtime handles several important tasks: + 1. **Message Delivery**: Actors communicate by sending messages to each other. The actor runtime ensures that messages are delivered to the correct actor. + 2. **Task Scheduling**: Each actor processes one message at a time in its own **single-threaded context**. 
The runtime schedules when an actor should wake up and process the next message in its queue. This is done without blocking other actors, thanks to the non-blocking and asynchronous nature of the actor model. + 3. **Multiplexing Actors**: The runtime efficiently multiplexes many actors over a few OS threads. This way, a single physical thread can handle the work of hundreds or even thousands of actors, switching between actors when they are ready to process the next message. + 4. **Concurrency Without Shared State**: Since actors don’t share memory and each actor processes messages one at a time, there’s no need for locks or synchronization mechanisms (like mutexes). The runtime manages message queues for each actor, ensuring messages are processed in the correct order. + +### **Message Passing and Asynchronous Execution** +- In actor systems, the **messages** that actors send to each other are how work is distributed. Instead of directly calling functions or manipulating shared state, actors interact by **passing messages**. The runtime ensures that each actor processes its incoming messages **asynchronously**. + +- **Non-blocking execution**: When an actor receives a message, it can process it in a non-blocking way (using `async/await` in Rust). This means the actor doesn’t have to wait for long-running tasks to complete (such as I/O operations), allowing the runtime to switch to another actor and keep the system responsive. + +### **Actors as Single-Threaded Execution Units** +- **Single-threaded execution**: Each actor processes **only one message at a time**. This may seem like a limitation, but it is a deliberate design choice that simplifies concurrency. Since the actor can only process one message at a time, there’s no risk of race conditions within the actor itself. It ensures that all internal state remains consistent while processing a message. + +- The runtime efficiently switches between actors when they have messages to process, ensuring high concurrency and throughput. + +--- + +### **Putting It All Together** +In **Actix** (and actor systems in general): +- **Actors** are lightweight tasks that are not full OS threads, but instead **lightweight units of execution** that run within the context of a few OS threads. +- The **actor runtime** schedules these tasks (actors) by: + 1. Handling **message delivery**. + 2. Scheduling the actors for execution **asynchronously** when they receive messages. + 3. Ensuring that actors process **one message at a time** (single-threaded), which avoids the need for locks or synchronization. + 4. **Multiplexing** a large number of actors over a small number of OS threads, making the system **scalable and efficient**. + +This design ensures that the system can handle **high concurrency and low-latency** workloads while keeping resource consumption low and simplifying concurrency management. 
+ +--- + +### **Example of Actix Actor Lifecycle and Message Handling** + +```rust +use actix::prelude::*; + +// Define an actor +struct MyActor; + +impl Actor for MyActor { + type Context = Context; +} + +// Define a message +#[derive(Message)] +#[rtype(result = "usize")] +struct MyMessage(String); + +// Implement message handler +impl Handler for MyActor { + type Result = usize; + + fn handle(&mut self, msg: MyMessage, _: &mut Self::Context) -> Self::Result { + println!("Received message: {}", msg.0); + msg.0.len() // Return the length of the message string + } +} + +#[actix_rt::main] +async fn main() { + // Start an instance of the actor + let actor = MyActor.start(); + + // Send a message asynchronously + let result = actor.send(MyMessage("Hello, Actix!".to_string())).await; + + // Output the result (length of the message string) + match result { + Ok(res) => println!("Message length: {}", res), + Err(err) => println!("Error handling message: {:?}", err), + } +} +``` + +### **Key Concepts in the Example**: +- The `MyActor` actor processes **one message at a time** (each message of type `MyMessage`). +- The actor’s runtime manages the **scheduling of tasks** and the **delivery of messages**. +- The message is processed **asynchronously**, without blocking, allowing other actors to work concurrently. + +--- + +### **Summary** +- **Actors** in Actix are **lightweight tasks** that are scheduled by the **actor runtime** to process messages. +- The actor runtime **schedules these tasks** efficiently over a pool of OS threads, ensuring high concurrency with minimal resource usage. +- Since actors are **single-threaded**, they process **one message at a time**, which ensures consistency and simplifies concurrency management. +- This model allows Actix to handle **large-scale concurrent systems** efficiently, using Rust's powerful async capabilities for non-blocking execution. \ No newline at end of file diff --git a/src/tests/govstokio.threads.md b/docs/govstokio.threads.md similarity index 100% rename from src/tests/govstokio.threads.md rename to docs/govstokio.threads.md diff --git a/src/tests/lightvscpu.thread.md b/docs/lightvscpu.thread.md similarity index 100% rename from src/tests/lightvscpu.thread.md rename to docs/lightvscpu.thread.md diff --git a/src/tests/mutexcondvar.md b/docs/mutexcondvar.md similarity index 100% rename from src/tests/mutexcondvar.md rename to docs/mutexcondvar.md diff --git a/docs/pin.md b/docs/pin.md new file mode 100644 index 0000000..fc43bb6 --- /dev/null +++ b/docs/pin.md @@ -0,0 +1,24 @@ + + +> Why we need to pin future objects into the RAM? + +When dealing with future objects in Rust, breaking cycles (circular references) is essential to prevent memory leaks and ensure proper memory management. While `Rc` (Reference Counting) and `Box` are commonly used for managing ownership and references in Rust, they are not sufficient for breaking cycles in future objects. The `Pin` type is specifically designed to address the issue of breaking cycles in future objects and ensuring soundness in asynchronous Rust code. Here's why `Pin` is used for breaking cycles in future objects: + +### Why `Pin` is Used for Breaking Cycles in Future Objects: + +1. **Preventing Unwinding and Stabilizing Memory:** + - `Pin` is used to prevent unwinding (moving) of future objects in memory, ensuring that the memory location of the future object remains stable. This is crucial for breaking cycles in future objects and preventing memory leaks. + +2. 
**Ensuring Soundness in Asynchronous Code:** + - Asynchronous Rust code, especially when using futures, requires additional guarantees to ensure memory safety and prevent undefined behavior. `Pin` provides these guarantees by restricting the movement of future objects once they are pinned. + +3. **Enforcing Pinning Contract:** + - `Pin` enforces the pinning contract, which states that once an object is pinned, it cannot be moved in memory. This is crucial for breaking cycles in future objects and maintaining the integrity of the asynchronous code. + +4. **Interaction with Unsafe Code:** + - When dealing with unsafe code or FFI (Foreign Function Interface) interactions, `Pin` is used to ensure that future objects remain stable and do not violate memory safety rules. + +5. **Compatibility with `Future` Trait:** + - `Pin` is designed to work seamlessly with the `Future` trait and asynchronous programming in Rust, providing a safe and efficient way to break cycles in future objects. + +While `Rc` and `Box` are useful for managing ownership and references in Rust, they do not provide the necessary guarantees for breaking cycles in future objects and ensuring memory stability in asynchronous code. `Pin` is specifically designed for this purpose and is the recommended approach for handling future objects in Rust's asynchronous ecosystem. \ No newline at end of file diff --git a/docs/pointer.md b/docs/pointer.md new file mode 100644 index 0000000..0c9005e --- /dev/null +++ b/docs/pointer.md @@ -0,0 +1,47 @@ +The differences between `&mut Type`, `Box`, and `Box<&mut Type>` in Rust relate to ownership, borrowing, and memory management. Here's a breakdown of each type and their characteristics: + +### `&mut Type`: + +1. **Mutable Reference:** + - `&mut Type` represents a mutable reference to a value of type `Type`. + - It allows mutable access to the referenced value but enforces Rust's borrowing rules, ensuring that there is only one mutable reference to the value at a time. + +2. **Borrowing:** + - The reference is borrowed and does not own the value it points to. + - The lifetime of the reference is tied to the scope in which it is borrowed, and it cannot outlive the value it references. + +### `Box`: + +1. **Heap Allocation:** + - `Box` is a smart pointer that owns a value of type `Type` allocated on the heap. + - It provides a way to store values with a known size that can be dynamically allocated and deallocated. +2. **Ownership Transfer:** + - `Box` transfers ownership of the boxed value to the box itself. + - It allows moving the box between scopes and passing it to functions without worrying about lifetimes. + +### `Box<&mut Type>`: + +1. **Boxed Mutable Reference:** + - `Box<&mut Type>` is a box containing a mutable reference to a value of type `Type`. + - It allows for mutable access to the value, similar to `&mut Type`, but with the value stored on the heap. + +2. **Indirection and Ownership:** + - The box owns the mutable reference and manages its lifetime on the heap. + - This can be useful when you need to store a mutable reference with a dynamic lifetime or when you want to transfer ownership of the reference. + + use box to store on the heap to break the cycle of self ref types and manage the lifetime dynamically + on the heap of the type + +### Comparison: + +- **`&mut Type`:** Represents a mutable reference with borrowing semantics and strict lifetime rules. +- **`Box`:** Represents ownership of a value on the heap with the ability to move it between scopes. 
+- **`Box<&mut Type>`:** Represents ownership of a mutable reference stored on the heap, providing flexibility in managing mutable references with dynamic lifetimes. + +### When to Use Each: + +- **`&mut Type`:** Use when you need mutable access to a value within a limited scope and want to enforce borrowing rules. +- **`Box`:** Use when you need to store a value on the heap and transfer ownership between scopes. +- **`Box<&mut Type>`:** Use when you need to store a mutable reference with dynamic lifetime requirements or when you want to manage mutable references on the heap. + +Each type has its own use cases based on ownership, borrowing, and memory management requirements in Rust. Understanding the differences between them helps in choosing the appropriate type for your specific needs. \ No newline at end of file diff --git a/src/workers/streaming.md b/docs/streaming.md similarity index 100% rename from src/workers/streaming.md rename to docs/streaming.md diff --git a/docs/trait.md b/docs/trait.md new file mode 100644 index 0000000..ad87160 --- /dev/null +++ b/docs/trait.md @@ -0,0 +1,191 @@ +The `impl Trait` syntax and `Box` are both used in Rust for handling trait objects, but they have different implications and usage scenarios. Here are the key differences between `impl Trait` in the return type of a method and `Box`: + +### `impl Trait` in Return Type: + +1. **Static Dispatch:** + - When using `impl Trait` in the return type of a method, the actual concrete type returned by the function is known at compile time. + - This enables static dispatch, where the compiler can optimize the code based on the specific type returned by the function. + +2. **Inferred Type:** + - The concrete type returned by the function is inferred by the compiler based on the implementation. + - This allows for more concise code without explicitly specifying the concrete type in the function signature. + +3. **Single Type:** + - With `impl Trait`, the function can only return a single concrete type that implements the specified trait. + - The actual type returned by the function is hidden from the caller, providing encapsulation. + +### `Box`: +1. **Dynamic Dispatch:** + - When using `Box`, the trait object is stored on the heap and accessed through a pointer, enabling dynamic dispatch. + - Dynamic dispatch allows for runtime polymorphism, where the actual type can be determined at runtime. + +2. **Trait Object:** + - `Box` represents a trait object, which can hold any type that implements the specified trait. + - This is useful when you need to work with different concrete types that implement the same trait without knowing the specific type at compile time. + +3. **Runtime Overhead:** + - Using `Box` incurs a runtime overhead due to heap allocation and dynamic dispatch. + - This can impact performance compared to static dispatch with `impl Trait`. + +### When to Use Each: + +- **`impl Trait`:** Use `impl Trait` when you have a single concrete type to return from a function and want to leverage static dispatch for performance optimization. +- **`Box`:** Use `Box` when you need to work with multiple types that implement a trait dynamically at runtime or when dealing with trait objects in a more flexible and polymorphic way. + +In summary, `impl Trait` is used for static dispatch with a single concrete type known at compile time, while `Box` is used for dynamic dispatch with trait objects that can hold different types implementing the same trait at runtime. 
The choice between them depends on the specific requirements of your code in terms of performance, flexibility, and polymorphism. + +In Rust, both **dynamic dispatch** (using `Box`) and **static dispatch** (using `impl Interface1`) require the type (`User` in this case) to implement the trait (`Interface1`). This is because the Rust type system enforces that any type used in place of a trait must conform to the behavior (methods and associated types) defined by that trait, regardless of whether it's dynamically or statically dispatched. Let's break down why this is necessary for both dispatch mechanisms: + +### 1. **Static Dispatch (`impl Interface1`)**: + +- **Static dispatch** means that the concrete type implementing the trait (`User`) is known at **compile time**. +- When you use `impl Interface1` in a function signature, it tells the compiler that the function will return a type that **implements** the trait `Interface1`. However, the **concrete type** (in this case `User`) is known during compilation, allowing the compiler to **inline** or **monomorphize** the code for performance. +- Since the compiler is generating specialized code for the type, it needs to ensure at compile time that the type (`User`) conforms to the behavior defined in the `Interface1` trait. This allows the compiler to know that the concrete type has the necessary methods and properties, which are essential to the functionality of the trait. + +Thus, the type **must implement the trait** for the static dispatch case because the type-specific methods need to be generated and compiled. + +### 2. **Dynamic Dispatch (`Box`)**: + +- **Dynamic dispatch** means that the concrete type implementing the trait is not known at compile time but rather at **runtime**. This is accomplished through **vtable-based dispatching** (a pointer to a table of function pointers, called a vtable, is used to resolve method calls at runtime). +- When you use `Box`, you're telling the compiler that any type that **implements** the trait `Interface1` can be stored in the `Box`, but the exact type (`User`, for example) will be determined at runtime. +- Even though the type is determined at runtime, the trait must still be implemented by the type. This is because the **vtable** that will be used at runtime is generated by the compiler based on the methods defined in the `Interface1` trait. Without implementing the trait, the compiler has no way of ensuring that the type provides the necessary methods to satisfy the contract of the trait. + +Thus, the type **must implement the trait** for the dynamic dispatch case as well, because the vtable needs to point to the correct methods, and this vtable is built based on the trait implementation. + +### Why Both Require the Trait to be Implemented + +The core reason that both dynamic (`Box`) and static (`impl Interface1`) dispatch require the trait to be implemented is that **traits define a set of behaviors (methods and possibly associated types) that a type must provide**. + +- In static dispatch, the compiler needs to generate type-specific code at compile time, so it needs to know that the type (`User`) satisfies the trait. +- In dynamic dispatch, even though the actual method resolution happens at runtime, the compiler must still generate the appropriate vtable (for dynamic method lookup) based on the trait's methods. Thus, the type still needs to implement the trait to ensure that the correct methods are available at runtime. 
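To make the contrast above concrete, here is a minimal sketch using the `Interface1` trait and `User` type named in this discussion (the method and field are illustrative placeholders, not code from this repo):

```rust
// Illustrative only: `Interface1` and `User` follow the names used above.
trait Interface1 {
    fn describe(&self) -> String;
}

struct User {
    name: String,
}

// `User` must implement the trait for BOTH dispatch styles below to compile.
impl Interface1 for User {
    fn describe(&self) -> String {
        format!("user: {}", self.name)
    }
}

// Static dispatch: the concrete return type is known at compile time,
// so the compiler can monomorphize and inline the call sites.
fn make_static() -> impl Interface1 {
    User { name: "wildonion".to_string() }
}

// Dynamic dispatch: the concrete type is erased behind a vtable pointer
// and the method is resolved at runtime.
fn make_dynamic() -> Box<dyn Interface1> {
    Box::new(User { name: "wildonion".to_string() })
}

fn main() {
    println!("{}", make_static().describe());
    println!("{}", make_dynamic().describe());
}
```

Removing the `impl Interface1 for User` block breaks both functions at compile time, which is exactly the point: the trait bound is checked up front whether the call is later monomorphized or routed through a vtable.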
+ +### Key Differences: + +- **Static Dispatch (`impl Interface1`)**: + - Monomorphized at **compile time**. + - Type is known at compile time. + - Compiler generates type-specific code (faster, more optimized). + +- **Dynamic Dispatch (`Box`)**: + - Resolved at **runtime** using vtables. + - Type is not known until runtime. + - Slight overhead due to runtime method lookup. + +In both cases, however, the type must adhere to the contract defined by the trait, which is why the trait must be implemented regardless of whether static or dynamic dispatch is used. This allows the Rust compiler to ensure type safety and that the type provides the necessary methods. + +In Rust, for a trait to be used in **dynamic dispatch** (i.e., when you use `Box`), the trait must be **object-safe**. This is because **dynamic dispatch** in Rust relies on **vtables** (virtual method tables) to look up methods at runtime, and the **structure of the trait** must meet certain conditions to support this mechanism. Let's explore why traits need to be object-safe for dynamic dispatch and what makes a trait object-safe. + +### Why Traits Must Be Object-Safe for Dynamic Dispatch + +Dynamic dispatch works by storing a **trait object** (e.g., `Box`) where the **concrete type** implementing the trait isn't known at compile time. Instead, a **vtable** is used at runtime to resolve the appropriate method to call. A **vtable** is essentially a table of function pointers, and the runtime system uses it to invoke the correct method for the concrete type. That's why we can't use **GAT** or `type`, `self` (`&self` is allowed) and `Self` in object safe trait, there must be only functions without having a direction to the implementor. + +To allow the Rust compiler to generate the necessary vtable and resolve method calls dynamically at runtime, the trait must have properties that allow the compiler to generate a **single vtable** that can be used to look up methods. If the trait allows certain features that are incompatible with this approach, then it's not possible to use the trait in dynamic dispatch. + +### What Makes a Trait Object-Safe? + +> dynamic dispatching can also be used as dependency injection process through `Box`, `Arc`, `&dyn Trait`, or `Rc`. + +A trait is **object-safe** if it satisfies certain constraints that ensure it can be used to create a **trait object** (like `Box`, `Arc`, `&dyn Trait`, or `Rc`). These constraints exist because dynamic dispatch requires that the methods can be looked up through a vtable, and the trait methods must have predictable behavior that fits this model. + +#### The Key Rules for Object Safety: + +1. **No `self` by Value in Methods**: + - A trait cannot have methods that take `self` by value (i.e., `self` is passed by value, like `fn method(self)`). This is because in dynamic dispatch, Rust only knows about the **trait object** (a pointer to the value), but it doesn't know the size or concrete type of the underlying object. + + - If the trait method took `self` by value, Rust would need to know the concrete size of `self` to move it (cause due to ownership rules in order to move a type we should know its types hence the size accordingly), but in a `dyn Trait`, the size is unknown (because different types can implement the trait at runtime with different sizes). Thus, Rust can't generate a vtable entry for such methods. 
+ + **Invalid example**: + ```rust + trait MyTrait { + fn invalid_method(self); // Not object-safe because `self` is taken by value + } + ``` + + **Valid example**: + ```rust + trait MyTrait { + fn valid_method(&self); // Object-safe because `self` is a reference + } + ``` + +2. **No Generic Methods**: + - A trait cannot have methods that are generic over types (i.e., methods that use type parameters like `fn method(&self)`), because the vtable needs to provide a fixed signature for each method. By the way passing generic into the methdos or trait signature could solve the problem of polymorphism. + + - With generic methods, Rust would need to create separate versions of the method for each type `T`, but this is not compatible with dynamic dispatch, where the method's signature must be fixed in the vtable. + + **Invalid example**: + ```rust + trait MyTrait { + fn invalid_method(&self, value: T); // Not object-safe because it's generic + } + ``` + + **Valid example**: + ```rust + trait MyTrait { + fn valid_method(&self); // Object-safe + } + ``` + +### Why the Rules Ensure Object Safety + +The key issue with dynamic dispatch is that the type of the underlying object (`self`) is not known at compile time. For dynamic dispatch to work, the methods must be able to operate on a **trait object**, which is essentially a **pointer** to the object implementing the trait using either smart pointers like `Box`, `Rc` or `Arc` or `&'valid dyn`. The **vtable** only has information about the method signatures at a high level, without knowledge of the concrete type behind the trait object cause it only knows it by pointer! + +Let's consider these constraints in light of the vtable: + +- **`self` by Value**: If a method takes `self` by value, it needs to know how much memory to move or drop (that's why we can't call other methods on the instance after calling a method which has `self` instead of `&self` cause it's already been moved). But since Rust only has a pointer to the trait object and doesn't know the actual size of `self`, this operation is unsafe or impossible. Therefore, the trait must only use references (`&self` or `&mut self`), where Rust can safely dereference the pointer without needing to know the size of the underlying object. + +- **Generics**: For a method that's generic over some type `T`, the method could behave differently depending on the type `T`, and Rust would need to generate different versions of the method for each instantiation. However, in dynamic dispatch, the vtable needs a **single entry** for each method with a fixed signature. This means the method can't be generic, since the exact types must be known when the method is compiled and linked into the vtable. 
+ +### Example of a Non-Object-Safe Trait + +```rust +trait NotObjectSafe { + // This method takes self by value, which is not allowed for dynamic dispatch + fn consume(self); + + // This method is generic, which also prevents it from being object-safe + fn generic_method(&self, arg: T); +} +``` + +### Example of an Object-Safe Trait + +```rust +trait Interface{ + fn getCode(&self); +} +trait CantBe{ + fn getCode(&self) -> &Self; +} +impl CantBe for String{ + fn getCode(&self) -> &Self { + self + } +} +impl Interface for String{ + fn getCode(&self) { + + } +} + +// can't use CantBe trait for dynamic dispatch and dependency injections +// let deps: Vec> = vec![Box::new(String::from("12923"))]; + +// using Interface for dynamic dispatch and dependency injections +let deps: Vec> = vec![Box::new(String::from("12923"))]; +``` + +The `Interface` trait is object-safe because it only has methods that take a reference (`&self`), and no methods are generic, involve ownership transfer or even returning `Self`. + +### Why Object Safety is Important for Vtables + +In **dynamic dispatch**, the vtable is a lookup table for method calls. It must contain the addresses of the actual methods that can be called on the trait object (since everything is done through pointing). If the trait includes methods that involve operations Rust can't handle without knowing the exact type (`self` by value, generics, etc.), then the vtable would be unable to provide correct entries for these methods, and the dynamic dispatch mechanism would break. + +In essence, the **object-safety rules** ensure that the trait's methods can be **represented in a vtable** and that the **dynamic dispatch** mechanism can resolve the correct method at runtime without needing knowledge of the concrete type through only pointer or referencing. +dynamic dispatch requires object safety (don't have `self`, `Self`, generic) rules to ensure trait's methods can be inside a vtable. + +### Conclusion + +A trait must be **object-safe** for dynamic dispatch because **dynamic dispatch** relies on the **vtable** to look up method implementations at runtime. The vtable requires fixed method signatures and doesn't have knowledge of the concrete type implementing the trait. Thus, the trait needs to follow certain constraints (no `self` by value, no generics) to ensure that methods can be correctly stored in and looked up from the vtable. This guarantees that Rust can invoke the correct method at runtime without needing the size or type-specific details of the implementing type. +After all dispatching dynamically means that there would be a type regardless of its size that needs to gets extended at runtime by implementing a trait for that to add some extra ability on the object like new methods. \ No newline at end of file diff --git a/docs/vm.md b/docs/vm.md new file mode 100644 index 0000000..cbc0bf0 --- /dev/null +++ b/docs/vm.md @@ -0,0 +1,326 @@ + + +### 1. **Compilation and Bytecode Generation** + - When we compile a smart contract, it gets transformed into bytecode. This bytecode is what will be deployed onto the blockchain. Along with the bytecode, the **ABI (Application Binary Interface)** is generated, which defines how you can interact with the contract (e.g., what functions can be called and what parameters are needed). + +### 2. **Contract Deployment** + - When we deploy a smart contract, the bytecode is included in a transaction that is sent to the blockchain. 
However, additional information such as the **sender's address (deployer)** and possibly some initial parameters are included in this transaction. The contract address is actually **determined by the blockchain** based on the deployer's address and the nonce (number of transactions sent by the deployer). + - Once deployed, the smart contract exists as a **self-contained entity** on the blockchain, with its own address and state. + +### 3. **Contracts as Actors** + - The contract can be considered an "actor" on the blockchain. It holds its **own state** and the **bytecode** for its execution logic. The contract's state can include variables like balances, ownership, and other data. + - Importantly, the contract has no private key or ability to initiate actions on its own—it only responds to external transactions or calls. These calls are made by external accounts (EOAs) or other contracts. + +### 4. **Invoking Contract Methods (Transactions)** + - When a client wants to interact with a smart contract (e.g., call a function), a **transaction** is created. This transaction contains: + - The **address** of the contract being called. + - The **function signature** and any **parameters** for the function. + - The **gas limit** and **gas price**. + - The **sender's address** and any **value** (if sending cryptocurrency, e.g., Ether on Ethereum). + + This transaction is then broadcast to the blockchain. + +### 5. **Transaction Execution by Validators** + - When a smart contract function is invoked, the transaction gets processed by the **validators (or miners)**. These validators execute the contract's bytecode in the blockchain's **virtual machine (VM)**. + - The execution happens in a **deterministic** way on every validator node, meaning that all nodes must execute the same code and arrive at the same result. This ensures **consensus**. + + **Clarification**: Not all contract function calls require a full transaction. Some calls are "read-only" (view/pure functions), which means they do not alter the contract state. These can be executed **locally** on a node without involving validators. Only state-changing calls (e.g., transferring tokens, modifying contract data) require transactions and gas fees. + + +### 6. **Execution on Validator Hardware** + - When a state-changing transaction is sent, the contract's bytecode is executed on the **validator nodes' hardware** within the VM environment. Each node runs the same bytecode, ensuring that all nodes reach the same output and update the contract's state consistently. + - Validators (or miners) execute the transaction, consume **gas** for every operation, and modify the state accordingly. + - Once the transaction is processed, the **new state** of the contract is written to the blockchain. + +### Additional Points: + - **Gas Mechanism**: Every operation performed by a smart contract consumes a certain amount of **gas**, which is paid by the sender of the transaction. The gas ensures that the network is compensated for the computational resources used, and also prevents infinite loops or denial-of-service attacks. + - **Consensus**: The result of contract execution is agreed upon by the network's consensus mechanism (e.g., Proof of Work, Proof of Stake). Once the consensus is reached, the state is updated across the entire network. + + +When we refer to **contract bytecode**, we're talking about the **compiled form** of the smart contract. 
This bytecode is essentially a machine-readable version of the smart contract, which is deployed on the blockchain when the contract is created. It includes the logic and instructions that the contract will execute, but it is not in human-readable form like the original source code (e.g., Solidity). + +### Breakdown of What Happens When a Function is Invoked!? + +1. **Contract Bytecode on the Chain**: + - Once a smart contract is deployed, its **bytecode** is stored at the contract's address on the blockchain. This bytecode remains there permanently, and any interaction with the contract refers back to this bytecode. + - The bytecode contains all the logic of the contract: its functions, fallback functions, and any storage structures. + - The blockchain nodes store this bytecode and reference it whenever an interaction occurs with the contract. + +2. **How a Function Call Is Encoded in a Transaction**: + - When a client (user or another contract) wants to call a function on a smart contract, the **transaction data** field includes the following: + - **Function selector**: A 4-byte identifier derived from the function's **signature**. The signature is a hash of the function name and its parameters (e.g., `transfer(address,uint256)`). + - **Encoded arguments**: The parameters (arguments) that are passed to the function, encoded in a format understood by the contract (using Ethereum's ABI encoding, for example). + + Example: + - Let's say you are calling `transfer(address recipient, uint256 amount)` in a smart contract. + - The first 4 bytes of the transaction data will be the **function selector**, which is the first 4 bytes of the keccak256 hash of the function signature: `transfer(address,uint256)`. + - The rest of the transaction data will be the encoded values of `recipient` and `amount`. + +3. **How the Virtual Machine (VM) Executes the Call**: + - When the transaction is received by a validator node, it needs to **execute the function call**. This happens in the blockchain's **virtual machine**. + - The VM looks at the **contract address** specified in the transaction and retrieves the **contract's bytecode** from storage (the blockchain). + - The transaction's **data field** (which contains the function selector and encoded parameters) is passed into the VM. + +4. **Matching the Function Call to the Bytecode**: + - Inside the contract bytecode, each function has its own entry point. The VM uses the **function selector** from the transaction to identify which function is being called. + - When a contract is compiled, the bytecode includes a **jump table** or a **dispatcher** logic that knows how to route incoming function calls based on their selectors. + - The VM takes the first 4 bytes of the transaction's data (the function selector), looks it up in the contract's bytecode, and **jumps** to the corresponding section of the bytecode where that function is defined. + +5. **Executing the Function**: + - Once the correct function has been located within the bytecode, the VM begins executing the instructions defined for that function, using the provided parameters. + - During execution, the VM may modify the contract's **storage** (e.g., updating balances, variables) or transfer cryptocurrency, depending on the logic of the function. + - All of this is executed in a deterministic manner, meaning every validator node will perform the exact same computation to ensure consensus. 
+ +### Example of How This Works in Practice: +Let's take a simplified scenario where you're interacting with a deployed contract to call the `transfer(address,uint256)` function. + +1. **Step 1: Create a Transaction** + - You create a transaction that specifies: + - The **contract address**. + - The **data field** containing the **function selector** for `transfer(address,uint256)` (the first 4 bytes of the hash of the function signature). + - The **encoded arguments** (e.g., the recipient's address and the amount to transfer). + +2. **Step 2: Validator Receives the Transaction** + - The validator receives the transaction and sees that it is directed at a specific **contract address**. The contract bytecode is already stored at this address on the blockchain. + +3. **Step 3: VM Execution** + - The VM loads the contract's bytecode and examines the **function selector** in the transaction's data. + - It matches this selector to a specific section in the contract's bytecode that corresponds to the `transfer` function. + +4. **Step 4: Function Execution** + - The VM starts executing the bytecode for the `transfer` function, using the **arguments** provided in the transaction data (i.e., recipient and amount). + - If the function involves modifying balances or state variables, these changes are performed and then written to the blockchain as part of the **contract's state**. + +### Summary of Key Concepts: +- **Contract Bytecode**: The compiled code of the smart contract stored at the contract's address. It contains all the logic of the contract's functions. +- **Function Call**: When a contract function is invoked, the transaction contains a **function selector** and encoded arguments. +- **VM Execution**: The virtual machine retrieves the contract's bytecode and uses the **function selector** to identify which part of the bytecode to execute. +- **Jump Table/Dispatcher**: Part of the bytecode that helps route the transaction to the correct function in the contract. + +Let's walk through the entire process from compiling a smart contract to invoking a function on it, step by step. This will cover everything from compiling the contract code to interacting with it on the blockchain. + +``` +1) compile contract using compiler +2) create an actor contract struct contains bytecode, owner, ... on the chain +3) assign an address to the bytecode of actor contract +4) function call of the contract in form of a hashed tx will be sent to the chain +5) all validators at the same receive it, they fetch the contract bytecode using its address +6) all validators at the same time try to match the contract function signature in the tx against the actor contract bytecode +7) all validators at the same execute the tx in vm +8) actor contract state gets mutated in the entire chain +``` + +### 1. **Writing the Smart Contract** + +The first step is writing the smart contract code. This is typically done in a high-level language that is suitable for the blockchain platform you're working on. For example: +- **Ethereum**: Solidity or Vyper +- **Solana**: Rust +- **Binance Smart Chain**: Solidity + +Here's an example of a simple Solidity contract: + +```solidity +pragma solidity ^0.8.0; + +contract SimpleStorage { + uint256 storedData; + + function set(uint256 data) public { + storedData = data; + } + + function get() public view returns (uint256) { + return storedData; + } +} +``` + +### 2. 
**Compiling the Contract** + +Once the contract code is written, it needs to be **compiled** into **bytecode** and an **ABI (Application Binary Interface)**. The compiler translates the human-readable source code (like Solidity) into **bytecode**, which is understood by the blockchain's **virtual machine**. + +#### Compilation Output: +- **Bytecode**: The machine code that the blockchain can execute. +- **ABI**: A JSON structure that defines how to interact with the contract, including details about its functions, arguments, and return types. + +For example, after compiling the `SimpleStorage` contract, you might get the following: +- **Bytecode**: `0x608060405234801561001057600080fd5b50610120806100206000396000f3fe...` +- **ABI**: +```json +[ + { + "inputs": [ + { + "internalType": "uint256", + "name": "data", + "type": "uint256" + } + ], + "name": "set", + "outputs": [], + "stateMutability": "nonpayable", + "type": "function" + }, + { + "inputs": [], + "name": "get", + "outputs": [ + { + "internalType": "uint256", + "name": "", + "type": "uint256" + } + ], + "stateMutability": "view", + "type": "function" + } +] +``` + +### 3. **Deploying the Contract** + +Now that you have the **bytecode** and **ABI**, you can deploy the contract to the blockchain. The deployment process involves sending a **transaction** to the blockchain that contains the bytecode and possibly some initial parameters. Here's how it works: + +#### Steps: +1. **Create a Transaction**: + - A transaction is created to deploy the contract. This transaction includes: + - **Bytecode**: The compiled contract code. + - **Sender's Address**: The account deploying the contract (this becomes the deployer or owner). + - **Gas Limit**: The maximum amount of gas the deployer is willing to spend for deployment. + +2. **Broadcast the Transaction**: + - The transaction is broadcast to the network, and validators (or miners, depending on the chain) will pick it up. + - If the transaction is successful, a **contract address** is assigned to the deployed contract. This address is deterministically created using the **deployer's address** and the **transaction nonce**. + +3. **Deployment Confirmation**: + - After the contract is deployed, it gets an **address on the blockchain**. This address is where the contract is stored, and it's where users or other contracts can interact with it. + + Example: `0x1234...5678` could be the contract's new address on the blockchain. + +### 4. **Interacting with the Deployed Contract** + +Once the contract is deployed, it can be interacted with by invoking its functions. Interactions with smart contracts can either: +- **Read data (view)**: Call a function that reads from the contract's storage without modifying it. +- **Write data (state-changing)**: Call a function that modifies the contract's state, such as changing stored values, transferring tokens, etc. + +#### Example of Function Call: +Let's say you want to call the `set` function to store the number `42` in the `SimpleStorage` contract. The process involves: + +#### Steps: +1. **Create a Transaction** (for state-changing function calls): + - To invoke a contract function that modifies the state, you must create a **transaction**. This includes: + - **Contract Address**: The address of the deployed contract. + - **Function Selector**: The first 4 bytes of the keccak256 hash of the function signature (e.g., `set(uint256)`). + - **Encoded Parameters**: The parameters for the function, encoded using the contract's ABI. 
+ - **Gas Limit & Gas Price**: The amount of gas you're willing to pay for the transaction. + - **Sender's Address**: The address of the account making the transaction. + + For `set(uint256)`, you encode `42` as the parameter: + - **Function Selector** for `set(uint256)`: `0x60fe47b1` + - **Encoded Data**: `000000000000000000000000000000000000000000000000000000000000002a` (42 in hex) + + The **transaction data** would look something like: + ```text + 0x60fe47b1000000000000000000000000000000000000000000000000000000000000002a + ``` + +2. **Send the Transaction**: + - This transaction is then sent to the blockchain. Validators will pick up the transaction and start executing it using the contract's **bytecode** that is stored at the contract's address. + +3. **VM Execution**: + - The validators (nodes running the Ethereum Virtual Machine or another blockchain's VM) will execute the bytecode stored at the contract address. They match the **function selector** with the corresponding function in the bytecode and begin execution using the **parameters** provided in the transaction. + +4. **State Change**: + - As the function executes, the VM modifies the contract's state. In the case of `set(uint256)`, the value `42` will be stored in the contract's storage variable `storedData`. + +5. **Transaction Finalization**: + - Once the transaction is processed, it is added to a block, and the state changes (like the new value `42` in `storedData`) are finalized across the network. + +### 5. **Reading Data from the Contract (View Functions)** + +For functions that don't modify the contract state (like the `get()` function), you can **call** them without creating a full transaction. These are "view" or "pure" functions that only read data and don't require a gas fee. + +#### Steps: +1. **Call the Contract Function**: + - You can create a **read-only call** to the contract. In this case, there's no need to pay gas since no state change is involved. + +2. **VM Executes Locally**: + - The contract's bytecode is executed locally by a node, and the result is returned. In the case of `get()`, the value stored in `storedData` (which we set to `42`) will be returned. + +3. **Return Value**: + - The result of the function (`42` in this case) is returned to the user or client application, and no state changes occur. + +### Complete Flow Summary + +1. **Contract Compilation**: + - Source code is compiled into bytecode and ABI. + +2. **Contract Deployment**: + - A transaction with the contract's bytecode is sent to the blockchain, and the contract is assigned an address. + +3. **Function Invocation (State-Changing)**: + - To invoke a contract's function (e.g., `set(uint256)`), a transaction is created with encoded function call data, sent to the network, and executed by validators, updating the contract's state. + +4. **Function Invocation (Read-Only)**: + - Read-only function calls (e.g., `get()`) are executed locally by the node and do not involve a transaction or gas fees. + +In the context of blockchain smart contracts, it is **not accurate** to say that contract actors (i.e., smart contracts) communicate directly through traditional RPC (Remote Procedure Call) protocols like **gRPC** or **Cap'n Proto (capnpc)**. Instead, blockchain communication and interaction between smart contracts follow a different paradigm based on transactions and blockchain infrastructure. Here's why: + +### Key Differences Between Smart Contract Communication and RPC + +1. 
**Smart Contracts on Blockchain**: + - **Smart contracts** are self-contained programs that exist on the blockchain and only execute when they are **invoked** through transactions. + - They do not initiate actions or "calls" on their own, unlike traditional client-server RPC models. Instead, they are **passive** and only respond to **external transactions** initiated by users or other contracts. + - Communication between smart contracts happens **within the blockchain environment** (e.g., Ethereum Virtual Machine for Ethereum-based blockchains) and is executed as part of the same transaction, **not through external RPC protocols**. + +2. **Transaction-Based Communication**: + - When a smart contract invokes another smart contract (i.e., a function from another contract), it happens **within the blockchain**, using the virtual machine's execution environment. This is fundamentally different from external systems communicating over RPC. + - Contract-to-contract communication happens **synchronously** as part of a single transaction. The invoking contract includes the address of the contract being called and passes encoded data (using ABI encoding) as part of the transaction. + - This communication happens **on-chain** and does not leave the blockchain ecosystem. + +3. **No Network Calls Involved**: + - Unlike **gRPC** or **Cap'n Proto**, which are used for inter-service communication across a network (e.g., between microservices in a distributed system), smart contracts do not rely on external network protocols. Everything happens inside the blockchain, and there is no need for external protocol translation. + - Calls between contracts do not involve standard networking layers (e.g., HTTP/2, TCP) or serialization formats used in gRPC or Cap'n Proto. + +### How Smart Contract Communication Actually Works: + +Here's how smart contracts typically communicate with one another on a blockchain like Ethereum: + +1. **Contract A Calls Contract B**: + - Contract A wants to invoke a function on Contract B. This can happen within the same transaction. + - Contract A knows the address of Contract B and prepares a function call (using the ABI to encode the function selector and parameters). + +2. **Execution in the VM**: + - The VM handles the execution. It routes the call to Contract B, checks its bytecode, and executes the appropriate function based on the provided data. + - Any state changes or computations happen inside the VM, and the result of Contract B's function call is returned to Contract A. + +3. **Gas Consumption**: + - All these interactions consume **gas**, and the costs are part of the single transaction. The contract making the call (Contract A) must ensure it has provided sufficient gas for the transaction, which includes both the computation in Contract A and the interaction with Contract B. + +4. **Atomicity and Reentrancy**: + - All contract interactions within a transaction are **atomic**, meaning that if any step fails (e.g., the call to Contract B fails), the entire transaction is reverted. + - This ensures consistency across the blockchain but introduces considerations like **reentrancy attacks**, which need to be mitigated in contract design. + +### External Communication and Oracles + +While smart contracts do not use RPC to communicate with each other on-chain, they can indirectly interact with **off-chain systems** using **oracles**. 
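+
+As a minimal, illustrative sketch (not part of the contracts above), this is roughly how the off-chain side of an oracle, or any ordinary client, could build the same `set(uint256)` calldata shown earlier and wrap it in a JSON-RPC payload for a node. The `sha3`, `hex` and `serde_json` crates, the node payload shape shown here, and the `fn main` wrapper are assumptions made for the example:
+
+```rust
+use serde_json::json;
+use sha3::{Digest, Keccak256};
+
+fn main() {
+    // function selector: first 4 bytes of keccak256("set(uint256)") -> 0x60fe47b1
+    let digest = Keccak256::digest(b"set(uint256)");
+    let selector = &digest[..4];
+
+    // ABI-encode the single uint256 argument (42) as a 32-byte big-endian word
+    let mut arg = [0u8; 32];
+    arg[24..].copy_from_slice(&42u64.to_be_bytes());
+
+    // calldata = selector ++ encoded arguments
+    let calldata = format!("0x{}{}", hex::encode(selector), hex::encode(arg));
+
+    // JSON-RPC payload a client (or oracle service) would POST to a node;
+    // in practice the transaction must also be signed before it is sent.
+    let write_tx = json!({
+        "jsonrpc": "2.0",
+        "method": "eth_sendTransaction",
+        "params": [{ "to": "0x1234...5678", "data": calldata }],
+        "id": 1
+    });
+
+    // read-only call to get(): no gas, executed locally by the node
+    let get_selector = format!("0x{}", hex::encode(&Keccak256::digest(b"get()")[..4]));
+    let read_call = json!({
+        "jsonrpc": "2.0",
+        "method": "eth_call",
+        "params": [{ "to": "0x1234...5678", "data": get_selector }, "latest"],
+        "id": 2
+    });
+
+    println!("{write_tx}\n{read_call}");
+}
+```
+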
Oracles act as bridges between off-chain data and smart contracts, providing external information such as price feeds, weather data, or events from real-world systems. + +- **Oracles** typically communicate with off-chain systems using traditional networking protocols (sometimes even gRPC or other RPC systems). +- However, when oracles relay data back to the smart contract, they do so via on-chain **transactions**—again, not through RPC calls directly. + +### When is RPC Used? + +RPC protocols like **gRPC** or **Cap'n Proto** are used in blockchain contexts, but their role is more in the context of how **users** or **clients** communicate with blockchain nodes, rather than how contracts communicate with each other on-chain. + +- **JSON-RPC** is a commonly used protocol for communication between clients and Ethereum nodes. For example, when a user or an application wants to interact with the blockchain (e.g., querying the blockchain or sending a transaction), they use JSON-RPC over HTTP or WebSocket to communicate with a node. +- **gRPC** or **Cap'n Proto** might be used by some blockchain infrastructure services, but this would be for things like node-to-node communication, off-chain services, or layer 2 solutions. + +### Summary: + +- Smart contracts communicate with each other through **on-chain transactions** executed within the **blockchain's virtual machine**, not through external RPC protocols like gRPC or Cap'n Proto. +- All communication between contracts is **transactional** and handled within the blockchain environment, using the **contract address**, **function selector**, and **encoded parameters**. +- RPC protocols like **JSON-RPC** are used for **client-to-node** communication (i.e., how external users interact with the blockchain), but **not for contract-to-contract** interactions on-chain. + +If you're looking for how **off-chain systems** or **clients** interact with the blockchain, then RPC (e.g., JSON-RPC) is involved, but **contract-to-contract communication** stays entirely within the blockchain's execution environment. + +Since the contract's compiled bytecode is stored on the chain, we can use the contract's ABI/IDL over RPC to see which methods and types the contract exposes, build a transaction object that encodes a call to one of those methods, and send it to the chain through an RPC call. \ No newline at end of file diff --git a/infra/api.http.json b/infra/api.http.json index 3266714..97c10ce 100755 --- a/infra/api.http.json +++ b/infra/api.http.json @@ -34,7 +34,7 @@ { "key": "cover", "type": "file", - "src": "/Users/wildonion/Desktop/onion.png" + "src": "/Users/wildonion/Desktop/DALL·E 2024-08-20 13.18.23 - A conceptual book cover design featuring a man without a suit walking in the middle.
The ground beneath him subtly incorporates a heart with a brain e.webp" }, { "key": "etype", @@ -63,7 +63,7 @@ }, { "key": "started_at", - "value": "17578657778", + "value": "1724869829", "type": "text" }, { @@ -85,6 +85,11 @@ "key": "invitations", "value": "[{\"entrance_fee\": 10, \"friend_id\": 8}]", "type": "text" + }, + { + "key": "end_at", + "value": "1724869980", + "type": "text" } ] }, @@ -103,7 +108,7 @@ "name": "Notif", "item": [ { - "name": "Register Notif (Produce)", + "name": "Publish To RMQ", "request": { "method": "POST", "header": [ @@ -114,7 +119,7 @@ ], "body": { "mode": "raw", - "raw": "{\n \"producer_info\": {\n \"info\":{\n \"local_spawn\": true, // spawn the task into the actor thread itself\n \"notif_data\": { \n \"receiver_info\": \"1\",\n \"id\": \"unqie-id0\",\n \"action_data\": {\n \"pid\": 200.4\n }, \n \"actioner_info\": \"2\", \n \"action_type\": \"ProductPurchased\", \n \"fired_at\": 1714316645, \n \"is_seen\": false\n }, \n \"exchange_name\": \"SavageEx\",\n \"exchange_type\": \"fanout\", // amq.topic for pubsub\n \"routing_key\": \"\", // routing pattern or key - will be ignored if type is fanout\n \"encryption_config\": { // this can be null\n \"secret\": \"wildonion@1234\",\n \"passphrase\": \"wildonion@12345\",\n \"unique_redis_id\": \"notif_unique_redis_id\" // !!!!must be unique for every new notif!!!!\n }\n }\n },\n \"consumer_info\": null\n}", + "raw": "{\n \"producer_info\": {\n \"Rmq\":{\n \"local_spawn\": true, // spawn the task into the actor thread itself\n \"notif_data\": { \n \"receiver_info\": \"1\",\n \"id\": \"unqie-id0\",\n \"action_data\": {\n \"pid\": 200.4\n }, \n \"actioner_info\": \"2\", \n \"action_type\": \"ProductPurchased\", \n \"fired_at\": 1714316645, \n \"is_seen\": false\n }, \n \"exchange_name\": \"SavageEx\",\n \"exchange_type\": \"fanout\", // amq.topic for pubsub\n \"routing_key\": \"\", // routing pattern or key - will be ignored if type is fanout\n \"encryption_config\": { // this can be null\n \"secret\": \"wildonion@1234\",\n \"passphrase\": \"wildonion@12345\"\n }\n },\n \"Kafka\": null,\n \"Redis\": null\n },\n \"consumer_info\": null\n}", "options": { "raw": { "language": "json" @@ -136,7 +141,106 @@ "response": [] }, { - "name": "Register Notif (Consume)", + "name": "Publish To Kafka", + "request": { + "method": "POST", + "header": [ + { + "key": "token_time", + "value": "124kUmD39YUwJRKxaQqTkFidRW3uAzhcFniiky7b2HjFPME8R5ZLdFgs9z6WncoLbPh72EPNX" + } + ], + "body": { + "mode": "raw", + "raw": "{\n \"producer_info\": {\n \"Kafka\":{\n \"local_spawn\": true, // spawn the task into the actor thread itself\n \"notif_data\": { \n \"receiver_info\": \"1\",\n \"id\": \"unqie-id0\",\n \"action_data\": {\n \"pid\": 200.4\n }, \n \"actioner_info\": \"2\", \n \"action_type\": \"ProductPurchased\", \n \"fired_at\": 1714316645, \n \"is_seen\": false\n }, \n \"brokers\": \"localhost:9092\", // localhost:29092,localhost:39092,localhost:49092\n \"headers\": [{\n \"key\": \"wildonionKey\", \n \"val\": \"wildonionValue\"\n }],\n \"partitions\": 4,\n \"topic\": \"SavageTopic\",\n \"encryptionConfig\": { // this can be null\n \"secret\": \"wildonion@1234\",\n \"passphrase\": \"wildonion@12345\"\n }\n },\n \"Rmq\": null,\n \"Redis\": null\n },\n \"consumer_info\": null\n}", + "options": { + "raw": { + "language": "json" + } + } + }, + "url": { + "raw": "{{events-v1}}/notif/", + "host": [ + "{{events-v1}}" + ], + "path": [ + "notif", + "" + ] + }, + "description": "if you want to produce notif into the RMQ channel put `consumer_info: 
null` and fill the `producer_info`, to consume notifs just put `producer_info: null` and fill the `consumer_info`" + }, + "response": [] + }, + { + "name": "Publish To Redis", + "request": { + "method": "POST", + "header": [ + { + "key": "token_time", + "value": "124kUmD39YUwJRKxaQqTkTRZGDr9L6fszxLz6xKvXrqJSJQNo9vAMjYs5tbbiRPskTGUgRegd" + } + ], + "body": { + "mode": "raw", + "raw": "{\n \"producer_info\": {\n \"Redis\":{\n \"local_spawn\": true, // spawn the task into the actor thread itself\n \"notif_data\": { \n \"receiver_info\": \"1\",\n \"id\": \"unqie-id0\",\n \"action_data\": {\n \"pid\": 200.4\n }, \n \"actioner_info\": \"2\", \n \"action_type\": \"ProductPurchased\", \n \"fired_at\": 1714316645, \n \"is_seen\": false\n }, \n \"channel\": \"channel-to-produce-msg-to\",\n \"encryptionConfig\": { // this can be null\n \"secret\": \"wildonion@1234\",\n \"passphrase\": \"wildonion@12345\"\n }\n },\n \"Rmq\": null,\n \"Kafka\": null\n },\n \"consumer_info\": null\n}", + "options": { + "raw": { + "language": "json" + } + } + }, + "url": { + "raw": "{{events-v1}}/notif/", + "host": [ + "{{events-v1}}" + ], + "path": [ + "notif", + "" + ] + }, + "description": "if you want to produce notif into the RMQ channel put `consumer_info: null` and fill the `producer_info`, to consume notifs just put `producer_info: null` and fill the `consumer_info`" + }, + "response": [] + }, + { + "name": "Start RMQ Consumer", + "request": { + "method": "POST", + "header": [ + { + "key": "token_time", + "value": "124kUmD39YUwJRKxaQqTkRUAJQtFUBmfjrpL99YjTeaJ1jBEDgt7sCVdKKdvoUi4n6Ennr8e2" + } + ], + "body": { + "mode": "raw", + "raw": "{\n \"producer_info\": null,\n \"consumer_info\": {\n \"Rmq\": {\n \"queue\": \"TestOnion\",\n \"exchange_name\": \"SavageEx\",\n \"routing_key\": \"\",\n \"tag\": \"cons_tag0\",\n \"redis_cache_exp\": 300, // this can be 0 to not to store data on redis\n \"local_spawn\": true, // spawn the task into the actor thread itself\n \"store_in_db\": true,\n \"decryption_config\": { // this can be null\n \"secret\": \"wildonion@1234\",\n \"passphrase\": \"wildonion@12345\"\n }\n },\n \"Kafka\": null,\n \"Redis\": null\n }\n}", + "options": { + "raw": { + "language": "json" + } + } + }, + "url": { + "raw": "{{events-v1}}/notif/", + "host": [ + "{{events-v1}}" + ], + "path": [ + "notif", + "" + ] + }, + "description": "if you want to produce notif into the RMQ channel put `consumer_info: null` and fill the `producer_info`, to consume notifs just put `producer_info: null` and fill the `consumer_info`" + }, + "response": [] + }, + { + "name": "Start Redis Consumer", "request": { "method": "POST", "header": [ @@ -147,7 +251,40 @@ ], "body": { "mode": "raw", - "raw": "{\n \"producer_info\": null,\n \"consumer_info\": {\n \"info\": {\n \"queue\": \"TestOnion\",\n \"exchange_name\": \"SavageEx\",\n \"routing_key\": \"\",\n \"tag\": \"cons_tag0\",\n \"redis_cache_exp\": 300, // this can be 0 to not to store data on redis\n \"local_spawn\": true, // spawn the task into the actor thread itself\n \"store_in_db\": true,\n \"decryption_config\": { // this can be null\n \"secret\": \"wildonion@1234\",\n \"passphrase\": \"wildonion@12345\",\n \"unique_redis_id\": \"notif_unique_redis_id\" // !!!!must be unique for every new notif!!!!\n }\n }\n }\n}", + "raw": "{\n \"producer_info\": null,\n \"consumer_info\": {\n \"Redis\": {\n \"channel\": \"channel-to-consume-msg-from\",\n \"redis_cache_exp\": 300, // this can be 0 to not to store data on redis\n \"decryptionConfig\": { // this can be null\n \"secret\": 
\"wildonion@1234\",\n \"passphrase\": \"wildonion@12345\"\n }\n },\n \"Rmq\": null,\n \"Kafka\": null\n }\n}", + "options": { + "raw": { + "language": "json" + } + } + }, + "url": { + "raw": "{{events-v1}}/notif/", + "host": [ + "{{events-v1}}" + ], + "path": [ + "notif", + "" + ] + }, + "description": "if you want to produce notif into the RMQ channel put `consumer_info: null` and fill the `producer_info`, to consume notifs just put `producer_info: null` and fill the `consumer_info`" + }, + "response": [] + }, + { + "name": "Start Kafka Consumer", + "request": { + "method": "POST", + "header": [ + { + "key": "token_time", + "value": "124kUmD39YUwJRKxaQqTkFidRW3uAzhcFniiky7b2HjFPME8R5ZLdFgs9z6WncoLbPh72EPNX" + } + ], + "body": { + "mode": "raw", + "raw": "{\n \"producer_info\": null,\n \"consumer_info\": {\n \"Kafka\": {\n \"brokers\": \"localhost:9092\", // localhost:29092,localhost:39092,localhost:49092\n \"topics\": [\"SavageTopic\"],\n \"consumerGroupId\": \"cid\", // this can be UUID\n \"redis_cache_exp\": 300, // this can be 0 to not to store data on redis\n \"decryptionConfig\": { // this can be null\n \"secret\": \"wildonion@1234\",\n \"passphrase\": \"wildonion@12345\"\n }\n },\n \"Rmq\": null,\n \"Redis\": null\n }\n}", "options": { "raw": { "language": "json" diff --git a/logs/error-kind/zerlog.log b/logs/error-kind/zerlog.log index dcd2c0b..022cfcd 100755 --- a/logs/error-kind/zerlog.log +++ b/logs/error-kind/zerlog.log @@ -50,3 +50,11 @@ code: 65451 | message: invalid parameter | due to: Themis Error | time: 1721815033092 | method name: CrypterInterface.encrypt.Wallet::secure_cell_decrypt code: 65451 | message: invalid parameter | due to: Themis Error | time: 1721817375695 | method name: CrypterInterface.encrypt.Wallet::secure_cell_decrypt +code: 65535 | message: Message production error: UnknownPartition (Local: Unknown partition) | due to: Redis, RMQ or Seaorm Error + | time: 1726766912112 | method name: NotifBrokerActor.publishToKafka.deliveryStatus +code: 65535 | message: Message production error: UnknownPartition (Local: Unknown partition) | due to: Redis, RMQ or Seaorm Error + | time: 1726767026858 | method name: NotifBrokerActor.publishToKafka.deliveryStatus +code: 65535 | message: Message production error: UnknownPartition (Local: Unknown partition) | due to: Redis, RMQ or Seaorm Error + | time: 1726767152352 | method name: NotifBrokerActor.publishToKafka.deliveryStatus +code: 65531 | message: invalid number at line 1 column 2 | due to: Serde Error + | time: 1726771961937 | method name: NotifBrokerActor.consumeFromKafka.decode_serde diff --git a/src/apis/v1/http/auth.rs b/src/apis/v1/http/auth.rs index ca3855b..5309eff 100644 --- a/src/apis/v1/http/auth.rs +++ b/src/apis/v1/http/auth.rs @@ -37,8 +37,8 @@ pub async fn generate_access_token( */ ctrl: &mut FlowCtrl, // https://salvo.rs/book/features/openapi.html#extractors (QueryParam, HeaderParam, CookieParam, PathParam, FormBody, JsonBody) - secret: HeaderParam, // used to extract the token time from the header as well as showcasing in swagger ui - params: QueryParam // query param is required, showcasing in swagger ui + // secret: HeaderParam, // used to extract the token time from the header as well as showcasing in swagger ui + // params: QueryParam // query param is required, showcasing in swagger ui ){ diff --git a/src/apis/v1/http/health.rs b/src/apis/v1/http/health.rs index 6c14690..fbc7174 100644 --- a/src/apis/v1/http/health.rs +++ b/src/apis/v1/http/health.rs @@ -7,7 +7,7 @@ use 
constants::STORAGE_IO_ERROR_CODE; use context::AppContext; use interfaces::crypter::Crypter; // use it here so we can call the encrypt and decrypt methods on the &[u8] use interfaces::product::ProductExt; -use lockers::llm::Product; +use lockers::llm::{Product, PurchasingStatus}; use models::event::UserSecureCellConfig; use serde::{Deserialize, Serialize}; use crate::*; @@ -162,7 +162,8 @@ pub async fn mint( depot: &mut Depot, ctrl: &mut FlowCtrl, // with this we can control the flow of each route like executing the next handler // https://salvo.rs/book/features/openapi.html#extractors (QueryParam, HeaderParam, CookieParam, PathParam, FormBody, JsonBody) - prod: JsonBody, // used to extract the request body as well as showcasing in swagger ui + // prod: JsonBody, // used to extract the request body as well as showcasing in swagger ui + // NOTE: don't use JsonBody if you're using #[salvo(extract(default_source(from="body")))] ){ let app_ctx = depot.obtain::>().unwrap(); // extracting shared app context @@ -177,16 +178,16 @@ pub async fn mint( */ let mut prod = req.extract::().await.unwrap(); - let (is_being_minted, mut updatedProductReceiver) = prod.atomic_purchase_status().await; + let (productStatus, mut updatedProductReceiver) = prod.atomic_purchase_status().await; - match is_being_minted{ - true => { + match productStatus{ + PurchasingStatus::Minting => { // someone is minting let server_time = format!("{}", chrono::Local::now().to_string()); res.status_code = Some(StatusCode::LOCKED); res.render(Json( HoopoeResponse{ data: prod.clone(), - message: "pid is locked", + message: "someone is minting the pid cuz it's locked", is_err: true, status: StatusCode::LOCKED.as_u16(), meta: Some( @@ -197,7 +198,7 @@ pub async fn mint( } )); // reject the request }, - false => { + _ => { // product is locked for minting // if you want to use while let Some the prod must be cloned in every iteration // hence using if let Some is the best option in here to avoid using clone. 
if let Some(product) = updatedProductReceiver.recv().await{ @@ -206,13 +207,13 @@ pub async fn mint( res.render(Json( HoopoeResponse{ data: prod.clone(), - message: "notify you once the minting gets done", + message: "product got locked for minting, notify you once the minting gets done", is_err: false, status: StatusCode::OK.as_u16(), meta: Some( serde_json::json!({ "server_time": server_time, - "minted_product": product + "product_for_minting": product }) ) } diff --git a/src/apis/v1/http/notif.rs b/src/apis/v1/http/notif.rs index a04e75e..483df48 100644 --- a/src/apis/v1/http/notif.rs +++ b/src/apis/v1/http/notif.rs @@ -65,46 +65,139 @@ pub async fn register_notif( let get_producer_info = register_notif_req.clone().producer_info; let get_consumer_info = register_notif_req.clone().consumer_info; - // there is a producer info in body + /* ************************************************************* */ + /* *************** HE TRIES TO PRODUCE SOMETHING *************** */ + /* ************************************************************* */ if get_producer_info.is_some(){ + let producer_info = get_producer_info.unwrap(); - let mut notif = producer_info.info; - notif.exchange_name = format!("{}.notif:{}", APP_NAME, notif.exchange_name); - - /* -ˋˏ✄┈┈┈┈ - >_ sending notif to RMQ in the background - you may want to produce data in a loop{} constantly - like sending gps data contains lat and long into the - exchange so other consumers be able to consume constantly - as the data coming at a same time, kindly put the sending - message logic to actor inside a loop{}. - */ - tokio::spawn( // running the producing notif job in the background in a separate thread - { - let cloned_app_ctx = app_ctx.clone(); - let cloned_notif = notif.clone(); - async move{ - // producing nofit by sending the ProduceNotif message to - // the producer actor, - match cloned_app_ctx.clone().unwrap().actors.as_ref().unwrap() - .broker_actors.notif_actor.send(cloned_notif).await - { - Ok(_) => { () }, - Err(e) => { - let source = &e.source().unwrap().to_string(); // we know every goddamn type implements Error trait, we've used it here which allows use to call the source method on the object - let err_instance = crate::error::HoopoeErrorResponse::new( - *MAILBOX_CHANNEL_ERROR_CODE, // error hex (u16) code - source.as_bytes().to_vec(), // text of error source in form of utf8 bytes - crate::error::ErrorKind::Actor(crate::error::ActixMailBoxError::Mailbox(e)), // the actual source of the error caused at runtime - &String::from("register_notif.producer_actor.notif_actor.send"), // current method name - Some(&zerlog_producer_actor) - ).await; - return; + if producer_info.Rmq.is_some(){ + + // since the actor supports the PublishNotifToRmq message so + // use the unwrapped data to send the rmq producing message to the actor + let mut notif = producer_info.clone().Rmq.unwrap(); + notif.exchange_name = format!("{}.notif:{}", APP_NAME, notif.exchange_name); + + /* -ˋˏ✄┈┈┈┈ + >_ sending notif to RMQ in the background + you may want to produce data in a loop{} constantly + like sending gps data contains lat and long into the + exchange so other consumers be able to consume in realtime + as the data coming at a same time to achieve this + kindly put the sending message logic to actor + inside a loop{}. 
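+                    a rough, hypothetical sketch of such a loop (gps_receiver and
+                    notif_actor are placeholder handles, the fields mirror the
+                    PublishNotifToRmq message and the "Publish To RMQ" sample in
+                    infra/api.http.json):
+                        loop{
+                            let notif_data = gps_receiver.recv().await.unwrap();
+                            notif_actor.send(PublishNotifToRmq{
+                                local_spawn: true,
+                                notif_data,
+                                exchange_name: "SavageEx".to_string(),
+                                exchange_type: "fanout".to_string(),
+                                routing_key: "".to_string(),
+                                encryption_config: None,
+                            }).await;
+                        }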
+ */ + tokio::spawn( // running the producing notif job in the background in a separate thread + { + let cloned_app_ctx = app_ctx.clone(); + let cloned_notif = notif.clone(); + async move{ + // producing nofit by sending the ProduceNotif message to + // the producer actor, + match cloned_app_ctx.clone().unwrap().actors.as_ref().unwrap() + .broker_actors.notif_actor.send(cloned_notif).await + { + Ok(_) => { () }, + Err(e) => { + let source = &e.source().unwrap().to_string(); // we know every goddamn type implements Error trait, we've used it here which allows use to call the source method on the object + let err_instance = crate::error::HoopoeErrorResponse::new( + *MAILBOX_CHANNEL_ERROR_CODE, // error hex (u16) code + source.as_bytes().to_vec(), // text of error source in form of utf8 bytes + crate::error::ErrorKind::Actor(crate::error::ActixMailBoxError::Mailbox(e)), // the actual source of the error caused at runtime + &String::from("register_notif.producer_actor.rmq.notif_actor.send"), // current method name + Some(&zerlog_producer_actor) + ).await; + return; + } } } - } - } - ); + } + ); + + } else if producer_info.Kafka.is_some(){ + + // since the actor supports the PublishNotifToKafka message so + // use the unwrapped data to send the kafka producing message to the actor + let mut notif = producer_info.clone().Kafka.unwrap(); + + /* -ˋˏ✄┈┈┈┈ + >_ sending notif to Kafka in the background + you may want to produce data in a loop{} constantly + like sending gps data contains lat and long into the + exchange so other consumers be able to consume in realtime + as the data coming at a same time to achieve this + kindly put the sending message logic to actor + inside a loop{}. + */ + tokio::spawn( // running the producing notif job in the background in a separate thread + { + let cloned_app_ctx = app_ctx.clone(); + let cloned_notif = notif.clone(); + async move{ + // producing nofit by sending the ProduceNotif message to + // the producer actor, + match cloned_app_ctx.clone().unwrap().actors.as_ref().unwrap() + .broker_actors.notif_actor.send(cloned_notif).await + { + Ok(_) => { () }, + Err(e) => { + let source = &e.source().unwrap().to_string(); // we know every goddamn type implements Error trait, we've used it here which allows use to call the source method on the object + let err_instance = crate::error::HoopoeErrorResponse::new( + *MAILBOX_CHANNEL_ERROR_CODE, // error hex (u16) code + source.as_bytes().to_vec(), // text of error source in form of utf8 bytes + crate::error::ErrorKind::Actor(crate::error::ActixMailBoxError::Mailbox(e)), // the actual source of the error caused at runtime + &String::from("register_notif.producer_actor.kafka.notif_actor.send"), // current method name + Some(&zerlog_producer_actor) + ).await; + return; + } + } + } + } + ); + + } else{ + + // since the actor supports the PublishNotifToRedis message so + // use the unwrapped data to send the redis producing message to the actor + let mut notif = producer_info.clone().Redis.unwrap(); + + /* -ˋˏ✄┈┈┈┈ + >_ sending notif to Redis in the background + you may want to produce data in a loop{} constantly + like sending gps data contains lat and long into the + exchange so other consumers be able to consume in realtime + as the data coming at a same time to achieve this + kindly put the sending message logic to actor + inside a loop{}. 
+ */ + tokio::spawn( // running the producing notif job in the background in a separate thread + { + let cloned_app_ctx = app_ctx.clone(); + let cloned_notif = notif.clone(); + async move{ + // producing nofit by sending the ProduceNotif message to + // the producer actor, + match cloned_app_ctx.clone().unwrap().actors.as_ref().unwrap() + .broker_actors.notif_actor.send(cloned_notif).await + { + Ok(_) => { () }, + Err(e) => { + let source = &e.source().unwrap().to_string(); // we know every goddamn type implements Error trait, we've used it here which allows use to call the source method on the object + let err_instance = crate::error::HoopoeErrorResponse::new( + *MAILBOX_CHANNEL_ERROR_CODE, // error hex (u16) code + source.as_bytes().to_vec(), // text of error source in form of utf8 bytes + crate::error::ErrorKind::Actor(crate::error::ActixMailBoxError::Mailbox(e)), // the actual source of the error caused at runtime + &String::from("register_notif.producer_actor.redis.notif_actor.send"), // current method name + Some(&zerlog_producer_actor) + ).await; + return; + } + } + } + } + ); + } // in here the background task might have halted, executed or even // crashed but the response is sent already to the caller regardless @@ -119,69 +212,176 @@ pub async fn register_notif( is_err: false, status: StatusCode::OK.as_u16(), meta: Some( - serde_json::to_value(¬if).unwrap() + serde_json::to_value(&producer_info).unwrap() ) } )); + /* ************************************************************* */ + /* *************** HE TRIES TO CONSUME SOMETHING *************** */ + /* ************************************************************* */ } else if get_consumer_info.is_some(){ // there is a consumer info in body + let consumer_info = get_consumer_info.unwrap(); - let mut notif = consumer_info.info; - notif.exchange_name = format!("{}.notif:{}", APP_NAME, notif.exchange_name); - notif.queue = format!("{}.{}:{}", APP_NAME, notif.tag, notif.queue); - - /* -ˋˏ✄┈┈┈┈ - >_ start consuming in the background - if you know the RMQ info you may want to start consuming in the - background exactly when the server is being started otherwise - you would have to send message to its actor to start it as an - async task in the background. 
- */ - tokio::spawn( // running the consuming notif job in the background in a separate thread - { - let cloned_app_ctx = app_ctx.clone(); - let cloned_notif = notif.clone(); - async move{ - // consuming notif by sending the ConsumeNotif message to - // the consumer actor, - match cloned_app_ctx.clone().unwrap().actors.as_ref().unwrap() - .broker_actors.notif_actor.send(cloned_notif).await - { - Ok(_) => { - - // in here you could access the notif for an owner using - // a redis key like: notif_owner:3 which retrieve all data - // on the redis for the receiver with id 3 - () - - }, - Err(e) => { - let source = &e.source().unwrap().to_string(); // we know every goddamn type implements Error trait, we've used it here which allows use to call the source method on the object - let err_instance = crate::error::HoopoeErrorResponse::new( - *MAILBOX_CHANNEL_ERROR_CODE, // error hex (u16) code - source.as_bytes().to_vec(), // text of error source in form of utf8 bytes - crate::error::ErrorKind::Actor(crate::error::ActixMailBoxError::Mailbox(e)), // the actual source of the error caused at runtime - &String::from("register_notif.consumer_actor.notif_actor.send"), // current method name - Some(&zerlog_producer_actor) - ).await; - return; + if consumer_info.Rmq.is_some(){ + + // since the actor supports the ConsumehNotifFromRmq message so + // use the unwrapped data to send the rmq consuming message to the actor + let mut notif = consumer_info.clone().Rmq.unwrap(); + + notif.exchange_name = format!("{}.notif:{}", APP_NAME, notif.exchange_name); + notif.queue = format!("{}.{}:{}", APP_NAME, notif.tag, notif.queue); + + /* -ˋˏ✄┈┈┈┈ + >_ start consuming in the background + if you know the RMQ info you may want to start consuming in the + background exactly WHERE the server is being started otherwise + you would have to send message to its actor to start it as an + async task in the background. 
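+                        for instance, a hypothetical startup snippet (field values
+                        taken from the "Start RMQ Consumer" sample in
+                        infra/api.http.json) could look like:
+                            app_ctx.actors.as_ref().unwrap().broker_actors.notif_actor
+                                .send(ConsumeNotifFromRmq{
+                                    queue: "TestOnion".to_string(),
+                                    exchange_name: "SavageEx".to_string(),
+                                    routing_key: "".to_string(),
+                                    tag: "cons_tag0".to_string(),
+                                    redis_cache_exp: 300,
+                                    local_spawn: true,
+                                    store_in_db: true,
+                                    decryption_config: None,
+                                }).await;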
+ */ + tokio::spawn( // running the consuming notif job in the background in a separate thread + { + let cloned_app_ctx = app_ctx.clone(); + let cloned_notif = notif.clone(); + async move{ + // consuming notif by sending the ConsumeNotif message to + // the consumer actor, + match cloned_app_ctx.clone().unwrap().actors.as_ref().unwrap() + .broker_actors.notif_actor.send(cloned_notif).await + { + Ok(_) => { + + // in here you could access the notif for an owner using + // a redis key like: notif_owner:3 which retrieve all data + // on the redis for the receiver with id 3 + () + + }, + Err(e) => { + let source = &e.source().unwrap().to_string(); // we know every goddamn type implements Error trait, we've used it here which allows use to call the source method on the object + let err_instance = crate::error::HoopoeErrorResponse::new( + *MAILBOX_CHANNEL_ERROR_CODE, // error hex (u16) code + source.as_bytes().to_vec(), // text of error source in form of utf8 bytes + crate::error::ErrorKind::Actor(crate::error::ActixMailBoxError::Mailbox(e)), // the actual source of the error caused at runtime + &String::from("register_notif.consumer_actor.rmq.notif_actor.send"), // current method name + Some(&zerlog_producer_actor) + ).await; + return; + } } - } - + + } } - } - ); + ); + + } else if consumer_info.Kafka.is_some(){ + + // since the actor supports the ConsumehNotifFromKafka message so + // use the unwrapped data to send the kafka consuming message to the actor + let mut notif = consumer_info.clone().Kafka.unwrap(); + + /* -ˋˏ✄┈┈┈┈ + >_ start consuming in the background + if you know the Kafka info you may want to start consuming in the + background exactly WHERE the server is being started otherwise + you would have to send message to its actor to start it as an + async task in the background. 
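+                        for instance, a hypothetical startup snippet (field values
+                        taken from the "Start Kafka Consumer" sample in
+                        infra/api.http.json, fields per ConsumeNotifFromKafka below):
+                            app_ctx.actors.as_ref().unwrap().broker_actors.notif_actor
+                                .send(ConsumeNotifFromKafka{
+                                    topics: vec!["SavageTopic".to_string()],
+                                    consumerGroupId: "cid".to_string(),
+                                    brokers: "localhost:9092".to_string(),
+                                    redis_cache_exp: 300,
+                                    decryptionConfig: None,
+                                }).await;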
+ */ + tokio::spawn( // running the consuming notif job in the background in a separate thread + { + let cloned_app_ctx = app_ctx.clone(); + let cloned_notif = notif.clone(); + async move{ + // consuming notif by sending the ConsumeNotif message to + // the consumer actor, + match cloned_app_ctx.clone().unwrap().actors.as_ref().unwrap() + .broker_actors.notif_actor.send(cloned_notif).await + { + Ok(_) => { + + // in here you could access the notif for an owner using + // a redis key like: notif_owner:3 which retrieve all data + // on the redis for the receiver with id 3 + () + + }, + Err(e) => { + let source = &e.source().unwrap().to_string(); // we know every goddamn type implements Error trait, we've used it here which allows use to call the source method on the object + let err_instance = crate::error::HoopoeErrorResponse::new( + *MAILBOX_CHANNEL_ERROR_CODE, // error hex (u16) code + source.as_bytes().to_vec(), // text of error source in form of utf8 bytes + crate::error::ErrorKind::Actor(crate::error::ActixMailBoxError::Mailbox(e)), // the actual source of the error caused at runtime + &String::from("register_notif.consumer_actor.kafka.notif_actor.send"), // current method name + Some(&zerlog_producer_actor) + ).await; + return; + } + } + + } + } + ); + + } else{ + + // since the actor supports the ConsumehNotifFromRedis message so + // use the unwrapped data to send the redis consuming message to the actor + let mut notif = consumer_info.clone().Redis.unwrap(); + + /* -ˋˏ✄┈┈┈┈ + >_ start consuming in the background + if you know the Redis info you may want to start consuming in the + background exactly WHERE the server is being started otherwise + you would have to send message to its actor to start it as an + async task in the background. + */ + tokio::spawn( // running the consuming notif job in the background in a separate thread + { + let cloned_app_ctx = app_ctx.clone(); + let cloned_notif = notif.clone(); + async move{ + // consuming notif by sending the ConsumeNotif message to + // the consumer actor, + match cloned_app_ctx.clone().unwrap().actors.as_ref().unwrap() + .broker_actors.notif_actor.send(cloned_notif).await + { + Ok(_) => { + + // in here you could access the notif for an owner using + // a redis key like: notif_owner:3 which retrieve all data + // on the redis for the receiver with id 3 + () + + }, + Err(e) => { + let source = &e.source().unwrap().to_string(); // we know every goddamn type implements Error trait, we've used it here which allows use to call the source method on the object + let err_instance = crate::error::HoopoeErrorResponse::new( + *MAILBOX_CHANNEL_ERROR_CODE, // error hex (u16) code + source.as_bytes().to_vec(), // text of error source in form of utf8 bytes + crate::error::ErrorKind::Actor(crate::error::ActixMailBoxError::Mailbox(e)), // the actual source of the error caused at runtime + &String::from("register_notif.consumer_actor.redis.notif_actor.send"), // current method name + Some(&zerlog_producer_actor) + ).await; + return; + } + } + + } + } + ); + + } // fill the response object, salvo returns it by itself to the caller res.status_code = Some(StatusCode::OK); res.render(Json( HoopoeResponse::<&[u8]>{ data: &[], - message: "SUCCESS: notification registered succesfully, consumer has started consuming", + message: "SUCCESS: consumer has started consuming", is_err: false, status: StatusCode::OK.as_u16(), meta: Some( - serde_json::to_value(¬if).unwrap() + serde_json::to_value(&consumer_info).unwrap() ) } )); @@ -211,7 +411,9 @@ pub async fn 
register_notif( /* -ˋˏ✄┈┈┈┈ >_ this route is used mainly to retrieve notifications in a short polling manner, client must call this api in an interval to fetch notifs for an - owner or whole data like every 5 seconds to simulate realtiming in client. + owner or whole data like every 5 seconds to simulate realtiming in client + fetching notifs for an specific owner requires ownerId which is exactly + like a jobId in short polling process. */ #[endpoint] pub async fn get_notif( diff --git a/src/config/mod.rs b/src/config/mod.rs index f469f78..0475b88 100755 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -18,6 +18,10 @@ pub struct Env{ pub AMQP_HOST: String, pub AMQP_USERNAME: String, pub AMQP_PASSWORD: String, + pub KAFKA_PORT: String, + pub KAFKA_HOST: String, + pub KAFKA_USERNAME: String, + pub KAFKA_PASSWORD: String, pub TCP_PORT: String, pub GRPC_PORT: String, pub HTTP_PORT: String, @@ -69,6 +73,10 @@ impl EnvExt for Env{ AMQP_HOST: std::env::var("AMQP_HOST").expect("AMQP_HOST must be there!"), AMQP_USERNAME: std::env::var("AMQP_USERNAME").expect("AMQP_USERNAME must be there!"), AMQP_PASSWORD: std::env::var("AMQP_PASSWORD").expect("AMQP_PASSWORD must be there!"), + KAFKA_PORT: std::env::var("KAFKA_PORT").expect("KAFKA_PORT must be there!"), + KAFKA_HOST: std::env::var("KAFKA_HOST").expect("KAFKA_HOST must be there!"), + KAFKA_USERNAME: std::env::var("KAFKA_USERNAME").expect("KAFKA_USERNAME must be there!"), + KAFKA_PASSWORD: std::env::var("KAFKA_PASSWORD").expect("KAFKA_PASSWORD must be there!"), TCP_PORT: std::env::var("TCP_PORT").expect("TCP_PORT must be there!"), GRPC_PORT: std::env::var("GRPC_PORT").expect("GRPC_PORT must be there!"), HTTP_PORT: std::env::var("HTTP_PORT").expect("HTTP_PORT must be there!"), diff --git a/src/error/mod.rs b/src/error/mod.rs index a7f1af6..fa60e0c 100644 --- a/src/error/mod.rs +++ b/src/error/mod.rs @@ -190,7 +190,7 @@ pub enum ErrorKind{ Crypter(CrypterError), #[error("Actix HTTP or WS Server Error")] Server(ServerError), // actix server io - #[error("Redis, RMQ or Seaorm Error")] + #[error("Redis, Kafka, RMQ or Seaorm Error")] Storage(StorageError), // seaorm, redis, rmq #[error("Chrono, Parse Error")] Time(TimeError), @@ -249,6 +249,8 @@ pub enum StorageError{ Rmq(#[from] deadpool_lapin::lapin::Error), #[error("[RMQ] - failed to get lapin pool")] RmqPool(#[from] deadpool_lapin::PoolError), + #[error("[KAFKA] - failed to connect to kafka")] + Kafka(#[from] rdkafka::error::KafkaError), #[error("[SEAORM] - faild to do db operation")] SeaOrm(#[from] sea_orm::DbErr) } diff --git a/src/interfaces/product.rs b/src/interfaces/product.rs index 1133443..d1eeefd 100644 --- a/src/interfaces/product.rs +++ b/src/interfaces/product.rs @@ -1,7 +1,7 @@ -use crate::lockers::llm::Product; +use crate::lockers::llm::{Product, PurchasingStatus}; /* ----------------- @@ -30,6 +30,6 @@ use crate::lockers::llm::Product; */ pub trait ProductExt{ type Product; - async fn atomic_purchase_status(&mut self) -> (bool, tokio::sync::mpsc::Receiver); + async fn atomic_purchase_status(&mut self) -> (PurchasingStatus, tokio::sync::mpsc::Receiver); async fn mint(&mut self) -> (bool, Product); // the actual logic to purchase a product and send it to the mint service } \ No newline at end of file diff --git a/src/lockers/llm.rs b/src/lockers/llm.rs index c08976a..ba89d3c 100644 --- a/src/lockers/llm.rs +++ b/src/lockers/llm.rs @@ -138,9 +138,16 @@ pub struct Product{ pub is_minted: bool, } +#[derive(Debug, PartialEq)] +pub enum PurchasingStatus{ + Minting, // someone is minting + 
Locked, // pid is locked and ready to gets minted, notify client + Done +} + impl ProductExt for Product{ type Product = Self; - async fn atomic_purchase_status(&mut self) -> (bool, tokio::sync::mpsc::Receiver) { + async fn atomic_purchase_status(&mut self) -> (PurchasingStatus, tokio::sync::mpsc::Receiver) { start_minting(self.clone()).await } async fn mint(&mut self) -> (bool, Product){ @@ -254,7 +261,7 @@ impl ProductExt for Product{ step3) if it's in there then we must reject the request step4) otherwise we can proceed to minting process */ -pub(self) async fn start_minting(product: Product) -> (bool, tokio::sync::mpsc::Receiver){ +pub(self) async fn start_minting(product: Product) -> (PurchasingStatus, tokio::sync::mpsc::Receiver){ /* ___ IMPORTANT ╰┈➤ in handling async future io tasks remember to use Mutex in a separate light io threads @@ -271,7 +278,10 @@ pub(self) async fn start_minting(product: Product) -> (bool, tokio::sync::mpsc:: let lock_ids = constants::PURCHASE_DEMO_LOCK_MUTEX.clone(); let (tx, mut rx) = - tokio::sync::mpsc::channel::(1024); + tokio::sync::mpsc::channel::(1024); + + let (psender, preceiver) = + tokio::sync::mpsc::channel::(1024); /* ___ IMPORTANT ╰┈➤ each actix worker thread contains the whole application factory instance each of them @@ -294,26 +304,31 @@ pub(self) async fn start_minting(product: Product) -> (bool, tokio::sync::mpsc:: can use this data which causes to block the client request and makes him to get halted until the lock gets released. */ + // ===================================== locking task tokio::spawn( { // cloning necessary data before moving into async move{} scope - let tx = tx.clone(); + let clonedTx = tx.clone(); let lock_ids = lock_ids.clone(); async move{ let mut write_lock = lock_ids.lock().await; if (*write_lock).contains(&pid){ log::info!("rejecting client request cause the id is still being minted!"); // reject the request since the product is being minted - tx.send(true).await; // sending the true flag as rejecting the request + clonedTx.send(PurchasingStatus::Minting).await; // sending the minting flag as rejecting the request } else{ + clonedTx.send(PurchasingStatus::Locked).await; // sending the locked flag as the product is already locked and ready to gets minted (*write_lock).push(pid); // save the id for future minters to reject their request during the first minting process } } } ); - let (psender, preceiver) = - tokio::sync::mpsc::channel::(1024); + // ===================================== minting task + // we'll await on this task until it gets solved completely + // by the runtime which means the product has been minted + // successfully. + let clonedTx1 = tx.clone(); // second spawn, minting process and releasing the pid lock let mintTask = tokio::spawn( { @@ -352,6 +367,7 @@ pub(self) async fn start_minting(product: Product) -> (bool, tokio::sync::mpsc:: */ let mut write_lock = lock_ids.lock().await; (*write_lock).retain(|&p_id| p_id != pid); // product got minted, don't keep its id in the locks vector, other clients can demand the minting process easily on the next request + clonedTx1.send(PurchasingStatus::Done).await; // sending done flag as product is not locked any more and the minting is done } } ); @@ -383,16 +399,16 @@ pub(self) async fn start_minting(product: Product) -> (bool, tokio::sync::mpsc:: // if we receive something from the channel we know 100 percent // it's a true flag or a rejecting flag cause we're only sending // true flag to the channel. 
- _ = rx.recv() => { - return (true, preceiver); // product is being minted + status = rx.recv() => { + return (status.unwrap(), preceiver); }, // if this branch is selected means the product is inside // the minting process and we should release the lock after // it completes and notify the client later, note that the // whole logic of the minting process is inside the mintTask - // spawned task + // spawned task, mintTask does return nothing! _ = mintTask => { // if mintTask was solved then we simply return false and the receiver - return (false, preceiver); // during the minting process, if we're here means the minting is done + return (PurchasingStatus::Done, preceiver); // during the minting process, if we're here means the minting is done }, } diff --git a/src/models/event.rs b/src/models/event.rs index 2d7f67b..f236be9 100644 --- a/src/models/event.rs +++ b/src/models/event.rs @@ -4,7 +4,7 @@ use chrono::{DateTime, NaiveDateTime}; use sea_orm::prelude::DateTimeWithTimeZone; use serde::{Serialize, Deserialize}; -use workers::notif::{ConsumeNotif, ProduceNotif}; +use workers::notif::{ProducerInfo, ConsumerInfo}; use crate::*; @@ -49,22 +49,12 @@ pub enum TokenTimeScope{ Read } -#[derive(Serialize, Deserialize, Clone, Default, Debug, ToSchema)] -pub struct ProduceNotifInfo{ - pub info: ProduceNotif, -} - -#[derive(Serialize, Deserialize, Clone, Default, Debug, ToSchema)] -pub struct ConsumeNotifInfo{ - pub info: ConsumeNotif, -} - #[derive(Serialize, Deserialize, Clone, Default, Debug, ToSchema)] #[derive(Extractible)] #[salvo(extract(default_source(from="body")))] pub struct RegisterNotif{ - pub producer_info: Option, - pub consumer_info: Option, + pub producer_info: Option, + pub consumer_info: Option, } #[derive(Serialize, Deserialize, Clone, Debug, Default, ToSchema)] diff --git a/src/server/mod.rs b/src/server/mod.rs index 5e78f32..459cd83 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -12,7 +12,7 @@ use context::AppContext; use tokio::sync::{mpsc::Receiver, Mutex}; use tokio_stream::wrappers::UnboundedReceiverStream; use wallexerr::misc::Wallet; -use workers::zerlog::ZerLogProducerActor; +use workers::{scheduler, zerlog::ZerLogProducerActor}; use salvo::affix_state; use salvo::conn::rustls::{Keycert, RustlsConfig}; use migration::{Migrator, MigratorTrait}; @@ -292,16 +292,6 @@ impl Actor for HoopoeWsServerActor{ impl HoopoeWsServerActor{ - async fn check_health(user_id: usize){ - let mut interval = tokio::time::interval(constants::PING_INTERVAL); - tokio::spawn(async move{ - loop{ - interval.tick().await; // tick every 5 second in a loop - log::info!("websocket session for user#{} is alive", user_id); - } - }); - } - pub async fn send_message_to_ws_users_in_room(my_id: usize, msg: Message, this_room: &str) { let this_cloned_msg = msg.clone(); let msg = if let Ok(s) = this_cloned_msg.to_str() { @@ -351,7 +341,13 @@ impl HoopoeWsServerActor{ return; } - Self::check_health(user_id).await; + // check the heartbeat of the connected client in the background every 10 seconds + // runInterval() is a method that takes a closure which returns a future object + // and a period param to execute the closure periodically in the background light + // thread of tokio runtime. 
+ scheduler::runInterval(move || async move{ + log::info!("websocket session for user#{} is alive", user_id); + }, 10).await; /* ------------------------- can't move pointers which is not live long enough into diff --git a/src/tests/task.rs b/src/tests/task.rs index 8145165..608e7d5 100644 --- a/src/tests/task.rs +++ b/src/tests/task.rs @@ -69,6 +69,7 @@ pub struct Task, S, O> where // J is a Future pub pool: Vec>, pub worker: std::sync::Mutex>, // execute the task inside the background worker, this is a thread which is safe to be mutated in other threads pub lock: std::sync::Mutex<()>, // the task itself is locked and can't be used by other threads + pub state: std::sync::Arc>> // the state of the worker must be safe to be shared between threads } // thread safe eventloop and queue: arc mutex vec T vs arc mutex receiver T @@ -108,6 +109,7 @@ impl + Send + Sync + 'static + Clone, S: S tokio::spawn(job) ) }, + state: std::sync::Arc::new(tokio::sync::Mutex::new(vec![])), lock: Default::default(), }; diff --git a/src/tests/tx.rs b/src/tests/tx.rs index c962c13..06e1f85 100644 --- a/src/tests/tx.rs +++ b/src/tests/tx.rs @@ -3,7 +3,7 @@ use std::sync::atomic::AtomicU8; use interfaces::payment::PaymentProcess; use models::event::ActionType; -use notif::ProduceNotif; +use notif::PublishNotifToRmq; use rand_chacha::ChaCha12Core; use salvo::rate_limiter::QuotaGetter; use wallexerr::misc::Wallet; @@ -305,7 +305,7 @@ impl ActixMessageHandler for StatelessTransactionPool{ } let prod_notif = - ProduceNotif{ + PublishNotifToRmq{ local_spawn: true, notif_data: NotifData{ id: Uuid::new_v4().to_string(), diff --git a/src/workers/notif/mod.rs b/src/workers/notif/mod.rs index 5df2cb7..42cde99 100644 --- a/src/workers/notif/mod.rs +++ b/src/workers/notif/mod.rs @@ -1,25 +1,12 @@ -/* ======================================================================================== - REALTIME NOTIF EVENT STREAMING DESIGN PATTERN (README files inside tests folder) - - local & remote RPC channels : Kafka, RMQ, Redis pubsub, jobq mpsc, pg db - thread safe eventloop queue : the receiver of each broker or the mpsc channel like Arc>> - queue : buffer of thread safe event objects like Buffer{data:Arc>>} - syntax : while let Some(notif_data) = mpscReceiverEventloopOrRmqOrKafkaOrRedisSubscriber.recv().await{} - CronSchedulerActor : an actor background worker to call io periodically using ctx.run_interval(), loop{} and tokio time and spawn or redis key space notifications pubsub - storing and caching : store and cache received notif data in pg db and on redis - node talking : local talking with mailbox & remote talking with (JSON/G/Capnp)RPC - websocket config : send received notif data through the ws mpsc sender / receive notif data in ws server scope - notify notif owner : through http short polling api or websocket streaming - async io runTime Executor : using tokio spawn to spawn light io thread per each async io task - NotifBrokerActor use cases : - → wallet service prodcons actor with different payment portal => receive command object - → transactionpool service prodcons actor => receive transaction object - → product service prodcons actor => receive order object to purchase atomically +/* ======================================================================================== + REALTIME NOTIF EVENT STREAMING DESIGN PATTERN (README files inside docs folder) + ======================================================================================== + concurrency tools & notes : → an eventloop is a thread safe receiver queue 
of the mpsc channel which receives tasks and execute them in free background thread - → tokio::select, tokio::spawn, tokio::sync::{Mutex, mpsc, RwLock}, std::sync::{Condvar, Arc, Mutex} + → actor with Box::pin(async{}), tokio::select, tokio::spawn, tokio::sync::{Mutex, mpsc, RwLock}, std::sync::{Condvar, Arc, Mutex} → cpu tasks are graph and geo calculations as well as cryptography algorithms which are resource intensive → async io tasks are io and networking calls which must be handled simultaneously in order to scale resources → async io task execution inside light threadpool: wait on the task but don't block the thread, continue with executing other tasks @@ -33,11 +20,12 @@ → tokio mutex suspend the async task if the lock is busy it suspend the io task instead of blocking the executor thread → std stuffs block and suspend the thread and stop it from executing other tasks while it doing some heavy operations inside the thread like mutex logics → tokio stuffs suspend the async io task process instead of blocking the thread and allows the thread executing other tasks simultaneously - → use channels for atomic syncing between threads instead of using mutex in both async and none async context + → use channels for atomic syncing between threads instead of using mutex in both async and none async context, send the mutated/updated data to channel instead of using mutex or condvar → if we want some result of an either async io or cpu task we have the options of either using of mutex, channels or joining on the thread (would block cpu threads) → as soon as the future or async io task is ready to yeild a value the runtime meanwhile of handling other tasks would notify the caller about the result → as soon as the the result of the task is ready to be returned from the os thread the os thread will be stopped blocking and continue with executing other tasks → actors have their own os or ligh thread of execution which uses to spawn tasks they've received via message passing channels or mailbox + → actors receive messages asyncly using their receiver eventloop of their jobq mpsc mailbox, they execute them one at a time to ensure the internal state remains consistent cause there is no mutex → to share a data between threads it must be Send Sync and live valid → initialize storage and actors data structures once and pack them in AppContext struct then share this between threads → what are objects: are an isolated thread objects contains light thread for executing tasks, cron scheudling and jobq mailbox @@ -45,72 +33,21 @@ → receive tasks from the channel by streaming over eventloop with while let Some() = rx.recv().await{} → what eventloop does: executing received tasks inside a light thread of execution → stream is an eventloop receiver channel of some jobq that can be iterated over to get data as they're coming from the channel - ======================================================================================== - NotifBrokerActor is the worker of handling the process of publishing and consuming - messages through rmq, redis and kafka, talking to the NotifBrokerActor can be done - by sending it a message contains the setup either to publish or consume something - to and from an specific broker, so generally it's a sexy actor to produce/consume - messages from different type of brokers it uses RMQ, Redis and Kafka to produce and - consume massive messages in realtime, kindly it supports data AES256 encryption - through producing messages to the broker. 
we can send either producing or consuming - message to this actor to start producing or consuming in the background. - - ************************************************************************************ - it's notable that for realtime push notif streaming we MUST start consuming from - the specified broker passed in to the message structure when talking with actor, in - a place where the application logic which is likely a server is being started. - ************************************************************************************ - - brokering is all about queueing, sending and receiving messages way more faster, - safer and reliable than a simple eventloop or a tcp based channel. - all brokers contains message/task/job queue to handle communication between services - asyncly, they've been designed specially for decoupling tight long running services - due to their durability nature like giving predicting output of an ai model service - while the ai model is working on other input prediction in the background we can - receive the old outputs and pass them through the brokers to receive them in some - http service for responding the caller. - In rmq producer sends message to exchange the a consumer can bind its queue to - the exchange to receive the messages, routing key determines the pattern of receiving - messages inside the bounded queue from the exchange - In kafka producer sends messages to topic the consumer can receives data from - the topic, Rmq adds an extra layer on top of msg handling logic which is creating - queue per each consumer. - offset in kafka is an strategy which determines the way of tracking the sequential - order of receiving messages by kafka topics it's like routing key in rmq - - BROKER TYPES: (preferred stack: RMQ + RPC + WebSocket + ShortPollingJobId) - → REDIS PUBSUB => light task queue - → KAFKA => high latency hight throughput - → RMQ => low latency low throughput - -ˋˏ✄┈┈┈┈ - >_ the consuming task has been started by sending the ConsumeNotif message - to this actor which will execute the streaming loop over the queue in - either the notif consumer actor context itself or the tokio spawn thread: - - notif consumer -----Q(Consume Payload)-----> Exchange -----notif CQRS writer-----> cache/store on Redis & db - - -ˋˏ✄┈┈┈┈ - >_ the producing task has been started by sending the ProduceNotif message - to this actor which will execute the publishing process to the exchange - in either the notif producer actor context itself or the tokio spawn thread: - - notif producer -----payload-----> Exchange - - -ˋˏ✄┈┈┈┈ - >_ client uses a short polling technique or websocket streaming to fetch notifications - for an specific owner from redis or db, this is the best solution to implement a - realtiming strategy on the client side to fetch what's happening on the - server side in realtime. 
- - _________ _________ - | server1 | <------- RMQ notif broker -------> | server2 | - --------- --------- - | ws | ws - ------------------- client ------------------ - ======================================================================================== */ +*/ use constants::STORAGE_IO_ERROR_CODE; +use rdkafka::consumer::CommitMode; +use rdkafka::consumer::Consumer; +use rdkafka::consumer::StreamConsumer; +use rdkafka::error::KafkaError; +use rdkafka::message::Header; +use rdkafka::message::Headers; +use rdkafka::message::OwnedHeaders; +use rdkafka::producer::FutureProducer; +use rdkafka::producer::FutureRecord; +use rdkafka::ClientConfig; +use rdkafka::Message; use redis_async::resp::FromResp; use tokio::spawn; use crate::*; @@ -145,20 +82,126 @@ use crate::models::event::*; use crate::interfaces::crypter::Crypter; +/* ======================================================================================== + NotifBrokerActor is the worker of handling the process of publishing and consuming + messages through rmq, redis and kafka, talking to the NotifBrokerActor can be done + by sending it a message contains the setup either to publish or consume something + to and from an specific broker, so generally it's a sexy actor to produce/consume + messages from different type of brokers it uses RMQ, Redis and Kafka to produce and + consume massive messages in realtime, kindly it supports data AES256 encryption + through producing messages to the broker. we can send either producing or consuming + message to this actor to start producing or consuming in the background. + + ************************************************************************************ + it's notable that for realtime push notif streaming we MUST start consuming from + the specified broker passed in to the message structure when talking with actor, in + a place where the application logic which is likely a server is being started. + MAKE SURE YOU'VE STARTED CONSUMING BEFORE PRODUCING + ************************************************************************************ + + brokering is all about queueing, sending and receiving messages way more faster, + safer and reliable than a simple eventloop or a tcp based channel. + all brokers contains message/task/job queue to handle communication between services + asyncly, they've been designed specially for decoupling tight long running services + due to their durability nature like giving predicting output of an ai model service + while the ai model is working on other input prediction in the background we can + receive the old outputs and pass them through the brokers to receive them in some + http service for responding the caller. + In rmq producer sends message to exchange the a consumer can bind its queue to + the exchange to receive the messages, routing key determines the pattern of receiving + messages inside the bounded queue from the exchange + In kafka producer sends messages to topic the consumer can receives data from + the topic, Rmq adds an extra layer on top of msg handling logic which is creating + queue per each consumer. 
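+    a rough sketch with the rdkafka crate imported above (broker address, topic name
+    and group id reuse the values from this repo's api.http.json samples, everything
+    else is illustrative): the producer attaches a key so that all messages sharing
+    that key land in the same partition, and the consumer commits only after it has
+    fully processed a message, per the partition and offset notes right below:
+        let producer: FutureProducer = ClientConfig::new()
+            .set("bootstrap.servers", "localhost:9092")
+            .create().unwrap();
+        let _delivery = producer.send(
+            FutureRecord::to("SavageTopic")
+                .key("receiver-1")          // partition key
+                .payload("notif payload"),
+            std::time::Duration::from_secs(5)
+        ).await;
+
+        let consumer: StreamConsumer = ClientConfig::new()
+            .set("bootstrap.servers", "localhost:9092")
+            .set("group.id", "cid")
+            .create().unwrap();
+        consumer.subscribe(&["SavageTopic"]).unwrap();
+        while let Ok(msg) = consumer.recv().await{
+            // decode, cache on redis, store in db ... then commit
+            consumer.commit_message(&msg, CommitMode::Async).unwrap();
+        }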
+    the offset in Kafka is a strategy for tracking the sequential order in which
+    messages are received by Kafka topics; it plays a role similar to the routing key in RMQ.
+
+    BROKER TYPES: (preferred stack: RMQ + RPC + WebSocket + ShortPollingJobId)
+        → REDIS PUBSUB => light task queue
+        → KAFKA => high latency, high throughput
+            -ˋˏ✄┈┈┈┈
+         >_ a topic contains messages and allows consumers to consume from it; each topic
+            can be divided into multiple partitions, for example a topic might have 10
+            partitions. each message can have its own unique partition key which specifies
+            to which partition the message will go. an offset is assigned to each message
+            within a partition; it specifies the position of the message in that partition
+            and lets consumers resume consuming from where they left off.
+            a consumer can commit a message after processing it, which tells Kafka that
+            the consumer has received and processed the message completely.
+            a single consumer consumes messages from specific partitions, but in a group of
+            consumers Kafka ensures that each partition is consumed by only one consumer
+            within the group; for example, with a topic of 4 partitions and 2 consumers in
+            the same group, Kafka will assign 2 partitions to each consumer:
+
+                                                                                  ----------> partition-key1 queue(m1, m2, m3, ...) - all messages with key1
+             ---------                       ------------                        |
+            |consumer1| <-----consume-----> |Kafka Broker| <-----topic-----> partition-key3 queue(m1, m2, m3, ...) - all messages with key3
+             ---------                       ------------                        |
+                |_______partition1&2______________|   |                          |----------> partition-key2 queue(m1, m2, m3, ...) - all messages with key2
+             ---------                          |     |                          |
+            |consumer2|                         |     |                          ----------> partition-key4 queue(m1, m2, m3, ...) - all messages with key4
+             ---------                          |     |
+                |                               |   ---------
+                --------------partition3&4------   | producer|
+                                                     ---------
+            note that committing the offset too early might cause message loss, since upon
+            recovery the consumer will start from the next message, skipping the one where
+            the failure occurred.
+
+        → RMQ => low latency, low throughput
+            -ˋˏ✄┈┈┈┈
+         >_ the consuming task is started by sending the ConsumeNotifFromRmq message
+            to this actor, which executes the streaming loop over the queue in either
+            the notif consumer actor context itself or a tokio spawn thread:
+
+            notif consumer -----Q(Consume Payload)-----> Exchange -----notif CQRS writer-----> cache/store on Redis & db
+
+            -ˋˏ✄┈┈┈┈
+         >_ the producing task is started by sending the PublishNotifToRmq message
+            to this actor, which executes the publishing process to the exchange in
+            either the notif producer actor context itself or a tokio spawn thread:
+
+            notif producer -----payload-----> Exchange
+
+            -ˋˏ✄┈┈┈┈
+         >_ the client uses a short polling technique or websocket streaming to fetch
+            notifications for a specific owner from redis or the db; this is the best way
+            to implement a realtime strategy on the client side to follow what's happening
+            on the server side.
+ + _________ _________ + | server1 | <------- RMQ notif broker -------> | server2 | + --------- --------- + | ws | ws + ------------------- client ------------------ + ======================================================================================== +*/ + #[derive(Message, Clone, Serialize, Deserialize, Debug, Default, ToSchema)] #[rtype(result = "()")] pub struct PublishNotifToKafka{ pub topic: String, + pub brokers: String, + pub partitions: u64, + pub headers: Vec, pub local_spawn: bool, pub notif_data: NotifData, pub encryptionConfig: Option } +#[derive(Clone, Serialize, Deserialize, Debug, Default, ToSchema)] +pub struct KafkaHeader{ + pub key: String, + pub val: String, +} + #[derive(Message, Clone, Serialize, Deserialize, Debug, Default, ToSchema)] #[rtype(result = "()")] pub struct ConsumeNotifFromKafka{ // we must build a unique consumer per each consuming process - pub topic: String, - pub consumerId: String, + pub topics: Vec, + pub consumerGroupId: String, + pub brokers: String, + pub redis_cache_exp: u64, pub decryptionConfig: Option } @@ -187,7 +230,7 @@ pub struct HealthMsg{ #[derive(Message, Clone, Serialize, Deserialize, Debug, Default, ToSchema)] #[rtype(result = "()")] -pub struct ProduceNotif{ +pub struct PublishNotifToRmq{ pub local_spawn: bool, // either spawn in actor context or tokio threadpool pub notif_data: NotifData, pub exchange_name: String, @@ -196,16 +239,29 @@ pub struct ProduceNotif{ pub encryption_config: Option, } +#[derive(Clone, Serialize, Deserialize, Debug, Default, ToSchema)] +pub struct ProducerInfo{ + pub Rmq: Option, + pub Kafka: Option, + pub Redis: Option +} + +#[derive(Clone, Serialize, Deserialize, Debug, Default, ToSchema)] +pub struct ConsumerInfo{ + pub Rmq: Option, + pub Kafka: Option, + pub Redis: Option +} + #[derive(Clone, Serialize, Deserialize, Debug, Default, ToSchema)] pub struct CryptoConfig{ pub secret: String, pub passphrase: String, - pub unique_redis_id: String, } #[derive(Message, Clone, Serialize, Deserialize, Debug, Default, ToSchema)] #[rtype(result = "()")] -pub struct ConsumeNotif{ // we'll create a channel then start consuming by binding a queue to the exchange +pub struct ConsumeNotifFromRmq{ // we'll create a channel then start consuming by binding a queue to the exchange /* -ˋˏ✄┈┈┈┈ following queue gets bounded to the passed in exchange type with its routing key, when producer wants to produce notif data it sends them @@ -235,6 +291,8 @@ pub struct ConsumeNotif{ // we'll create a channel then start consuming by bindi 1) producer produces messages and send them to the exchange with an specific routing key 2) a consumer create its own queue and bind it to the exchange with the bind key that is interested to receive the message from the exchange based on that key. + 3) it's notable that a queue can be bounded to multiple exchange at the same time + it allows to receive different messages based on each exchange routing key. 
-------- --------- | queue1 | <----- |consumer1| ------> routing_key1 <--------------------------- --------- @@ -245,7 +303,9 @@ pub struct ConsumeNotif{ // we'll create a channel then start consuming by bindi | | -------- ----------- producer2 ----------- | | queue2 | <----| consumer2 | ------> routing_key4 <------------------ ----------- - + ---------- | + | exchange2| -----bind(routing_key)----- + ---------- */ pub routing_key: String, // patterns for this queue to tell exchange what messages this queue is interested in pub tag: String, @@ -309,42 +369,29 @@ impl NotifBrokerActor{ match redis_pool.get().await{ Ok(mut redis_conn) => { - let mut dataString = serde_json::to_string(¬if_data).unwrap(); - let finalData = if encryptionConfig.is_some(){ - - let CryptoConfig{ secret, passphrase, unique_redis_id } = encryptionConfig.unwrap(); - let mut secure_cell_config = &mut wallexerr::misc::SecureCellConfig{ - secret_key: hex::encode(secret), - passphrase: hex::encode(passphrase), - data: vec![], - }; - - // after calling encrypt method dataString has changed and contains the hex encrypted data - dataString.encrypt(secure_cell_config); - - // make sure that we have redis unique id and encrypted data in secure cell - // then cache the condif on redis with expirable key - if - !unique_redis_id.is_empty() && - !secure_cell_config.secret_key.is_empty() && - !secure_cell_config.passphrase.is_empty(){ + let channel = channel.to_string(); + tokio::spawn(async move{ + let mut dataString = serde_json::to_string(¬if_data).unwrap(); + let finalData = if encryptionConfig.is_some(){ - log::info!("[*] caching secure cell config on redis"); - - // cache the secure cell config on redis for 5 mins - // this is faster than storing it on disk or file - let str_secure_cell = serde_json::to_string_pretty(&secure_cell_config).unwrap(); - let redis_key = format!("Redis_notif_encryption_config_for_{}", unique_redis_id); - let _: () = redis_conn.set_ex(redis_key, str_secure_cell, 300).await.unwrap(); - } - - dataString - - } else{ - dataString - }; - - let _: () = redis_conn.publish(channel, finalData).await.unwrap(); + let CryptoConfig{ secret, passphrase } = encryptionConfig.unwrap(); + let mut secure_cell_config = &mut wallexerr::misc::SecureCellConfig{ + secret_key: hex::encode(secret), + passphrase: hex::encode(passphrase), + data: vec![], + }; + + // after calling encrypt method dataString has changed and contains the hex encrypted data + dataString.encrypt(secure_cell_config); + + dataString + + } else{ + dataString + }; + + let _: () = redis_conn.publish(channel.clone(), finalData).await.unwrap(); + }); }, Err(e) => { @@ -386,31 +433,16 @@ impl NotifBrokerActor{ // do passphrase and secret key validation logic before // consuming messages let mut secure_cell_config = if let Some(mut config) = decryption_config.clone(){ - - // get the secure cell config from redis cache - let redis_key = format!("Redis_notif_encryption_config_for_{}", config.unique_redis_id); - let is_key_there: bool = redis_conn.exists(&redis_key).await.unwrap(); - - let secure_cell_config = if is_key_there{ - let get_secure_cell: String = redis_conn.get(redis_key).await.unwrap(); - serde_json::from_str::(&get_secure_cell).unwrap() - } else{ - SecureCellConfig::default() - }; - + config.secret = hex::encode(config.secret); config.passphrase = hex::encode(config.passphrase); - // make sure that both passphrase and secret key are the same - // inside the stored secure cell config on redis - if config.passphrase != secure_cell_config.passphrase 
|| - config.secret != secure_cell_config.secret_key{ - log::error!("[!] wrong passphrase or secret key"); - return; // terminate the caller and cancel consuming, api must gets called again - } - // return the loaded instance from redis - secure_cell_config + SecureCellConfig{ + secret_key: config.secret, + passphrase: config.passphrase, + data: vec![], + } } else{ SecureCellConfig::default() @@ -517,7 +549,7 @@ impl NotifBrokerActor{ *constants::CODEC_ERROR_CODE, // error code error_content_, // error content ErrorKind::Codec(crate::error::CodecError::Serde(e)), // error kind - "NotifBrokerActor.decode_serde_redis", // method + "NotifBrokerActor.consumeFromRedis.decode_serde_redis", // method Some(&zerlog_producer_actor) ).await; return; // terminate the caller @@ -653,7 +685,11 @@ impl NotifBrokerActor{ /* ******************************************************************************** */ /* ***************************** PUBLISHING TO KAFKA ****************************** */ /* ******************************************************************************** */ - pub async fn publishToKafka(&self, channel: &str, notif_data: NotifData, encryptionConfig: Option){ + pub async fn publishToKafka(&self, + topic: &str, notif_data: NotifData, + encryptionConfig: Option, + partitions: u64, brokers: &str, + kafkaHeaders: Vec){ let storage = self.app_storage.clone(); let rmq_pool = storage.as_ref().unwrap().get_lapin_pool().await.unwrap(); @@ -664,14 +700,179 @@ impl NotifBrokerActor{ // later we can receive the notif in ws server setup and send it to the owner let notif_data_sender = self.notif_broker_sender.clone(); - // ... + // since the redis is important, so we can't move forward without it + match redis_pool.get().await{ + Ok(mut redis_conn) => { + + // we can't move a pointer (&str in this case) which has shorter liftime + // into tokio spawn scope we should either convert it into String or + // use 'static lifetime to live longer than the spawn scope. + let topic = topic.to_string(); + let brokers = brokers.to_string(); + let kafkaHeaders = kafkaHeaders.clone(); + + tokio::spawn(async move{ + + // this will be used as a unique key for the partition to send messages to + // it means that send all messages related to this owner to a partition + // with the ownerId as the key. partitioning per owner it's like creating + // a queue for each user consumer to receive messages from RMQ + let partitionKey = notif_data.clone().receiver_info; + + let mut dataString = serde_json::to_string(¬if_data).unwrap(); + let finalData = if encryptionConfig.is_some(){ + + let CryptoConfig{ secret, passphrase } = encryptionConfig.clone().unwrap(); + let mut secure_cell_config = &mut wallexerr::misc::SecureCellConfig{ + secret_key: hex::encode(secret), + passphrase: hex::encode(passphrase), + data: vec![], + }; + + // after calling encrypt method dataString has changed and contains the hex encrypted data + dataString.encrypt(secure_cell_config); + + dataString + + } else{ + dataString + }; + + // creating the producer to produce the message asyncly + let createProducer: Result = ClientConfig::new() + .set("bootstrap.servers", brokers) + .set("message.timeout.ms", "5000") + .set("queue.buffering.max.ms", "0") + .create(); + + match createProducer{ + Ok(producer) => { + + /* -ˋˏ✄┈┈┈┈ + >_ produce notif_data in a ligth thread in the background asyncly + tokio runtime schedule each async task of producing in a light + thread in the background ensures truely async execution without + blocking happens. 
+ we're not awaiting on the spawn task, this let the task which + contains async move{} block gets executed in the background thread + the async move{} block however might contains some await operations + but the whole task will be executed asyncly in a none blocking manner + in the background light thread. + don't await on a future object let that to get executed in the + background inside a light thread like tokio::spawn(async move{}); + if you need data inside the spawn scope use channels or you can + await on it which it tells runtime suspend the function execution and + schedule to notify the caller once the result is ready but don't block + the thread let the thread continue executing other tasks in itself. + */ + tokio::spawn(async move{ + + // sending the message to the topic queue, topic in here is like the exchange + // in rmq, it collects all the messages in itself. + let deliveryStatus = producer.send( + FutureRecord::to(&topic) + .payload(&finalData) + .key(&partitionKey) + .timestamp(chrono::Local::now().timestamp()) + .headers( + { + let mut headers = OwnedHeaders::new(); + for header in kafkaHeaders.iter(){ + headers.clone().insert(Header{ + key: &header.key, + value: Some(&header.val) + }); + } + headers + } + ), + std::time::Duration::from_secs(0) // no timeout for the queue + ).await; + + match deliveryStatus{ + /* + messages go to a topic then topic divided into partitions (has been specified) + so it means messages will be sent to partitions based on specified rules like + number of partitions and partition key, hence if a message is deliverred successfully + kafka returns partition number that the message was sent to and the offset of + the message which is the id of the message shows the message was stored. + */ + Ok((partition, offset)) => { + /* + partition: The partition number the message was delivered to. + offset : The offset within that partition where the message was stored. + */ + log::info!("Message delivered to partition: {}", partition); + log::info!("Message offset: {}", offset); + }, + Err((e, om)) => { + + // load the Message trait enables us to call payload() and key() methods + // on the owned message instance. 
+ use rdkafka::Message; + // om is the message that is failed to deliver + log::error!("Kafa couldn't send the message:"); + log::error!("Original message payload: {:?}", om.payload()); + log::error!("Original message key: {:?}", om.key()); + + use crate::error::{ErrorKind, HoopoeErrorResponse}; + let error_content = &e.to_string(); + let error_content_ = error_content.as_bytes().to_vec(); + let mut error_instance = HoopoeErrorResponse::new( + *constants::STORAGE_IO_ERROR_CODE, // error code + error_content_, // error content + ErrorKind::Storage(crate::error::StorageError::Kafka(e)), // error kind + "NotifBrokerActor.publishToKafka.deliveryStatus", // method + Some(&zerlog_producer_actor) + ).await; + } + } + + }); + + }, + Err(e) => { + use crate::error::{ErrorKind, HoopoeErrorResponse}; + let error_content = &e.to_string(); + let error_content_ = error_content.as_bytes().to_vec(); + let mut error_instance = HoopoeErrorResponse::new( + *constants::STORAGE_IO_ERROR_CODE, // error code + error_content_, // error content + ErrorKind::Storage(crate::error::StorageError::Kafka(e)), // error kind + "NotifBrokerActor.publishToKafka.createProducer", // method + Some(&zerlog_producer_actor) + ).await; + } + } + + }); + + }, + Err(e) => { + use crate::error::{ErrorKind, HoopoeErrorResponse}; + let error_content = &e.to_string(); + let error_content_ = error_content.as_bytes().to_vec(); + let mut error_instance = HoopoeErrorResponse::new( + *constants::STORAGE_IO_ERROR_CODE, // error code + error_content_, // error content + ErrorKind::Storage(crate::error::StorageError::RedisPool(e)), // error kind + "NotifBrokerActor.publishToKafka.redis_pool", // method + Some(&zerlog_producer_actor) + ).await; + } + } } /* ******************************************************************************** */ /* ***************************** CONSUMING FROM KAFKA ***************************** */ /* ******************************************************************************** */ - pub async fn consumeFromKafka(&self, channel: &str, consumerId: &str, decryptionConfig: Option){ + // this message handler creates a new consumer everytime for consuming messages + // from the kafka topics, useful for grouping consumerws + pub async fn consumeFromKafka(&self, + topics: &[String], consumerGroupId: &str, + decryptionConfig: Option, + redis_cache_exp: u64, brokers: &str){ let storage = self.app_storage.clone(); let rmq_pool = storage.as_ref().unwrap().get_lapin_pool().await.unwrap(); @@ -682,7 +883,310 @@ impl NotifBrokerActor{ // later we can receive the notif in ws server setup and send it to the owner let notif_data_sender = self.notif_broker_sender.clone(); - // ... 
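        // A minimal caller-side sketch (not part of this handler) of how the Kafka side of
        // this actor is typically driven, assuming an `Addr<NotifBrokerActor>` called
        // `notif_broker_actor` is available where the server boots; topic names, broker
        // address and other field values below are illustrative only. Per the note at the
        // top of this file, consuming must be started before any producing happens:
        //
        //     let _ = notif_broker_actor.send(ConsumeNotifFromKafka{
        //         topics: vec!["notifs".to_string()],
        //         consumerGroupId: "hoopoe-notif-group".to_string(),
        //         brokers: "localhost:9092".to_string(),
        //         redis_cache_exp: 300,
        //         decryptionConfig: None,
        //     }).await;
        //
        //     let _ = notif_broker_actor.send(PublishNotifToKafka{
        //         topic: "notifs".to_string(),
        //         brokers: "localhost:9092".to_string(),
        //         partitions: 4,
        //         headers: vec![],
        //         local_spawn: false,
        //         notif_data: NotifData::default(),
        //         encryptionConfig: None,
        //     }).await;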
+ // since the redis is important, so we can't move forward without it + match redis_pool.get().await{ + Ok(mut redis_conn) => { + + let mut secure_cell_config = if let Some(mut config) = decryptionConfig.clone(){ + + config.secret = hex::encode(config.secret); + config.passphrase = hex::encode(config.passphrase); + + // return the loaded instance from redis + SecureCellConfig{ + secret_key: config.secret, + passphrase: config.passphrase, + data: vec![], + } + + } else{ + SecureCellConfig::default() + }; + + let cloned_notif_data_sender_channel = notif_data_sender.clone(); + + // creating the consumer to consume messages from an specific topic asyncly + let createConsumer: Result = ClientConfig::new() + .set("bootstrap.servers", brokers) + .set("enable.partition.eof", "false") + .set("session.timeout.ms", "6000") + .set("enable.auto.commit", "true") + .set("group.id", format!("NotifBrokerActor-{}", consumerGroupId)) // consumers with this ID will be grouped together + .create(); + + match createConsumer{ + Ok(consumer) => { + + let topics = { + topics + .into_iter() + .map(|t| t.as_str()) + .collect::>() + }; + consumer.subscribe(&topics).unwrap(); + + /* -ˋˏ✄┈┈┈┈ + >_ consume notif_data in a ligth thread in the background asyncly + tokio runtime schedule each async task of producing in a light + thread in the background ensures truely async execution without + blocking happens. + we're not awaiting on the spawn task, this let the task which + contains async move{} block gets executed in the background thread + the async move{} block however might contains some await operations + but the whole task will be executed asyncly in a none blocking manner + in the background light thread. + don't await on a future object let that to get executed in the + background inside a light thread like tokio::spawn(async move{}); + if you need data inside the spawn scope use channels or you can + await on it which it tells runtime suspend the function execution and + schedule to notify the caller once the result is ready but don't block + the thread let the thread continue executing other tasks in itself. 
+ */ + tokio::spawn(async move{ + + // streaming over the consumer to receive messages from the topics + while let Ok(message) = consumer.recv().await{ + + // it uses std::str::from_utf8() to convert utf8 bytes into string + let mut consumedBuffer = message.payload().unwrap(); + let hexed_data = std::str::from_utf8(consumedBuffer).unwrap(); + let mut payload = hexed_data.to_string(); + + log::info!("Received Message from Kafka Broker"); + log::info!("key: '{:?}', payload: '{}', topic: {}, partition: {}, offset: {}, timestamp: {:?}", + message.key(), payload, message.topic(), message.partition(), message.offset(), message.timestamp()); + + if let Some(headers) = message.headers(){ + for header in headers.iter(){ + log::info!("Message Header {:#?}: {:?}", header.key, header.value); + } + } + + consumer.commit_message(&message, CommitMode::Async).unwrap(); + + // ===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>> + // ===>>>===>>>===>>>===>>>===>>> data decryption logic ===>>>===>>>===>>>===>>>===>>> + // ===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>> + // if we have a config means the data has been encrypted + let finalData = if + !secure_cell_config.clone().secret_key.is_empty() && + !secure_cell_config.clone().passphrase.is_empty(){ + + // after calling decrypt method has changed and now contains raw string + // payload must be mutable since the method mutate the content after decrypting + payload.decrypt(&mut secure_cell_config); + + // payload is now raw string which can be decoded into the NotifData structure + payload + + } else{ + // no decryption config is needed, just return the raw data + // there would be no isse with decoding this into NotifData + log::error!("secure_cell_config is empty, data is not encrypted"); + payload + }; + // ===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>> + // ===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>> + // ===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>> + + // either decrypted or the raw data as string + log::info!("[*] received data: {}", finalData); + + // decoding the string data into the NotifData structure (convention) + let get_notif_event = serde_json::from_str::(&finalData); + match get_notif_event{ + Ok(notif_event) => { + + log::info!("[*] deserialized data: {:?}", notif_event); + + // ================================================================= + /* -------------------------- send to mpsc channel for ws streaming + // ================================================================= + this is the most important part in here, we're slightly sending the data + to downside of a jobq mpsc channel, the receiver eventloop however will + receive data in websocket handler which enables us to send realtime data + received from RMQ to the browser through websocket server: RMQ over websocket + once we receive the data from the mpsc channel in websocket handler we'll + send it to the browser through websocket channel. 
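                            A rough sketch of the receiving side, assuming the ws handler owns the
                            matching tokio mpsc Receiver (here called `notif_data_receiver`) and some
                            websocket session handle (here called `ws_session`) that can push text frames:

                                while let Some(notif) = notif_data_receiver.recv().await{
                                    // forward whatever the broker consumer pushed down the channel
                                    // to the connected browser over the websocket connection
                                    ws_session.text(notif).await;
                                }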
+ */ + if let Err(e) = cloned_notif_data_sender_channel.send(finalData).await{ + log::error!("can't send notif data to websocket channel due to: {}", e.to_string()); + } + + // ============================================================================= + // ------------- if the cache on redis flag was activated we then store on redis + // ============================================================================= + if redis_cache_exp != 0{ + match redis_pool.get().await{ + Ok(mut redis_conn) => { + + // key: String::from(notif_receiver.id) | value: Vec + let redis_notif_key = format!("notif_owner:{}", ¬if_event.receiver_info); + + // -ˋˏ✄┈┈┈┈ extend notifs + let get_events: RedisResult = redis_conn.get(&redis_notif_key).await; + let events = match get_events{ + Ok(events_string) => { + let get_messages = serde_json::from_str::>(&events_string); + match get_messages{ + Ok(mut messages) => { + messages.push(notif_event.clone()); + messages + }, + Err(e) => { + use crate::error::{ErrorKind, HoopoeErrorResponse}; + let error_content = &e.to_string(); + let error_content_ = error_content.as_bytes().to_vec(); + let mut error_instance = HoopoeErrorResponse::new( + *constants::CODEC_ERROR_CODE, // error code + error_content_, // error content + ErrorKind::Codec(crate::error::CodecError::Serde(e)), // error kind + "NotifBrokerActor.consumeFromKafka.decode_serde_redis", // method + Some(&zerlog_producer_actor) + ).await; + return; // terminate the caller + } + } + + }, + Err(e) => { + // we can't get the key means this is the first time we're creating the key + // or the key is expired already, we'll create a new key either way and put + // the init message in there. + let init_message = vec![ + notif_event.clone() + ]; + + init_message + + } + }; + + // -ˋˏ✄┈┈┈┈ caching the notif event in redis with expirable key + // chaching in redis is an async task which will be executed + // in the background with an expirable key + tokio::spawn(async move{ + let events_string = serde_json::to_string(&events).unwrap(); + let is_key_there: bool = redis_conn.exists(&redis_notif_key.clone()).await.unwrap(); + if is_key_there{ // update only the value + let _: () = redis_conn.set(&redis_notif_key.clone(), &events_string).await.unwrap(); + } else{ // initializing a new expirable key containing the new notif data + /* + make sure you won't get the following error: + called `Result::unwrap()` on an `Err` value: MISCONF: Redis is configured to + save RDB snapshots, but it's currently unable to persist to disk. Commands that + may modify the data set are disabled, because this instance is configured to + report errors during writes if RDB snapshotting fails (stop-writes-on-bgsave-error option). + Please check the Redis logs for details about the RDB error. 
+ SOLUTION: restart redis :) + */ + let _: () = redis_conn.set_ex(&redis_notif_key.clone(), &events_string, redis_cache_exp).await.unwrap(); + } + }); + + }, + Err(e) => { + use crate::error::{ErrorKind, HoopoeErrorResponse}; + let error_content = &e.to_string(); + let error_content_ = error_content.as_bytes().to_vec(); + let mut error_instance = HoopoeErrorResponse::new( + *constants::STORAGE_IO_ERROR_CODE, // error code + error_content_, // error content + ErrorKind::Storage(crate::error::StorageError::RedisPool(e)), // error kind + "NotifBrokerActor.consumeFromKafka.redis_pool", // method + Some(&zerlog_producer_actor) + ).await; + return; // terminate the caller + } + } + } + + // ============================================================================= + // ------------- store in database + // ============================================================================= + // -ˋˏ✄┈┈┈┈ store notif in db by sending message to the notif mutator actor worker + // sending StoreNotifEvent message to the notif event mutator actor + // spawning the async task of storing data in db in the background + // worker of lightweight thread of execution using tokio threadpool + tokio::spawn( + { + let cloned_message = notif_event.clone(); + let cloned_mutator_actor = notif_mutator_actor.clone(); + let zerlog_producer_actor = zerlog_producer_actor.clone(); + async move{ + match cloned_mutator_actor + .send(StoreNotifEvent{ + message: cloned_message, + local_spawn: true + }) + .await + { + Ok(_) => { () }, + Err(e) => { + let source = &e.to_string(); // we know every goddamn type implements Error trait, we've used it here which allows use to call the source method on the object + let err_instance = crate::error::HoopoeErrorResponse::new( + *MAILBOX_CHANNEL_ERROR_CODE, // error hex (u16) code + source.as_bytes().to_vec(), // text of error source in form of utf8 bytes + crate::error::ErrorKind::Actor(crate::error::ActixMailBoxError::Mailbox(e)), // the actual source of the error caused at runtime + &String::from("NotifBrokerActor.consumeFromKafka.notif_mutator_actor.send"), // current method name + Some(&zerlog_producer_actor) + ).await; + return; + } + } + } + } + ); + + }, + Err(e) => { + log::error!("[!] 
can't deserialized into NotifData struct"); + use crate::error::{ErrorKind, HoopoeErrorResponse}; + let error_content = &e.to_string(); + let error_content_ = error_content.as_bytes().to_vec(); + let mut error_instance = HoopoeErrorResponse::new( + *constants::CODEC_ERROR_CODE, // error code + error_content_, // error content + ErrorKind::Codec(crate::error::CodecError::Serde(e)), // error kind + "NotifBrokerActor.consumeFromKafka.decode_serde", // method + Some(&zerlog_producer_actor) + ).await; + return; // terminate the caller + } + } + } + + }); + + }, + Err(e) => { + use crate::error::{ErrorKind, HoopoeErrorResponse}; + let error_content = &e.to_string(); + let error_content_ = error_content.as_bytes().to_vec(); + let mut error_instance = HoopoeErrorResponse::new( + *constants::STORAGE_IO_ERROR_CODE, // error code + error_content_, // error content + ErrorKind::Storage(crate::error::StorageError::Kafka(e)), // error kind + "NotifBrokerActor.consumeFromKafka.createConsumer", // method + Some(&zerlog_producer_actor) + ).await; + } + } + + }, + Err(e) => { + use crate::error::{ErrorKind, HoopoeErrorResponse}; + let error_content = &e.to_string(); + let error_content_ = error_content.as_bytes().to_vec(); + let mut error_instance = HoopoeErrorResponse::new( + *constants::STORAGE_IO_ERROR_CODE, // error code + error_content_, // error content + ErrorKind::Storage(crate::error::StorageError::RedisPool(e)), // error kind + "NotifBrokerActor.consumeFromKafka.redis_pool", // method + Some(&zerlog_producer_actor) + ).await; + } + } } @@ -791,31 +1295,16 @@ impl NotifBrokerActor{ let mut secure_cell_config = if let Some(mut config) = decryption_config.clone(){ match redis_pool.get().await{ Ok(mut redis_conn) => { - - // get the secure cell config from redis cache - let redis_key = format!("Rmq_notif_encryption_config_for_{}", config.unique_redis_id); - let is_key_there: bool = redis_conn.exists(&redis_key).await.unwrap(); - - let secure_cell_config = if is_key_there{ - let get_secure_cell: String = redis_conn.get(redis_key).await.unwrap(); - serde_json::from_str::(&get_secure_cell).unwrap() - } else{ - SecureCellConfig::default() - }; config.secret = hex::encode(config.secret); config.passphrase = hex::encode(config.passphrase); - // make sure that both passphrase and secret key are the same - // inside the stored secure cell config on redis - if config.passphrase != secure_cell_config.passphrase || - config.secret != secure_cell_config.secret_key{ - log::error!("[!] 
wrong passphrase or secret key"); - return; // terminate the caller and cancel consuming, api must gets called again - } - // return the loaded instance from redis - secure_cell_config + SecureCellConfig{ + secret_key: config.secret, + passphrase: config.passphrase, + data: vec![], + } }, Err(e) => { @@ -1189,7 +1678,7 @@ impl NotifBrokerActor{ /* ********************************************************************* */ /* ***************************** PRODUCING ***************************** */ /* ********************************************************************* */ - pub async fn publishToRmq(&self, data: &str, exchange: &str, routing_key: &str, exchange_type: &str, unique_redis_id: &str, secure_cell_config: &mut SecureCellConfig){ + pub async fn publishToRmq(&self, data: &str, exchange: &str, routing_key: &str, exchange_type: &str, secure_cell_config: &mut SecureCellConfig){ let zerlog_producer_actor = self.clone().zerlog_producer_actor; let this = self.clone(); @@ -1203,43 +1692,12 @@ impl NotifBrokerActor{ let exchange_type = exchange_type.to_string(); let data = data.to_string(); let cloned_secure_cell_config = secure_cell_config.clone(); - let unique_redis_id = unique_redis_id.to_string(); tokio::spawn(async move{ let storage = this.clone().app_storage.clone(); let rmq_pool = storage.as_ref().unwrap().get_lapin_pool().await.unwrap(); let redis_pool = storage.as_ref().unwrap().get_redis_pool().await.unwrap(); - - - // make sure that we have redis unique id and encrypted data in secure cell - // then cache the condif on redis with expirable key - if !unique_redis_id.is_empty() && !cloned_secure_cell_config.data.is_empty(){ - match redis_pool.get().await{ - Ok(mut redis_conn) => { - - log::info!("[*] caching secure cell config on redis"); - - // cache the secure cell config on redis for 5 mins - // this is faster than storing it on disk or file - let str_secure_cell = serde_json::to_string_pretty(&cloned_secure_cell_config).unwrap(); - let redis_key = format!("Rmq_notif_encryption_config_for_{}", unique_redis_id); - let _: () = redis_conn.set_ex(redis_key, str_secure_cell, 300).await.unwrap(); - }, - Err(e) => { - use crate::error::{ErrorKind, HoopoeErrorResponse}; - let error_content = &e.to_string(); - let error_content_ = error_content.as_bytes().to_vec(); - let mut error_instance = HoopoeErrorResponse::new( - *constants::STORAGE_IO_ERROR_CODE, // error code - error_content_, // error content - ErrorKind::Storage(crate::error::StorageError::RedisPool(e)), // error kind - "NotifBrokerActor.produce.redis_pool", // method - Some(&zerlog_producer_actor) - ).await; - } - }; - } // trying to ge a connection from the pool match rmq_pool.get().await{ @@ -1399,13 +1857,13 @@ impl NotifBrokerActor{ /* ********************************************************************************* */ /* ***************************** PRODUCE NOTIF HANDLER ***************************** */ /* ********************************************************************************* */ -impl ActixMessageHandler for NotifBrokerActor{ +impl ActixMessageHandler for NotifBrokerActor{ type Result = (); - fn handle(&mut self, msg: ProduceNotif, ctx: &mut Self::Context) -> Self::Result { + fn handle(&mut self, msg: PublishNotifToRmq, ctx: &mut Self::Context) -> Self::Result {; // unpacking the notif data - let ProduceNotif { + let PublishNotifToRmq { exchange_name, exchange_type, routing_key, @@ -1462,7 +1920,7 @@ impl ActixMessageHandler for NotifBrokerActor{ *CRYPTER_THEMIS_ERROR_CODE, // error hex (u16) code 
source.as_bytes().to_vec(), // text of error source in form of utf8 bytes crate::error::ErrorKind::Crypter(crate::error::CrypterError::Themis(e)), // the actual source of the error caused at runtime - &String::from("NotifBrokerActor.ActixMessageHandler.Wallet::secure_cell_encrypt"), // current method name + &String::from("NotifBrokerActor.ActixMessageHandler.Wallet::secure_cell_encrypt"), // current method name Some(&zerlog_producer_actor) ).await; }); @@ -1471,7 +1929,7 @@ impl ActixMessageHandler for NotifBrokerActor{ } }; - (str_data, secure_cell_config.to_owned(), config.unique_redis_id) + (str_data, secure_cell_config.to_owned()) } else{ // we can't return a reference to a SecureCellConfig in here cause @@ -1479,7 +1937,7 @@ impl ActixMessageHandler for NotifBrokerActor{ // this scope gets executed and dropped the instance hence any pointer // of it won't be valid. so don't return a mutable pointer to SecureCellConfig // instance in here. - (serde_json::to_string_pretty(¬if_data).unwrap(), SecureCellConfig::default(), String::from("")) + (serde_json::to_string_pretty(¬if_data).unwrap(), SecureCellConfig::default()) }; // ===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>> // ===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>> @@ -1490,7 +1948,6 @@ impl ActixMessageHandler for NotifBrokerActor{ let ( stringified_data, mut secure_cell_config, - unique_redis_id ) = stringified_config_data; // spawn the future in the background into the given actor context thread @@ -1498,13 +1955,13 @@ impl ActixMessageHandler for NotifBrokerActor{ // every actor has its own thread of execution. if local_spawn{ async move{ - this.publishToRmq(&stringified_data, &exchange_name, &routing_key, &exchange_type, &unique_redis_id, &mut secure_cell_config).await; + this.publishToRmq(&stringified_data, &exchange_name, &routing_key, &exchange_type,&mut secure_cell_config).await; } .into_actor(self) // convert the future into an actor future of type NotifBrokerActor .spawn(ctx); // spawn the future object into this actor context thread } else{ // spawn the future in the background into the tokio lightweight thread tokio::spawn(async move{ - this.publishToRmq(&stringified_data, &exchange_name, &routing_key, &exchange_type, &unique_redis_id, &mut secure_cell_config).await; + this.publishToRmq(&stringified_data, &exchange_name, &routing_key, &exchange_type,&mut secure_cell_config).await; }); } @@ -1517,13 +1974,13 @@ impl ActixMessageHandler for NotifBrokerActor{ /* ********************************************************************************* */ /* ***************************** CONSUME NOTIF HANDLER ***************************** */ /* ********************************************************************************* */ -impl ActixMessageHandler for NotifBrokerActor{ +impl ActixMessageHandler for NotifBrokerActor{ type Result = (); - fn handle(&mut self, msg: ConsumeNotif, ctx: &mut Self::Context) -> Self::Result { + fn handle(&mut self, msg: ConsumeNotifFromRmq, ctx: &mut Self::Context) -> Self::Result { // unpacking the consume data - let ConsumeNotif { + let ConsumeNotifFromRmq { queue, tag, exchange_name, @@ -1672,7 +2129,17 @@ impl ActixMessageHandler for NotifBrokerActor{ fn handle(&mut self, msg: PublishNotifToKafka, ctx: &mut Self::Context) -> Self::Result { - let PublishNotifToKafka{ topic, local_spawn, notif_data, encryptionConfig } = msg.clone(); + let PublishNotifToKafka{ + topic, + local_spawn, + brokers, + notif_data, + encryptionConfig, + 
partitions, + headers, + } = msg.clone(); + + log::info!("PublishNotifToKafka : {:#?}", msg.clone()); let this = self.clone(); let task = async move{ @@ -1682,7 +2149,7 @@ impl ActixMessageHandler for NotifBrokerActor{ // the caller otherwise suspend the this.publishToRedis() function // until the task is ready to be polled, meanwhile it executes other // tasks (won't block the thread) - this.publishToKafka(&topic, notif_data, encryptionConfig).await; + this.publishToKafka(&topic, notif_data, encryptionConfig, partitions, &brokers, headers).await; }; @@ -1713,7 +2180,7 @@ impl ActixMessageHandler for NotifBrokerActor{ fn handle(&mut self, msg: ConsumeNotifFromKafka, ctx: &mut Self::Context) -> Self::Result { - let ConsumeNotifFromKafka{ topic, consumerId, decryptionConfig } = msg.clone(); + let ConsumeNotifFromKafka{ topics, brokers, consumerGroupId, decryptionConfig, redis_cache_exp } = msg.clone(); let this = self.clone(); let task = async move{ @@ -1723,7 +2190,7 @@ impl ActixMessageHandler for NotifBrokerActor{ // the caller otherwise suspend the this.publishToRedis() function // until the task is ready to be polled, meanwhile it executes other // tasks (won't block the thread) - this.consumeFromKafka(&topic, &consumerId, decryptionConfig).await; + this.consumeFromKafka(&topics, &consumerGroupId, decryptionConfig, redis_cache_exp, &brokers).await; }; diff --git a/src/workers/scheduler/mod.rs b/src/workers/scheduler/mod.rs index 6003f01..15fd00a 100644 --- a/src/workers/scheduler/mod.rs +++ b/src/workers/scheduler/mod.rs @@ -35,17 +35,18 @@ pub async fn runInterval1(task: std::sync::Arc R + Send + S }); } -pub async fn runInterval(method: M, int: u64) +// task is a closure that returns a future object +pub async fn runInterval(task: M, period: u64) where M: Fn() -> R + Send + Sync + 'static, R: std::future::Future + Send + Sync + 'static, { tokio::spawn(async move{ - let mut int = tokio::time::interval(tokio::time::Duration::from_secs(int)); + let mut int = tokio::time::interval(tokio::time::Duration::from_secs(period)); int.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); loop{ int.tick().await; - method().await; + task().await; } }); }
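
A minimal usage sketch for the reworked runInterval above (assuming it is called from inside an
existing tokio runtime; the shared state, log line and the 60-second period are illustrative only):

    use std::sync::Arc;

    // shared state the periodic task needs; cloned into each tick's future
    let shared = Arc::new(String::from("notif-housekeeping"));
    runInterval(move || {
        let label = shared.clone();
        async move{
            // runs once per tick; missed ticks are skipped per MissedTickBehavior::Skip
            log::info!("running scheduled task: {}", label);
        }
    }, 60).await;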