Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
pcholakov committed Dec 6, 2024
1 parent 6437fbd commit 92bd1c5
Show file tree
Hide file tree
Showing 5 changed files with 249 additions and 179 deletions.
2 changes: 1 addition & 1 deletion crates/admin/src/cluster_controller/logs_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ fn build_new_replicated_loglet_configuration(
// todo: make the default nodeset size configurable
let replication = previous_configuration
.map(|config| config.replication.clone())
.unwrap_or_else(|| ReplicationProperty::new(NonZeroU8::new(2).expect("to be valid")));
.unwrap_or_else(|| ReplicationProperty::new(NonZeroU8::new(1).expect("to be valid")));

// todo: make nodeset selection strategy configurable
let strategy = NodeSetSelectionStrategy::StrictFaultTolerantGreedy;
Expand Down
126 changes: 24 additions & 102 deletions crates/admin/src/cluster_controller/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ mod tests {
}

struct NodeStateHandler {
persisted_lsn: Arc<AtomicU64>,
// persisted_lsn: Arc<AtomicU64>,
archived_lsn: Arc<AtomicU64>,
// set of node ids for which the handler won't send a response to the caller, this allows to simulate
// dead nodes
Expand All @@ -552,7 +552,7 @@ mod tests {
}

let partition_processor_status = PartitionProcessorStatus {
last_persisted_log_lsn: Some(Lsn::from(self.persisted_lsn.load(Ordering::Relaxed))),
last_persisted_log_lsn: None, // deprecated
last_archived_log_lsn: Some(Lsn::from(self.archived_lsn.load(Ordering::Relaxed))),
..PartitionProcessorStatus::new()
};
Expand Down Expand Up @@ -581,11 +581,11 @@ mod tests {
..Default::default()
};

let persisted_lsn = Arc::new(AtomicU64::new(0));
let _persisted_lsn = Arc::new(AtomicU64::new(0));
let archived_lsn = Arc::new(AtomicU64::new(0));

let get_node_state_handler = Arc::new(NodeStateHandler {
persisted_lsn: Arc::clone(&persisted_lsn),
// persisted_lsn: Arc::clone(&persisted_lsn),
archived_lsn: Arc::clone(&archived_lsn),
block_list: BTreeSet::new(),
});
Expand Down Expand Up @@ -622,122 +622,37 @@ mod tests {
}

tokio::time::sleep(interval_duration * 10).await;

assert_eq!(Lsn::INVALID, bifrost.get_trim_point(LOG_ID).await?);

// report persisted lsn back to cluster controller
persisted_lsn.store(6, Ordering::Relaxed);

tokio::time::sleep(interval_duration * 10).await;
// we delete 1-6.
assert_eq!(Lsn::from(6), bifrost.get_trim_point(LOG_ID).await?);

// increase by 4 more; this should not exceed the trim threshold
persisted_lsn.store(10, Ordering::Relaxed);

archived_lsn.store(6, Ordering::Relaxed);
tokio::time::sleep(interval_duration * 10).await;
assert_eq!(Lsn::from(6), bifrost.get_trim_point(LOG_ID).await?);

// now we have reached the min threshold with respect to the last trim point
persisted_lsn.store(11, Ordering::Relaxed);

archived_lsn.store(11, Ordering::Relaxed);
tokio::time::sleep(interval_duration * 10).await;
assert_eq!(Lsn::from(11), bifrost.get_trim_point(LOG_ID).await?);

Ok(())
}

/// Verifies that with `log_trim_threshold = 0`, the cluster controller trims
/// the log up to whatever persisted LSN the nodes report, on its periodic
/// trim interval.
///
/// `start_paused = true` pauses tokio's clock, so the `sleep` calls below
/// auto-advance virtual time instead of waiting in real time.
#[test(restate_core::test(start_paused = true))]
async fn auto_log_trim_zero_threshold() -> anyhow::Result<()> {
    const LOG_ID: LogId = LogId::new(0);
    let mut admin_options = AdminOptions::default();
    // Zero threshold: any advance of the persisted LSN makes the log eligible for trimming.
    admin_options.log_trim_threshold = 0;
    let interval_duration = Duration::from_secs(10);
    admin_options.log_trim_interval = Some(interval_duration.into());
    let config = Configuration {
        admin: admin_options,
        ..Default::default()
    };

    // Shared with the message handler below; the test mutates these to simulate
    // nodes reporting their persisted/archived LSNs back to the controller.
    let persisted_lsn = Arc::new(AtomicU64::new(0));
    let archived_lsn = Arc::new(AtomicU64::new(0));

    let get_node_state_handler = Arc::new(NodeStateHandler {
        persisted_lsn: Arc::clone(&persisted_lsn),
        archived_lsn: Arc::clone(&archived_lsn),
        block_list: BTreeSet::new(),
    });
    let (node_env, bifrost) = create_test_env(config, |builder| {
        builder
            .add_message_handler(get_node_state_handler.clone())
            .add_message_handler(NoOpMessageHandler::<ControlProcessors>::default())
    })
    .await?;

    // simulate a connection from node 2 so we can have a connection between the two
    // nodes
    let node_2 = MockPeerConnection::connect(
        GenerationalNodeId::new(2, 2),
        node_env.metadata.nodes_config_version(),
        node_env
            .metadata
            .nodes_config_ref()
            .cluster_name()
            .to_owned(),
        node_env.networking.connection_manager(),
        10, // NOTE(review): presumably a channel/queue capacity — confirm against MockPeerConnection::connect
    )
    .await?;
    // let node 2 receive messages and respond with the same handler as node 1
    let (_node_2, _node2_reactor) =
        node_2.process_with_message_handler(get_node_state_handler)?;

    // Append 20 records (LSNs 1..=20) to have something to trim.
    let mut appender = bifrost.create_appender(LOG_ID)?;
    for i in 1..=20 {
        let lsn = appender.append(format!("record{i}")).await?;
        assert_eq!(Lsn::from(i), lsn);
    }
    tokio::time::sleep(interval_duration * 10).await;
    // Nothing reported yet, so no trim should have occurred.
    assert_eq!(Lsn::INVALID, bifrost.get_trim_point(LOG_ID).await?);

    // report persisted lsn back to cluster controller
    persisted_lsn.store(3, Ordering::Relaxed);

    tokio::time::sleep(interval_duration * 10).await;
    // with a zero threshold, everything up to and including the persisted LSN is trimmed
    assert_eq!(bifrost.get_trim_point(LOG_ID).await?, Lsn::from(3));
    // we should still be able to read the first record after the trim point
    let v = bifrost.read(LOG_ID, Lsn::from(4)).await?.unwrap();
    assert_that!(v.sequence_number(), eq(Lsn::new(4)));
    assert!(v.is_data_record());
    assert_that!(v.decode_unchecked::<String>(), eq("record4".to_owned()));

    // Advancing the persisted LSN to the head should allow trimming the entire log.
    persisted_lsn.store(20, Ordering::Relaxed);

    tokio::time::sleep(interval_duration * 10).await;
    assert_eq!(Lsn::from(20), bifrost.get_trim_point(LOG_ID).await?);

    Ok(())
}

#[test(restate_core::test(start_paused = true))]
async fn do_not_trim_if_not_all_nodes_report_persisted_lsn() -> anyhow::Result<()> {
async fn do_not_trim_if_no_nodes_report_archived_lsn() -> anyhow::Result<()> {
const LOG_ID: LogId = LogId::new(0);

let mut admin_options = AdminOptions::default();
admin_options.log_trim_threshold = 0;
// admin_options.log_trim_threshold = 0;
let interval_duration = Duration::from_secs(10);
admin_options.log_trim_interval = Some(interval_duration.into());
let config = Configuration {
admin: admin_options,
..Default::default()
};

let persisted_lsn = Arc::new(AtomicU64::new(0));
let _persisted_lsn = Arc::new(AtomicU64::new(0));
let archived_lsn = Arc::new(AtomicU64::new(0));

let (_node_env, bifrost) = create_test_env(config, |builder| {
let black_list = builder
let block_list = builder
.nodes_config
.iter()
.next()
Expand All @@ -746,9 +661,8 @@ mod tests {
.collect();

let get_node_state_handler = NodeStateHandler {
persisted_lsn: Arc::clone(&persisted_lsn),
archived_lsn: Arc::clone(&archived_lsn),
block_list: black_list,
block_list,
};

builder.add_message_handler(get_node_state_handler)
Expand All @@ -760,17 +674,25 @@ mod tests {
let lsn = appender.append(format!("record{i}")).await?;
assert_eq!(Lsn::from(i), lsn);
}

// report persisted lsn back to cluster controller for a subset of the nodes
persisted_lsn.store(5, Ordering::Relaxed);

// archived_lsn.store(5, Ordering::Relaxed);
tokio::time::sleep(interval_duration * 10).await;
// no trimming should have happened because one node did not report the persisted lsn

// no trimming should have happened because no node reported an archived LSN
assert_eq!(Lsn::INVALID, bifrost.get_trim_point(LOG_ID).await?);

Ok(())
}

/// Per its name, presumably intended to verify that the controller refuses to
/// trim the log while dead nodes are present — TODO confirm intended scenario.
///
/// NOTE(review): this is an unimplemented placeholder; `todo!()` panics, so
/// this test will fail if run. Implement or mark `#[ignore]` before merging.
#[test(restate_core::test(start_paused = true))]
async fn do_not_trim_if_dead_nodes_present() -> anyhow::Result<()> {
    todo!()
}

/// Per its name, presumably intended to verify that the controller refuses to
/// trim the log while slow (lagging) nodes are present — TODO confirm intended
/// scenario.
///
/// NOTE(review): this is an unimplemented placeholder; `todo!()` panics, so
/// this test will fail if run. Implement or mark `#[ignore]` before merging.
#[test(restate_core::test(start_paused = true))]
async fn do_not_trim_if_slow_nodes_present() -> anyhow::Result<()> {
    todo!()
}

async fn create_test_env<F>(
config: Configuration,
mut modify_builder: F,
Expand Down
Loading

0 comments on commit 92bd1c5

Please sign in to comment.