diff --git a/crates/admin/src/cluster_controller/service.rs b/crates/admin/src/cluster_controller/service.rs
index d9560fd68..7cd3c15cc 100644
--- a/crates/admin/src/cluster_controller/service.rs
+++ b/crates/admin/src/cluster_controller/service.rs
@@ -536,7 +536,7 @@ mod tests {
     }
 
     struct NodeStateHandler {
-        persisted_lsn: Arc<AtomicU64>,
+        // persisted_lsn: Arc<AtomicU64>,
         archived_lsn: Arc<AtomicU64>,
         // set of node ids for which the handler won't send a response to the caller, this allows to simulate
         // dead nodes
@@ -552,7 +552,7 @@ mod tests {
         }
 
             let partition_processor_status = PartitionProcessorStatus {
-                last_persisted_log_lsn: Some(Lsn::from(self.persisted_lsn.load(Ordering::Relaxed))),
+                last_persisted_log_lsn: None, // deprecated
                 last_archived_log_lsn: Some(Lsn::from(self.archived_lsn.load(Ordering::Relaxed))),
                 ..PartitionProcessorStatus::new()
             };
@@ -581,11 +581,11 @@ mod tests {
             ..Default::default()
         };
 
-        let persisted_lsn = Arc::new(AtomicU64::new(0));
+        let _persisted_lsn = Arc::new(AtomicU64::new(0));
         let archived_lsn = Arc::new(AtomicU64::new(0));
 
         let get_node_state_handler = Arc::new(NodeStateHandler {
-            persisted_lsn: Arc::clone(&persisted_lsn),
+            // persisted_lsn: Arc::clone(&persisted_lsn),
             archived_lsn: Arc::clone(&archived_lsn),
             block_list: BTreeSet::new(),
         });
@@ -622,25 +622,13 @@ mod tests {
         }
 
         tokio::time::sleep(interval_duration * 10).await;
-        assert_eq!(Lsn::INVALID, bifrost.get_trim_point(LOG_ID).await?);
 
-        // report persisted lsn back to cluster controller
-        persisted_lsn.store(6, Ordering::Relaxed);
-
-        tokio::time::sleep(interval_duration * 10).await;
-        // we delete 1-6.
-        assert_eq!(Lsn::from(6), bifrost.get_trim_point(LOG_ID).await?);
-
-        // increase by 4 more, this should not overcome the threshold
-        persisted_lsn.store(10, Ordering::Relaxed);
-
+        archived_lsn.store(6, Ordering::Relaxed);
         tokio::time::sleep(interval_duration * 10).await;
         assert_eq!(Lsn::from(6), bifrost.get_trim_point(LOG_ID).await?);
 
-        // now we have reached the min threshold wrt to the last trim point
-        persisted_lsn.store(11, Ordering::Relaxed);
-
+        archived_lsn.store(11, Ordering::Relaxed);
         tokio::time::sleep(interval_duration * 10).await;
         assert_eq!(Lsn::from(11), bifrost.get_trim_point(LOG_ID).await?);
 
@@ -648,84 +636,11 @@ mod tests {
     }
 
     #[test(restate_core::test(start_paused = true))]
-    async fn auto_log_trim_zero_threshold() -> anyhow::Result<()> {
-        const LOG_ID: LogId = LogId::new(0);
-        let mut admin_options = AdminOptions::default();
-        admin_options.log_trim_threshold = 0;
-        let interval_duration = Duration::from_secs(10);
-        admin_options.log_trim_interval = Some(interval_duration.into());
-        let config = Configuration {
-            admin: admin_options,
-            ..Default::default()
-        };
-
-        let persisted_lsn = Arc::new(AtomicU64::new(0));
-        let archived_lsn = Arc::new(AtomicU64::new(0));
-
-        let get_node_state_handler = Arc::new(NodeStateHandler {
-            persisted_lsn: Arc::clone(&persisted_lsn),
-            archived_lsn: Arc::clone(&archived_lsn),
-            block_list: BTreeSet::new(),
-        });
-        let (node_env, bifrost) = create_test_env(config, |builder| {
-            builder
-                .add_message_handler(get_node_state_handler.clone())
-                .add_message_handler(NoOpMessageHandler::<ControlProcessors>::default())
-        })
-        .await?;
-
-        // simulate a connection from node 2 so we can have a connection between the two
-        // nodes
-        let node_2 = MockPeerConnection::connect(
-            GenerationalNodeId::new(2, 2),
-            node_env.metadata.nodes_config_version(),
-            node_env
-                .metadata
-                .nodes_config_ref()
-                .cluster_name()
-                .to_owned(),
-            node_env.networking.connection_manager(),
-            10,
-        )
-        .await?;
-        // let node2 receive messages and use the same message handler as node1
-        let (_node_2, _node2_reactor) =
-            node_2.process_with_message_handler(get_node_state_handler)?;
-
-        let mut appender = bifrost.create_appender(LOG_ID)?;
-        for i in 1..=20 {
-            let lsn = appender.append(format!("record{i}")).await?;
-            assert_eq!(Lsn::from(i), lsn);
-        }
-        tokio::time::sleep(interval_duration * 10).await;
-        assert_eq!(Lsn::INVALID, bifrost.get_trim_point(LOG_ID).await?);
-
-        // report persisted lsn back to cluster controller
-        persisted_lsn.store(3, Ordering::Relaxed);
-
-        tokio::time::sleep(interval_duration * 10).await;
-        // everything before the persisted_lsn.
-        assert_eq!(bifrost.get_trim_point(LOG_ID).await?, Lsn::from(3));
-        // we should be able to after the last persisted lsn
-        let v = bifrost.read(LOG_ID, Lsn::from(4)).await?.unwrap();
-        assert_that!(v.sequence_number(), eq(Lsn::new(4)));
-        assert!(v.is_data_record());
-        assert_that!(v.decode_unchecked::<String>(), eq("record4".to_owned()));
-
-        persisted_lsn.store(20, Ordering::Relaxed);
-
-        tokio::time::sleep(interval_duration * 10).await;
-        assert_eq!(Lsn::from(20), bifrost.get_trim_point(LOG_ID).await?);
-
-        Ok(())
-    }
-
-    #[test(restate_core::test(start_paused = true))]
-    async fn do_not_trim_if_not_all_nodes_report_persisted_lsn() -> anyhow::Result<()> {
+    async fn do_not_trim_if_no_nodes_report_archived_lsn() -> anyhow::Result<()> {
         const LOG_ID: LogId = LogId::new(0);
         let mut admin_options = AdminOptions::default();
-        admin_options.log_trim_threshold = 0;
+        // admin_options.log_trim_threshold = 0;
         let interval_duration = Duration::from_secs(10);
         admin_options.log_trim_interval = Some(interval_duration.into());
         let config = Configuration {
@@ -733,11 +648,11 @@ mod tests {
             ..Default::default()
         };
 
-        let persisted_lsn = Arc::new(AtomicU64::new(0));
+        let _persisted_lsn = Arc::new(AtomicU64::new(0));
         let archived_lsn = Arc::new(AtomicU64::new(0));
 
         let (_node_env, bifrost) = create_test_env(config, |builder| {
-            let black_list = builder
+            let block_list = builder
                 .nodes_config
                 .iter()
                 .next()
@@ -746,9 +661,8 @@ mod tests {
                 .collect();
 
             let get_node_state_handler = NodeStateHandler {
-                persisted_lsn: Arc::clone(&persisted_lsn),
                 archived_lsn: Arc::clone(&archived_lsn),
-                block_list: black_list,
+                block_list,
             };
 
            builder.add_message_handler(get_node_state_handler)
@@ -760,17 +674,25 @@ mod tests {
             let lsn = appender.append(format!("record{i}")).await?;
             assert_eq!(Lsn::from(i), lsn);
         }
-
-        // report persisted lsn back to cluster controller for a subset of the nodes
-        persisted_lsn.store(5, Ordering::Relaxed);
-
+        // archived_lsn.store(5, Ordering::Relaxed);
         tokio::time::sleep(interval_duration * 10).await;
-        // no trimming should have happened because one node did not report the persisted lsn
+
+        // no trimming should have happened because no nodes report archived_lsn
         assert_eq!(Lsn::INVALID, bifrost.get_trim_point(LOG_ID).await?);
 
         Ok(())
     }
 
+    #[test(restate_core::test(start_paused = true))]
+    async fn do_not_trim_if_dead_nodes_present() -> anyhow::Result<()> {
+        todo!()
+    }
+
+    #[test(restate_core::test(start_paused = true))]
+    async fn do_not_trim_if_slow_nodes_present() -> anyhow::Result<()> {
+        todo!()
+    }
+
     async fn create_test_env<F>(
         config: Configuration,
         mut modify_builder: F,
diff --git a/crates/admin/src/cluster_controller/service/state.rs b/crates/admin/src/cluster_controller/service/state.rs
index 938a4ac7e..8efe72a97 100644
--- a/crates/admin/src/cluster_controller/service/state.rs
+++ b/crates/admin/src/cluster_controller/service/state.rs
@@ -8,20 +8,23 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
-use std::collections::BTreeMap;
+use std::collections::{BTreeMap, HashMap};
+use std::sync::Arc;
 
 use futures::future::OptionFuture;
 use itertools::Itertools;
 use tokio::sync::watch;
 use tokio::time;
 use tokio::time::{Interval, MissedTickBehavior};
-use tracing::{debug, info, warn};
+use tracing::{info, warn};
 
 use restate_bifrost::{Bifrost, BifrostAdmin};
 use restate_core::metadata_store::MetadataStoreClient;
 use restate_core::network::TransportConnect;
 use restate_core::{my_node_id, Metadata, MetadataWriter};
-use restate_types::cluster::cluster_state::{AliveNode, NodeState};
+use restate_types::cluster::cluster_state::{
+    AliveNode, ClusterState, NodeState, PartitionProcessorStatus,
+};
 use restate_types::config::{AdminOptions, Configuration};
 use restate_types::identifiers::PartitionId;
 use restate_types::live::Live;
@@ -147,7 +150,7 @@ pub struct Leader {
     partition_table: Live<PartitionTable>,
     nodes_config: Live<NodesConfiguration>,
     find_logs_tail_interval: Interval,
-    log_trim_interval: Option<Interval>,
+    log_trim_check_interval: Option<Interval>,
     logs_controller: LogsController,
     scheduler: Scheduler,
     cluster_state_watcher: ClusterStateWatcher,
@@ -199,7 +202,7 @@ where
             partition_table: metadata.updateable_partition_table(),
             logs: metadata.updateable_logs_metadata(),
             find_logs_tail_interval,
-            log_trim_interval,
+            log_trim_check_interval: log_trim_interval,
             log_trim_threshold,
             logs_controller,
             scheduler,
@@ -233,7 +236,7 @@ where
     }
 
     fn reconfigure(&mut self, configuration: &Configuration) {
-        (self.log_trim_interval, self.log_trim_threshold) =
+        (self.log_trim_check_interval, self.log_trim_threshold) =
             create_log_trim_interval(&configuration.admin);
     }
 
@@ -243,7 +246,7 @@ where
            _ = self.find_logs_tail_interval.tick() => {
                self.logs_controller.find_logs_tail();
            }
-            _ = OptionFuture::from(self.log_trim_interval.as_mut().map(|interval| interval.tick())) => {
+            _ = OptionFuture::from(self.log_trim_check_interval.as_mut().map(|interval| interval.tick())) => {
                return Ok(LeaderEvent::TrimLogs);
            }
            result = self.logs_controller.run_async_operations() => {
@@ -302,7 +305,7 @@ where
         let result = self.trim_logs_inner().await;
 
         if let Err(err) = result {
-            warn!("Could not trim the logs. This can lead to increased disk usage: {err}");
+            warn!("Could not trim the logs. This can lead to increased disk usage on log servers: {err}");
         }
     }
 
@@ -315,59 +318,114 @@ where
 
         let cluster_state = self.cluster_state_watcher.current();
 
-        let mut persisted_lsns_per_partition: BTreeMap<
-            PartitionId,
-            BTreeMap<GenerationalNodeId, Lsn>,
-        > = BTreeMap::default();
-
-        for node_state in cluster_state.nodes.values() {
-            match node_state {
-                NodeState::Alive(AliveNode {
-                    generational_node_id,
-                    partitions,
-                    ..
-                }) => {
-                    for (partition_id, partition_processor_status) in partitions.iter() {
-                        let lsn = partition_processor_status
-                            .last_persisted_log_lsn
-                            .unwrap_or(Lsn::INVALID);
-                        persisted_lsns_per_partition
-                            .entry(*partition_id)
-                            .or_default()
-                            .insert(*generational_node_id, lsn);
-                    }
-                }
-                NodeState::Dead(_) | NodeState::Suspect(_) => {
-                    // nothing to do
-                }
-            }
+        let current_trim_points = get_trim_points(&cluster_state, &bifrost_admin).await?;
+        let new_trim_points = compute_new_safe_trim_points(cluster_state, &current_trim_points);
+
+        for (log_id, (trim_point, partition_id)) in new_trim_points {
+            info!(
+                %partition_id,
+                "Automatic trim log '{log_id}' for all records before='{trim_point}'"
+            );
+            bifrost_admin.trim(log_id, trim_point).await?
         }
 
-        for (partition_id, persisted_lsns) in persisted_lsns_per_partition.into_iter() {
-            let log_id = LogId::from(partition_id);
-
-            // todo: Remove once Restate nodes can share partition processor snapshots
-            // only try to trim if we know about the persisted lsns of all known nodes; otherwise we
-            // risk that a node cannot fully replay the log; this assumes that no new nodes join the
-            // cluster after the first trimming has happened
-            if persisted_lsns.len() >= cluster_state.nodes.len() {
-                let min_persisted_lsn = persisted_lsns.into_values().min().unwrap_or(Lsn::INVALID);
-                // trim point is before the oldest record
-                let current_trim_point = bifrost_admin.get_trim_point(log_id).await?;
-
-                if min_persisted_lsn >= current_trim_point + self.log_trim_threshold {
-                    debug!(
-                        "Automatic trim log '{log_id}' for all records before='{min_persisted_lsn}'"
-                    );
-                    bifrost_admin.trim(log_id, min_persisted_lsn).await?
+        Ok(())
+    }
+}
+
+async fn get_trim_points(
+    cluster_state: &Arc<ClusterState>,
+    bifrost_admin: &BifrostAdmin<'_>,
+) -> Result<HashMap<LogId, Lsn>, restate_bifrost::Error> {
+    let partition_ids: Vec<PartitionId> = cluster_state
+        .nodes
+        .values()
+        .filter_map(|node_state| match node_state {
+            NodeState::Alive(alive_node) => Some(alive_node),
+            _ => None,
+        })
+        .flat_map(|node| node.partitions.keys())
+        .cloned()
+        .collect();
+
+    let mut trim_points = HashMap::new();
+    for partition in partition_ids {
+        let log_id = LogId::from(partition);
+        let current_trim_point = bifrost_admin.get_trim_point(log_id).await?;
+        trim_points.insert(log_id, current_trim_point);
+    }
+
+    Ok(trim_points)
+}
+
+fn compute_new_safe_trim_points(
+    cluster_state: Arc<ClusterState>,
+    trim_points: &HashMap<LogId, Lsn>,
+) -> BTreeMap<LogId, (Lsn, PartitionId)> {
+    let mut partition_statuses: BTreeMap<
+        PartitionId,
+        BTreeMap<GenerationalNodeId, &PartitionProcessorStatus>,
+    > = BTreeMap::new();
+    let mut safe_trim_points = BTreeMap::new();
+
+    for node_state in cluster_state.nodes.values() {
+        match node_state {
+            NodeState::Alive(AliveNode {
+                generational_node_id,
+                partitions,
+                ..
+            }) => {
+                for (partition_id, partition_processor_status) in partitions.iter() {
+                    partition_statuses
+                        .entry(*partition_id)
+                        .or_default()
+                        .insert(*generational_node_id, partition_processor_status);
                 }
-            } else {
-                warn!("Stop automatically trimming log '{log_id}' because not all nodes are running a partition processor applying this log.");
+            }
+            NodeState::Dead(_) | NodeState::Suspect(_) => {
+                // todo(pavel): until we implement trim-gap support (https://github.com/restatedev/restate/issues/2247),
+                // pause trimming unless we know the status of all nodes; dead nodes might come
+                // back and will be stuck if their applied lsn < trim point.
+                warn!("Automatic log trimming paused until all nodes are reporting applied LSN status. This can lead to increased disk usage on log servers.");
+                return safe_trim_points;
            }
        }
+    }
 
-        Ok(())
+    for (partition_id, processor_status) in partition_statuses.into_iter() {
+        let log_id = LogId::from(partition_id);
+
+        // We trust that if a single node from the cluster reports a partition's archived LSN,
+        // it is accessible to all other nodes that may need it.
+        // todo(pavel): read snapshot archived LSN from repository, so we can switch to min(archived_lsn)
+        let archived_lsn = processor_status
+            .values()
+            .map(|s| s.last_archived_log_lsn.unwrap_or(Lsn::INVALID))
+            .max()
+            .unwrap_or(Lsn::INVALID);
+
+        let min_applied_lsn = processor_status
+            .values()
+            .map(|s| s.last_applied_log_lsn.unwrap_or(Lsn::INVALID))
+            .min()
+            .unwrap_or(Lsn::INVALID);
+
+        let current_trim_point = trim_points.get(&log_id).unwrap_or(&Lsn::INVALID).clone();
+
+        if archived_lsn == Lsn::INVALID {
+            warn!("Not trimming log '{log_id}' because no node is reporting a valid archived LSN");
+        } else if min_applied_lsn < current_trim_point {
+            // todo(pavel): remove this check once we implement trim-gap support (https://github.com/restatedev/restate/issues/2247)
+            warn!(
+                %partition_id,
+                "Not trimming log '{log_id}' because some nodes have not applied the log up to the archived LSN"
+            );
+        } else if archived_lsn <= min_applied_lsn && archived_lsn > current_trim_point {
+            safe_trim_points.insert(log_id, (archived_lsn, partition_id));
+        }
     }
+
+    safe_trim_points
 }
 
 fn create_log_trim_interval(options: &AdminOptions) -> (Option<Interval>, Lsn) {
@@ -381,3 +439,125 @@ fn create_log_trim_interval(options: &AdminOptions) -> (Option<Interval>, Lsn) {
 
     (log_trim_interval, log_trim_threshold)
 }
+
+#[cfg(test)]
+mod tests {
+    use std::collections::{BTreeMap, HashMap};
+    use std::sync::Arc;
+
+    use crate::cluster_controller::service::state::compute_new_safe_trim_points;
+    use restate_types::cluster::cluster_state::{
+        AliveNode, ClusterState, DeadNode, NodeState, PartitionProcessorStatus, RunMode,
+    };
+    use restate_types::identifiers::PartitionId;
+    use restate_types::logs::{LogId, Lsn, SequenceNumber};
+    use restate_types::time::MillisSinceEpoch;
+    use restate_types::{GenerationalNodeId, PlainNodeId, Version};
+    use RunMode::{Follower, Leader};
+
+    #[test]
+    fn test_compute_new_safe_trim_points() {
+        let p1 = PartitionId::from(0);
+        let p2 = PartitionId::from(1);
+        let p3 = PartitionId::from(2);
+        let p4 = PartitionId::from(3);
+
+        let n1 = GenerationalNodeId::new(1, 0);
+        let n1_partitions = [
+            (p1, processor_status(Leader, applied(10), None)),
+            (p2, processor_status(Follower, applied(20), None)),
+            (p3, processor_status(Leader, applied(30), archived(30))),
+            (p4, processor_status(Follower, applied(40), archived(40))),
+        ]
+        .into_iter()
+        .collect();
+
+        let n2 = GenerationalNodeId::new(2, 0);
+        let n2_partitions = [
+            (p1, processor_status(Follower, applied(10), None)),
+            (p2, processor_status(Leader, applied(20), archived(10))),
+            (p3, processor_status(Follower, applied(10), None)),
+            (p4, processor_status(Leader, applied(45), None)),
+        ]
+        .into_iter()
+        .collect();
+
+        let cluster_state = Arc::new(ClusterState {
+            last_refreshed: None,
+            nodes_config_version: Version::MIN,
+            partition_table_version: Version::MIN,
+            logs_metadata_version: Version::MIN,
+            nodes: [
+                (n1.as_plain(), alive_node(n1, n1_partitions)),
+                (n2.as_plain(), alive_node(n2, n2_partitions)),
+            ]
+            .into(),
+        });
+
+        let mut current_trim_points = HashMap::new();
+        current_trim_points.insert(LogId::from(p1), Lsn::INVALID);
+        current_trim_points.insert(LogId::from(p2), Lsn::INVALID);
+
+        let trim_points =
+            compute_new_safe_trim_points(cluster_state.clone(), &current_trim_points);
+
+        assert_eq!(
+            trim_points,
+            BTreeMap::from([
+                // p1 does not report archived LSN - no trim
+                (LogId::from(p2), (Lsn::new(10), p2)),
+                // p3 is not trimmed: n2 has not applied the log up to p3's archived LSN
+                (LogId::from(p4), (Lsn::new(40), p4)),
+            ])
+        );
+
+        let mut nodes = cluster_state.nodes.clone();
+        nodes.insert(PlainNodeId::new(3), dead_node());
+        let cluster_state = Arc::new(ClusterState {
+            nodes,
+            ..*cluster_state
+        });
+
+        let trim_points =
+            compute_new_safe_trim_points(cluster_state.clone(), &current_trim_points);
+
+        assert!(trim_points.is_empty());
+    }
+
+    fn applied(sn: u64) -> Option<Lsn> {
+        Some(Lsn::new(sn))
+    }
+
+    fn archived(sn: u64) -> Option<Lsn> {
+        Some(Lsn::new(sn))
+    }
+
+    fn processor_status(
+        mode: RunMode,
+        applied_lsn: Option<Lsn>,
+        archived_lsn: Option<Lsn>,
+    ) -> PartitionProcessorStatus {
+        PartitionProcessorStatus {
+            planned_mode: mode,
+            effective_mode: mode,
+            last_applied_log_lsn: applied_lsn,
+            last_archived_log_lsn: archived_lsn,
+            ..PartitionProcessorStatus::default()
+        }
+    }
+
+    fn alive_node(
+        generational_node_id: GenerationalNodeId,
+        partitions: BTreeMap<PartitionId, PartitionProcessorStatus>,
+    ) -> NodeState {
+        NodeState::Alive(AliveNode {
+            generational_node_id,
+            last_heartbeat_at: MillisSinceEpoch::now(),
+            partitions,
+        })
+    }
+
+    fn dead_node() -> NodeState {
+        NodeState::Dead(DeadNode {
+            last_seen_alive: None,
+        })
+    }
+}
diff --git a/crates/local-cluster-runner/src/node/mod.rs b/crates/local-cluster-runner/src/node/mod.rs
index bd6234eeb..792c680bd 100644
--- a/crates/local-cluster-runner/src/node/mod.rs
+++ b/crates/local-cluster-runner/src/node/mod.rs
@@ -183,6 +183,10 @@ impl Node {
         for node in 1..=size {
             let mut base_config = base_config.clone();
             base_config.common.force_node_id = Some(PlainNodeId::new(node));
+            base_config.ingress.bind_address = SocketAddr::new(
+                "127.0.0.1".parse().unwrap(),
+                8080 - 1 + u16::try_from(node).unwrap(),
+            );
 
             // Create a separate ingress role when running a worker
             let roles = if roles.contains(Role::Worker) {
diff --git a/crates/types/src/cluster/cluster_state.rs b/crates/types/src/cluster/cluster_state.rs
index b6313ae98..9d3ef48c0 100644
--- a/crates/types/src/cluster/cluster_state.rs
+++ b/crates/types/src/cluster/cluster_state.rs
@@ -139,6 +139,7 @@ pub struct PartitionProcessorStatus {
     pub last_record_applied_at: Option<MillisSinceEpoch>,
     pub num_skipped_records: u64,
     pub replay_status: ReplayStatus,
+    // todo(pavel): remove last_persisted_log_lsn?
     pub last_persisted_log_lsn: Option<Lsn>,
     pub last_archived_log_lsn: Option<Lsn>,
     // Set if replay_status is CatchingUp
diff --git a/crates/types/src/config/admin.rs b/crates/types/src/config/admin.rs
index b0e1deadf..1a717ad24 100644
--- a/crates/types/src/config/admin.rs
+++ b/crates/types/src/config/admin.rs
@@ -44,6 +44,7 @@ pub struct AdminOptions {
     #[cfg_attr(feature = "schemars", schemars(with = "String"))]
     pub heartbeat_interval: humantime::Duration,
 
+    // todo(pavel): should we leave this to be exclusively for local loglets, and introduce a new log_trim_check_interval config?
     /// # Log trim interval
     ///
     /// Controls the interval at which cluster controller tries to trim the logs. Log trimming
diff --git a/crates/worker/src/partition_processor_manager.rs b/crates/worker/src/partition_processor_manager.rs
index 1a0492d89..2f0c371de 100644
--- a/crates/worker/src/partition_processor_manager.rs
+++ b/crates/worker/src/partition_processor_manager.rs
@@ -9,7 +9,7 @@
 // by the Apache License, Version 2.0.
 
 mod message_handler;
-mod persisted_lsn_watchdog;
+// mod persisted_lsn_watchdog;
 mod processor_state;
 mod spawn_processor_task;
 
@@ -22,8 +22,8 @@ use std::time::Duration;
 
 use futures::stream::{FuturesUnordered, StreamExt};
 use metrics::gauge;
+use tokio::sync::mpsc;
 use tokio::sync::oneshot;
-use tokio::sync::{mpsc, watch};
 use tokio::task::JoinSet;
 use tokio::time::MissedTickBehavior;
 use tracing::{debug, error, info, instrument, warn};
@@ -74,7 +74,7 @@ use crate::metric_definitions::PARTITION_TIME_SINCE_LAST_RECORD;
 use crate::metric_definitions::PARTITION_TIME_SINCE_LAST_STATUS_UPDATE;
 use crate::partition::snapshots::{SnapshotPartitionTask, SnapshotRepository};
 use crate::partition_processor_manager::message_handler::PartitionProcessorManagerMessageHandler;
-use crate::partition_processor_manager::persisted_lsn_watchdog::PersistedLogLsnWatchdog;
+// use crate::partition_processor_manager::persisted_lsn_watchdog::PersistedLogLsnWatchdog;
 use crate::partition_processor_manager::processor_state::{
     LeaderEpochToken, ProcessorState, StartedProcessor,
 };
@@ -94,7 +94,7 @@ pub struct PartitionProcessorManager {
     rx: mpsc::Receiver,
     tx: mpsc::Sender,
-    persisted_lsns_rx: Option<watch::Receiver<BTreeMap<PartitionId, Lsn>>>,
+    // persisted_lsns_rx: Option<watch::Receiver<BTreeMap<PartitionId, Lsn>>>,
     archived_lsns: HashMap<PartitionId, Lsn>,
     invokers_status_reader: MultiplexedInvokerStatusReader,
     pending_control_processors: Option,
@@ -193,7 +193,7 @@ impl PartitionProcessorManager {
             bifrost,
             rx,
             tx,
-            persisted_lsns_rx: None,
+            // persisted_lsns_rx: None,
             archived_lsns: HashMap::default(),
             invokers_status_reader: MultiplexedInvokerStatusReader::default(),
             pending_control_processors: None,
@@ -219,17 +219,17 @@ impl PartitionProcessorManager {
     pub async fn run(mut self) -> anyhow::Result<()> {
         let mut shutdown = std::pin::pin!(cancellation_watcher());
 
-        let (persisted_lsns_tx, persisted_lsns_rx) = watch::channel(BTreeMap::default());
-        self.persisted_lsns_rx = Some(persisted_lsns_rx);
-
-        let watchdog = PersistedLogLsnWatchdog::new(
-            self.updateable_config
-                .clone()
-                .map(|config| &config.worker.storage),
-            self.partition_store_manager.clone(),
-            persisted_lsns_tx,
-        );
-        TaskCenter::spawn_child(TaskKind::Watchdog, "persisted-lsn-watchdog", watchdog.run())?;
+        // let (persisted_lsns_tx, persisted_lsns_rx) = watch::channel(BTreeMap::default());
+        // self.persisted_lsns_rx = Some(persisted_lsns_rx);
+
+        // let watchdog = PersistedLogLsnWatchdog::new(
+        //     self.updateable_config
+        //         .clone()
+        //         .map(|config| &config.worker.storage),
+        //     self.partition_store_manager.clone(),
+        //     persisted_lsns_tx,
+        // );
+        // TaskCenter::spawn_child(TaskKind::Watchdog, "persisted-lsn-watchdog", watchdog.run())?;
 
         let metadata = Metadata::current();
         let mut logs_version_watcher = metadata.watch(MetadataKind::Logs);
@@ -519,7 +519,7 @@ impl PartitionProcessorManager {
     }
 
     fn get_state(&self) -> BTreeMap<PartitionId, PartitionProcessorStatus> {
-        let persisted_lsns = self.persisted_lsns_rx.as_ref().map(|w| w.borrow());
+        // let persisted_lsns = self.persisted_lsns_rx.as_ref().map(|w| w.borrow());
 
         // For all running partitions, collect state, enrich it, and send it back.
         self.processor_states
@@ -566,11 +566,9 @@ impl PartitionProcessorManager {
                     .set(last_record_applied_at.elapsed());
             }
 
-            // it is a bit unfortunate that we share PartitionProcessorStatus between the
-            // PP and the PPManager :-(. Maybe at some point we want to split the struct for it.
-            status.last_persisted_log_lsn = persisted_lsns
-                .as_ref()
-                .and_then(|lsns| lsns.get(partition_id).cloned());
+            // status.last_persisted_log_lsn = persisted_lsns
+            //     .as_ref()
+            //     .and_then(|lsns| lsns.get(partition_id).cloned());
 
             status.last_archived_log_lsn = self.archived_lsns.get(partition_id).cloned();