From 003245a155deef839911e00197cc4cd59f2a617f Mon Sep 17 00:00:00 2001
From: Muhamad Awad
Date: Fri, 10 Jan 2025 10:59:25 +0100
Subject: [PATCH] Removing scheduling plan

Summary:
- Completely remove the scheduling plan
- Scheduler now updates the replication group of each partition directly in the partition table
- PartitionRouting uses the partition table replication group for routing decisions
(Illustrative sketches of the new placement, update, and routing flows follow the patch.)
---
 .../admin/src/cluster_controller/scheduler.rs | 700 +++++++-----------
 .../admin/src/cluster_controller/service.rs   |  23 +-
 .../src/cluster_controller/service/state.rs   |  64 +-
 crates/core/src/partitions.rs                 |  71 +-
 crates/core/src/test_env.rs                   |  20 +-
 crates/node/src/lib.rs                        |   3 +-
 crates/types/src/cluster_controller.rs        | 189 -----
 crates/types/src/lib.rs                       |   1 -
 crates/types/src/metadata_store.rs            |   2 -
 9 files changed, 365 insertions(+), 708 deletions(-)
 delete mode 100644 crates/types/src/cluster_controller.rs

diff --git a/crates/admin/src/cluster_controller/scheduler.rs b/crates/admin/src/cluster_controller/scheduler.rs
index e7ab524be..684dafc7f 100644
--- a/crates/admin/src/cluster_controller/scheduler.rs
+++ b/crates/admin/src/cluster_controller/scheduler.rs
@@ -9,31 +9,28 @@
 // by the Apache License, Version 2.0.

 use rand::seq::IteratorRandom;
-use std::collections::{BTreeMap, BTreeSet};
-use std::time::{Duration, Instant};
+use restate_types::live::Pinned;
+use std::collections::BTreeMap;
+use std::sync::Arc;
 use tracing::debug;
 use xxhash_rust::xxh3::Xxh3Builder;

 use restate_core::metadata_store::{
-    retry_on_network_error, MetadataStoreClient, Precondition, ReadError, ReadWriteError,
-    WriteError,
+    MetadataStoreClient, Precondition, ReadError, ReadWriteError, WriteError,
 };
 use restate_core::network::{NetworkSender, Networking, Outgoing, TransportConnect};
-use restate_core::{Metadata, ShutdownError, SyncError, TaskCenter, TaskKind};
-use restate_types::cluster_controller::{
-    SchedulingPlan, SchedulingPlanBuilder, TargetPartitionState,
-};
-use restate_types::config::Configuration;
+use restate_core::{Metadata, MetadataWriter, ShutdownError, SyncError, TaskCenter, TaskKind};
 use restate_types::identifiers::PartitionId;
-use restate_types::logs::metadata::Logs;
 use restate_types::logs::LogId;
-use restate_types::metadata_store::keys::SCHEDULING_PLAN_KEY;
+use restate_types::metadata_store::keys::PARTITION_TABLE_KEY;
 use restate_types::net::partition_processor_manager::{
     ControlProcessor, ControlProcessors, ProcessorCommand,
 };
 use restate_types::nodes_config::NodesConfiguration;
-use restate_types::partition_table::{PartitionTable, ReplicationStrategy};
-use restate_types::{NodeId, PlainNodeId, Versioned};
+use restate_types::partition_table::{
+    Partition, PartitionTable, ReplicationStrategy, TargetPartitionReplicationState,
+};
+use restate_types::{NodeId, PlainNodeId, Version};

 use crate::cluster_controller::logs_controller;
 use crate::cluster_controller::observed_cluster_state::ObservedClusterState;
@@ -50,26 +47,14 @@ pub enum Error {
     MetadataStoreWrite(#[from] WriteError),
     #[error("failed reading from metadata store: {0}")]
     MetadataStoreRead(#[from] ReadError),
+    #[error("failed read/write on metadata store: {0}")]
+    MetadataStoreReadWrite(#[from] ReadWriteError),
     #[error("failed syncing metadata: {0}")]
     Metadata(#[from] SyncError),
     #[error("system is shutting down")]
     Shutdown(#[from] ShutdownError),
 }

-enum UpdateOutcome<T> {
-    Written(T),
-    NewerVersionFound(T),
-}
-
-impl<T> UpdateOutcome<T> {
-    fn into_inner(self) -> T {
-        match self {
-            UpdateOutcome::Written(value) => value,
UpdateOutcome::NewerVersionFound(value) => value, - } - } -} - /// Placement hints for the [`Scheduler`]. The hints can specify which nodes should be chosen for /// the partition processor placement and on which node the leader should run. pub trait PartitionProcessorPlacementHints { @@ -89,9 +74,8 @@ impl PartitionProcessorPlacementHints for & } pub struct Scheduler { - scheduling_plan: SchedulingPlan, - last_updated_scheduling_plan: Instant, metadata_store_client: MetadataStoreClient, + metadata_writer: MetadataWriter, networking: Networking, } @@ -100,36 +84,20 @@ pub struct Scheduler { /// store and then driving the observed cluster state to the target state (represented by the /// scheduling plan). impl Scheduler { - pub async fn init( - configuration: &Configuration, - metadata_store_client: MetadataStoreClient, - networking: Networking, - ) -> Result { - let scheduling_plan = retry_on_network_error( - configuration.common.network_error_retry_policy.clone(), - || { - metadata_store_client - .get_or_insert(SCHEDULING_PLAN_KEY.clone(), SchedulingPlan::default) - }, - ) - .await?; - - Ok(Self { - scheduling_plan, - last_updated_scheduling_plan: Instant::now(), - metadata_store_client, + pub fn new(metadata_writer: MetadataWriter, networking: Networking) -> Self { + Self { + metadata_store_client: metadata_writer.metadata_store_client().clone(), + metadata_writer, networking, - }) + } } pub async fn on_observed_cluster_state( &mut self, observed_cluster_state: &ObservedClusterState, - replication_strategy: ReplicationStrategy, nodes_config: &NodesConfiguration, placement_hints: impl PartitionProcessorPlacementHints, ) -> Result<(), Error> { - // todo: Only update scheduling plan on observed cluster changes? let alive_workers = observed_cluster_state .alive_nodes .keys() @@ -137,13 +105,9 @@ impl Scheduler { .filter(|node_id| nodes_config.has_worker_role(node_id)) .collect(); - self.update_scheduling_plan( - &alive_workers, - replication_strategy, - nodes_config, - placement_hints, - ) - .await?; + self.update_scheduling_plan(&alive_workers, nodes_config, placement_hints) + .await?; + self.instruct_nodes(observed_cluster_state)?; Ok(()) @@ -153,303 +117,222 @@ impl Scheduler { // nothing to do since we don't make time based scheduling decisions yet } - pub async fn on_logs_update( - &mut self, - logs: &Logs, - partition_table: &PartitionTable, - ) -> Result<(), Error> { - let mut builder = self.scheduling_plan.clone().into_builder(); - - loop { - // add partitions to the scheduling plan for which we have provisioned the logs - for (log_id, _) in logs.iter() { - let partition_id = (*log_id).into(); - - // add the partition to the scheduling plan if we aren't already scheduling it - if !builder.contains_partition(&partition_id) { - // check whether the provisioned log is actually needed - if let Some(partition) = partition_table.get_partition(&partition_id) { - builder.insert_partition( - partition_id, - TargetPartitionState::new(partition.key_range.clone()), - ) - } - } - } - - if let Some(scheduling_plan) = builder.build_if_modified() { - let scheduling_plan = self.try_update_scheduling_plan(scheduling_plan).await?; - match scheduling_plan { - UpdateOutcome::Written(scheduling_plan) => { - self.assign_scheduling_plan(scheduling_plan); - break; - } - UpdateOutcome::NewerVersionFound(scheduling_plan) => { - self.assign_scheduling_plan(scheduling_plan); - builder = self.scheduling_plan.clone().into_builder(); - } - } - } else { - break; - } - } - - Ok(()) - } - async fn 
update_scheduling_plan( &mut self, alive_workers: &HashSet, - replication_strategy: ReplicationStrategy, nodes_config: &NodesConfiguration, placement_hints: impl PartitionProcessorPlacementHints, ) -> Result<(), Error> { - // todo temporary band-aid to ensure convergence of multiple schedulers. Remove once we - // accept equivalent configurations and remove persisting of the SchedulingPlan - if self.last_updated_scheduling_plan.elapsed() > Duration::from_secs(10) { - let new_scheduling_plan = self.fetch_scheduling_plan().await?; - - if new_scheduling_plan.version() > self.scheduling_plan.version() { - debug!( - "Found a newer scheduling plan in the metadata store. Updating to version {}.", - new_scheduling_plan.version() - ); - } - - self.assign_scheduling_plan(new_scheduling_plan); + let logs = Metadata::with_current(|m| m.logs_ref()); + let partition_table = Metadata::with_current(|m| m.partition_table_ref()); + + if logs.num_logs() != partition_table.num_partitions() as usize { + // either the partition table or the logs are not fully initialized + // hence there is nothing we can do atm. + // we need to wait until both partitions and logs are created + return Ok(()); } - let mut builder = self.scheduling_plan.clone().into_builder(); + let version = partition_table.version(); - self.ensure_replication( - &mut builder, - alive_workers, - replication_strategy, - nodes_config, - &placement_hints, - ); - self.ensure_leadership(&mut builder, placement_hints); + // todo(azmy): avoid cloning the partition table every time by keeping + // the latest built always available as a field + let mut builder = partition_table.clone().into_builder(); + let replication_strategy = builder.replication_strategy(); - if let Some(scheduling_plan) = builder.build_if_modified() { - let scheduling_plan = self - .try_update_scheduling_plan(scheduling_plan) - .await? - .into_inner(); + builder.for_each(|partition_id, target_state| { + self.ensure_replication( + partition_id, + target_state, + alive_workers, + replication_strategy, + nodes_config, + &placement_hints, + ); + + self.ensure_leadership(partition_id, target_state, &placement_hints); + }); + + if let Some(partition_table) = builder.build_if_modified() { + debug!("Updated partition table placement: {partition_table:?}"); + self.try_update_partition_table(version, partition_table) + .await?; - debug!("Updated scheduling plan: {scheduling_plan:?}"); - self.assign_scheduling_plan(scheduling_plan); + return Ok(()); } Ok(()) } - fn assign_scheduling_plan(&mut self, scheduling_plan: SchedulingPlan) { - self.scheduling_plan = scheduling_plan; - self.last_updated_scheduling_plan = Instant::now(); - } - - async fn try_update_scheduling_plan( + async fn try_update_partition_table( &self, - scheduling_plan: SchedulingPlan, - ) -> Result, Error> { + version: Version, + partition_table: PartitionTable, + ) -> Result<(), Error> { match self .metadata_store_client .put( - SCHEDULING_PLAN_KEY.clone(), - &scheduling_plan, - Precondition::MatchesVersion(self.scheduling_plan.version()), + PARTITION_TABLE_KEY.clone(), + &partition_table, + Precondition::MatchesVersion(version), ) .await { - Ok(_) => Ok(UpdateOutcome::Written(scheduling_plan)), - Err(err) => match err { - WriteError::FailedPrecondition(_) => { - // There was a concurrent modification of the scheduling plan. Fetch the latest version. 
- let scheduling_plan = self.fetch_scheduling_plan().await?; - Ok(UpdateOutcome::NewerVersionFound(scheduling_plan)) - } - err => Err(err.into()), - }, + Ok(_) => {} + Err(WriteError::FailedPrecondition(msg)) => { + debug!("Partition table update failed due to: {msg}"); + } + Err(err) => return Err(err.into()), } - } - async fn fetch_scheduling_plan(&self) -> Result { - self.metadata_store_client - .get(SCHEDULING_PLAN_KEY.clone()) - .await - .map(|scheduling_plan| scheduling_plan.expect("must be present")) + self.metadata_writer + .update(Arc::new(partition_table)) + .await?; + Ok(()) } - fn ensure_replication( + fn ensure_replication( &self, - scheduling_plan_builder: &mut SchedulingPlanBuilder, + partition_id: &PartitionId, + target_state: &mut TargetPartitionReplicationState, alive_workers: &HashSet, replication_strategy: ReplicationStrategy, nodes_config: &NodesConfiguration, - placement_hints: impl PartitionProcessorPlacementHints, + placement_hints: &H, ) { - let partition_ids: Vec<_> = scheduling_plan_builder.partition_ids().cloned().collect(); - let mut rng = rand::thread_rng(); + target_state + .node_set + .retain(|node_id| alive_workers.contains(node_id)); + + match replication_strategy { + ReplicationStrategy::OnAllNodes => { + // The extend will only add the new nodes that + // don't exist in the node set. + // the retain done above will make sure alive nodes in the set + // will keep there initial order. + target_state.node_set.extend(alive_workers.iter()); + } + ReplicationStrategy::Factor(replication_factor) => { + let replication_factor = + usize::try_from(replication_factor.get()).expect("u32 should fit into usize"); - for partition_id in &partition_ids { - scheduling_plan_builder.modify_partition(partition_id, |target_state| { - let mut modified = false; + if target_state.node_set.len() == replication_factor { + return; + } - match replication_strategy { - ReplicationStrategy::OnAllNodes => { - if target_state.node_set != *alive_workers { - target_state.node_set.clone_from(alive_workers); - modified = true; - } - } - ReplicationStrategy::Factor(replication_factor) => { - // only retain alive nodes => remove dead ones - target_state.node_set.retain(|node| { - let result = alive_workers.contains(node); - modified |= !result; - result + let preferred_worker_nodes = placement_hints + .preferred_nodes(partition_id) + .filter(|node_id| nodes_config.has_worker_role(node_id)); + let preferred_leader = + placement_hints + .preferred_leader(partition_id) + .and_then(|node_id| { + if alive_workers.contains(&node_id) { + Some(node_id) + } else { + None + } }); - let replication_factor = usize::try_from(replication_factor.get()) - .expect("u32 should fit into usize"); + // if we are under replicated and have other alive nodes available + if target_state.node_set.len() < replication_factor + && target_state.node_set.len() < alive_workers.len() + { + if let Some(preferred_leader) = preferred_leader { + target_state.node_set.insert(preferred_leader); + } - if target_state.node_set.len() == replication_factor { - return modified; - } + // todo: Implement cleverer strategies + // randomly choose from the preferred workers nodes first + let new_nodes = preferred_worker_nodes + .filter(|node_id| !target_state.node_set.contains(*node_id)) + .choose_multiple( + &mut rng, + replication_factor - target_state.node_set.len(), + ); - let preferred_worker_nodes = placement_hints - .preferred_nodes(partition_id) - .filter(|node_id| nodes_config.has_worker_role(node_id)); - let preferred_leader = 
placement_hints - .preferred_leader(partition_id) - .and_then(|node_id| { - if alive_workers.contains(&node_id) { - Some(node_id) - } else { - None - } - }); - - // if we are under replicated and have other alive nodes available - if target_state.node_set.len() < replication_factor - && target_state.node_set.len() < alive_workers.len() - { - if let Some(preferred_leader) = preferred_leader { - modified |= !target_state.node_set.contains(&preferred_leader); - target_state.node_set.insert(preferred_leader); - } + target_state.node_set.extend(new_nodes); - // todo: Implement cleverer strategies - // randomly choose from the preferred workers nodes first - let new_nodes = preferred_worker_nodes - .filter(|node_id| !target_state.node_set.contains(node_id)) - .choose_multiple( - &mut rng, - replication_factor - target_state.node_set.len(), - ); - - modified |= !new_nodes.is_empty(); - target_state.node_set.extend(new_nodes); - - if target_state.node_set.len() < replication_factor { - // randomly choose from the remaining worker nodes - let new_nodes = alive_workers - .iter() - .filter(|node| !target_state.node_set.contains(*node)) - .cloned() - .choose_multiple( - &mut rng, - replication_factor - target_state.node_set.len(), - ); - - modified |= !new_nodes.is_empty(); - target_state.node_set.extend(new_nodes); - } - } else if target_state.node_set.len() > replication_factor { - let preferred_worker_nodes: HashSet = - preferred_worker_nodes.cloned().collect(); - - // first remove the not preferred nodes - for node_id in target_state - .node_set - .iter() - .filter(|node_id| { - !preferred_worker_nodes.contains(node_id) - && Some(**node_id) != preferred_leader - }) - .cloned() - .choose_multiple( - &mut rng, - target_state.node_set.len() - replication_factor, - ) - { - target_state.node_set.remove(&node_id); - modified = true; - } + if target_state.node_set.len() < replication_factor { + // randomly choose from the remaining worker nodes + let new_nodes = alive_workers + .iter() + .filter(|node| !target_state.node_set.contains(*node)) + .cloned() + .choose_multiple( + &mut rng, + replication_factor - target_state.node_set.len(), + ); - if target_state.node_set.len() > replication_factor { - for node_id in target_state - .node_set - .iter() - .filter(|node_id| Some(**node_id) != preferred_leader) - .cloned() - .choose_multiple( - &mut rng, - target_state.node_set.len() - replication_factor, - ) - { - target_state.node_set.remove(&node_id); - modified = true; - } - } - } + target_state.node_set.extend(new_nodes); + } + } else if target_state.node_set.len() > replication_factor { + let preferred_worker_nodes: HashSet = + preferred_worker_nodes.cloned().collect(); + + // first remove the not preferred nodes + for node_id in target_state + .node_set + .iter() + .filter(|node_id| { + !preferred_worker_nodes.contains(node_id) + && Some(**node_id) != preferred_leader + }) + .cloned() + .choose_multiple(&mut rng, target_state.node_set.len() - replication_factor) + { + target_state.node_set.shift_remove(&node_id); } - } - // check if the leader is still part of the node set; if not, then clear leader field - if let Some(leader) = target_state.leader.as_ref() { - if !target_state.node_set.contains(leader) { - target_state.leader = None; - modified = true; + if target_state.node_set.len() > replication_factor { + for node_id in target_state + .node_set + .iter() + .filter(|node_id| Some(**node_id) != preferred_leader) + .cloned() + .choose_multiple( + &mut rng, + target_state.node_set.len() - 
replication_factor, + ) + { + target_state.node_set.shift_remove(&node_id); + } } } + } + } - modified - }) + // check if the leader is still part of the node set; if not, then clear leader field + if let Some(leader) = target_state.leader.as_ref() { + if !target_state.node_set.contains(leader) { + target_state.leader = None; + } } } - fn ensure_leadership( + fn ensure_leadership( &self, - scheduling_plan_builder: &mut SchedulingPlanBuilder, - placement_hints: impl PartitionProcessorPlacementHints, + partition_id: &PartitionId, + target_state: &mut TargetPartitionReplicationState, + placement_hints: &H, ) { - let partition_ids: Vec<_> = scheduling_plan_builder.partition_ids().cloned().collect(); - for partition_id in partition_ids { - scheduling_plan_builder.modify_partition(&partition_id, |target_state| { - let preferred_leader = placement_hints.preferred_leader(&partition_id); - if target_state.leader.is_none() { - target_state.leader = - self.select_leader_from(&target_state.node_set, preferred_leader); - // check whether we modified the leader - return target_state.leader.is_some(); - } else if preferred_leader.is_some_and(|preferred_leader| { - Some(preferred_leader) != target_state.leader - && target_state.node_set.contains(&preferred_leader) - }) { - target_state.leader = preferred_leader; - return true; - } - - false - }) + let preferred_leader = placement_hints.preferred_leader(partition_id); + + if target_state.leader.is_none() { + target_state.leader = self.select_leader_from(target_state, preferred_leader); + // check whether we modified the leader + } else if preferred_leader.is_some_and(|preferred_leader| { + Some(preferred_leader) != target_state.leader + && target_state.node_set.contains(&preferred_leader) + }) { + target_state.leader = preferred_leader; } } fn select_leader_from( &self, - leader_candidates: &HashSet, + leader_candidates: &TargetPartitionReplicationState, preferred_leader: Option, ) -> Option { // todo: Implement leader balancing between nodes @@ -462,14 +345,14 @@ impl Scheduler { } fn instruct_nodes(&self, observed_cluster_state: &ObservedClusterState) -> Result<(), Error> { - let mut partitions: BTreeSet<_> = self.scheduling_plan.partition_ids().cloned().collect(); - partitions.extend(observed_cluster_state.partitions.keys().cloned()); + let partition_table = Metadata::with_current(|m| m.partition_table_ref()); let mut commands = BTreeMap::default(); - for partition_id in &partitions { + for (partition_id, partition) in partition_table.partitions() { self.generate_instructions_for_partition( partition_id, + partition, observed_cluster_state, &mut commands, ); @@ -509,10 +392,10 @@ impl Scheduler { fn generate_instructions_for_partition( &self, partition_id: &PartitionId, + partition: &Partition, observed_cluster_state: &ObservedClusterState, commands: &mut BTreeMap>, ) { - let target_state = self.scheduling_plan.get(partition_id); // todo: Avoid cloning of node_set if this becomes measurable let mut observed_state = observed_cluster_state .partitions @@ -520,17 +403,18 @@ impl Scheduler { .map(|state| state.partition_processors.clone()) .unwrap_or_default(); - if let Some(target_state) = target_state { - for (node_id, run_mode) in target_state.iter() { - if !observed_state - .remove(&node_id) - .is_some_and(|observed_run_mode| observed_run_mode == run_mode) - { - commands.entry(node_id).or_default().push(ControlProcessor { + for (node_id, run_mode) in partition.replica_group.iter() { + if !observed_state + .remove(node_id) + 
.is_some_and(|observed_run_mode| observed_run_mode == run_mode) + { + commands + .entry(*node_id) + .or_default() + .push(ControlProcessor { partition_id: *partition_id, command: ProcessorCommand::from(run_mode), }); - } } } @@ -549,31 +433,25 @@ impl Scheduler { /// Placement hints for the [`logs_controller::LogsController`] based on the current /// [`SchedulingPlan`]. -pub struct SchedulingPlanNodeSetSelectorHints<'a> { - scheduling_plan: Option<&'a SchedulingPlan>, +pub struct PartitionTableNodeSetSelectorHints { + partition_table: Pinned, } -impl<'a, T> From<&'a Scheduler> for SchedulingPlanNodeSetSelectorHints<'a> { - fn from(value: &'a Scheduler) -> Self { +impl From> for PartitionTableNodeSetSelectorHints { + fn from(value: Pinned) -> Self { Self { - scheduling_plan: Some(&value.scheduling_plan), + partition_table: value, } } } -impl<'a> From> for SchedulingPlanNodeSetSelectorHints<'a> { - fn from(scheduling_plan: Option<&'a SchedulingPlan>) -> Self { - Self { scheduling_plan } - } -} - -impl<'a> logs_controller::NodeSetSelectorHints for SchedulingPlanNodeSetSelectorHints<'a> { +impl logs_controller::NodeSetSelectorHints for PartitionTableNodeSetSelectorHints { fn preferred_sequencer(&self, log_id: &LogId) -> Option { let partition_id = PartitionId::from(*log_id); - self.scheduling_plan - .and_then(|p| p.get(&partition_id)) - .and_then(|target_state| target_state.leader.map(Into::into)) + self.partition_table + .get_partition(&partition_id) + .and_then(|partition| partition.replica_group.first().cloned().map(NodeId::from)) } } @@ -585,6 +463,7 @@ mod tests { use http::Uri; use rand::prelude::ThreadRng; use rand::Rng; + use restate_types::metadata_store::keys::PARTITION_TABLE_KEY; use std::collections::BTreeMap; use std::iter; use std::num::NonZero; @@ -603,19 +482,16 @@ mod tests { use restate_types::cluster::cluster_state::{ AliveNode, ClusterState, DeadNode, NodeState, PartitionProcessorStatus, RunMode, }; - use restate_types::cluster_controller::{ - SchedulingPlan, SchedulingPlanBuilder, TargetPartitionState, - }; - use restate_types::config::Configuration; - use restate_types::identifiers::{PartitionId, PartitionKey}; - use restate_types::metadata_store::keys::SCHEDULING_PLAN_KEY; + use restate_types::identifiers::PartitionId; use restate_types::net::codec::WireDecode; use restate_types::net::partition_processor_manager::{ControlProcessors, ProcessorCommand}; use restate_types::net::{AdvertisedAddress, TargetName}; use restate_types::nodes_config::{ LogServerConfig, NodeConfig, NodesConfiguration, Role, StorageState, }; - use restate_types::partition_table::{PartitionTable, ReplicationStrategy}; + use restate_types::partition_table::{ + PartitionTable, PartitionTableBuilder, ReplicationStrategy, + }; use restate_types::time::MillisSinceEpoch; use restate_types::{GenerationalNodeId, PlainNodeId, Version}; @@ -635,39 +511,32 @@ mod tests { } #[test(restate_core::test)] - async fn empty_leadership_changes_dont_modify_plan() -> googletest::Result<()> { + async fn empty_leadership_changes_donot_modify_plan() -> googletest::Result<()> { let test_env = TestCoreEnv::create_with_single_node(0, 0).await; let metadata_store_client = test_env.metadata_store_client.clone(); + let metadata_writer = test_env.metadata_writer.clone(); let networking = test_env.networking.clone(); - let initial_scheduling_plan = metadata_store_client - .get::(SCHEDULING_PLAN_KEY.clone()) - .await - .expect("scheduling plan"); - let mut scheduler = Scheduler::init( - Configuration::pinned().as_ref(), - 
metadata_store_client.clone(), - networking, - ) - .await?; + let initial_partition_table = test_env.metadata.partition_table_ref(); + + let mut scheduler = Scheduler::new(metadata_writer, networking); let observed_cluster_state = ObservedClusterState::default(); - let replication_strategy = ReplicationStrategy::OnAllNodes; scheduler .on_observed_cluster_state( &observed_cluster_state, - replication_strategy, &Metadata::with_current(|m| m.nodes_config_ref()), NoPlacementHints, ) .await?; - let scheduling_plan = metadata_store_client - .get::(SCHEDULING_PLAN_KEY.clone()) + let partition_table = metadata_store_client + .get::(PARTITION_TABLE_KEY.clone()) .await - .expect("scheduling plan"); + .expect("partition table") + .unwrap(); - assert_eq!(initial_scheduling_plan, scheduling_plan); + assert_eq!(*initial_partition_table, partition_table); Ok(()) } @@ -739,25 +608,23 @@ mod tests { }) .boxed(); - let partition_table = - PartitionTable::with_equally_sized_partitions(Version::MIN, num_partitions); - let initial_scheduling_plan = SchedulingPlan::from(&partition_table); + let mut partition_table_builder = + PartitionTable::with_equally_sized_partitions(Version::MIN, num_partitions) + .into_builder(); + partition_table_builder.set_replication_strategy(replication_strategy); + let partition_table = partition_table_builder.build(); + let metadata_store_client = builder.metadata_store_client.clone(); + let metadata_writer = builder.metadata_writer.clone(); let networking = builder.networking.clone(); let _env = builder .set_nodes_config(nodes_config.clone()) .set_partition_table(partition_table.clone()) - .set_scheduling_plan(initial_scheduling_plan) .build() .await; - let mut scheduler = Scheduler::init( - Configuration::pinned().as_ref(), - metadata_store_client.clone(), - networking, - ) - .await?; + let mut scheduler = Scheduler::new(metadata_writer, networking); let mut observed_cluster_state = ObservedClusterState::default(); for _ in 0..num_scheduling_rounds { @@ -767,7 +634,6 @@ mod tests { scheduler .on_observed_cluster_state( &observed_cluster_state, - replication_strategy, &Metadata::with_current(|m| m.nodes_config_ref()), NoPlacementHints, ) @@ -781,15 +647,15 @@ mod tests { let observed_cluster_state = derive_observed_cluster_state(&cluster_state, control_messages); - let target_scheduling_plan = metadata_store_client - .get::(SCHEDULING_PLAN_KEY.clone()) + let target_partition_table = metadata_store_client + .get::(PARTITION_TABLE_KEY.clone()) .await? 
.expect("the scheduler should have created a scheduling plan"); // assert that the effective scheduling plan aligns with the target scheduling plan assert_that!( observed_cluster_state, - matches_scheduling_plan(&target_scheduling_plan) + matches_scheduling_plan(&target_partition_table) ); let alive_nodes: HashSet<_> = cluster_state @@ -797,16 +663,17 @@ mod tests { .map(|node| node.generational_node_id.as_plain()) .collect(); - for (_, target_state) in target_scheduling_plan.iter() { + for (_, partition) in target_partition_table.partitions() { + let target_state = partition.replica_group.clone().into_target_state(); // assert that the replication strategy was respected match replication_strategy { ReplicationStrategy::OnAllNodes => { // assert that every partition has a leader which is part of the alive nodes set + assert!(target_state.contains_all(&alive_nodes)); + assert!(target_state .leader .is_some_and(|leader| alive_nodes.contains(&leader))); - - assert_eq!(target_state.node_set, alive_nodes); } ReplicationStrategy::Factor(replication_factor) => { // assert that every partition has a leader which is part of the alive nodes set @@ -831,53 +698,58 @@ mod tests { #[test(restate_core::test)] async fn handle_too_few_placed_partition_processors() -> googletest::Result<()> { - let mut scheduling_plan_builder = SchedulingPlanBuilder::default(); let num_partition_processors = NonZero::new(2).expect("non-zero"); - let mut target_partition_state = TargetPartitionState::new(0..=PartitionKey::MAX); - // add one too few nodes to the target state - target_partition_state.add_node(PlainNodeId::from(0), true); + let mut partition_table_builder = + PartitionTable::with_equally_sized_partitions(Version::MIN, 2).into_builder(); - let partition_id = PartitionId::from(0); - scheduling_plan_builder.insert_partition(partition_id, target_partition_state); + partition_table_builder.for_each(|_, target_state| { + target_state.node_set.extend([PlainNodeId::from(0)]); + }); - let scheduling_plan = run_ensure_replication_test( - scheduling_plan_builder, + let partition_table = run_ensure_replication_test( + partition_table_builder, ReplicationStrategy::Factor(num_partition_processors), ) .await?; - let partition = scheduling_plan.get(&partition_id).expect("must be present"); + let partition = partition_table + .get_partition(&PartitionId::from(0)) + .expect("must be present"); assert_eq!( - partition.node_set.len(), + partition.replica_group.len(), num_partition_processors.get() as usize ); + Ok(()) } #[test(restate_core::test)] async fn handle_too_many_placed_partition_processors() -> googletest::Result<()> { - let mut scheduling_plan_builder = SchedulingPlanBuilder::default(); let num_partition_processors = NonZero::new(2).expect("non-zero"); - let mut target_partition_state = TargetPartitionState::new(0..=PartitionKey::MAX); - // add one too many nodes to the target state - target_partition_state.add_node(PlainNodeId::from(0), true); - target_partition_state.add_node(PlainNodeId::from(1), false); - target_partition_state.add_node(PlainNodeId::from(2), false); + let mut partition_table_builder = + PartitionTable::with_equally_sized_partitions(Version::MIN, 2).into_builder(); - let partition_id = PartitionId::from(0); - scheduling_plan_builder.insert_partition(partition_id, target_partition_state); + partition_table_builder.for_each(|_, target_state| { + target_state.node_set.extend([ + PlainNodeId::from(0), + PlainNodeId::from(1), + PlainNodeId::from(2), + ]); + }); - let scheduling_plan = 
run_ensure_replication_test( - scheduling_plan_builder, + let partition_table = run_ensure_replication_test( + partition_table_builder, ReplicationStrategy::Factor(num_partition_processors), ) .await?; - let partition = scheduling_plan.get(&partition_id).expect("must be present"); + let partition = partition_table + .get_partition(&PartitionId::from(0)) + .expect("must be present"); assert_eq!( - partition.node_set.len(), + partition.replica_group.len(), num_partition_processors.get() as usize ); @@ -885,17 +757,12 @@ mod tests { } async fn run_ensure_replication_test( - mut scheduling_plan_builder: SchedulingPlanBuilder, + mut partition_table_builder: PartitionTableBuilder, replication_strategy: ReplicationStrategy, - ) -> googletest::Result { + ) -> googletest::Result { let env = TestCoreEnv::create_with_single_node(0, 0).await; - let scheduler = Scheduler::init( - &Configuration::pinned(), - env.metadata_store_client.clone(), - env.networking.clone(), - ) - .await?; + let scheduler = Scheduler::new(env.metadata_writer.clone(), env.networking.clone()); let alive_workers = vec![ PlainNodeId::from(0), PlainNodeId::from(1), @@ -908,41 +775,46 @@ mod tests { .build() .nodes_config; - scheduler.ensure_replication( - &mut scheduling_plan_builder, - &alive_workers, - replication_strategy, - &nodes_config, - NoPlacementHints, - ); + partition_table_builder.for_each(|partition_id, target_state| { + scheduler.ensure_replication( + partition_id, + target_state, + &alive_workers, + replication_strategy, + &nodes_config, + &NoPlacementHints, + ); + }); - Ok(scheduling_plan_builder.build()) + Ok(partition_table_builder.build()) } - fn matches_scheduling_plan(scheduling_plan: &SchedulingPlan) -> SchedulingPlanMatcher<'_> { - SchedulingPlanMatcher { scheduling_plan } + fn matches_scheduling_plan(scheduling_plan: &PartitionTable) -> SchedulingPlanMatcher<'_> { + SchedulingPlanMatcher { + partition_table: scheduling_plan, + } } struct SchedulingPlanMatcher<'a> { - scheduling_plan: &'a SchedulingPlan, + partition_table: &'a PartitionTable, } impl<'a> Matcher for SchedulingPlanMatcher<'a> { type ActualT = ObservedClusterState; fn matches(&self, actual: &Self::ActualT) -> MatcherResult { - if actual.partitions.len() != self.scheduling_plan.partitions().len() { + if actual.partitions.len() != self.partition_table.num_partitions() as usize { return MatcherResult::NoMatch; } - for (partition_id, target_state) in self.scheduling_plan.iter() { + for (partition_id, partition) in self.partition_table.partitions() { if let Some(observed_state) = actual.partitions.get(partition_id) { - if observed_state.partition_processors.len() != target_state.node_set.len() { + if observed_state.partition_processors.len() != partition.replica_group.len() { return MatcherResult::NoMatch; } - for (node_id, run_mode) in target_state.iter() { - if observed_state.partition_processors.get(&node_id) != Some(&run_mode) { + for (node_id, run_mode) in partition.replica_group.iter() { + if observed_state.partition_processors.get(node_id) != Some(&run_mode) { return MatcherResult::NoMatch; } } @@ -959,13 +831,13 @@ mod tests { MatcherResult::Match => { format!( "should reflect the scheduling plan {:?}", - self.scheduling_plan + self.partition_table ) } MatcherResult::NoMatch => { format!( "does not reflect the scheduling plan {:?}", - self.scheduling_plan + self.partition_table ) } } diff --git a/crates/admin/src/cluster_controller/service.rs b/crates/admin/src/cluster_controller/service.rs index d13ce53a6..e9d122b76 100644 --- 
a/crates/admin/src/cluster_controller/service.rs +++ b/crates/admin/src/cluster_controller/service.rs @@ -23,13 +23,10 @@ use tonic::codec::CompressionEncoding; use tracing::{debug, info}; use restate_metadata_store::ReadModifyWriteError; -use restate_types::cluster_controller::SchedulingPlan; use restate_types::logs::metadata::{ LogletParams, Logs, LogsConfiguration, ProviderConfiguration, ProviderKind, SegmentIndex, }; -use restate_types::metadata_store::keys::{ - BIFROST_CONFIG_KEY, PARTITION_TABLE_KEY, SCHEDULING_PLAN_KEY, -}; +use restate_types::metadata_store::keys::{BIFROST_CONFIG_KEY, PARTITION_TABLE_KEY}; use restate_types::partition_table::{ self, PartitionTable, PartitionTableBuilder, ReplicationStrategy, }; @@ -62,7 +59,7 @@ use super::grpc_svc_handler::ClusterCtrlSvcHandler; use super::protobuf::cluster_ctrl_svc_server::ClusterCtrlSvcServer; use crate::cluster_controller::logs_controller::{self, NodeSetSelectorHints}; use crate::cluster_controller::observed_cluster_state::ObservedClusterState; -use crate::cluster_controller::scheduler::SchedulingPlanNodeSetSelectorHints; +use crate::cluster_controller::scheduler::PartitionTableNodeSetSelectorHints; #[derive(Debug, thiserror::Error, CodedError)] pub enum Error { @@ -326,7 +323,7 @@ impl Service { } result = state.run() => { let leader_event = result?; - state.on_leader_event(leader_event).await?; + state.on_leader_event(&self.observed_cluster_state, leader_event).await?; } _ = &mut shutdown => { self.health_status.update(AdminStatus::Unknown); @@ -489,7 +486,6 @@ impl Service { extension, min_version, bifrost: self.bifrost.clone(), - metadata_writer: self.metadata_writer.clone(), observed_cluster_state: self.observed_cluster_state.clone(), }; @@ -636,7 +632,6 @@ struct SealAndExtendTask { min_version: Version, extension: Option, bifrost: Bifrost, - metadata_writer: MetadataWriter, observed_cluster_state: ObservedClusterState, } @@ -712,20 +707,16 @@ impl SealAndExtendTask { ), #[cfg(feature = "replicated-loglet")] ProviderConfiguration::Replicated(config) => { - let schedule_plan = self - .metadata_writer - .metadata_store_client() - .get::(SCHEDULING_PLAN_KEY.clone()) - .await?; - let loglet_params = logs_controller::build_new_replicated_loglet_configuration( config, loglet_id.next(), &Metadata::with_current(|m| m.nodes_config_ref()), &self.observed_cluster_state, previous_params.as_ref(), - SchedulingPlanNodeSetSelectorHints::from(schedule_plan.as_ref()) - .preferred_sequencer(&self.log_id), + Metadata::with_current(|m| { + PartitionTableNodeSetSelectorHints::from(m.partition_table_ref()) + }) + .preferred_sequencer(&self.log_id), ) .ok_or_else(|| anyhow::anyhow!("Insufficient writeable nodes in the nodeset"))?; diff --git a/crates/admin/src/cluster_controller/service/state.rs b/crates/admin/src/cluster_controller/service/state.rs index 09ce88381..762391091 100644 --- a/crates/admin/src/cluster_controller/service/state.rs +++ b/crates/admin/src/cluster_controller/service/state.rs @@ -32,7 +32,7 @@ use crate::cluster_controller::logs_controller::{ LogsBasedPartitionProcessorPlacementHints, LogsController, }; use crate::cluster_controller::observed_cluster_state::ObservedClusterState; -use crate::cluster_controller::scheduler::{Scheduler, SchedulingPlanNodeSetSelectorHints}; +use crate::cluster_controller::scheduler::{PartitionTableNodeSetSelectorHints, Scheduler}; use crate::cluster_controller::service::Service; pub enum ClusterControllerState { @@ -85,10 +85,18 @@ where Ok(()) } - pub async fn on_leader_event(&mut self, 
leader_event: LeaderEvent) -> anyhow::Result<()> { + pub async fn on_leader_event( + &mut self, + observed_cluster_state: &ObservedClusterState, + leader_event: LeaderEvent, + ) -> anyhow::Result<()> { match self { ClusterControllerState::Follower => Ok(()), - ClusterControllerState::Leader(leader) => leader.on_leader_event(leader_event).await, + ClusterControllerState::Leader(leader) => { + leader + .on_leader_event(observed_cluster_state, leader_event) + .await + } } } @@ -151,12 +159,7 @@ where async fn from_service(service: &Service) -> anyhow::Result> { let configuration = service.configuration.pinned(); - let scheduler = Scheduler::init( - &configuration, - service.metadata_writer.metadata_store_client().clone(), - service.networking.clone(), - ) - .await?; + let scheduler = Scheduler::new(service.metadata_writer.clone(), service.networking.clone()); let logs_controller = LogsController::new(service.bifrost.clone(), service.metadata_writer.clone())?; @@ -195,13 +198,14 @@ where self.logs_controller.on_observed_cluster_state_update( &nodes_config, observed_cluster_state, - SchedulingPlanNodeSetSelectorHints::from(&self.scheduler), + Metadata::with_current(|m| { + PartitionTableNodeSetSelectorHints::from(m.partition_table_ref()) + }), )?; self.scheduler .on_observed_cluster_state( observed_cluster_state, - Metadata::with_current(|m| m.partition_table_ref()).replication_strategy(), &nodes_config, LogsBasedPartitionProcessorPlacementHints::from(&self.logs_controller), ) @@ -229,7 +233,6 @@ where } Ok(_) = self.logs_watcher.changed() => { return Ok(LeaderEvent::LogsUpdate); - } Ok(_) = self.partition_table_watcher.changed() => { return Ok(LeaderEvent::PartitionTableUpdate); @@ -238,42 +241,61 @@ where } } - pub async fn on_leader_event(&mut self, leader_event: LeaderEvent) -> anyhow::Result<()> { + pub async fn on_leader_event( + &mut self, + observed_cluster_state: &ObservedClusterState, + leader_event: LeaderEvent, + ) -> anyhow::Result<()> { match leader_event { LeaderEvent::TrimLogs => { self.trim_logs().await; } LeaderEvent::LogsUpdate => { - self.on_logs_update().await?; + self.on_logs_update(observed_cluster_state).await?; } LeaderEvent::PartitionTableUpdate => { - self.on_partition_table_update().await?; + self.on_partition_table_update(observed_cluster_state) + .await?; } } Ok(()) } - async fn on_logs_update(&mut self) -> anyhow::Result<()> { + async fn on_logs_update( + &mut self, + observed_cluster_state: &ObservedClusterState, + ) -> anyhow::Result<()> { self.logs_controller .on_logs_update(Metadata::with_current(|m| m.logs_ref()))?; self.scheduler - .on_logs_update( - &Metadata::with_current(|m| m.logs_ref()), - &Metadata::with_current(|m| m.partition_table_ref()), + .on_observed_cluster_state( + observed_cluster_state, + &Metadata::with_current(|m| m.nodes_config_ref()), + LogsBasedPartitionProcessorPlacementHints::from(&self.logs_controller), ) .await?; - Ok(()) } - async fn on_partition_table_update(&mut self) -> anyhow::Result<()> { + async fn on_partition_table_update( + &mut self, + observed_cluster_state: &ObservedClusterState, + ) -> anyhow::Result<()> { let partition_table = Metadata::with_current(|m| m.partition_table_ref()); self.logs_controller .on_partition_table_update(&partition_table); + self.scheduler + .on_observed_cluster_state( + observed_cluster_state, + &Metadata::with_current(|m| m.nodes_config_ref()), + LogsBasedPartitionProcessorPlacementHints::from(&self.logs_controller), + ) + .await?; + Ok(()) } diff --git a/crates/core/src/partitions.rs 
b/crates/core/src/partitions.rs index bd0885c57..67ed9db80 100644 --- a/crates/core/src/partitions.rs +++ b/crates/core/src/partitions.rs @@ -14,19 +14,17 @@ use std::pin::pin; use std::sync::Arc; use arc_swap::ArcSwap; +use restate_types::net::metadata::MetadataKind; use tokio::sync::mpsc; -use tokio::time::MissedTickBehavior; use tracing::{debug, trace}; use xxhash_rust::xxh3::Xxh3Builder; -use restate_types::cluster_controller::SchedulingPlan; -use restate_types::config::Configuration; use restate_types::identifiers::PartitionId; -use restate_types::metadata_store::keys::SCHEDULING_PLAN_KEY; -use restate_types::{NodeId, Version, Versioned}; +use restate_types::{NodeId, Version}; -use crate::metadata_store::MetadataStoreClient; -use crate::{cancellation_watcher, ShutdownError, TaskCenter, TaskHandle, TaskId, TaskKind}; +use crate::{ + cancellation_watcher, Metadata, ShutdownError, TaskCenter, TaskHandle, TaskId, TaskKind, +}; pub type CommandSender = mpsc::Sender; pub type CommandReceiver = mpsc::Receiver; @@ -118,16 +116,20 @@ struct PartitionToNodesRoutingTable { pub struct PartitionRoutingRefresher { sender: CommandSender, receiver: CommandReceiver, - metadata_store_client: MetadataStoreClient, inflight_refresh_task: Option>, inner: Arc>, } +impl Default for PartitionRoutingRefresher { + fn default() -> Self { + Self::new() + } +} + impl PartitionRoutingRefresher { - pub fn new(metadata_store_client: MetadataStoreClient) -> Self { + fn new() -> Self { let (sender, receiver) = mpsc::channel(1); Self { - metadata_store_client, receiver, sender, inflight_refresh_task: None, @@ -149,13 +151,9 @@ impl PartitionRoutingRefresher { async fn run(mut self) -> anyhow::Result<()> { debug!("Routing information refresher started"); - let update_interval = Configuration::pinned() - .common - .metadata_update_interval - .into(); - let mut update_interval = tokio::time::interval(update_interval); - update_interval.set_missed_tick_behavior(MissedTickBehavior::Delay); let mut cancel = pin!(cancellation_watcher()); + let mut partition_table_watch = + Metadata::with_current(|m| m.watch(MetadataKind::PartitionTable)); loop { tokio::select! 
{ @@ -166,32 +164,31 @@ impl PartitionRoutingRefresher { Some(cmd) = self.receiver.recv() => { match cmd { Command::SyncRoutingInformation => { - self.spawn_sync_routing_information_task(); + self.spawn_sync_routing_information_task(*partition_table_watch.borrow()); } } } - _ = update_interval.tick() => { + _ = partition_table_watch.changed() => { trace!("Refreshing routing information..."); - self.spawn_sync_routing_information_task(); + self.spawn_sync_routing_information_task(*partition_table_watch.borrow()); } } } Ok(()) } - fn spawn_sync_routing_information_task(&mut self) { + fn spawn_sync_routing_information_task(&mut self, version: Version) { if !self .inflight_refresh_task .as_ref() .is_some_and(|t| !t.is_finished()) { let partition_to_node_mappings = self.inner.clone(); - let metadata_store_client = self.metadata_store_client.clone(); let task = TaskCenter::spawn_unmanaged( TaskKind::Disposable, "refresh-routing-information", - sync_routing_information(partition_to_node_mappings, metadata_store_client), + sync_routing_information(partition_to_node_mappings, version), ); self.inflight_refresh_task = task.ok(); } else { @@ -212,43 +209,29 @@ pub fn spawn_partition_routing_refresher( async fn sync_routing_information( partition_to_node_mappings: Arc>, - metadata_store_client: MetadataStoreClient, + version: Version, ) { - let result: Result, _> = - metadata_store_client.get(SCHEDULING_PLAN_KEY.clone()).await; - - let Ok(scheduling_plan) = result else { - debug!( - "Failed to fetch scheduling plan from metadata store: {:?}", - result - ); + let Ok(partition_table) = Metadata::current().wait_for_partition_table(version).await else { + // just return on shutdown error return; }; - let scheduling_plan = match scheduling_plan { - Some(plan) => plan, - None => { - debug!("No scheduling plan found in metadata store, unable to refresh partition routing information"); - return; - } - }; - let current_mappings = partition_to_node_mappings.load(); - if scheduling_plan.version() <= current_mappings.version { + if partition_table.version() <= current_mappings.version { return; // No need for update } let mut partition_nodes = HashMap::::default(); - for (partition_id, target_state) in scheduling_plan.iter() { - if let Some(leader) = target_state.leader { - partition_nodes.insert(*partition_id, leader.into()); + for (partition_id, partition) in partition_table.partitions() { + if let Some(leader) = partition.replica_group.first() { + partition_nodes.insert(*partition_id, (*leader).into()); } } let _ = partition_to_node_mappings.compare_and_swap( current_mappings, Arc::new(PartitionToNodesRoutingTable { - version: scheduling_plan.version(), + version: partition_table.version(), inner: partition_nodes, }), ); diff --git a/crates/core/src/test_env.rs b/crates/core/src/test_env.rs index 568fa3717..bc6dee194 100644 --- a/crates/core/src/test_env.rs +++ b/crates/core/src/test_env.rs @@ -14,11 +14,10 @@ use std::sync::Arc; use futures::Stream; -use restate_types::cluster_controller::SchedulingPlan; use restate_types::config::NetworkingOptions; use restate_types::logs::metadata::{bootstrap_logs_metadata, ProviderKind}; use restate_types::metadata_store::keys::{ - BIFROST_CONFIG_KEY, NODES_CONFIG_KEY, PARTITION_TABLE_KEY, SCHEDULING_PLAN_KEY, + BIFROST_CONFIG_KEY, NODES_CONFIG_KEY, PARTITION_TABLE_KEY, }; use restate_types::net::codec::{Targeted, WireDecode}; use restate_types::net::metadata::MetadataKind; @@ -47,7 +46,6 @@ pub struct TestCoreEnvBuilder { pub provider_kind: ProviderKind, pub 
router_builder: MessageRouterBuilder, pub partition_table: PartitionTable, - pub scheduling_plan: SchedulingPlan, pub metadata_store_client: MetadataStoreClient, } @@ -91,7 +89,6 @@ impl TestCoreEnvBuilder { let router_builder = MessageRouterBuilder::default(); let nodes_config = NodesConfiguration::new(Version::MIN, "test-cluster".to_owned()); let partition_table = PartitionTable::with_equally_sized_partitions(Version::MIN, 10); - let scheduling_plan = SchedulingPlan::from(&partition_table); TaskCenter::try_set_global_metadata(metadata.clone()); // Use memory-loglet as a default if in test-mode @@ -109,7 +106,6 @@ impl TestCoreEnvBuilder { nodes_config, router_builder, partition_table, - scheduling_plan, metadata_store_client, provider_kind, } @@ -125,11 +121,6 @@ impl TestCoreEnvBuilder { self } - pub fn set_scheduling_plan(mut self, scheduling_plan: SchedulingPlan) -> Self { - self.scheduling_plan = scheduling_plan; - self - } - pub fn set_my_node_id(mut self, my_node_id: GenerationalNodeId) -> Self { self.my_node_id = my_node_id; self @@ -196,15 +187,6 @@ impl TestCoreEnvBuilder { .expect("to store partition table in metadata store"); self.metadata_writer.submit(Arc::new(self.partition_table)); - self.metadata_store_client - .put( - SCHEDULING_PLAN_KEY.clone(), - &self.scheduling_plan, - Precondition::None, - ) - .await - .expect("to store scheduling plan in metadata store"); - let _ = self .metadata .wait_for_version( diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index 0b17be9bf..a2a0a285e 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -169,8 +169,7 @@ impl Node { let metadata_manager = MetadataManager::new(metadata_builder, metadata_store_client.clone()); metadata_manager.register_in_message_router(&mut router_builder); - let partition_routing_refresher = - PartitionRoutingRefresher::new(metadata_store_client.clone()); + let partition_routing_refresher = PartitionRoutingRefresher::default(); #[cfg(feature = "replicated-loglet")] let record_cache = RecordCache::new( diff --git a/crates/types/src/cluster_controller.rs b/crates/types/src/cluster_controller.rs deleted file mode 100644 index ad213251a..000000000 --- a/crates/types/src/cluster_controller.rs +++ /dev/null @@ -1,189 +0,0 @@ -// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH. -// All rights reserved. -// -// Use of this software is governed by the Business Source License -// included in the LICENSE file. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0. - -use std::collections::{BTreeMap, HashSet}; -use std::ops::RangeInclusive; - -use serde_with::serde_as; -use xxhash_rust::xxh3::Xxh3Builder; - -use crate::cluster::cluster_state::RunMode; -use crate::identifiers::{PartitionId, PartitionKey}; -use crate::partition_table::PartitionTable; -use crate::{flexbuffers_storage_encode_decode, PlainNodeId, Version, Versioned}; - -/// The scheduling plan represents the target state of the cluster. The cluster controller will -/// try to drive the observed cluster state to match the target state. 
-#[serde_as] -#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] -pub struct SchedulingPlan { - version: Version, - // flexbuffers only supports string-keyed maps :-( --> so we store it as vector of kv pairs - #[serde_as(as = "serde_with::Seq<(_, _)>")] - partitions: BTreeMap, -} - -flexbuffers_storage_encode_decode!(SchedulingPlan); - -impl SchedulingPlan { - pub fn from(partition_table: &PartitionTable) -> Self { - let mut scheduling_plan_builder = SchedulingPlanBuilder::default(); - - for (partition_id, partition) in partition_table.partitions() { - scheduling_plan_builder.insert_partition( - *partition_id, - TargetPartitionState::new(partition.key_range.clone()), - ); - } - - scheduling_plan_builder.build() - } - - pub fn into_builder(self) -> SchedulingPlanBuilder { - SchedulingPlanBuilder::from(self) - } - - pub fn partition_ids(&self) -> impl Iterator { - self.partitions.keys() - } - - pub fn get(&self, partition_id: &PartitionId) -> Option<&TargetPartitionState> { - self.partitions.get(partition_id) - } - - pub fn get_mut(&mut self, partition_id: &PartitionId) -> Option<&mut TargetPartitionState> { - self.partitions.get_mut(partition_id) - } - - pub fn iter(&self) -> impl Iterator { - self.partitions.iter() - } - - #[cfg(feature = "test-util")] - pub fn partitions(&self) -> &BTreeMap { - &self.partitions - } -} - -impl Versioned for SchedulingPlan { - fn version(&self) -> Version { - self.version - } -} - -impl Default for SchedulingPlan { - fn default() -> Self { - Self { - version: Version::INVALID, - partitions: BTreeMap::default(), - } - } -} - -/// The target state of a partition. -#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] -pub struct TargetPartitionState { - pub partition_key_range: RangeInclusive, - /// Node which is the designated leader - pub leader: Option, - /// Set of nodes that should run a partition processor for this partition - pub node_set: HashSet, -} - -impl TargetPartitionState { - pub fn new(partition_key_range: RangeInclusive) -> Self { - Self { - partition_key_range, - leader: None, - node_set: HashSet::default(), - } - } - - pub fn add_node(&mut self, node_id: PlainNodeId, is_leader: bool) { - self.node_set.insert(node_id); - - if is_leader { - self.leader = Some(node_id); - } - } - - pub fn remove_node(&mut self, node_id: PlainNodeId) { - self.node_set.remove(&node_id); - - if self.leader == Some(node_id) { - self.leader = None; - } - } - - pub fn iter(&self) -> impl Iterator + '_ { - self.node_set.iter().map(|node_id| { - if self.leader.as_ref() == Some(node_id) { - (*node_id, RunMode::Leader) - } else { - (*node_id, RunMode::Follower) - } - }) - } -} - -#[derive(Default)] -pub struct SchedulingPlanBuilder { - modified: bool, - inner: SchedulingPlan, -} - -impl SchedulingPlanBuilder { - pub fn modify_partition(&mut self, partition_id: &PartitionId, mut modify: F) - where - F: FnMut(&mut TargetPartitionState) -> bool, - { - if let Some(partition) = self.inner.partitions.get_mut(partition_id) { - if modify(partition) { - self.modified = true; - } - } - } - - pub fn insert_partition(&mut self, partition_id: PartitionId, partition: TargetPartitionState) { - self.inner.partitions.insert(partition_id, partition); - self.modified = true; - } - - pub fn build_if_modified(mut self) -> Option { - if self.modified { - self.inner.version = self.inner.version.next(); - Some(self.inner) - } else { - None - } - } - - pub fn build(mut self) -> SchedulingPlan { - self.inner.version = self.inner.version.next(); - 
self.inner - } - - pub fn partition_ids(&self) -> impl Iterator { - self.inner.partition_ids() - } - - pub fn contains_partition(&self, partition_id: &PartitionId) -> bool { - self.inner.partitions.contains_key(partition_id) - } -} - -impl From for SchedulingPlanBuilder { - fn from(value: SchedulingPlan) -> Self { - Self { - inner: value, - modified: false, - } - } -} diff --git a/crates/types/src/lib.rs b/crates/types/src/lib.rs index ce3bfb1e4..b95c03194 100644 --- a/crates/types/src/lib.rs +++ b/crates/types/src/lib.rs @@ -20,7 +20,6 @@ pub mod art; pub mod cluster; pub mod health; -pub mod cluster_controller; pub mod config; pub mod config_loader; pub mod deployment; diff --git a/crates/types/src/metadata_store.rs b/crates/types/src/metadata_store.rs index d7b7fd4ea..b2265e56a 100644 --- a/crates/types/src/metadata_store.rs +++ b/crates/types/src/metadata_store.rs @@ -21,8 +21,6 @@ pub mod keys { pub static SCHEMA_INFORMATION_KEY: ByteString = ByteString::from_static("schema_registry"); - pub static SCHEDULING_PLAN_KEY: ByteString = ByteString::from_static("scheduling_plan"); - pub fn partition_processor_epoch_key(partition_id: PartitionId) -> ByteString { ByteString::from(format!("{PARTITION_PROCESSOR_EPOCH_PREFIX}_{partition_id}")) }
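
How the new per-partition placement loop behaves can be illustrated with a minimal sketch of the replication reconciliation that the patched `ensure_replication` performs. The types below (`NodeId`, `ReplicationStrategy`, a bare `BTreeSet` node set) are simplified stand-ins rather than the actual `restate_types` definitions, the `rand` crate is assumed (it is already a dependency of the scheduler), and the sketch deliberately ignores the placement hints and preferred-leader handling that the real code applies before falling back to random choice:

use rand::seq::IteratorRandom;
use std::collections::BTreeSet;

type NodeId = u32;

enum ReplicationStrategy {
    OnAllNodes,
    Factor(usize),
}

// Reconcile one partition's node set against the alive workers, mirroring the
// retain/top-up/trim logic of the patched `ensure_replication`.
fn ensure_replication(
    node_set: &mut BTreeSet<NodeId>,
    alive_workers: &BTreeSet<NodeId>,
    strategy: &ReplicationStrategy,
) {
    // Drop replicas that are no longer alive.
    node_set.retain(|n| alive_workers.contains(n));

    match strategy {
        // Every alive worker should run the partition.
        ReplicationStrategy::OnAllNodes => node_set.extend(alive_workers.iter().copied()),
        ReplicationStrategy::Factor(factor) => {
            let mut rng = rand::thread_rng();
            if node_set.len() < *factor {
                // Under-replicated: top up with randomly chosen alive workers.
                let new_nodes = alive_workers
                    .iter()
                    .filter(|n| !node_set.contains(n))
                    .copied()
                    .choose_multiple(&mut rng, factor - node_set.len());
                node_set.extend(new_nodes);
            } else if node_set.len() > *factor {
                // Over-replicated: randomly drop surplus replicas.
                let surplus: Vec<NodeId> = node_set
                    .iter()
                    .copied()
                    .choose_multiple(&mut rng, node_set.len() - factor);
                for n in surplus {
                    node_set.remove(&n);
                }
            }
        }
    }
}

fn main() {
    let alive: BTreeSet<NodeId> = [1, 2, 3, 4].into_iter().collect();

    // Node 7 is dead and gets dropped; one extra replica is added to reach the factor.
    let mut node_set: BTreeSet<NodeId> = [3, 7].into_iter().collect();
    ensure_replication(&mut node_set, &alive, &ReplicationStrategy::Factor(2));
    assert_eq!(node_set.len(), 2);
    assert!(node_set.contains(&3) && !node_set.contains(&7));

    // With OnAllNodes every alive worker ends up in the replica set.
    ensure_replication(&mut node_set, &alive, &ReplicationStrategy::OnAllNodes);
    assert_eq!(node_set, alive);
    println!("placement: {node_set:?}");
}

The real implementation additionally prefers hinted worker nodes and the hinted leader before resorting to random selection, and clears the leader field when the leader drops out of the node set.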
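Persisting the rebuilt partition table relies on an optimistic compare-and-set against the metadata store: the write only succeeds if the stored version still matches the one the scheduler read, and a failed precondition is logged and tolerated because the next scheduling round reconciles against the newer table. Below is a minimal sketch of that pattern using a hypothetical `VersionedStore`, not the real `MetadataStoreClient`/`Precondition::MatchesVersion` API:

#[derive(Debug)]
enum WriteError {
    FailedPrecondition(String),
}

#[derive(Default)]
struct VersionedStore {
    version: u32,
    value: String,
}

impl VersionedStore {
    // Compare-and-set: only write if the caller saw the latest version.
    fn put_if_version_matches(&mut self, value: String, expected: u32) -> Result<(), WriteError> {
        if self.version != expected {
            return Err(WriteError::FailedPrecondition(format!(
                "expected version {expected}, found {}",
                self.version
            )));
        }
        self.value = value;
        self.version += 1;
        Ok(())
    }
}

// Mirrors the scheduler's behaviour in `try_update_partition_table`: a lost race
// is logged and tolerated, because the next round starts from the newer table.
fn try_update(store: &mut VersionedStore, new_value: String, expected: u32) {
    match store.put_if_version_matches(new_value, expected) {
        Ok(()) => {}
        Err(WriteError::FailedPrecondition(msg)) => {
            eprintln!("partition table update skipped: {msg}");
        }
    }
}

fn main() {
    let mut store = VersionedStore::default();
    try_update(&mut store, "placement v1".to_owned(), 0);
    // A concurrent controller already bumped the version; this write is skipped.
    try_update(&mut store, "stale placement".to_owned(), 0);
    assert_eq!(store.value, "placement v1");
}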
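On the routing side, `sync_routing_information` now derives the partition-to-node mapping straight from the partition table, treating the first entry of each partition's `replica_group` as the leader to route to. A self-contained sketch under that assumption follows; the structs are simplified stand-ins for the real `PartitionTable`/`Partition` types:

use std::collections::HashMap;

type PartitionId = u16;
type PlainNodeId = u32;

struct Partition {
    // Ordered replica set; the first entry is the node the router treats as leader.
    replica_group: Vec<PlainNodeId>,
}

struct PartitionTable {
    version: u32,
    partitions: HashMap<PartitionId, Partition>,
}

// Rebuild the partition-to-node routing table from the partition table,
// mirroring what `sync_routing_information` does after this patch.
fn routing_table(table: &PartitionTable) -> HashMap<PartitionId, PlainNodeId> {
    table
        .partitions
        .iter()
        .filter_map(|(id, p)| p.replica_group.first().map(|leader| (*id, *leader)))
        .collect()
}

fn main() {
    let table = PartitionTable {
        version: 1,
        partitions: HashMap::from([
            (0, Partition { replica_group: vec![2, 3] }),
            (1, Partition { replica_group: vec![3] }),
            (2, Partition { replica_group: vec![] }), // not yet placed -> no route
        ]),
    };
    let routes = routing_table(&table);
    assert_eq!(routes.get(&0), Some(&2));
    assert_eq!(routes.get(&2), None);
    println!("routes for partition table version {}: {:?}", table.version, routes);
}

Because routing now keys off the partition table version, the refresher can simply watch `MetadataKind::PartitionTable` instead of polling the metadata store on a timer, which is what the `PartitionRoutingRefresher` change above does.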