From 3d9230e4d7d6e6f9cc5b85b3d0f135c04a234760 Mon Sep 17 00:00:00 2001
From: Carlos Justiniano
Date: Tue, 24 Sep 2024 18:06:46 -0600
Subject: [PATCH] Revert "Adding Redis support (wip) (#1755)" (#1774)

This reverts commit d76cbb46522e93c51638892b827910ebe5f85141.
---
 Cargo.lock                                        |  48 +-
 apps/framework-cli/Cargo.toml                     |   1 -
 apps/framework-cli/src/cli.rs                     |  15 +-
 apps/framework-cli/src/cli/routines.rs            |  13 +-
 apps/framework-cli/src/infrastructure.rs          |   1 -
 .../framework-cli/src/infrastructure/redis.rs     |   1 -
 .../src/infrastructure/redis/redis_client.rs      | 541 ------------------
 .../src/utilities/docker-compose.yml              |  13 +-
 8 files changed, 17 insertions(+), 616 deletions(-)
 delete mode 100644 apps/framework-cli/src/infrastructure/redis.rs
 delete mode 100644 apps/framework-cli/src/infrastructure/redis/redis_client.rs

diff --git a/Cargo.lock b/Cargo.lock
index 0f3331a48..4b5cda14c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -489,11 +489,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd"
 dependencies = [
  "bytes",
- "futures-core",
  "memchr",
- "pin-project-lite",
- "tokio",
- "tokio-util",
 ]
 
 [[package]]
@@ -1380,7 +1376,7 @@ dependencies = [
  "httpdate",
  "itoa",
  "pin-project-lite",
- "socket2 0.5.7",
+ "socket2",
  "tokio",
  "tower-service",
  "tracing",
 ]
 
 [[package]]
@@ -1467,7 +1463,7 @@ dependencies = [
  "http-body 1.0.1",
  "hyper 1.4.1",
  "pin-project-lite",
- "socket2 0.5.7",
+ "socket2",
  "tokio",
  "tower",
  "tower-service",
 ]
 
 [[package]]
@@ -1967,7 +1963,6 @@ dependencies = [
  "prometheus-parse",
  "ratatui",
  "rdkafka",
- "redis",
  "regex",
  "reqwest 0.12.5",
  "rustpython-parser",
@@ -2710,27 +2705,6 @@ dependencies = [
  "pkg-config",
 ]
 
-[[package]]
-name = "redis"
-version = "0.23.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4f49cdc0bb3f412bf8e7d1bd90fe1d9eb10bc5c399ba90973c14662a27b3f8ba"
-dependencies = [
- "async-trait",
- "bytes",
- "combine",
- "futures-util",
- "itoa",
- "percent-encoding",
- "pin-project-lite",
- "ryu",
- "sha1_smol",
- "socket2 0.4.10",
- "tokio",
- "tokio-util",
- "url",
-]
-
 [[package]]
 name = "redox_syscall"
 version = "0.4.1"
@@ -3332,12 +3306,6 @@ dependencies = [
  "syn 2.0.72",
 ]
 
-[[package]]
-name = "sha1_smol"
-version = "1.0.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d"
-
 [[package]]
 name = "sha2"
 version = "0.10.8"
@@ -3400,16 +3368,6 @@ version = "1.13.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67"
 
-[[package]]
-name = "socket2"
-version = "0.4.10"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9f7916fc008ca5542385b89a3d3ce689953c143e9304a9bf8beec1de48994c0d"
-dependencies = [
- "libc",
- "winapi",
-]
-
 [[package]]
 name = "socket2"
 version = "0.5.7"
@@ -3681,7 +3639,7 @@ dependencies = [
 "parking_lot",
 "pin-project-lite",
 "signal-hook-registry",
- "socket2 0.5.7",
+ "socket2",
 "tokio-macros",
 "windows-sys 0.52.0",
 ]
diff --git a/apps/framework-cli/Cargo.toml b/apps/framework-cli/Cargo.toml
index f95f70d09..1bb2ff90b 100644
--- a/apps/framework-cli/Cargo.toml
+++ b/apps/framework-cli/Cargo.toml
@@ -78,7 +78,6 @@ sha2 = "0.10.8"
 hex = "0.4.2"
 constant_time_eq = "0.3.0"
 tokio-cron-scheduler = "0.11.0"
-redis = { version = "0.23.3", features = ["aio", "tokio-comp"] }
 indexmap = "2.5.0"
 
 [dev-dependencies]
diff --git a/apps/framework-cli/src/cli.rs b/apps/framework-cli/src/cli.rs
index 780b1000e..4c8331cd4 100644
--- a/apps/framework-cli/src/cli.rs
+++ b/apps/framework-cli/src/cli.rs
@@ -334,10 +334,9 @@ async fn top_command_handler(
         arc_metrics.start_listening_to_metrics(rx).await;
 
         check_project_name(&project_arc.name())?;
-        let project_arc_clone = Arc::clone(&project_arc);
-        run_local_infrastructure(&project_arc_clone)?.show();
+        run_local_infrastructure(&project_arc)?.show();
 
-        routines::start_development_mode(project_arc_clone, &settings.features, arc_metrics)
+        routines::start_development_mode(project_arc, &settings.features, arc_metrics)
             .await
             .map_err(|e| {
                 RoutineFailure::error(Message {
@@ -514,13 +513,9 @@ async fn top_command_handler(
 
         check_project_name(&project_arc.name())?;
 
-        routines::start_production_mode(
-            Arc::clone(&project_arc),
-            settings.features,
-            arc_metrics,
-        )
-        .await
-        .unwrap();
+        routines::start_production_mode(project_arc, settings.features, arc_metrics)
+            .await
+            .unwrap();
 
         wait_for_usage_capture(capture_handle).await;
 
diff --git a/apps/framework-cli/src/cli/routines.rs b/apps/framework-cli/src/cli/routines.rs
index dcb6d695e..85d3a725c 100644
--- a/apps/framework-cli/src/cli/routines.rs
+++ b/apps/framework-cli/src/cli/routines.rs
@@ -110,7 +110,6 @@ use crate::infrastructure::processes::consumption_registry::ConsumptionProcessRe
 use crate::infrastructure::processes::functions_registry::FunctionProcessRegistry;
 use crate::infrastructure::processes::kafka_clickhouse_sync::SyncingProcessesRegistry;
 use crate::infrastructure::processes::process_registry::ProcessRegistries;
-use crate::infrastructure::redis::redis_client::RedisClient;
 use crate::infrastructure::stream::redpanda::fetch_topics;
 use crate::project::Project;
@@ -172,7 +171,7 @@ impl RoutineSuccess {
     }
 
     pub fn show(&self) {
-        show_message!(self.message_type, self.message.clone());
+        show_message!(self.message_type, self.message);
     }
 }
@@ -280,9 +279,6 @@ pub async fn start_development_mode(
         }
     );
 
-    let mut redis_client = RedisClient::new(&project.name()).await?;
-    redis_client.start_periodic_tasks();
-
     let server_config = project.http_server_config.clone();
     let web_server = Webserver::new(
         server_config.host.clone(),
@@ -468,8 +464,6 @@ pub async fn start_development_mode(
             .await;
     };
 
-    let _ = redis_client.stop_periodic_tasks();
-
     Ok(())
 }
 
@@ -487,9 +481,6 @@ pub async fn start_production_mode(
         }
     );
 
-    let mut redis_client = RedisClient::new(&project.name()).await?;
-    redis_client.start_periodic_tasks();
-
     let server_config = project.http_server_config.clone();
     let web_server = Webserver::new(
         server_config.host.clone(),
@@ -616,8 +607,6 @@ pub async fn start_production_mode(
             .await;
     }
 
-    let _ = redis_client.stop_periodic_tasks();
-
     Ok(())
 }
 
diff --git a/apps/framework-cli/src/infrastructure.rs b/apps/framework-cli/src/infrastructure.rs
index 27fc08cf1..50eeb2f5e 100644
--- a/apps/framework-cli/src/infrastructure.rs
+++ b/apps/framework-cli/src/infrastructure.rs
@@ -19,5 +19,4 @@ pub mod ingest;
 pub mod migration;
 pub mod olap;
 pub mod processes;
-pub mod redis;
 pub mod stream;
diff --git a/apps/framework-cli/src/infrastructure/redis.rs b/apps/framework-cli/src/infrastructure/redis.rs
deleted file mode 100644
index 5c6d1e87d..000000000
--- a/apps/framework-cli/src/infrastructure/redis.rs
+++ /dev/null
@@ -1 +0,0 @@
-pub mod redis_client;
diff --git a/apps/framework-cli/src/infrastructure/redis/redis_client.rs b/apps/framework-cli/src/infrastructure/redis/redis_client.rs
deleted file mode 100644
index 0080fa274..000000000
--- a/apps/framework-cli/src/infrastructure/redis/redis_client.rs
+++ /dev/null
@@ -1,541 +0,0 @@
-//! Redis Client Module
-//!
-//! This module provides a Redis client implementation with support for leader election,
-//! presence updates, and message passing (Sending) and message queuing.
-//!
-//! # Example Usage
-//!
-//! ```rust
-//! use your_crate_name::infrastructure::redis::RedisClient;
-//!
-//! #[tokio::main]
-//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
-//!     // Initialize the Redis client
-//!     let mut client = RedisClient::new("my_service").await?;
-//!
-//!     // Start periodic tasks (presence updates and lock renewal)
-//!     client.start_periodic_tasks().await;
-//!
-//!     // Attempt to gain leadership
-//!     let is_leader = client.attempt_leadership().await?;
-//!     println!("Is leader: {}", is_leader);
-//!
-//!     // Send a message to another instance
-//!     client.send_message_to_instance("Hello", "target_instance_id").await?;
-//!
-//!     // Broadcast a message to all instances
-//!     client.broadcast_message("Broadcast message").await?;
-//!
-//!     // Post a message to the queue
-//!     client.post_queue_message("New task").await?;
-//!
-//!     // Get a message from the queue
-//!     if let Some(message) = client.get_queue_message().await? {
-//!         println!("Received message: {}", message);
-//!         // Process the message...
-//!         client.mark_queue_message(&message, true).await?;
-//!     }
-//!
-//!     // The client will automatically stop periodic tasks and release the lock (if leader)
-//!     // when it goes out of scope due to the Drop implementation
-//!
-//!     Ok(())
-//! }
-//! ```
-//!
-//! Note: Make sure to set the REDIS_URL environment variable or the client will default to "redis://127.0.0.1:6379".
-use anyhow::{Context, Result};
-use log::{error, info};
-use redis::aio::Connection as AsyncConnection;
-use redis::AsyncCommands;
-use redis::{Client, Script};
-use std::env;
-use std::sync::Arc;
-use std::time::{SystemTime, UNIX_EPOCH};
-use tokio::sync::Mutex;
-use tokio::task::JoinHandle;
-use tokio::time::{interval, Duration};
-use uuid::Uuid;
-
-const REDIS_KEY_PREFIX: &str = "MS"; // MooSe
-const KEY_EXPIRATION_TTL: usize = 3; // 3 seconds
-const LOCK_TTL: usize = 10; // 10 seconds
-const PRESENCE_UPDATE_INTERVAL: u64 = 1; // 1 second
-const LOCK_RENEWAL_INTERVAL: u64 = 3; // 3 seconds
-
-pub struct RedisClient {
-    connection: Arc<Mutex<AsyncConnection>>,
-    pub_sub: Arc<Mutex<AsyncConnection>>,
-    service_name: String,
-    instance_id: String,
-    lock_key: String,
-    is_leader: bool,
-    presence_task: Option<JoinHandle<()>>,
-    lock_renewal_task: Option<JoinHandle<()>>,
-}
-
-impl RedisClient {
-    pub async fn new(service_name: &str) -> Result<Self> {
-        let redis_url =
-            env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
-        let client = Client::open(redis_url.clone()).context("Failed to create Redis client")?;
-        let mut connection = client
-            .get_async_connection()
-            .await
-            .context("Failed to establish Redis connection")?;
-        let pub_sub = client
-            .get_async_connection()
-            .await
-            .context("Failed to establish Redis pub/sub connection")?;
-        let instance_id = Uuid::new_v4().to_string();
-        let lock_key = format!("{}::{}::leader-lock", REDIS_KEY_PREFIX, service_name);
-
-        info!(
-            "Initializing Redis client for {} with instance ID: {}",
-            service_name, instance_id
-        );
-
-        // Test Redis connection
-        match redis::cmd("PING")
-            .query_async::<_, String>(&mut connection)
-            .await
-        {
-            Ok(response) => info!("Redis connection successful: {}", response),
-            Err(e) => error!("Redis connection failed: {}", e),
-        }
-
-        Ok(Self {
-            connection: Arc::new(Mutex::new(connection)),
-            pub_sub: Arc::new(Mutex::new(pub_sub)),
-            service_name: service_name.to_string(),
-            instance_id,
-            lock_key,
-            is_leader: false,
-            presence_task: None,
-            lock_renewal_task: None,
-        })
-    }
-
-    pub async fn presence_update(&mut self) -> Result<()> {
-        let key = format!(
-            "{}::{}::{}::presence",
-            REDIS_KEY_PREFIX, self.service_name, self.instance_id
-        );
-        let now = SystemTime::now()
-            .duration_since(UNIX_EPOCH)
-            .context("Failed to get current time")?
-            .as_secs()
-            .to_string();
-        self.connection
-            .lock()
-            .await
-            .set_ex(&key, &now, KEY_EXPIRATION_TTL)
-            .await
-            .context("Failed to update presence")
-    }
-
-    pub async fn attempt_leadership(&mut self) -> Result<bool> {
-        let lock_key = self.lock_key.clone();
-        let instance_id = self.instance_id.clone();
-
-        info!("Attempting leadership for {}", self.instance_id);
-        info!("First getting redis connection lock");
-
-        let result: bool = self
-            .connection
-            .lock()
-            .await
-            .set_nx(&lock_key, &instance_id)
-            .await
-            .context("Failed to attempt leadership")?;
-
-        if result {
-            let _: () = self
-                .connection
-                .lock()
-                .await
-                .expire(&lock_key, LOCK_TTL)
-                .await
-                .context("Failed to set expiration on leadership lock")?;
-        }
-
-        self.is_leader = result;
-
-        if self.is_leader {
-            info!("Instance {} became leader", self.instance_id);
-            self.renew_lock().await?;
-        }
-
-        Ok(self.is_leader)
-    }
-
-    pub async fn renew_lock(&mut self) -> Result<bool> {
-        let extended: bool = self
-            .connection
-            .lock()
-            .await
-            .expire(&self.lock_key, LOCK_TTL)
-            .await
-            .context("Failed to renew leadership lock")?;
-
-        if !extended {
-            info!("Failed to extend leader lock, lost leadership");
-            self.is_leader = false;
-        }
-
-        Ok(extended)
-    }
-
-    pub async fn release_lock(&mut self) -> Result<()> {
-        let script = Script::new(
-            r"
-            if redis.call('get', KEYS[1]) == ARGV[1] then
-                return redis.call('del', KEYS[1])
-            else
-                return 0
-            end
-            ",
-        );
-
-        let _: () = script
-            .key(&self.lock_key)
-            .arg(&self.instance_id)
-            .invoke_async(&mut *self.connection.lock().await)
-            .await
-            .context("Failed to release leadership lock")?;
-
-        info!("Instance {} released leadership", self.instance_id);
-        self.is_leader = false;
-
-        Ok(())
-    }
-
-    pub fn is_current_leader(&self) -> bool {
-        self.is_leader
-    }
-
-    pub fn get_instance_id(&self) -> &str {
-        &self.instance_id
-    }
-
-    pub fn get_service_name(&self) -> &str {
-        &self.service_name
-    }
-
-    pub async fn send_message_to_instance(
-        &self,
-        message: &str,
-        target_instance_id: &str,
-    ) -> Result<()> {
-        let channel = format!(
-            "{}::{}::{}::msgchannel",
-            REDIS_KEY_PREFIX, self.service_name, target_instance_id
-        );
-        let _: () = self
-            .pub_sub
-            .lock()
-            .await
-            .publish(&channel, message)
-            .await
-            .context("Failed to send message to instance")?;
-        Ok(())
-    }
-
-    pub async fn broadcast_message(&mut self, message: &str) -> Result<()> {
-        let channel = format!("{}::{}::msgchannel", REDIS_KEY_PREFIX, self.service_name);
-        let _: () = self
-            .pub_sub
-            .lock()
-            .await
-            .publish(&channel, message)
-            .await
-            .context("Failed to broadcast message")?;
-        Ok(())
-    }
-
-    pub async fn get_queue_message(&self) -> Result<Option<String>> {
-        let source_queue = format!("{}::{}::mqrecieved", REDIS_KEY_PREFIX, self.service_name);
-        let destination_queue = format!("{}::{}::mqprocess", REDIS_KEY_PREFIX, self.service_name);
-        self.connection
-            .lock()
-            .await
-            .rpoplpush(&source_queue, &destination_queue)
-            .await
-            .context("Failed to get queue message")
-    }
-
-    pub async fn post_queue_message(&self, message: &str) -> Result<()> {
-        let queue = format!("{}::{}::mqrecieved", REDIS_KEY_PREFIX, self.service_name);
format!("{}::{}::mqrecieved", REDIS_KEY_PREFIX, self.service_name); - let _: () = self - .connection - .lock() - .await - .rpush(&queue, message) - .await - .context("Failed to post queue message")?; - Ok(()) - } - - pub async fn mark_queue_message(&mut self, message: &str, success: bool) -> Result<()> { - let in_progress_queue = - format!("{}::{}::mqinprogress", REDIS_KEY_PREFIX, self.service_name); - let incomplete_queue = format!("{}::{}::mqincomplete", REDIS_KEY_PREFIX, self.service_name); - - if success { - let _: () = self - .connection - .lock() - .await - .lrem(&in_progress_queue, 0, message) - .await - .context("Failed to mark queue message as successful")?; - } else { - let mut pipeline = redis::pipe(); - pipeline - .lrem(&in_progress_queue, 0, message) - .rpush(&incomplete_queue, message); - - let _: () = pipeline - .query_async(&mut *self.connection.lock().await) - .await - .context("Failed to mark queue message as incomplete")?; - } - Ok(()) - } - - pub fn start_periodic_tasks(&mut self) { - info!("Starting periodic tasks"); - - self.presence_task = Some(tokio::spawn({ - let mut presence_client = self.clone(); - async move { - let mut interval = interval(Duration::from_secs(PRESENCE_UPDATE_INTERVAL)); - loop { - interval.tick().await; - if let Err(e) = presence_client.presence_update().await { - error!("Error updating presence: {}", e); - } - } - } - })); - - self.lock_renewal_task = Some(tokio::spawn({ - let mut lock_renewal_client = self.clone(); - async move { - let mut interval = interval(Duration::from_secs(LOCK_RENEWAL_INTERVAL)); - - // First attempt to gain leadership - match lock_renewal_client.attempt_leadership().await { - Ok(is_leader) => { - if is_leader { - info!("Successfully gained leadership"); - } else { - info!("Failed to gain leadership initially"); - } - } - Err(e) => error!("Error attempting leadership: {}", e), - } - - loop { - interval.tick().await; - if lock_renewal_client.is_current_leader() { - if let Err(e) = lock_renewal_client.renew_lock().await { - error!("Error renewing lock: {}", e); - } - } else { - // Attempt to gain leadership if not currently the leader - if let Err(e) = lock_renewal_client.attempt_leadership().await { - error!("Error attempting leadership: {}", e); - } - } - } - } - })); - - info!("Periodic tasks started"); - } - - pub fn stop_periodic_tasks(&mut self) -> Result<()> { - if let Some(task) = self.presence_task.take() { - task.abort(); - } - if let Some(task) = self.lock_renewal_task.take() { - task.abort(); - } - Ok(()) - } - - pub async fn check_connection(&mut self) -> Result<()> { - redis::cmd("PING") - .query_async::<_, String>(&mut *self.connection.lock().await) - .await - .context("Failed to ping Redis server")?; - Ok(()) - } -} - -impl Clone for RedisClient { - fn clone(&self) -> Self { - Self { - connection: Arc::clone(&self.connection), - pub_sub: Arc::clone(&self.pub_sub), - service_name: self.service_name.clone(), - instance_id: self.instance_id.clone(), - lock_key: self.lock_key.clone(), - is_leader: self.is_leader, - presence_task: None, - lock_renewal_task: None, - } - } -} - -impl Drop for RedisClient { - fn drop(&mut self) { - info!("RedisClient is being dropped"); - if let Ok(rt) = tokio::runtime::Handle::try_current() { - let mut self_clone = self.clone(); - rt.spawn(async move { - if let Err(e) = self_clone.stop_periodic_tasks() { - error!("Error stopping periodic tasks: {}", e); - } - if self_clone.is_current_leader() { - if let Err(e) = self_clone.release_lock().await { - error!("Error releasing lock: 
{}", e); - } - } - }); - } else { - error!("Failed to get current runtime handle in RedisClient::drop"); - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use std::time::Duration; - use tokio; - #[tokio::test] - async fn test_redis_operations() { - let client = RedisClient::new("test_service") - .await - .expect("Failed to create Redis client"); - - // Test set and get - let _: () = client - .connection - .lock() - .await - .set("test_key", "test_value") - .await - .expect("Failed to set value"); - let result: Option = client - .connection - .lock() - .await - .get("test_key") - .await - .expect("Failed to get value"); - assert_eq!(result, Some("test_value".to_string())); - - // Test delete - let _: () = client - .connection - .lock() - .await - .del("test_key") - .await - .expect("Failed to delete key"); - let result: Option = client - .connection - .lock() - .await - .get("test_key") - .await - .expect("Failed to get value after deletion"); - assert_eq!(result, None); - - // Test set_ex - let _: () = client - .connection - .lock() - .await - .set_ex("test_ex_key", "test_ex_value", 1) - .await - .expect("Failed to set value with expiry"); - let result: Option = client - .connection - .lock() - .await - .get("test_ex_key") - .await - .expect("Failed to get value with expiry"); - assert_eq!(result, Some("test_ex_value".to_string())); - tokio::time::sleep(Duration::from_secs(2)).await; - let result: Option = client - .connection - .lock() - .await - .get("test_ex_key") - .await - .expect("Failed to get expired value"); - assert_eq!(result, None); - - // Test rpush and rpoplpush - let _: () = client - .connection - .lock() - .await - .rpush("source_list", "item1") - .await - .expect("Failed to push to list"); - let _: () = client - .connection - .lock() - .await - .rpush("source_list", "item2") - .await - .expect("Failed to push to list"); - let result: Option = client - .connection - .lock() - .await - .rpoplpush("source_list", "dest_list") - .await - .expect("Failed to rpoplpush"); - assert_eq!(result, Some("item2".to_string())); - - // Test lrem - let _: () = client - .connection - .lock() - .await - .lrem("dest_list", 0, "item2") - .await - .expect("Failed to remove from list"); - let result: Option = client - .connection - .lock() - .await - .rpoplpush("dest_list", "source_list") - .await - .expect("Failed to rpoplpush after lrem"); - assert_eq!(result, None); - - // Cleanup - let _: () = client - .connection - .lock() - .await - .del("dest_list") - .await - .expect("Failed to delete dest_list"); - let _: () = client - .connection - .lock() - .await - .del("source_list") - .await - .expect("Failed to delete source_list"); - } -} diff --git a/apps/framework-cli/src/utilities/docker-compose.yml b/apps/framework-cli/src/utilities/docker-compose.yml index c9ea5fbbe..608c7282f 100644 --- a/apps/framework-cli/src/utilities/docker-compose.yml +++ b/apps/framework-cli/src/utilities/docker-compose.yml @@ -3,23 +3,26 @@ volumes: clickhouse-0-data: null clickhouse-0-logs: null clickhouse-0-users: null - redis-data: null - redisinsight-data: null + redis-data: + redisinsight-data: services: redis: - image: redis:latest + image: redis:7-alpine ports: - "6379:6379" command: ["redis-server", "--appendonly", "yes"] volumes: - redis-data:/data redisinsight: - image: redislabs/redisinsight:latest + image: redislabs/redisinsight:1.14.0 ports: - - "5540:5540" + - "8001:8001" volumes: - redisinsight-data:/data + environment: + - REDISINSIGHT_HOST=0.0.0.0 + - REDISINSIGHT_PORT=8001 depends_on: - redis 
   redpanda:
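Note for anyone re-introducing this functionality later: the core of the reverted module is the leader-lock life cycle — `SET NX` to acquire, `EXPIRE` to renew on an interval, and a compare-and-delete Lua script to release. Below is a minimal, self-contained sketch of that pattern, assuming the same `redis` crate 0.23 with the `aio`/`tokio-comp` features that the reverted Cargo.toml entry used; the function names and lock-key/instance-id values are illustrative, not part of the patch.

```rust
use redis::{aio::Connection, AsyncCommands, Script};

const LOCK_TTL: usize = 10; // seconds, matching the reverted module's LOCK_TTL

/// SET NX + EXPIRE, mirroring the reverted attempt_leadership():
/// returns true if this instance now holds the leader lock.
async fn try_acquire(
    conn: &mut Connection,
    lock_key: &str,
    instance_id: &str,
) -> redis::RedisResult<bool> {
    // Only one instance can create the key; everyone else gets false.
    let acquired: bool = conn.set_nx(lock_key, instance_id).await?;
    if acquired {
        // Give the lock a TTL so a crashed leader is replaced automatically.
        let _: () = conn.expire(lock_key, LOCK_TTL).await?;
    }
    Ok(acquired)
}

/// Compare-and-delete release, using the same Lua script as the patch.
/// GET and DEL run atomically inside Redis, so an instance can never
/// delete a lock that expired and was since re-acquired by another instance.
async fn release(
    conn: &mut Connection,
    lock_key: &str,
    instance_id: &str,
) -> redis::RedisResult<()> {
    let script = Script::new(
        r"
        if redis.call('get', KEYS[1]) == ARGV[1] then
            return redis.call('del', KEYS[1])
        else
            return 0
        end
        ",
    );
    // Returns 1 if we owned and deleted the lock, 0 otherwise.
    let _: i32 = script
        .key(lock_key)
        .arg(instance_id)
        .invoke_async(conn)
        .await?;
    Ok(())
}
```

Renewal is then just the same `EXPIRE` call repeated on a timer while the instance believes it is leader, which is what the reverted `start_periodic_tasks` loop did every `LOCK_RENEWAL_INTERVAL` seconds.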