Skip to content

Commit

Permalink
impedance-match push parser with pull processor
Browse files Browse the repository at this point in the history
Use a channel to send incoming events from push-style rapidjson handler to our pull-style schema calculator.
  • Loading branch information
djellemah committed May 2, 2024
1 parent 0a6ff78 commit 5b5b779
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 24 deletions.
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,3 @@ cxx = "1.0"

[build-dependencies]
cxx-build = "1.0"

2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ fn main() {
}
[ "-m", "-c", dir, rst @ ..] => shredder::channel_shred(&std::path::PathBuf::from(dir), rst),
[ "-m", dir, rst @ ..] => shredder::shred(&std::path::PathBuf::from(dir), rst),
[ "-r", "-f", filename, _rst @ ..] => jch::rapid::parse_file(filename),
[ "-r", "-f", filename, _rst @ ..] => jch::rapid::schema_from_file(filename),
[ "-r", rst @ ..] => {
let istream = jch::make_readable(rst);
jch::rapid::parse(istream)
Expand Down
127 changes: 105 additions & 22 deletions src/rapid.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// Because the rapidjson handler needs PascalCase method names.
#![allow(non_snake_case)]

use std::ffi::c_char;
Expand Down Expand Up @@ -72,36 +73,69 @@ impl RustStream {
unsafe fn PutEnd(self : &mut RustStream, stuff : *mut c_char) -> usize { unimplemented!("PutEnd not necessary for read-only stream") }
}

pub struct RustHandler;
/// Converts rapidjson handler callbacks into `JsonEvent`s and sends each one
/// to a channel.
pub struct RustHandler {
    /// Sending half of the event channel.
    tx: crossbeam::channel::Sender<JsonEvent<String>>,
}

use crate::parser::JsonEvent;

impl RustHandler {
pub fn new(tx : crossbeam::channel::Sender<JsonEvent<String>>) -> Self {
Self{tx}
}

pub fn close(self) {
drop(self.tx)
}

/// Shim to ease the forwarding: pushes one event onto the channel.
///
/// Returns `true` when the event was sent and `false` when the send failed,
/// which lines up with the callbacks' "true -> continue, false -> halt"
/// return convention.
#[inline]
fn send(&self, jev : JsonEvent<String>) -> bool {
    self.tx.send(jev).is_ok()
}

// return value for all of these is true -> continue parsing; false -> halt parsing
fn Null(self : &RustHandler) -> bool { println!("null"); true }
fn Bool(self : &RustHandler, val : bool) -> bool { println!("bool {val}"); true }
fn Int(self : &RustHandler, val : i32) -> bool { println!("int {val}"); true }
fn Uint(self : &RustHandler, val : u64) -> bool { println!("uint {val}"); true }
fn Int64(self : &RustHandler, val : i64) -> bool { println!("int64 {val}"); true }
fn Uint64(self : &RustHandler, val : i64) -> bool { println!("uint64 {val}"); true }
fn Double(self : &RustHandler, val : f64) -> bool { println!("double {val}"); true }
fn RawNumber(self : &RustHandler, val : *const c_char, length : usize, copy : bool) -> bool { println!("number {length}:{copy}:{val:?}"); true }
fn String(self : &RustHandler, val : *const c_char, length : usize, copy : bool) -> bool {
//
// the _copy parameter appears to be always 'true', which I take to mean "it's not safe to keep a reference to this parameter"
fn Null(self : &RustHandler) -> bool { self.send(JsonEvent::Null) }
fn Bool(self : &RustHandler, val : bool) -> bool { self.send(JsonEvent::Boolean(val)) }

// All the number types.
//
// TODO rapidjson has already parsed these strings into numbers, so it's
// wasteful to convert them back to strings.
fn Int(self : &RustHandler, val : i32) -> bool { self.send(JsonEvent::Number(format!("{val}"))) }
fn Uint(self : &RustHandler, val : u64) -> bool { self.send(JsonEvent::Number(format!("{val}"))) }
fn Int64(self : &RustHandler, val : i64) -> bool { self.send(JsonEvent::Number(format!("{val}"))) }
fn Uint64(self : &RustHandler, val : i64) -> bool { self.send(JsonEvent::Number(format!("{val}"))) }
fn Double(self : &RustHandler, val : f64) -> bool { self.send(JsonEvent::Number(format!("{val}"))) }
fn RawNumber(self : &RustHandler, val : *const c_char, length : usize, _copy : bool) -> bool {
let val = unsafe { std::slice::from_raw_parts(val as *const u8, length) };
let val = unsafe { std::str::from_utf8_unchecked(val) };
self.send(JsonEvent::Number(val.into()))
}

/// Forwards a JSON string value (`length` bytes at `val`) as
/// `JsonEvent::String`.
///
/// The bytes are copied into an owned `String` before this call returns, so
/// nothing borrowed from the parser outlives the callback. `_copy` is ignored.
///
/// Returns `true` to continue parsing, `false` to halt (the event could not
/// be sent).
fn String(self : &RustHandler, val : *const c_char, length : usize, _copy : bool) -> bool {
    // TODO there must be a cxx.rs builtin for this
    let bytes : &[u8] = if val.is_null() || length == 0 {
        // `from_raw_parts` requires a non-null pointer even for an empty slice.
        &[]
    } else {
        // SAFETY: relies on the parser handing over `length` readable bytes at
        // `val` that stay valid for the duration of this call.
        unsafe { std::slice::from_raw_parts(val.cast::<u8>(), length) }
    };
    // Do not assume valid UTF-8: the bytes come from the input document, and
    // whether the C++ side asks rapidjson to validate encoding is not visible
    // here. Invalid sequences become U+FFFD instead of causing undefined
    // behaviour.
    let text = String::from_utf8_lossy(bytes).into_owned();
    self.send(JsonEvent::String(text))
}
fn StartObject(self : &RustHandler) -> bool { println!("start obj"); true }
fn Key(self : &RustHandler, val : *const c_char, length : usize, copy : bool) -> bool {

fn StartObject(self : &RustHandler) -> bool { self.send(JsonEvent::StartObject) }
/// Forwards an object member name (`length` bytes at `val`) as
/// `JsonEvent::ObjectKey`.
///
/// The bytes are copied into an owned `String` before this call returns, so
/// nothing borrowed from the parser outlives the callback. `_copy` is ignored.
///
/// Returns `true` to continue parsing, `false` to halt (the event could not
/// be sent).
fn Key(self : &RustHandler, val : *const c_char, length : usize, _copy : bool) -> bool {
    // TODO there must be a cxx.rs builtin for this
    let bytes : &[u8] = if val.is_null() || length == 0 {
        // `from_raw_parts` requires a non-null pointer even for an empty slice.
        &[]
    } else {
        // SAFETY: relies on the parser handing over `length` readable bytes at
        // `val` that stay valid for the duration of this call.
        unsafe { std::slice::from_raw_parts(val.cast::<u8>(), length) }
    };
    // Do not assume valid UTF-8: the bytes come from the input document, and
    // whether the C++ side asks rapidjson to validate encoding is not visible
    // here. Invalid sequences become U+FFFD instead of causing undefined
    // behaviour.
    let name = String::from_utf8_lossy(bytes).into_owned();
    self.send(JsonEvent::ObjectKey(name))
}
fn EndObject(self : &RustHandler, member_count : usize) -> bool { println!("end obj {member_count}"); true }
fn StartArray(self : &RustHandler) -> bool { println!("start ary"); true }
fn EndArray(self : &RustHandler, element_count : usize) -> bool { println!("end ary {element_count}"); true }
fn EndObject(self : &RustHandler, _member_count : usize) -> bool { self.send(JsonEvent::EndObject) }
fn StartArray(self : &RustHandler) -> bool { self.send(JsonEvent::StartArray) }
fn EndArray(self : &RustHandler, _element_count : usize) -> bool { self.send(JsonEvent::EndArray) }
}

// can also have
Expand Down Expand Up @@ -158,15 +192,64 @@ pub mod ffi {
}
}

// Capacity passed to the bounded event channels (2^12 = 4096).
// This seems to be around optimal.
const CROSSBEAM_CHANNEL_BOUND : usize = 1 << 12;

/// Parse events via our implementation of a rapidjson::Stream and print each
/// event (in `Debug` form) to stdout from a consumer thread.
///
/// It's quite slow compared to letting rapidjson handle the file reading.
///
/// Any boxed `BufRead` works as input, e.g. a byte slice such as
/// `r#"{"one": "uno", "two": 2, "tre": false}"#.as_bytes()`.
///
/// # Panics
/// Panics if the consumer thread panicked.
pub fn parse( istream : Box<dyn std::io::BufRead> ) {
    let (tx,rx) = crossbeam::channel::bounded(CROSSBEAM_CHANNEL_BOUND);

    // Consumer: runs until `recv` fails, i.e. until the sending side is gone.
    let cons_thr = std::thread::spawn(move || {
        while let Ok(event) = rx.recv() {
            println!("{event:?}");
        }
    });

    let mut reader = RustStream::new(istream);
    let mut handler = RustHandler::new(tx);
    ffi::parse(&mut handler, &mut reader);

    // The handler owns the only sender. It must be dropped before joining,
    // otherwise the consumer blocks in `recv` forever and `join` never returns.
    handler.close();
    cons_thr.join().unwrap()
}

pub fn parse_file( filename : &str ) {
let mut handler = RustHandler;
/// Shim to present a channel as a JsonEvents pull source.
struct ChannelStreamer(crossbeam::channel::Receiver<JsonEvent<String>>);

use crate::parser::JsonEvents;

impl<'l> JsonEvents<'_,String> for ChannelStreamer {
#[inline]
fn next_event<'a>(&'a mut self) -> std::result::Result<JsonEvent<std::string::String>, Box<(dyn std::error::Error + 'static)>> {
use crossbeam::channel::RecvError;
match self.0.recv() {
Ok(jev) => Ok(jev),
Err(RecvError) => Ok(JsonEvent::Eof),
}
}
}

/// Computes the schema of the JSON file `filename` and writes it to stdout.
///
/// rapidjson reads the file itself (maximising read performance) and pushes
/// events into a bounded channel; a second thread pulls them from the channel
/// and feeds our pull-oriented schema calculator.
///
/// In fact using this setup, the receive/schema thread is slower than the
/// send/parser thread, which only operates at about 65% capacity.
///
/// # Panics
/// Panics if the receive thread cannot be created, or if it panicked.
pub fn schema_from_file( filename : &str ) {
    let (tx, rx) = crossbeam::channel::bounded(CROSSBEAM_CHANNEL_BOUND);

    // It's no-go if the receive thread can't be created, so just die.
    let consumer = std::thread::Builder::new()
        .name("jch rapid recv".into())
        .spawn(move || {
            let mut events = ChannelStreamer(rx);
            crate::schema::schema(&mut std::io::stdout(), &mut events)
        })
        .expect("cannot create recv thread");

    let mut handler = RustHandler::new(tx);
    ffi::from_file(String::from(filename), &mut handler);

    // Shut down the channel. Without this the receive thread just keeps
    // blocking and the join below never returns.
    handler.close();
    consumer.join().unwrap()
}

0 comments on commit 5b5b779

Please sign in to comment.