Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transfer batch accumulator when rolling to new segment #1338

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
21 changes: 12 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion bench/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ impl Consumer {
let mut topic_not_found_counter = 0;
let mut strategy = PollingStrategy::offset(0);

/*
if self.warmup_time.get_duration() != Duration::from_millis(0) {
if let Some(cg_id) = self.consumer_group_id {
info!(
Expand Down Expand Up @@ -117,6 +118,7 @@ impl Consumer {
current_iteration += 1;
}
}
*/

if let Some(cg_id) = self.consumer_group_id {
info!(
Expand All @@ -131,6 +133,7 @@ impl Consumer {
}

current_iteration = 0;
let mut previous_start_offset = 0;
let start_timestamp = Instant::now();
while received_messages < total_messages {
let offset = current_iteration * self.messages_per_batch as u64;
Expand Down Expand Up @@ -173,7 +176,7 @@ impl Consumer {
);
continue;
}

assert!(previous_start_offset <= polled_messages.messages[0].offset);
if polled_messages.messages.len() != self.messages_per_batch as usize {
warn!(
"Consumer #{} → expected {} messages, but got {} messages, retrying...",
Expand All @@ -183,6 +186,7 @@ impl Consumer {
);
continue;
}
previous_start_offset = polled_messages.messages[0].offset;

latencies.push(latency_end);
received_messages += polled_messages.messages.len() as u64;
Expand Down
2 changes: 2 additions & 0 deletions bench/src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ impl Producer {
1 => Partitioning::partition_id(default_partition_id),
2.. => Partitioning::balanced(),
};
/*
info!(
"Producer #{} → warming up for {}...",
self.producer_id, self.warmup_time
Expand All @@ -82,6 +83,7 @@ impl Producer {
.send_messages(&stream_id, &topic_id, &partitioning, &mut messages)
.await?;
}
*/

info!(
"Producer #{} → sending {} messages in {} batches of {} messages...",
Expand Down
4 changes: 2 additions & 2 deletions configs/server.json
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@
"sysinfo_print_interval": "10 s"
},
"cache": {
"enabled": true,
"enabled": false,
"size": "4 GB"
},
"encryption": {
Expand All @@ -188,7 +188,7 @@
"path": "partitions",
"enforce_fsync": false,
"validate_checksum": false,
"messages_required_to_save": 5000
"messages_required_to_save": 1000
},
"segment": {
"size": "1 GB",
Expand Down
8 changes: 4 additions & 4 deletions configs/server.toml
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ path = "compatibility"
# Determines whether to enforce file synchronization on state updates (boolean).
# `true` ensures immediate writing of data to disk for durability.
# `false` allows the OS to manage write operations, which can improve performance.
enforce_fsync = false
enforce_fsync = true

# Runtime configuration.
[system.runtime]
Expand Down Expand Up @@ -372,7 +372,7 @@ sysinfo_print_interval = "10 s"
# Enables or disables the system cache.
# `true` activates caching for frequently accessed data.
# `false` disables caching, data is always read from the source.
enabled = true
enabled = false

# Maximum size of the cache, e.g. "4GB".
size = "4 GB"
Expand Down Expand Up @@ -444,12 +444,12 @@ validate_checksum = false
# The threshold of buffered messages before triggering a save to disk (integer).
# Specifies how many messages accumulate before persisting to storage.
# Adjusting this can balance between write performance and data durability.
messages_required_to_save = 5000
messages_required_to_save = 1000

# Segment configuration
[system.segment]
# Defines the soft limit for the size of a storage segment.
# When a segment reaches this size, a new segment is created for subsequent data.
# When a segment reaches this size (maximum 4 GB), a new segment is created for subsequent data.
# Example: if `size` is set "1GB", the actual segment size may be 1GB + the size of remaining messages in received batch.
size = "1 GB"
# Configures the message time-based expiry setting.
Expand Down
12 changes: 9 additions & 3 deletions integration/tests/server/scenarios/message_size_scenario.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,13 @@ pub async fn run(client_factory: &dyn ClientFactory) {
.await;
send_message_and_check_result(
&client,
MessageToSend::OfSizeWithHeaders(100_001, 10_000_000),
MessageToSend::OfSizeWithHeaders(1_000_000, 10_000_000),
Ok(()),
)
.await;
send_message_and_check_result(
&client,
MessageToSend::OfSizeWithHeaders(1_000_001, 10_000_000),
Err(InvalidResponse(
4017,
23,
Expand All @@ -74,7 +80,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {
.await;
send_message_and_check_result(
&client,
MessageToSend::OfSizeWithHeaders(100_000, 10_000_001),
MessageToSend::OfSizeWithHeaders(100_000, 1_000_000_001),
Err(InvalidResponse(
4022,
23,
Expand All @@ -83,7 +89,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {
)
.await;

assert_message_count(&client, 6).await;
assert_message_count(&client, 7).await;
cleanup_system(&client).await;
assert_clean_system(&client).await;
}
Expand Down
2 changes: 1 addition & 1 deletion integration/tests/server/scenarios/system_scenario.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {
assert_eq!(topic.name, TOPIC_NAME);
assert_eq!(topic.partitions_count, PARTITIONS_COUNT);
assert_eq!(topic.partitions.len(), PARTITIONS_COUNT as usize);
assert_eq!(topic.size, 55890);
assert_eq!(topic.size, 55914);
assert_eq!(topic.messages_count, MESSAGES_COUNT as u64);
let topic_partition = topic.partitions.get((PARTITION_ID - 1) as usize).unwrap();
assert_eq!(topic_partition.id, PARTITION_ID);
Expand Down
13 changes: 9 additions & 4 deletions integration/tests/streaming/common/test_setup.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use server::configs::system::SystemConfig;
use server::streaming::persistence::persister::FilePersister;
use server::streaming::storage::SystemStorage;
use server::streaming::iggy_storage::SystemStorage;
use server::streaming::persistence::persister::{FilePersister, FileWithSyncPersister};
use std::sync::Arc;
use tokio::fs;
use uuid::Uuid;
Expand All @@ -20,8 +20,13 @@ impl TestSetup {

let config = Arc::new(config);
fs::create_dir(config.get_system_path()).await.unwrap();
let persister = FilePersister {};
let storage = Arc::new(SystemStorage::new(config.clone(), Arc::new(persister)));
let persister = FilePersister;
let fsync_persister = FileWithSyncPersister;
let storage = Arc::new(SystemStorage::new(
config.clone(),
Arc::new(persister),
Arc::new(fsync_persister),
));
TestSetup { config, storage }
}

Expand Down
2 changes: 1 addition & 1 deletion integration/tests/streaming/consumer_offset.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::streaming::common::test_setup::TestSetup;
use iggy::consumer::ConsumerKind;
use server::configs::system::SystemConfig;
use server::streaming::iggy_storage::PartitionStorage;
use server::streaming::partitions::partition::ConsumerOffset;
use server::streaming::storage::PartitionStorage;
use std::sync::Arc;
use tokio::fs;

Expand Down
13 changes: 10 additions & 3 deletions integration/tests/streaming/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ async fn should_persist_segment() {
topic_id,
partition_id,
start_offset,
None,
setup.config.clone(),
setup.storage.clone(),
IggyExpiry::NeverExpire,
Expand Down Expand Up @@ -63,6 +64,7 @@ async fn should_load_existing_segment_from_disk() {
topic_id,
partition_id,
start_offset,
None,
setup.config.clone(),
setup.storage.clone(),
IggyExpiry::NeverExpire,
Expand Down Expand Up @@ -90,6 +92,7 @@ async fn should_load_existing_segment_from_disk() {
topic_id,
partition_id,
start_offset,
None,
setup.config.clone(),
setup.storage.clone(),
IggyExpiry::NeverExpire,
Expand Down Expand Up @@ -127,6 +130,7 @@ async fn should_persist_and_load_segment_with_messages() {
topic_id,
partition_id,
start_offset,
None,
setup.config.clone(),
setup.storage.clone(),
IggyExpiry::NeverExpire,
Expand Down Expand Up @@ -172,12 +176,13 @@ async fn should_persist_and_load_segment_with_messages() {
.append_batch(batch_size, messages_count as u32, &messages)
.await
.unwrap();
segment.persist_messages().await.unwrap();
segment.persist_messages(true).await.unwrap();
let mut loaded_segment = segment::Segment::create(
stream_id,
topic_id,
partition_id,
start_offset,
None,
setup.config.clone(),
setup.storage.clone(),
IggyExpiry::NeverExpire,
Expand Down Expand Up @@ -210,6 +215,7 @@ async fn given_all_expired_messages_segment_should_be_expired() {
topic_id,
partition_id,
start_offset,
None,
setup.config.clone(),
setup.storage.clone(),
message_expiry,
Expand Down Expand Up @@ -257,7 +263,7 @@ async fn given_all_expired_messages_segment_should_be_expired() {
.append_batch(batch_size, messages_count as u32, &messages)
.await
.unwrap();
segment.persist_messages().await.unwrap();
segment.persist_messages(true).await.unwrap();

segment.is_closed = true;
let is_expired = segment.is_expired(now).await;
Expand All @@ -278,6 +284,7 @@ async fn given_at_least_one_not_expired_message_segment_should_not_be_expired()
topic_id,
partition_id,
start_offset,
None,
setup.config.clone(),
setup.storage.clone(),
message_expiry,
Expand Down Expand Up @@ -342,7 +349,7 @@ async fn given_at_least_one_not_expired_message_segment_should_not_be_expired()
.append_batch(not_expired_message_size, 1, &not_expired_messages)
.await
.unwrap();
segment.persist_messages().await.unwrap();
segment.persist_messages(true).await.unwrap();

let is_expired = segment.is_expired(now).await;
assert!(!is_expired);
Expand Down
3 changes: 2 additions & 1 deletion sdk/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "iggy"
version = "0.6.33"
version = "0.6.34"
description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
edition = "2021"
license = "MIT"
Expand Down Expand Up @@ -39,6 +39,7 @@ futures-util = "0.3.30"
humantime = "2.1.0"
keyring = { version = "3.2.0", optional = true, features = ["sync-secret-service", "vendored"] }
lazy_static = "1.4.0"
libc = "0.2.162"
passterm = { version = "2.0.1", optional = true }
quinn = { version = "0.11.5" }
regex = "1.10.4"
Expand Down
4 changes: 2 additions & 2 deletions sdk/src/messages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ pub mod flush_unsaved_buffer;
pub mod poll_messages;
pub mod send_messages;

const MAX_HEADERS_SIZE: u32 = 100 * 1000;
pub const MAX_PAYLOAD_SIZE: u32 = 10 * 1000 * 1000;
const MAX_HEADERS_SIZE: u32 = 1_000_000;
pub const MAX_PAYLOAD_SIZE: u32 = 1_000_000_000;
4 changes: 3 additions & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ console-subscriber = { version = "0.4.0", optional = true }
dashmap = "6.0.1"
derive_more = "1.0.0"
figlet-rs = "0.1.5"
libc = "0.2.162"
figment = { version = "0.10.18", features = ["json", "toml", "env"] }
flume = "0.11.0"
futures = "0.3.30"
futures = { version = "0.3.30" }
iggy = { path = "../sdk" }
jsonwebtoken = "9.3.0"
log = "0.4.20"
Expand Down Expand Up @@ -89,6 +90,7 @@ ulid = "1.1.2"
uuid = { version = "1.1.0", features = ["v7", "fast-rng", "zerocopy"] }
xxhash-rust = { version = "0.8.12", features = ["xxh32"] }
zip = "2.2.0"
pin-project = "1.1.7"

[target.'cfg(not(target_env = "msvc"))'.dependencies]
tikv-jemallocator = { version = "0.6", optional = true }
Expand Down
2 changes: 1 addition & 1 deletion server/src/channels/commands/maintain_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ async fn delete_segments(

if partition.get_segments().is_empty() {
let start_offset = last_end_offset + 1;
partition.add_persisted_segment(start_offset).await?;
partition.add_persisted_segment(None, start_offset).await?;
}
}
Err(error) => {
Expand Down
Loading