diff --git a/Cargo.toml b/Cargo.toml index e07c83b..e85be44 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,4 +29,3 @@ cxx = "1.0" [build-dependencies] cxx-build = "1.0" - diff --git a/src/main.rs b/src/main.rs index e399ff0..b6d2b94 100644 --- a/src/main.rs +++ b/src/main.rs @@ -64,7 +64,7 @@ fn main() { } [ "-m", "-c", dir, rst @ ..] => shredder::channel_shred(&std::path::PathBuf::from(dir), rst), [ "-m", dir, rst @ ..] => shredder::shred(&std::path::PathBuf::from(dir), rst), - [ "-r", "-f", filename, _rst @ ..] => jch::rapid::parse_file(filename), + [ "-r", "-f", filename, _rst @ ..] => jch::rapid::schema_from_file(filename), [ "-r", rst @ ..] => { let istream = jch::make_readable(rst); jch::rapid::parse(istream) diff --git a/src/rapid.rs b/src/rapid.rs index 409f207..413069a 100644 --- a/src/rapid.rs +++ b/src/rapid.rs @@ -1,3 +1,4 @@ +// Because the rapidjson handler needs PascalCase method names. #![allow(non_snake_case)] use std::ffi::c_char; @@ -72,36 +73,69 @@ impl RustStream { unsafe fn PutEnd(self : &mut RustStream, stuff : *mut c_char) -> usize { unimplemented!("PutEnd not necessary for read-only stream") } } -pub struct RustHandler; +// convert rapidjson values to JsonEvents and send to a channel. 
+pub struct RustHandler { + tx : crossbeam::channel::Sender<JsonEvent<String>> +} + +use crate::parser::JsonEvent; impl RustHandler { + pub fn new(tx : crossbeam::channel::Sender<JsonEvent<String>>) -> Self { + Self{tx} + } + + pub fn close(self) { + drop(self.tx) + } + + // shim to ease the forwarding + #[inline] + fn send(&self, jev : JsonEvent<String>) -> bool { + match self.tx.send(jev) { + Ok(()) => true, + Err(_) => false, + } + } + // return value for all of these is true -> continue parsing; false -> halt parsing - fn Null(self : &RustHandler) -> bool { println!("null"); true } - fn Bool(self : &RustHandler, val : bool) -> bool { println!("bool {val}"); true } - fn Int(self : &RustHandler, val : i32) -> bool { println!("int {val}"); true } - fn Uint(self : &RustHandler, val : u64) -> bool { println!("uint {val}"); true } - fn Int64(self : &RustHandler, val : i64) -> bool { println!("int64 {val}"); true } - fn Uint64(self : &RustHandler, val : i64) -> bool { println!("uint64 {val}"); true } - fn Double(self : &RustHandler, val : f64) -> bool { println!("double {val}"); true } - fn RawNumber(self : &RustHandler, val : *const c_char, length : usize, copy : bool) -> bool { println!("number {length}:{copy}:{val:?}"); true } - fn String(self : &RustHandler, val : *const c_char, length : usize, copy : bool) -> bool { + // + // the _copy parameter appears to be always 'true', which I take to mean "it's not safe to keep a reference to this parameter" + fn Null(self : &RustHandler) -> bool { self.send(JsonEvent::Null) } + fn Bool(self : &RustHandler, val : bool) -> bool { self.send(JsonEvent::Boolean(val)) } + + // All the number types. + // + // TODO rapidjson has already parsed these strings into numbers, so it's + // wasteful to convert them back to strings. 
+ fn Int(self : &RustHandler, val : i32) -> bool { self.send(JsonEvent::Number(format!("{val}"))) } + fn Uint(self : &RustHandler, val : u64) -> bool { self.send(JsonEvent::Number(format!("{val}"))) } + fn Int64(self : &RustHandler, val : i64) -> bool { self.send(JsonEvent::Number(format!("{val}"))) } + fn Uint64(self : &RustHandler, val : i64) -> bool { self.send(JsonEvent::Number(format!("{val}"))) } + fn Double(self : &RustHandler, val : f64) -> bool { self.send(JsonEvent::Number(format!("{val}"))) } + fn RawNumber(self : &RustHandler, val : *const c_char, length : usize, _copy : bool) -> bool { + let val = unsafe { std::slice::from_raw_parts(val as *const u8, length) }; + let val = unsafe { std::str::from_utf8_unchecked(val) }; + self.send(JsonEvent::Number(val.into())) + } + + fn String(self : &RustHandler, val : *const c_char, length : usize, _copy : bool) -> bool { // TODO there must be a cxx.rss builtin for this let val = unsafe { std::slice::from_raw_parts(val as *const u8, length) }; let val = unsafe { std::str::from_utf8_unchecked(val) }; - println!("string {length}:{copy}:{val}", ); - true + self.send(JsonEvent::String(val.into())) } - fn StartObject(self : &RustHandler) -> bool { println!("start obj"); true } - fn Key(self : &RustHandler, val : *const c_char, length : usize, copy : bool) -> bool { + + fn StartObject(self : &RustHandler) -> bool { self.send(JsonEvent::StartObject) } + fn Key(self : &RustHandler, val : *const c_char, length : usize, _copy : bool) -> bool { // TODO there must be a cxx.rss builtin for this let val = unsafe { std::slice::from_raw_parts(val as *const u8, length) }; let val = unsafe { std::str::from_utf8_unchecked(val) }; - println!("key {length}:{copy}:{val}", ); - true + self.send(JsonEvent::ObjectKey(val.into())) } - fn EndObject(self : &RustHandler, member_count : usize) -> bool { println!("end obj {member_count}"); true } - fn StartArray(self : &RustHandler) -> bool { println!("start ary"); true } - fn EndArray(self : 
&RustHandler, element_count : usize) -> bool { println!("end ary {element_count}"); true } + fn EndObject(self : &RustHandler, _member_count : usize) -> bool { self.send(JsonEvent::EndObject) } + fn StartArray(self : &RustHandler) -> bool { self.send(JsonEvent::StartArray) } + fn EndArray(self : &RustHandler, _element_count : usize) -> bool { self.send(JsonEvent::EndArray) } } // can also have @@ -158,15 +192,66 @@ pub mod ffi { } } +// This seems to be around optimal +const CROSSBEAM_CHANNEL_BOUND : usize = (2usize).pow(12); + +/// parse events via our implementation of a rapidjson::Stream. +/// It's quite slow compared to letting rapidjson handle the file reading. pub fn parse( istream : Box ) { // let src : &[u8] = r#"{"one": "uno", "two": 2, "tre": false}"#.as_bytes(); // let istream = Box::new(src); + let (tx,rx) = crossbeam::channel::bounded(CROSSBEAM_CHANNEL_BOUND); + + let cons_thr = std::thread::spawn(move || { + while let Ok(event) = rx.recv() { + println!("{event:?}"); + } + }); + let mut reader = RustStream::new(istream); - let mut handler = RustHandler; + let mut handler = RustHandler::new(tx); ffi::parse(&mut handler, &mut reader); + + // Drop the sender so the consumer's recv() loop sees the disconnect; otherwise the join below never returns. + handler.close(); + cons_thr.join().unwrap() } -pub fn parse_file( filename : &str ) { - let mut handler = RustHandler; +/// Shim to present a channel as a JsonEvents pull source. +struct ChannelStreamer(crossbeam::channel::Receiver<JsonEvent<String>>); + +use crate::parser::JsonEvents; + +impl<'l> JsonEvents<'_,String> for ChannelStreamer { + #[inline] + fn next_event<'a>(&'a mut self) -> std::result::Result<JsonEvent<String>, Box<(dyn std::error::Error + 'static)>> { + use crossbeam::channel::RecvError; + match self.0.recv() { + Ok(jev) => Ok(jev), + Err(RecvError) => Ok(JsonEvent::Eof), + } + } +} + +/// This constructs a rapidjson parser from the filename, thereby maximising read performance, +/// and then sends events from rapidjson's push parser to a channel, which feeds our pull-oriented schema calculator. 
+/// +/// In fact using this setup, the receive/schema thread is slower than the send/parser thread, +/// which only operates at about 65% capacity. +pub fn schema_from_file( filename : &str ) { + let (tx,rx) = crossbeam::channel::bounded(CROSSBEAM_CHANNEL_BOUND); + + let cons_thr = std::thread::Builder::new().name("jch rapid recv".into()).spawn(move || { + let mut streamer = ChannelStreamer(rx); + crate::schema::schema(&mut std::io::stdout(), &mut streamer) + }); + + // it's no-go if the receive thread can't be created, so just die. + let cons_thr = cons_thr.expect("cannot create recv thread"); + + let mut handler = RustHandler::new(tx); ffi::from_file(filename.to_string(), &mut handler ); + // Shut down the channel. This API is awkward: if you forget to call this, the receive thread just blocks. + handler.close(); + cons_thr.join().unwrap() }