Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat(watcher): add event retry queue #218

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ path = "src/service.rs"
name = "watcher"
path = "src/watcher.rs"

[[bin]]
name = "retry"
path = "src/retry.rs"

[[bench]]
name = "user"
harness = false
Expand Down
40 changes: 21 additions & 19 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ Nexus is composed of several core components:

- **service.rs**: The REST API server for handling client requests, querying databases, and returning responses to the Pubky-App frontend.
- **watcher.rs**: The event aggregator that listens to homeserver events, translating them into social graph updates within the Nexus databases.
- **lib.rs**: A library crate containing common functionalities shared by `service` and `watcher`, including database connectors, models, and queries.
- **retry.rs**: binary that retries events marked as failed by watcher.
- **lib.rs**: library crate with all of the common functionalities (connector, models, queries) needed for `watcher`, `service` and `retry`

### Data Flow

Expand All @@ -54,27 +55,28 @@ To get started with Nexus, first set up the required databases: Neo4j and Redis.
1. Clone the repository and navigate to the project directory.
2. Copy the environment template and set up the Docker environment:

```bash
cd docker
cp .env-sample .env
docker-compose up -d
```
```bash
cd docker
cp .env-sample .env
docker-compose up -d
```

3. Populate the Neo4j database with initial data:

```bash
docker exec neo4j bash /db-graph/run-queries.sh
```
```bash
docker exec neo4j bash /db-graph/run-queries.sh
```

Once the `Neo4j` graph database is seeded with data, the next step is to populate the `Redis` database by running the _nexus-service_

> If the Redis cache is empty, the nexus-service will handle it automatically. If not follow the steps of warning section

4. Run the Nexus service:

```bash
cargo run
```
```bash
cargo run
```

5. **Access Redis and Neo4j UIs**:
- Redis UI: [http://localhost:8001/redis-stack/browser](http://localhost:8001/redis-stack/browser)
- Neo4J UI: [http://localhost:7474/browser/](http://localhost:7474/browser/)
Expand Down Expand Up @@ -114,16 +116,16 @@ If tests or the development environment seem out of sync, follow these steps to

1. **Reset Neo4j**:

```bash
docker exec neo4j bash -c "cypher-shell -u neo4j -p 12345678 'MATCH (n) DETACH DELETE n;'"
docker exec neo4j bash /db-graph/run-queries.sh
```
```bash
docker exec neo4j bash -c "cypher-shell -u neo4j -p 12345678 'MATCH (n) DETACH DELETE n;'"
docker exec neo4j bash /db-graph/run-queries.sh
```

2. **Re-index Redis Cache**:

```bash
REINDEX=true cargo run
```
```bash
REINDEX=true cargo run
```

## 🌐 Useful Links

Expand Down
2 changes: 1 addition & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub struct Config {
pub homeserver_url: String,
pub events_limit: u32,
pub watcher_sleep: u64,
pub max_retries: u64,
pub max_retries: i32,
}

impl Config {
Expand Down
159 changes: 144 additions & 15 deletions src/events/mod.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
use crate::types::PubkyId;
use crate::{db::kv::index::sorted_sets::SortOrder, types::PubkyId, RedisOps};
use chrono::Utc;
use log::{debug, error};
use pubky::PubkyClient;
use serde::{Deserialize, Serialize};
use std::error::Error;
use uri::ParsedUri;

pub mod handlers;
pub mod processor;
pub mod uri;

#[derive(Debug, Clone)]
enum ResourceType {
pub const EVENT_ERROR_PREFIX: &str = "error";
pub const EVENT_RECOVERED_PREFIX: &str = "recovered";

#[derive(Debug, Clone, Deserialize, Serialize)]
pub enum ResourceType {
User {
user_id: PubkyId,
},
Expand Down Expand Up @@ -39,24 +45,126 @@ enum ResourceType {
}

// Look for the end pattern after the start index, or use the end of the string if not found
#[derive(Debug, Clone)]
enum EventType {
#[derive(Debug, Clone, Deserialize, Serialize)]
pub enum EventType {
Put,
Del,
}

#[derive(Debug, Clone)]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct Event {
uri: String,
event_type: EventType,
resource_type: ResourceType,
pubky_client: PubkyClient,
}

#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct EventInfo {
event: Event,
created_at: i64,
attempts: i32,
last_attempt_at: Option<i64>,
}

impl RedisOps for EventInfo {}

impl EventInfo {
pub fn new(event: Event, created_at: i64, attempts: i32, last_attempt_at: Option<i64>) -> Self {
EventInfo {
event,
attempts,
created_at,
last_attempt_at,
}
}

pub async fn retry(
mut self,
pubky_client: &PubkyClient,
max_retries: i32,
) -> Result<(), Box<dyn std::error::Error + Sync + Send>> {
let event_uri = self.event.uri.as_str();
if let Err(e) = self.clone().event.handle(pubky_client).await {
self.attempts += 1;
self.last_attempt_at = Some(Utc::now().timestamp_millis());
error!(
"Error while handling retry of event {} with attempt {}: {}",
event_uri, self.attempts, e
);

if self.attempts > max_retries {
self.put_index_json(&[EVENT_ERROR_PREFIX, event_uri])
.await?;
EventInfo::remove_from_index_multiple_json(&[&[event_uri]]).await?;
EventFailed::delete(&self).await?;
} else {
EventFailed::log(&self).await?;
}
} else {
self.put_index_json(&[EVENT_RECOVERED_PREFIX, event_uri])
.await?;
EventInfo::remove_from_index_multiple_json(&[&[event_uri]]).await?;
EventFailed::delete(&self).await?;
}
Ok(())
}

pub fn get_attempts(self) -> i32 {
self.attempts
}
}

#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct EventFailed {}
impl RedisOps for EventFailed {}

impl EventFailed {
pub async fn log(info: &EventInfo) -> Result<(), Box<dyn Error + Send + Sync>> {
let score = if info.attempts == 0 {
info.created_at
} else {
info.created_at + ((2 ^ info.attempts) * 1000) as i64
};
EventFailed::put_index_sorted_set(
&[EventFailed::prefix().await.as_str()],
&[(score as f64, info.event.uri.as_str())],
)
.await?;
Ok(())
}

pub async fn delete(info: &EventInfo) -> Result<(), Box<dyn Error + Send + Sync>> {
EventFailed::remove_from_index_sorted_set(
&[EventFailed::prefix().await.as_str()],
&[info.event.uri.as_str()],
)
.await?;
Ok(())
}

pub async fn list(
start: Option<f64>,
end: Option<f64>,
skip: Option<usize>,
limit: Option<usize>,
sorting: SortOrder,
) -> Result<Option<Vec<(String, f64)>>, Box<dyn Error + Send + Sync>> {
let result = EventFailed::try_from_index_sorted_set(
&[EventFailed::prefix().await.as_str()],
start,
end,
skip,
limit,
sorting,
)
.await?;
Ok(result)
}
}

impl Event {
fn from_str(
pub fn from_event_str(
line: &str,
pubky_client: PubkyClient,
) -> Result<Option<Self>, Box<dyn std::error::Error + Sync + Send>> {
debug!("New event: {}", line);
let parts: Vec<&str> = line.split(' ').collect();
Expand Down Expand Up @@ -116,24 +224,37 @@ impl Event {
uri,
event_type,
resource_type,
pubky_client,
}))
}

async fn handle(self) -> Result<(), Box<dyn std::error::Error + Sync + Send>> {
pub fn new(uri: String, event_type: EventType, resource_type: ResourceType) -> Self {
Event {
uri,
event_type,
resource_type,
}
}

async fn handle(
self,
pubky_client: &PubkyClient,
) -> Result<(), Box<dyn std::error::Error + Sync + Send>> {
match self.event_type {
EventType::Put => self.handle_put_event().await,
EventType::Put => self.handle_put_event(pubky_client).await,
EventType::Del => self.handle_del_event().await,
}
}

async fn handle_put_event(self) -> Result<(), Box<dyn std::error::Error + Sync + Send>> {
async fn handle_put_event(
self,
pubky_client: &PubkyClient,
) -> Result<(), Box<dyn std::error::Error + Sync + Send>> {
debug!("Handling PUT event for {:?}", self.resource_type);

// User PUT event's into the homeserver write new data. We fetch the data
// for every Resource Type
let url = reqwest::Url::parse(&self.uri)?;
let blob = match self.pubky_client.get(url).await {
let blob = match pubky_client.get(url).await {
Ok(Some(blob)) => blob,
Ok(None) => {
error!("No content found at {}", self.uri);
Expand Down Expand Up @@ -165,13 +286,21 @@ impl Event {
handlers::tag::put(user_id, tag_id, blob).await?
}
ResourceType::File { user_id, file_id } => {
handlers::file::put(self.uri, user_id, file_id, blob, &self.pubky_client).await?
handlers::file::put(self.uri, user_id, file_id, blob, pubky_client).await?
}
}

Ok(())
}

async fn log_failure(self) -> Result<(), Box<dyn std::error::Error + Sync + Send>> {
let now = Utc::now().timestamp_millis();
let info = EventInfo::new(self.clone(), now, 0, None);
info.put_index_json(&[self.uri.as_str()]).await?;
EventFailed::log(&info).await?;
Ok(())
}

async fn handle_del_event(self) -> Result<(), Box<dyn std::error::Error + Sync + Send>> {
debug!("Handling DEL event for {:?}", self.resource_type);

Expand Down
Loading
Loading