Skip to content

Commit

Permalink
use one type for both handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
somtochiama committed Oct 28, 2024
1 parent b6dcd88 commit 893d344
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 82 deletions.
25 changes: 13 additions & 12 deletions crates/corro-agent/src/api/public/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use bytes::{BufMut, Bytes, BytesMut};
use compact_str::ToCompactString;
use corro_types::{
agent::Agent,
updates::{NotifyEvent, UpdateCreated, UpdateHandle, UpdatesManager},
updates::{Handle, NotifyEvent, UpdateCreated, UpdateHandle, UpdatesManager},
};
use futures::future::poll_fn;
use tokio::sync::{
Expand Down Expand Up @@ -58,10 +58,7 @@ pub async fn api_v1_updates(
};

tokio::spawn(forward_update_bytes_to_body_sender(
handle.id(),
sub_rx,
tx,
tripwire,
handle, sub_rx, tx, tripwire,
));

hyper::Response::builder()
Expand Down Expand Up @@ -218,7 +215,7 @@ fn make_query_event_bytes(
}

async fn forward_update_bytes_to_body_sender(
update_id: Uuid,
update: UpdateHandle,
mut rx: broadcast::Receiver<Bytes>,
mut tx: hyper::body::Sender,
mut tripwire: Tripwire,
Expand All @@ -237,36 +234,40 @@ async fn forward_update_bytes_to_body_sender(
buf.extend_from_slice(&event_buf);
if buf.len() >= 64 * 1024 {
if let Err(e) = tx.send_data(buf.split().freeze()).await {
warn!(%update_id, "could not forward subscription query event to receiver: {e}");
warn!(update_id = %update.id(), "could not forward subscription query event to receiver: {e}");
return;
}
};
},
Err(RecvError::Lagged(skipped)) => {
warn!(%update_id, "update skipped {} events, aborting", skipped);
warn!(update_id = %update.id(), "update skipped {} events, aborting", skipped);
return;
},
Err(RecvError::Closed) => {
info!(%update_id, "events subcription ran out");
info!(update_id = %update.id(), "events subcription ran out");
return;
},
}
},
_ = &mut send_deadline => {
if !buf.is_empty() {
if let Err(e) = tx.send_data(buf.split().freeze()).await {
warn!(%update_id, "could not forward subscription query event to receiver: {e}");
warn!(update_id = %update.id(), "could not forward subscription query event to receiver: {e}");
return;
}
} else {
if let Err(e) = poll_fn(|cx| tx.poll_ready(cx)).await {
warn!(%update_id, error = %e, "body sender was closed or errored, stopping event broadcast sends");
warn!(update_id = %update.id(), error = %e, "body sender was closed or errored, stopping event broadcast sends");
return;
}
send_deadline.as_mut().reset(tokio::time::Instant::now() + Duration::from_millis(10));
continue;
}
},
_ = update.cancelled() => {
// info!(update_id = %update.id(), "update cancelled, aborting forwarding bytes to subscriber");
// return;
},
_ = &mut tripwire => {
break;
}
Expand All @@ -276,7 +277,7 @@ async fn forward_update_bytes_to_body_sender(
while let Ok(event_buf) = rx.try_recv() {
buf.extend_from_slice(&event_buf);
if let Err(e) = tx.send_data(buf.split().freeze()).await {
warn!(%update_id, "could not forward subscription query event to receiver: {e}");
warn!(update_id = %update.id(), "could not forward subscription query event to receiver: {e}");
return;
}
}
Expand Down
26 changes: 11 additions & 15 deletions crates/corro-types/src/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,13 +267,10 @@ struct InnerMatcherHandle {
cached_statements: HashMap<String, MatcherStmt>,
}

type MatchCandidates = IndexMap<TableName, IndexSet<Vec<u8>>>;
pub type MatchCandidates = IndexMap<TableName, IndexMap<Vec<u8>, i64>>;

#[async_trait]
impl Handle for MatcherHandle {
type Candidate = IndexSet<Vec<u8>>;
type CandidateMatcher = MatchCandidates;

fn id(&self) -> Uuid {
self.inner.id
}
Expand All @@ -290,10 +287,6 @@ impl Handle for MatcherHandle {
self.inner.changes_tx.clone()
}

fn get_candidates(&self) -> MatchCandidates {
MatchCandidates::new()
}

async fn cleanup(&self) {
self.inner.cancel.cancel();
info!(sub_id = %self.inner.id, "Canceled subscription");
Expand All @@ -308,7 +301,7 @@ impl Handle for MatcherHandle {
// don't double process the same pk
if candidates
.get(change.table)
.map(|pks| pks.contains(change.pk))
.map(|pks| pks.contains_key(change.pk))
.unwrap_or_default()
{
trace!("already contained key");
Expand All @@ -329,9 +322,12 @@ impl Handle for MatcherHandle {
}

if let Some(v) = candidates.get_mut(change.table) {
v.insert(change.pk.to_vec())
v.insert(change.pk.to_vec(), change.cl).is_none()
} else {
candidates.insert(change.table.clone(), [change.pk.to_vec()].into());
candidates.insert(
change.table.clone(),
[(change.pk.to_vec(), change.cl)].into(),
);
true
}
}
Expand Down Expand Up @@ -1090,8 +1086,8 @@ impl Matcher {
Some((candidates, db_version)) = self.changes_rx.recv() => {
for (table, pks) in candidates {
let buffed = buf.entry(table).or_default();
for pk in pks {
if buffed.insert(pk) {
for (pk, cl) in pks {
if buffed.insert(pk, cl).is_none() {
buf_count += 1;
}
}
Expand Down Expand Up @@ -1434,7 +1430,7 @@ impl Matcher {
for (table, pks) in candidates {
let pks = pks
.iter()
.map(|pk| unpack_columns(pk))
.map(|(pk, _)| unpack_columns(pk))
.collect::<Result<Vec<Vec<SqliteValueRef>>, _>>()?;

let tmp_table_name = format!("temp_{table}");
Expand Down Expand Up @@ -1715,7 +1711,7 @@ impl Matcher {
candidates
.entry(row.get(0)?)
.or_default()
.insert(row.get(1)?);
.insert(row.get(1)?, row.get(2)?);
}
}

Expand Down
71 changes: 16 additions & 55 deletions crates/corro-types/src/updates.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use crate::agent::SplitPool;
use crate::pubsub::{unpack_columns, MatchableChange, MatcherError};
use crate::pubsub::{unpack_columns, MatchCandidates, MatchableChange, MatcherError};
use crate::schema::Schema;
use async_trait::async_trait;
use compact_str::CompactString;
use corro_api_types::sqlite::ChangeType;
use corro_api_types::{Change, ColumnName, SqliteValue, SqliteValueRef, TableName};
use corro_base_types::CrsqlDbVersion;
use indexmap::{IndexMap, IndexSet};
use metrics::{counter, histogram};
use parking_lot::RwLock;
use rusqlite::Connection;
Expand All @@ -26,51 +25,24 @@ use uuid::Uuid;
#[derive(Debug, Default, Clone)]
pub struct UpdatesManager(Arc<RwLock<InnerUpdatesManager>>);

#[allow(clippy::len_without_is_empty)]
pub trait HasLen {
fn len(&self) -> usize;
}

impl<K, V> HasLen for IndexMap<K, V> {
fn len(&self) -> usize {
self.len()
}
}

impl<T> HasLen for IndexSet<T> {
fn len(&self) -> usize {
self.len()
}
}

pub trait Manager<H> {
fn trait_type(&self) -> String;
fn get(&self, id: &Uuid) -> Option<H>;
fn remove(&self, id: &Uuid) -> Option<H>;
fn get_handles(&self) -> BTreeMap<Uuid, H>;
}

impl<T> HasLen for Vec<T> {
fn len(&self) -> usize {
self.len()
}
}

#[async_trait]
pub trait Handle {
type Candidate: HasLen;
type CandidateMatcher: IntoIterator<Item = (TableName, Self::Candidate)>;

fn id(&self) -> Uuid;
fn hash(&self) -> &str;
fn cancelled(&self) -> WaitForCancellationFuture;
fn filter_matchable_change(
&self,
candidates: &mut Self::CandidateMatcher,
candidates: &mut MatchCandidates,
change: MatchableChange,
) -> bool;
fn changes_tx(&self) -> mpsc::Sender<(Self::CandidateMatcher, CrsqlDbVersion)>;
fn get_candidates(&self) -> Self::CandidateMatcher;
fn changes_tx(&self) -> mpsc::Sender<(MatchCandidates, CrsqlDbVersion)>;
async fn cleanup(&self);
}

Expand All @@ -93,8 +65,6 @@ impl Manager<UpdateHandle> for UpdatesManager {
}
}

type MatchClCandidates = IndexMap<TableName, IndexMap<Vec<u8>, i64>>;

pub type NotifyEvent = TypedNotifyEvent<Vec<SqliteValue>>;

pub enum NotifyType {
Expand All @@ -111,9 +81,6 @@ pub enum TypedNotifyEvent<T> {

#[async_trait]
impl Handle for UpdateHandle {
type Candidate = IndexMap<Vec<u8>, i64>;
type CandidateMatcher = MatchClCandidates;

fn id(&self) -> Uuid {
self.inner.id
}
Expand All @@ -128,7 +95,7 @@ impl Handle for UpdateHandle {

fn filter_matchable_change(
&self,
candidates: &mut MatchClCandidates,
candidates: &mut MatchCandidates,
change: MatchableChange,
) -> bool {
if change.table.to_string() != self.inner.name {
Expand Down Expand Up @@ -157,14 +124,10 @@ impl Handle for UpdateHandle {
}
}

fn changes_tx(&self) -> mpsc::Sender<(MatchClCandidates, CrsqlDbVersion)> {
fn changes_tx(&self) -> mpsc::Sender<(MatchCandidates, CrsqlDbVersion)> {
self.inner.changes_tx.clone()
}

fn get_candidates(&self) -> MatchClCandidates {
MatchClCandidates::new()
}

async fn cleanup(&self) {
self.inner.cancel.cancel();
info!(sub_id = %self.inner.id, "Canceled subscription");
Expand Down Expand Up @@ -251,7 +214,7 @@ pub struct InnerUpdateHandle {
id: Uuid,
name: String,
cancel: CancellationToken,
changes_tx: mpsc::Sender<(MatchClCandidates, CrsqlDbVersion)>,
changes_tx: mpsc::Sender<(MatchCandidates, CrsqlDbVersion)>,
}

impl UpdateHandle {
Expand Down Expand Up @@ -286,7 +249,7 @@ impl UpdateHandle {
changes_tx,
}),
};
spawn_counted(cmd_loop(id, cancel, evt_tx, changes_rx, tripwire));
spawn_counted(batch_candidates(id, cancel, evt_tx, changes_rx, tripwire));
Ok(handle)
}

Expand All @@ -298,7 +261,7 @@ impl UpdateHandle {

fn handle_candidates(
evt_tx: mpsc::Sender<NotifyEvent>,
candidates: MatchClCandidates,
candidates: MatchCandidates,
) -> Result<(), MatcherError> {
if candidates.is_empty() {
return Ok(());
Expand Down Expand Up @@ -333,19 +296,19 @@ fn handle_candidates(
Ok(())
}

async fn cmd_loop(
async fn batch_candidates(
id: Uuid,
cancel: CancellationToken,
evt_tx: mpsc::Sender<NotifyEvent>,
mut changes_rx: mpsc::Receiver<(MatchClCandidates, CrsqlDbVersion)>,
mut changes_rx: mpsc::Receiver<(MatchCandidates, CrsqlDbVersion)>,
mut tripwire: Tripwire,
) {
const PROCESS_CHANGES_THRESHOLD: usize = 1000;
const PROCESS_BUFFER_DEADLINE: Duration = Duration::from_millis(600);

info!(sub_id = %id, "Starting loop to run the subscription");

let mut buf = MatchClCandidates::new();
let mut buf = MatchCandidates::new();
let mut buf_count = 0;

// max duration of aggregating candidates
Expand Down Expand Up @@ -377,13 +340,13 @@ async fn cmd_loop(
_ = process_changes_deadline.as_mut() => {
process_changes_deadline
.as_mut()
.reset((Instant::now() + PROCESS_BUFFER_DEADLINE).into());
.reset(Instant::now() + PROCESS_BUFFER_DEADLINE);
if buf_count != 0 {
process = true
}
},
_ = &mut tripwire => {
trace!(sub_id = %id, "tripped cmd_loop, returning");
trace!(sub_id = %id, "tripped batch_candidates, returning");
return;
}
else => {
Expand All @@ -410,7 +373,7 @@ async fn cmd_loop(
// reset the deadline
process_changes_deadline
.as_mut()
.reset((Instant::now() + PROCESS_BUFFER_DEADLINE).into());
.reset(Instant::now() + PROCESS_BUFFER_DEADLINE);
}
}

Expand All @@ -420,7 +383,6 @@ async fn cmd_loop(
pub fn match_changes<H>(manager: &impl Manager<H>, changes: &[Change], db_version: CrsqlDbVersion)
where
H: Handle + Send + 'static,
H::CandidateMatcher: Clone + Send,
{
let trait_type = manager.trait_type();
trace!(
Expand All @@ -439,7 +401,7 @@ where

for (id, handle) in handles.iter() {
trace!(sub_id = %id, %db_version, "attempting to match changes to a subscription");
let mut candidates = handle.get_candidates();
let mut candidates = MatchCandidates::new();
let mut match_count = 0;
for change in changes.iter().map(MatchableChange::from) {
if handle.filter_matchable_change(&mut candidates, change) {
Expand Down Expand Up @@ -488,7 +450,6 @@ pub fn match_changes_from_db_version<H>(
) -> rusqlite::Result<()>
where
H: Handle + Send + 'static,
H::CandidateMatcher: Clone + Send,
{
let handles = manager.get_handles();
if handles.is_empty() {
Expand All @@ -498,7 +459,7 @@ where
let trait_type = manager.trait_type();
let mut candidates = handles
.iter()
.map(|(id, handle)| (id, (handle.get_candidates(), handle)))
.map(|(id, handle)| (id, (MatchCandidates::new(), handle)))
.collect::<BTreeMap<_, _>>();

{
Expand Down

0 comments on commit 893d344

Please sign in to comment.