Redis message listener #1826

Merged · 21 commits · Oct 22, 2024
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions apps/framework-cli/Cargo.toml
@@ -77,6 +77,7 @@ sha2 = "0.10.8"
hex = "0.4.2"
constant_time_eq = "0.3.0"
tokio-cron-scheduler = "0.11.0"
tokio-stream = "0.1.16"
indexmap = "2.5.0"
redis = { version = "0.24.0", features = ["tokio-comp", "aio", "tokio-native-tls-comp"] }
jsonwebtoken = "9.3.0"
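The only Cargo.toml change is the new tokio-stream dependency, presumably added to drive the Redis pub/sub listener as an async stream. Below is a minimal, hypothetical sketch of that pattern using the redis crate's pub/sub API together with tokio-stream; it is not code from this PR, and the channel name and connection URL are placeholders.

```rust
// Hypothetical sketch: consume Redis pub/sub messages as an async stream.
// Not code from this PR; the channel name and URL are placeholders.
use tokio_stream::StreamExt;

#[tokio::main]
async fn main() -> redis::RedisResult<()> {
    let client = redis::Client::open("redis://127.0.0.1/")?;
    let mut pubsub = client.get_async_connection().await?.into_pubsub();
    pubsub.subscribe("example_channel").await?;

    // `on_message` yields a Stream of messages; tokio-stream's StreamExt
    // provides the `next` combinator used to drain it.
    let mut messages = pubsub.on_message();
    while let Some(msg) = messages.next().await {
        let payload: String = msg.get_payload()?;
        println!("received: {payload}");
    }
    Ok(())
}
```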
141 changes: 110 additions & 31 deletions apps/framework-cli/src/cli/routines.rs
@@ -79,13 +79,14 @@
//! - Organize routines better in the file hierarchy
//!

use crate::infrastructure::redis::redis_client::RedisClient;
use std::collections::{HashMap, HashSet};
use std::ops::DerefMut;
use std::path::PathBuf;
use std::sync::Arc;

use log::{debug, error, info};
use tokio::sync::RwLock; // or use the appropriate error type
use tokio::sync::{Mutex, RwLock};
use tokio::time::{interval, Duration};

use crate::cli::watcher::{
@@ -112,7 +113,6 @@ use crate::infrastructure::processes::cron_registry::CronRegistry;
use crate::infrastructure::processes::functions_registry::FunctionProcessRegistry;
use crate::infrastructure::processes::kafka_clickhouse_sync::SyncingProcessesRegistry;
use crate::infrastructure::processes::process_registry::ProcessRegistries;
use crate::infrastructure::redis::redis_client::RedisClient;
use crate::infrastructure::stream::redpanda::fetch_topics;
use crate::project::Project;

@@ -268,20 +268,86 @@ impl RoutineController {
}
}

async fn setup_redis_client(project: Arc<Project>) -> anyhow::Result<RedisClient> {
let mut redis_client = RedisClient::new(project.name(), project.redis_config.clone()).await?;
async fn setup_redis_client(project: Arc<Project>) -> anyhow::Result<Arc<Mutex<RedisClient>>> {
let redis_client = RedisClient::new(project.name(), project.redis_config.clone()).await?;
let redis_client = Arc::new(Mutex::new(redis_client));

let (service_name, instance_id) = {
let client = redis_client.lock().await;
(
client.get_service_name().to_string(),
client.get_instance_id().to_string(),
)
};

show_message!(
MessageType::Info,
Message {
action: "Node Id:".to_string(),
details: format!("{}::{}", service_name, instance_id),
}
);

// Register the leadership lock
redis_client.register_lock("leadership", 10).await?;
redis_client
.lock()
.await
.register_lock("leadership", 10)
.await?;

// Start the leadership lock management task
start_leadership_lock_task(redis_client.clone(), project.clone());

redis_client.start_periodic_tasks();
let redis_client_clone = redis_client.clone();
let callback = Arc::new(move |message: String| {
let redis_client = redis_client_clone.clone();
tokio::spawn(async move {
if let Err(e) = process_pubsub_message(message, redis_client).await {
error!("Error processing pubsub message: {}", e);
}
});
});

redis_client
.lock()
.await
.register_message_handler(callback)
.await;
redis_client.lock().await.start_periodic_tasks();

Ok(redis_client)
}
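The RedisClient type itself is not part of this diff, so the exact signature of register_message_handler is not visible here. A self-contained sketch of the callback-registration pattern the new code relies on might look like the following; ListenerState and its fields are stand-ins, not the repository's client.

```rust
// Stand-in sketch of the handler-registration pattern; `ListenerState`
// and its fields are assumptions, not the repository's RedisClient.
use std::sync::Arc;

type MessageHandler = Arc<dyn Fn(String) + Send + Sync + 'static>;

#[derive(Default)]
struct ListenerState {
    handlers: Vec<MessageHandler>,
}

impl ListenerState {
    // Store a callback that the pub/sub listener invokes for each payload.
    fn register_message_handler(&mut self, handler: MessageHandler) {
        self.handlers.push(handler);
    }

    // Called by the listener task once per received payload.
    fn dispatch(&self, payload: &str) {
        for handler in &self.handlers {
            handler(payload.to_string());
        }
    }
}

fn main() {
    let mut state = ListenerState::default();
    state.register_message_handler(Arc::new(|msg| println!("got: {msg}")));
    state.dispatch("<new_leader>");
}
```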

fn start_leadership_lock_task(redis_client: RedisClient, project: Arc<Project>) {
async fn process_pubsub_message(
message: String,
redis_client: Arc<Mutex<RedisClient>>,
) -> anyhow::Result<()> {
let has_lock = {
let client = redis_client.lock().await;
client.has_lock("leadership").await?
};

if has_lock {
if message.contains("<migration_start>") {
info!("<Routines> This instance is the leader so ignoring the Migration start message: {}", message);
} else if message.contains("<migration_end>") {
info!("<Routines> This instance is the leader so ignoring the Migration end message received: {}", message);
} else {
info!(
"<Routines> This instance is the leader and received pubsub message: {}",
message
);
}
} else {
info!(
"<Routines> This instance is not the leader and received pubsub message: {}",
message
);
}
Ok(())
}

fn start_leadership_lock_task(redis_client: Arc<Mutex<RedisClient>>, project: Arc<Project>) {
tokio::spawn(async move {
let mut interval = interval(Duration::from_secs(5)); // Adjust the interval as needed
loop {
@@ -294,27 +360,35 @@ fn start_leadership_lock_task(redis_client: RedisClient, project: Arc<Project>)
}

async fn manage_leadership_lock(
redis_client: &RedisClient,
redis_client: &Arc<Mutex<RedisClient>>,
project: &Arc<Project>,
) -> Result<(), anyhow::Error> {
match redis_client.has_lock("leadership").await? {
true => {
// We have the lock, renew it
redis_client.renew_lock("leadership").await?;
}
false => {
// We don't have the lock, try to acquire it
if redis_client.attempt_lock("leadership").await? {
info!("Obtained leadership lock, performing leadership tasks");
let project_clone = project.clone();
tokio::spawn(async move {
if let Err(e) = leadership_tasks(project_clone).await {
error!("Error executing leadership tasks: {}", e);
}
});
} else {
debug!("Failed to obtain leadership lock");
}
let has_lock = {
let client = redis_client.lock().await;
client.has_lock("leadership").await?
};

if has_lock {
// We have the lock, renew it
let client = redis_client.lock().await;
client.renew_lock("leadership").await?;
} else {
// We don't have the lock, try to acquire it
let acquired_lock = {
let client = redis_client.lock().await;
client.attempt_lock("leadership").await?
};
if acquired_lock {
let mut client = redis_client.lock().await;
client.broadcast_message("<new_leader>").await?;

info!("Obtained leadership lock, performing leadership tasks");
let project_clone = project.clone();
tokio::spawn(async move {
if let Err(e) = leadership_tasks(project_clone).await {
error!("Error executing leadership tasks: {}", e);
}
});
}
}
Ok(())
@@ -341,7 +415,7 @@ pub async fn start_development_mode(
}
);

let mut redis_client = setup_redis_client(project.clone()).await?;
let redis_client = setup_redis_client(project.clone()).await?;

let server_config = project.http_server_config.clone();
let web_server = Webserver::new(
@@ -530,7 +604,10 @@ pub async fn start_development_mode(
.await;
};

let _ = redis_client.stop_periodic_tasks();
{
let mut redis_client = redis_client.lock().await;
let _ = redis_client.stop_periodic_tasks();
}

Ok(())
}
@@ -553,8 +630,7 @@ pub async fn start_production_mode(
panic!("Crashing for testing purposes");
}

let mut redis_client = setup_redis_client(project.clone()).await?;
info!("Redis client initialized");
let redis_client = setup_redis_client(project.clone()).await?;
let server_config = project.http_server_config.clone();
info!("Server config: {:?}", server_config);
let web_server = Webserver::new(
@@ -686,7 +762,10 @@ pub async fn start_production_mode(
.await;
}

let _ = redis_client.stop_periodic_tasks();
{
let mut redis_client = redis_client.lock().await;
let _ = redis_client.stop_periodic_tasks();
}

Ok(())
}
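The leadership handling above goes through RedisClient::register_lock, attempt_lock, renew_lock, and has_lock, whose implementations are not part of this diff. They presumably wrap the conventional Redis locking pattern (SET key value NX PX ttl). A hypothetical sketch of acquiring such a lock with the redis crate follows; the key, owner value, and TTL are chosen for illustration only.

```rust
// Hypothetical sketch of the SET NX PX locking pattern that the
// RedisClient lock helpers presumably wrap; not code from this PR.
async fn attempt_lock(
    conn: &mut redis::aio::Connection,
    key: &str,
    owner: &str,
    ttl_ms: usize,
) -> redis::RedisResult<bool> {
    // SET key owner NX PX ttl_ms returns OK when the key was free,
    // and nil when another instance already holds the lock.
    let reply: Option<String> = redis::cmd("SET")
        .arg(key)
        .arg(owner)
        .arg("NX")
        .arg("PX")
        .arg(ttl_ms)
        .query_async(conn)
        .await?;
    Ok(reply.is_some())
}

#[tokio::main]
async fn main() -> redis::RedisResult<()> {
    let client = redis::Client::open("redis://127.0.0.1/")?;
    let mut conn = client.get_async_connection().await?;
    let acquired = attempt_lock(&mut conn, "leadership", "instance-1", 10_000).await?;
    println!("acquired leadership lock: {acquired}");
    Ok(())
}
```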
5 changes: 4 additions & 1 deletion apps/framework-cli/src/framework/core/execute.rs
@@ -2,6 +2,7 @@ use crate::infrastructure::redis::redis_client::RedisClient;
use clickhouse_rs::ClientHandle;
use std::sync::Arc;
use tokio::sync::mpsc::Sender;
use tokio::sync::Mutex;

use super::{infrastructure_map::ApiChange, plan::InfraPlan};
use crate::infrastructure::migration;
@@ -50,7 +51,7 @@ pub async fn execute_initial_infra_change(
api_changes_channel: Sender<ApiChange>,
metrics: Arc<Metrics>,
clickhouse_client: &mut ClientHandle,
redis_client: &RedisClient,
redis_client: &Arc<Mutex<RedisClient>>,
) -> Result<(SyncingProcessesRegistry, ProcessRegistries), ExecutionError> {
// This probably can be parallelized through Tokio Spawn
olap::execute_changes(project, &plan.changes.olap_changes).await?;
@@ -81,6 +82,8 @@

// Check if this process instance has the "leadership" lock
if redis_client
.lock()
.await
.has_lock("leadership")
.await
.map_err(ExecutionError::LeadershipCheckFailed)?