Skip to content

Commit

Permalink
complete consuming RMQ data over WS streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
wildonion committed Jul 10, 2024
1 parent f31ef07 commit fdf7f98
Show file tree
Hide file tree
Showing 9 changed files with 273 additions and 192 deletions.
Binary file modified infra/.DS_Store
Binary file not shown.
61 changes: 51 additions & 10 deletions infra/api.http.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"info": {
"_postman_id": "537baa22-afc3-4f65-b231-50572dcd53eb",
"name": "hoopoe",
"name": "HOOPOE",
"schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json",
"_exporter_id": "22927035",
"_collection_link": "https://dewoloper.postman.co/workspace/dewo~9f34982c-dde5-4f77-9d5d-46872ed07d4a/collection/22927035-537baa22-afc3-4f65-b231-50572dcd53eb?action=share&source=collection_link&creator=22927035"
Expand All @@ -18,18 +18,51 @@
"name": "Notif",
"item": [
{
"name": "Register Notif",
"name": "Register Notif (Produce)",
"request": {
"method": "POST",
"header": [
{
"key": "x-api-key",
"value": "124kUmD39YUwJRKxaQqTkNzcoVW4P1ZqZ3ahiJWVQqs89bope7GtqU881rWpMyVXskyn2PWsp"
"value": "124kUmD39YUwJRKxaQqTkJqRfrAZotpYnrWJc2x9fdibFWHPdRFcGCToDWAqpyM9hcD23JvTi"
}
],
"body": {
"mode": "raw",
"raw": "{\n \"producer_info\": {\n \"info\":{\n \"local_spawn\": true,\n \"notif_data\": { \n \"receiver_info\": \"1\",\n \"id\": \"unqie-id0\",\n \"action_data\": {\n \"pid\": 200.4\n }, \n \"actioner_info\": \"2\", \n \"action_type\": \"ProductPurchased\", \n \"fired_at\": 1714316645, \n \"is_seen\": false\n }, \n \"exchange_name\": \"SavageEx\",\n \"exchange_type\": \"fanout\", // amq.topic for pubsub\n \"routing_key\": \"\" // routing pattern or key - will be ignored if type is fanout\n }\n },\n // \"producer_info\": null,\n \"consumer_info\": null\n // \"consumer_info\": {\n // \"info\": {\n // \"queue\": \"TestOnion\",\n // \"exchange_name\": \"SavageEx\",\n // \"routing_key\": \"\",\n // \"tag\": \"cons_tag0\",\n // \"redis_cache_exp\": 300,\n // \"local_spawn\": true,\n // \"cache_on_redis\": true,\n // \"store_in_db\": true\n // }\n // }\n}",
"raw": "{\n \"producer_info\": {\n \"info\":{\n \"local_spawn\": true, // spawn the task into the actor thread itself\n \"notif_data\": { \n \"receiver_info\": \"1\",\n \"id\": \"unqie-id0\",\n \"action_data\": {\n \"pid\": 200.4\n }, \n \"actioner_info\": \"2\", \n \"action_type\": \"ProductPurchased\", \n \"fired_at\": 1714316645, \n \"is_seen\": false\n }, \n \"exchange_name\": \"SavageEx\",\n \"exchange_type\": \"fanout\", // amq.topic for pubsub\n \"routing_key\": \"\", // routing pattern or key - will be ignored if type is fanout\n \"encryption_config\": { // this can be null\n \"secret\": \"wildonion@1234\",\n \"passphrase\": \"wildonion@12345\",\n \"unique_redis_id\": \"notif_unique_redis_id\" // !!!!must be unique for every new notif!!!!\n }\n }\n },\n \"consumer_info\": null\n}",
"options": {
"raw": {
"language": "json"
}
}
},
"url": {
"raw": "{{events-v1}}/notif/",
"host": [
"{{events-v1}}"
],
"path": [
"notif",
""
]
},
"description": "if you want to produce notif into the RMQ channel put `consumer_info: null` and fill the `producer_info`, to consume notifs just put `producer_info: null` and fill the `consumer_info`"
},
"response": []
},
{
"name": "Register Notif (Consume)",
"request": {
"method": "POST",
"header": [
{
"key": "x-api-key",
"value": "124kUmD39YUwJRKxaQqTkJqRfrAZotpYnrWJc2x9fdibFWHPdRFcGCToDWAqpyM9hcD23JvTi"
}
],
"body": {
"mode": "raw",
"raw": "{\n \"producer_info\": null,\n \"consumer_info\": {\n \"info\": {\n \"queue\": \"TestOnion\",\n \"exchange_name\": \"SavageEx\",\n \"routing_key\": \"\",\n \"tag\": \"cons_tag0\",\n \"redis_cache_exp\": 300,\n \"local_spawn\": true, // spawn the task into the actor thread itself\n \"cache_on_redis\": true,\n \"store_in_db\": true,\n \"decryption_config\": { // this can be null\n \"secret\": \"wildonion@1234\",\n \"passphrase\": \"wildonion@12345\",\n \"unique_redis_id\": \"notif_unique_redis_id\" // !!!!must be unique for every new notif!!!!\n }\n }\n }\n}",
"options": {
"raw": {
"language": "json"
Expand Down Expand Up @@ -203,11 +236,12 @@
}
},
"url": {
"raw": "{{health-v1}}/?pid=1",
"raw": "{{health-v1}}/atomic-mint/?pid=1",
"host": [
"{{health-v1}}"
],
"path": [
"atomic-mint",
""
],
"query": [
Expand Down Expand Up @@ -238,9 +272,12 @@
}
},
"url": {
"raw": "{{health-v1}}",
"raw": "{{health-v1}}/check",
"host": [
"{{health-v1}}"
],
"path": [
"check"
]
}
},
Expand All @@ -262,10 +299,14 @@
}
],
"url": {
"raw": "{{auth-v1}}?exp=4000&scope=write",
"raw": "{{auth-v1}}/generate-access-token/?exp=4000&scope=write",
"host": [
"{{auth-v1}}"
],
"path": [
"generate-access-token",
""
],
"query": [
{
"key": "exp",
Expand Down Expand Up @@ -308,17 +349,17 @@
"variable": [
{
"key": "events-v1",
"value": "http://localhost:2344/v1/events/",
"value": "http://localhost:2344/v1/events",
"type": "string"
},
{
"key": "auth-v1",
"value": "http://localhost:2344/v1/auth/",
"value": "http://localhost:2344/v1/auth",
"type": "string"
},
{
"key": "health-v1",
"value": "http://localhost:2344/v1/health/",
"value": "http://localhost:2344/v1/health",
"type": "string"
},
{
Expand Down
Binary file modified infra/arch.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
1 change: 1 addition & 0 deletions scripts/setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ sudo apt install -y snapd && sudo snap install core; sudo snap refresh core
sudo snap install --classic certbot && sudo ln -s /snap/bin/certbot /usr/bin/certbot
cargo install sqlant && sudo apt install -y openjdk-11-jdk && sudo apt install -y graphviz

# use sqlant and plantuml to extract erd from any db
sqlant postgresql://postgres:$PASSWORD@localhost/hoopoe > $(pwd)/infra/hoopoe.uml
java -jar $(pwd)/infra/plantuml.jar $(pwd)/infra/hoopoe.uml

Expand Down
4 changes: 2 additions & 2 deletions src/apis/v1/ws/notif.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::*;
the client to fetch notifs for an owner in a short polling manner
this way is used to fetch all notifs for an owern in realtime as
they're receiving by the RMQ consumer.
addr: localhost:2344/v1/stream/notif/?owner=100&room=notif_room
addr: localhost:2344/v1/stream/notif/consume/?owner=100&room=notif_room
*/
#[handler]
async fn consume(
Expand Down Expand Up @@ -50,7 +50,7 @@ async fn consume(
.upgrade(req, res, |ws| async move{
// spawn the async task of handling websocket session inside a lightweight thread
tokio::spawn(async move{
HoopoeWsServer::session_handler(ws, notif_broker_receiver, notif_query, zerlog_producer_actor).await;
HoopoeWsServer::session_handler(ws, notif_query, notif_broker_receiver, zerlog_producer_actor).await;
});
})
.await
Expand Down
4 changes: 3 additions & 1 deletion src/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ pub struct AppContext{

impl AppContext{

// initializing everything once!
pub async fn init() -> Self{

let app_storage = Storage::new().await;
Expand All @@ -85,6 +86,7 @@ impl AppContext{
Channels{
notif_broker: NotifMpscChannel{
sender: notif_broker_tx.clone(),
// making the receiver safe to be shared multiple times and mutated
receiver: std::sync::Arc::new(
tokio::sync::Mutex::new(
notif_broker_rx
Expand Down Expand Up @@ -131,7 +133,7 @@ impl AppContext{
channels,
app_storage: app_storage.clone(),
actors: Some(actor_instances),
kvan: std::sync::Arc::new( // an in memory, mutable and safe map which can be shared across threads
kvan: std::sync::Arc::new( // an in memory, mutable and safe binary tree map which can be shared across threads
tokio::sync::Mutex::new(
BTreeMap::new()
)
Expand Down
Loading

0 comments on commit fdf7f98

Please sign in to comment.