Skip to content

Commit

Permalink
ringbuffer with thread::park works and is fast.
Browse files Browse the repository at this point in the history
  • Loading branch information
djellemah committed May 2, 2024
1 parent 5b5b779 commit 1fd27e9
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 23 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ json-event-parser = {version = "0.2.0"}
crossbeam = "0.8"
rustc-hash = "1.1.0"
cxx = "1.0"
rtrb = "0.3"

[build-dependencies]
cxx-build = "1.0"
64 changes: 41 additions & 23 deletions src/rapid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,20 @@ impl RustStream {
unsafe fn PutEnd(self : &mut RustStream, stuff : *mut c_char) -> usize { unimplemented!("PutEnd not necessary for read-only stream") }
}

use std::cell::RefCell;

// convert rapidjson values to JsonEvents and send to a channel.
pub struct RustHandler {
tx : crossbeam::channel::Sender<JsonEvent<String>>
pub struct RustHandler
{
tx : RefCell<rtrb::Producer<JsonEvent<String>>>,
}

use crate::parser::JsonEvent;

impl RustHandler {
pub fn new(tx : crossbeam::channel::Sender<JsonEvent<String>>) -> Self {
Self{tx}
pub fn new(tx : rtrb::Producer<JsonEvent<String>>) -> Self
{
Self{tx: RefCell::new(tx)}
}

pub fn close(self) {
Expand All @@ -92,10 +96,18 @@ impl RustHandler {
// shim to ease the forwarding
#[inline]
fn send(&self, jev : JsonEvent<String>) -> bool {
match self.tx.send(jev) {
Ok(()) => true,
Err(_) => false,
let mut tx = self.tx.borrow_mut();
while !tx.is_abandoned() {
let jev = jev.clone();
match tx.push(jev) {
Ok(()) => return true,
Err(rtrb::PushError::Full(_)) => {
// ringbuffer is full, so wait for signal from consumer
std::thread::park()
}
}
}
return false
}

// return value for all of these is true -> continue parsing; false -> halt parsing
Expand Down Expand Up @@ -193,17 +205,17 @@ pub mod ffi {
}

// This seems to be around optimal
const CROSSBEAM_CHANNEL_BOUND : usize = (2usize).pow(12);
const RING_BUFFER_BOUND : usize = (2usize).pow(12);

/// parse events via our implementation of a rapidjson::Stream.
/// It's quite slow compared to letting rapidjson handle the file reading.
pub fn parse( istream : Box<dyn std::io::BufRead> ) {
// let src : &[u8] = r#"{"one": "uno", "two": 2, "tre": false}"#.as_bytes();
// let istream = Box::new(src);
let (tx,rx) = crossbeam::channel::bounded(CROSSBEAM_CHANNEL_BOUND);
let (tx, mut rx) = rtrb::RingBuffer::new(RING_BUFFER_BOUND);

let cons_thr = std::thread::spawn(move || {
while let Ok(event) = rx.recv() {
while let Ok(event) = rx.pop() {
println!("{event:?}");
}
});
Expand All @@ -216,18 +228,24 @@ pub fn parse( istream : Box<dyn std::io::BufRead> ) {
}

/// Shim to present a channel as a JsonEvents pull source.
struct ChannelStreamer(crossbeam::channel::Receiver<JsonEvent<String>>);
struct ChannelStreamer(rtrb::Consumer<JsonEvent<String>>, std::thread::Thread);

use crate::parser::JsonEvents;

impl<'l> JsonEvents<'_,String> for ChannelStreamer {
#[inline]
fn next_event<'a>(&'a mut self) -> std::result::Result<JsonEvent<std::string::String>, Box<(dyn std::error::Error + 'static)>> {
use crossbeam::channel::RecvError;
match self.0.recv() {
Ok(jev) => Ok(jev),
Err(RecvError) => Ok(JsonEvent::Eof),
while !self.0.is_abandoned() {
match self.0.pop() {
Ok(jev) => return Ok(jev),
Err(rtrb::PopError::Empty) => {
// tell the producer to carry on
self.1.unpark();
continue
},
}
}
Ok(JsonEvent::Eof)
}
}

Expand All @@ -237,18 +255,18 @@ impl<'l> JsonEvents<'_,String> for ChannelStreamer {
/// In fact using this setup, the receive/schema thread is slower than the send/parser thread
/// which only operates at about 65% capacity.
pub fn schema_from_file( filename : &str ) {
let (tx,rx) = crossbeam::channel::bounded(CROSSBEAM_CHANNEL_BOUND);
let (tx, rx) = rtrb::RingBuffer::new(RING_BUFFER_BOUND);

let cons_thr = std::thread::Builder::new().name("jch rapid recv".into()).spawn(move || {
let mut streamer = ChannelStreamer(rx);
crate::schema::schema(&mut std::io::stdout(), &mut streamer)
});

// it's no-go if the receive thread can't be created, so just die.
let cons_thr = cons_thr.expect("cannot create recv thread");
let mut streamer = ChannelStreamer(rx, std::thread::current());
let cons_thr = std::thread::Builder::new()
.name("jch rapid recv".into())
.spawn( move || crate::schema::schema(&mut std::io::stdout(), &mut streamer) )
// it's no-go if the receive thread can't be created, so just die.
.expect("cannot create recv thread");

let mut handler = RustHandler::new(tx);
ffi::from_file(filename.to_string(), &mut handler );

// Shut down channel. Kak api because if you forget to call this, the thread just blocks.
handler.close();
cons_thr.join().unwrap()
Expand Down

0 comments on commit 1fd27e9

Please sign in to comment.