Skip to content

Commit

Permalink
start to factor out store::spawn
Browse files Browse the repository at this point in the history
  • Loading branch information
cablehead committed Aug 30, 2024
1 parent 39cfc90 commit 44df54c
Showing 1 changed file with 83 additions and 72 deletions.
155 changes: 83 additions & 72 deletions src/store.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::ops::Bound;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;

use scru128::Scru128Id;
Expand Down Expand Up @@ -106,7 +107,7 @@ impl Store {
.open_partition("kv", PartitionCreateOptions::default())
.unwrap();

let (tx, mut rx) = tokio::sync::mpsc::channel::<Command>(32);
let (tx, rx) = tokio::sync::mpsc::channel::<Command>(32);

let store = Store {
path,
Expand All @@ -116,77 +117,10 @@ impl Store {
commands_tx: tx,
};

{
let store = store.clone();
std::thread::spawn(move || {
let mut subscribers: Vec<tokio::sync::mpsc::Sender<Frame>> = Vec::new();
'outer: while let Some(command) = rx.blocking_recv() {
match command {
Command::Read(tx, options) => {
if !options.tail {
let range = match &options.last_id {
Some(last_id) => (
Bound::Excluded(last_id.to_bytes()),
Bound::<[u8; 16]>::Unbounded,
),
None => (Bound::Unbounded, Bound::Unbounded),
};
for record in store.partition.range(range) {
let record = record.unwrap();

let frame: Frame = match serde_json::from_slice(&record.1) {
Ok(frame) => frame,
Err(e) => {
let key = std::str::from_utf8(&record.0).unwrap();
let value = std::str::from_utf8(&record.1).unwrap();
panic!(
"Failed to deserialize frame: {} {} {}",
e, key, value
);
}
};

if tx.blocking_send(frame).is_err() {
continue 'outer;
}
}
}

match options.follow {
FollowOption::On | FollowOption::WithHeartbeat(_) => {
if !options.tail {
let frame = Frame {
id: scru128::new(),
topic: "xs.threshold".into(),
hash: None,
meta: None,
};
if tx.blocking_send(frame).is_err() {
continue 'outer;
}
}
subscribers.push(tx);
}
FollowOption::Off => {
// Do nothing
}
}
}
Command::Append(frame) => {
// subscribers.retain(|tx| tx.blocking_send(frame.clone()).is_ok());
subscribers.retain(|tx| {
if tx.blocking_send(frame.clone()).is_ok() {
true
} else {
tracing::error!("Subscriber not retained");
false
}
});
}
}
}
});
}
let store_clone = store.clone();
std::thread::spawn(move || {
handle_commands(store_clone, rx);
});

store
}
Expand Down Expand Up @@ -266,6 +200,83 @@ impl Store {
}
}

fn handle_commands(store: Store, mut rx: tokio::sync::mpsc::Receiver<Command>) {
let mut subscribers: Vec<tokio::sync::mpsc::Sender<Frame>> = Vec::new();
while let Some(command) = rx.blocking_recv() {
match command {
Command::Read(tx, options) => {
handle_read_command(&store, &tx, &options, &mut subscribers);
}
Command::Append(frame) => {
handle_append_command(&mut subscribers, frame);
}
}
}
}

fn handle_read_command(
store: &Store,
tx: &tokio::sync::mpsc::Sender<Frame>,
options: &ReadOptions,
subscribers: &mut Vec<tokio::sync::mpsc::Sender<Frame>>,
) {
if !options.tail {
let range = get_range(options.last_id.as_ref());
for record in store.partition.range(range) {
let frame = deserialize_frame(record.unwrap());
if tx.blocking_send(frame).is_err() {
return;
}
}
}

match options.follow {
FollowOption::On | FollowOption::WithHeartbeat(_) => {
if !options.tail {
send_threshold_frame(tx);
}
subscribers.push(tx.clone());
}
FollowOption::Off => {}
}
}

fn get_range(last_id: Option<&Scru128Id>) -> (Bound<Vec<u8>>, Bound<Vec<u8>>) {
match last_id {
Some(last_id) => (Bound::Excluded(last_id.as_bytes().to_vec()), Bound::Unbounded),
None => (Bound::Unbounded, Bound::Unbounded),
}
}

fn deserialize_frame(record: (Arc<[u8]>, Arc<[u8]>)) -> Frame {
serde_json::from_slice(&record.1).unwrap_or_else(|e| {
let key = std::str::from_utf8(&record.0).unwrap();
let value = std::str::from_utf8(&record.1).unwrap();
panic!("Failed to deserialize frame: {} {} {}", e, key, value)
})
}

fn send_threshold_frame(tx: &tokio::sync::mpsc::Sender<Frame>) {
let frame = Frame {
id: scru128::new(),
topic: "xs.threshold".into(),
hash: None,
meta: None,
};
let _ = tx.blocking_send(frame);
}

fn handle_append_command(subscribers: &mut Vec<tokio::sync::mpsc::Sender<Frame>>, frame: Frame) {
subscribers.retain(|tx| {
if tx.blocking_send(frame.clone()).is_ok() {
true
} else {
tracing::error!("Subscriber not retained");
false
}
});
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down

0 comments on commit 44df54c

Please sign in to comment.