From 3860365c02cc99f7ed63c130500765b2bd114773 Mon Sep 17 00:00:00 2001 From: Ignacio Duart Date: Thu, 30 May 2024 14:27:14 +0200 Subject: [PATCH 01/14] Don't generate random peer_ids --- crates/core/src/tracing.rs | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/crates/core/src/tracing.rs b/crates/core/src/tracing.rs index 6790133c3..408492e38 100644 --- a/crates/core/src/tracing.rs +++ b/crates/core/src/tracing.rs @@ -1249,16 +1249,19 @@ pub(super) mod test { use dashmap::DashMap; use std::{ collections::HashMap, + net::{Ipv4Addr, SocketAddr}, sync::atomic::{AtomicUsize, Ordering::SeqCst}, }; + use tracing::level_filters::LevelFilter; use super::*; - use crate::{node::testing_impl::NodeLabel, ring::Distance}; + use crate::{dev_tool::TransportKeypair, node::testing_impl::NodeLabel, ring::Distance}; static LOG_ID: AtomicUsize = AtomicUsize::new(0); #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn event_register_read_write() -> Result<(), DynError> { + crate::config::set_logger(Some(LevelFilter::TRACE)); use std::time::Duration; let temp_dir = tempfile::tempdir()?; let log_path = temp_dir.path().join("event_log"); @@ -1272,13 +1275,17 @@ pub(super) mod test { let mut transactions = vec![]; let mut peers = vec![]; let mut events = vec![]; + let key = TransportKeypair::new(); + let pub_key = key.public(); + let socket: SocketAddr = (Ipv4Addr::LOCALHOST, 8080).into(); for _ in 0..TEST_LOGS { let tx: Transaction = gen.arbitrary()?; transactions.push(tx); - let peer: PeerId = gen.arbitrary()?; + let peer: PeerId = PeerId::new(socket, pub_key.clone()); peers.push(peer); } - let mut total_route_events = 0; + let mut total_route_events: usize = 0; + tracing::info!("generating logs"); for i in 0..TEST_LOGS { let kind: EventKind = gen.arbitrary()?; if matches!(kind, EventKind::Route(_)) { @@ -1290,7 +1297,9 @@ pub(super) mod test { kind, }); } + tracing::info!(?total_route_events); register.register_events(Either::Right(events)).await; + tracing::info!("waiting for logs to be written"); while register.log_sender.capacity() != 1000 { tokio::time::sleep(Duration::from_millis(500)).await; } From 9ff613705305b1caa54757e86a52a6b103c3f8fe Mon Sep 17 00:00:00 2001 From: Ignacio Duart Date: Thu, 30 May 2024 16:03:54 +0200 Subject: [PATCH 02/14] Random gen of peer id is more efficient now --- crates/core/src/node.rs | 32 ++++++++++++++++++++++++++++++-- crates/core/src/tracing.rs | 11 ++--------- 2 files changed, 32 insertions(+), 11 deletions(-) diff --git a/crates/core/src/node.rs b/crates/core/src/node.rs index d547a690f..0a07f869f 100644 --- a/crates/core/src/node.rs +++ b/crates/core/src/node.rs @@ -860,11 +860,27 @@ impl PeerId { } } +thread_local! { + static PEER_ID: std::cell::RefCell> = std::cell::RefCell::new(None); +} + #[cfg(test)] impl<'a> arbitrary::Arbitrary<'a> for PeerId { fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result { let addr: ([u8; 4], u16) = u.arbitrary()?; - let pub_key = TransportKeypair::new().public().clone(); // TODO: impl arbitrary for TransportPublicKey + + let pub_key = PEER_ID.with(|peer_id| { + let mut peer_id = peer_id.borrow_mut(); + match &*peer_id { + Some(k) => k.clone(), + None => { + let key = TransportKeypair::new().public().clone(); + peer_id.replace(key.clone()); + key + } + } + }); + Ok(Self { addr: addr.into(), pub_key, @@ -878,7 +894,19 @@ impl PeerId { let mut addr = [0; 4]; rand::thread_rng().fill(&mut addr[..]); let port = crate::util::get_free_port().unwrap(); - let pub_key = TransportKeypair::new().public().clone(); + + let pub_key = PEER_ID.with(|peer_id| { + let mut peer_id = peer_id.borrow_mut(); + match &*peer_id { + Some(k) => k.clone(), + None => { + let key = TransportKeypair::new().public().clone(); + peer_id.replace(key.clone()); + key + } + } + }); + Self { addr: (addr, port).into(), pub_key, diff --git a/crates/core/src/tracing.rs b/crates/core/src/tracing.rs index 408492e38..5fac9a65e 100644 --- a/crates/core/src/tracing.rs +++ b/crates/core/src/tracing.rs @@ -1249,13 +1249,12 @@ pub(super) mod test { use dashmap::DashMap; use std::{ collections::HashMap, - net::{Ipv4Addr, SocketAddr}, sync::atomic::{AtomicUsize, Ordering::SeqCst}, }; use tracing::level_filters::LevelFilter; use super::*; - use crate::{dev_tool::TransportKeypair, node::testing_impl::NodeLabel, ring::Distance}; + use crate::{node::testing_impl::NodeLabel, ring::Distance}; static LOG_ID: AtomicUsize = AtomicUsize::new(0); @@ -1275,17 +1274,13 @@ pub(super) mod test { let mut transactions = vec![]; let mut peers = vec![]; let mut events = vec![]; - let key = TransportKeypair::new(); - let pub_key = key.public(); - let socket: SocketAddr = (Ipv4Addr::LOCALHOST, 8080).into(); for _ in 0..TEST_LOGS { let tx: Transaction = gen.arbitrary()?; transactions.push(tx); - let peer: PeerId = PeerId::new(socket, pub_key.clone()); + let peer: PeerId = PeerId::random(); peers.push(peer); } let mut total_route_events: usize = 0; - tracing::info!("generating logs"); for i in 0..TEST_LOGS { let kind: EventKind = gen.arbitrary()?; if matches!(kind, EventKind::Route(_)) { @@ -1297,9 +1292,7 @@ pub(super) mod test { kind, }); } - tracing::info!(?total_route_events); register.register_events(Either::Right(events)).await; - tracing::info!("waiting for logs to be written"); while register.log_sender.capacity() != 1000 { tokio::time::sleep(Duration::from_millis(500)).await; } From d081844a03c95ec2bfdab6e7d047502f2270fa11 Mon Sep 17 00:00:00 2001 From: Al Liu Date: Sun, 2 Jun 2024 12:42:57 +0800 Subject: [PATCH 03/14] WIP --- crates/core/src/tracing/aof.rs | 35 ++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 crates/core/src/tracing/aof.rs diff --git a/crates/core/src/tracing/aof.rs b/crates/core/src/tracing/aof.rs new file mode 100644 index 000000000..a917e62d2 --- /dev/null +++ b/crates/core/src/tracing/aof.rs @@ -0,0 +1,35 @@ +// use std::{ +// fs::{File, OpenOptions}, +// io::{Error, Write}, +// path::Path, +// }; + +// use super::NetLogMessage; + +// struct SerializedNetLogMessage { +// data: Vec, +// } + +// pub(super) struct LogFile { +// file: File, +// } + +// impl LogFile { +// pub fn open>(path: P) -> Result { +// let mut file = OpenOptions::new() +// .create(true) +// .read(true) +// .append(true) +// .open(path)?; +// todo!() +// } + +// pub fn append(&mut self, data: &[u8]) -> Result<(), Error> { +// self.file.write_all(data) +// } + +// /// Returns number of records +// pub fn replay(&mut self) -> Result { +// let mut len_buf = [0u8; 4]; +// } +// } From 2dceddfcbe81bf184a737deaed92cc000cacc8d9 Mon Sep 17 00:00:00 2001 From: Al Liu Date: Sun, 2 Jun 2024 12:43:05 +0800 Subject: [PATCH 04/14] WIP --- crates/core/src/tracing.rs | 44 +++++++++++++++++++++++++------------- 1 file changed, 29 insertions(+), 15 deletions(-) diff --git a/crates/core/src/tracing.rs b/crates/core/src/tracing.rs index 5fac9a65e..f5e0240bf 100644 --- a/crates/core/src/tracing.rs +++ b/crates/core/src/tracing.rs @@ -38,6 +38,9 @@ pub(crate) use test::TestEventListener; use crate::node::OpManager; +/// An append-only log for network events. +mod aof; + #[derive(Debug, Clone, Copy)] #[allow(dead_code)] struct ListenerLogId(usize); @@ -408,6 +411,7 @@ impl EventRegister { let mut event_log = match OpenOptions::new() .write(true) .read(true) + .append(true) .open(&*event_log_path) .await { @@ -523,6 +527,11 @@ impl EventRegister { } } + tracing::error!( + "num_written: {num_written}, num_recs: {num_recs}", + num_written = num_written, + num_recs = num_recs + ); if *num_written >= Self::BATCH_SIZE { { use tokio::io::AsyncWriteExt; @@ -531,20 +540,25 @@ impl EventRegister { tracing::error!("Failed writting to event log: {err}"); panic!("Failed writting event log"); } + + if let Err(err) = event_log.sync_all().await { + tracing::error!("Failed syncing event log: {err}"); + panic!("Failed syncing event log"); + } } *num_recs += *num_written; *num_written = 0; } // Check the number of lines and truncate if needed - if *num_recs > Self::MAX_LOG_RECORDS { - const REMOVE_RECS: usize = 1000 + EVENT_REGISTER_BATCH_SIZE; // making space for 1000 new records - if let Err(err) = Self::truncate_records(event_log, REMOVE_RECS).await { - tracing::error!("Failed truncating log file: {:?}", err); - panic!("Failed truncating log file"); - } - *num_recs -= REMOVE_RECS; - } + // if *num_recs > Self::MAX_LOG_RECORDS { + // const REMOVE_RECS: usize = 1000 + EVENT_REGISTER_BATCH_SIZE; // making space for 1000 new records + // if let Err(err) = Self::truncate_records(event_log, REMOVE_RECS).await { + // tracing::error!("Failed truncating log file: {:?}", err); + // panic!("Failed truncating log file"); + // } + // *num_recs -= REMOVE_RECS; + // } } async fn num_lines(path: &Path) -> io::Result { @@ -620,6 +634,7 @@ impl EventRegister { // Truncate the file to the new size file.set_len(buffer.len() as u64).await?; + file.sync_all().await?; file.seek(io::SeekFrom::End(0)).await?; Ok(()) } @@ -1267,7 +1282,7 @@ pub(super) mod test { std::fs::File::create(&log_path)?; // force a truncation - const TEST_LOGS: usize = EventRegister::MAX_LOG_RECORDS + 100; + const TEST_LOGS: usize = 100; let register = EventRegister::new(log_path.clone()); let bytes = crate::util::test::random_bytes_2mb(); let mut gen = arbitrary::Unstructured::new(&bytes); @@ -1293,12 +1308,11 @@ pub(super) mod test { }); } register.register_events(Either::Right(events)).await; - while register.log_sender.capacity() != 1000 { - tokio::time::sleep(Duration::from_millis(500)).await; - } - tokio::time::sleep(Duration::from_millis(1_000)).await; - let ev = - EventRegister::get_router_events(EventRegister::MAX_LOG_RECORDS, &log_path).await?; + // while register.log_sender.capacity() != 1000 { + // tokio::time::sleep(Duration::from_millis(500)).await; + // } + tokio::time::sleep(Duration::from_millis(10_000)).await; + let ev = EventRegister::get_router_events(100, &log_path).await?; assert_eq!(ev.len(), total_route_events); Ok(()) } From 793941b05751bbb27d8b2a7d7edd639faac656b5 Mon Sep 17 00:00:00 2001 From: Al Liu Date: Mon, 3 Jun 2024 14:10:01 +0800 Subject: [PATCH 05/14] append only file --- crates/core/src/tracing.rs | 298 +++--------------------------- crates/core/src/tracing/aof.rs | 325 +++++++++++++++++++++++++++++---- 2 files changed, 318 insertions(+), 305 deletions(-) diff --git a/crates/core/src/tracing.rs b/crates/core/src/tracing.rs index f5e0240bf..b78428edf 100644 --- a/crates/core/src/tracing.rs +++ b/crates/core/src/tracing.rs @@ -1,9 +1,4 @@ -use std::{ - io, - path::{Path, PathBuf}, - sync::Arc, - time::SystemTime, -}; +use std::{path::PathBuf, sync::Arc, time::SystemTime}; use chrono::{DateTime, Utc}; use either::Either; @@ -11,12 +6,8 @@ use freenet_stdlib::prelude::*; use futures::{future::BoxFuture, FutureExt}; use serde::{Deserialize, Serialize}; use tokio::{ - fs::OpenOptions, net::TcpStream, - sync::{ - mpsc::{self}, - Mutex, - }, + sync::{mpsc, Mutex}, }; use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; @@ -378,18 +369,9 @@ pub(crate) struct EventRegister { static NEW_RECORDS_TS: std::sync::OnceLock = std::sync::OnceLock::new(); static FILE_LOCK: Mutex<()> = Mutex::const_new(()); -const EVENT_REGISTER_BATCH_SIZE: usize = 100; - const DEFAULT_METRICS_SERVER_PORT: u16 = 55010; impl EventRegister { - #[cfg(not(test))] - const MAX_LOG_RECORDS: usize = 100_000; - #[cfg(test)] - const MAX_LOG_RECORDS: usize = 10_000; - - const BATCH_SIZE: usize = EVENT_REGISTER_BATCH_SIZE; - pub fn new(event_log_path: PathBuf) -> Self { let (log_sender, log_recv) = mpsc::channel(1000); NEW_RECORDS_TS.get_or_init(SystemTime::now); @@ -408,13 +390,7 @@ impl EventRegister { use futures::StreamExt; tokio::time::sleep(std::time::Duration::from_millis(200)).await; // wait for the node to start - let mut event_log = match OpenOptions::new() - .write(true) - .read(true) - .append(true) - .open(&*event_log_path) - .await - { + let mut event_log = match aof::LogFile::open(event_log_path.as_path()).await { Ok(file) => file, Err(err) => { tracing::error!("Failed openning log file {:?} with: {err}", event_log_path); @@ -422,9 +398,8 @@ impl EventRegister { } }; let mut num_written = 0; - let mut log_batch = Vec::with_capacity(Self::BATCH_SIZE); - let mut num_recs = Self::num_lines(event_log_path.as_path()) + let mut num_recs = aof::LogFile::num_lines(event_log_path.as_path()) .await .expect("non IO error"); @@ -442,7 +417,7 @@ impl EventRegister { if let Some(ws) = ws.as_mut() { send_to_metrics_server(ws, &log).await; } - Self::persist_log(&mut log_batch, &mut num_written, &mut num_recs, &mut event_log, log).await; + event_log.persist_log(&mut num_written, &mut num_recs, log).await; } ws_msg = ws_recv => { if let Some((ws, ws_msg)) = ws.as_mut().zip(ws_msg) { @@ -453,25 +428,19 @@ impl EventRegister { } // store remaining logs - let mut batch_serialized_data = Vec::with_capacity(log_batch.len() * 1024); - for log_item in log_batch { - let mut serialized = match bincode::serialize(&log_item) { + let mut batch_serialized_data = Vec::with_capacity(event_log.batch.len() * 1024); + for log_item in event_log.batch.drain(..) { + let (mut header, mut serialized) = match aof::LogFile::encode_log(&log_item) { Err(err) => { tracing::error!("Failed serializing log: {err}"); break; } Ok(serialized) => serialized, }; - { - use byteorder::{BigEndian, WriteBytesExt}; - batch_serialized_data - .write_u32::(serialized.len() as u32) - .expect("enough memory"); - } + batch_serialized_data.extend_from_slice(&mut header); batch_serialized_data.append(&mut serialized); } if !batch_serialized_data.is_empty() { - use tokio::io::AsyncWriteExt; let _guard = FILE_LOCK.lock().await; if let Err(err) = event_log.write_all(&batch_serialized_data).await { tracing::error!("Failed writting to event log: {err}"); @@ -479,234 +448,6 @@ impl EventRegister { } } } - - async fn persist_log( - log_batch: &mut Vec, - num_written: &mut usize, - num_recs: &mut usize, - event_log: &mut tokio::fs::File, - log: NetLogMessage, - ) { - log_batch.push(log); - let mut batch_buf = vec![]; - - if log_batch.len() >= Self::BATCH_SIZE { - let num_logs: usize = log_batch.len(); - let moved_batch = std::mem::replace(log_batch, Vec::with_capacity(Self::BATCH_SIZE)); - let serialization_task = tokio::task::spawn_blocking(move || { - let mut batch_serialized_data = Vec::with_capacity(Self::BATCH_SIZE * 1024); - for log_item in &moved_batch { - let mut serialized = match bincode::serialize(log_item) { - Err(err) => { - tracing::error!("Failed serializing log: {err}"); - return Err(err); - } - Ok(serialized) => serialized, - }; - { - use byteorder::{BigEndian, WriteBytesExt}; - batch_serialized_data - .write_u32::(serialized.len() as u32) - .expect("enough memory"); - } - batch_serialized_data.append(&mut serialized); - } - Ok(batch_serialized_data) - }); - - match serialization_task.await { - Ok(Ok(serialized_data)) => { - // tracing::debug!(bytes = %serialized_data.len(), %num_logs, "serialized logs"); - batch_buf = serialized_data; - *num_written += num_logs; - log_batch.clear(); // Clear the batch for new data - } - _ => { - panic!("Failed serializing log"); - } - } - } - - tracing::error!( - "num_written: {num_written}, num_recs: {num_recs}", - num_written = num_written, - num_recs = num_recs - ); - if *num_written >= Self::BATCH_SIZE { - { - use tokio::io::AsyncWriteExt; - let _guard = FILE_LOCK.lock().await; - if let Err(err) = event_log.write_all(&batch_buf).await { - tracing::error!("Failed writting to event log: {err}"); - panic!("Failed writting event log"); - } - - if let Err(err) = event_log.sync_all().await { - tracing::error!("Failed syncing event log: {err}"); - panic!("Failed syncing event log"); - } - } - *num_recs += *num_written; - *num_written = 0; - } - - // Check the number of lines and truncate if needed - // if *num_recs > Self::MAX_LOG_RECORDS { - // const REMOVE_RECS: usize = 1000 + EVENT_REGISTER_BATCH_SIZE; // making space for 1000 new records - // if let Err(err) = Self::truncate_records(event_log, REMOVE_RECS).await { - // tracing::error!("Failed truncating log file: {:?}", err); - // panic!("Failed truncating log file"); - // } - // *num_recs -= REMOVE_RECS; - // } - } - - async fn num_lines(path: &Path) -> io::Result { - use tokio::fs::File; - use tokio::io::{AsyncReadExt, AsyncSeekExt}; - - let mut file = tokio::io::BufReader::new(File::open(path).await?); - let mut num_records = 0; - let mut buf = [0; 4]; // Read the u32 length prefix - - loop { - let bytes_read = file.read_exact(&mut buf).await; - if bytes_read.is_err() { - break; - } - num_records += 1; - - // Seek to the next record without reading its contents - let length = u32::from_le_bytes(buf) as u64; - if (file.seek(io::SeekFrom::Current(length as i64)).await).is_err() { - break; - } - } - - Ok(num_records) - } - - async fn truncate_records( - file: &mut tokio::fs::File, - remove_records: usize, - ) -> Result<(), Box> { - use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; - - let _guard = FILE_LOCK.lock().await; - file.rewind().await?; - // tracing::debug!(position = file.stream_position().await.unwrap()); - let mut records_count = 0; - while records_count < remove_records { - let mut length_bytes = [0u8; 4]; - if let Err(error) = file.read_exact(&mut length_bytes).await { - if matches!(error.kind(), io::ErrorKind::UnexpectedEof) { - break; - } - let pos = file.stream_position().await; - tracing::error!(%error, ?pos, "error while trying to read file"); - return Err(error.into()); - } - let length = u32::from_be_bytes(length_bytes); - if let Err(error) = file.seek(io::SeekFrom::Current(length as i64)).await { - if matches!(error.kind(), io::ErrorKind::UnexpectedEof) { - break; - } - let pos = file.stream_position().await; - tracing::error!(%error, ?pos, "error while trying to read file"); - return Err(error.into()); - } - records_count += 1; - } - - // Copy the rest of the file to the buffer - let mut buffer = Vec::new(); - if let Err(error) = file.read_to_end(&mut buffer).await { - if !matches!(error.kind(), io::ErrorKind::UnexpectedEof) { - let pos = file.stream_position().await; - tracing::error!(%error, ?pos, "error while trying to read file"); - return Err(error.into()); - } - } - - // Seek back to the beginning and write the remaining content - file.rewind().await?; - file.write_all(&buffer).await?; - - // Truncate the file to the new size - file.set_len(buffer.len() as u64).await?; - file.sync_all().await?; - file.seek(io::SeekFrom::End(0)).await?; - Ok(()) - } - - pub async fn get_router_events( - max_event_number: usize, - event_log_path: &Path, - ) -> Result, DynError> { - use tokio::io::{AsyncReadExt, AsyncSeekExt}; - const MAX_EVENT_HISTORY: usize = 10_000; - let event_num = max_event_number.min(MAX_EVENT_HISTORY); - - // tracing::info!(?event_log_path); - let _guard: tokio::sync::MutexGuard<'_, ()> = FILE_LOCK.lock().await; - let mut file = - tokio::io::BufReader::new(OpenOptions::new().read(true).open(event_log_path).await?); - - let new_records_ts = NEW_RECORDS_TS - .get() - .expect("set on initialization") - .duration_since(std::time::UNIX_EPOCH) - .expect("should be older than unix epoch") - .as_secs() as i64; - - let mut records = Vec::with_capacity(event_num); - while records.len() < event_num { - // Read the length prefix - let length = match file.read_u32().await { - Ok(l) => l, - Err(error) => { - if !matches!(error.kind(), io::ErrorKind::UnexpectedEof) { - let pos = file.stream_position().await; - tracing::error!(%error, ?pos, "error while trying to read file"); - return Err(error.into()); - } else { - break; - } - } - }; - let mut buf = vec![0; length as usize]; - file.read_exact(&mut buf).await?; - records.push(buf); - if records.len() == event_num { - break; - } - } - - if records.is_empty() { - return Ok(vec![]); - } - - let deserialized_records = tokio::task::spawn_blocking(move || { - let mut filtered = vec![]; - for buf in records { - let record: NetLogMessage = bincode::deserialize(&buf).map_err(|e| { - tracing::error!(?buf, "deserialization error"); - e - })?; - // tracing::info!(?record); - if let EventKind::Route(outcome) = record.kind { - let record_ts = record.datetime.timestamp(); - if record_ts >= new_records_ts { - filtered.push(outcome); - } - } - } - Ok::<_, DynError>(filtered) - }) - .await??; - - Ok(deserialized_records) - } } impl NetEventRegister for EventRegister { @@ -731,7 +472,7 @@ impl NetEventRegister for EventRegister { } fn get_router_events(&self, number: usize) -> BoxFuture, DynError>> { - async move { EventRegister::get_router_events(number, &self.log_file).await }.boxed() + async move { aof::LogFile::get_router_events(number, &self.log_file).await }.boxed() } } @@ -1107,6 +848,7 @@ mod opentelemetry_tracer { #[derive(Serialize, Deserialize, Debug, Clone)] #[cfg_attr(test, derive(arbitrary::Arbitrary))] +#[non_exhaustive] // todo: make this take by ref instead, probably will need an owned version enum EventKind { Connect(ConnectEvent), @@ -1127,6 +869,22 @@ enum EventKind { }, } +impl EventKind { + const ROUTE: u8 = 3; + + const fn varint_id(&self) -> u8 { + match self { + EventKind::Connect(_) => 0, + EventKind::Put(_) => 1, + EventKind::Get { .. } => 2, + EventKind::Route(_) => Self::ROUTE, + EventKind::Subscribed { .. } => 4, + EventKind::Ignored => 5, + EventKind::Disconnected { .. } => 6, + } + } +} + #[derive(Serialize, Deserialize, Debug, Clone)] #[cfg_attr(test, derive(arbitrary::Arbitrary))] enum ConnectEvent { @@ -1312,7 +1070,7 @@ pub(super) mod test { // tokio::time::sleep(Duration::from_millis(500)).await; // } tokio::time::sleep(Duration::from_millis(10_000)).await; - let ev = EventRegister::get_router_events(100, &log_path).await?; + let ev = aof::LogFile::get_router_events(100, &log_path).await?; assert_eq!(ev.len(), total_route_events); Ok(()) } diff --git a/crates/core/src/tracing/aof.rs b/crates/core/src/tracing/aof.rs index a917e62d2..e2eac1902 100644 --- a/crates/core/src/tracing/aof.rs +++ b/crates/core/src/tracing/aof.rs @@ -1,35 +1,290 @@ -// use std::{ -// fs::{File, OpenOptions}, -// io::{Error, Write}, -// path::Path, -// }; - -// use super::NetLogMessage; - -// struct SerializedNetLogMessage { -// data: Vec, -// } - -// pub(super) struct LogFile { -// file: File, -// } - -// impl LogFile { -// pub fn open>(path: P) -> Result { -// let mut file = OpenOptions::new() -// .create(true) -// .read(true) -// .append(true) -// .open(path)?; -// todo!() -// } - -// pub fn append(&mut self, data: &[u8]) -> Result<(), Error> { -// self.file.write_all(data) -// } - -// /// Returns number of records -// pub fn replay(&mut self) -> Result { -// let mut len_buf = [0u8; 4]; -// } -// } +use byteorder::ByteOrder; +use tokio::{ + fs::{File, OpenOptions}, + io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader, Error}, +}; + +use std::path::Path; + +use tokio::sync::Mutex; + +use super::{DynError, EventKind, NetLogMessage, RouteEvent, NEW_RECORDS_TS}; + +static FILE_LOCK: Mutex<()> = Mutex::const_new(()); + +const RECORD_LENGTH: usize = core::mem::size_of::(); +const EVENT_KIND_LENGTH: usize = 1; +const EVENT_LOG_HEADER_SIZE: usize = RECORD_LENGTH + EVENT_KIND_LENGTH; // len + varint id +#[cfg(not(test))] +const MAX_LOG_RECORDS: usize = 100_000; +#[cfg(test)] +const MAX_LOG_RECORDS: usize = 10_000; +const EVENT_REGISTER_BATCH_SIZE: usize = 100; +const BATCH_SIZE: usize = EVENT_REGISTER_BATCH_SIZE; + +type DefaultEndian = byteorder::BigEndian; + +pub(super) struct LogFile { + reader: BufReader, + writer: File, + // make this configurable? + max_log_records: usize, + pub(super) batch: Vec, +} + +impl LogFile { + pub async fn open>(path: P) -> Result { + let file = OpenOptions::new() + .create(true) + .read(true) + .append(true) + .open(path) + .await?; + + let writer = file.try_clone().await?; + Ok(Self { + reader: BufReader::new(file), + writer, + max_log_records: MAX_LOG_RECORDS, + batch: Vec::with_capacity(BATCH_SIZE), + }) + } + + pub fn encode_log( + log: &NetLogMessage, + ) -> bincode::Result<([u8; EVENT_LOG_HEADER_SIZE], Vec)> { + let serialized = bincode::serialize(&log)?; + let mut header = [0; EVENT_LOG_HEADER_SIZE]; + DefaultEndian::write_u32(&mut header, serialized.len() as u32); + header[4] = log.kind.varint_id(); // varint id + Ok((header, serialized)) + } + + pub async fn num_lines(path: &Path) -> io::Result { + let mut file = BufReader::new(File::open(path).await?); + let mut num_records = 0; + let mut buf = [0; EVENT_LOG_HEADER_SIZE]; // Read the u32 length prefix + u8 event kind + + loop { + let bytes_read = file.read_exact(&mut buf).await; + if bytes_read.is_err() { + break; + } + num_records += 1; + + // Seek to the next record without reading its contents + let length = DefaultEndian::read_u32(&buf[..4]) as u64; + if (file.seek(io::SeekFrom::Current(length as i64)).await).is_err() { + break; + } + } + + Ok(num_records) + } + + pub async fn persist_log( + &mut self, + num_written: &mut usize, + num_recs: &mut usize, + log: NetLogMessage, + ) { + self.batch.push(log); + let mut batch_buf = vec![]; + + if self.batch.len() >= BATCH_SIZE { + let num_logs: usize = self.batch.len(); + let moved_batch = std::mem::replace(&mut self.batch, Vec::with_capacity(BATCH_SIZE)); + let serialization_task = tokio::task::spawn_blocking(move || { + let mut batch_serialized_data = Vec::with_capacity(BATCH_SIZE * 1024); + for log_item in &moved_batch { + let (header, mut serialized) = match Self::encode_log(log_item) { + Err(err) => { + tracing::error!("Failed serializing log: {err}"); + return Err(err); + } + Ok(serialized) => serialized, + }; + + batch_serialized_data.extend_from_slice(&header); + batch_serialized_data.append(&mut serialized); + } + Ok(batch_serialized_data) + }); + + match serialization_task.await { + Ok(Ok(serialized_data)) => { + // tracing::debug!(bytes = %serialized_data.len(), %num_logs, "serialized logs"); + batch_buf = serialized_data; + *num_written += num_logs; + self.batch.clear(); // Clear the batch for new data + } + _ => { + panic!("Failed serializing log"); + } + } + } + + tracing::error!( + "num_written: {num_written}, num_recs: {num_recs}", + num_written = num_written, + num_recs = num_recs + ); + if *num_written >= BATCH_SIZE { + { + let _guard = FILE_LOCK.lock().await; + if let Err(err) = self.writer.write_all(&batch_buf).await { + tracing::error!("Failed writting to event log: {err}"); + panic!("Failed writting event log"); + } + + if let Err(err) = self.writer.sync_all().await { + tracing::error!("Failed syncing event log: {err}"); + panic!("Failed syncing event log"); + } + } + *num_recs += *num_written; + *num_written = 0; + } + + // Check the number of lines and truncate if needed + if *num_recs > self.max_log_records { + const REMOVE_RECS: usize = 1000 + EVENT_REGISTER_BATCH_SIZE; // making space for 1000 new records + if let Err(err) = self.truncate_records(REMOVE_RECS).await { + tracing::error!("Failed truncating log file: {:?}", err); + panic!("Failed truncating log file"); + } + *num_recs -= REMOVE_RECS; + } + } + + pub async fn truncate_records( + &mut self, + remove_records: usize, + ) -> Result<(), Box> { + let _guard = FILE_LOCK.lock().await; + self.writer.rewind().await?; + // tracing::debug!(position = file.stream_position().await.unwrap()); + let mut records_count = 0; + while records_count < remove_records { + let mut length_bytes = [0u8; EVENT_LOG_HEADER_SIZE]; + if let Err(error) = self.reader.read_exact(&mut length_bytes).await { + if matches!(error.kind(), io::ErrorKind::UnexpectedEof) { + break; + } + let pos = self.reader.stream_position().await; + tracing::error!(%error, ?pos, "error while trying to read file"); + return Err(error.into()); + } + let length = DefaultEndian::read_u32(&length_bytes[..4]); + if let Err(error) = self.reader.seek(io::SeekFrom::Current(length as i64)).await { + if matches!(error.kind(), io::ErrorKind::UnexpectedEof) { + break; + } + let pos = self.reader.stream_position().await; + tracing::error!(%error, ?pos, "error while trying to read file"); + return Err(error.into()); + } + records_count += 1; + } + + // Copy the rest of the file to the buffer + let mut buffer = Vec::new(); + if let Err(error) = self.reader.read_to_end(&mut buffer).await { + if !matches!(error.kind(), io::ErrorKind::UnexpectedEof) { + let pos = self.reader.stream_position().await; + tracing::error!(%error, ?pos, "error while trying to read file"); + return Err(error.into()); + } + } + + // Seek back to the beginning and write the remaining content + self.reader.rewind().await?; + self.writer.write_all(&buffer).await?; + + // Truncate the file to the new size + self.writer.set_len(buffer.len() as u64).await?; + self.writer.sync_all().await?; + self.writer.seek(io::SeekFrom::End(0)).await?; + Ok(()) + } + + pub async fn get_router_events( + max_event_number: usize, + event_log_path: &Path, + ) -> Result, DynError> { + const MAX_EVENT_HISTORY: usize = 10_000; + let event_num = max_event_number.min(MAX_EVENT_HISTORY); + + // tracing::info!(?event_log_path); + let _guard: tokio::sync::MutexGuard<'_, ()> = FILE_LOCK.lock().await; + let mut file = + tokio::io::BufReader::new(OpenOptions::new().read(true).open(event_log_path).await?); + + let new_records_ts = NEW_RECORDS_TS + .get() + .expect("set on initialization") + .duration_since(std::time::UNIX_EPOCH) + .expect("should be older than unix epoch") + .as_secs() as i64; + + let mut records = Vec::with_capacity(event_num); + let mut num_records = 0; + while num_records < event_num { + let mut header = [0; EVENT_LOG_HEADER_SIZE]; + + // Read the length prefix + if let Err(error) = file.read_exact(&mut header).await { + if !matches!(error.kind(), io::ErrorKind::UnexpectedEof) { + let pos = file.stream_position().await; + tracing::error!(%error, ?pos, "error while trying to read file"); + return Err(error.into()); + } else { + break; + } + } + + if header[4] == EventKind::ROUTE { + let length = DefaultEndian::read_u32(&header[..4]); + let mut buf = vec![0; length as usize]; + file.read_exact(&mut buf).await?; + records.push(buf); + } + + num_records += 1; + if num_records == event_num { + break; + } + } + + if records.is_empty() { + return Ok(vec![]); + } + + let deserialized_records = tokio::task::spawn_blocking(move || { + let mut filtered = vec![]; + for buf in records { + let record: NetLogMessage = bincode::deserialize(&buf).map_err(|e| { + tracing::error!(?buf, "deserialization error"); + e + })?; + // tracing::info!(?record); + if let EventKind::Route(outcome) = record.kind { + let record_ts = record.datetime.timestamp(); + if record_ts >= new_records_ts { + filtered.push(outcome); + } + } + } + Ok::<_, DynError>(filtered) + }) + .await??; + + Ok(deserialized_records) + } + + pub async fn write_all(&mut self, data: &[u8]) -> io::Result<()> { + self.writer.write_all(data).await?; + self.writer.sync_all().await?; + Ok(()) + } +} From 5181feb8feaf685719ce4a164b5618b35abe0010 Mon Sep 17 00:00:00 2001 From: Al Liu Date: Mon, 3 Jun 2024 14:17:56 +0800 Subject: [PATCH 06/14] append only file --- crates/core/src/tracing.rs | 11 +++------- crates/core/src/tracing/aof.rs | 40 +++++++++++++++------------------- 2 files changed, 20 insertions(+), 31 deletions(-) diff --git a/crates/core/src/tracing.rs b/crates/core/src/tracing.rs index b78428edf..fe45185dd 100644 --- a/crates/core/src/tracing.rs +++ b/crates/core/src/tracing.rs @@ -397,11 +397,6 @@ impl EventRegister { panic!("Failed openning log file"); // fixme: propagate this to the main event loop } }; - let mut num_written = 0; - - let mut num_recs = aof::LogFile::num_lines(event_log_path.as_path()) - .await - .expect("non IO error"); let mut ws = connect_to_metrics_server().await; @@ -417,7 +412,7 @@ impl EventRegister { if let Some(ws) = ws.as_mut() { send_to_metrics_server(ws, &log).await; } - event_log.persist_log(&mut num_written, &mut num_recs, log).await; + event_log.persist_log(log).await; } ws_msg = ws_recv => { if let Some((ws, ws_msg)) = ws.as_mut().zip(ws_msg) { @@ -430,14 +425,14 @@ impl EventRegister { // store remaining logs let mut batch_serialized_data = Vec::with_capacity(event_log.batch.len() * 1024); for log_item in event_log.batch.drain(..) { - let (mut header, mut serialized) = match aof::LogFile::encode_log(&log_item) { + let (header, mut serialized) = match aof::LogFile::encode_log(&log_item) { Err(err) => { tracing::error!("Failed serializing log: {err}"); break; } Ok(serialized) => serialized, }; - batch_serialized_data.extend_from_slice(&mut header); + batch_serialized_data.extend_from_slice(&header); batch_serialized_data.append(&mut serialized); } if !batch_serialized_data.is_empty() { diff --git a/crates/core/src/tracing/aof.rs b/crates/core/src/tracing/aof.rs index e2eac1902..bc91df65e 100644 --- a/crates/core/src/tracing/aof.rs +++ b/crates/core/src/tracing/aof.rs @@ -1,7 +1,7 @@ use byteorder::ByteOrder; use tokio::{ fs::{File, OpenOptions}, - io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader, Error}, + io::{self, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWriteExt, BufReader, Error}, }; use std::path::Path; @@ -30,6 +30,8 @@ pub(super) struct LogFile { // make this configurable? max_log_records: usize, pub(super) batch: Vec, + num_written: usize, + num_recs: usize, } impl LogFile { @@ -40,13 +42,16 @@ impl LogFile { .append(true) .open(path) .await?; - - let writer = file.try_clone().await?; + let mut reader = BufReader::new(file); + let num_recs = Self::num_lines(&mut reader).await.expect("non IO error"); + let writer = reader.get_ref().try_clone().await?; Ok(Self { - reader: BufReader::new(file), + reader, writer, max_log_records: MAX_LOG_RECORDS, batch: Vec::with_capacity(BATCH_SIZE), + num_written: 0, + num_recs, }) } @@ -60,8 +65,7 @@ impl LogFile { Ok((header, serialized)) } - pub async fn num_lines(path: &Path) -> io::Result { - let mut file = BufReader::new(File::open(path).await?); + pub async fn num_lines(file: &mut (impl AsyncRead + AsyncSeek + Unpin)) -> io::Result { let mut num_records = 0; let mut buf = [0; EVENT_LOG_HEADER_SIZE]; // Read the u32 length prefix + u8 event kind @@ -82,12 +86,7 @@ impl LogFile { Ok(num_records) } - pub async fn persist_log( - &mut self, - num_written: &mut usize, - num_recs: &mut usize, - log: NetLogMessage, - ) { + pub async fn persist_log(&mut self, log: NetLogMessage) { self.batch.push(log); let mut batch_buf = vec![]; @@ -115,7 +114,7 @@ impl LogFile { Ok(Ok(serialized_data)) => { // tracing::debug!(bytes = %serialized_data.len(), %num_logs, "serialized logs"); batch_buf = serialized_data; - *num_written += num_logs; + self.num_written += num_logs; self.batch.clear(); // Clear the batch for new data } _ => { @@ -124,12 +123,7 @@ impl LogFile { } } - tracing::error!( - "num_written: {num_written}, num_recs: {num_recs}", - num_written = num_written, - num_recs = num_recs - ); - if *num_written >= BATCH_SIZE { + if self.num_written >= BATCH_SIZE { { let _guard = FILE_LOCK.lock().await; if let Err(err) = self.writer.write_all(&batch_buf).await { @@ -142,18 +136,18 @@ impl LogFile { panic!("Failed syncing event log"); } } - *num_recs += *num_written; - *num_written = 0; + self.num_recs += self.num_written; + self.num_written = 0; } // Check the number of lines and truncate if needed - if *num_recs > self.max_log_records { + if self.num_recs > self.max_log_records { const REMOVE_RECS: usize = 1000 + EVENT_REGISTER_BATCH_SIZE; // making space for 1000 new records if let Err(err) = self.truncate_records(REMOVE_RECS).await { tracing::error!("Failed truncating log file: {:?}", err); panic!("Failed truncating log file"); } - *num_recs -= REMOVE_RECS; + self.num_recs -= REMOVE_RECS; } } From 8380d74e1fd07b7f462411b01e87c38c3dd96a50 Mon Sep 17 00:00:00 2001 From: Al Liu Date: Mon, 3 Jun 2024 14:41:08 +0800 Subject: [PATCH 07/14] append only file --- crates/core/src/tracing.rs | 16 +++++----------- crates/core/src/tracing/aof.rs | 32 +++++++++++++++++++------------- 2 files changed, 24 insertions(+), 24 deletions(-) diff --git a/crates/core/src/tracing.rs b/crates/core/src/tracing.rs index fe45185dd..859353cfe 100644 --- a/crates/core/src/tracing.rs +++ b/crates/core/src/tracing.rs @@ -5,10 +5,7 @@ use either::Either; use freenet_stdlib::prelude::*; use futures::{future::BoxFuture, FutureExt}; use serde::{Deserialize, Serialize}; -use tokio::{ - net::TcpStream, - sync::{mpsc, Mutex}, -}; +use tokio::{net::TcpStream, sync::mpsc}; use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; use crate::{ @@ -367,7 +364,6 @@ pub(crate) struct EventRegister { /// Records from a new session must have higher than this ts. static NEW_RECORDS_TS: std::sync::OnceLock = std::sync::OnceLock::new(); -static FILE_LOCK: Mutex<()> = Mutex::const_new(()); const DEFAULT_METRICS_SERVER_PORT: u16 = 55010; @@ -435,12 +431,10 @@ impl EventRegister { batch_serialized_data.extend_from_slice(&header); batch_serialized_data.append(&mut serialized); } - if !batch_serialized_data.is_empty() { - let _guard = FILE_LOCK.lock().await; - if let Err(err) = event_log.write_all(&batch_serialized_data).await { - tracing::error!("Failed writting to event log: {err}"); - panic!("Failed writting event log"); - } + if !batch_serialized_data.is_empty() + && event_log.write_all(&batch_serialized_data).await.is_err() + { + panic!("Failed writting event log"); } } } diff --git a/crates/core/src/tracing/aof.rs b/crates/core/src/tracing/aof.rs index bc91df65e..0da22636e 100644 --- a/crates/core/src/tracing/aof.rs +++ b/crates/core/src/tracing/aof.rs @@ -61,7 +61,7 @@ impl LogFile { let serialized = bincode::serialize(&log)?; let mut header = [0; EVENT_LOG_HEADER_SIZE]; DefaultEndian::write_u32(&mut header, serialized.len() as u32); - header[4] = log.kind.varint_id(); // varint id + header[4] = log.kind.varint_id(); // event kind Ok((header, serialized)) } @@ -125,15 +125,9 @@ impl LogFile { if self.num_written >= BATCH_SIZE { { - let _guard = FILE_LOCK.lock().await; - if let Err(err) = self.writer.write_all(&batch_buf).await { - tracing::error!("Failed writting to event log: {err}"); - panic!("Failed writting event log"); - } - - if let Err(err) = self.writer.sync_all().await { - tracing::error!("Failed syncing event log: {err}"); - panic!("Failed syncing event log"); + let res = self.write_all(&batch_buf).await; + if res.is_err() { + panic!("Failed writing to log file"); } } self.num_recs += self.num_written; @@ -237,11 +231,13 @@ impl LogFile { } } + let length = DefaultEndian::read_u32(&header[..4]); if header[4] == EventKind::ROUTE { - let length = DefaultEndian::read_u32(&header[..4]); let mut buf = vec![0; length as usize]; file.read_exact(&mut buf).await?; records.push(buf); + } else { + file.seek(io::SeekFrom::Current(length as i64)).await?; } num_records += 1; @@ -250,6 +246,8 @@ impl LogFile { } } + tracing::info!(len = records.len(), "records read"); + if records.is_empty() { return Ok(vec![]); } @@ -277,8 +275,16 @@ impl LogFile { } pub async fn write_all(&mut self, data: &[u8]) -> io::Result<()> { - self.writer.write_all(data).await?; - self.writer.sync_all().await?; + let _guard = FILE_LOCK.lock().await; + if let Err(err) = self.writer.write_all(&data).await { + tracing::error!("Failed writting to event log: {err}"); + return Err(err); + } + + if let Err(err) = self.writer.sync_all().await { + tracing::error!("Failed syncing event log: {err}"); + return Err(err); + } Ok(()) } } From 556b83450d8e1627968c0daf3ba05a2f6ceac2ba Mon Sep 17 00:00:00 2001 From: Al Liu Date: Mon, 3 Jun 2024 14:43:40 +0800 Subject: [PATCH 08/14] append only file --- crates/core/src/tracing.rs | 4 ++-- crates/core/src/tracing/aof.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/core/src/tracing.rs b/crates/core/src/tracing.rs index 859353cfe..e67c25f79 100644 --- a/crates/core/src/tracing.rs +++ b/crates/core/src/tracing.rs @@ -1029,7 +1029,7 @@ pub(super) mod test { std::fs::File::create(&log_path)?; // force a truncation - const TEST_LOGS: usize = 100; + const TEST_LOGS: usize = aof::MAX_LOG_RECORDS; let register = EventRegister::new(log_path.clone()); let bytes = crate::util::test::random_bytes_2mb(); let mut gen = arbitrary::Unstructured::new(&bytes); @@ -1059,7 +1059,7 @@ pub(super) mod test { // tokio::time::sleep(Duration::from_millis(500)).await; // } tokio::time::sleep(Duration::from_millis(10_000)).await; - let ev = aof::LogFile::get_router_events(100, &log_path).await?; + let ev = aof::LogFile::get_router_events(TEST_LOGS, &log_path).await?; assert_eq!(ev.len(), total_route_events); Ok(()) } diff --git a/crates/core/src/tracing/aof.rs b/crates/core/src/tracing/aof.rs index 0da22636e..3526452fc 100644 --- a/crates/core/src/tracing/aof.rs +++ b/crates/core/src/tracing/aof.rs @@ -16,9 +16,9 @@ const RECORD_LENGTH: usize = core::mem::size_of::(); const EVENT_KIND_LENGTH: usize = 1; const EVENT_LOG_HEADER_SIZE: usize = RECORD_LENGTH + EVENT_KIND_LENGTH; // len + varint id #[cfg(not(test))] -const MAX_LOG_RECORDS: usize = 100_000; +pub(super) const MAX_LOG_RECORDS: usize = 100_000; #[cfg(test)] -const MAX_LOG_RECORDS: usize = 10_000; +pub(super) const MAX_LOG_RECORDS: usize = 10_000; const EVENT_REGISTER_BATCH_SIZE: usize = 100; const BATCH_SIZE: usize = EVENT_REGISTER_BATCH_SIZE; From c00d774c8689f70d3762b4e603f7fbfa6a34c8ba Mon Sep 17 00:00:00 2001 From: Al Liu Date: Mon, 3 Jun 2024 21:49:24 +0800 Subject: [PATCH 09/14] states --- crates/core/src/tracing.rs | 90 ++----- crates/core/src/tracing/aof.rs | 440 ++++++++++++++++++++++++++++----- 2 files changed, 400 insertions(+), 130 deletions(-) diff --git a/crates/core/src/tracing.rs b/crates/core/src/tracing.rs index e67c25f79..4640b1b47 100644 --- a/crates/core/src/tracing.rs +++ b/crates/core/src/tracing.rs @@ -419,22 +419,20 @@ impl EventRegister { } // store remaining logs - let mut batch_serialized_data = Vec::with_capacity(event_log.batch.len() * 1024); - for log_item in event_log.batch.drain(..) { - let (header, mut serialized) = match aof::LogFile::encode_log(&log_item) { - Err(err) => { - tracing::error!("Failed serializing log: {err}"); - break; + let moved_batch = std::mem::replace(&mut event_log.batch, aof::Batch::new(aof::BATCH_SIZE)); + let batch_states = moved_batch.states; + match aof::LogFile::encode_batch(&moved_batch) { + Ok(batch_serialized_data) => { + if !batch_serialized_data.is_empty() + && event_log.write_all(&batch_serialized_data).await.is_err() + { + panic!("Failed writting event log"); } - Ok(serialized) => serialized, - }; - batch_serialized_data.extend_from_slice(&header); - batch_serialized_data.append(&mut serialized); - } - if !batch_serialized_data.is_empty() - && event_log.write_all(&batch_serialized_data).await.is_err() - { - panic!("Failed writting event log"); + event_log.update_states(batch_states); + } + Err(err) => { + tracing::error!("Failed encode batch: {err}"); + } } } } @@ -859,17 +857,23 @@ enum EventKind { } impl EventKind { + const CONNECT: u8 = 0; + const PUT: u8 = 1; + const GET: u8 = 2; const ROUTE: u8 = 3; + const SUBSCRIBED: u8 = 4; + const IGNORED: u8 = 5; + const DISCONNECTED: u8 = 6; const fn varint_id(&self) -> u8 { match self { - EventKind::Connect(_) => 0, - EventKind::Put(_) => 1, - EventKind::Get { .. } => 2, + EventKind::Connect(_) => Self::CONNECT, + EventKind::Put(_) => Self::PUT, + EventKind::Get { .. } => Self::GET, EventKind::Route(_) => Self::ROUTE, - EventKind::Subscribed { .. } => 4, - EventKind::Ignored => 5, - EventKind::Disconnected { .. } => 6, + EventKind::Subscribed { .. } => Self::SUBSCRIBED, + EventKind::Ignored => Self::IGNORED, + EventKind::Disconnected { .. } => Self::DISCONNECTED, } } } @@ -1020,50 +1024,6 @@ pub(super) mod test { static LOG_ID: AtomicUsize = AtomicUsize::new(0); - #[tokio::test(flavor = "multi_thread", worker_threads = 4)] - async fn event_register_read_write() -> Result<(), DynError> { - crate::config::set_logger(Some(LevelFilter::TRACE)); - use std::time::Duration; - let temp_dir = tempfile::tempdir()?; - let log_path = temp_dir.path().join("event_log"); - std::fs::File::create(&log_path)?; - - // force a truncation - const TEST_LOGS: usize = aof::MAX_LOG_RECORDS; - let register = EventRegister::new(log_path.clone()); - let bytes = crate::util::test::random_bytes_2mb(); - let mut gen = arbitrary::Unstructured::new(&bytes); - let mut transactions = vec![]; - let mut peers = vec![]; - let mut events = vec![]; - for _ in 0..TEST_LOGS { - let tx: Transaction = gen.arbitrary()?; - transactions.push(tx); - let peer: PeerId = PeerId::random(); - peers.push(peer); - } - let mut total_route_events: usize = 0; - for i in 0..TEST_LOGS { - let kind: EventKind = gen.arbitrary()?; - if matches!(kind, EventKind::Route(_)) { - total_route_events += 1; - } - events.push(NetEventLog { - tx: &transactions[i], - peer_id: peers[i].clone(), - kind, - }); - } - register.register_events(Either::Right(events)).await; - // while register.log_sender.capacity() != 1000 { - // tokio::time::sleep(Duration::from_millis(500)).await; - // } - tokio::time::sleep(Duration::from_millis(10_000)).await; - let ev = aof::LogFile::get_router_events(TEST_LOGS, &log_path).await?; - assert_eq!(ev.len(), total_route_events); - Ok(()) - } - #[derive(Clone)] pub(crate) struct TestEventListener { node_labels: Arc>, diff --git a/crates/core/src/tracing/aof.rs b/crates/core/src/tracing/aof.rs index 3526452fc..d967499a5 100644 --- a/crates/core/src/tracing/aof.rs +++ b/crates/core/src/tracing/aof.rs @@ -4,7 +4,7 @@ use tokio::{ io::{self, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWriteExt, BufReader, Error}, }; -use std::path::Path; +use std::{path::Path, sync::atomic::AtomicUsize}; use tokio::sync::Mutex; @@ -19,42 +19,155 @@ const EVENT_LOG_HEADER_SIZE: usize = RECORD_LENGTH + EVENT_KIND_LENGTH; // len + pub(super) const MAX_LOG_RECORDS: usize = 100_000; #[cfg(test)] pub(super) const MAX_LOG_RECORDS: usize = 10_000; +pub(super) const REMOVE_RECS: usize = 1000 + EVENT_REGISTER_BATCH_SIZE; // making space for 1000 new records const EVENT_REGISTER_BATCH_SIZE: usize = 100; -const BATCH_SIZE: usize = EVENT_REGISTER_BATCH_SIZE; +pub(super) const BATCH_SIZE: usize = EVENT_REGISTER_BATCH_SIZE; type DefaultEndian = byteorder::BigEndian; +#[derive(Debug, Default, Copy, Clone)] +pub(super) struct States { + total: usize, + connect_events: usize, + put_events: usize, + get_events: usize, + route_events: usize, + subscribed_events: usize, + ignored_events: usize, + disconnected_events: usize, +} + +impl core::ops::AddAssign for States { + fn add_assign(&mut self, rhs: Self) { + self.total += rhs.total; + self.connect_events += rhs.connect_events; + self.put_events += rhs.put_events; + self.get_events += rhs.get_events; + self.route_events += rhs.route_events; + self.subscribed_events += rhs.subscribed_events; + self.ignored_events += rhs.ignored_events; + self.disconnected_events += rhs.disconnected_events; + } +} + +impl core::ops::SubAssign for States { + fn sub_assign(&mut self, rhs: u8) { + self.total = self.total.saturating_sub(1); + match rhs { + EventKind::CONNECT => self.connect_events = self.connect_events.saturating_sub(1), + EventKind::PUT => self.put_events = self.put_events.saturating_sub(1), + EventKind::GET => self.get_events = self.get_events.saturating_sub(1), + EventKind::ROUTE => self.route_events = self.route_events.saturating_sub(1), + EventKind::SUBSCRIBED => { + self.subscribed_events = self.subscribed_events.saturating_sub(1) + } + EventKind::IGNORED => self.ignored_events = self.ignored_events.saturating_sub(1), + EventKind::DISCONNECTED => { + self.disconnected_events = self.disconnected_events.saturating_sub(1) + } + _ => unreachable!(), + } + } +} + +impl core::ops::SubAssign for States { + fn sub_assign(&mut self, rhs: Self) { + self.total -= rhs.total; + self.connect_events -= rhs.connect_events; + self.put_events -= rhs.put_events; + self.get_events -= rhs.get_events; + self.route_events -= rhs.route_events; + self.subscribed_events -= rhs.subscribed_events; + self.ignored_events -= rhs.ignored_events; + self.disconnected_events -= rhs.disconnected_events; + } +} + +pub(super) struct Batch { + pub batch: Vec, + pub states: States, +} + +impl Batch { + #[inline] + pub fn new(cap: usize) -> Self { + Self { + batch: Vec::with_capacity(cap), + states: Default::default(), + } + } + + #[inline] + fn push(&mut self, log: NetLogMessage) { + match log.kind.varint_id() { + EventKind::CONNECT => self.states.connect_events += 1, + EventKind::PUT => self.states.put_events += 1, + EventKind::GET => self.states.get_events += 1, + EventKind::ROUTE => self.states.route_events += 1, + EventKind::SUBSCRIBED => self.states.subscribed_events += 1, + EventKind::IGNORED => self.states.ignored_events += 1, + EventKind::DISCONNECTED => self.states.disconnected_events += 1, + _ => unreachable!(), + } + self.states.total += 1; + self.batch.push(log); + } + + #[inline] + pub fn len(&self) -> usize { + self.batch.len() + } + + #[inline] + fn is_empty(&self) -> bool { + self.batch.is_empty() + } + + #[inline] + fn clear(&mut self) { + self.batch.clear(); + self.states = Default::default(); + } +} + pub(super) struct LogFile { - reader: BufReader, - writer: File, + file: BufReader, // make this configurable? max_log_records: usize, - pub(super) batch: Vec, - num_written: usize, - num_recs: usize, + pub(super) batch: Batch, + current_states: States, + states: States, } impl LogFile { pub async fn open>(path: P) -> Result { + let path = path.as_ref(); let file = OpenOptions::new() .create(true) .read(true) .append(true) .open(path) .await?; - let mut reader = BufReader::new(file); - let num_recs = Self::num_lines(&mut reader).await.expect("non IO error"); - let writer = reader.get_ref().try_clone().await?; + let mut file = BufReader::new(file); + tracing::error!("open {}", path.display()); + let states = Self::num_lines(&mut file).await.expect("non IO error"); + tracing::error!("states {:?}", states); Ok(Self { - reader, - writer, + file, max_log_records: MAX_LOG_RECORDS, - batch: Vec::with_capacity(BATCH_SIZE), - num_written: 0, - num_recs, + batch: Batch { + batch: Vec::with_capacity(BATCH_SIZE), + states: Default::default(), + }, + current_states: Default::default(), + states, }) } + pub(super) fn update_states(&mut self, states: States) { + self.states += states; + } + pub fn encode_log( log: &NetLogMessage, ) -> bincode::Result<([u8; EVENT_LOG_HEADER_SIZE], Vec)> { @@ -65,8 +178,16 @@ impl LogFile { Ok((header, serialized)) } - pub async fn num_lines(file: &mut (impl AsyncRead + AsyncSeek + Unpin)) -> io::Result { + async fn num_lines(file: &mut (impl AsyncRead + AsyncSeek + Unpin)) -> io::Result { let mut num_records = 0; + let mut connect_events: usize = 0; + let mut put_events: usize = 0; + let mut get_events: usize = 0; + let mut route_events: usize = 0; + let mut subscribed_events: usize = 0; + let mut ignored_events: usize = 0; + let mut disconnected_events: usize = 0; + let mut buf = [0; EVENT_LOG_HEADER_SIZE]; // Read the u32 length prefix + u8 event kind loop { @@ -78,43 +199,57 @@ impl LogFile { // Seek to the next record without reading its contents let length = DefaultEndian::read_u32(&buf[..4]) as u64; + + match buf[4] { + EventKind::CONNECT => connect_events += 1, + EventKind::PUT => put_events += 1, + EventKind::GET => get_events += 1, + EventKind::ROUTE => route_events += 1, + EventKind::SUBSCRIBED => subscribed_events += 1, + EventKind::IGNORED => ignored_events += 1, + EventKind::DISCONNECTED => disconnected_events += 1, + _ => { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "Unknown event kind", + )) + } + } + if (file.seek(io::SeekFrom::Current(length as i64)).await).is_err() { break; } } - Ok(num_records) + Ok(States { + total: num_records, + connect_events, + put_events, + get_events, + route_events, + subscribed_events, + ignored_events, + disconnected_events, + }) } pub async fn persist_log(&mut self, log: NetLogMessage) { + let _guard = FILE_LOCK.lock().await; + self.batch.push(log); let mut batch_buf = vec![]; if self.batch.len() >= BATCH_SIZE { - let num_logs: usize = self.batch.len(); - let moved_batch = std::mem::replace(&mut self.batch, Vec::with_capacity(BATCH_SIZE)); - let serialization_task = tokio::task::spawn_blocking(move || { - let mut batch_serialized_data = Vec::with_capacity(BATCH_SIZE * 1024); - for log_item in &moved_batch { - let (header, mut serialized) = match Self::encode_log(log_item) { - Err(err) => { - tracing::error!("Failed serializing log: {err}"); - return Err(err); - } - Ok(serialized) => serialized, - }; - - batch_serialized_data.extend_from_slice(&header); - batch_serialized_data.append(&mut serialized); - } - Ok(batch_serialized_data) - }); + let moved_batch = std::mem::replace(&mut self.batch, Batch::new(BATCH_SIZE)); + let batch_states = moved_batch.states; + let serialization_task = + tokio::task::spawn_blocking(move || Self::encode_batch(&moved_batch)); match serialization_task.await { Ok(Ok(serialized_data)) => { // tracing::debug!(bytes = %serialized_data.len(), %num_logs, "serialized logs"); batch_buf = serialized_data; - self.num_written += num_logs; + self.current_states += batch_states; self.batch.clear(); // Clear the batch for new data } _ => { @@ -123,25 +258,25 @@ impl LogFile { } } - if self.num_written >= BATCH_SIZE { + if self.current_states.total >= BATCH_SIZE { { let res = self.write_all(&batch_buf).await; if res.is_err() { panic!("Failed writing to log file"); } } - self.num_recs += self.num_written; - self.num_written = 0; + self.states += self.current_states; + self.current_states = Default::default(); } // Check the number of lines and truncate if needed - if self.num_recs > self.max_log_records { - const REMOVE_RECS: usize = 1000 + EVENT_REGISTER_BATCH_SIZE; // making space for 1000 new records + if self.states.total > self.max_log_records { + tracing::info!("before truncating {:?}", self.states); + if let Err(err) = self.truncate_records(REMOVE_RECS).await { tracing::error!("Failed truncating log file: {:?}", err); panic!("Failed truncating log file"); } - self.num_recs -= REMOVE_RECS; } } @@ -149,50 +284,55 @@ impl LogFile { &mut self, remove_records: usize, ) -> Result<(), Box> { - let _guard = FILE_LOCK.lock().await; - self.writer.rewind().await?; + self.file.rewind().await?; // tracing::debug!(position = file.stream_position().await.unwrap()); let mut records_count = 0; + let mut removed_states = States::default(); while records_count < remove_records { - let mut length_bytes = [0u8; EVENT_LOG_HEADER_SIZE]; - if let Err(error) = self.reader.read_exact(&mut length_bytes).await { + let mut header = [0u8; EVENT_LOG_HEADER_SIZE]; + if let Err(error) = self.file.read_exact(&mut header).await { if matches!(error.kind(), io::ErrorKind::UnexpectedEof) { break; } - let pos = self.reader.stream_position().await; + let pos = self.file.stream_position().await; tracing::error!(%error, ?pos, "error while trying to read file"); return Err(error.into()); } - let length = DefaultEndian::read_u32(&length_bytes[..4]); - if let Err(error) = self.reader.seek(io::SeekFrom::Current(length as i64)).await { + let length = DefaultEndian::read_u32(&header[..4]); + if let Err(error) = self.file.seek(io::SeekFrom::Current(length as i64)).await { if matches!(error.kind(), io::ErrorKind::UnexpectedEof) { break; } - let pos = self.reader.stream_position().await; + let pos = self.file.stream_position().await; tracing::error!(%error, ?pos, "error while trying to read file"); return Err(error.into()); } + removed_states -= header[4]; records_count += 1; } // Copy the rest of the file to the buffer let mut buffer = Vec::new(); - if let Err(error) = self.reader.read_to_end(&mut buffer).await { + if let Err(error) = self.file.read_to_end(&mut buffer).await { if !matches!(error.kind(), io::ErrorKind::UnexpectedEof) { - let pos = self.reader.stream_position().await; + let pos = self.file.stream_position().await; tracing::error!(%error, ?pos, "error while trying to read file"); return Err(error.into()); } } + self.states -= removed_states; + + tracing::error!("removed {removed_states:?} remaining {:?}", self.states); + // Seek back to the beginning and write the remaining content - self.reader.rewind().await?; - self.writer.write_all(&buffer).await?; + self.file.rewind().await?; + self.file.write_all(&buffer).await?; // Truncate the file to the new size - self.writer.set_len(buffer.len() as u64).await?; - self.writer.sync_all().await?; - self.writer.seek(io::SeekFrom::End(0)).await?; + self.file.get_ref().set_len(buffer.len() as u64).await?; + self.file.get_ref().sync_all().await?; + self.file.seek(io::SeekFrom::End(0)).await?; Ok(()) } @@ -204,7 +344,7 @@ impl LogFile { let event_num = max_event_number.min(MAX_EVENT_HISTORY); // tracing::info!(?event_log_path); - let _guard: tokio::sync::MutexGuard<'_, ()> = FILE_LOCK.lock().await; + // let _guard: tokio::sync::MutexGuard<'_, ()> = FILE_LOCK.lock().await; let mut file = tokio::io::BufReader::new(OpenOptions::new().read(true).open(event_log_path).await?); @@ -241,12 +381,9 @@ impl LogFile { } num_records += 1; - if num_records == event_num { - break; - } } - tracing::info!(len = records.len(), "records read"); + tracing::info!(len = records.len(), total = num_records, "records read"); if records.is_empty() { return Ok(vec![]); @@ -275,16 +412,189 @@ impl LogFile { } pub async fn write_all(&mut self, data: &[u8]) -> io::Result<()> { - let _guard = FILE_LOCK.lock().await; - if let Err(err) = self.writer.write_all(&data).await { + // let _guard = FILE_LOCK.lock().await; + if let Err(err) = self.file.get_mut().write_all(data).await { tracing::error!("Failed writting to event log: {err}"); return Err(err); } - if let Err(err) = self.writer.sync_all().await { + if let Err(err) = self.file.get_mut().sync_all().await { tracing::error!("Failed syncing event log: {err}"); return Err(err); } Ok(()) } + + pub fn encode_batch(batch: &Batch) -> bincode::Result> { + let mut batch_serialized_data = Vec::with_capacity(BATCH_SIZE * 1024); + for log_item in &batch.batch { + let (header, mut serialized) = match Self::encode_log(log_item) { + Err(err) => { + tracing::error!("Failed serializing log: {err}"); + return Err(err); + } + Ok(serialized) => serialized, + }; + + batch_serialized_data.extend_from_slice(&header); + batch_serialized_data.append(&mut serialized); + } + + Ok(batch_serialized_data) + } +} + +#[cfg(test)] +mod tests { + use std::time::SystemTime; + + use tracing::level_filters::LevelFilter; + + use crate::{ + dev_tool::{PeerId, Transaction}, + tracing::NetEventLog, + }; + + use super::*; + + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn read_write() -> Result<(), DynError> { + NEW_RECORDS_TS.get_or_init(SystemTime::now); + crate::config::set_logger(Some(LevelFilter::TRACE)); + let temp_dir = tempfile::tempdir()?; + let log_path = temp_dir.path().join("event_log"); + + // force a truncation + const TEST_LOGS: usize = MAX_LOG_RECORDS; + + let mut log = LogFile::open(&log_path).await?; + let bytes = crate::util::test::random_bytes_2mb(); + let mut gen = arbitrary::Unstructured::new(&bytes); + let mut transactions = vec![]; + let mut peers = vec![]; + let mut events = vec![]; + + for _ in 0..TEST_LOGS { + let tx: Transaction = gen.arbitrary()?; + transactions.push(tx); + let peer: PeerId = PeerId::random(); + peers.push(peer); + } + let mut total_route_events: usize = 0; + + for i in 0..TEST_LOGS { + let kind: EventKind = gen.arbitrary()?; + // The route events in first REMOVE_RECS will be dropped + if matches!(kind, EventKind::Route(_)) { + total_route_events += 1; + } + events.push(NetEventLog { + tx: &transactions[i], + peer_id: peers[i].clone(), + kind, + }); + } + + for msg in NetLogMessage::to_log_message(either::Either::Right(events)) { + log.persist_log(msg).await; + } + + let ev = LogFile::get_router_events(TEST_LOGS, &log_path).await?; + assert_eq!(ev.len(), total_route_events); + Ok(()) + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn read_write_small() -> Result<(), DynError> { + NEW_RECORDS_TS.get_or_init(SystemTime::now); + crate::config::set_logger(Some(LevelFilter::TRACE)); + let temp_dir = tempfile::tempdir()?; + let log_path = temp_dir.path().join("event_log"); + + // force a truncation + const TEST_LOGS: usize = 100; + + let mut log = LogFile::open(&log_path).await?; + let bytes = crate::util::test::random_bytes_2mb(); + let mut gen = arbitrary::Unstructured::new(&bytes); + let mut transactions = vec![]; + let mut peers = vec![]; + let mut events = vec![]; + + for _ in 0..TEST_LOGS { + let tx: Transaction = gen.arbitrary()?; + transactions.push(tx); + let peer: PeerId = PeerId::random(); + peers.push(peer); + } + let mut total_route_events: usize = 0; + + for i in 0..TEST_LOGS { + let kind: EventKind = gen.arbitrary()?; + // The route events in first REMOVE_RECS will be dropped + if matches!(kind, EventKind::Route(_)) { + total_route_events += 1; + } + events.push(NetEventLog { + tx: &transactions[i], + peer_id: peers[i].clone(), + kind, + }); + } + + for msg in NetLogMessage::to_log_message(either::Either::Right(events)) { + log.persist_log(msg).await; + } + + let ev = LogFile::get_router_events(TEST_LOGS, &log_path).await?; + assert_eq!(ev.len(), total_route_events); + Ok(()) + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn read_write_truncate() -> Result<(), DynError> { + NEW_RECORDS_TS.get_or_init(SystemTime::now); + crate::config::set_logger(Some(LevelFilter::TRACE)); + let temp_dir = tempfile::tempdir()?; + let log_path = temp_dir.path().join("event_log"); + + // force a truncation + const TEST_LOGS: usize = MAX_LOG_RECORDS + 100; + + let mut log = LogFile::open(&log_path).await?; + let bytes = crate::util::test::random_bytes_2mb(); + let mut gen = arbitrary::Unstructured::new(&bytes); + let mut transactions = vec![]; + let mut peers = vec![]; + let mut events = vec![]; + + for _ in 0..TEST_LOGS { + let tx: Transaction = gen.arbitrary()?; + transactions.push(tx); + let peer: PeerId = PeerId::random(); + peers.push(peer); + } + let mut total_route_events: usize = 0; + + for i in 0..TEST_LOGS { + let kind: EventKind = gen.arbitrary()?; + // The route events in first REMOVE_RECS will be dropped + if matches!(kind, EventKind::Route(_)) && i >= REMOVE_RECS { + total_route_events += 1; + } + events.push(NetEventLog { + tx: &transactions[i], + peer_id: peers[i].clone(), + kind, + }); + } + + for msg in NetLogMessage::to_log_message(either::Either::Right(events)) { + log.persist_log(msg).await; + } + + let ev = LogFile::get_router_events(TEST_LOGS, &log_path).await?; + assert_eq!(ev.len(), total_route_events); + Ok(()) + } } From 5ae64a3fde62070bb7a18e64f6af3250476cf963 Mon Sep 17 00:00:00 2001 From: Al Liu Date: Mon, 3 Jun 2024 21:53:19 +0800 Subject: [PATCH 10/14] Update aof.rs --- crates/core/src/tracing/aof.rs | 27 ++++++++++++++++++++------- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/crates/core/src/tracing/aof.rs b/crates/core/src/tracing/aof.rs index d967499a5..92072aa28 100644 --- a/crates/core/src/tracing/aof.rs +++ b/crates/core/src/tracing/aof.rs @@ -70,6 +70,23 @@ impl core::ops::SubAssign for States { } } +impl core::ops::AddAssign for States { + fn add_assign(&mut self, rhs: u8) { + self.total += 1; + + match rhs { + EventKind::CONNECT => self.connect_events += 1, + EventKind::PUT => self.put_events += 1, + EventKind::GET => self.get_events += 1, + EventKind::ROUTE => self.route_events += 1, + EventKind::SUBSCRIBED => self.subscribed_events += 1, + EventKind::IGNORED => self.ignored_events += 1, + EventKind::DISCONNECTED => self.disconnected_events += 1, + _ => unreachable!(), + } + } +} + impl core::ops::SubAssign for States { fn sub_assign(&mut self, rhs: Self) { self.total -= rhs.total; @@ -234,8 +251,6 @@ impl LogFile { } pub async fn persist_log(&mut self, log: NetLogMessage) { - let _guard = FILE_LOCK.lock().await; - self.batch.push(log); let mut batch_buf = vec![]; @@ -247,7 +262,6 @@ impl LogFile { match serialization_task.await { Ok(Ok(serialized_data)) => { - // tracing::debug!(bytes = %serialized_data.len(), %num_logs, "serialized logs"); batch_buf = serialized_data; self.current_states += batch_states; self.batch.clear(); // Clear the batch for new data @@ -307,7 +321,7 @@ impl LogFile { tracing::error!(%error, ?pos, "error while trying to read file"); return Err(error.into()); } - removed_states -= header[4]; + removed_states += header[4]; records_count += 1; } @@ -343,8 +357,7 @@ impl LogFile { const MAX_EVENT_HISTORY: usize = 10_000; let event_num = max_event_number.min(MAX_EVENT_HISTORY); - // tracing::info!(?event_log_path); - // let _guard: tokio::sync::MutexGuard<'_, ()> = FILE_LOCK.lock().await; + let _guard: tokio::sync::MutexGuard<'_, ()> = FILE_LOCK.lock().await; let mut file = tokio::io::BufReader::new(OpenOptions::new().read(true).open(event_log_path).await?); @@ -412,7 +425,7 @@ impl LogFile { } pub async fn write_all(&mut self, data: &[u8]) -> io::Result<()> { - // let _guard = FILE_LOCK.lock().await; + let _guard = FILE_LOCK.lock().await; if let Err(err) = self.file.get_mut().write_all(data).await { tracing::error!("Failed writting to event log: {err}"); return Err(err); From 709ae0a33b860f3d89e1d81673bc457c5ab86495 Mon Sep 17 00:00:00 2001 From: Al Liu Date: Mon, 3 Jun 2024 22:48:44 +0800 Subject: [PATCH 11/14] Update aof.rs --- crates/core/src/tracing/aof.rs | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/crates/core/src/tracing/aof.rs b/crates/core/src/tracing/aof.rs index 92072aa28..933940a45 100644 --- a/crates/core/src/tracing/aof.rs +++ b/crates/core/src/tracing/aof.rs @@ -298,6 +298,7 @@ impl LogFile { &mut self, remove_records: usize, ) -> Result<(), Box> { + let _guard = FILE_LOCK.lock().await; self.file.rewind().await?; // tracing::debug!(position = file.stream_position().await.unwrap()); let mut records_count = 0; @@ -341,11 +342,20 @@ impl LogFile { // Seek back to the beginning and write the remaining content self.file.rewind().await?; + self.file.get_mut().rewind().await?; self.file.write_all(&buffer).await?; - // Truncate the file to the new size self.file.get_ref().set_len(buffer.len() as u64).await?; self.file.get_ref().sync_all().await?; + + { + self.file.rewind().await?; + let records = Self::get_router_events_in(MAX_LOG_RECORDS, &mut self.file) + .await + .unwrap(); + tracing::error!("records {:?}", records.len()); + } + self.file.seek(io::SeekFrom::End(0)).await?; Ok(()) } @@ -358,9 +368,15 @@ impl LogFile { let event_num = max_event_number.min(MAX_EVENT_HISTORY); let _guard: tokio::sync::MutexGuard<'_, ()> = FILE_LOCK.lock().await; - let mut file = - tokio::io::BufReader::new(OpenOptions::new().read(true).open(event_log_path).await?); + let mut file = BufReader::new(OpenOptions::new().read(true).open(event_log_path).await?); + + Self::get_router_events_in(event_num, &mut file).await + } + async fn get_router_events_in( + event_num: usize, + file: &mut (impl AsyncRead + AsyncSeek + Unpin), + ) -> Result, DynError> { let new_records_ts = NEW_RECORDS_TS .get() .expect("set on initialization") From 15ecb7988b4dc0a259eeac66d0ecc4d59aabd921 Mon Sep 17 00:00:00 2001 From: Al Liu Date: Wed, 5 Jun 2024 12:26:28 +0800 Subject: [PATCH 12/14] Fix truncate --- crates/core/src/tracing/aof.rs | 105 +++++++++++++++++++++------------ 1 file changed, 67 insertions(+), 38 deletions(-) diff --git a/crates/core/src/tracing/aof.rs b/crates/core/src/tracing/aof.rs index 933940a45..a74541ad6 100644 --- a/crates/core/src/tracing/aof.rs +++ b/crates/core/src/tracing/aof.rs @@ -4,7 +4,7 @@ use tokio::{ io::{self, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWriteExt, BufReader, Error}, }; -use std::{path::Path, sync::atomic::AtomicUsize}; +use std::path::{Path, PathBuf}; use tokio::sync::Mutex; @@ -135,6 +135,7 @@ impl Batch { self.batch.len() } + #[allow(dead_code)] #[inline] fn is_empty(&self) -> bool { self.batch.is_empty() @@ -148,7 +149,9 @@ impl Batch { } pub(super) struct LogFile { - file: BufReader, + file: Option>, + path: PathBuf, + rewrite_path: PathBuf, // make this configurable? max_log_records: usize, pub(super) batch: Batch, @@ -166,11 +169,11 @@ impl LogFile { .open(path) .await?; let mut file = BufReader::new(file); - tracing::error!("open {}", path.display()); let states = Self::num_lines(&mut file).await.expect("non IO error"); - tracing::error!("states {:?}", states); Ok(Self { - file, + file: Some(file), + path: path.to_path_buf(), + rewrite_path: path.with_extension("rewrite"), max_log_records: MAX_LOG_RECORDS, batch: Batch { batch: Vec::with_capacity(BATCH_SIZE), @@ -299,26 +302,28 @@ impl LogFile { remove_records: usize, ) -> Result<(), Box> { let _guard = FILE_LOCK.lock().await; - self.file.rewind().await?; - // tracing::debug!(position = file.stream_position().await.unwrap()); + let mut file = self.file.take().unwrap(); + file.rewind().await?; + file.get_mut().rewind().await?; + let mut records_count = 0; let mut removed_states = States::default(); while records_count < remove_records { let mut header = [0u8; EVENT_LOG_HEADER_SIZE]; - if let Err(error) = self.file.read_exact(&mut header).await { + if let Err(error) = file.read_exact(&mut header).await { if matches!(error.kind(), io::ErrorKind::UnexpectedEof) { break; } - let pos = self.file.stream_position().await; + let pos = file.stream_position().await; tracing::error!(%error, ?pos, "error while trying to read file"); return Err(error.into()); } let length = DefaultEndian::read_u32(&header[..4]); - if let Err(error) = self.file.seek(io::SeekFrom::Current(length as i64)).await { + if let Err(error) = file.seek(io::SeekFrom::Current(length as i64)).await { if matches!(error.kind(), io::ErrorKind::UnexpectedEof) { break; } - let pos = self.file.stream_position().await; + let pos = file.stream_position().await; tracing::error!(%error, ?pos, "error while trying to read file"); return Err(error.into()); } @@ -326,37 +331,62 @@ impl LogFile { records_count += 1; } - // Copy the rest of the file to the buffer - let mut buffer = Vec::new(); - if let Err(error) = self.file.read_to_end(&mut buffer).await { - if !matches!(error.kind(), io::ErrorKind::UnexpectedEof) { - let pos = self.file.stream_position().await; + let mut bk = OpenOptions::new() + .create(true) + .truncate(true) + .write(true) + .read(true) + .open(&self.rewrite_path) + .await?; + + let mut new_states = States::default(); + loop { + let mut header = [0u8; EVENT_LOG_HEADER_SIZE]; + if let Err(error) = file.read_exact(&mut header).await { + if matches!(error.kind(), io::ErrorKind::UnexpectedEof) { + break; + } + let pos = file.stream_position().await; tracing::error!(%error, ?pos, "error while trying to read file"); return Err(error.into()); } - } - self.states -= removed_states; + let length = DefaultEndian::read_u32(&header[..4]); + let mut buf = vec![0u8; EVENT_LOG_HEADER_SIZE + length as usize]; + buf[..EVENT_LOG_HEADER_SIZE].copy_from_slice(&header); + if let Err(error) = file.read_exact(&mut buf[EVENT_LOG_HEADER_SIZE..]).await { + if matches!(error.kind(), io::ErrorKind::UnexpectedEof) { + break; + } + let pos = file.stream_position().await; + tracing::error!(%error, ?pos, "error while trying to read file"); + return Err(error.into()); + } + new_states += header[4]; - tracing::error!("removed {removed_states:?} remaining {:?}", self.states); + if let Err(error) = bk.write_all(&buf).await { + let pos = bk.stream_position().await; + tracing::error!(%error, ?pos, "error while trying to write file"); + return Err(error.into()); + } + } - // Seek back to the beginning and write the remaining content - self.file.rewind().await?; - self.file.get_mut().rewind().await?; - self.file.write_all(&buffer).await?; - // Truncate the file to the new size - self.file.get_ref().set_len(buffer.len() as u64).await?; - self.file.get_ref().sync_all().await?; + self.states = new_states; - { - self.file.rewind().await?; - let records = Self::get_router_events_in(MAX_LOG_RECORDS, &mut self.file) - .await - .unwrap(); - tracing::error!("records {:?}", records.len()); - } + drop(bk); + drop(file); + std::fs::remove_file(&self.path)?; + std::fs::rename(&self.rewrite_path, &self.path)?; + + self.file = Some(BufReader::new( + OpenOptions::new() + .read(true) + .append(true) + .write(true) + .open(&self.path) + .await?, + )); - self.file.seek(io::SeekFrom::End(0)).await?; Ok(()) } @@ -412,8 +442,6 @@ impl LogFile { num_records += 1; } - tracing::info!(len = records.len(), total = num_records, "records read"); - if records.is_empty() { return Ok(vec![]); } @@ -442,12 +470,13 @@ impl LogFile { pub async fn write_all(&mut self, data: &[u8]) -> io::Result<()> { let _guard = FILE_LOCK.lock().await; - if let Err(err) = self.file.get_mut().write_all(data).await { + let file = self.file.as_mut().unwrap(); + if let Err(err) = file.get_mut().write_all(data).await { tracing::error!("Failed writting to event log: {err}"); return Err(err); } - if let Err(err) = self.file.get_mut().sync_all().await { + if let Err(err) = file.get_mut().sync_all().await { tracing::error!("Failed syncing event log: {err}"); return Err(err); } From 14c5786fcfefbec11491ea68e84cf488c912665c Mon Sep 17 00:00:00 2001 From: Al Liu Date: Wed, 5 Jun 2024 12:35:25 +0800 Subject: [PATCH 13/14] Update aof.rs --- crates/core/src/tracing/aof.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/crates/core/src/tracing/aof.rs b/crates/core/src/tracing/aof.rs index a74541ad6..925b3ca0d 100644 --- a/crates/core/src/tracing/aof.rs +++ b/crates/core/src/tracing/aof.rs @@ -288,8 +288,6 @@ impl LogFile { // Check the number of lines and truncate if needed if self.states.total > self.max_log_records { - tracing::info!("before truncating {:?}", self.states); - if let Err(err) = self.truncate_records(REMOVE_RECS).await { tracing::error!("Failed truncating log file: {:?}", err); panic!("Failed truncating log file"); From 0febabe08b3c30480267be43a265ded086d8ef6b Mon Sep 17 00:00:00 2001 From: Al Liu Date: Thu, 6 Jun 2024 13:59:24 +0800 Subject: [PATCH 14/14] Remove states --- crates/core/src/tracing.rs | 5 +- crates/core/src/tracing/aof.rs | 158 ++++++--------------------------- 2 files changed, 27 insertions(+), 136 deletions(-) diff --git a/crates/core/src/tracing.rs b/crates/core/src/tracing.rs index 4640b1b47..f8596e7ec 100644 --- a/crates/core/src/tracing.rs +++ b/crates/core/src/tracing.rs @@ -420,7 +420,7 @@ impl EventRegister { // store remaining logs let moved_batch = std::mem::replace(&mut event_log.batch, aof::Batch::new(aof::BATCH_SIZE)); - let batch_states = moved_batch.states; + let batch_writes = moved_batch.num_writes; match aof::LogFile::encode_batch(&moved_batch) { Ok(batch_serialized_data) => { if !batch_serialized_data.is_empty() @@ -428,7 +428,7 @@ impl EventRegister { { panic!("Failed writting event log"); } - event_log.update_states(batch_states); + event_log.update_recs(batch_writes); } Err(err) => { tracing::error!("Failed encode batch: {err}"); @@ -1017,7 +1017,6 @@ pub(super) mod test { collections::HashMap, sync::atomic::{AtomicUsize, Ordering::SeqCst}, }; - use tracing::level_filters::LevelFilter; use super::*; use crate::{node::testing_impl::NodeLabel, ring::Distance}; diff --git a/crates/core/src/tracing/aof.rs b/crates/core/src/tracing/aof.rs index 925b3ca0d..a7e73867e 100644 --- a/crates/core/src/tracing/aof.rs +++ b/crates/core/src/tracing/aof.rs @@ -25,84 +25,9 @@ pub(super) const BATCH_SIZE: usize = EVENT_REGISTER_BATCH_SIZE; type DefaultEndian = byteorder::BigEndian; -#[derive(Debug, Default, Copy, Clone)] -pub(super) struct States { - total: usize, - connect_events: usize, - put_events: usize, - get_events: usize, - route_events: usize, - subscribed_events: usize, - ignored_events: usize, - disconnected_events: usize, -} - -impl core::ops::AddAssign for States { - fn add_assign(&mut self, rhs: Self) { - self.total += rhs.total; - self.connect_events += rhs.connect_events; - self.put_events += rhs.put_events; - self.get_events += rhs.get_events; - self.route_events += rhs.route_events; - self.subscribed_events += rhs.subscribed_events; - self.ignored_events += rhs.ignored_events; - self.disconnected_events += rhs.disconnected_events; - } -} - -impl core::ops::SubAssign for States { - fn sub_assign(&mut self, rhs: u8) { - self.total = self.total.saturating_sub(1); - match rhs { - EventKind::CONNECT => self.connect_events = self.connect_events.saturating_sub(1), - EventKind::PUT => self.put_events = self.put_events.saturating_sub(1), - EventKind::GET => self.get_events = self.get_events.saturating_sub(1), - EventKind::ROUTE => self.route_events = self.route_events.saturating_sub(1), - EventKind::SUBSCRIBED => { - self.subscribed_events = self.subscribed_events.saturating_sub(1) - } - EventKind::IGNORED => self.ignored_events = self.ignored_events.saturating_sub(1), - EventKind::DISCONNECTED => { - self.disconnected_events = self.disconnected_events.saturating_sub(1) - } - _ => unreachable!(), - } - } -} - -impl core::ops::AddAssign for States { - fn add_assign(&mut self, rhs: u8) { - self.total += 1; - - match rhs { - EventKind::CONNECT => self.connect_events += 1, - EventKind::PUT => self.put_events += 1, - EventKind::GET => self.get_events += 1, - EventKind::ROUTE => self.route_events += 1, - EventKind::SUBSCRIBED => self.subscribed_events += 1, - EventKind::IGNORED => self.ignored_events += 1, - EventKind::DISCONNECTED => self.disconnected_events += 1, - _ => unreachable!(), - } - } -} - -impl core::ops::SubAssign for States { - fn sub_assign(&mut self, rhs: Self) { - self.total -= rhs.total; - self.connect_events -= rhs.connect_events; - self.put_events -= rhs.put_events; - self.get_events -= rhs.get_events; - self.route_events -= rhs.route_events; - self.subscribed_events -= rhs.subscribed_events; - self.ignored_events -= rhs.ignored_events; - self.disconnected_events -= rhs.disconnected_events; - } -} - pub(super) struct Batch { pub batch: Vec, - pub states: States, + pub num_writes: usize, } impl Batch { @@ -110,23 +35,13 @@ impl Batch { pub fn new(cap: usize) -> Self { Self { batch: Vec::with_capacity(cap), - states: Default::default(), + num_writes: 0, } } #[inline] fn push(&mut self, log: NetLogMessage) { - match log.kind.varint_id() { - EventKind::CONNECT => self.states.connect_events += 1, - EventKind::PUT => self.states.put_events += 1, - EventKind::GET => self.states.get_events += 1, - EventKind::ROUTE => self.states.route_events += 1, - EventKind::SUBSCRIBED => self.states.subscribed_events += 1, - EventKind::IGNORED => self.states.ignored_events += 1, - EventKind::DISCONNECTED => self.states.disconnected_events += 1, - _ => unreachable!(), - } - self.states.total += 1; + self.num_writes += 1; self.batch.push(log); } @@ -144,7 +59,7 @@ impl Batch { #[inline] fn clear(&mut self) { self.batch.clear(); - self.states = Default::default(); + self.num_writes = 0; } } @@ -155,8 +70,8 @@ pub(super) struct LogFile { // make this configurable? max_log_records: usize, pub(super) batch: Batch, - current_states: States, - states: States, + num_writes: usize, + num_recs: usize, } impl LogFile { @@ -169,7 +84,7 @@ impl LogFile { .open(path) .await?; let mut file = BufReader::new(file); - let states = Self::num_lines(&mut file).await.expect("non IO error"); + let num_recs = Self::num_lines(&mut file).await.expect("non IO error"); Ok(Self { file: Some(file), path: path.to_path_buf(), @@ -177,15 +92,15 @@ impl LogFile { max_log_records: MAX_LOG_RECORDS, batch: Batch { batch: Vec::with_capacity(BATCH_SIZE), - states: Default::default(), + num_writes: 0, }, - current_states: Default::default(), - states, + num_writes: 0, + num_recs, }) } - pub(super) fn update_states(&mut self, states: States) { - self.states += states; + pub(super) fn update_recs(&mut self, recs: usize) { + self.num_recs += recs; } pub fn encode_log( @@ -198,15 +113,8 @@ impl LogFile { Ok((header, serialized)) } - async fn num_lines(file: &mut (impl AsyncRead + AsyncSeek + Unpin)) -> io::Result { + async fn num_lines(file: &mut (impl AsyncRead + AsyncSeek + Unpin)) -> io::Result { let mut num_records = 0; - let mut connect_events: usize = 0; - let mut put_events: usize = 0; - let mut get_events: usize = 0; - let mut route_events: usize = 0; - let mut subscribed_events: usize = 0; - let mut ignored_events: usize = 0; - let mut disconnected_events: usize = 0; let mut buf = [0; EVENT_LOG_HEADER_SIZE]; // Read the u32 length prefix + u8 event kind @@ -221,13 +129,7 @@ impl LogFile { let length = DefaultEndian::read_u32(&buf[..4]) as u64; match buf[4] { - EventKind::CONNECT => connect_events += 1, - EventKind::PUT => put_events += 1, - EventKind::GET => get_events += 1, - EventKind::ROUTE => route_events += 1, - EventKind::SUBSCRIBED => subscribed_events += 1, - EventKind::IGNORED => ignored_events += 1, - EventKind::DISCONNECTED => disconnected_events += 1, + 0..=6 => {} _ => { return Err(io::Error::new( io::ErrorKind::InvalidData, @@ -241,16 +143,7 @@ impl LogFile { } } - Ok(States { - total: num_records, - connect_events, - put_events, - get_events, - route_events, - subscribed_events, - ignored_events, - disconnected_events, - }) + Ok(num_records) } pub async fn persist_log(&mut self, log: NetLogMessage) { @@ -259,14 +152,14 @@ impl LogFile { if self.batch.len() >= BATCH_SIZE { let moved_batch = std::mem::replace(&mut self.batch, Batch::new(BATCH_SIZE)); - let batch_states = moved_batch.states; + let batch_writes = moved_batch.num_writes; let serialization_task = tokio::task::spawn_blocking(move || Self::encode_batch(&moved_batch)); match serialization_task.await { Ok(Ok(serialized_data)) => { batch_buf = serialized_data; - self.current_states += batch_states; + self.num_writes += batch_writes; self.batch.clear(); // Clear the batch for new data } _ => { @@ -275,19 +168,19 @@ impl LogFile { } } - if self.current_states.total >= BATCH_SIZE { + if self.num_writes >= BATCH_SIZE { { let res = self.write_all(&batch_buf).await; if res.is_err() { panic!("Failed writing to log file"); } } - self.states += self.current_states; - self.current_states = Default::default(); + self.num_recs += self.num_writes; + self.num_writes = 0; } // Check the number of lines and truncate if needed - if self.states.total > self.max_log_records { + if self.num_recs > self.max_log_records { if let Err(err) = self.truncate_records(REMOVE_RECS).await { tracing::error!("Failed truncating log file: {:?}", err); panic!("Failed truncating log file"); @@ -305,7 +198,6 @@ impl LogFile { file.get_mut().rewind().await?; let mut records_count = 0; - let mut removed_states = States::default(); while records_count < remove_records { let mut header = [0u8; EVENT_LOG_HEADER_SIZE]; if let Err(error) = file.read_exact(&mut header).await { @@ -325,7 +217,6 @@ impl LogFile { tracing::error!(%error, ?pos, "error while trying to read file"); return Err(error.into()); } - removed_states += header[4]; records_count += 1; } @@ -337,7 +228,7 @@ impl LogFile { .open(&self.rewrite_path) .await?; - let mut new_states = States::default(); + let mut num_recs = 0; loop { let mut header = [0u8; EVENT_LOG_HEADER_SIZE]; if let Err(error) = file.read_exact(&mut header).await { @@ -360,7 +251,8 @@ impl LogFile { tracing::error!(%error, ?pos, "error while trying to read file"); return Err(error.into()); } - new_states += header[4]; + + num_recs += 1; if let Err(error) = bk.write_all(&buf).await { let pos = bk.stream_position().await; @@ -369,7 +261,7 @@ impl LogFile { } } - self.states = new_states; + self.num_recs = num_recs; drop(bk); drop(file);