diff --git a/Cargo.toml b/Cargo.toml index 4443823a..2224d99a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,6 +56,10 @@ path = "src/service.rs" name = "watcher" path = "src/watcher.rs" +[[bin]] +name = "retry" +path = "src/retry.rs" + [[bench]] name = "user" harness = false diff --git a/README.md b/README.md index 4af59dd7..a4ba92e7 100644 --- a/README.md +++ b/README.md @@ -31,7 +31,8 @@ Nexus is composed of several core components: - **service.rs**: The REST API server for handling client requests, querying databases, and returning responses to the Pubky-App frontend. - **watcher.rs**: The event aggregator that listens to homeserver events, translating them into social graph updates within the Nexus databases. -- **lib.rs**: A library crate containing common functionalities shared by `service` and `watcher`, including database connectors, models, and queries. +- **retry.rs**: binary that retries events marked as failed by watcher. +- **lib.rs**: library crate with all of the common functionalities (connector, models, queries) needed for `watcher`, `service` and `retry` ### Data Flow @@ -54,17 +55,17 @@ To get started with Nexus, first set up the required databases: Neo4j and Redis. 1. Clone the repository and navigate to the project directory. 2. Copy the environment template and set up the Docker environment: - ```bash - cd docker - cp .env-sample .env - docker-compose up -d - ``` + ```bash + cd docker + cp .env-sample .env + docker-compose up -d + ``` 3. Populate the Neo4j database with initial data: - ```bash - docker exec neo4j bash /db-graph/run-queries.sh - ``` + ```bash + docker exec neo4j bash /db-graph/run-queries.sh + ``` Once the `Neo4j` graph database is seeded with data, the next step is to populate the `Redis` database by running the _nexus-service_ @@ -72,9 +73,10 @@ Once the `Neo4j` graph database is seeded with data, the next step is to populat 4. Run the Nexus service: - ```bash - cargo run - ``` + ```bash + cargo run + ``` + 5. 
**Access Redis and Neo4j UIs**: - Redis UI: [http://localhost:8001/redis-stack/browser](http://localhost:8001/redis-stack/browser) - Neo4J UI: [http://localhost:7474/browser/](http://localhost:7474/browser/) @@ -114,16 +116,16 @@ If tests or the development environment seem out of sync, follow these steps to 1. **Reset Neo4j**: - ```bash - docker exec neo4j bash -c "cypher-shell -u neo4j -p 12345678 'MATCH (n) DETACH DELETE n;'" - docker exec neo4j bash /db-graph/run-queries.sh - ``` + ```bash + docker exec neo4j bash -c "cypher-shell -u neo4j -p 12345678 'MATCH (n) DETACH DELETE n;'" + docker exec neo4j bash /db-graph/run-queries.sh + ``` 2. **Re-index Redis Cache**: - ```bash - REINDEX=true cargo run - ``` + ```bash + REINDEX=true cargo run + ``` ## 🌐 Useful Links diff --git a/src/config.rs b/src/config.rs index e0f1ce29..334e72b6 100644 --- a/src/config.rs +++ b/src/config.rs @@ -21,7 +21,7 @@ pub struct Config { pub homeserver_url: String, pub events_limit: u32, pub watcher_sleep: u64, - pub max_retries: u64, + pub max_retries: i32, } impl Config { diff --git a/src/events/mod.rs b/src/events/mod.rs index f4ca92cd..e103ad3a 100644 --- a/src/events/mod.rs +++ b/src/events/mod.rs @@ -1,14 +1,20 @@ -use crate::types::PubkyId; +use crate::{db::kv::index::sorted_sets::SortOrder, types::PubkyId, RedisOps}; +use chrono::Utc; use log::{debug, error}; use pubky::PubkyClient; +use serde::{Deserialize, Serialize}; +use std::error::Error; use uri::ParsedUri; pub mod handlers; pub mod processor; pub mod uri; -#[derive(Debug, Clone)] -enum ResourceType { +pub const EVENT_ERROR_PREFIX: &str = "error"; +pub const EVENT_RECOVERED_PREFIX: &str = "recovered"; + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub enum ResourceType { User { user_id: PubkyId, }, @@ -39,24 +45,126 @@ enum ResourceType { } // Look for the end pattern after the start index, or use the end of the string if not found -#[derive(Debug, Clone)] -enum EventType { +#[derive(Debug, Clone, Deserialize, 
Serialize)] +pub enum EventType { Put, Del, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Deserialize, Serialize)] pub struct Event { uri: String, event_type: EventType, resource_type: ResourceType, - pubky_client: PubkyClient, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct EventInfo { + event: Event, + created_at: i64, + attempts: i32, + last_attempt_at: Option, +} + +impl RedisOps for EventInfo {} + +impl EventInfo { + pub fn new(event: Event, created_at: i64, attempts: i32, last_attempt_at: Option) -> Self { + EventInfo { + event, + attempts, + created_at, + last_attempt_at, + } + } + + pub async fn retry( + mut self, + pubky_client: &PubkyClient, + max_retries: i32, + ) -> Result<(), Box> { + let event_uri = self.event.uri.as_str(); + if let Err(e) = self.clone().event.handle(pubky_client).await { + self.attempts += 1; + self.last_attempt_at = Some(Utc::now().timestamp_millis()); + error!( + "Error while handling retry of event {} with attempt {}: {}", + event_uri, self.attempts, e + ); + + if self.attempts >= max_retries { + self.put_index_json(&[EVENT_ERROR_PREFIX, event_uri]) + .await?; + EventInfo::remove_from_index_multiple_json(&[&[event_uri]]).await?; + EventFailed::delete(&self).await?; + } else { + self.put_index_json(&[event_uri]).await?; EventFailed::log(&self).await?; + } + } else { + self.put_index_json(&[EVENT_RECOVERED_PREFIX, event_uri]) + .await?; + EventInfo::remove_from_index_multiple_json(&[&[event_uri]]).await?; + EventFailed::delete(&self).await?; + } + Ok(()) + } + + pub fn get_attempts(self) -> i32 { + self.attempts + } +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct EventFailed {} +impl RedisOps for EventFailed {} + +impl EventFailed { + pub async fn log(info: &EventInfo) -> Result<(), Box> { + let score = if info.attempts == 0 { + info.created_at + } else { + info.created_at + 2_i64.pow(info.attempts as u32) * 1000 + }; + EventFailed::put_index_sorted_set( + &[EventFailed::prefix().await.as_str()], + &[(score as f64,
info.event.uri.as_str())], + ) + .await?; + Ok(()) + } + + pub async fn delete(info: &EventInfo) -> Result<(), Box> { + EventFailed::remove_from_index_sorted_set( + &[EventFailed::prefix().await.as_str()], + &[info.event.uri.as_str()], + ) + .await?; + Ok(()) + } + + pub async fn list( + start: Option, + end: Option, + skip: Option, + limit: Option, + sorting: SortOrder, + ) -> Result>, Box> { + let result = EventFailed::try_from_index_sorted_set( + &[EventFailed::prefix().await.as_str()], + start, + end, + skip, + limit, + sorting, + ) + .await?; + Ok(result) + } } impl Event { - fn from_str( + pub fn from_event_str( line: &str, - pubky_client: PubkyClient, ) -> Result, Box> { debug!("New event: {}", line); let parts: Vec<&str> = line.split(' ').collect(); @@ -116,24 +224,37 @@ impl Event { uri, event_type, resource_type, - pubky_client, })) } - async fn handle(self) -> Result<(), Box> { + pub fn new(uri: String, event_type: EventType, resource_type: ResourceType) -> Self { + Event { + uri, + event_type, + resource_type, + } + } + + async fn handle( + self, + pubky_client: &PubkyClient, + ) -> Result<(), Box> { match self.event_type { - EventType::Put => self.handle_put_event().await, + EventType::Put => self.handle_put_event(pubky_client).await, EventType::Del => self.handle_del_event().await, } } - async fn handle_put_event(self) -> Result<(), Box> { + async fn handle_put_event( + self, + pubky_client: &PubkyClient, + ) -> Result<(), Box> { debug!("Handling PUT event for {:?}", self.resource_type); // User PUT event's into the homeserver write new data. We fetch the data // for every Resource Type let url = reqwest::Url::parse(&self.uri)?; - let blob = match self.pubky_client.get(url).await { + let blob = match pubky_client.get(url).await { Ok(Some(blob)) => blob, Ok(None) => { error!("No content found at {}", self.uri); @@ -165,13 +286,21 @@ impl Event { handlers::tag::put(user_id, tag_id, blob).await? 
} ResourceType::File { user_id, file_id } => { - handlers::file::put(self.uri, user_id, file_id, blob, &self.pubky_client).await? + handlers::file::put(self.uri, user_id, file_id, blob, pubky_client).await? } } Ok(()) } + async fn log_failure(self) -> Result<(), Box> { + let now = Utc::now().timestamp_millis(); + let info = EventInfo::new(self.clone(), now, 0, None); + info.put_index_json(&[self.uri.as_str()]).await?; + EventFailed::log(&info).await?; + Ok(()) + } + async fn handle_del_event(self) -> Result<(), Box> { debug!("Handling DEL event for {:?}", self.resource_type); diff --git a/src/events/processor.rs b/src/events/processor.rs index f47786df..22a44da4 100644 --- a/src/events/processor.rs +++ b/src/events/processor.rs @@ -1,5 +1,3 @@ -use std::time::Duration; - use super::Event; use crate::types::DynError; use crate::types::PubkyId; @@ -14,7 +12,6 @@ pub struct EventProcessor { http_client: Client, homeserver: Homeserver, limit: u32, - max_retries: u64, } impl EventProcessor { @@ -22,7 +19,6 @@ impl EventProcessor { let pubky_client = Self::init_pubky_client(config); let homeserver = Homeserver::from_config(config).await?; let limit = config.events_limit; - let max_retries = config.max_retries; info!( "Initialized Event Processor for homeserver: {:?}", @@ -34,7 +30,6 @@ impl EventProcessor { http_client: Client::new(), homeserver, limit, - max_retries, }) } @@ -58,7 +53,6 @@ impl EventProcessor { http_client: Client::new(), homeserver, limit: 1000, - max_retries: 3, } } @@ -103,7 +97,7 @@ impl EventProcessor { info!("Cursor for the next request: {}", cursor); } } else { - let event = match Event::from_str(line, self.pubky_client.clone()) { + let event = match Event::from_event_str(line) { Ok(event) => event, Err(e) => { error!("Error while creating event line from line: {}", e); @@ -112,7 +106,8 @@ impl EventProcessor { }; if let Some(event) = event { debug!("Processing event: {:?}", event); - self.handle_event_with_retry(event).await?; + 
self.handle_event_with_retry(event, &self.pubky_client) + .await?; } } } @@ -121,28 +116,17 @@ impl EventProcessor { } // Generic retry on event handler - async fn handle_event_with_retry(&self, event: Event) -> Result<(), DynError> { - let mut attempts = 0; - loop { - match event.clone().handle().await { - Ok(_) => break Ok(()), - Err(e) => { - attempts += 1; - if attempts >= self.max_retries { - error!( - "Error while handling event after {} attempts: {}", - attempts, e - ); - break Ok(()); - } else { - error!( - "Error while handling event: {}. Retrying attempt {}/{}", - e, attempts, self.max_retries - ); - // Optionally, add a delay between retries - tokio::time::sleep(Duration::from_millis(100)).await; - } - } + async fn handle_event_with_retry( + &self, + event: Event, + pubky_client: &PubkyClient, + ) -> Result<(), DynError> { + match event.clone().handle(pubky_client).await { + Ok(_) => Ok(()), + Err(e) => { + error!("Error while handling event {}", e); + event.log_failure().await?; + Ok(()) } } } diff --git a/src/lib.rs b/src/lib.rs index 5aefe23a..4620ef11 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,5 @@ mod config; -mod db; +pub mod db; mod error; pub mod events; pub mod models; diff --git a/src/retry.rs b/src/retry.rs new file mode 100644 index 00000000..133ec66a --- /dev/null +++ b/src/retry.rs @@ -0,0 +1,80 @@ +use chrono::Utc; +use log::{error, info}; +use pkarr::mainline::Testnet; +use pubky::PubkyClient; +use pubky_nexus::events::{EventFailed, EventInfo}; +use pubky_nexus::RedisOps; +use pubky_nexus::{db::kv::index::sorted_sets::SortOrder, setup, Config}; +use std::sync::Arc; +use tokio::sync::Semaphore; +use tokio::time::{interval, Duration}; + +fn init_pubky_client(config: &Config) -> PubkyClient { + if config.testnet { + let testnet = Testnet { + bootstrap: vec![config.bootstrap.clone()], + nodes: vec![], + }; + PubkyClient::test(&testnet) + } else { + PubkyClient::default() + } +} + +const MAX_CONCURRENT_RETRIES: usize = 10; +const 
RETRY_INTERVAL: u64 = 5; + +#[tokio::main] +async fn main() { + let config = Config::from_env(); + setup(&config).await; + let pubky_client = init_pubky_client(&config); + + info!("Starting retry cron job."); + + let retry_interval = Duration::from_secs(RETRY_INTERVAL); + let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_RETRIES)); + let mut interval = interval(retry_interval); + + loop { + interval.tick().await; + let now = Utc::now().timestamp_millis(); + + match EventFailed::list( + None, + Some(now as f64), + None, + None, + SortOrder::Ascending, + ) + .await + { + Ok(Some(failed_events)) => { + for (event_uri, _) in failed_events { + let permit = semaphore.clone().acquire_owned().await.unwrap(); + let event_uri = event_uri.clone(); + let client = pubky_client.clone(); + + tokio::spawn(async move { + let _permit = permit; // Keep the permit until the task is done + match EventInfo::try_from_index_json(&[event_uri.as_str()]).await { + Ok(Some(event_info)) => { + event_info.retry(&client, config.max_retries).await + } + Ok(None) => { + error!("Failed event's info not found: {}", event_uri); + Ok(()) + } + Err(e) => { + error!("Error getting event info for: {}: {}", event_uri, e); + Ok(()) + } + } + }); + } + } + Ok(None) => info!("No failed events found."), + Err(e) => error!("Error fetching failed events: {}", e), + } + } +} diff --git a/tests/mod.rs b/tests/mod.rs index 82609503..079bd81e 100644 --- a/tests/mod.rs +++ b/tests/mod.rs @@ -1,2 +1,3 @@ +pub mod retry; pub mod service; pub mod watcher; diff --git a/tests/retry.rs b/tests/retry.rs new file mode 100644 index 00000000..52352188 --- /dev/null +++ b/tests/retry.rs @@ -0,0 +1,148 @@ +use anyhow::Result; +use chrono::Utc; +use pkarr::mainline::Testnet; +use pubky::PubkyClient; +use pubky_nexus::{ + db::kv::index::sorted_sets::SortOrder, + events::{ + Event, EventFailed, EventInfo, EventType, ResourceType, EVENT_ERROR_PREFIX, + EVENT_RECOVERED_PREFIX,
+ }, + setup, + types::PubkyId, + Config, RedisOps, +}; + +struct RetryTest { + client: PubkyClient, + config: Config, +} + +impl RetryTest { + async fn setup() -> Result { + let config = Config::from_env(); + setup(&config).await; + let testnet = Testnet::new(10); + let client = PubkyClient::test(&testnet); + Ok(Self { client, config }) + } + + async fn create_failed_event( + &self, + uri: &str, + pubky: &str, + event_id: &str, + attempts: i32, + ) -> Result { + let event = Event::new( + String::from(uri), + EventType::Put, + ResourceType::Post { + author_id: PubkyId(String::from(pubky)), + post_id: String::from(event_id), + }, + ); + let event_info = EventInfo::new(event, Utc::now().timestamp_millis(), attempts, None); + event_info.put_index_json(&[uri]).await.unwrap(); + EventFailed::log(&event_info).await.unwrap(); + Ok(event_info) + } +} + +#[tokio::test] +async fn test_basic_retry() -> Result<()> { + let test = RetryTest::setup().await?; + let uri = "pubky://test/pub/app/post/post_id_1"; + + // Create a failed event + let event_info = test.create_failed_event(uri, "test", "post_id", 0).await?; + + // Attempt retry + event_info + .retry(&test.client, 3) + .await + .expect("Testing Retry failed"); + + // Verify event state + let event = EventInfo::try_from_index_json(&[uri]).await; + assert!(event.unwrap().is_none(), "Event should no longer exist"); + + let recovered_event = EventInfo::try_from_index_json(&[EVENT_RECOVERED_PREFIX, uri]).await; + assert!( + recovered_event.unwrap().is_some(), + "Event should exist in the recovered state" + ); + + let failed_events = EventFailed::try_from_index_sorted_set( + &[EventFailed::prefix().await.as_str()], + None, + None, + None, + None, + SortOrder::Ascending, + ) + .await; + assert!( + !failed_events + .unwrap() + .unwrap() + .iter() + .any(|(item, _)| item == uri), + "Event should not exist in the failed state" + ); + + Ok(()) +} + +#[tokio::test] +async fn test_max_retry_attempts() -> Result<()> { + let test = 
RetryTest::setup().await?; + let uri = "pubky://test/pub/app/post/post_id_2"; + + // Create failed event + let event_info = test + .create_failed_event(uri, "test", "post_id_2", test.config.max_retries - 1) + .await?; + + // Retry until max attempts + + event_info + .retry(&test.client, test.config.max_retries) + .await + .expect("Testing retry failed"); + + // Verify event state + let event = EventInfo::try_from_index_json(&[uri]).await; + assert!(event.unwrap().is_none(), "Event should no longer exist"); + + let recovered_event = EventInfo::try_from_index_json(&[EVENT_RECOVERED_PREFIX, uri]).await; + assert!( + recovered_event.unwrap().is_none(), + "Event should not exist in the recovered state" + ); + + let error_event = EventInfo::try_from_index_json(&[EVENT_ERROR_PREFIX, uri]).await; + assert!( + error_event.unwrap().is_some(), + "Event should exist in the error state" + ); + + let failed_events = EventFailed::try_from_index_sorted_set( + &[EventFailed::prefix().await.as_str()], + None, + None, + None, + None, + SortOrder::Ascending, + ) + .await; + assert!( + !failed_events + .unwrap() + .unwrap() + .iter() + .any(|(item, _)| item == uri), + "Event should not exist in the failed state" + ); + Ok(()) +}