diff --git a/src/channel.rs b/src/channel.rs index 4f13747..e0c5de8 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -65,7 +65,7 @@ pub mod rb { } // implementation of Producer and Consumer for crossbeam::channel -mod ch { +pub mod ch { use super::Event; pub struct ChSender(pub crossbeam::channel::Sender>); @@ -77,8 +77,7 @@ mod ch { impl crate::channel::Sender> for ChSender { type SendError=crossbeam::channel::SendError>; - // Here's where we actually do something with the json event - // That is, decouple the handling of the parse events, from the actual parsing stream. + // Convert a Json fn send<'a>(&mut self, ev: Box>) -> Result<(), Self::SendError> { self.0.send(*ev) } diff --git a/src/shredder.rs b/src/shredder.rs index a8b1943..182f513 100644 --- a/src/shredder.rs +++ b/src/shredder.rs @@ -225,22 +225,27 @@ where S : AsRef + std::convert::AsRef + std::fmt::Debug // Create ShredWriter first, because it might want to stop things. let mut writer : ShredWriter> = ShredWriter::new(dir, "mpk"); - use crate::plain::Plain; - // The event that will be sent across the channel - // type ChEvent<'a> = sender::Event<::V<'a>>; - - // this seems to be about optimal wrt performance - const CHANNEL_SIZE : usize = 8192; - // let (tx, rx) = crossbeam::channel::bounded(CHANNEL_SIZE); - let (tx, rx) = rtrb::RingBuffer::new(CHANNEL_SIZE); - - use crate::channel::Consumer; - let (mut tx, mut rx) = (crate::channel::rb::RbProducer(tx), crate::channel::rb::RbConsumer(rx,std::thread::current())); + // Crossbeam Channel + let (tx, rx) = { + // this seems to be about optimal wrt performance + const CHANNEL_SIZE : usize = 8192; + let (tx, rx) = crossbeam::channel::bounded::>>(CHANNEL_SIZE); + (tx, rx) + }; + // Ring Buffer +/* let (mut tx, mut rx) = { + // this seems to be about optimal wrt performance + const CHANNEL_SIZE : usize = 8192; + // let (tx, rx) = crossbeam::channel::bounded(CHANNEL_SIZE); + let (tx, rx) = rtrb::RingBuffer::new(CHANNEL_SIZE); + (crate::channel::rb::RbProducer(tx), crate::channel::rb::RbConsumer(rx,std::thread::current())) + }; +*/ // consumer thread let cons_thr = { - // use crate::shredder::ShredWriter; std::thread::Builder::new().name("jch recv".into()).spawn(move || { + // use crate::channel::Consumer; while let Ok(ref event) = rx.recv() { use sender::Event; let msgpacked_event = match event { @@ -261,13 +266,15 @@ where S : AsRef + std::convert::AsRef + std::fmt::Debug let mut jevstream = parser::JsonEventParser::new(istream); // This will send `sender::Event` over the channel + use crate::plain::Plain; let visitor = Plain(|_| true); + use crate::channel::ch::ChSender; + let mut tx : ChSender<::V<'_>> = ChSender(tx); visitor.value(&mut jevstream, JsonPath::new(), 0, &mut tx).unwrap_or_else(|_| println!("uhoh")); - // inner tx dropped automatically here + // tx dropped automatically here, so channel gets closed } // done with the weird hoops - drop(tx); cons_thr.join().unwrap(); }