From fd1a77149c25e34513cc827fa1e412bbe2013963 Mon Sep 17 00:00:00 2001 From: numinex Date: Fri, 1 Nov 2024 12:23:15 +0100 Subject: [PATCH 01/11] Fix batch_accumulator bug when rolling to next segment --- bench/src/consumer.rs | 4 +++- server/src/channels/commands/maintain_messages.rs | 2 +- .../storage_conversion/persistency/partitions.rs | 1 + server/src/streaming/batching/batch_accumulator.rs | 3 ++- server/src/streaming/partitions/messages.rs | 14 +++++++++++++- server/src/streaming/partitions/partition.rs | 1 + server/src/streaming/partitions/persistence.rs | 2 +- server/src/streaming/partitions/segments.rs | 4 +++- server/src/streaming/partitions/storage.rs | 1 + server/src/streaming/segments/messages.rs | 12 +++++++++--- server/src/streaming/segments/segment.rs | 1 + 11 files changed, 36 insertions(+), 9 deletions(-) diff --git a/bench/src/consumer.rs b/bench/src/consumer.rs index 6e80dcbc5..3b5b09c80 100644 --- a/bench/src/consumer.rs +++ b/bench/src/consumer.rs @@ -131,6 +131,7 @@ impl Consumer { } current_iteration = 0; + let mut previous_start_offset = 0; let start_timestamp = Instant::now(); while received_messages < total_messages { let offset = current_iteration * self.messages_per_batch as u64; @@ -173,7 +174,7 @@ impl Consumer { ); continue; } - + assert!(previous_start_offset <= polled_messages.messages[0].offset); if polled_messages.messages.len() != self.messages_per_batch as usize { warn!( "Consumer #{} → expected {} messages, but got {} messages, retrying...", @@ -183,6 +184,7 @@ impl Consumer { ); continue; } + previous_start_offset = polled_messages.messages[0].offset; latencies.push(latency_end); received_messages += polled_messages.messages.len() as u64; diff --git a/server/src/channels/commands/maintain_messages.rs b/server/src/channels/commands/maintain_messages.rs index f68630747..7c343450a 100644 --- a/server/src/channels/commands/maintain_messages.rs +++ b/server/src/channels/commands/maintain_messages.rs @@ -498,7 +498,7 @@ async fn 
delete_segments( if partition.get_segments().is_empty() { let start_offset = last_end_offset + 1; - partition.add_persisted_segment(start_offset).await?; + partition.add_persisted_segment(None, start_offset).await?; } } Err(error) => { diff --git a/server/src/compat/storage_conversion/persistency/partitions.rs b/server/src/compat/storage_conversion/persistency/partitions.rs index 7a65c9221..ea52bce62 100644 --- a/server/src/compat/storage_conversion/persistency/partitions.rs +++ b/server/src/compat/storage_conversion/persistency/partitions.rs @@ -178,6 +178,7 @@ pub async fn load( partition.topic_id, partition.partition_id, start_offset, + None, partition.config.clone(), partition.storage.clone(), partition.message_expiry, diff --git a/server/src/streaming/batching/batch_accumulator.rs b/server/src/streaming/batching/batch_accumulator.rs index 86382927c..e800163d1 100644 --- a/server/src/streaming/batching/batch_accumulator.rs +++ b/server/src/streaming/batching/batch_accumulator.rs @@ -1,9 +1,10 @@ use super::message_batch::{RetainedMessageBatch, RETAINED_BATCH_OVERHEAD}; use crate::streaming::{models::messages::RetainedMessage, sizeable::Sizeable}; use bytes::BytesMut; +use tracing::warn; use std::sync::Arc; -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct BatchAccumulator { base_offset: u64, current_size: u64, diff --git a/server/src/streaming/partitions/messages.rs b/server/src/streaming/partitions/messages.rs index bb4ff6a01..bf090dd2f 100644 --- a/server/src/streaming/partitions/messages.rs +++ b/server/src/streaming/partitions/messages.rs @@ -132,10 +132,12 @@ impl Partition { self.partition_id ); if self.segments.is_empty() { + warn!("Empty messages segment is empty"); return Ok(EMPTY_MESSAGES.into_iter().map(Arc::new).collect()); } if start_offset > self.current_offset { + warn!("Empty messages offset is greater than current_offset"); return Ok(EMPTY_MESSAGES.into_iter().map(Arc::new).collect()); } @@ -143,10 +145,14 @@ impl Partition { let messages = 
self.try_get_messages_from_cache(start_offset, end_offset); if let Some(messages) = messages { + warn!("getting messages from cache"); return Ok(messages); } let segments = self.filter_segments_by_offsets(start_offset, end_offset); + if segments.len() == 0 { + warn!("Empty messages no segments"); + } match segments.len() { 0 => Ok(EMPTY_MESSAGES.into_iter().map(Arc::new).collect()), 1 => segments[0].get_messages(start_offset, count).await, @@ -264,6 +270,7 @@ impl Partition { ) -> Option>> { let cache = self.cache.as_ref()?; if cache.is_empty() || start_offset > end_offset || end_offset > self.current_offset { + warn!("Empty messages cache is empty"); return None; } @@ -277,6 +284,7 @@ impl Partition { if start_offset >= first_buffered_offset { return Some(self.load_messages_from_cache(start_offset, end_offset)); } + warn!("Empty messages cache offset is greater or equal to cache start_offset"); None } @@ -340,11 +348,13 @@ impl Partition { ); if self.cache.is_none() || start_offset > end_offset { + warn!("Empty messages cache is none or start_offset > end_offset"); return EMPTY_MESSAGES.into_iter().map(Arc::new).collect(); } let cache = self.cache.as_ref().unwrap(); if cache.is_empty() { + warn!("Empty messages cache is empty"); return EMPTY_MESSAGES.into_iter().map(Arc::new).collect(); } @@ -386,11 +396,13 @@ impl Partition { let last_segment = self.segments.last_mut().ok_or(IggyError::SegmentNotFound)?; if last_segment.is_closed { let start_offset = last_segment.end_offset + 1; + let unsaved_messages = last_segment.unsaved_messages.clone(); trace!( "Current segment is closed, creating new segment with start offset: {} for partition with ID: {}...", start_offset, self.partition_id ); - self.add_persisted_segment(start_offset).await?; + // Rolling over messages that couldn't be persisted during segment close. 
+ self.add_persisted_segment(unsaved_messages, start_offset).await?; } } diff --git a/server/src/streaming/partitions/partition.rs b/server/src/streaming/partitions/partition.rs index 32626356c..7dfad8f1b 100644 --- a/server/src/streaming/partitions/partition.rs +++ b/server/src/streaming/partitions/partition.rs @@ -148,6 +148,7 @@ impl Partition { topic_id, partition_id, 0, + None, partition.config.clone(), partition.storage.clone(), partition.message_expiry, diff --git a/server/src/streaming/partitions/persistence.rs b/server/src/streaming/partitions/persistence.rs index f34009f74..f09c7c1eb 100644 --- a/server/src/streaming/partitions/persistence.rs +++ b/server/src/streaming/partitions/persistence.rs @@ -48,7 +48,7 @@ impl Partition { .partition .delete_consumer_offsets(&self.consumer_group_offsets_path) .await?; - self.add_persisted_segment(0).await?; + self.add_persisted_segment(None, 0).await?; if !Path::new(&self.consumer_offsets_path).exists() && create_dir(&self.consumer_offsets_path).await.is_err() diff --git a/server/src/streaming/partitions/segments.rs b/server/src/streaming/partitions/segments.rs index eccf2a805..41b8d31ee 100644 --- a/server/src/streaming/partitions/segments.rs +++ b/server/src/streaming/partitions/segments.rs @@ -1,5 +1,6 @@ use std::sync::atomic::Ordering; +use crate::streaming::batching::batch_accumulator::BatchAccumulator; use crate::streaming::partitions::partition::Partition; use crate::streaming::segments::segment::Segment; use iggy::error::IggyError; @@ -42,7 +43,7 @@ impl Partition { expired_segments } - pub async fn add_persisted_segment(&mut self, start_offset: u64) -> Result<(), IggyError> { + pub async fn add_persisted_segment(&mut self, unsaved_messages: Option, start_offset: u64) -> Result<(), IggyError> { info!( "Creating the new segment for partition with ID: {}, stream with ID: {}, topic with ID: {}...", self.partition_id, self.stream_id, self.topic_id @@ -52,6 +53,7 @@ impl Partition { self.topic_id, 
self.partition_id, start_offset, + unsaved_messages, self.config.clone(), self.storage.clone(), self.message_expiry, diff --git a/server/src/streaming/partitions/storage.rs b/server/src/streaming/partitions/storage.rs index 692a4f299..65eef04c7 100644 --- a/server/src/streaming/partitions/storage.rs +++ b/server/src/streaming/partitions/storage.rs @@ -76,6 +76,7 @@ impl PartitionStorage for FilePartitionStorage { partition.topic_id, partition.partition_id, start_offset, + None, partition.config.clone(), partition.storage.clone(), partition.message_expiry, diff --git a/server/src/streaming/segments/messages.rs b/server/src/streaming/segments/messages.rs index 186795430..d63546605 100644 --- a/server/src/streaming/segments/messages.rs +++ b/server/src/streaming/segments/messages.rs @@ -9,7 +9,7 @@ use crate::streaming::sizeable::Sizeable; use iggy::error::IggyError; use std::sync::atomic::Ordering; use std::sync::Arc; -use tracing::{info, trace, warn}; +use tracing::{error, info, trace, warn}; const EMPTY_MESSAGES: Vec = vec![]; @@ -28,6 +28,7 @@ impl Segment { count: u32, ) -> Result>, IggyError> { if count == 0 { + warn!("Empty messages, count = 0"); return Ok(EMPTY_MESSAGES.into_iter().map(Arc::new).collect()); } @@ -94,6 +95,7 @@ impl Segment { start_offset: u64, end_offset: u64, ) -> Vec> { + warn!("Loading messages from unsaved_buffer"); let batch_accumulator = self.unsaved_messages.as_ref().unwrap(); batch_accumulator.get_messages_by_offset(start_offset, end_offset) } @@ -128,11 +130,12 @@ impl Segment { ) { Ok(range) => range, Err(_) => { - trace!( + error!( "Cannot load messages from disk, index range not found: {} - {}.", start_offset, end_offset ); + warn!("Empty messages, error when getting index"); return Ok(EMPTY_MESSAGES.into_iter().map(Arc::new).collect()); } }; @@ -173,6 +176,9 @@ impl Segment { msg.offset >= start_offset && msg.offset <= end_offset }); + if messages.len() == 0 { + warn!("Loading messages with count 0"); + } trace!( "Loaded {} 
messages from disk, segment start offset: {}, end offset: {}.", messages.len(), @@ -281,6 +287,7 @@ impl Segment { self.unsaved_messages = Some(batch_accumulator); } let saved_bytes = storage.save_batches(self, batch).await?; + assert_eq!(saved_bytes, batch_size); storage.save_index(&self.index_path, index).await?; storage .save_time_index(&self.time_index_path, time_index) @@ -305,7 +312,6 @@ impl Segment { if self.is_full().await { self.end_offset = self.current_offset; self.is_closed = true; - self.unsaved_messages = None; info!( "Closed segment with start offset: {} for partition with ID: {}.", self.start_offset, self.partition_id diff --git a/server/src/streaming/segments/segment.rs b/server/src/streaming/segments/segment.rs index 600240634..b85d97541 100644 --- a/server/src/streaming/segments/segment.rs +++ b/server/src/streaming/segments/segment.rs @@ -65,6 +65,7 @@ impl Segment { topic_id: u32, partition_id: u32, start_offset: u64, + unsaved_messages: Option, config: Arc, storage: Arc, message_expiry: IggyExpiry, From 739826cd8760558723f64840173feb9bc8bd97d1 Mon Sep 17 00:00:00 2001 From: numinex Date: Fri, 1 Nov 2024 12:26:27 +0100 Subject: [PATCH 02/11] Remove warn prints --- server/src/streaming/partitions/messages.rs | 10 ---------- server/src/streaming/segments/messages.rs | 7 ------- 2 files changed, 17 deletions(-) diff --git a/server/src/streaming/partitions/messages.rs b/server/src/streaming/partitions/messages.rs index bf090dd2f..17e498add 100644 --- a/server/src/streaming/partitions/messages.rs +++ b/server/src/streaming/partitions/messages.rs @@ -132,12 +132,10 @@ impl Partition { self.partition_id ); if self.segments.is_empty() { - warn!("Empty messages segment is empty"); return Ok(EMPTY_MESSAGES.into_iter().map(Arc::new).collect()); } if start_offset > self.current_offset { - warn!("Empty messages offset is greater than current_offset"); return Ok(EMPTY_MESSAGES.into_iter().map(Arc::new).collect()); } @@ -145,14 +143,10 @@ impl Partition { 
let messages = self.try_get_messages_from_cache(start_offset, end_offset); if let Some(messages) = messages { - warn!("getting messages from cache"); return Ok(messages); } let segments = self.filter_segments_by_offsets(start_offset, end_offset); - if segments.len() == 0 { - warn!("Empty messages no segments"); - } match segments.len() { 0 => Ok(EMPTY_MESSAGES.into_iter().map(Arc::new).collect()), 1 => segments[0].get_messages(start_offset, count).await, @@ -270,7 +264,6 @@ impl Partition { ) -> Option>> { let cache = self.cache.as_ref()?; if cache.is_empty() || start_offset > end_offset || end_offset > self.current_offset { - warn!("Empty messages cache is empty"); return None; } @@ -284,7 +277,6 @@ impl Partition { if start_offset >= first_buffered_offset { return Some(self.load_messages_from_cache(start_offset, end_offset)); } - warn!("Empty messages cache offset is greater or equal to cache start_offset"); None } @@ -348,13 +340,11 @@ impl Partition { ); if self.cache.is_none() || start_offset > end_offset { - warn!("Empty messages cache is none or start_offset > end_offset"); return EMPTY_MESSAGES.into_iter().map(Arc::new).collect(); } let cache = self.cache.as_ref().unwrap(); if cache.is_empty() { - warn!("Empty messages cache is empty"); return EMPTY_MESSAGES.into_iter().map(Arc::new).collect(); } diff --git a/server/src/streaming/segments/messages.rs b/server/src/streaming/segments/messages.rs index d63546605..ce0296646 100644 --- a/server/src/streaming/segments/messages.rs +++ b/server/src/streaming/segments/messages.rs @@ -28,7 +28,6 @@ impl Segment { count: u32, ) -> Result>, IggyError> { if count == 0 { - warn!("Empty messages, count = 0"); return Ok(EMPTY_MESSAGES.into_iter().map(Arc::new).collect()); } @@ -95,7 +94,6 @@ impl Segment { start_offset: u64, end_offset: u64, ) -> Vec> { - warn!("Loading messages from unsaved_buffer"); let batch_accumulator = self.unsaved_messages.as_ref().unwrap(); batch_accumulator.get_messages_by_offset(start_offset, 
end_offset) } @@ -135,7 +133,6 @@ impl Segment { start_offset, end_offset ); - warn!("Empty messages, error when getting index"); return Ok(EMPTY_MESSAGES.into_iter().map(Arc::new).collect()); } }; @@ -175,10 +172,6 @@ impl Segment { .to_messages_with_filter(messages_count, &|msg| { msg.offset >= start_offset && msg.offset <= end_offset }); - - if messages.len() == 0 { - warn!("Loading messages with count 0"); - } trace!( "Loaded {} messages from disk, segment start offset: {}, end offset: {}.", messages.len(), From 673aebd2813522b4afc1aa0bbc1e40fc83c15e51 Mon Sep 17 00:00:00 2001 From: numinex Date: Fri, 1 Nov 2024 14:26:22 +0100 Subject: [PATCH 03/11] Fix tests --- integration/tests/streaming/segment.rs | 7 +++++++ server/src/streaming/batching/batch_accumulator.rs | 1 - server/src/streaming/segments/index.rs | 1 + server/src/streaming/segments/segment.rs | 5 ++++- 4 files changed, 12 insertions(+), 2 deletions(-) diff --git a/integration/tests/streaming/segment.rs b/integration/tests/streaming/segment.rs index 7673340bd..51df8adc3 100644 --- a/integration/tests/streaming/segment.rs +++ b/integration/tests/streaming/segment.rs @@ -25,6 +25,7 @@ async fn should_persist_segment() { topic_id, partition_id, start_offset, + None, setup.config.clone(), setup.storage.clone(), IggyExpiry::NeverExpire, @@ -63,6 +64,7 @@ async fn should_load_existing_segment_from_disk() { topic_id, partition_id, start_offset, + None, setup.config.clone(), setup.storage.clone(), IggyExpiry::NeverExpire, @@ -90,6 +92,7 @@ async fn should_load_existing_segment_from_disk() { topic_id, partition_id, start_offset, + None, setup.config.clone(), setup.storage.clone(), IggyExpiry::NeverExpire, @@ -128,6 +131,7 @@ async fn should_persist_and_load_segment_with_messages() { topic_id, partition_id, start_offset, + None, setup.config.clone(), setup.storage.clone(), IggyExpiry::NeverExpire, @@ -179,6 +183,7 @@ async fn should_persist_and_load_segment_with_messages() { topic_id, partition_id, 
start_offset, + None, setup.config.clone(), setup.storage.clone(), IggyExpiry::NeverExpire, @@ -211,6 +216,7 @@ async fn given_all_expired_messages_segment_should_be_expired() { topic_id, partition_id, start_offset, + None, setup.config.clone(), setup.storage.clone(), message_expiry, @@ -279,6 +285,7 @@ async fn given_at_least_one_not_expired_message_segment_should_not_be_expired() topic_id, partition_id, start_offset, + None, setup.config.clone(), setup.storage.clone(), message_expiry, diff --git a/server/src/streaming/batching/batch_accumulator.rs b/server/src/streaming/batching/batch_accumulator.rs index e800163d1..6cd4a66a4 100644 --- a/server/src/streaming/batching/batch_accumulator.rs +++ b/server/src/streaming/batching/batch_accumulator.rs @@ -1,7 +1,6 @@ use super::message_batch::{RetainedMessageBatch, RETAINED_BATCH_OVERHEAD}; use crate::streaming::{models::messages::RetainedMessage, sizeable::Sizeable}; use bytes::BytesMut; -use tracing::warn; use std::sync::Arc; #[derive(Debug, Clone)] diff --git a/server/src/streaming/segments/index.rs b/server/src/streaming/segments/index.rs index 2c8cd0e20..e34920514 100644 --- a/server/src/streaming/segments/index.rs +++ b/server/src/streaming/segments/index.rs @@ -97,6 +97,7 @@ mod tests { topic_id, partition_id, start_offset, + None, config, storage, IggyExpiry::NeverExpire, diff --git a/server/src/streaming/segments/segment.rs b/server/src/streaming/segments/segment.rs index b85d97541..df84fc431 100644 --- a/server/src/streaming/segments/segment.rs +++ b/server/src/streaming/segments/segment.rs @@ -103,7 +103,7 @@ impl Segment { true => Some(Vec::new()), false => None, }, - unsaved_messages: None, + unsaved_messages, is_closed: false, size_of_parent_stream, size_of_parent_partition, @@ -267,6 +267,7 @@ mod tests { topic_id, partition_id, start_offset, + None, config, storage, message_expiry, @@ -323,6 +324,7 @@ mod tests { topic_id, partition_id, start_offset, + None, config, storage, message_expiry, @@ -364,6 
+366,7 @@ mod tests { topic_id, partition_id, start_offset, + None, config, storage, message_expiry, From fa9b17b86de8502d768fedc7333e7b5e1e7e4f3c Mon Sep 17 00:00:00 2001 From: spetz Date: Sat, 2 Nov 2024 21:28:11 +0100 Subject: [PATCH 04/11] Increate max payload and headers size, update config --- Cargo.lock | 4 ++-- configs/server.toml | 4 ++-- sdk/Cargo.toml | 2 +- sdk/src/messages/mod.rs | 4 ++-- server/Cargo.toml | 2 +- server/src/streaming/partitions/messages.rs | 3 ++- server/src/streaming/partitions/segments.rs | 6 +++++- server/src/streaming/segments/messages.rs | 3 +-- server/src/streaming/segments/segment.rs | 2 +- 9 files changed, 17 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ea374f061..f6d5daab0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2022,7 +2022,7 @@ dependencies = [ [[package]] name = "iggy" -version = "0.6.31" +version = "0.6.32" dependencies = [ "aes-gcm", "ahash 0.8.11", @@ -3998,7 +3998,7 @@ dependencies = [ [[package]] name = "server" -version = "0.4.60" +version = "0.4.61" dependencies = [ "ahash 0.8.11", "anyhow", diff --git a/configs/server.toml b/configs/server.toml index d1f600042..4caa195cc 100644 --- a/configs/server.toml +++ b/configs/server.toml @@ -420,12 +420,12 @@ validate_checksum = false # The threshold of buffered messages before triggering a save to disk (integer). # Specifies how many messages accumulate before persisting to storage. # Adjusting this can balance between write performance and data durability. -messages_required_to_save = 5000 +messages_required_to_save = 1000 # Segment configuration [system.segment] # Defines the soft limit for the size of a storage segment. -# When a segment reaches this size, a new segment is created for subsequent data. +# When a segment reaches this size (maximum 4 GB), a new segment is created for subsequent data. # Example: if `size` is set "1GB", the actual segment size may be 1GB + the size of remaining messages in received batch. 
size = "1 GB" # Configures the message time-based expiry setting. diff --git a/sdk/Cargo.toml b/sdk/Cargo.toml index b4736714e..8cf7a4063 100644 --- a/sdk/Cargo.toml +++ b/sdk/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "iggy" -version = "0.6.31" +version = "0.6.32" description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second." edition = "2021" license = "MIT" diff --git a/sdk/src/messages/mod.rs b/sdk/src/messages/mod.rs index 8a588efca..752b772f9 100644 --- a/sdk/src/messages/mod.rs +++ b/sdk/src/messages/mod.rs @@ -2,5 +2,5 @@ pub mod flush_unsaved_buffer; pub mod poll_messages; pub mod send_messages; -const MAX_HEADERS_SIZE: u32 = 100 * 1000; -pub const MAX_PAYLOAD_SIZE: u32 = 10 * 1000 * 1000; +const MAX_HEADERS_SIZE: u32 = 1000 * 1000; +pub const MAX_PAYLOAD_SIZE: u32 = 1000 * 1000 * 1000; diff --git a/server/Cargo.toml b/server/Cargo.toml index b14db14c8..957196c72 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "server" -version = "0.4.60" +version = "0.4.61" edition = "2021" build = "src/build.rs" diff --git a/server/src/streaming/partitions/messages.rs b/server/src/streaming/partitions/messages.rs index 17e498add..75be2f3e7 100644 --- a/server/src/streaming/partitions/messages.rs +++ b/server/src/streaming/partitions/messages.rs @@ -392,7 +392,8 @@ impl Partition { start_offset, self.partition_id ); // Rolling over messages that couldn't be persisted during segment close. 
- self.add_persisted_segment(unsaved_messages, start_offset).await?; + self.add_persisted_segment(unsaved_messages, start_offset) + .await?; } } diff --git a/server/src/streaming/partitions/segments.rs b/server/src/streaming/partitions/segments.rs index 41b8d31ee..7857ed6f6 100644 --- a/server/src/streaming/partitions/segments.rs +++ b/server/src/streaming/partitions/segments.rs @@ -43,7 +43,11 @@ impl Partition { expired_segments } - pub async fn add_persisted_segment(&mut self, unsaved_messages: Option, start_offset: u64) -> Result<(), IggyError> { + pub async fn add_persisted_segment( + &mut self, + unsaved_messages: Option, + start_offset: u64, + ) -> Result<(), IggyError> { info!( "Creating the new segment for partition with ID: {}, stream with ID: {}, topic with ID: {}...", self.partition_id, self.stream_id, self.topic_id diff --git a/server/src/streaming/segments/messages.rs b/server/src/streaming/segments/messages.rs index ce0296646..270452674 100644 --- a/server/src/streaming/segments/messages.rs +++ b/server/src/streaming/segments/messages.rs @@ -130,8 +130,7 @@ impl Segment { Err(_) => { error!( "Cannot load messages from disk, index range not found: {} - {}.", - start_offset, - end_offset + start_offset, end_offset ); return Ok(EMPTY_MESSAGES.into_iter().map(Arc::new).collect()); } diff --git a/server/src/streaming/segments/segment.rs b/server/src/streaming/segments/segment.rs index df84fc431..64298c9b1 100644 --- a/server/src/streaming/segments/segment.rs +++ b/server/src/streaming/segments/segment.rs @@ -27,7 +27,7 @@ use tracing::{info, trace}; pub const LOG_EXTENSION: &str = "log"; pub const INDEX_EXTENSION: &str = "index"; pub const TIME_INDEX_EXTENSION: &str = "timeindex"; -pub const MAX_SIZE_BYTES: u32 = 1000 * 1000 * 1000; +pub const MAX_SIZE_BYTES: u32 = 4000 * 1000 * 1000; #[derive(Debug)] pub struct Segment { From 3229a19173bbe409da31a8640fa5a04f7c8f9b92 Mon Sep 17 00:00:00 2001 From: spetz Date: Sat, 2 Nov 2024 21:51:40 +0100 Subject: [PATCH 
05/11] Fix fsync --- configs/server.toml | 2 +- .../tests/streaming/common/test_setup.rs | 11 ++++++++--- integration/tests/streaming/segment.rs | 6 +++--- server/src/channels/commands/save_messages.rs | 8 ++++++-- server/src/compat/storage_conversion/mod.rs | 1 + server/src/streaming/partitions/messages.rs | 4 ++-- server/src/streaming/segments/messages.rs | 4 ++-- server/src/streaming/segments/storage.rs | 18 ++++++++++++++---- server/src/streaming/storage.rs | 10 ++++++++-- server/src/streaming/streams/persistence.rs | 4 ++-- server/src/streaming/systems/system.rs | 9 +++++---- server/src/streaming/topics/persistence.rs | 4 ++-- 12 files changed, 54 insertions(+), 27 deletions(-) diff --git a/configs/server.toml b/configs/server.toml index 4caa195cc..25f6c2398 100644 --- a/configs/server.toml +++ b/configs/server.toml @@ -318,7 +318,7 @@ path = "compatibility" # Determines whether to enforce file synchronization on state updates (boolean). # `true` ensures immediate writing of data to disk for durability. # `false` allows the OS to manage write operations, which can improve performance. -enforce_fsync = false +enforce_fsync = true # Runtime configuration. 
[system.runtime] diff --git a/integration/tests/streaming/common/test_setup.rs b/integration/tests/streaming/common/test_setup.rs index 7585ee60c..dee433404 100644 --- a/integration/tests/streaming/common/test_setup.rs +++ b/integration/tests/streaming/common/test_setup.rs @@ -1,5 +1,5 @@ use server::configs::system::SystemConfig; -use server::streaming::persistence::persister::FilePersister; +use server::streaming::persistence::persister::{FilePersister, FileWithSyncPersister}; use server::streaming::storage::SystemStorage; use std::sync::Arc; use tokio::fs; @@ -20,8 +20,13 @@ impl TestSetup { let config = Arc::new(config); fs::create_dir(config.get_system_path()).await.unwrap(); - let persister = FilePersister {}; - let storage = Arc::new(SystemStorage::new(config.clone(), Arc::new(persister))); + let persister = FilePersister; + let fsync_persister = FileWithSyncPersister; + let storage = Arc::new(SystemStorage::new( + config.clone(), + Arc::new(persister), + Arc::new(fsync_persister), + )); TestSetup { config, storage } } diff --git a/integration/tests/streaming/segment.rs b/integration/tests/streaming/segment.rs index 51df8adc3..065663c33 100644 --- a/integration/tests/streaming/segment.rs +++ b/integration/tests/streaming/segment.rs @@ -177,7 +177,7 @@ async fn should_persist_and_load_segment_with_messages() { .append_batch(batch_size, messages_count as u32, &messages) .await .unwrap(); - segment.persist_messages().await.unwrap(); + segment.persist_messages(true).await.unwrap(); let mut loaded_segment = segment::Segment::create( stream_id, topic_id, @@ -264,7 +264,7 @@ async fn given_all_expired_messages_segment_should_be_expired() { .append_batch(batch_size, messages_count as u32, &messages) .await .unwrap(); - segment.persist_messages().await.unwrap(); + segment.persist_messages(true).await.unwrap(); segment.is_closed = true; let is_expired = segment.is_expired(now).await; @@ -350,7 +350,7 @@ async fn 
given_at_least_one_not_expired_message_segment_should_not_be_expired() .append_batch(not_expired_message_size, 1, ¬_expired_messages) .await .unwrap(); - segment.persist_messages().await.unwrap(); + segment.persist_messages(true).await.unwrap(); let is_expired = segment.is_expired(now).await; assert!(!is_expired); diff --git a/server/src/channels/commands/save_messages.rs b/server/src/channels/commands/save_messages.rs index 6c489a876..af30d31c1 100644 --- a/server/src/channels/commands/save_messages.rs +++ b/server/src/channels/commands/save_messages.rs @@ -59,8 +59,12 @@ impl MessagesSaver { #[async_trait] impl ServerCommand for SaveMessagesExecutor { #[instrument(skip_all)] - async fn execute(&mut self, system: &SharedSystem, _command: SaveMessagesCommand) { - let saved_messages_count = system.read().await.persist_messages().await; + async fn execute(&mut self, system: &SharedSystem, command: SaveMessagesCommand) { + let saved_messages_count = system + .read() + .await + .persist_messages(command.enforce_fsync) + .await; match saved_messages_count { Ok(n) => { if n > 0 { diff --git a/server/src/compat/storage_conversion/mod.rs b/server/src/compat/storage_conversion/mod.rs index 0663e7058..c0a35331b 100644 --- a/server/src/compat/storage_conversion/mod.rs +++ b/server/src/compat/storage_conversion/mod.rs @@ -235,6 +235,7 @@ impl SegmentStorage for NoopSegmentStorage { &self, _segment: &Segment, _batch: RetainedMessageBatch, + _fsync: bool, ) -> Result { Ok(0) } diff --git a/server/src/streaming/partitions/messages.rs b/server/src/streaming/partitions/messages.rs index 75be2f3e7..6b6a54d68 100644 --- a/server/src/streaming/partitions/messages.rs +++ b/server/src/streaming/partitions/messages.rs @@ -488,7 +488,7 @@ impl Partition { self.partition_id ); - last_segment.persist_messages().await.unwrap(); + last_segment.persist_messages(false).await?; self.unsaved_messages_count = 0; } } @@ -512,7 +512,7 @@ impl Partition { // Make sure all of the messages from the 
accumulator are persisted // no leftover from one round trip. while last_segment.unsaved_messages.is_some() { - last_segment.persist_messages().await.unwrap(); + last_segment.persist_messages(false).await?; } self.unsaved_messages_count = 0; Ok(()) diff --git a/server/src/streaming/segments/messages.rs b/server/src/streaming/segments/messages.rs index 270452674..cac925b1f 100644 --- a/server/src/streaming/segments/messages.rs +++ b/server/src/streaming/segments/messages.rs @@ -250,7 +250,7 @@ impl Segment { (index, time_index) } - pub async fn persist_messages(&mut self) -> Result { + pub async fn persist_messages(&mut self, fsync: bool) -> Result { let storage = self.storage.segment.clone(); if self.unsaved_messages.is_none() { return Ok(0); @@ -278,7 +278,7 @@ impl Segment { if has_remainder { self.unsaved_messages = Some(batch_accumulator); } - let saved_bytes = storage.save_batches(self, batch).await?; + let saved_bytes = storage.save_batches(self, batch, fsync).await?; assert_eq!(saved_bytes, batch_size); storage.save_index(&self.index_path, index).await?; storage diff --git a/server/src/streaming/segments/storage.rs b/server/src/streaming/segments/storage.rs index bb14f1bd5..82b6dc54b 100644 --- a/server/src/streaming/segments/storage.rs +++ b/server/src/streaming/segments/storage.rs @@ -31,11 +31,15 @@ const BUF_READER_CAPACITY_BYTES: usize = 512 * 1000; #[derive(Debug)] pub struct FileSegmentStorage { persister: Arc, + fsync_persister: Arc, } impl FileSegmentStorage { - pub fn new(persister: Arc) -> Self { - Self { persister } + pub fn new(persister: Arc, fsync_persister: Arc) -> Self { + Self { + persister, + fsync_persister, + } } } @@ -250,13 +254,19 @@ impl SegmentStorage for FileSegmentStorage { &self, segment: &Segment, batch: RetainedMessageBatch, + fsync: bool, ) -> Result { let batch_size = batch.get_size_bytes(); let mut bytes = BytesMut::with_capacity(batch_size as usize); batch.extend(&mut bytes); - if let Err(err) = self - .persister + let 
persister = if fsync { + &self.fsync_persister + } else { + &self.persister + }; + + if let Err(err) = persister .append(&segment.log_path, &bytes) .await .with_context(|| format!("Failed to save messages to segment: {}", segment.log_path)) diff --git a/server/src/streaming/storage.rs b/server/src/streaming/storage.rs index 1acb71c2f..43ab87cc0 100644 --- a/server/src/streaming/storage.rs +++ b/server/src/streaming/storage.rs @@ -75,6 +75,7 @@ pub trait SegmentStorage: Send + Sync { &self, segment: &Segment, batch: RetainedMessageBatch, + fsync: bool, ) -> Result; async fn load_message_ids(&self, segment: &Segment) -> Result, IggyError>; async fn load_checksums(&self, segment: &Segment) -> Result<(), IggyError>; @@ -108,7 +109,11 @@ pub struct SystemStorage { } impl SystemStorage { - pub fn new(config: Arc, persister: Arc) -> Self { + pub fn new( + config: Arc, + persister: Arc, + fsync_persister: Arc, + ) -> Self { Self { info: Arc::new(FileSystemInfoStorage::new( config.get_state_info_path(), @@ -117,7 +122,7 @@ impl SystemStorage { stream: Arc::new(FileStreamStorage), topic: Arc::new(FileTopicStorage), partition: Arc::new(FilePartitionStorage::new(persister.clone())), - segment: Arc::new(FileSegmentStorage::new(persister.clone())), + segment: Arc::new(FileSegmentStorage::new(persister.clone(), fsync_persister)), persister, } } @@ -301,6 +306,7 @@ pub(crate) mod tests { &self, _segment: &Segment, _batch: RetainedMessageBatch, + _fsync: bool, ) -> Result { Ok(0) } diff --git a/server/src/streaming/streams/persistence.rs b/server/src/streaming/streams/persistence.rs index ca0bbb2f7..2e2d3f00c 100644 --- a/server/src/streaming/streams/persistence.rs +++ b/server/src/streaming/streams/persistence.rs @@ -20,10 +20,10 @@ impl Stream { self.storage.stream.delete(self).await } - pub async fn persist_messages(&self) -> Result { + pub async fn persist_messages(&self, fsync: bool) -> Result { let mut saved_messages_number = 0; for topic in self.get_topics() { - 
saved_messages_number += topic.persist_messages().await?; + saved_messages_number += topic.persist_messages(fsync).await?; } Ok(saved_messages_number) diff --git a/server/src/streaming/systems/system.rs b/server/src/streaming/systems/system.rs index 72df3f4c9..eb55bd665 100644 --- a/server/src/streaming/systems/system.rs +++ b/server/src/streaming/systems/system.rs @@ -101,6 +101,7 @@ impl System { let state_persister = Self::resolve_persister(config.state.enforce_fsync); let partition_persister = Self::resolve_persister(config.partition.enforce_fsync); + let fsync_persister = Self::resolve_persister(true); let state = Arc::new(FileState::new( &config.get_state_log_path(), @@ -110,7 +111,7 @@ impl System { )); Self::create( config.clone(), - SystemStorage::new(config, partition_persister), + SystemStorage::new(config, partition_persister, fsync_persister), state, encryptor, data_maintenance_config, @@ -234,16 +235,16 @@ impl System { #[instrument(skip_all)] pub async fn shutdown(&mut self) -> Result<(), IggyError> { - self.persist_messages().await?; + self.persist_messages(true).await?; Ok(()) } #[instrument(skip_all)] - pub async fn persist_messages(&self) -> Result { + pub async fn persist_messages(&self, fsync: bool) -> Result { trace!("Saving buffered messages on disk..."); let mut saved_messages_number = 0; for stream in self.streams.values() { - saved_messages_number += stream.persist_messages().await?; + saved_messages_number += stream.persist_messages(fsync).await?; } Ok(saved_messages_number) diff --git a/server/src/streaming/topics/persistence.rs b/server/src/streaming/topics/persistence.rs index ecb63d7fa..25e3f54c3 100644 --- a/server/src/streaming/topics/persistence.rs +++ b/server/src/streaming/topics/persistence.rs @@ -23,12 +23,12 @@ impl Topic { self.storage.topic.delete(self).await } - pub async fn persist_messages(&self) -> Result { + pub async fn persist_messages(&self, fsync: bool) -> Result { let mut saved_messages_number = 0; for partition in 
self.get_partitions() { let mut partition = partition.write().await; for segment in partition.get_segments_mut() { - saved_messages_number += segment.persist_messages().await?; + saved_messages_number += segment.persist_messages(fsync).await?; } } From fb653fd17bc64296fb841948433984a93a9f9863 Mon Sep 17 00:00:00 2001 From: spetz Date: Sat, 2 Nov 2024 22:24:21 +0100 Subject: [PATCH 06/11] Fix tests --- configs/server.toml | 2 +- .../tests/server/scenarios/message_size_scenario.rs | 12 +++++++++--- .../tests/server/scenarios/system_scenario.rs | 2 +- sdk/src/messages/mod.rs | 2 +- server/src/streaming/segments/messages.rs | 1 - 5 files changed, 12 insertions(+), 7 deletions(-) diff --git a/configs/server.toml b/configs/server.toml index 25f6c2398..ec29ff6a3 100644 --- a/configs/server.toml +++ b/configs/server.toml @@ -248,7 +248,7 @@ enabled = true enforce_fsync = true # Interval for running the message saver. -interval = "30 s" +interval = "10 s" # Personal access token configuration. [personal_access_token] diff --git a/integration/tests/server/scenarios/message_size_scenario.rs b/integration/tests/server/scenarios/message_size_scenario.rs index 0b885ad4e..1ee6f5399 100644 --- a/integration/tests/server/scenarios/message_size_scenario.rs +++ b/integration/tests/server/scenarios/message_size_scenario.rs @@ -64,7 +64,13 @@ pub async fn run(client_factory: &dyn ClientFactory) { .await; send_message_and_check_result( &client, - MessageToSend::OfSizeWithHeaders(100_001, 10_000_000), + MessageToSend::OfSizeWithHeaders(1_000_000, 10_000_000), + Ok(()), + ) + .await; + send_message_and_check_result( + &client, + MessageToSend::OfSizeWithHeaders(1_000_001, 10_000_000), Err(InvalidResponse( 4017, 23, @@ -74,7 +80,7 @@ pub async fn run(client_factory: &dyn ClientFactory) { .await; send_message_and_check_result( &client, - MessageToSend::OfSizeWithHeaders(100_000, 10_000_001), + MessageToSend::OfSizeWithHeaders(100_000, 100_000_001), Err(InvalidResponse( 4022, 23, @@ -83,7 
+89,7 @@ pub async fn run(client_factory: &dyn ClientFactory) { ) .await; - assert_message_count(&client, 6).await; + assert_message_count(&client, 7).await; cleanup_system(&client).await; assert_clean_system(&client).await; } diff --git a/integration/tests/server/scenarios/system_scenario.rs b/integration/tests/server/scenarios/system_scenario.rs index ad64b0c60..a4c528111 100644 --- a/integration/tests/server/scenarios/system_scenario.rs +++ b/integration/tests/server/scenarios/system_scenario.rs @@ -292,7 +292,7 @@ pub async fn run(client_factory: &dyn ClientFactory) { assert_eq!(topic.name, TOPIC_NAME); assert_eq!(topic.partitions_count, PARTITIONS_COUNT); assert_eq!(topic.partitions.len(), PARTITIONS_COUNT as usize); - assert_eq!(topic.size, 55890); + assert_eq!(topic.size, 55914); assert_eq!(topic.messages_count, MESSAGES_COUNT as u64); let topic_partition = topic.partitions.get((PARTITION_ID - 1) as usize).unwrap(); assert_eq!(topic_partition.id, PARTITION_ID); diff --git a/sdk/src/messages/mod.rs b/sdk/src/messages/mod.rs index 752b772f9..04f4e2d09 100644 --- a/sdk/src/messages/mod.rs +++ b/sdk/src/messages/mod.rs @@ -3,4 +3,4 @@ pub mod poll_messages; pub mod send_messages; const MAX_HEADERS_SIZE: u32 = 1000 * 1000; -pub const MAX_PAYLOAD_SIZE: u32 = 1000 * 1000 * 1000; +pub const MAX_PAYLOAD_SIZE: u32 = 100 * 1000 * 1000; diff --git a/server/src/streaming/segments/messages.rs b/server/src/streaming/segments/messages.rs index cac925b1f..2276b27a8 100644 --- a/server/src/streaming/segments/messages.rs +++ b/server/src/streaming/segments/messages.rs @@ -279,7 +279,6 @@ impl Segment { self.unsaved_messages = Some(batch_accumulator); } let saved_bytes = storage.save_batches(self, batch, fsync).await?; - assert_eq!(saved_bytes, batch_size); storage.save_index(&self.index_path, index).await?; storage .save_time_index(&self.time_index_path, time_index) From 80a2f4f9fe32136202bd36cb7199b7a2b5ed344f Mon Sep 17 00:00:00 2001 From: spetz Date: Sun, 3 Nov 2024 
07:45:53 +0100 Subject: [PATCH 07/11] Increase max payload size --- integration/tests/server/scenarios/message_size_scenario.rs | 2 +- sdk/src/messages/mod.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/integration/tests/server/scenarios/message_size_scenario.rs b/integration/tests/server/scenarios/message_size_scenario.rs index 1ee6f5399..ba7a0fcce 100644 --- a/integration/tests/server/scenarios/message_size_scenario.rs +++ b/integration/tests/server/scenarios/message_size_scenario.rs @@ -80,7 +80,7 @@ pub async fn run(client_factory: &dyn ClientFactory) { .await; send_message_and_check_result( &client, - MessageToSend::OfSizeWithHeaders(100_000, 100_000_001), + MessageToSend::OfSizeWithHeaders(100_000, 1_000_000_001), Err(InvalidResponse( 4022, 23, diff --git a/sdk/src/messages/mod.rs b/sdk/src/messages/mod.rs index 04f4e2d09..84ef3faee 100644 --- a/sdk/src/messages/mod.rs +++ b/sdk/src/messages/mod.rs @@ -2,5 +2,5 @@ pub mod flush_unsaved_buffer; pub mod poll_messages; pub mod send_messages; -const MAX_HEADERS_SIZE: u32 = 1000 * 1000; -pub const MAX_PAYLOAD_SIZE: u32 = 100 * 1000 * 1000; +const MAX_HEADERS_SIZE: u32 = 1_000_000; +pub const MAX_PAYLOAD_SIZE: u32 = 1_000_000_000; From fb9e52bff2f6f6f6c42ea1d2da972eaa04f5c8dd Mon Sep 17 00:00:00 2001 From: numinex Date: Fri, 15 Nov 2024 20:19:25 +0100 Subject: [PATCH 08/11] temp direct i/o --- Cargo.lock | 6 +- bench/src/consumer.rs | 2 + bench/src/producer.rs | 2 + configs/server.json | 2 +- configs/server.toml | 2 +- sdk/Cargo.toml | 1 + server/Cargo.toml | 1 + .../streaming/batching/batch_accumulator.rs | 4 + .../src/streaming/batching/message_batch.rs | 9 ++ server/src/streaming/direct_io/mod.rs | 1 + server/src/streaming/direct_io/storage.rs | 96 +++++++++++++++++++ server/src/streaming/mod.rs | 1 + server/src/streaming/segments/messages.rs | 50 +++++++--- server/src/streaming/segments/segment.rs | 3 + 14 files changed, 163 insertions(+), 17 deletions(-) create mode 100644 
server/src/streaming/direct_io/mod.rs create mode 100644 server/src/streaming/direct_io/storage.rs diff --git a/Cargo.lock b/Cargo.lock index 30d29333c..0249feb81 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2121,6 +2121,7 @@ dependencies = [ "humantime", "keyring", "lazy_static", + "libc", "passterm", "quinn", "regex", @@ -2362,9 +2363,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" [[package]] name = "libc" -version = "0.2.158" +version = "0.2.162" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8adc4bb1803a324070e64a98ae98f38934d91957a99cfb3a43dcbc01bc56439" +checksum = "18d287de67fe55fd7e1581fe933d965a5a9477b38e949cfa9f8574ef01506398" [[package]] name = "libdbus-sys" @@ -4121,6 +4122,7 @@ dependencies = [ "futures", "iggy", "jsonwebtoken", + "libc", "log", "moka", "openssl", diff --git a/bench/src/consumer.rs b/bench/src/consumer.rs index 3b5b09c80..8344ac8cf 100644 --- a/bench/src/consumer.rs +++ b/bench/src/consumer.rs @@ -79,6 +79,7 @@ impl Consumer { let mut topic_not_found_counter = 0; let mut strategy = PollingStrategy::offset(0); + /* if self.warmup_time.get_duration() != Duration::from_millis(0) { if let Some(cg_id) = self.consumer_group_id { info!( @@ -117,6 +118,7 @@ impl Consumer { current_iteration += 1; } } + */ if let Some(cg_id) = self.consumer_group_id { info!( diff --git a/bench/src/producer.rs b/bench/src/producer.rs index 65fa01c03..4927cd6e3 100644 --- a/bench/src/producer.rs +++ b/bench/src/producer.rs @@ -72,6 +72,7 @@ impl Producer { 1 => Partitioning::partition_id(default_partition_id), 2.. 
=> Partitioning::balanced(), }; + /* info!( "Producer #{} → warming up for {}...", self.producer_id, self.warmup_time @@ -82,6 +83,7 @@ impl Producer { .send_messages(&stream_id, &topic_id, &partitioning, &mut messages) .await?; } + */ info!( "Producer #{} → sending {} messages in {} batches of {} messages...", diff --git a/configs/server.json b/configs/server.json index 9677a29ce..9b68b86cc 100644 --- a/configs/server.json +++ b/configs/server.json @@ -165,7 +165,7 @@ "sysinfo_print_interval": "10 s" }, "cache": { - "enabled": true, + "enabled": false, "size": "4 GB" }, "encryption": { diff --git a/configs/server.toml b/configs/server.toml index b1a7a31c0..6d98ebf11 100644 --- a/configs/server.toml +++ b/configs/server.toml @@ -372,7 +372,7 @@ sysinfo_print_interval = "10 s" # Enables or disables the system cache. # `true` activates caching for frequently accessed data. # `false` disables caching, data is always read from the source. -enabled = true +enabled = false # Maximum size of the cache, e.g. "4GB". 
size = "4 GB" diff --git a/sdk/Cargo.toml b/sdk/Cargo.toml index a9451b5f0..70b3f418a 100644 --- a/sdk/Cargo.toml +++ b/sdk/Cargo.toml @@ -39,6 +39,7 @@ futures-util = "0.3.30" humantime = "2.1.0" keyring = { version = "3.2.0", optional = true, features = ["sync-secret-service", "vendored"] } lazy_static = "1.4.0" +libc = "0.2.162" passterm = { version = "2.0.1", optional = true } quinn = { version = "0.11.5" } regex = "1.10.4" diff --git a/server/Cargo.toml b/server/Cargo.toml index ceb2fcd91..99e70c61e 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -26,6 +26,7 @@ console-subscriber = { version = "0.4.0", optional = true } dashmap = "6.0.1" derive_more = "1.0.0" figlet-rs = "0.1.5" +libc = "0.2.162" figment = { version = "0.10.18", features = ["json", "toml", "env"] } flume = "0.11.0" futures = "0.3.30" diff --git a/server/src/streaming/batching/batch_accumulator.rs b/server/src/streaming/batching/batch_accumulator.rs index 6cd4a66a4..89644dd57 100644 --- a/server/src/streaming/batching/batch_accumulator.rs +++ b/server/src/streaming/batching/batch_accumulator.rs @@ -1,6 +1,7 @@ use super::message_batch::{RetainedMessageBatch, RETAINED_BATCH_OVERHEAD}; use crate::streaming::{models::messages::RetainedMessage, sizeable::Sizeable}; use bytes::BytesMut; +use tracing::warn; use std::sync::Arc; #[derive(Debug, Clone)] @@ -82,6 +83,9 @@ impl BatchAccumulator { let mut remaining_messages = Vec::with_capacity(remainder.len()); let has_remainder = !remainder.is_empty(); + if has_remainder { + warn!("has remainder"); + } if has_remainder { self.base_offset = remainder.first().unwrap().offset; self.current_size = remainder diff --git a/server/src/streaming/batching/message_batch.rs b/server/src/streaming/batching/message_batch.rs index 5acacc0a7..3708a917f 100644 --- a/server/src/streaming/batching/message_batch.rs +++ b/server/src/streaming/batching/message_batch.rs @@ -2,6 +2,7 @@ use crate::streaming::batching::batch_filter::BatchItemizer; use 
crate::streaming::batching::iterator::IntoMessagesIterator; use crate::streaming::models::messages::RetainedMessage; use bytes::{BufMut, Bytes, BytesMut}; +use tracing::warn; pub const RETAINED_BATCH_OVERHEAD: u32 = 8 + 8 + 4 + 4; @@ -46,6 +47,14 @@ impl RetainedMessageBatch { self.base_offset + self.last_offset_delta as u64 } + pub fn extend2(&self, bytes: &mut Vec) { + bytes[0..8].copy_from_slice(&self.base_offset.to_le_bytes()); + bytes[8..12].copy_from_slice(&self.length.to_le_bytes()); + bytes[12..16].copy_from_slice(&self.last_offset_delta.to_le_bytes()); + bytes[16..24].copy_from_slice(&self.max_timestamp.to_le_bytes()); + bytes[24..self.length as usize + RETAINED_BATCH_OVERHEAD as usize].copy_from_slice(&self.bytes); + } + pub fn extend(&self, bytes: &mut BytesMut) { bytes.put_u64_le(self.base_offset); bytes.put_u32_le(self.length); diff --git a/server/src/streaming/direct_io/mod.rs b/server/src/streaming/direct_io/mod.rs new file mode 100644 index 000000000..30f61eb69 --- /dev/null +++ b/server/src/streaming/direct_io/mod.rs @@ -0,0 +1 @@ +pub mod storage; diff --git a/server/src/streaming/direct_io/storage.rs b/server/src/streaming/direct_io/storage.rs new file mode 100644 index 000000000..c9140fbb9 --- /dev/null +++ b/server/src/streaming/direct_io/storage.rs @@ -0,0 +1,96 @@ +use std::{io::{SeekFrom, Write}, os::unix::fs::OpenOptionsExt}; + +use bytes::{BufMut, BytesMut}; +use iggy::error::IggyError; +use tracing::warn; +use tokio::{fs::OpenOptions, io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader}}; +use crate::streaming::batching::message_batch::{RetainedMessageBatch, RETAINED_BATCH_OVERHEAD}; + +#[derive(Debug, Default)] +pub struct DirectIOStorage { +} + +impl DirectIOStorage { + pub async fn read_batches(&self, file_path: &str, start_position: u64, end_offset: u64) -> Result, IggyError> { + let file = OpenOptions::new().read(true).custom_flags(libc::O_DIRECT).open(file_path).await?; + warn!("start_position: {}", start_position); + + let 
sector_size = 4096; + let mut batches = Vec::new(); + let file_size = file.metadata().await?.len(); + if file_size == 0 { + warn!("file_size is 0"); + return Ok(batches); + } + + let mut reader = BufReader::with_capacity(4096 * 1000, file); + reader + .seek(SeekFrom::Start(start_position as u64)) + .await?; + + let mut read_bytes = start_position as u64; + let mut last_batch_to_read = false; + while !last_batch_to_read { + let Ok(batch_base_offset) = reader.read_u64_le().await else { + break; + }; + let batch_length = reader + .read_u32_le() + .await + .map_err(|_| IggyError::CannotReadBatchLength)?; + let last_offset_delta = reader + .read_u32_le() + .await + .map_err(|_| IggyError::CannotReadLastOffsetDelta)?; + let max_timestamp = reader + .read_u64_le() + .await + .map_err(|_| IggyError::CannotReadMaxTimestamp)?; + + let last_offset = batch_base_offset + (last_offset_delta as u64); + let total_batch_size = batch_length + RETAINED_BATCH_OVERHEAD; + let sectors = total_batch_size.div_ceil(sector_size); + let adjusted_size = sector_size * sectors; + warn!("adjusted_size: {}", adjusted_size); + let diff = adjusted_size - total_batch_size; + + let payload_len = batch_length as usize; + let mut payload = BytesMut::with_capacity(payload_len); + payload.put_bytes(0, payload_len); + if let Err(error) = reader.read_exact(&mut payload).await { + warn!( + "Cannot read batch payload for batch with base offset: {batch_base_offset}, last offset delta: {last_offset_delta}, max timestamp: {max_timestamp}, batch length: {batch_length} and payload length: {payload_len}.\nProbably OS hasn't flushed the data yet, try setting `enforce_fsync = true` for partition configuration if this issue occurs again.\n{error}", + ); + break; + } + // TEMP + let mut temp = BytesMut::with_capacity(diff as _); + temp.put_bytes(0, diff as _); + if let Err(e) = reader.read_exact(&mut temp).await { + warn!("lol error reading padding"); + } + + read_bytes += 8 + 4 + 4 + 8 + payload_len as u64; + 
last_batch_to_read = read_bytes >= file_size || last_offset == end_offset; + + let batch = RetainedMessageBatch::new( + batch_base_offset, + last_offset_delta, + max_timestamp, + batch_length, + payload.freeze(), + ); + batches.push(batch); + } + Ok(batches) + } + + pub async fn write_batches(&self, file_path: &str, bytes: &[u8]) -> Result { + //let mut std_file = std::fs::File::options().append(true).custom_flags(libc::O_DIRECT).open(file_path)?; + let mut file = OpenOptions::new().append(true).custom_flags(libc::O_DIRECT).open(file_path).await?; + if let Err(e) = file.write_all(bytes).await { + warn!("error writing: {}", e); + } + Ok(bytes.len() as _) + } +} diff --git a/server/src/streaming/mod.rs b/server/src/streaming/mod.rs index eb8709a44..453d94e35 100644 --- a/server/src/streaming/mod.rs +++ b/server/src/streaming/mod.rs @@ -17,3 +17,4 @@ pub mod systems; pub mod topics; pub mod users; pub mod utils; +pub mod direct_io; diff --git a/server/src/streaming/segments/messages.rs b/server/src/streaming/segments/messages.rs index 0fbfe9576..5f81e8d26 100644 --- a/server/src/streaming/segments/messages.rs +++ b/server/src/streaming/segments/messages.rs @@ -5,7 +5,11 @@ use crate::streaming::models::messages::RetainedMessage; use crate::streaming::segments::index::{Index, IndexRange}; use crate::streaming::segments::segment::Segment; use crate::streaming::sizeable::Sizeable; +use bytes::buf::UninitSlice; +use bytes::{BufMut, Bytes, BytesMut}; use iggy::error::IggyError; +use std::alloc::{self, Layout}; +use std::slice::from_raw_parts; use std::sync::atomic::Ordering; use std::sync::Arc; use tracing::{error, info, trace, warn}; @@ -161,15 +165,23 @@ impl Segment { end_offset: u64, ) -> Result>, IggyError> { let messages_count = (start_offset + end_offset) as usize; - let messages = self - .storage - .segment - .load_message_batches(self, index_range) - .await? 
+ let path = self.log_path.as_str(); + let start_position = index_range.start.position; + let end_offset = index_range.end.offset as u64 + self.start_offset; + let batch = self + .direct_io_storage + .read_batches(path, start_position as _, end_offset) + .await?; + error!("batches_count: {}", batch.len()); + let messages = batch .iter() + .to_messages(); + /* .to_messages_with_filter(messages_count, &|msg| { msg.offset >= start_offset && msg.offset <= end_offset }); + */ + error!("messages len: {}", messages.len()); trace!( "Loaded {} messages from disk, segment start offset: {}, end offset: {}.", messages.len(), @@ -235,7 +247,9 @@ impl Segment { } pub async fn persist_messages(&mut self, fsync: bool) -> Result { - let storage = self.storage.segment.clone(); + let sector_size = 512; + let storage = self.direct_io_storage.clone(); + let index_storage = self.storage.segment.clone(); if self.unsaved_messages.is_none() { return Ok(0); } @@ -259,19 +273,29 @@ impl Segment { let (has_remainder, batch) = batch_accumulator.materialize_batch_and_maybe_update_state(); let batch_size = batch.get_size_bytes(); + let sectors = batch_size.div_ceil(sector_size); + let adjusted_size = sector_size * sectors; + let layout = Layout::from_size_align(adjusted_size as _, sector_size as _).unwrap(); + let ptr = unsafe { alloc::alloc(layout) }; + unsafe { std::ptr::write_bytes(ptr, 0, adjusted_size as _) }; + //let slice = unsafe {std::slice::from_raw_parts(ptr, adjusted_size as _)}; if has_remainder { self.unsaved_messages = Some(batch_accumulator); } - let saved_bytes = storage.save_batches(self, batch, fsync).await?; - storage.save_index(&self.index_path, index).await?; - self.last_index_position += batch_size; - self.size_bytes += RETAINED_BATCH_OVERHEAD; + let mut bytes = unsafe {Vec::from_raw_parts(ptr, adjusted_size as _, adjusted_size as _)}; + let diff = bytes.len() as u32 - batch_size; + batch.extend2(&mut bytes); + let saved_bytes = 
storage.write_batches(self.log_path.as_str(), &bytes).await?; + index_storage.save_index(&self.index_path, index).await?; + self.last_index_position += adjusted_size; + let size_increment = RETAINED_BATCH_OVERHEAD + diff; + self.size_bytes += size_increment; self.size_of_parent_stream - .fetch_add(RETAINED_BATCH_OVERHEAD as u64, Ordering::AcqRel); + .fetch_add(size_increment as u64, Ordering::AcqRel); self.size_of_parent_topic - .fetch_add(RETAINED_BATCH_OVERHEAD as u64, Ordering::AcqRel); + .fetch_add(size_increment as u64, Ordering::AcqRel); self.size_of_parent_partition - .fetch_add(RETAINED_BATCH_OVERHEAD as u64, Ordering::AcqRel); + .fetch_add(size_increment as u64, Ordering::AcqRel); trace!( "Saved {} messages on disk in segment with start offset: {} for partition with ID: {}, total bytes written: {}.", diff --git a/server/src/streaming/segments/segment.rs b/server/src/streaming/segments/segment.rs index b2e368534..7ef78c8d8 100644 --- a/server/src/streaming/segments/segment.rs +++ b/server/src/streaming/segments/segment.rs @@ -1,5 +1,6 @@ use crate::configs::system::SystemConfig; use crate::streaming::batching::batch_accumulator::BatchAccumulator; +use crate::streaming::direct_io::storage::DirectIOStorage; use crate::streaming::segments::index::Index; use crate::streaming::storage::SystemStorage; use iggy::utils::expiry::IggyExpiry; @@ -36,6 +37,7 @@ pub struct Segment { pub(crate) config: Arc, pub(crate) indexes: Option>, pub(crate) storage: Arc, + pub(crate) direct_io_storage: Arc, } impl Segment { @@ -78,6 +80,7 @@ impl Segment { true => Some(Vec::new()), false => None, }, + direct_io_storage: Default::default(), unsaved_messages, is_closed: false, size_of_parent_stream, From 83d4346c074e64b8a950035a9c9b3ce9dd50b9e1 Mon Sep 17 00:00:00 2001 From: numinex Date: Sat, 16 Nov 2024 11:40:06 +0100 Subject: [PATCH 09/11] it just works --- server/src/streaming/direct_io/storage.rs | 144 +++++++++++++--------- server/src/streaming/segments/index.rs | 18 ++- 
server/src/streaming/segments/messages.rs | 13 +- 3 files changed, 106 insertions(+), 69 deletions(-) diff --git a/server/src/streaming/direct_io/storage.rs b/server/src/streaming/direct_io/storage.rs index c9140fbb9..97e4997df 100644 --- a/server/src/streaming/direct_io/storage.rs +++ b/server/src/streaming/direct_io/storage.rs @@ -1,9 +1,9 @@ -use std::{io::{SeekFrom, Write}, os::unix::fs::OpenOptionsExt}; +use std::{alloc::{self, Layout}, io::{Read, Seek, SeekFrom, Write}, os::unix::fs::OpenOptionsExt}; -use bytes::{BufMut, BytesMut}; +use bytes::{BufMut, Bytes, BytesMut}; use iggy::error::IggyError; use tracing::warn; -use tokio::{fs::OpenOptions, io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader}}; +use tokio::{fs::OpenOptions, io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader}, task::spawn_blocking}; use crate::streaming::batching::message_batch::{RetainedMessageBatch, RETAINED_BATCH_OVERHEAD}; #[derive(Debug, Default)] @@ -11,86 +11,118 @@ pub struct DirectIOStorage { } impl DirectIOStorage { - pub async fn read_batches(&self, file_path: &str, start_position: u64, end_offset: u64) -> Result, IggyError> { - let file = OpenOptions::new().read(true).custom_flags(libc::O_DIRECT).open(file_path).await?; - warn!("start_position: {}", start_position); - - let sector_size = 4096; + pub async fn read_batches(&self, file_path: &str, start_position: u64, end_position: u64) -> Result, IggyError> { + //let mut file = OpenOptions::new().read(true).custom_flags(libc::O_DIRECT).open(file_path).await?; + let mut file = std::fs::File::options().read(true).custom_flags(libc::O_DIRECT).open(file_path)?; + file.seek(SeekFrom::Start(start_position))?; let mut batches = Vec::new(); - let file_size = file.metadata().await?.len(); + let file_size = file.metadata()?.len(); if file_size == 0 { - warn!("file_size is 0"); return Ok(batches); } + // Aloc the buf + let buf_size = if start_position == end_position { + file_size - start_position + } else { + end_position - 
start_position + }; + let sector_size = 4096; + let alignment = buf_size % sector_size; + assert!(alignment == 0); + + let layout = Layout::from_size_align(buf_size as _, sector_size as _).unwrap(); + let ptr = unsafe { alloc::alloc(layout) }; + // Not sure if this is required + unsafe { std::ptr::write_bytes(ptr, 0, buf_size as _) }; + let mut bytes = unsafe {Vec::from_raw_parts(ptr, buf_size as _, buf_size as _)}; + let result = spawn_blocking(move || { + if let Err(e) = file.read_exact(&mut bytes) { + warn!("error reading batch: {}", e); + } + Self::serialize_batches(bytes, &mut batches); + Ok(batches) + }).await.unwrap(); + result + } - let mut reader = BufReader::with_capacity(4096 * 1000, file); - reader - .seek(SeekFrom::Start(start_position as u64)) - .await?; + fn serialize_batches(bytes: Vec, batches: &mut Vec) { + let len = bytes.len(); + let mut read_bytes = 0; + let sector_size = 4096; - let mut read_bytes = start_position as u64; - let mut last_batch_to_read = false; - while !last_batch_to_read { - let Ok(batch_base_offset) = reader.read_u64_le().await else { - break; - }; - let batch_length = reader - .read_u32_le() - .await - .map_err(|_| IggyError::CannotReadBatchLength)?; - let last_offset_delta = reader - .read_u32_le() - .await - .map_err(|_| IggyError::CannotReadLastOffsetDelta)?; - let max_timestamp = reader - .read_u64_le() - .await - .map_err(|_| IggyError::CannotReadMaxTimestamp)?; + while read_bytes < len { + // Read batch_base_offset + let batch_base_offset = u64::from_le_bytes( + bytes[read_bytes..read_bytes + 8] + .try_into() + .expect("Failed to read batch_base_offset"), + ); + read_bytes += 8; - let last_offset = batch_base_offset + (last_offset_delta as u64); + // Read batch_length + let batch_length = u32::from_le_bytes( + bytes[read_bytes..read_bytes + 4] + .try_into() + .expect("Failed to read batch_length"), + ); + read_bytes += 4; + + // Read last_offset_delta + let last_offset_delta = u32::from_le_bytes( + 
bytes[read_bytes..read_bytes + 4] + .try_into() + .expect("Failed to read last_offset_delta"), + ); + read_bytes += 4; + + // Read max_timestamp + let max_timestamp = u64::from_le_bytes( + bytes[read_bytes..read_bytes + 8] + .try_into() + .expect("Failed to read max_timestamp"), + ); + read_bytes += 8; + + // Calculate last_offset and other values let total_batch_size = batch_length + RETAINED_BATCH_OVERHEAD; let sectors = total_batch_size.div_ceil(sector_size); let adjusted_size = sector_size * sectors; - warn!("adjusted_size: {}", adjusted_size); let diff = adjusted_size - total_batch_size; + // Read payload let payload_len = batch_length as usize; - let mut payload = BytesMut::with_capacity(payload_len); - payload.put_bytes(0, payload_len); - if let Err(error) = reader.read_exact(&mut payload).await { + let payload_start = read_bytes; + let payload_end = read_bytes + payload_len; + if payload_end > len { warn!( - "Cannot read batch payload for batch with base offset: {batch_base_offset}, last offset delta: {last_offset_delta}, max timestamp: {max_timestamp}, batch length: {batch_length} and payload length: {payload_len}.\nProbably OS hasn't flushed the data yet, try setting `enforce_fsync = true` for partition configuration if this issue occurs again.\n{error}", + "Cannot read batch payload for batch with base offset: {batch_base_offset}, last offset delta: {last_offset_delta}, max timestamp: {max_timestamp}, batch length: {batch_length} and payload length: {payload_len}.\nProbably OS hasn't flushed the data yet, try setting `enforce_fsync = true` for partition configuration if this issue occurs again." ); break; } - // TEMP - let mut temp = BytesMut::with_capacity(diff as _); - temp.put_bytes(0, diff as _); - if let Err(e) = reader.read_exact(&mut temp).await { - warn!("lol error reading padding"); - } - - read_bytes += 8 + 4 + 4 + 8 + payload_len as u64; - last_batch_to_read = read_bytes >= file_size || last_offset == end_offset; + // Ergh.... 
+ let payload = Bytes::copy_from_slice(&bytes[payload_start..payload_end]); + read_bytes = payload_end + diff as usize; let batch = RetainedMessageBatch::new( batch_base_offset, last_offset_delta, max_timestamp, batch_length, - payload.freeze(), + payload, ); batches.push(batch); } - Ok(batches) } - pub async fn write_batches(&self, file_path: &str, bytes: &[u8]) -> Result { - //let mut std_file = std::fs::File::options().append(true).custom_flags(libc::O_DIRECT).open(file_path)?; - let mut file = OpenOptions::new().append(true).custom_flags(libc::O_DIRECT).open(file_path).await?; - if let Err(e) = file.write_all(bytes).await { - warn!("error writing: {}", e); - } - Ok(bytes.len() as _) + pub async fn write_batches(&self, file_path: &str, bytes: Vec) -> Result { + let mut std_file = std::fs::File::options().append(true).custom_flags(libc::O_DIRECT).open(file_path)?; + //let mut file = OpenOptions::new().append(true).custom_flags(libc::O_DIRECT).open(file_path).await?; + let size = bytes.len() as _; + spawn_blocking(move || { + if let Err(e) = std_file.write_all(&bytes) { + warn!("error writing: {}", e); + } + }).await.unwrap(); + Ok(size) } } diff --git a/server/src/streaming/segments/index.rs b/server/src/streaming/segments/index.rs index 36a963337..a947ec7fc 100644 --- a/server/src/streaming/segments/index.rs +++ b/server/src/streaming/segments/index.rs @@ -1,6 +1,7 @@ use crate::streaming::segments::segment::Segment; use iggy::error::IggyError; use iggy::error::IggyError::InvalidOffset; +use tracing::warn; #[derive(Debug, Eq, Clone, Copy, Default)] pub struct Index { @@ -32,10 +33,19 @@ impl Segment { let ending_offset_idx = binary_search_index(indices, end_offset); match (starting_offset_idx, ending_offset_idx) { - (Some(starting_offset_idx), Some(ending_offset_idx)) => Ok(IndexRange { - start: indices[starting_offset_idx], - end: indices[ending_offset_idx], - }), + (Some(starting_offset_idx), Some(ending_offset_idx)) => + { + // UGLY AS FOOOOOOOOOOOOOK, but 
will deal with it later on. + let end_idx = if ending_offset_idx == indices.len() - 1 { + ending_offset_idx + } else { + ending_offset_idx + 1 + }; + Ok(IndexRange { + start: indices[starting_offset_idx], + end: indices[end_idx], + }) + }, (Some(starting_offset_idx), None) => Ok(IndexRange { start: indices[starting_offset_idx], end: *indices.last().unwrap(), diff --git a/server/src/streaming/segments/messages.rs b/server/src/streaming/segments/messages.rs index 5f81e8d26..ef2ec1c4f 100644 --- a/server/src/streaming/segments/messages.rs +++ b/server/src/streaming/segments/messages.rs @@ -167,21 +167,16 @@ impl Segment { let messages_count = (start_offset + end_offset) as usize; let path = self.log_path.as_str(); let start_position = index_range.start.position; - let end_offset = index_range.end.offset as u64 + self.start_offset; + let end_position = index_range.end.position; let batch = self .direct_io_storage - .read_batches(path, start_position as _, end_offset) + .read_batches(path, start_position as _, end_position as _) .await?; - error!("batches_count: {}", batch.len()); let messages = batch .iter() - .to_messages(); - /* .to_messages_with_filter(messages_count, &|msg| { msg.offset >= start_offset && msg.offset <= end_offset }); - */ - error!("messages len: {}", messages.len()); trace!( "Loaded {} messages from disk, segment start offset: {}, end offset: {}.", messages.len(), @@ -247,7 +242,7 @@ impl Segment { } pub async fn persist_messages(&mut self, fsync: bool) -> Result { - let sector_size = 512; + let sector_size = 4096; let storage = self.direct_io_storage.clone(); let index_storage = self.storage.segment.clone(); if self.unsaved_messages.is_none() { @@ -285,7 +280,7 @@ impl Segment { let mut bytes = unsafe {Vec::from_raw_parts(ptr, adjusted_size as _, adjusted_size as _)}; let diff = bytes.len() as u32 - batch_size; batch.extend2(&mut bytes); - let saved_bytes = storage.write_batches(self.log_path.as_str(), &bytes).await?; + let saved_bytes = 
storage.write_batches(self.log_path.as_str(), bytes).await?; index_storage.save_index(&self.index_path, index).await?; self.last_index_position += adjusted_size; let size_increment = RETAINED_BATCH_OVERHEAD + diff; From 71992492c87d2258b24e02c6c0a07e576427ab37 Mon Sep 17 00:00:00 2001 From: numinex Date: Sun, 17 Nov 2024 21:38:50 +0100 Subject: [PATCH 10/11] dma_buf begin --- integration/tests/streaming/common/test_setup.rs | 2 +- integration/tests/streaming/consumer_offset.rs | 2 +- server/src/compat/storage_conversion/converter.rs | 2 +- server/src/compat/storage_conversion/mod.rs | 2 +- server/src/streaming/direct_io/storage.rs | 1 - server/src/streaming/{storage.rs => iggy_storage.rs} | 2 +- server/src/streaming/io/dma_buf.rs | 7 +++++++ server/src/streaming/io/mod.rs | 1 + server/src/streaming/mod.rs | 3 ++- server/src/streaming/partitions/messages.rs | 2 +- server/src/streaming/partitions/partition.rs | 4 ++-- server/src/streaming/partitions/storage.rs | 2 +- server/src/streaming/segments/index.rs | 2 +- server/src/streaming/segments/segment.rs | 4 ++-- server/src/streaming/segments/storage.rs | 2 +- server/src/streaming/streams/storage.rs | 2 +- server/src/streaming/streams/stream.rs | 4 ++-- server/src/streaming/streams/topics.rs | 2 +- server/src/streaming/systems/storage.rs | 2 +- server/src/streaming/systems/streams.rs | 2 +- server/src/streaming/systems/system.rs | 2 +- server/src/streaming/topics/consumer_groups.rs | 2 +- server/src/streaming/topics/messages.rs | 2 +- server/src/streaming/topics/storage.rs | 2 +- server/src/streaming/topics/topic.rs | 4 ++-- 25 files changed, 35 insertions(+), 27 deletions(-) rename server/src/streaming/{storage.rs => iggy_storage.rs} (99%) create mode 100644 server/src/streaming/io/dma_buf.rs create mode 100644 server/src/streaming/io/mod.rs diff --git a/integration/tests/streaming/common/test_setup.rs b/integration/tests/streaming/common/test_setup.rs index dee433404..143604d77 100644 --- 
a/integration/tests/streaming/common/test_setup.rs +++ b/integration/tests/streaming/common/test_setup.rs @@ -1,6 +1,6 @@ use server::configs::system::SystemConfig; use server::streaming::persistence::persister::{FilePersister, FileWithSyncPersister}; -use server::streaming::storage::SystemStorage; +use server::streaming::iggy_storage::SystemStorage; use std::sync::Arc; use tokio::fs; use uuid::Uuid; diff --git a/integration/tests/streaming/consumer_offset.rs b/integration/tests/streaming/consumer_offset.rs index 0b985b8fe..e5b733141 100644 --- a/integration/tests/streaming/consumer_offset.rs +++ b/integration/tests/streaming/consumer_offset.rs @@ -2,7 +2,7 @@ use crate::streaming::common::test_setup::TestSetup; use iggy::consumer::ConsumerKind; use server::configs::system::SystemConfig; use server::streaming::partitions::partition::ConsumerOffset; -use server::streaming::storage::PartitionStorage; +use server::streaming::iggy_storage::PartitionStorage; use std::sync::Arc; use tokio::fs; diff --git a/server/src/compat/storage_conversion/converter.rs b/server/src/compat/storage_conversion/converter.rs index bd1f4ef44..0620ec305 100644 --- a/server/src/compat/storage_conversion/converter.rs +++ b/server/src/compat/storage_conversion/converter.rs @@ -2,7 +2,7 @@ use crate::state::command::EntryCommand; use crate::state::models::CreatePersonalAccessTokenWithHash; use crate::state::State; use crate::streaming::personal_access_tokens::personal_access_token::PersonalAccessToken; -use crate::streaming::storage::SystemStorage; +use crate::streaming::iggy_storage::SystemStorage; use crate::streaming::streams::stream::Stream; use crate::streaming::users::user::User; use iggy::consumer_groups::create_consumer_group::CreateConsumerGroup; diff --git a/server/src/compat/storage_conversion/mod.rs b/server/src/compat/storage_conversion/mod.rs index 9c3b93290..4707ed16c 100644 --- a/server/src/compat/storage_conversion/mod.rs +++ b/server/src/compat/storage_conversion/mod.rs @@ 
-10,7 +10,7 @@ use crate::streaming::partitions::partition::{ConsumerOffset, Partition}; use crate::streaming::persistence::persister::Persister; use crate::streaming::segments::index::{Index, IndexRange}; use crate::streaming::segments::segment::Segment; -use crate::streaming::storage::{ +use crate::streaming::iggy_storage::{ PartitionStorage, SegmentStorage, StreamStorage, SystemInfoStorage, SystemStorage, TopicStorage, }; use crate::streaming::streams::stream::Stream; diff --git a/server/src/streaming/direct_io/storage.rs b/server/src/streaming/direct_io/storage.rs index 97e4997df..5bcb04ec9 100644 --- a/server/src/streaming/direct_io/storage.rs +++ b/server/src/streaming/direct_io/storage.rs @@ -33,7 +33,6 @@ impl DirectIOStorage { let layout = Layout::from_size_align(buf_size as _, sector_size as _).unwrap(); let ptr = unsafe { alloc::alloc(layout) }; // Not sure if this is required - unsafe { std::ptr::write_bytes(ptr, 0, buf_size as _) }; let mut bytes = unsafe {Vec::from_raw_parts(ptr, buf_size as _, buf_size as _)}; let result = spawn_blocking(move || { if let Err(e) = file.read_exact(&mut bytes) { diff --git a/server/src/streaming/storage.rs b/server/src/streaming/iggy_storage.rs similarity index 99% rename from server/src/streaming/storage.rs rename to server/src/streaming/iggy_storage.rs index 61b18251d..e4069c398 100644 --- a/server/src/streaming/storage.rs +++ b/server/src/streaming/iggy_storage.rs @@ -158,7 +158,7 @@ pub(crate) mod tests { use crate::streaming::partitions::partition::Partition; use crate::streaming::segments::index::{Index, IndexRange}; use crate::streaming::segments::segment::Segment; - use crate::streaming::storage::*; + use crate::streaming::iggy_storage::*; use crate::streaming::streams::stream::Stream; use crate::streaming::topics::topic::Topic; use async_trait::async_trait; diff --git a/server/src/streaming/io/dma_buf.rs b/server/src/streaming/io/dma_buf.rs new file mode 100644 index 000000000..0174ff432 --- /dev/null +++ 
b/server/src/streaming/io/dma_buf.rs @@ -0,0 +1,7 @@ +pub trait DmaBuf { + +} + +pub struct DmfBuf { + +} diff --git a/server/src/streaming/io/mod.rs b/server/src/streaming/io/mod.rs new file mode 100644 index 000000000..39d198881 --- /dev/null +++ b/server/src/streaming/io/mod.rs @@ -0,0 +1 @@ +pub mod dma_buf; diff --git a/server/src/streaming/mod.rs b/server/src/streaming/mod.rs index 453d94e35..4eb81a22f 100644 --- a/server/src/streaming/mod.rs +++ b/server/src/streaming/mod.rs @@ -11,10 +11,11 @@ pub mod polling_consumer; pub mod segments; pub mod session; pub mod sizeable; -pub mod storage; +pub mod iggy_storage; pub mod streams; pub mod systems; pub mod topics; pub mod users; pub mod utils; pub mod direct_io; +pub mod io; diff --git a/server/src/streaming/partitions/messages.rs b/server/src/streaming/partitions/messages.rs index 4d221f218..bc7c60070 100644 --- a/server/src/streaming/partitions/messages.rs +++ b/server/src/streaming/partitions/messages.rs @@ -543,7 +543,7 @@ mod tests { use super::*; use crate::configs::system::{MessageDeduplicationConfig, SystemConfig}; use crate::streaming::partitions::create_messages; - use crate::streaming::storage::tests::get_test_system_storage; + use crate::streaming::iggy_storage::tests::get_test_system_storage; #[tokio::test] async fn given_disabled_message_deduplication_all_messages_should_be_appended() { diff --git a/server/src/streaming/partitions/partition.rs b/server/src/streaming/partitions/partition.rs index 7dfad8f1b..f21b925e7 100644 --- a/server/src/streaming/partitions/partition.rs +++ b/server/src/streaming/partitions/partition.rs @@ -4,7 +4,7 @@ use crate::streaming::cache::memory_tracker::CacheMemoryTracker; use crate::streaming::deduplication::message_deduplicator::MessageDeduplicator; use crate::streaming::models::messages::RetainedMessage; use crate::streaming::segments::segment::Segment; -use crate::streaming::storage::SystemStorage; +use crate::streaming::iggy_storage::SystemStorage; use 
dashmap::DashMap; use iggy::consumer::ConsumerKind; use iggy::utils::duration::IggyDuration; @@ -177,7 +177,7 @@ impl Partition { mod tests { use crate::configs::system::{CacheConfig, SystemConfig}; use crate::streaming::partitions::partition::Partition; - use crate::streaming::storage::tests::get_test_system_storage; + use crate::streaming::iggy_storage::tests::get_test_system_storage; use iggy::utils::duration::IggyDuration; use iggy::utils::expiry::IggyExpiry; use iggy::utils::timestamp::IggyTimestamp; diff --git a/server/src/streaming/partitions/storage.rs b/server/src/streaming/partitions/storage.rs index 9cdeb8c27..2b9b04784 100644 --- a/server/src/streaming/partitions/storage.rs +++ b/server/src/streaming/partitions/storage.rs @@ -4,7 +4,7 @@ use crate::streaming::batching::batch_accumulator::BatchAccumulator; use crate::streaming::partitions::partition::{ConsumerOffset, Partition}; use crate::streaming::persistence::persister::Persister; use crate::streaming::segments::segment::{Segment, INDEX_EXTENSION, LOG_EXTENSION}; -use crate::streaming::storage::PartitionStorage; +use crate::streaming::iggy_storage::PartitionStorage; use crate::streaming::utils::file; use anyhow::Context; use async_trait::async_trait; diff --git a/server/src/streaming/segments/index.rs b/server/src/streaming/segments/index.rs index a947ec7fc..0d0960fad 100644 --- a/server/src/streaming/segments/index.rs +++ b/server/src/streaming/segments/index.rs @@ -89,7 +89,7 @@ impl IndexRange { mod tests { use super::*; use crate::configs::system::{SegmentConfig, SystemConfig}; - use crate::streaming::storage::tests::get_test_system_storage; + use crate::streaming::iggy_storage::tests::get_test_system_storage; use iggy::utils::expiry::IggyExpiry; use std::sync::atomic::AtomicU64; use std::sync::Arc; diff --git a/server/src/streaming/segments/segment.rs b/server/src/streaming/segments/segment.rs index 7ef78c8d8..014baa326 100644 --- a/server/src/streaming/segments/segment.rs +++ 
b/server/src/streaming/segments/segment.rs @@ -2,7 +2,7 @@ use crate::configs::system::SystemConfig; use crate::streaming::batching::batch_accumulator::BatchAccumulator; use crate::streaming::direct_io::storage::DirectIOStorage; use crate::streaming::segments::index::Index; -use crate::streaming::storage::SystemStorage; +use crate::streaming::iggy_storage::SystemStorage; use iggy::utils::expiry::IggyExpiry; use iggy::utils::timestamp::IggyTimestamp; use std::sync::atomic::AtomicU64; @@ -141,7 +141,7 @@ impl Segment { mod tests { use super::*; use crate::configs::system::SegmentConfig; - use crate::streaming::storage::tests::get_test_system_storage; + use crate::streaming::iggy_storage::tests::get_test_system_storage; use iggy::utils::duration::IggyDuration; #[tokio::test] diff --git a/server/src/streaming/segments/storage.rs b/server/src/streaming/segments/storage.rs index aae185d7c..57875fb60 100644 --- a/server/src/streaming/segments/storage.rs +++ b/server/src/streaming/segments/storage.rs @@ -5,7 +5,7 @@ use crate::streaming::persistence::persister::Persister; use crate::streaming::segments::index::{Index, IndexRange}; use crate::streaming::segments::segment::Segment; use crate::streaming::sizeable::Sizeable; -use crate::streaming::storage::SegmentStorage; +use crate::streaming::iggy_storage::SegmentStorage; use crate::streaming::utils::file; use crate::streaming::utils::head_tail_buf::HeadTailBuffer; use anyhow::Context; diff --git a/server/src/streaming/streams/storage.rs b/server/src/streaming/streams/storage.rs index 4bf3b45c4..00ad1ecf8 100644 --- a/server/src/streaming/streams/storage.rs +++ b/server/src/streaming/streams/storage.rs @@ -1,5 +1,5 @@ use crate::state::system::StreamState; -use crate::streaming::storage::StreamStorage; +use crate::streaming::iggy_storage::StreamStorage; use crate::streaming::streams::stream::Stream; use crate::streaming::topics::topic::Topic; use async_trait::async_trait; diff --git a/server/src/streaming/streams/stream.rs 
b/server/src/streaming/streams/stream.rs index a46e75911..9f7ec41fb 100644 --- a/server/src/streaming/streams/stream.rs +++ b/server/src/streaming/streams/stream.rs @@ -1,5 +1,5 @@ use crate::configs::system::SystemConfig; -use crate::streaming::storage::SystemStorage; +use crate::streaming::iggy_storage::SystemStorage; use crate::streaming::topics::topic::Topic; use iggy::utils::byte_size::IggyByteSize; use iggy::utils::timestamp::IggyTimestamp; @@ -68,7 +68,7 @@ impl Stream { #[cfg(test)] mod tests { use super::*; - use crate::streaming::storage::tests::get_test_system_storage; + use crate::streaming::iggy_storage::tests::get_test_system_storage; #[test] fn should_be_created_given_valid_parameters() { diff --git a/server/src/streaming/streams/topics.rs b/server/src/streaming/streams/topics.rs index b9737b297..59dc193bc 100644 --- a/server/src/streaming/streams/topics.rs +++ b/server/src/streaming/streams/topics.rs @@ -228,7 +228,7 @@ impl Stream { mod tests { use super::*; use crate::configs::system::SystemConfig; - use crate::streaming::storage::tests::get_test_system_storage; + use crate::streaming::iggy_storage::tests::get_test_system_storage; use iggy::utils::byte_size::IggyByteSize; use std::sync::Arc; diff --git a/server/src/streaming/systems/storage.rs b/server/src/streaming/systems/storage.rs index 8e3291977..8008e7d1b 100644 --- a/server/src/streaming/systems/storage.rs +++ b/server/src/streaming/systems/storage.rs @@ -1,5 +1,5 @@ use crate::streaming::persistence::persister::Persister; -use crate::streaming::storage::SystemInfoStorage; +use crate::streaming::iggy_storage::SystemInfoStorage; use crate::streaming::systems::info::SystemInfo; use crate::streaming::utils::file; use anyhow::Context; diff --git a/server/src/streaming/systems/streams.rs b/server/src/streaming/systems/streams.rs index e37d92733..88ef774eb 100644 --- a/server/src/streaming/systems/streams.rs +++ b/server/src/streaming/systems/streams.rs @@ -367,7 +367,7 @@ mod tests { use 
crate::state::command::EntryCommand; use crate::state::entry::StateEntry; use crate::state::State; - use crate::streaming::storage::tests::get_test_system_storage; + use crate::streaming::iggy_storage::tests::get_test_system_storage; use crate::streaming::users::user::User; use async_trait::async_trait; use iggy::users::defaults::{DEFAULT_ROOT_PASSWORD, DEFAULT_ROOT_USERNAME}; diff --git a/server/src/streaming/systems/system.rs b/server/src/streaming/systems/system.rs index eb55bd665..0c7381a19 100644 --- a/server/src/streaming/systems/system.rs +++ b/server/src/streaming/systems/system.rs @@ -5,7 +5,7 @@ use crate::streaming::clients::client_manager::ClientManager; use crate::streaming::diagnostics::metrics::Metrics; use crate::streaming::persistence::persister::*; use crate::streaming::session::Session; -use crate::streaming::storage::SystemStorage; +use crate::streaming::iggy_storage::SystemStorage; use crate::streaming::streams::stream::Stream; use crate::streaming::users::permissioner::Permissioner; use iggy::error::IggyError; diff --git a/server/src/streaming/topics/consumer_groups.rs b/server/src/streaming/topics/consumer_groups.rs index 0d426105f..faa2fee0d 100644 --- a/server/src/streaming/topics/consumer_groups.rs +++ b/server/src/streaming/topics/consumer_groups.rs @@ -192,7 +192,7 @@ impl Topic { mod tests { use super::*; use crate::configs::system::SystemConfig; - use crate::streaming::storage::tests::get_test_system_storage; + use crate::streaming::iggy_storage::tests::get_test_system_storage; use iggy::compression::compression_algorithm::CompressionAlgorithm; use iggy::utils::expiry::IggyExpiry; use iggy::utils::topic_size::MaxTopicSize; diff --git a/server/src/streaming/topics/messages.rs b/server/src/streaming/topics/messages.rs index 7187d4cd1..fe6a8b1ed 100644 --- a/server/src/streaming/topics/messages.rs +++ b/server/src/streaming/topics/messages.rs @@ -288,7 +288,7 @@ impl Topic { mod tests { use super::*; use 
crate::configs::system::SystemConfig; - use crate::streaming::storage::tests::get_test_system_storage; + use crate::streaming::iggy_storage::tests::get_test_system_storage; use bytes::Bytes; use iggy::compression::compression_algorithm::CompressionAlgorithm; use iggy::utils::topic_size::MaxTopicSize; diff --git a/server/src/streaming/topics/storage.rs b/server/src/streaming/topics/storage.rs index 6e006e2e3..71c090d22 100644 --- a/server/src/streaming/topics/storage.rs +++ b/server/src/streaming/topics/storage.rs @@ -1,6 +1,6 @@ use crate::state::system::TopicState; use crate::streaming::partitions::partition::Partition; -use crate::streaming::storage::TopicStorage; +use crate::streaming::iggy_storage::TopicStorage; use crate::streaming::topics::consumer_group::ConsumerGroup; use crate::streaming::topics::topic::Topic; use anyhow::Context; diff --git a/server/src/streaming/topics/topic.rs b/server/src/streaming/topics/topic.rs index bec156748..e9411ca4c 100644 --- a/server/src/streaming/topics/topic.rs +++ b/server/src/streaming/topics/topic.rs @@ -1,7 +1,7 @@ use crate::configs::system::SystemConfig; use crate::streaming::partitions::partition::Partition; use crate::streaming::polling_consumer::PollingConsumer; -use crate::streaming::storage::SystemStorage; +use crate::streaming::iggy_storage::SystemStorage; use crate::streaming::topics::consumer_group::ConsumerGroup; use core::fmt; use iggy::compression::compression_algorithm::CompressionAlgorithm; @@ -255,7 +255,7 @@ mod tests { use std::str::FromStr; use super::*; - use crate::streaming::storage::tests::get_test_system_storage; + use crate::streaming::iggy_storage::tests::get_test_system_storage; #[tokio::test] async fn should_be_created_given_valid_parameters() { From aab4350041f40c27e7abce863d80720a6ecd12f9 Mon Sep 17 00:00:00 2001 From: numinex Date: Sun, 24 Nov 2024 22:49:53 +0100 Subject: [PATCH 11/11] dma storage read --- Cargo.lock | 13 +- configs/server.json | 2 +- 
.../tests/streaming/common/test_setup.rs | 2 +- .../tests/streaming/consumer_offset.rs | 2 +- server/Cargo.toml | 3 +- .../compat/storage_conversion/converter.rs | 2 +- server/src/compat/storage_conversion/mod.rs | 6 +- .../streaming/batching/batch_accumulator.rs | 2 +- .../src/streaming/batching/message_batch.rs | 3 +- server/src/streaming/direct_io/storage.rs | 45 ++++-- server/src/streaming/iggy_storage.rs | 2 +- server/src/streaming/io/buf/dma_buf.rs | 66 +++++++++ server/src/streaming/io/buf/mod.rs | 8 ++ server/src/streaming/io/dma_buf.rs | 7 - server/src/streaming/io/mod.rs | 3 +- .../src/streaming/io/stream/message_stream.rs | 105 ++++++++++++++ server/src/streaming/io/stream/mod.rs | 1 + server/src/streaming/mod.rs | 7 +- server/src/streaming/partitions/messages.rs | 2 +- server/src/streaming/partitions/partition.rs | 4 +- server/src/streaming/partitions/storage.rs | 2 +- server/src/streaming/segments/index.rs | 6 +- server/src/streaming/segments/messages.rs | 72 +++++++++- server/src/streaming/segments/segment.rs | 7 +- server/src/streaming/segments/storage.rs | 2 +- server/src/streaming/storage/mod.rs | 17 +++ server/src/streaming/storage/storage.rs | 128 ++++++++++++++++++ server/src/streaming/systems/storage.rs | 2 +- server/src/streaming/systems/system.rs | 2 +- server/src/streaming/topics/storage.rs | 2 +- server/src/streaming/topics/topic.rs | 2 +- 31 files changed, 467 insertions(+), 60 deletions(-) create mode 100644 server/src/streaming/io/buf/dma_buf.rs create mode 100644 server/src/streaming/io/buf/mod.rs delete mode 100644 server/src/streaming/io/dma_buf.rs create mode 100644 server/src/streaming/io/stream/message_stream.rs create mode 100644 server/src/streaming/io/stream/mod.rs create mode 100644 server/src/streaming/storage/mod.rs create mode 100644 server/src/streaming/storage/storage.rs diff --git a/Cargo.lock b/Cargo.lock index 0249feb81..9ed0f852e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3049,18 +3049,18 @@ checksum = 
"e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" [[package]] name = "pin-project" -version = "1.1.5" +version = "1.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6bf43b791c5b9e34c3d182969b4abb522f9343702850a2e57f460d00d09b4b3" +checksum = "be57f64e946e500c8ee36ef6331845d40a93055567ec57e8fae13efd33759b95" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "1.1.5" +version = "1.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" +checksum = "3c0f5fad0874fc7abcd4d750e76917eaebbecaa2c20bde22e1dbeeba8beb758c" dependencies = [ "proc-macro2", "quote", @@ -3069,9 +3069,9 @@ dependencies = [ [[package]] name = "pin-project-lite" -version = "0.2.14" +version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02" +checksum = "915a1e146535de9163f3987b8944ed8cf49a18bb0056bcebcdcece385cece4ff" [[package]] name = "pin-utils" @@ -4131,6 +4131,7 @@ dependencies = [ "opentelemetry-otlp", "opentelemetry-semantic-conventions", "opentelemetry_sdk", + "pin-project", "prometheus-client", "quinn", "rcgen", diff --git a/configs/server.json b/configs/server.json index 9b68b86cc..1cb347da1 100644 --- a/configs/server.json +++ b/configs/server.json @@ -188,7 +188,7 @@ "path": "partitions", "enforce_fsync": false, "validate_checksum": false, - "messages_required_to_save": 5000 + "messages_required_to_save": 1000 }, "segment": { "size": "1 GB", diff --git a/integration/tests/streaming/common/test_setup.rs b/integration/tests/streaming/common/test_setup.rs index 143604d77..20139b282 100644 --- a/integration/tests/streaming/common/test_setup.rs +++ b/integration/tests/streaming/common/test_setup.rs @@ -1,6 +1,6 @@ use server::configs::system::SystemConfig; -use 
server::streaming::persistence::persister::{FilePersister, FileWithSyncPersister}; use server::streaming::iggy_storage::SystemStorage; +use server::streaming::persistence::persister::{FilePersister, FileWithSyncPersister}; use std::sync::Arc; use tokio::fs; use uuid::Uuid; diff --git a/integration/tests/streaming/consumer_offset.rs b/integration/tests/streaming/consumer_offset.rs index e5b733141..57c640d3e 100644 --- a/integration/tests/streaming/consumer_offset.rs +++ b/integration/tests/streaming/consumer_offset.rs @@ -1,8 +1,8 @@ use crate::streaming::common::test_setup::TestSetup; use iggy::consumer::ConsumerKind; use server::configs::system::SystemConfig; -use server::streaming::partitions::partition::ConsumerOffset; use server::streaming::iggy_storage::PartitionStorage; +use server::streaming::partitions::partition::ConsumerOffset; use std::sync::Arc; use tokio::fs; diff --git a/server/Cargo.toml b/server/Cargo.toml index 99e70c61e..b175774e5 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -29,7 +29,7 @@ figlet-rs = "0.1.5" libc = "0.2.162" figment = { version = "0.10.18", features = ["json", "toml", "env"] } flume = "0.11.0" -futures = "0.3.30" +futures = { version = "0.3.30" } iggy = { path = "../sdk" } jsonwebtoken = "9.3.0" log = "0.4.20" @@ -90,6 +90,7 @@ ulid = "1.1.2" uuid = { version = "1.1.0", features = ["v7", "fast-rng", "zerocopy"] } xxhash-rust = { version = "0.8.12", features = ["xxh32"] } zip = "2.2.0" +pin-project = "1.1.7" [target.'cfg(not(target_env = "msvc"))'.dependencies] tikv-jemallocator = { version = "0.6", optional = true } diff --git a/server/src/compat/storage_conversion/converter.rs b/server/src/compat/storage_conversion/converter.rs index 0620ec305..1282a9c13 100644 --- a/server/src/compat/storage_conversion/converter.rs +++ b/server/src/compat/storage_conversion/converter.rs @@ -1,8 +1,8 @@ use crate::state::command::EntryCommand; use crate::state::models::CreatePersonalAccessTokenWithHash; use crate::state::State; -use 
crate::streaming::personal_access_tokens::personal_access_token::PersonalAccessToken; use crate::streaming::iggy_storage::SystemStorage; +use crate::streaming::personal_access_tokens::personal_access_token::PersonalAccessToken; use crate::streaming::streams::stream::Stream; use crate::streaming::users::user::User; use iggy::consumer_groups::create_consumer_group::CreateConsumerGroup; diff --git a/server/src/compat/storage_conversion/mod.rs b/server/src/compat/storage_conversion/mod.rs index 4707ed16c..ea1aa75b0 100644 --- a/server/src/compat/storage_conversion/mod.rs +++ b/server/src/compat/storage_conversion/mod.rs @@ -6,13 +6,13 @@ use crate::configs::system::SystemConfig; use crate::state::system::{PartitionState, StreamState, TopicState}; use crate::state::State; use crate::streaming::batching::message_batch::RetainedMessageBatch; +use crate::streaming::iggy_storage::{ + PartitionStorage, SegmentStorage, StreamStorage, SystemInfoStorage, SystemStorage, TopicStorage, +}; use crate::streaming::partitions::partition::{ConsumerOffset, Partition}; use crate::streaming::persistence::persister::Persister; use crate::streaming::segments::index::{Index, IndexRange}; use crate::streaming::segments::segment::Segment; -use crate::streaming::iggy_storage::{ - PartitionStorage, SegmentStorage, StreamStorage, SystemInfoStorage, SystemStorage, TopicStorage, -}; use crate::streaming::streams::stream::Stream; use crate::streaming::systems::info::SystemInfo; use crate::streaming::topics::topic::Topic; diff --git a/server/src/streaming/batching/batch_accumulator.rs b/server/src/streaming/batching/batch_accumulator.rs index 89644dd57..0e03f3cc3 100644 --- a/server/src/streaming/batching/batch_accumulator.rs +++ b/server/src/streaming/batching/batch_accumulator.rs @@ -1,8 +1,8 @@ use super::message_batch::{RetainedMessageBatch, RETAINED_BATCH_OVERHEAD}; use crate::streaming::{models::messages::RetainedMessage, sizeable::Sizeable}; use bytes::BytesMut; -use tracing::warn; use 
std::sync::Arc; +use tracing::warn; #[derive(Debug, Clone)] pub struct BatchAccumulator { diff --git a/server/src/streaming/batching/message_batch.rs b/server/src/streaming/batching/message_batch.rs index 3708a917f..106b61ea3 100644 --- a/server/src/streaming/batching/message_batch.rs +++ b/server/src/streaming/batching/message_batch.rs @@ -52,7 +52,8 @@ impl RetainedMessageBatch { bytes[8..12].copy_from_slice(&self.length.to_le_bytes()); bytes[12..16].copy_from_slice(&self.last_offset_delta.to_le_bytes()); bytes[16..24].copy_from_slice(&self.max_timestamp.to_le_bytes()); - bytes[24..self.length as usize + RETAINED_BATCH_OVERHEAD as usize].copy_from_slice(&self.bytes); + bytes[24..self.length as usize + RETAINED_BATCH_OVERHEAD as usize] + .copy_from_slice(&self.bytes); } pub fn extend(&self, bytes: &mut BytesMut) { diff --git a/server/src/streaming/direct_io/storage.rs b/server/src/streaming/direct_io/storage.rs index 5bcb04ec9..3371357db 100644 --- a/server/src/streaming/direct_io/storage.rs +++ b/server/src/streaming/direct_io/storage.rs @@ -1,19 +1,34 @@ -use std::{alloc::{self, Layout}, io::{Read, Seek, SeekFrom, Write}, os::unix::fs::OpenOptionsExt}; +use std::{ + alloc::{self, Layout}, + io::{Read, Seek, SeekFrom, Write}, + os::unix::fs::OpenOptionsExt, +}; +use crate::streaming::batching::message_batch::{RetainedMessageBatch, RETAINED_BATCH_OVERHEAD}; use bytes::{BufMut, Bytes, BytesMut}; use iggy::error::IggyError; +use tokio::{ + fs::OpenOptions, + io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader}, + task::spawn_blocking, +}; use tracing::warn; -use tokio::{fs::OpenOptions, io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader}, task::spawn_blocking}; -use crate::streaming::batching::message_batch::{RetainedMessageBatch, RETAINED_BATCH_OVERHEAD}; #[derive(Debug, Default)] -pub struct DirectIOStorage { -} +pub struct DirectIOStorage {} impl DirectIOStorage { - pub async fn read_batches(&self, file_path: &str, start_position: u64, end_position: 
u64) -> Result, IggyError> { + pub async fn read_batches( + &self, + file_path: &str, + start_position: u64, + end_position: u64, + ) -> Result, IggyError> { //let mut file = OpenOptions::new().read(true).custom_flags(libc::O_DIRECT).open(file_path).await?; - let mut file = std::fs::File::options().read(true).custom_flags(libc::O_DIRECT).open(file_path)?; + let mut file = std::fs::File::options() + .read(true) + .custom_flags(libc::O_DIRECT) + .open(file_path)?; file.seek(SeekFrom::Start(start_position))?; let mut batches = Vec::new(); let file_size = file.metadata()?.len(); @@ -33,14 +48,16 @@ impl DirectIOStorage { let layout = Layout::from_size_align(buf_size as _, sector_size as _).unwrap(); let ptr = unsafe { alloc::alloc(layout) }; // Not sure if this is required - let mut bytes = unsafe {Vec::from_raw_parts(ptr, buf_size as _, buf_size as _)}; + let mut bytes = unsafe { Vec::from_raw_parts(ptr, buf_size as _, buf_size as _) }; let result = spawn_blocking(move || { if let Err(e) = file.read_exact(&mut bytes) { warn!("error reading batch: {}", e); } Self::serialize_batches(bytes, &mut batches); Ok(batches) - }).await.unwrap(); + }) + .await + .unwrap(); result } @@ -98,7 +115,6 @@ impl DirectIOStorage { ); break; } - // Ergh.... 
let payload = Bytes::copy_from_slice(&bytes[payload_start..payload_end]); read_bytes = payload_end + diff as usize; @@ -114,14 +130,19 @@ } pub async fn write_batches(&self, file_path: &str, bytes: Vec) -> Result { - let mut std_file = std::fs::File::options().append(true).custom_flags(libc::O_DIRECT).open(file_path)?; + let mut std_file = std::fs::File::options() + .append(true) + .custom_flags(libc::O_DIRECT) + .open(file_path)?; //let mut file = OpenOptions::new().append(true).custom_flags(libc::O_DIRECT).open(file_path).await?; let size = bytes.len() as _; spawn_blocking(move || { if let Err(e) = std_file.write_all(&bytes) { warn!("error writing: {}", e); } - }).await.unwrap(); + }) + .await + .unwrap(); Ok(size) } } diff --git a/server/src/streaming/iggy_storage.rs b/server/src/streaming/iggy_storage.rs index e4069c398..49f4daa10 100644 --- a/server/src/streaming/iggy_storage.rs +++ b/server/src/streaming/iggy_storage.rs @@ -155,10 +155,10 @@ impl Debug for dyn SegmentStorage { #[cfg(test)] pub(crate) mod tests { + use crate::streaming::iggy_storage::*; use crate::streaming::partitions::partition::Partition; use crate::streaming::segments::index::{Index, IndexRange}; use crate::streaming::segments::segment::Segment; - use crate::streaming::iggy_storage::*; use crate::streaming::streams::stream::Stream; use crate::streaming::topics::topic::Topic; use async_trait::async_trait; diff --git a/server/src/streaming/io/buf/dma_buf.rs b/server/src/streaming/io/buf/dma_buf.rs new file mode 100644 index 000000000..822366e99 --- /dev/null +++ b/server/src/streaming/io/buf/dma_buf.rs @@ -0,0 +1,66 @@ +use std::{ + alloc::{self, Layout}, + ptr, +}; + +use super::IoBuf; + +pub struct DmaBuf { + data: ptr::NonNull, + layout: Layout, + size: usize, +} + +// SAFETY: DmaBuf uniquely owns its heap allocation; the pointer is never aliased or shared, so moving the buffer to another thread is sound.
+unsafe impl Send for DmaBuf {} + +impl DmaBuf { + pub fn new(size: usize) -> Self { + assert!(size > 0); + assert!(size % 512 == 0); + let layout = + Layout::from_size_align(size, 4096).expect("Failed to create layout for DmaBuf"); + let data_ptr = unsafe { alloc::alloc(layout) }; + let data = ptr::NonNull::new(data_ptr).expect("DmaBuf allocation returned a null pointer"); + + Self { data, layout, size } + } +} + +impl Drop for DmaBuf { + fn drop(&mut self) { + unsafe { + alloc::dealloc(self.data.as_ptr(), self.layout); + } + } +} + +impl IoBuf for DmaBuf { + fn as_ptr(&self) -> *const u8 { + self.data.as_ptr() + } + + fn as_ptr_mut(&mut self) -> *mut u8 { + self.data.as_ptr() + } + + fn as_bytes(&self) -> &[u8] { + unsafe { std::slice::from_raw_parts(self.as_ptr(), self.size) } + } + + fn as_bytes_mut(&mut self) -> &mut [u8] { + unsafe { std::slice::from_raw_parts_mut(self.as_ptr_mut(), self.size) } + } +} + +impl AsRef<[u8]> for DmaBuf { + fn as_ref(&self) -> &[u8] { + self.as_bytes() + } +} + +impl AsMut<[u8]> for DmaBuf { + fn as_mut(&mut self) -> &mut [u8] { + self.as_bytes_mut() + } +} diff --git a/server/src/streaming/io/buf/mod.rs b/server/src/streaming/io/buf/mod.rs new file mode 100644 index 000000000..a15e08e97 --- /dev/null +++ b/server/src/streaming/io/buf/mod.rs @@ -0,0 +1,8 @@ +pub mod dma_buf; + +pub trait IoBuf: AsRef<[u8]> + AsMut<[u8]> { + fn as_ptr(&self) -> *const u8; + fn as_ptr_mut(&mut self) -> *mut u8; + fn as_bytes(&self) -> &[u8]; + fn as_bytes_mut(&mut self) -> &mut [u8]; +} diff --git a/server/src/streaming/io/dma_buf.rs b/server/src/streaming/io/dma_buf.rs deleted file mode 100644 index 0174ff432..000000000 --- a/server/src/streaming/io/dma_buf.rs +++ /dev/null @@ -1,7 +0,0 @@ -pub trait DmaBuf { - -} - -pub struct DmfBuf { - -} diff --git a/server/src/streaming/io/mod.rs b/server/src/streaming/io/mod.rs index 39d198881..c05035d2a 100644 --- a/server/src/streaming/io/mod.rs +++ b/server/src/streaming/io/mod.rs @@ -1 +1,2 @@ -pub mod dma_buf; +pub 
mod buf; +pub mod stream; diff --git a/server/src/streaming/io/stream/message_stream.rs b/server/src/streaming/io/stream/message_stream.rs new file mode 100644 index 000000000..eeb17068b --- /dev/null +++ b/server/src/streaming/io/stream/message_stream.rs @@ -0,0 +1,105 @@ +use crate::streaming::{ + batching::message_batch::RETAINED_BATCH_OVERHEAD, models::messages::RetainedMessage, +}; +use bytes::{BufMut, BytesMut}; +use futures::{AsyncBufRead, AsyncBufReadExt, AsyncReadExt, FutureExt, Stream}; +use iggy::error::IggyError; +use pin_project::pin_project; +use std::{future::Future, pin::Pin, task::Poll}; +use tokio::task::yield_now; +use tracing::warn; + +pub struct RetainedMessageStream +where + R: AsyncBufRead + Unpin, +{ + header_read: bool, + batch_length: usize, + sector_size: usize, + read_bytes: usize, + reader: R, +} + +impl RetainedMessageStream +where + R: AsyncBufRead + Unpin, +{ + pub fn new(reader: R, sector_size: usize) -> Self { + Self { + header_read: false, + batch_length: 0, + read_bytes: 0, + sector_size, + reader, + } + } +} + +impl Stream for RetainedMessageStream +where + R: AsyncBufRead + Unpin, +{ + type Item = Result; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + let this = self.get_mut(); + let mut read_exact = + |buf: &mut [u8], cx: &mut std::task::Context<'_>| -> Poll> { + let mut read_offset = 0; + while read_offset < buf.len() { + let n = match this.reader.read(&mut buf[read_offset..]).poll_unpin(cx)? { + Poll::Ready(val) => val, + Poll::Pending => { + continue; + } + }; + read_offset += n; + } + Poll::Ready(Ok(())) + }; + if !this.header_read { + let mut buf = [0u8; RETAINED_BATCH_OVERHEAD as _]; + if let Err(e) = futures::ready!(read_exact(&mut buf, cx)) { + return Some(Err(e.into())).into(); + } + + //TODO: maybe we could use more of those fields ?? 
+ let batch_length = u32::from_le_bytes(buf[8..12].try_into().unwrap()); + this.batch_length = batch_length as usize; + this.read_bytes = 0; + this.header_read = true; + } + assert!(this.batch_length > 0); + + let mut buf = [0u8; 4]; + if let Err(e) = futures::ready!(read_exact(&mut buf, cx)) { + return Some(Err(e.into())).into(); + } + let length = u32::from_le_bytes(buf[0..4].try_into().unwrap()); + + let mut payload = BytesMut::with_capacity(length as _); + payload.put_bytes(0, length as _); + if let Err(e) = futures::ready!(read_exact(&mut payload, cx)) { + return Some(Err(e.into())).into(); + } + this.read_bytes += length as usize + 4; + if this.read_bytes >= this.batch_length { + // This is a temp solution, to the padding that Direct I/O requires. + // Later on, we could encode that information in our batch header + // for example Header { batch_length: usize, padding: usize } + // and use the padding to advance the reader further. + let total_batch_length = this.batch_length + RETAINED_BATCH_OVERHEAD as usize; + let sectors = total_batch_length.div_ceil(this.sector_size); + let adjusted_size = this.sector_size * sectors; + let diff = adjusted_size - total_batch_length; + this.reader.consume_unpin(diff); + this.header_read = false; + } + + let message = RetainedMessage::try_from_bytes(payload.freeze()).unwrap(); + Poll::Ready(Some(Ok(message))) + } +} diff --git a/server/src/streaming/io/stream/mod.rs b/server/src/streaming/io/stream/mod.rs new file mode 100644 index 000000000..3402cbbbd --- /dev/null +++ b/server/src/streaming/io/stream/mod.rs @@ -0,0 +1 @@ +pub mod message_stream; diff --git a/server/src/streaming/mod.rs b/server/src/streaming/mod.rs index 4eb81a22f..869f46227 100644 --- a/server/src/streaming/mod.rs +++ b/server/src/streaming/mod.rs @@ -3,6 +3,9 @@ pub mod cache; pub mod clients; mod deduplication; pub mod diagnostics; +pub mod direct_io; +pub mod iggy_storage; +pub mod io; pub mod models; pub mod partitions; pub mod persistence; @@ -11,11 
+14,9 @@ pub mod polling_consumer; pub mod segments; pub mod session; pub mod sizeable; -pub mod iggy_storage; +pub mod storage; pub mod streams; pub mod systems; pub mod topics; pub mod users; pub mod utils; -pub mod direct_io; -pub mod io; diff --git a/server/src/streaming/partitions/messages.rs b/server/src/streaming/partitions/messages.rs index bc7c60070..0f9f94614 100644 --- a/server/src/streaming/partitions/messages.rs +++ b/server/src/streaming/partitions/messages.rs @@ -542,8 +542,8 @@ mod tests { use super::*; use crate::configs::system::{MessageDeduplicationConfig, SystemConfig}; - use crate::streaming::partitions::create_messages; use crate::streaming::iggy_storage::tests::get_test_system_storage; + use crate::streaming::partitions::create_messages; #[tokio::test] async fn given_disabled_message_deduplication_all_messages_should_be_appended() { diff --git a/server/src/streaming/partitions/partition.rs b/server/src/streaming/partitions/partition.rs index f21b925e7..2828a508e 100644 --- a/server/src/streaming/partitions/partition.rs +++ b/server/src/streaming/partitions/partition.rs @@ -2,9 +2,9 @@ use crate::configs::system::SystemConfig; use crate::streaming::cache::buffer::SmartCache; use crate::streaming::cache::memory_tracker::CacheMemoryTracker; use crate::streaming::deduplication::message_deduplicator::MessageDeduplicator; +use crate::streaming::iggy_storage::SystemStorage; use crate::streaming::models::messages::RetainedMessage; use crate::streaming::segments::segment::Segment; -use crate::streaming::iggy_storage::SystemStorage; use dashmap::DashMap; use iggy::consumer::ConsumerKind; use iggy::utils::duration::IggyDuration; @@ -176,8 +176,8 @@ impl Partition { #[cfg(test)] mod tests { use crate::configs::system::{CacheConfig, SystemConfig}; - use crate::streaming::partitions::partition::Partition; use crate::streaming::iggy_storage::tests::get_test_system_storage; + use crate::streaming::partitions::partition::Partition; use 
iggy::utils::duration::IggyDuration; use iggy::utils::expiry::IggyExpiry; use iggy::utils::timestamp::IggyTimestamp; diff --git a/server/src/streaming/partitions/storage.rs b/server/src/streaming/partitions/storage.rs index 2b9b04784..e5b9406bb 100644 --- a/server/src/streaming/partitions/storage.rs +++ b/server/src/streaming/partitions/storage.rs @@ -1,10 +1,10 @@ use crate::compat::index_conversion::index_converter::IndexConverter; use crate::state::system::PartitionState; use crate::streaming::batching::batch_accumulator::BatchAccumulator; +use crate::streaming::iggy_storage::PartitionStorage; use crate::streaming::partitions::partition::{ConsumerOffset, Partition}; use crate::streaming::persistence::persister::Persister; use crate::streaming::segments::segment::{Segment, INDEX_EXTENSION, LOG_EXTENSION}; -use crate::streaming::iggy_storage::PartitionStorage; use crate::streaming::utils::file; use anyhow::Context; use async_trait::async_trait; diff --git a/server/src/streaming/segments/index.rs b/server/src/streaming/segments/index.rs index 0d0960fad..4736b6277 100644 --- a/server/src/streaming/segments/index.rs +++ b/server/src/streaming/segments/index.rs @@ -1,7 +1,6 @@ use crate::streaming::segments::segment::Segment; use iggy::error::IggyError; use iggy::error::IggyError::InvalidOffset; -use tracing::warn; #[derive(Debug, Eq, Clone, Copy, Default)] pub struct Index { @@ -33,8 +32,7 @@ impl Segment { let ending_offset_idx = binary_search_index(indices, end_offset); match (starting_offset_idx, ending_offset_idx) { - (Some(starting_offset_idx), Some(ending_offset_idx)) => - { + (Some(starting_offset_idx), Some(ending_offset_idx)) => { // UGLY AS FOOOOOOOOOOOOOK, but will deal with it later on. 
let end_idx = if ending_offset_idx == indices.len() - 1 { ending_offset_idx @@ -45,7 +43,7 @@ impl Segment { start: indices[starting_offset_idx], end: indices[end_idx], }) - }, + } (Some(starting_offset_idx), None) => Ok(IndexRange { start: indices[starting_offset_idx], end: *indices.last().unwrap(), diff --git a/server/src/streaming/segments/messages.rs b/server/src/streaming/segments/messages.rs index ef2ec1c4f..fe2f7600e 100644 --- a/server/src/streaming/segments/messages.rs +++ b/server/src/streaming/segments/messages.rs @@ -1,14 +1,19 @@ use crate::streaming::batching::batch_accumulator::BatchAccumulator; use crate::streaming::batching::batch_filter::BatchItemizer; use crate::streaming::batching::message_batch::{RetainedMessageBatch, RETAINED_BATCH_OVERHEAD}; +use crate::streaming::io::stream::message_stream::RetainedMessageStream; use crate::streaming::models::messages::RetainedMessage; use crate::streaming::segments::index::{Index, IndexRange}; use crate::streaming::segments::segment::Segment; use crate::streaming::sizeable::Sizeable; +use crate::streaming::storage::Storage; use bytes::buf::UninitSlice; use bytes::{BufMut, Bytes, BytesMut}; +use futures::{AsyncReadExt, StreamExt, TryStreamExt}; use iggy::error::IggyError; use std::alloc::{self, Layout}; +use std::future; +use std::io::BufReader; use std::slice::from_raw_parts; use std::sync::atomic::Ordering; use std::sync::Arc; @@ -41,26 +46,26 @@ impl Segment { let end_offset = offset + (count - 1) as u64; // In case that the partition messages buffer is disabled, we need to check the unsaved messages buffer if self.unsaved_messages.is_none() { - return self.load_messages_from_disk(offset, end_offset).await; + return self.load_n_messages_from_file(offset, count).await; } let batch_accumulator = self.unsaved_messages.as_ref().unwrap(); if batch_accumulator.is_empty() { - return self.load_messages_from_disk(offset, end_offset).await; + return self.load_n_messages_from_file(offset, count).await; } let 
first_offset = batch_accumulator.batch_base_offset(); if end_offset < first_offset { - return self.load_messages_from_disk(offset, end_offset).await; + return self.load_n_messages_from_file(offset, count).await; } let last_offset = batch_accumulator.batch_max_offset(); if offset >= first_offset && end_offset <= last_offset { - return Ok(self.load_messages_from_unsaved_buffer(offset, end_offset)); + return self.load_n_messages_from_file(offset, count).await; } // Can this be somehow improved? maybe with chain iterators - let mut messages = self.load_messages_from_disk(offset, end_offset).await?; + let mut messages = self.load_n_messages_from_file(offset, count).await?; let mut buffered_messages = self.load_messages_from_unsaved_buffer(offset, last_offset); messages.append(&mut buffered_messages); @@ -101,6 +106,61 @@ impl Segment { batch_accumulator.get_messages_by_offset(start_offset, end_offset) } + async fn load_n_messages_from_file( + &self, + start_offset: u64, + count: u32, + ) -> Result>, IggyError> { + let end_offset = start_offset + (count - 1) as u64; + if let Some(indices) = &self.indexes { + let relative_start_offset = (start_offset - self.start_offset) as u32; + let relative_end_offset = (end_offset - self.start_offset) as u32; + let index_range = match self.load_highest_lower_bound_index( + indices, + relative_start_offset, + relative_end_offset, + ) { + Ok(range) => range, + Err(_) => { + error!( + "Cannot load messages from disk, index range not found: {} - {}.", + start_offset, end_offset + ); + return Ok(EMPTY_MESSAGES.into_iter().map(Arc::new).collect()); + } + }; + let start_position = index_range.start.position; + return self + .load_n_messages_from_disk(start_position, count, |msg| msg.offset >= start_offset) + .await; + } + Ok(EMPTY_MESSAGES.into_iter().map(Arc::new).collect()) + } + + async fn load_n_messages_from_disk( + &self, + start_position: u32, + count: u32, + filter: F, + ) -> Result>, IggyError> + where + F: Fn(&RetainedMessage) -> 
bool, + { + let reader = self + .new_storage + .read_blocks(start_position as _, self.size_bytes as u64) + .into_async_read(); + let message_stream = RetainedMessageStream::new(reader, 4096); + let messages = message_stream + .try_filter(|msg| future::ready(filter(msg))) + .take(count as _) + .map_ok(Arc::new) + .try_collect() + .await?; + + Ok(messages) + } + async fn load_messages_from_disk( &self, start_offset: u64, @@ -277,7 +337,7 @@ impl Segment { if has_remainder { self.unsaved_messages = Some(batch_accumulator); } - let mut bytes = unsafe {Vec::from_raw_parts(ptr, adjusted_size as _, adjusted_size as _)}; + let mut bytes = unsafe { Vec::from_raw_parts(ptr, adjusted_size as _, adjusted_size as _) }; let diff = bytes.len() as u32 - batch_size; batch.extend2(&mut bytes); let saved_bytes = storage.write_batches(self.log_path.as_str(), bytes).await?; diff --git a/server/src/streaming/segments/segment.rs b/server/src/streaming/segments/segment.rs index 014baa326..3693ad903 100644 --- a/server/src/streaming/segments/segment.rs +++ b/server/src/streaming/segments/segment.rs @@ -1,10 +1,12 @@ use crate::configs::system::SystemConfig; use crate::streaming::batching::batch_accumulator::BatchAccumulator; use crate::streaming::direct_io::storage::DirectIOStorage; -use crate::streaming::segments::index::Index; use crate::streaming::iggy_storage::SystemStorage; +use crate::streaming::segments::index::Index; +use crate::streaming::storage::storage::DmaStorage; use iggy::utils::expiry::IggyExpiry; use iggy::utils::timestamp::IggyTimestamp; +use std::io::BufReader; use std::sync::atomic::AtomicU64; use std::sync::Arc; @@ -38,6 +40,7 @@ pub struct Segment { pub(crate) indexes: Option>, pub(crate) storage: Arc, pub(crate) direct_io_storage: Arc, + pub(crate) new_storage: Arc, } impl Segment { @@ -59,6 +62,7 @@ impl Segment { messages_count_of_parent_partition: Arc, ) -> Segment { let path = config.get_segment_path(stream_id, topic_id, partition_id, start_offset); + let 
block_size = 10 * 4096; Segment { stream_id, @@ -81,6 +85,7 @@ impl Segment { false => None, }, direct_io_storage: Default::default(), + new_storage: Arc::new(DmaStorage::new(Self::get_log_path(&path), block_size)), unsaved_messages, is_closed: false, size_of_parent_stream, diff --git a/server/src/streaming/segments/storage.rs b/server/src/streaming/segments/storage.rs index 57875fb60..58f8a34c9 100644 --- a/server/src/streaming/segments/storage.rs +++ b/server/src/streaming/segments/storage.rs @@ -1,11 +1,11 @@ use crate::streaming::batching::iterator::IntoMessagesIterator; use crate::streaming::batching::message_batch::RetainedMessageBatch; +use crate::streaming::iggy_storage::SegmentStorage; use crate::streaming::models::messages::RetainedMessage; use crate::streaming::persistence::persister::Persister; use crate::streaming::segments::index::{Index, IndexRange}; use crate::streaming::segments::segment::Segment; use crate::streaming::sizeable::Sizeable; -use crate::streaming::iggy_storage::SegmentStorage; use crate::streaming::utils::file; use crate::streaming::utils::head_tail_buf::HeadTailBuffer; use anyhow::Context; diff --git a/server/src/streaming/storage/mod.rs b/server/src/streaming/storage/mod.rs new file mode 100644 index 000000000..ef1ced59e --- /dev/null +++ b/server/src/streaming/storage/mod.rs @@ -0,0 +1,17 @@ +use super::io::buf::IoBuf; +use futures::Stream; + +pub mod storage; + +pub trait Storage +where + B: IoBuf, +{ + //TODO: support taking as an input Iterator and used write_vectored. 
+ //fn write_blocks(&self, buffer: B) -> Result; + fn read_blocks( + &self, + position: u64, + limit: u64, + ) -> impl Stream>; +} diff --git a/server/src/streaming/storage/storage.rs b/server/src/streaming/storage/storage.rs new file mode 100644 index 000000000..9b09a7c10 --- /dev/null +++ b/server/src/streaming/storage/storage.rs @@ -0,0 +1,128 @@ +use super::Storage; +use crate::streaming::io::buf::dma_buf::DmaBuf; +use futures::{FutureExt, Stream, TryStream}; +use std::{ + future::Future, + io::{Read, Seek, SeekFrom}, + os::unix::fs::{FileExt, OpenOptionsExt}, + pin::Pin, + sync::{Arc, Mutex}, + task::Poll, +}; +use tokio::{ + fs::File, + io::{AsyncRead, AsyncReadExt, BufReader}, + task::{spawn_blocking, JoinHandle}, +}; +use tracing::warn; + +#[derive(Debug)] +pub struct DmaStorage { + file_path: &'static str, + block_size: usize, +} + +impl DmaStorage { + pub fn new(file_path: String, block_size: usize) -> Self { + Self { + file_path: file_path.leak(), + block_size, + } + } +} + +impl Storage for DmaStorage { + fn read_blocks( + &self, + position: u64, + limit: u64, + ) -> impl Stream> { + let file = std::fs::File::options() + .read(true) + .append(true) + .custom_flags(libc::O_DIRECT) + .open(self.file_path) + .unwrap(); + let state = State::init(file); + BlockStream { + state: Arc::new(Mutex::new(state)), + position: Arc::new(Mutex::new(position)), + limit, + size: self.block_size, + } + } +} + +struct BlockStream +where + R: FileExt + Read + Unpin, +{ + state: Arc>>, + position: Arc>, + size: usize, + limit: u64, +} + +struct State +where + R: FileExt + Read, +{ + reader: R, + result: Option<(usize, Result, std::io::Error>)>, +} + +impl State +where + R: FileExt + Read, +{ + fn init(reader: R) -> Self { + Self { + reader, + result: None, + } + } +} + +impl Stream for BlockStream +where + R: Seek + FileExt + Read + Send + Sync + Unpin + 'static, +{ + type Item = Result; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> 
Poll> { + let mut state = self.state.lock().unwrap(); + if let Some(result) = state.result.take() { + let mut position = self.position.lock().unwrap(); + *position += result.0 as u64; + return Poll::Ready(result.1.transpose()); + } + let waker = cx.waker().clone(); + drop(state); + + let size = self.size; + let limit = self.limit; + let position = self.position.clone(); + let state = Arc::clone(&self.state); + spawn_blocking(move || { + let mut state = state.lock().unwrap(); + let position = position.lock().unwrap(); + let buf_size = std::cmp::min(size, (limit - *position) as usize); + if buf_size > 0 { + let mut buf = DmaBuf::new(buf_size); + let result = match state.reader.read_exact_at(buf.as_mut(), *position) { + Ok(_) => Ok(Some(buf)), + Err(err) => Err(err), + }; + drop(position); + state.result = Some((buf_size, result)); + waker.wake(); + } else { + state.result = Some((0, Ok(None))); + } + }); + Poll::Pending + } +} diff --git a/server/src/streaming/systems/storage.rs b/server/src/streaming/systems/storage.rs index 8008e7d1b..4ef2ce471 100644 --- a/server/src/streaming/systems/storage.rs +++ b/server/src/streaming/systems/storage.rs @@ -1,5 +1,5 @@ -use crate::streaming::persistence::persister::Persister; use crate::streaming::iggy_storage::SystemInfoStorage; +use crate::streaming::persistence::persister::Persister; use crate::streaming::systems::info::SystemInfo; use crate::streaming::utils::file; use anyhow::Context; diff --git a/server/src/streaming/systems/system.rs b/server/src/streaming/systems/system.rs index 0c7381a19..6c67c3093 100644 --- a/server/src/streaming/systems/system.rs +++ b/server/src/streaming/systems/system.rs @@ -3,9 +3,9 @@ use crate::configs::system::SystemConfig; use crate::streaming::cache::memory_tracker::CacheMemoryTracker; use crate::streaming::clients::client_manager::ClientManager; use crate::streaming::diagnostics::metrics::Metrics; +use crate::streaming::iggy_storage::SystemStorage; use 
crate::streaming::persistence::persister::*; use crate::streaming::session::Session; -use crate::streaming::iggy_storage::SystemStorage; use crate::streaming::streams::stream::Stream; use crate::streaming::users::permissioner::Permissioner; use iggy::error::IggyError; diff --git a/server/src/streaming/topics/storage.rs b/server/src/streaming/topics/storage.rs index 71c090d22..33d54a4f5 100644 --- a/server/src/streaming/topics/storage.rs +++ b/server/src/streaming/topics/storage.rs @@ -1,6 +1,6 @@ use crate::state::system::TopicState; -use crate::streaming::partitions::partition::Partition; use crate::streaming::iggy_storage::TopicStorage; +use crate::streaming::partitions::partition::Partition; use crate::streaming::topics::consumer_group::ConsumerGroup; use crate::streaming::topics::topic::Topic; use anyhow::Context; diff --git a/server/src/streaming/topics/topic.rs b/server/src/streaming/topics/topic.rs index e9411ca4c..977ebf32c 100644 --- a/server/src/streaming/topics/topic.rs +++ b/server/src/streaming/topics/topic.rs @@ -1,7 +1,7 @@ use crate::configs::system::SystemConfig; +use crate::streaming::iggy_storage::SystemStorage; use crate::streaming::partitions::partition::Partition; use crate::streaming::polling_consumer::PollingConsumer; -use crate::streaming::iggy_storage::SystemStorage; use crate::streaming::topics::consumer_group::ConsumerGroup; use core::fmt; use iggy::compression::compression_algorithm::CompressionAlgorithm;