diff --git a/Cargo.lock b/Cargo.lock
index dab7167e7..f2c8094ed 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1993,6 +1993,7 @@ dependencies = [
  "thiserror",
  "tokio",
  "tokio-cron-scheduler",
+ "tokio-stream",
  "toml",
  "toml_edit 0.22.16",
  "uuid",
@@ -3767,9 +3768,9 @@ dependencies = [
 
 [[package]]
 name = "tokio-stream"
-version = "0.1.15"
+version = "0.1.16"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "267ac89e0bec6e691e5813911606935d77c476ff49024f98abcea3e7b15e37af"
+checksum = "4f4e6ce100d0eb49a2734f8c0812bcd324cf357d21810932c5df6b96ef2b86f1"
 dependencies = [
  "futures-core",
  "pin-project-lite",
diff --git a/apps/framework-cli/Cargo.toml b/apps/framework-cli/Cargo.toml
index 42ccd3b3a..b25f9142f 100644
--- a/apps/framework-cli/Cargo.toml
+++ b/apps/framework-cli/Cargo.toml
@@ -77,6 +77,7 @@ sha2 = "0.10.8"
 hex = "0.4.2"
 constant_time_eq = "0.3.0"
 tokio-cron-scheduler = "0.11.0"
+tokio-stream = "0.1.16"
 indexmap = "2.5.0"
 redis = { version = "0.24.0", features = ["tokio-comp", "aio", "tokio-native-tls-comp"] }
 jsonwebtoken = "9.3.0"
diff --git a/apps/framework-cli/src/cli/routines.rs b/apps/framework-cli/src/cli/routines.rs
index 70f3edc79..f50d4b8d4 100644
--- a/apps/framework-cli/src/cli/routines.rs
+++ b/apps/framework-cli/src/cli/routines.rs
@@ -79,13 +79,14 @@
 //! - Organize routines better in the file hierarchy
 //!
+use crate::infrastructure::redis::redis_client::RedisClient;
 use std::collections::{HashMap, HashSet};
 use std::ops::DerefMut;
 use std::path::PathBuf;
 use std::sync::Arc;
 
 use log::{debug, error, info};
-use tokio::sync::RwLock; // or use the appropriate error type
+use tokio::sync::{Mutex, RwLock};
 use tokio::time::{interval, Duration};
 
 use crate::cli::watcher::{
@@ -112,7 +113,6 @@ use crate::infrastructure::processes::cron_registry::CronRegistry;
 use crate::infrastructure::processes::functions_registry::FunctionProcessRegistry;
 use crate::infrastructure::processes::kafka_clickhouse_sync::SyncingProcessesRegistry;
 use crate::infrastructure::processes::process_registry::ProcessRegistries;
-use crate::infrastructure::redis::redis_client::RedisClient;
 use crate::infrastructure::stream::redpanda::fetch_topics;
 use crate::project::Project;
 
@@ -268,20 +268,86 @@ impl RoutineController {
     }
 }
 
-async fn setup_redis_client(project: Arc<Project>) -> anyhow::Result<RedisClient> {
-    let mut redis_client = RedisClient::new(project.name(), project.redis_config.clone()).await?;
+async fn setup_redis_client(project: Arc<Project>) -> anyhow::Result<Arc<Mutex<RedisClient>>> {
+    let redis_client = RedisClient::new(project.name(), project.redis_config.clone()).await?;
+    let redis_client = Arc::new(Mutex::new(redis_client));
+
+    let (service_name, instance_id) = {
+        let client = redis_client.lock().await;
+        (
+            client.get_service_name().to_string(),
+            client.get_instance_id().to_string(),
+        )
+    };
+
+    show_message!(
+        MessageType::Info,
+        Message {
+            action: "Node Id:".to_string(),
+            details: format!("{}::{}", service_name, instance_id),
+        }
+    );
 
     // Register the leadership lock
-    redis_client.register_lock("leadership", 10).await?;
+    redis_client
+        .lock()
+        .await
+        .register_lock("leadership", 10)
+        .await?;
 
     // Start the leadership lock management task
     start_leadership_lock_task(redis_client.clone(), project.clone());
 
-    redis_client.start_periodic_tasks();
+    let redis_client_clone = redis_client.clone();
+    let callback = Arc::new(move |message: String| {
+        let redis_client = redis_client_clone.clone();
+        tokio::spawn(async move {
+            if let Err(e) = process_pubsub_message(message, redis_client).await {
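+                // Note: handler errors are only logged here, so a single bad message never kills the listener.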
error!("Error processing pubsub message: {}", e); + } + }); + }); + + redis_client + .lock() + .await + .register_message_handler(callback) + .await; + redis_client.lock().await.start_periodic_tasks(); + Ok(redis_client) } -fn start_leadership_lock_task(redis_client: RedisClient, project: Arc) { +async fn process_pubsub_message( + message: String, + redis_client: Arc>, +) -> anyhow::Result<()> { + let has_lock = { + let client = redis_client.lock().await; + client.has_lock("leadership").await? + }; + + if has_lock { + if message.contains("") { + info!(" This instance is the leader so ignoring the Migration start message: {}", message); + } else if message.contains("") { + info!(" This instance is the leader so ignoring the Migration end message received: {}", message); + } else { + info!( + " This instance is the leader and received pubsub message: {}", + message + ); + } + } else { + info!( + " This instance is not the leader and received pubsub message: {}", + message + ); + } + Ok(()) +} + +fn start_leadership_lock_task(redis_client: Arc>, project: Arc) { tokio::spawn(async move { let mut interval = interval(Duration::from_secs(5)); // Adjust the interval as needed loop { @@ -294,27 +360,35 @@ fn start_leadership_lock_task(redis_client: RedisClient, project: Arc) } async fn manage_leadership_lock( - redis_client: &RedisClient, + redis_client: &Arc>, project: &Arc, ) -> Result<(), anyhow::Error> { - match redis_client.has_lock("leadership").await? { - true => { - // We have the lock, renew it - redis_client.renew_lock("leadership").await?; - } - false => { - // We don't have the lock, try to acquire it - if redis_client.attempt_lock("leadership").await? { - info!("Obtained leadership lock, performing leadership tasks"); - let project_clone = project.clone(); - tokio::spawn(async move { - if let Err(e) = leadership_tasks(project_clone).await { - error!("Error executing leadership tasks: {}", e); - } - }); - } else { - debug!("Failed to obtain leadership lock"); - } + let has_lock = { + let client = redis_client.lock().await; + client.has_lock("leadership").await? + }; + + if has_lock { + // We have the lock, renew it + let client = redis_client.lock().await; + client.renew_lock("leadership").await?; + } else { + // We don't have the lock, try to acquire it + let acquired_lock = { + let client = redis_client.lock().await; + client.attempt_lock("leadership").await? 
+        };
+        if acquired_lock {
+            let mut client = redis_client.lock().await;
+            client.broadcast_message("").await?;
+
+            info!("Obtained leadership lock, performing leadership tasks");
+            let project_clone = project.clone();
+            tokio::spawn(async move {
+                if let Err(e) = leadership_tasks(project_clone).await {
+                    error!("Error executing leadership tasks: {}", e);
+                }
+            });
         }
     }
     Ok(())
@@ -341,7 +415,7 @@ pub async fn start_development_mode(
         }
     );
 
-    let mut redis_client = setup_redis_client(project.clone()).await?;
+    let redis_client = setup_redis_client(project.clone()).await?;
 
     let server_config = project.http_server_config.clone();
     let web_server = Webserver::new(
@@ -530,7 +604,10 @@ pub async fn start_development_mode(
         .await;
     };
 
-    let _ = redis_client.stop_periodic_tasks();
+    {
+        let mut redis_client = redis_client.lock().await;
+        let _ = redis_client.stop_periodic_tasks();
+    }
 
     Ok(())
 }
@@ -553,8 +630,7 @@ pub async fn start_production_mode(
         panic!("Crashing for testing purposes");
     }
 
-    let mut redis_client = setup_redis_client(project.clone()).await?;
-    info!("Redis client initialized");
+    let redis_client = setup_redis_client(project.clone()).await?;
     let server_config = project.http_server_config.clone();
     info!("Server config: {:?}", server_config);
     let web_server = Webserver::new(
@@ -686,7 +762,10 @@ pub async fn start_production_mode(
         .await;
     }
 
-    let _ = redis_client.stop_periodic_tasks();
+    {
+        let mut redis_client = redis_client.lock().await;
+        let _ = redis_client.stop_periodic_tasks();
+    }
 
     Ok(())
 }
diff --git a/apps/framework-cli/src/framework/core/execute.rs b/apps/framework-cli/src/framework/core/execute.rs
index 39d1a3bc1..a0a494de4 100644
--- a/apps/framework-cli/src/framework/core/execute.rs
+++ b/apps/framework-cli/src/framework/core/execute.rs
@@ -2,6 +2,7 @@ use crate::infrastructure::redis::redis_client::RedisClient;
 use clickhouse_rs::ClientHandle;
 use std::sync::Arc;
 use tokio::sync::mpsc::Sender;
+use tokio::sync::Mutex;
 
 use super::{infrastructure_map::ApiChange, plan::InfraPlan};
 use crate::infrastructure::migration;
@@ -50,7 +51,7 @@ pub async fn execute_initial_infra_change(
     api_changes_channel: Sender<ApiChange>,
     metrics: Arc<Metrics>,
     clickhouse_client: &mut ClientHandle,
-    redis_client: &RedisClient,
+    redis_client: &Arc<Mutex<RedisClient>>,
 ) -> Result<(SyncingProcessesRegistry, ProcessRegistries), ExecutionError> {
     // This probably can be parallelized through Tokio Spawn
     olap::execute_changes(project, &plan.changes.olap_changes).await?;
@@ -81,6 +82,8 @@ pub async fn execute_initial_infra_change(
     // Check if this process instance has the "leadership" lock
     if redis_client
+        .lock()
+        .await
         .has_lock("leadership")
         .await
         .map_err(ExecutionError::LeadershipCheckFailed)?
diff --git a/apps/framework-cli/src/infrastructure/redis/redis_client.rs b/apps/framework-cli/src/infrastructure/redis/redis_client.rs
index 4c9fe6d66..970f29ddb 100644
--- a/apps/framework-cli/src/infrastructure/redis/redis_client.rs
+++ b/apps/framework-cli/src/infrastructure/redis/redis_client.rs
@@ -3,52 +3,12 @@
 //! This module provides a Redis client implementation with support for leader election,
 //! presence updates, and message passing (sending) and message queuing.
 //!
-//! # Example Usage
-//!
-//! ```rust
-//! use your_crate_name::infrastructure::redis::RedisClient;
-//!
-//! #[tokio::main]
-//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
-//!     // Initialize the Redis client
-//!     let mut client = RedisClient::new("my_service").await?;
-//!
-//!     // Start periodic tasks (presence updates and lock renewal)
-//!     client.start_periodic_tasks().await;
-//!
-//!     // Attempt to gain leadership
-//!     let is_leader = client.attempt_leadership().await?;
-//!     println!("Is leader: {}", is_leader);
-//!
-//!     // Send a message to another instance
-//!     client.send_message_to_instance("Hello", "target_instance_id").await?;
-//!
-//!     // Broadcast a message to all instances
-//!     client.broadcast_message("Broadcast message").await?;
-//!
-//!     // Post a message to the queue
-//!     client.post_queue_message("New task").await?;
-//!
-//!     // Get a message from the queue
-//!     if let Some(message) = client.get_queue_message().await? {
-//!         println!("Received message: {}", message);
-//!         // Process the message...
-//!         client.mark_queue_message(&message, true).await?;
-//!     }
-//!
-//!     // The client will automatically stop periodic tasks and release the lock (if leader)
-//!     // when it goes out of scope due to the Drop implementation
-//!
-//!     Ok(())
-//! }
-//! ```
-//!
-//! Note: Make sure to set the MOOSE_REDIS_URL environment variable or the client will default to "redis://127.0.0.1:6379".
+//! Note: Make sure to set the MOOSE_REDIS_URL environment variable or the client will
+//! default to "redis://127.0.0.1:6379".
 
 use anyhow::{Context, Result};
-use log::{error, info};
-use redis::aio::Connection as AsyncConnection;
-use redis::AsyncCommands;
-use redis::{Client, Script};
+use log::{error, info, warn};
+use redis::aio::Connection as RedisConnection;
+use redis::{AsyncCommands, Client, RedisError, Script};
 use serde::{Deserialize, Serialize};
 use std::collections::HashMap;
 use std::sync::Arc;
@@ -56,12 +16,20 @@ use std::time::{SystemTime, UNIX_EPOCH};
 use tokio::sync::Mutex;
 use tokio::task::JoinHandle;
 use tokio::time::{interval, Duration};
+use tokio_stream::StreamExt;
 use uuid::Uuid;
 
 // Internal constants that we don't expose to the user
 const KEY_EXPIRATION_TTL: u64 = 3; // 3 seconds
 const PRESENCE_UPDATE_INTERVAL: u64 = 1; // 1 second
 
+// type alias for the callback function
+use std::future::Future;
+use std::pin::Pin;
+
+type MessageCallback =
+    Arc<dyn Fn(String) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
+
 #[derive(Debug, Serialize, Deserialize, Clone)]
 pub struct RedisConfig {
     #[serde(default = "RedisConfig::default_url")]
@@ -96,13 +64,15 @@ pub struct RedisLock {
 }
 
 pub struct RedisClient {
-    connection: Arc<Mutex<AsyncConnection>>,
-    pub_sub: Arc<Mutex<AsyncConnection>>,
+    connection: Arc<Mutex<RedisConnection>>,
+    pub_sub: Arc<Mutex<RedisConnection>>,
     redis_config: RedisConfig,
     service_name: String,
     instance_id: String,
     locks: HashMap<String, RedisLock>,
     presence_task: Option<JoinHandle<()>>,
+    message_callbacks: Arc<Mutex<Vec<MessageCallback>>>,
+    listener_task: Mutex<Option<JoinHandle<()>>>,
 }
 
 impl RedisClient {
@@ -136,7 +106,7 @@ impl RedisClient {
             Err(e) => error!("Redis connection failed: {}", e),
         }
 
-        Ok(Self {
+        let client = Self {
             connection: Arc::new(Mutex::new(connection)),
             pub_sub: Arc::new(Mutex::new(pub_sub)),
             redis_config,
@@ -144,7 +114,23 @@ impl RedisClient {
             service_name,
             locks: HashMap::new(),
             presence_task: None,
-        })
+            message_callbacks: Arc::new(Mutex::new(Vec::new())),
+            listener_task: Mutex::new(None),
+        };
+
+        // Start the message listener as part of initialization
+        match client.start_message_listener().await {
+            Ok(_) => info!("Successfully started message listener"),
+            Err(e) => error!("Failed to start message listener: {}", e),
+        }
+
+        info!(
+            " Started {}::{}",
+            client.get_service_name(),
+            client.get_instance_id()
+        );
+
+        Ok(client)
     }
 
     pub async fn presence_update(&mut self) -> Result<()> {
@@ -399,6 +385,90 @@ impl RedisClient {
             .context("Failed to ping Redis server")?;
         Ok(())
     }
+
+    pub async fn register_message_handler(&self, callback: Arc<dyn Fn(String) + Send + Sync>) {
+        self.message_callbacks
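+            // Lock the callback list only long enough to push the wrapped handler.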
+            .lock()
+            .await
+            .push(Arc::new(move |message: String| {
+                let callback = Arc::clone(&callback);
+                Box::pin(async move {
+                    (callback)(message);
+                }) as Pin<Box<dyn Future<Output = ()> + Send>>
+            }));
+    }
+
+    async fn start_message_listener(&self) -> Result<(), RedisError> {
+        let instance_channel = format!(
+            "{}::{}::{}::msgchannel",
+            self.redis_config.key_prefix, self.service_name, self.instance_id
+        );
+        let broadcast_channel = format!(
+            "{}::{}::msgchannel",
+            self.redis_config.key_prefix, self.service_name
+        );
+
+        info!(
+            " Listening for messages on channels: {} and {}",
+            instance_channel, broadcast_channel
+        );
+
+        // Create a separate PubSub connection for listening to messages
+        let client = match Client::open(self.redis_config.url.clone()) {
+            Ok(client) => client,
+            Err(e) => {
+                error!(" Failed to open Redis client: {}", e);
+                return Err(e);
+            }
+        };
+
+        let pubsub_conn = match client.get_async_connection().await {
+            Ok(conn) => conn,
+            Err(e) => {
+                error!(
+                    " Failed to get async connection for PubSub: {}",
+                    e
+                );
+                return Err(e);
+            }
+        };
+
+        let mut pubsub = pubsub_conn.into_pubsub();
+
+        pubsub
+            .subscribe(&[&instance_channel, &broadcast_channel])
+            .await
+            .map_err(|e| {
+                error!(" Failed to subscribe to channels: {}", e);
+                e
+            })?;
+
+        let callback = self.message_callbacks.clone();
+
+        let listener_task = tokio::spawn(async move {
+            loop {
+                let mut pubsub_stream = pubsub.on_message();
+
+                while let Some(msg) = pubsub_stream.next().await {
+                    if let Ok(payload) = msg.get_payload::<String>() {
+                        info!(" Received pubsub message: {}", payload);
+                        let callbacks = callback.lock().await.clone();
+                        for cb in callbacks {
+                            cb(payload.clone()).await;
+                        }
+                    } else {
+                        warn!(" Failed to get payload from message");
+                    }
+                }
+                tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
+            }
+        });
+
+        // Store the listener task
+        *self.listener_task.lock().await = Some(listener_task);
+
+        Ok(())
+    }
 }
 
 impl Clone for RedisClient {
@@ -411,6 +481,8 @@ impl Clone for RedisClient {
             service_name: self.service_name.clone(),
             locks: self.locks.clone(),
             presence_task: None,
+            message_callbacks: Arc::clone(&self.message_callbacks),
+            listener_task: Mutex::new(None),
         }
     }
 }
@@ -434,6 +506,11 @@ impl Drop for RedisClient {
         } else {
             error!("Failed to get current runtime handle in RedisClient::drop");
         }
+        if let Ok(mut guard) = self.listener_task.try_lock() {
+            if let Some(task) = guard.take() {
+                task.abort();
+            }
+        }
     }
 }
 
diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml
index fe30832a8..0201d64c0 100644
--- a/pnpm-lock.yaml
+++ b/pnpm-lock.yaml
@@ -897,6 +897,7 @@ packages:
     cpu: [arm64]
     os: [darwin]
 
+  '@514labs/moose-cli-darwin-arm64@0.3.672':
     resolution: {integrity: sha512-ArL0kpQcxDs+I9D47nG+0i3jaxZg196P5eYv1Api0V79z58bMXlPKylQYbOwFXrB59aJHBNZWlZA+dhvP2wdDw==}
     cpu: [arm64]