diff --git a/crates/core/src/tracing.rs b/crates/core/src/tracing.rs
index 662ecf590..0196489d6 100644
--- a/crates/core/src/tracing.rs
+++ b/crates/core/src/tracing.rs
@@ -1,23 +1,11 @@
-use std::{
-    io,
-    path::{Path, PathBuf},
-    sync::Arc,
-    time::SystemTime,
-};
+use std::{path::PathBuf, sync::Arc, time::SystemTime};
 
 use chrono::{DateTime, Utc};
 use either::Either;
 use freenet_stdlib::prelude::*;
 use futures::{future::BoxFuture, FutureExt};
 use serde::{Deserialize, Serialize};
-use tokio::{
-    fs::OpenOptions,
-    net::TcpStream,
-    sync::{
-        mpsc::{self},
-        Mutex,
-    },
-};
+use tokio::{net::TcpStream, sync::mpsc};
 use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
 
 use crate::{
@@ -38,6 +26,9 @@
 pub(crate) use test::TestEventListener;
 
 use crate::node::OpManager;
 
+/// An append-only log for network events.
+mod aof;
+
 #[derive(Debug, Clone, Copy)]
 #[allow(dead_code)]
 struct ListenerLogId(usize);
@@ -375,20 +366,10 @@ pub(crate) struct EventRegister {
 /// Records from a new session must have higher than this ts.
 static NEW_RECORDS_TS: std::sync::OnceLock<SystemTime> = std::sync::OnceLock::new();
 
-static FILE_LOCK: Mutex<()> = Mutex::const_new(());
-
-const EVENT_REGISTER_BATCH_SIZE: usize = 100;
-
 const DEFAULT_METRICS_SERVER_PORT: u16 = 55010;
 
 impl EventRegister {
-    #[cfg(not(test))]
-    const MAX_LOG_RECORDS: usize = 100_000;
-    #[cfg(test)]
-    const MAX_LOG_RECORDS: usize = 10_000;
-
-    const BATCH_SIZE: usize = EVENT_REGISTER_BATCH_SIZE;
-
     pub fn new(event_log_path: PathBuf) -> Self {
         let (log_sender, log_recv) = mpsc::channel(1000);
         NEW_RECORDS_TS.get_or_init(SystemTime::now);
@@ -407,24 +388,13 @@ impl EventRegister {
         use futures::StreamExt;
 
         tokio::time::sleep(std::time::Duration::from_millis(200)).await; // wait for the node to start
-        let mut event_log = match OpenOptions::new()
-            .write(true)
-            .read(true)
-            .open(&*event_log_path)
-            .await
-        {
+        let mut event_log = match aof::LogFile::open(event_log_path.as_path()).await {
             Ok(file) => file,
             Err(err) => {
                 tracing::error!("Failed openning log file {:?} with: {err}", event_log_path);
                 panic!("Failed openning log file"); // fixme: propagate this to the main event loop
             }
         };
-        let mut num_written = 0;
-        let mut log_batch = Vec::with_capacity(Self::BATCH_SIZE);
-
-        let mut num_recs = Self::num_lines(event_log_path.as_path())
-            .await
-            .expect("non IO error");
 
         let mut ws = connect_to_metrics_server().await;
 
@@ -440,7 +410,7 @@ impl EventRegister {
                     if let Some(ws) = ws.as_mut() {
                         send_to_metrics_server(ws, &log).await;
                     }
-                    Self::persist_log(&mut log_batch, &mut num_written, &mut num_recs, &mut event_log, log).await;
+                    event_log.persist_log(log).await;
                 }
                 ws_msg = ws_recv => {
                     if let Some((ws, ws_msg)) = ws.as_mut().zip(ws_msg) {
@@ -451,248 +421,21 @@ impl EventRegister {
                 }
             }
         }
 
         // store remaining logs
-        let mut batch_serialized_data = Vec::with_capacity(log_batch.len() * 1024);
-        for log_item in log_batch {
-            let mut serialized = match bincode::serialize(&log_item) {
-                Err(err) => {
-                    tracing::error!("Failed serializing log: {err}");
-                    break;
-                }
-                Ok(serialized) => serialized,
-            };
-            {
-                use byteorder::{BigEndian, WriteBytesExt};
-                batch_serialized_data
-                    .write_u32::<BigEndian>(serialized.len() as u32)
-                    .expect("enough memory");
-            }
-            batch_serialized_data.append(&mut serialized);
-        }
-        if !batch_serialized_data.is_empty() {
-            use tokio::io::AsyncWriteExt;
-            let _guard = FILE_LOCK.lock().await;
-            if let Err(err) = event_log.write_all(&batch_serialized_data).await {
-                tracing::error!("Failed writting to event log: {err}");
-                panic!("Failed writting event log");
-            }
-        }
-    }
-
-    async fn persist_log(
-        log_batch: &mut Vec<NetLogMessage>,
-        num_written: &mut usize,
-        num_recs: &mut usize,
-        event_log: &mut tokio::fs::File,
-        log: NetLogMessage,
-    ) {
-        log_batch.push(log);
-        let mut batch_buf = vec![];
-
-        if log_batch.len() >= Self::BATCH_SIZE {
-            let num_logs: usize = log_batch.len();
-            let moved_batch = std::mem::replace(log_batch, Vec::with_capacity(Self::BATCH_SIZE));
-            let serialization_task = tokio::task::spawn_blocking(move || {
-                let mut batch_serialized_data = Vec::with_capacity(Self::BATCH_SIZE * 1024);
-                for log_item in &moved_batch {
-                    let mut serialized = match bincode::serialize(log_item) {
-                        Err(err) => {
-                            tracing::error!("Failed serializing log: {err}");
-                            return Err(err);
-                        }
-                        Ok(serialized) => serialized,
-                    };
-                    {
-                        use byteorder::{BigEndian, WriteBytesExt};
-                        batch_serialized_data
-                            .write_u32::<BigEndian>(serialized.len() as u32)
-                            .expect("enough memory");
-                    }
-                    batch_serialized_data.append(&mut serialized);
-                }
-                Ok(batch_serialized_data)
-            });
-
-            match serialization_task.await {
-                Ok(Ok(serialized_data)) => {
-                    // tracing::debug!(bytes = %serialized_data.len(), %num_logs, "serialized logs");
-                    batch_buf = serialized_data;
-                    *num_written += num_logs;
-                    log_batch.clear(); // Clear the batch for new data
-                }
-                _ => {
-                    panic!("Failed serializing log");
-                }
-            }
-        }
-
-        if *num_written >= Self::BATCH_SIZE {
-            {
-                use tokio::io::AsyncWriteExt;
-                let _guard = FILE_LOCK.lock().await;
-                if let Err(err) = event_log.write_all(&batch_buf).await {
-                    tracing::error!("Failed writting to event log: {err}");
+        let moved_batch = std::mem::replace(&mut event_log.batch, aof::Batch::new(aof::BATCH_SIZE));
+        let batch_writes = moved_batch.num_writes;
+        match aof::LogFile::encode_batch(&moved_batch) {
+            Ok(batch_serialized_data) => {
+                if !batch_serialized_data.is_empty()
+                    && event_log.write_all(&batch_serialized_data).await.is_err()
+                {
                     panic!("Failed writting event log");
                 }
+                event_log.update_recs(batch_writes);
             }
-            *num_recs += *num_written;
-            *num_written = 0;
-        }
-
-        // Check the number of lines and truncate if needed
-        if *num_recs > Self::MAX_LOG_RECORDS {
-            const REMOVE_RECS: usize = 1000 + EVENT_REGISTER_BATCH_SIZE; // making space for 1000 new records
-            if let Err(err) = Self::truncate_records(event_log, REMOVE_RECS).await {
-                tracing::error!("Failed truncating log file: {:?}", err);
-                panic!("Failed truncating log file");
-            }
-            *num_recs -= REMOVE_RECS;
-        }
-    }
-
-    async fn num_lines(path: &Path) -> io::Result<usize> {
-        use tokio::fs::File;
-        use tokio::io::{AsyncReadExt, AsyncSeekExt};
-
-        let mut file = tokio::io::BufReader::new(File::open(path).await?);
-        let mut num_records = 0;
-        let mut buf = [0; 4]; // Read the u32 length prefix
-
-        loop {
-            let bytes_read = file.read_exact(&mut buf).await;
-            if bytes_read.is_err() {
-                break;
-            }
-            num_records += 1;
-
-            // Seek to the next record without reading its contents
-            let length = u32::from_le_bytes(buf) as u64;
-            if (file.seek(io::SeekFrom::Current(length as i64)).await).is_err() {
-                break;
-            }
-        }
-
-        Ok(num_records)
-    }
-
-    async fn truncate_records(
-        file: &mut tokio::fs::File,
-        remove_records: usize,
-    ) -> Result<(), Box<dyn std::error::Error>> {
-        use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
-
-        let _guard = FILE_LOCK.lock().await;
-        file.rewind().await?;
-        // tracing::debug!(position = file.stream_position().await.unwrap());
-        let mut records_count = 0;
-        while records_count < remove_records {
-            let mut length_bytes = [0u8; 4];
-            if let Err(error) = file.read_exact(&mut length_bytes).await {
-                if matches!(error.kind(), io::ErrorKind::UnexpectedEof) {
-                    break;
-                }
-                let pos = file.stream_position().await;
-                tracing::error!(%error, ?pos, "error while trying to read file");
-                return Err(error.into());
-            }
-            let length = u32::from_be_bytes(length_bytes);
-            if let Err(error) = file.seek(io::SeekFrom::Current(length as i64)).await {
-                if matches!(error.kind(), io::ErrorKind::UnexpectedEof) {
-                    break;
-                }
-                let pos = file.stream_position().await;
-                tracing::error!(%error, ?pos, "error while trying to read file");
-                return Err(error.into());
-            }
-            records_count += 1;
-        }
-
-        // Copy the rest of the file to the buffer
-        let mut buffer = Vec::new();
-        if let Err(error) = file.read_to_end(&mut buffer).await {
-            if !matches!(error.kind(), io::ErrorKind::UnexpectedEof) {
-                let pos = file.stream_position().await;
-                tracing::error!(%error, ?pos, "error while trying to read file");
-                return Err(error.into());
+            Err(err) => {
+                tracing::error!("Failed encoding batch: {err}");
+            }
         }
-        }
-
-        // Seek back to the beginning and write the remaining content
-        file.rewind().await?;
-        file.write_all(&buffer).await?;
-
-        // Truncate the file to the new size
-        file.set_len(buffer.len() as u64).await?;
-        file.seek(io::SeekFrom::End(0)).await?;
-        Ok(())
-    }
-
-    pub async fn get_router_events(
-        max_event_number: usize,
-        event_log_path: &Path,
-    ) -> Result<Vec<RouteEvent>, DynError> {
-        use tokio::io::{AsyncReadExt, AsyncSeekExt};
-        const MAX_EVENT_HISTORY: usize = 10_000;
-        let event_num = max_event_number.min(MAX_EVENT_HISTORY);
-
-        // tracing::info!(?event_log_path);
-        let _guard: tokio::sync::MutexGuard<'_, ()> = FILE_LOCK.lock().await;
-        let mut file =
-            tokio::io::BufReader::new(OpenOptions::new().read(true).open(event_log_path).await?);
-
-        let new_records_ts = NEW_RECORDS_TS
-            .get()
-            .expect("set on initialization")
-            .duration_since(std::time::UNIX_EPOCH)
-            .expect("should be older than unix epoch")
-            .as_secs() as i64;
-
-        let mut records = Vec::with_capacity(event_num);
-        while records.len() < event_num {
-            // Read the length prefix
-            let length = match file.read_u32().await {
-                Ok(l) => l,
-                Err(error) => {
-                    if !matches!(error.kind(), io::ErrorKind::UnexpectedEof) {
-                        let pos = file.stream_position().await;
-                        tracing::error!(%error, ?pos, "error while trying to read file");
-                        return Err(error.into());
-                    } else {
-                        break;
-                    }
-                }
-            };
-            let mut buf = vec![0; length as usize];
-            file.read_exact(&mut buf).await?;
-            records.push(buf);
-            if records.len() == event_num {
-                break;
-            }
-        }
-
-        if records.is_empty() {
-            return Ok(vec![]);
-        }
-
-        let deserialized_records = tokio::task::spawn_blocking(move || {
-            let mut filtered = vec![];
-            for buf in records {
-                let record: NetLogMessage = bincode::deserialize(&buf).map_err(|e| {
-                    tracing::error!(?buf, "deserialization error");
-                    e
-                })?;
-                // tracing::info!(?record);
-                if let EventKind::Route(outcome) = record.kind {
-                    let record_ts = record.datetime.timestamp();
-                    if record_ts >= new_records_ts {
-                        filtered.push(outcome);
-                    }
-                }
-            }
-            Ok::<_, DynError>(filtered)
-        })
-        .await??;
-
-        Ok(deserialized_records)
-    }
 }
@@ -718,7 +461,7 @@ impl NetEventRegister for EventRegister {
     }
 
     fn get_router_events(&self, number: usize) -> BoxFuture<Result<Vec<RouteEvent>, DynError>> {
-        async move { EventRegister::get_router_events(number, &self.log_file).await }.boxed()
+        async move { aof::LogFile::get_router_events(number, &self.log_file).await }.boxed()
     }
 }
@@ -1094,6 +837,7 @@ mod opentelemetry_tracer {
 
 #[derive(Serialize, Deserialize, Debug, Clone)]
 #[cfg_attr(test, derive(arbitrary::Arbitrary))]
+#[non_exhaustive]
 // todo: make this take by ref instead, probably will need an owned version
 enum EventKind {
     Connect(ConnectEvent),
@@ -1114,6 +858,28 @@ enum EventKind {
     },
 }
 
+impl EventKind {
+    const CONNECT: u8 = 0;
+    const PUT: u8 = 1;
+    const GET: u8 = 2;
+    const ROUTE: u8 = 3;
+    const SUBSCRIBED: u8 = 4;
+    const IGNORED: u8 = 5;
+    const DISCONNECTED: u8 = 6;
+
+    const fn varint_id(&self) -> u8 {
+        match self {
+            EventKind::Connect(_) => Self::CONNECT,
+            EventKind::Put(_) => Self::PUT,
+            EventKind::Get { .. } => Self::GET,
+            EventKind::Route(_) => Self::ROUTE,
+            EventKind::Subscribed { .. } => Self::SUBSCRIBED,
+            EventKind::Ignored => Self::IGNORED,
+            EventKind::Disconnected { .. } => Self::DISCONNECTED,
+        }
+    }
+}
+
 #[derive(Serialize, Deserialize, Debug, Clone)]
 #[cfg_attr(test, derive(arbitrary::Arbitrary))]
 enum ConnectEvent {
@@ -1259,52 +1025,6 @@ pub(super) mod test {
 
     static LOG_ID: AtomicUsize = AtomicUsize::new(0);
 
-    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
-    async fn event_register_read_write() -> Result<(), DynError> {
-        // use tracing::level_filters::LevelFilter;
-        // crate::config::set_logger(Some(LevelFilter::TRACE));
-        use std::time::Duration;
-        let temp_dir = tempfile::tempdir()?;
-        let log_path = temp_dir.path().join("event_log");
-        std::fs::File::create(&log_path)?;
-
-        // force a truncation
-        const TEST_LOGS: usize = EventRegister::MAX_LOG_RECORDS + 100;
-        let register = EventRegister::new(log_path.clone());
-        let bytes = crate::util::test::random_bytes_2mb();
-        let mut gen = arbitrary::Unstructured::new(&bytes);
-        let mut transactions = vec![];
-        let mut peers = vec![];
-        let mut events = vec![];
-        for _ in 0..TEST_LOGS {
-            let tx: Transaction = gen.arbitrary()?;
-            transactions.push(tx);
-            let peer: PeerId = PeerId::random();
-            peers.push(peer);
-        }
-        let mut total_route_events: usize = 0;
-        for i in 0..TEST_LOGS {
-            let kind: EventKind = gen.arbitrary()?;
-            if matches!(kind, EventKind::Route(_)) {
-                total_route_events += 1;
-            }
-            events.push(NetEventLog {
-                tx: &transactions[i],
-                peer_id: peers[i].clone(),
-                kind,
-            });
-        }
-        register.register_events(Either::Right(events)).await;
-        while register.log_sender.capacity() != 1000 {
-            tokio::time::sleep(Duration::from_millis(500)).await;
-        }
-        tokio::time::sleep(Duration::from_millis(1_000)).await;
-        let ev =
-            EventRegister::get_router_events(EventRegister::MAX_LOG_RECORDS, &log_path).await?;
-        assert_eq!(ev.len(), total_route_events);
-        Ok(())
-    }
-
     #[derive(Clone)]
     pub(crate) struct TestEventListener {
         node_labels: Arc>,
diff --git a/crates/core/src/tracing/aof.rs b/crates/core/src/tracing/aof.rs
new file mode 100644
index 000000000..a7e73867e
--- /dev/null
+++ b/crates/core/src/tracing/aof.rs
@@ -0,0 +1,548 @@
+use byteorder::ByteOrder;
+use tokio::{
+    fs::{File, OpenOptions},
+    io::{self, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWriteExt, BufReader, Error},
+};
+
+use std::path::{Path, PathBuf};
+
+use tokio::sync::Mutex;
+
+use super::{DynError, EventKind, NetLogMessage, RouteEvent, NEW_RECORDS_TS};
+
+static FILE_LOCK: Mutex<()> = Mutex::const_new(());
+
+const RECORD_LENGTH: usize = core::mem::size_of::<u32>();
+const EVENT_KIND_LENGTH: usize = 1;
+const EVENT_LOG_HEADER_SIZE: usize = RECORD_LENGTH + EVENT_KIND_LENGTH; // len + varint id
+#[cfg(not(test))]
+pub(super) const MAX_LOG_RECORDS: usize = 100_000;
+#[cfg(test)]
+pub(super) const MAX_LOG_RECORDS: usize = 10_000;
+pub(super) const REMOVE_RECS: usize = 1000 + EVENT_REGISTER_BATCH_SIZE; // making space for 1000 new records
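+
+// On-disk record framing: every entry in the log is a fixed-size header
+// followed by its bincode-serialized payload:
+//
+//   [ u32 payload length (big-endian) | u8 event-kind tag | payload bytes ]
+//
+// Keeping the kind tag outside the payload lets readers skip or filter
+// records (e.g. only route events) without deserializing them.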
+const EVENT_REGISTER_BATCH_SIZE: usize = 100;
+pub(super) const BATCH_SIZE: usize = EVENT_REGISTER_BATCH_SIZE;
+
+type DefaultEndian = byteorder::BigEndian;
+
+pub(super) struct Batch {
+    pub batch: Vec<NetLogMessage>,
+    pub num_writes: usize,
+}
+
+impl Batch {
+    #[inline]
+    pub fn new(cap: usize) -> Self {
+        Self {
+            batch: Vec::with_capacity(cap),
+            num_writes: 0,
+        }
+    }
+
+    #[inline]
+    fn push(&mut self, log: NetLogMessage) {
+        self.num_writes += 1;
+        self.batch.push(log);
+    }
+
+    #[inline]
+    pub fn len(&self) -> usize {
+        self.batch.len()
+    }
+
+    #[allow(dead_code)]
+    #[inline]
+    fn is_empty(&self) -> bool {
+        self.batch.is_empty()
+    }
+
+    #[inline]
+    fn clear(&mut self) {
+        self.batch.clear();
+        self.num_writes = 0;
+    }
+}
+
+pub(super) struct LogFile {
+    file: Option<BufReader<File>>,
+    path: PathBuf,
+    rewrite_path: PathBuf,
+    // make this configurable?
+    max_log_records: usize,
+    pub(super) batch: Batch,
+    num_writes: usize,
+    num_recs: usize,
+}
+
+impl LogFile {
+    pub async fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
+        let path = path.as_ref();
+        let file = OpenOptions::new()
+            .create(true)
+            .read(true)
+            .append(true)
+            .open(path)
+            .await?;
+        let mut file = BufReader::new(file);
+        let num_recs = Self::num_lines(&mut file).await.expect("non IO error");
+        Ok(Self {
+            file: Some(file),
+            path: path.to_path_buf(),
+            rewrite_path: path.with_extension("rewrite"),
+            max_log_records: MAX_LOG_RECORDS,
+            batch: Batch {
+                batch: Vec::with_capacity(BATCH_SIZE),
+                num_writes: 0,
+            },
+            num_writes: 0,
+            num_recs,
+        })
+    }
+
+    pub(super) fn update_recs(&mut self, recs: usize) {
+        self.num_recs += recs;
+    }
+
+    pub fn encode_log(
+        log: &NetLogMessage,
+    ) -> bincode::Result<([u8; EVENT_LOG_HEADER_SIZE], Vec<u8>)> {
+        let serialized = bincode::serialize(&log)?;
+        let mut header = [0; EVENT_LOG_HEADER_SIZE];
+        DefaultEndian::write_u32(&mut header, serialized.len() as u32);
+        header[4] = log.kind.varint_id(); // event kind
+        Ok((header, serialized))
+    }
+
+    async fn num_lines(file: &mut (impl AsyncRead + AsyncSeek + Unpin)) -> io::Result<usize> {
+        let mut num_records = 0;
+
+        let mut buf = [0; EVENT_LOG_HEADER_SIZE]; // Read the u32 length prefix + u8 event kind
+
+        loop {
+            let bytes_read = file.read_exact(&mut buf).await;
+            if bytes_read.is_err() {
+                break;
+            }
+            num_records += 1;
+
+            // Seek to the next record without reading its contents
+            let length = DefaultEndian::read_u32(&buf[..4]) as u64;
+
+            match buf[4] {
+                0..=6 => {}
+                _ => {
+                    return Err(io::Error::new(
+                        io::ErrorKind::InvalidData,
+                        "Unknown event kind",
+                    ))
+                }
+            }
+
+            if (file.seek(io::SeekFrom::Current(length as i64)).await).is_err() {
+                break;
+            }
+        }
+
+        Ok(num_records)
+    }
+
+    pub async fn persist_log(&mut self, log: NetLogMessage) {
+        self.batch.push(log);
+        let mut batch_buf = vec![];
+
+        if self.batch.len() >= BATCH_SIZE {
+            let moved_batch = std::mem::replace(&mut self.batch, Batch::new(BATCH_SIZE));
+            let batch_writes = moved_batch.num_writes;
+            let serialization_task =
+                tokio::task::spawn_blocking(move || Self::encode_batch(&moved_batch));
+
+            match serialization_task.await {
+                Ok(Ok(serialized_data)) => {
+                    batch_buf = serialized_data;
+                    self.num_writes += batch_writes;
+                    self.batch.clear(); // Clear the batch for new data
+                }
+                _ => {
+                    panic!("Failed serializing log");
+                }
+            }
+        }
+
+        if self.num_writes >= BATCH_SIZE {
+            {
+                let res = self.write_all(&batch_buf).await;
+                if res.is_err() {
+                    panic!("Failed writing to log file");
+                }
+            }
+            self.num_recs += self.num_writes;
+            self.num_writes = 0;
+        }
+
+        // Check the number of lines and truncate if needed
+        if self.num_recs > self.max_log_records {
+            if let Err(err) = self.truncate_records(REMOVE_RECS).await {
+                tracing::error!("Failed truncating log file: {:?}", err);
+                panic!("Failed truncating log file");
+            }
+        }
+    }
+
+    pub async fn truncate_records(
+        &mut self,
+        remove_records: usize,
+    ) -> Result<(), Box<dyn std::error::Error>> {
+        let _guard = FILE_LOCK.lock().await;
+        let mut file = self.file.take().unwrap();
+        file.rewind().await?;
+        file.get_mut().rewind().await?;
+
+        let mut records_count = 0;
+        while records_count < remove_records {
+            let mut header = [0u8; EVENT_LOG_HEADER_SIZE];
+            if let Err(error) = file.read_exact(&mut header).await {
+                if matches!(error.kind(), io::ErrorKind::UnexpectedEof) {
+                    break;
+                }
+                let pos = file.stream_position().await;
+                tracing::error!(%error, ?pos, "error while trying to read file");
+                return Err(error.into());
+            }
+            let length = DefaultEndian::read_u32(&header[..4]);
+            if let Err(error) = file.seek(io::SeekFrom::Current(length as i64)).await {
+                if matches!(error.kind(), io::ErrorKind::UnexpectedEof) {
+                    break;
+                }
+                let pos = file.stream_position().await;
+                tracing::error!(%error, ?pos, "error while trying to read file");
+                return Err(error.into());
+            }
+            records_count += 1;
+        }
+
+        let mut bk = OpenOptions::new()
+            .create(true)
+            .truncate(true)
+            .write(true)
+            .read(true)
+            .open(&self.rewrite_path)
+            .await?;
+
+        let mut num_recs = 0;
+        loop {
+            let mut header = [0u8; EVENT_LOG_HEADER_SIZE];
+            if let Err(error) = file.read_exact(&mut header).await {
+                if matches!(error.kind(), io::ErrorKind::UnexpectedEof) {
+                    break;
+                }
+                let pos = file.stream_position().await;
+                tracing::error!(%error, ?pos, "error while trying to read file");
+                return Err(error.into());
+            }
+
+            let length = DefaultEndian::read_u32(&header[..4]);
+            let mut buf = vec![0u8; EVENT_LOG_HEADER_SIZE + length as usize];
+            buf[..EVENT_LOG_HEADER_SIZE].copy_from_slice(&header);
+            if let Err(error) = file.read_exact(&mut buf[EVENT_LOG_HEADER_SIZE..]).await {
+                if matches!(error.kind(), io::ErrorKind::UnexpectedEof) {
+                    break;
+                }
+                let pos = file.stream_position().await;
+                tracing::error!(%error, ?pos, "error while trying to read file");
+                return Err(error.into());
+            }
+
+            num_recs += 1;
+
+            if let Err(error) = bk.write_all(&buf).await {
+                let pos = bk.stream_position().await;
+                tracing::error!(%error, ?pos, "error while trying to write file");
+                return Err(error.into());
+            }
+        }
+
+        self.num_recs = num_recs;
+
+        drop(bk);
+        drop(file);
+        std::fs::remove_file(&self.path)?;
+        std::fs::rename(&self.rewrite_path, &self.path)?;
+
+        self.file = Some(BufReader::new(
+            OpenOptions::new()
+                .read(true)
+                .append(true)
+                .write(true)
+                .open(&self.path)
+                .await?,
+        ));
+
+        Ok(())
+    }
+
+    pub async fn get_router_events(
+        max_event_number: usize,
+        event_log_path: &Path,
+    ) -> Result<Vec<RouteEvent>, DynError> {
+        const MAX_EVENT_HISTORY: usize = 10_000;
+        let event_num = max_event_number.min(MAX_EVENT_HISTORY);
+
+        let _guard: tokio::sync::MutexGuard<'_, ()> = FILE_LOCK.lock().await;
+        let mut file = BufReader::new(OpenOptions::new().read(true).open(event_log_path).await?);
+
+        Self::get_router_events_in(event_num, &mut file).await
+    }
+
+    async fn get_router_events_in(
+        event_num: usize,
+        file: &mut (impl AsyncRead + AsyncSeek + Unpin),
+    ) -> Result<Vec<RouteEvent>, DynError> {
+        let new_records_ts = NEW_RECORDS_TS
+            .get()
+            .expect("set on initialization")
+            .duration_since(std::time::UNIX_EPOCH)
+            .expect("should be older than unix epoch")
+            .as_secs() as i64;
+
+        let mut records = Vec::with_capacity(event_num);
+        let mut num_records = 0;
+        while num_records < event_num {
+            let mut header = [0; EVENT_LOG_HEADER_SIZE];
+
+            // Read the length prefix
+            if let Err(error) = file.read_exact(&mut header).await {
+                if !matches!(error.kind(), io::ErrorKind::UnexpectedEof) {
+                    let pos = file.stream_position().await;
+                    tracing::error!(%error, ?pos, "error while trying to read file");
+                    return Err(error.into());
+                } else {
+                    break;
+                }
+            }
+
+            let length = DefaultEndian::read_u32(&header[..4]);
+            if header[4] == EventKind::ROUTE {
+                let mut buf = vec![0; length as usize];
+                file.read_exact(&mut buf).await?;
+                records.push(buf);
+            } else {
+                file.seek(io::SeekFrom::Current(length as i64)).await?;
+            }
+
+            num_records += 1;
+        }
+
+        if records.is_empty() {
+            return Ok(vec![]);
+        }
+
+        let deserialized_records = tokio::task::spawn_blocking(move || {
+            let mut filtered = vec![];
+            for buf in records {
+                let record: NetLogMessage = bincode::deserialize(&buf).map_err(|e| {
+                    tracing::error!(?buf, "deserialization error");
+                    e
+                })?;
+                // tracing::info!(?record);
+                if let EventKind::Route(outcome) = record.kind {
+                    let record_ts = record.datetime.timestamp();
+                    if record_ts >= new_records_ts {
+                        filtered.push(outcome);
+                    }
+                }
+            }
+            Ok::<_, DynError>(filtered)
+        })
+        .await??;
+
+        Ok(deserialized_records)
+    }
+
+    pub async fn write_all(&mut self, data: &[u8]) -> io::Result<()> {
+        let _guard = FILE_LOCK.lock().await;
+        let file = self.file.as_mut().unwrap();
+        if let Err(err) = file.get_mut().write_all(data).await {
+            tracing::error!("Failed writing to event log: {err}");
+            return Err(err);
+        }
+
+        if let Err(err) = file.get_mut().sync_all().await {
+            tracing::error!("Failed syncing event log: {err}");
+            return Err(err);
+        }
+        Ok(())
+    }
+
+    pub fn encode_batch(batch: &Batch) -> bincode::Result<Vec<u8>> {
+        let mut batch_serialized_data = Vec::with_capacity(BATCH_SIZE * 1024);
+        for log_item in &batch.batch {
+            let (header, mut serialized) = match Self::encode_log(log_item) {
+                Err(err) => {
+                    tracing::error!("Failed serializing log: {err}");
+                    return Err(err);
+                }
+                Ok(serialized) => serialized,
+            };
+
+            batch_serialized_data.extend_from_slice(&header);
+            batch_serialized_data.append(&mut serialized);
+        }
+
+        Ok(batch_serialized_data)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::time::SystemTime;
+
+    use tracing::level_filters::LevelFilter;
+
+    use crate::{
+        dev_tool::{PeerId, Transaction},
+        tracing::NetEventLog,
+    };
+
+    use super::*;
+
+    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+    async fn read_write() -> Result<(), DynError> {
+        NEW_RECORDS_TS.get_or_init(SystemTime::now);
+        crate::config::set_logger(Some(LevelFilter::TRACE));
+        let temp_dir = tempfile::tempdir()?;
+        let log_path = temp_dir.path().join("event_log");
+
+        // fill the log up to (but not past) the truncation threshold
+        const TEST_LOGS: usize = MAX_LOG_RECORDS;
+
+        let mut log = LogFile::open(&log_path).await?;
+        let bytes = crate::util::test::random_bytes_2mb();
+        let mut gen = arbitrary::Unstructured::new(&bytes);
+        let mut transactions = vec![];
+        let mut peers = vec![];
+        let mut events = vec![];
+
+        for _ in 0..TEST_LOGS {
+            let tx: Transaction = gen.arbitrary()?;
+            transactions.push(tx);
+            let peer: PeerId = PeerId::random();
+            peers.push(peer);
+        }
+        let mut total_route_events: usize = 0;
+
+        for i in 0..TEST_LOGS {
+            let kind: EventKind = gen.arbitrary()?;
+            if matches!(kind, EventKind::Route(_)) {
+                total_route_events += 1;
+            }
+            events.push(NetEventLog {
+                tx: &transactions[i],
+                peer_id: peers[i].clone(),
+                kind,
+            });
+        }
+
+        for msg in NetLogMessage::to_log_message(either::Either::Right(events)) {
+            log.persist_log(msg).await;
+        }
+
+        let ev = LogFile::get_router_events(TEST_LOGS, &log_path).await?;
+        assert_eq!(ev.len(), total_route_events);
+        Ok(())
+    }
+
+    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+    async fn read_write_small() -> Result<(), DynError> {
+        NEW_RECORDS_TS.get_or_init(SystemTime::now);
+        crate::config::set_logger(Some(LevelFilter::TRACE));
+        let temp_dir = tempfile::tempdir()?;
+        let log_path = temp_dir.path().join("event_log");
+
+        // a single batch worth of logs; well below the truncation threshold
+        const TEST_LOGS: usize = 100;
+
+        let mut log = LogFile::open(&log_path).await?;
+        let bytes = crate::util::test::random_bytes_2mb();
+        let mut gen = arbitrary::Unstructured::new(&bytes);
+        let mut transactions = vec![];
+        let mut peers = vec![];
+        let mut events = vec![];
+
+        for _ in 0..TEST_LOGS {
+            let tx: Transaction = gen.arbitrary()?;
+            transactions.push(tx);
+            let peer: PeerId = PeerId::random();
+            peers.push(peer);
+        }
+        let mut total_route_events: usize = 0;
+
+        for i in 0..TEST_LOGS {
+            let kind: EventKind = gen.arbitrary()?;
+            if matches!(kind, EventKind::Route(_)) {
+                total_route_events += 1;
+            }
+            events.push(NetEventLog {
+                tx: &transactions[i],
+                peer_id: peers[i].clone(),
+                kind,
+            });
+        }
+
+        for msg in NetLogMessage::to_log_message(either::Either::Right(events)) {
+            log.persist_log(msg).await;
+        }
+
+        let ev = LogFile::get_router_events(TEST_LOGS, &log_path).await?;
+        assert_eq!(ev.len(), total_route_events);
+        Ok(())
+    }
+
+    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+    async fn read_write_truncate() -> Result<(), DynError> {
+        NEW_RECORDS_TS.get_or_init(SystemTime::now);
+        crate::config::set_logger(Some(LevelFilter::TRACE));
+        let temp_dir = tempfile::tempdir()?;
+        let log_path = temp_dir.path().join("event_log");
+
+        // force a truncation
+        const TEST_LOGS: usize = MAX_LOG_RECORDS + 100;
+
+        let mut log = LogFile::open(&log_path).await?;
+        let bytes = crate::util::test::random_bytes_2mb();
+        let mut gen = arbitrary::Unstructured::new(&bytes);
+        let mut transactions = vec![];
+        let mut peers = vec![];
+        let mut events = vec![];
+
+        for _ in 0..TEST_LOGS {
+            let tx: Transaction = gen.arbitrary()?;
+            transactions.push(tx);
+            let peer: PeerId = PeerId::random();
+            peers.push(peer);
+        }
+        let mut total_route_events: usize = 0;
+
+        for i in 0..TEST_LOGS {
+            let kind: EventKind = gen.arbitrary()?;
+            // The route events in the first REMOVE_RECS records will be dropped
+            if matches!(kind, EventKind::Route(_)) && i >= REMOVE_RECS {
+                total_route_events += 1;
+            }
+            events.push(NetEventLog {
+                tx: &transactions[i],
+                peer_id: peers[i].clone(),
+                kind,
+            });
+        }
+
+        for msg in NetLogMessage::to_log_message(either::Either::Right(events)) {
+            log.persist_log(msg).await;
+        }
+
+        let ev = LogFile::get_router_events(TEST_LOGS, &log_path).await?;
+        assert_eq!(ev.len(), total_route_events);
+        Ok(())
+    }
+}
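Note: the record framing is simple enough to sanity-check without tokio or the `LogFile` plumbing. A minimal sketch (the `count_records` helper is hypothetical, not part of this patch) that walks the same `[u32 big-endian length | u8 kind tag | payload]` framing over an in-memory buffer, mirroring what `LogFile::num_lines` does against the file:

// Sketch only: `count_records` is a hypothetical helper, not in the patch.
fn count_records(mut buf: &[u8]) -> Result<usize, &'static str> {
    const HEADER_SIZE: usize = 5; // 4-byte big-endian length + 1-byte event-kind tag
    let mut num_records = 0;
    while buf.len() >= HEADER_SIZE {
        let length = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]) as usize;
        // Same validity check as `num_lines`: 0..=6 are the known EventKind tags.
        if buf[4] > 6 {
            return Err("unknown event kind");
        }
        // A short read here corresponds to the UnexpectedEof path in the real code.
        if buf.len() < HEADER_SIZE + length {
            break;
        }
        buf = &buf[HEADER_SIZE + length..];
        num_records += 1;
    }
    Ok(num_records)
}

Feeding it the bytes produced by `LogFile::encode_batch` for a batch of N messages should return N, which is the same invariant `update_recs(batch_writes)` relies on after a flush.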