Skip to content

Commit

Permalink
Dav Push: Support for calendar collections
Browse files Browse the repository at this point in the history
  • Loading branch information
lennart-k committed Jan 12, 2025
1 parent 974acdf commit 347061f
Show file tree
Hide file tree
Showing 10 changed files with 564 additions and 22 deletions.
364 changes: 364 additions & 0 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,7 @@ rpassword.workspace = true
argon2.workspace = true
pbkdf2.workspace = true
password-hash.workspace = true
reqwest = "0.12.12"
rustical_xml.workspace = true
rustical_dav.workspace = true
quick-xml.workspace = true
10 changes: 10 additions & 0 deletions crates/caldav/src/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@ use actix_web::{
web::{self, Data, Path},
HttpResponse,
};
use rustical_dav::xml::multistatus::PropstatElement;
use rustical_store::SubscriptionStore;
use rustical_xml::{XmlRootTag, XmlSerialize};

use crate::calendar::resource::CalendarProp;

async fn handle_delete<S: SubscriptionStore + ?Sized>(
store: Data<S>,
Expand All @@ -18,3 +22,9 @@ pub fn subscription_resource<S: SubscriptionStore + ?Sized>() -> actix_web::Reso
.name("subscription")
.delete(handle_delete::<S>)
}

#[derive(XmlSerialize, XmlRootTag)]
#[xml(root = b"push-message", ns = "rustical_dav::namespace::NS_DAVPUSH")]
pub struct PushMessage {
propstat: PropstatElement<CalendarProp>,
}
4 changes: 3 additions & 1 deletion crates/dav/src/xml/multistatus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ pub struct PropTagWrapper<T: XmlSerialize>(#[xml(flatten, ty = "untagged")] pub

// RFC 2518
// <!ELEMENT propstat (prop, status, responsedescription?) >
#[derive(XmlSerialize)]
#[derive(XmlSerialize, Debug)]
pub struct PropstatElement<PropType: XmlSerialize> {
#[xml(ns = "crate::namespace::NS_DAV")]
pub prop: PropType,
#[xml(serialize_with = "xml_serialize_status")]
#[xml(ns = "crate::namespace::NS_DAV")]
pub status: StatusCode,
}

Expand Down
21 changes: 21 additions & 0 deletions crates/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,24 @@ pub use subscription_store::*;

pub use addressbook::{AddressObject, Addressbook};
pub use calendar::{Calendar, CalendarObject};

#[derive(Debug, Clone)]
pub enum CollectionOperationType {
// Sync-Token increased
Object,
Delete,
}

#[derive(Debug, Clone)]
pub enum CollectionOperationDomain {
Calendar,
Addressbook,
}

#[derive(Debug, Clone)]
pub struct CollectionOperation {
pub r#type: CollectionOperationType,
pub domain: CollectionOperationDomain,
pub topic: String,
pub sync_token: Option<String>,
}
2 changes: 1 addition & 1 deletion crates/store/src/subscription_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub struct Subscription {
pub push_resource: String,
}

#[async_trait(?Send)]
#[async_trait]
pub trait SubscriptionStore: Send + Sync + 'static {
async fn get_subscriptions(&self, topic: &str) -> Result<Vec<Subscription>, Error>;
async fn get_subscription(&self, id: &str) -> Result<Subscription, Error>;
Expand Down
1 change: 1 addition & 0 deletions crates/store_sqlite/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ repository.workspace = true
publish = false

[dependencies]
tokio.workspace = true
rustical_store = { workspace = true }
async-trait = { workspace = true }
serde = { workspace = true }
Expand Down
82 changes: 72 additions & 10 deletions crates/store_sqlite/src/calendar_store.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
use super::{ChangeOperation, SqliteStore};
use super::ChangeOperation;
use async_trait::async_trait;
use rustical_store::synctoken::format_synctoken;
use rustical_store::{Calendar, CalendarObject, CalendarStore, Error};
use rustical_store::{CollectionOperation, CollectionOperationType};
use sqlx::Sqlite;
use sqlx::SqlitePool;
use sqlx::Transaction;
use tokio::sync::mpsc::Sender;
use tracing::instrument;

#[derive(Debug, Clone)]
Expand All @@ -26,16 +30,21 @@ async fn log_object_operation(
cal_id: &str,
object_id: &str,
operation: ChangeOperation,
) -> Result<(), Error> {
sqlx::query!(
) -> Result<String, Error> {
struct Synctoken {
synctoken: i64,
}
let Synctoken { synctoken } = sqlx::query_as!(
Synctoken,
r#"
UPDATE calendars
SET synctoken = synctoken + 1
WHERE (principal, id) = (?1, ?2)"#,
WHERE (principal, id) = (?1, ?2)
RETURNING synctoken"#,
principal,
cal_id
)
.execute(&mut **tx)
.fetch_one(&mut **tx)
.await
.map_err(crate::Error::from)?;

Expand All @@ -53,11 +62,23 @@ async fn log_object_operation(
.execute(&mut **tx)
.await
.map_err(crate::Error::from)?;
Ok(())
Ok(format_synctoken(synctoken))
}

#[derive(Debug)]
pub struct SqliteCalendarStore {
db: SqlitePool,
sender: Sender<CollectionOperation>,
}

impl SqliteCalendarStore {
pub fn new(db: SqlitePool, sender: Sender<CollectionOperation>) -> Self {
Self { db, sender }
}
}

#[async_trait]
impl CalendarStore for SqliteStore {
impl CalendarStore for SqliteCalendarStore {
#[instrument]
async fn get_calendar(&self, principal: &str, id: &str) -> Result<Calendar, Error> {
let cal = sqlx::query_as!(
Expand Down Expand Up @@ -157,6 +178,12 @@ impl CalendarStore for SqliteStore {
id: &str,
use_trashbin: bool,
) -> Result<(), Error> {
let cal = match self.get_calendar(principal, id).await {
Ok(cal) => Some(cal),
Err(Error::NotFound) => None,
Err(err) => return Err(err),
};

match use_trashbin {
true => {
sqlx::query!(
Expand All @@ -177,6 +204,16 @@ impl CalendarStore for SqliteStore {
.map_err(crate::Error::from)?;
}
};

if let Some(cal) = cal {
// TODO: Watch for errors here?
let _ = self.sender.try_send(CollectionOperation {
r#type: CollectionOperationType::Delete,
domain: rustical_store::CollectionOperationDomain::Calendar,
topic: cal.push_topic,
sync_token: None,
});
}
Ok(())
}

Expand Down Expand Up @@ -267,7 +304,7 @@ impl CalendarStore for SqliteStore {
.await
.map_err(crate::Error::from)?;

log_object_operation(
let synctoken = log_object_operation(
&mut tx,
&principal,
&cal_id,
Expand All @@ -276,6 +313,14 @@ impl CalendarStore for SqliteStore {
)
.await?;

// TODO: Watch for errors here?
let _ = self.sender.try_send(CollectionOperation {
r#type: CollectionOperationType::Object,
domain: rustical_store::CollectionOperationDomain::Calendar,
topic: self.get_calendar(&principal, &cal_id).await?.push_topic,
sync_token: Some(synctoken),
});

tx.commit().await.map_err(crate::Error::from)?;
Ok(())
}
Expand Down Expand Up @@ -312,8 +357,16 @@ impl CalendarStore for SqliteStore {
.map_err(crate::Error::from)?;
}
};
log_object_operation(&mut tx, principal, cal_id, id, ChangeOperation::Delete).await?;
let synctoken =
log_object_operation(&mut tx, principal, cal_id, id, ChangeOperation::Delete).await?;
tx.commit().await.map_err(crate::Error::from)?;
// TODO: Watch for errors here?
let _ = self.sender.try_send(CollectionOperation {
r#type: CollectionOperationType::Object,
domain: rustical_store::CollectionOperationDomain::Calendar,
topic: self.get_calendar(principal, cal_id).await?.push_topic,
sync_token: Some(synctoken),
});
Ok(())
}

Expand All @@ -335,8 +388,17 @@ impl CalendarStore for SqliteStore {
.execute(&mut *tx)
.await.map_err(crate::Error::from)?;

log_object_operation(&mut tx, principal, cal_id, object_id, ChangeOperation::Add).await?;
let synctoken =
log_object_operation(&mut tx, principal, cal_id, object_id, ChangeOperation::Add)
.await?;
tx.commit().await.map_err(crate::Error::from)?;
// TODO: Watch for errors here?
let _ = self.sender.try_send(CollectionOperation {
r#type: CollectionOperationType::Object,
domain: rustical_store::CollectionOperationDomain::Calendar,
topic: self.get_calendar(principal, cal_id).await?.push_topic,
sync_token: Some(synctoken),
});
Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion crates/store_sqlite/src/subscription_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::SqliteStore;
use async_trait::async_trait;
use rustical_store::{Error, Subscription, SubscriptionStore};

#[async_trait(?Send)]
#[async_trait]
impl SubscriptionStore for SqliteStore {
async fn get_subscriptions(&self, topic: &str) -> Result<Vec<Subscription>, Error> {
Ok(sqlx::query_as!(
Expand Down
96 changes: 87 additions & 9 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
use crate::config::Config;
use actix_web::http::KeepAlive;
use actix_web::http::{KeepAlive, StatusCode};
use actix_web::HttpServer;
use anyhow::Result;
use app::make_app;
use clap::{Parser, Subcommand};
use commands::{cmd_gen_config, cmd_pwhash};
use config::{DataStoreConfig, SqliteDataStoreConfig};
use rustical_dav::xml::multistatus::PropstatElement;
use rustical_store::auth::StaticUserStore;
use rustical_store::{AddressbookStore, CalendarStore, SubscriptionStore};
use rustical_store::{AddressbookStore, CalendarStore, CollectionOperation, SubscriptionStore};
use rustical_store_sqlite::calendar_store::SqliteCalendarStore;
use rustical_store_sqlite::{create_db_pool, SqliteStore};
use rustical_xml::{XmlRootTag, XmlSerialize, XmlSerializeRoot};
use setup_tracing::setup_tracing;
use std::fs;
use std::sync::Arc;
use tokio::sync::mpsc::Receiver;
use tracing::{error, info};

mod app;
mod commands;
Expand Down Expand Up @@ -43,20 +48,43 @@ async fn get_data_stores(
Arc<dyn AddressbookStore>,
Arc<dyn CalendarStore>,
Arc<dyn SubscriptionStore>,
Receiver<CollectionOperation>,
)> {
Ok(match &config {
DataStoreConfig::Sqlite(SqliteDataStoreConfig { db_url }) => {
let db = create_db_pool(db_url, migrate).await?;
let sqlite_store = Arc::new(SqliteStore::new(db));
(
sqlite_store.clone(),
sqlite_store.clone(),
sqlite_store.clone(),
)
// Channel to watch for changes (for DAV Push)
let (send, recv) = tokio::sync::mpsc::channel(1000);

let addressbook_store = Arc::new(SqliteStore::new(db.clone()));
let cal_store = Arc::new(SqliteCalendarStore::new(db.clone(), send));
let subscription_store = Arc::new(SqliteStore::new(db.clone()));
(addressbook_store, cal_store, subscription_store, recv)
}
})
}

// TODO: Move this code somewhere else :)

#[derive(XmlSerialize, Debug)]
struct PushMessageProp {
#[xml(ns = "rustical_dav::namespace::NS_DAV")]
topic: String,
#[xml(ns = "rustical_dav::namespace::NS_DAV")]
sync_token: Option<String>,
}

#[derive(XmlSerialize, XmlRootTag, Debug)]
#[xml(root = b"push-message", ns = "rustical_dav::namespace::NS_DAVPUSH")]
#[xml(ns_prefix(
rustical_dav::namespace::NS_DAVPUSH = b"",
rustical_dav::namespace::NS_DAV = b"D",
))]
struct PushMessage {
#[xml(ns = "rustical_dav::namespace::NS_DAV")]
propstat: PropstatElement<PushMessageProp>,
}

#[tokio::main]
async fn main() -> Result<()> {
let args = Args::parse();
Expand All @@ -69,9 +97,59 @@ async fn main() -> Result<()> {

setup_tracing(&config.tracing);

let (addr_store, cal_store, subscription_store) =
let (addr_store, cal_store, subscription_store, mut update_recv) =
get_data_stores(!args.no_migrations, &config.data_store).await?;

let subscription_store_clone = subscription_store.clone();
tokio::spawn(async move {
let subscription_store = subscription_store_clone.clone();
while let Some(message) = update_recv.recv().await {
dbg!(&message);
if let Ok(subscribers) =
subscription_store.get_subscriptions(&message.topic).await
{
let status = match message.r#type {
rustical_store::CollectionOperationType::Object => StatusCode::OK,
rustical_store::CollectionOperationType::Delete => {
StatusCode::NOT_FOUND
}
};
let push_message = PushMessage {
propstat: PropstatElement {
prop: PushMessageProp {
topic: message.topic,
sync_token: message.sync_token,
},
status,
},
};
let mut output: Vec<_> =
b"<?xml version=\"1.0\" encoding=\"utf-8\"?>\n".into();
let mut writer = quick_xml::Writer::new_with_indent(&mut output, b' ', 4);
if let Err(err) = push_message.serialize_root(&mut writer) {
error!("Could not serialize push message: {}", err);
continue;
}
let payload = String::from_utf8(output).unwrap();
for subscriber in subscribers {
info!(
"Sending a push message to {}: {}",
subscriber.push_resource, payload
);
let client = reqwest::Client::new();
if let Err(err) = client
.post(subscriber.push_resource)
.body(payload.to_owned())
.send()
.await
{
error!("{err}");
}
}
}
}
});

let user_store = Arc::new(match config.auth {
config::AuthConfig::Static(config) => StaticUserStore::new(config),
});
Expand Down

0 comments on commit 347061f

Please sign in to comment.