feat(streaming): finish kafka streaming
wildonion committed Sep 19, 2024
1 parent 08742ac commit db73cdd
Showing 32 changed files with 2,034 additions and 370 deletions.
5 changes: 5 additions & 0 deletions .env
@@ -21,6 +21,11 @@ AMQP_PORT=5672
AMQP_USERNAME=hoopoe
AMQP_PASSWORD=geDteDd0Ltg2135FJYQ6rjNYHYkGQa70

KAFKA_HOST=localhost # kafka
KAFKA_PORT=9092
KAFKA_USERNAME=hoopoe
KAFKA_PASSWORD=geDteDd0Ltg2135FJYQ6rjNYHYkGQa70

IO_BUFFER_SIZE=1204
FILE_SIZE=209715200
ENVIRONMENT=prod
4 changes: 2 additions & 2 deletions .github/workflows/cicd.yml
@@ -32,7 +32,7 @@ jobs:
username: ${{ secrets.SERVER_USER }}
password: ${{ secrets.SERVER_PASSWORD }}
port: 22
script: | # 1 - pull services from the custom registry (each image in dockercompose file prefixed with registry name) | 2 - up all the services
cd /root/hoopoe
script: | # cd hoopoe directly cause we're already in root
cd hoopoe
sudo docker compose -f "docker-compose.yml" pull
sudo docker compose -f "docker-compose.yml" up -d
27 changes: 27 additions & 0 deletions .github/workflows/ui.yml
@@ -0,0 +1,27 @@
name: CI/CD Pipeline

on:
push:
branches: [main]
pull_request:
branches: [main]

jobs:
build:
runs-on: ubuntu-latest
steps:
- name: Deploy & Build
uses: appleboy/ssh-action@master
with:
host: ${{ secrets.SERVER_HOST }}
username: ${{ secrets.SERVER_USER }}
password: ${{ secrets.SERVER_PASSWORD }}
port: 22
script: | # cd SomeUiRepo directly cause we're already in root
if [ ! -d "/root/ui" ]; then
git clone https://wildonion:${{ secrets.ACCESS_TOKEN }}@github.com/wildonion/SomeUiRepo.git
fi
cd SomeUiRepo
git pull origin main
npm install
npm run build
95 changes: 90 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -24,6 +24,7 @@ tracing-subscriber = "0.3"
serde = {version = "1", features = ["derive"] }
serde_json = { version = "1", features = ["preserve_order"] }
redis-async = "0.17"
rdkafka = "0.36.2"
sha2 = "0.10"
is_type = "0.2.1"
rayon = "1.10.0"
31 changes: 31 additions & 0 deletions docker-compose.yml
@@ -119,6 +119,37 @@ services:
timeout: 10s
retries: 5
start_period: 10s
##### ===================================================================================================
kafka:
image: apache/kafka:latest
container_name: kafka
restart: unless-stopped
ports:
- "9092:9092"
networks:
- hoopoe
- hoopoe_is_there1
- hoopoe_is_there2
hostname: kafka
healthcheck:
test: /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list # rabbitmq-diagnostics is not available in this image; assumes the apache/kafka image ships the CLI tools under /opt/kafka/bin
interval: 30s
timeout: 10s
retries: 5
start_period: 10s
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_LISTENERS: PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_NUM_PARTITIONS: 3
##### ===================================================================================================
adminer:
container_name: adminer
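As a hedged sketch (not code from this diff) of how the `rdkafka` crate added in Cargo.toml might publish to the broker configured above; the topic name, key, payload, and the `tokio` entrypoint are illustrative assumptions:

```rust
use std::time::Duration;
use rdkafka::config::ClientConfig;
use rdkafka::producer::{FutureProducer, FutureRecord};

#[tokio::main]
async fn main() {
    // broker address matches the compose service above; topic and key are hypothetical
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .create()
        .expect("producer creation failed");

    let delivery = producer
        .send(
            FutureRecord::to("hoopoe-events")
                .key("order-1")
                .payload(r#"{"status":"created"}"#),
            Duration::from_secs(5), // how long to wait for space in the send queue
        )
        .await;

    match delivery {
        Ok((partition, offset)) => println!("delivered to partition {partition} at offset {offset}"),
        Err((err, _msg)) => eprintln!("delivery failed: {err}"),
    }
}
```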
File renamed without changes.
File renamed without changes.
101 changes: 99 additions & 2 deletions src/workers/actorWorker.md → docs/actorWorker.md
@@ -1,5 +1,5 @@

Actors are threads with isolated and no shared state with outside world, they have a single thread of execution which enables them to execute jobs received by their mailbox jobq channel one at a time, each job however can be scheduled to be executed periodically or at an specific time.
Tasks can be represented as light threads. Better still, a light thread can be a smarter object with isolated state and no sharing with the outside world: because it handles one message at a time, its internal state remains consistent while each message is processed. It has a single thread of execution, which lets it execute jobs received through its jobq mailbox channel one at a time; each job can also be scheduled to run periodically or at a specific time. Putting all of this together, a package that executes tasks on light threads or a light threadpool, without sacrificing resources or resorting to state sharing and mutex-based syncing, gives us something called an Actor: it executes tasks one at a time inside the actor runtime's light threadpool whenever a message is sent to it.
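A minimal sketch of that scheduling capability, assuming `actix` and `actix-rt` (the `Ticker` actor is a hypothetical example, not code from this crate): an actor can register periodic and delayed jobs on its own context when it starts.

```rust
use actix::prelude::*;
use std::time::Duration;

struct Ticker;

impl Actor for Ticker {
    type Context = Context<Self>;

    // runs once the actor is spawned onto the runtime's light threadpool
    fn started(&mut self, ctx: &mut Self::Context) {
        // schedule a job to run periodically; each tick is handled one at a time
        ctx.run_interval(Duration::from_secs(5), |_actor, _ctx| {
            println!("periodic job executed");
        });
        // schedule a one-shot job for a specific moment in the future
        ctx.run_later(Duration::from_secs(30), |_actor, _ctx| {
            println!("delayed job executed");
        });
    }
}

#[actix_rt::main]
async fn main() {
    let _addr = Ticker.start();
    // keep the system alive long enough to observe a few ticks
    actix_rt::time::sleep(Duration::from_secs(12)).await;
}
```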

### **1. Workers and Threads**:
A **worker** typically refers to an execution unit (or process) that handles jobs or tasks. Workers can be implemented using **threads** or **processes** depending on the underlying concurrency model.
@@ -261,4 +261,101 @@ Actix is widely used in building **high-performance, low-latency systems** such
5. **Concurrency**: Non-blocking, asynchronous execution via Rust's async runtime.
6. **Lightweight Tasks**: Efficient use of system resources, enabling the creation of many actors with minimal overhead.
7. **Supervision**: Fault tolerance via supervisors that monitor and restart actors.
8. **Performance**: Actix is designed for high throughput and low latency, making it ideal for real-time and concurrent systems.

In the context of **actor-based systems** like **Actix** (or other actor frameworks), **actors** are lightweight entities that handle tasks by receiving and processing messages asynchronously. Here's a clearer explanation, tying together the concept of **lightweight threads** (or tasks) and how the **actor runtime** manages their scheduling:

### **Actors as Lightweight Tasks (or Threads)**
- **Actors** can be thought of as **lightweight tasks** or **green threads** that are scheduled by the **actor runtime**. They are **not full-fledged OS threads** (which can be more resource-heavy and involve expensive context switches), but instead are smaller, more efficient units of execution.

- These **lightweight tasks** (sometimes called **fibers** or **coroutines**) are more resource-efficient than OS threads because they run within a single or few OS threads, multiplexing many actors over a small number of system threads. This allows a large number of actors to run concurrently.

- **Actix** uses **async/await** in Rust, allowing it to leverage Rust's async runtime (e.g., **Tokio** or **async-std**) for scheduling these lightweight tasks. This means that many actors can be created and run concurrently, making it possible to efficiently scale the system while keeping the resource footprint low.
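
A minimal sketch of that footprint, assuming `actix` and `actix-rt` (the `Lightweight` actor is a made-up placeholder): starting thousands of actors only spawns lightweight tasks on the runtime, not OS threads.

```rust
use actix::prelude::*;

struct Lightweight;

impl Actor for Lightweight {
    type Context = Context<Self>;
}

#[actix_rt::main]
async fn main() {
    // each .start() spawns a lightweight task on the current arbiter's event loop
    // rather than creating an OS thread, so this stays cheap
    let actors: Vec<Addr<Lightweight>> = (0..10_000).map(|_| Lightweight.start()).collect();
    println!("started {} actors on one runtime", actors.len());
}
```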

### **Actor Runtime and Task Scheduling**
- The **actor runtime** (like the Actix system) is responsible for **scheduling and managing the execution of actors**. The actor runtime handles several important tasks:
1. **Message Delivery**: Actors communicate by sending messages to each other. The actor runtime ensures that messages are delivered to the correct actor.
2. **Task Scheduling**: Each actor processes one message at a time in its own **single-threaded context**. The runtime schedules when an actor should wake up and process the next message in its queue. This is done without blocking other actors, thanks to the non-blocking and asynchronous nature of the actor model.
3. **Multiplexing Actors**: The runtime efficiently multiplexes many actors over a few OS threads. This way, a single physical thread can handle the work of hundreds or even thousands of actors, switching between actors when they are ready to process the next message.
4. **Concurrency Without Shared State**: Since actors don’t share memory and each actor processes messages one at a time, there’s no need for locks or synchronization mechanisms (like mutexes). The runtime manages message queues for each actor, ensuring messages are processed in the correct order.

### **Message Passing and Asynchronous Execution**
- In actor systems, the **messages** that actors send to each other are how work is distributed. Instead of directly calling functions or manipulating shared state, actors interact by **passing messages**. The runtime ensures that each actor processes its incoming messages **asynchronously**.

- **Non-blocking execution**: When an actor receives a message, it can process it in a non-blocking way (using `async/await` in Rust). This means the actor doesn’t have to wait for long-running tasks to complete (such as I/O operations), allowing the runtime to switch to another actor and keep the system responsive.
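
A minimal sketch of such a non-blocking handler, assuming `actix` and `actix-rt` (`FetchActor` and `Fetch` are hypothetical names): returning a `ResponseFuture` lets the handler await I/O while the runtime switches to other actors.

```rust
use actix::prelude::*;
use std::time::Duration;

struct FetchActor;

impl Actor for FetchActor {
    type Context = Context<Self>;
}

#[derive(Message)]
#[rtype(result = "usize")]
struct Fetch(String);

impl Handler<Fetch> for FetchActor {
    // a boxed future: the handler yields instead of blocking while it waits
    type Result = ResponseFuture<usize>;

    fn handle(&mut self, msg: Fetch, _: &mut Self::Context) -> Self::Result {
        Box::pin(async move {
            // stand-in for a real I/O call (database, HTTP, Kafka, ...)
            actix_rt::time::sleep(Duration::from_millis(10)).await;
            msg.0.len()
        })
    }
}

#[actix_rt::main]
async fn main() {
    let addr = FetchActor.start();
    let len = addr.send(Fetch("hello".into())).await.unwrap();
    println!("payload length: {len}");
}
```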

### **Actors as Single-Threaded Execution Units**
- **Single-threaded execution**: Each actor processes **only one message at a time**. This may seem like a limitation, but it is a deliberate design choice that simplifies concurrency. Since the actor can only process one message at a time, there’s no risk of race conditions within the actor itself. It ensures that all internal state remains consistent while processing a message.

- The runtime efficiently switches between actors when they have messages to process, ensuring high concurrency and throughput.

---

### **Putting It All Together**
In **Actix** (and actor systems in general):
- **Actors** are lightweight tasks that are not full OS threads, but instead **lightweight units of execution** that run within the context of a few OS threads.
- The **actor runtime** schedules these tasks (actors) by:
1. Handling **message delivery**.
2. Scheduling the actors for execution **asynchronously** when they receive messages.
3. Ensuring that actors process **one message at a time** (single-threaded), which avoids the need for locks or synchronization.
4. **Multiplexing** a large number of actors over a small number of OS threads, making the system **scalable and efficient**.

This design ensures that the system can handle **high concurrency and low-latency** workloads while keeping resource consumption low and simplifying concurrency management.

---

### **Example of Actix Actor Lifecycle and Message Handling**

```rust
use actix::prelude::*;

// Define an actor
struct MyActor;

impl Actor for MyActor {
type Context = Context<Self>;
}

// Define a message
#[derive(Message)]
#[rtype(result = "usize")]
struct MyMessage(String);

// Implement message handler
impl Handler<MyMessage> for MyActor {
type Result = usize;

fn handle(&mut self, msg: MyMessage, _: &mut Self::Context) -> Self::Result {
println!("Received message: {}", msg.0);
msg.0.len() // Return the length of the message string
}
}

#[actix_rt::main]
async fn main() {
// Start an instance of the actor
let actor = MyActor.start();

// Send a message asynchronously
let result = actor.send(MyMessage("Hello, Actix!".to_string())).await;

// Output the result (length of the message string)
match result {
Ok(res) => println!("Message length: {}", res),
Err(err) => println!("Error handling message: {:?}", err),
}
}
```

### **Key Concepts in the Example**:
- The `MyActor` actor processes **one message at a time** (each message of type `MyMessage`).
- The actor’s runtime manages the **scheduling of tasks** and the **delivery of messages**.
- The message is processed **asynchronously**, without blocking, allowing other actors to work concurrently.

---

### **Summary**
- **Actors** in Actix are **lightweight tasks** that are scheduled by the **actor runtime** to process messages.
- The actor runtime **schedules these tasks** efficiently over a pool of OS threads, ensuring high concurrency with minimal resource usage.
- Since actors are **single-threaded**, they process **one message at a time**, which ensures consistency and simplifies concurrency management.
- This model allows Actix to handle **large-scale concurrent systems** efficiently, using Rust's powerful async capabilities for non-blocking execution.
File renamed without changes.
File renamed without changes.
File renamed without changes.
