Removing scheduling plan
Summary:
- Completely remove the scheduling plan
- The scheduler now updates the replication group of each partition directly in the partition table
- PartitionRouting uses the partition table's replication group for routing decisions (sketched below)
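A minimal sketch of the routing piece, using simplified stand-in types (the real `PartitionTable`, `PartitionId`, and `NodeId` live in `restate_types`; the actual logic is in the `crates/core/src/partitions.rs` hunk further down):

```rust
use std::collections::HashMap;

// Illustrative stand-ins only; shapes and names do not match the real crate types.
type PartitionId = u16;
type NodeId = u32;

struct Partition {
    // The scheduler now writes the replica group straight into the partition table.
    replica_group: Vec<NodeId>,
}

/// Mirrors what `sync_routing_information` does after this change: derive each
/// partition's routing target from the partition table, treating the first
/// replica in the group as the leader.
fn build_routing_table(
    partition_table: &HashMap<PartitionId, Partition>,
) -> HashMap<PartitionId, NodeId> {
    let mut routes = HashMap::new();
    for (partition_id, partition) in partition_table {
        if let Some(leader) = partition.replica_group.first() {
            routes.insert(*partition_id, *leader);
        }
    }
    routes
}

fn main() {
    let mut table = HashMap::new();
    table.insert(0, Partition { replica_group: vec![3, 5] });
    table.insert(1, Partition { replica_group: vec![5, 3] });

    let routes = build_routing_table(&table);
    assert_eq!(routes[&0], 3); // partition 0 routes to node 3
    assert_eq!(routes[&1], 5); // partition 1 routes to node 5
}
```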
muhamadazmy committed Jan 10, 2025
1 parent 0286b87 commit 003245a
Showing 9 changed files with 365 additions and 708 deletions.
700 changes: 286 additions & 414 deletions crates/admin/src/cluster_controller/scheduler.rs

Large diffs are not rendered by default.

23 changes: 7 additions & 16 deletions crates/admin/src/cluster_controller/service.rs
@@ -23,13 +23,10 @@ use tonic::codec::CompressionEncoding;
use tracing::{debug, info};

use restate_metadata_store::ReadModifyWriteError;
use restate_types::cluster_controller::SchedulingPlan;
use restate_types::logs::metadata::{
LogletParams, Logs, LogsConfiguration, ProviderConfiguration, ProviderKind, SegmentIndex,
};
use restate_types::metadata_store::keys::{
BIFROST_CONFIG_KEY, PARTITION_TABLE_KEY, SCHEDULING_PLAN_KEY,
};
use restate_types::metadata_store::keys::{BIFROST_CONFIG_KEY, PARTITION_TABLE_KEY};
use restate_types::partition_table::{
self, PartitionTable, PartitionTableBuilder, ReplicationStrategy,
};
@@ -62,7 +59,7 @@ use super::grpc_svc_handler::ClusterCtrlSvcHandler;
use super::protobuf::cluster_ctrl_svc_server::ClusterCtrlSvcServer;
use crate::cluster_controller::logs_controller::{self, NodeSetSelectorHints};
use crate::cluster_controller::observed_cluster_state::ObservedClusterState;
use crate::cluster_controller::scheduler::SchedulingPlanNodeSetSelectorHints;
use crate::cluster_controller::scheduler::PartitionTableNodeSetSelectorHints;

#[derive(Debug, thiserror::Error, CodedError)]
pub enum Error {
@@ -326,7 +323,7 @@ impl<T: TransportConnect> Service<T> {
}
result = state.run() => {
let leader_event = result?;
state.on_leader_event(leader_event).await?;
state.on_leader_event(&self.observed_cluster_state, leader_event).await?;
}
_ = &mut shutdown => {
self.health_status.update(AdminStatus::Unknown);
@@ -489,7 +486,6 @@ impl<T: TransportConnect> Service<T> {
extension,
min_version,
bifrost: self.bifrost.clone(),
metadata_writer: self.metadata_writer.clone(),
observed_cluster_state: self.observed_cluster_state.clone(),
};

@@ -636,7 +632,6 @@ struct SealAndExtendTask {
min_version: Version,
extension: Option<ChainExtension>,
bifrost: Bifrost,
metadata_writer: MetadataWriter,
observed_cluster_state: ObservedClusterState,
}

@@ -712,20 +707,16 @@ impl SealAndExtendTask {
),
#[cfg(feature = "replicated-loglet")]
ProviderConfiguration::Replicated(config) => {
let schedule_plan = self
.metadata_writer
.metadata_store_client()
.get::<SchedulingPlan>(SCHEDULING_PLAN_KEY.clone())
.await?;

let loglet_params = logs_controller::build_new_replicated_loglet_configuration(
config,
loglet_id.next(),
&Metadata::with_current(|m| m.nodes_config_ref()),
&self.observed_cluster_state,
previous_params.as_ref(),
SchedulingPlanNodeSetSelectorHints::from(schedule_plan.as_ref())
.preferred_sequencer(&self.log_id),
Metadata::with_current(|m| {
PartitionTableNodeSetSelectorHints::from(m.partition_table_ref())
})
.preferred_sequencer(&self.log_id),
)
.ok_or_else(|| anyhow::anyhow!("Insufficient writeable nodes in the nodeset"))?;

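One point worth noting in the `SealAndExtendTask` hunk above: the preferred-sequencer hint no longer requires an async `SchedulingPlan` read from the metadata store; it is derived from the in-memory partition table. A rough sketch of that shape, with hypothetical simplified types rather than the real `PartitionTableNodeSetSelectorHints`:

```rust
use std::collections::HashMap;

// Hypothetical stand-in for `PartitionTableNodeSetSelectorHints`; the real
// type is built from `m.partition_table_ref()`.
type LogId = u16;
type NodeId = u32;

struct SelectorHints<'a> {
    // log/partition id -> replica group; this sketch assumes a 1:1 mapping
    // between log ids and partition ids.
    replica_groups: &'a HashMap<LogId, Vec<NodeId>>,
}

impl SelectorHints<'_> {
    // The hint is computed synchronously from in-memory metadata, which is
    // why the async metadata-store fetch could be dropped.
    fn preferred_sequencer(&self, log_id: &LogId) -> Option<NodeId> {
        self.replica_groups
            .get(log_id)
            .and_then(|group| group.first().copied())
    }
}

fn main() {
    let mut groups = HashMap::new();
    groups.insert(7u16, vec![2u32, 4]);

    let hints = SelectorHints { replica_groups: &groups };
    assert_eq!(hints.preferred_sequencer(&7), Some(2));
    assert_eq!(hints.preferred_sequencer(&8), None);
}
```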
64 changes: 43 additions & 21 deletions crates/admin/src/cluster_controller/service/state.rs
@@ -32,7 +32,7 @@ use crate::cluster_controller::logs_controller::{
LogsBasedPartitionProcessorPlacementHints, LogsController,
};
use crate::cluster_controller::observed_cluster_state::ObservedClusterState;
use crate::cluster_controller::scheduler::{Scheduler, SchedulingPlanNodeSetSelectorHints};
use crate::cluster_controller::scheduler::{PartitionTableNodeSetSelectorHints, Scheduler};
use crate::cluster_controller::service::Service;

pub enum ClusterControllerState<T> {
@@ -85,10 +85,18 @@ where
Ok(())
}

pub async fn on_leader_event(&mut self, leader_event: LeaderEvent) -> anyhow::Result<()> {
pub async fn on_leader_event(
&mut self,
observed_cluster_state: &ObservedClusterState,
leader_event: LeaderEvent,
) -> anyhow::Result<()> {
match self {
ClusterControllerState::Follower => Ok(()),
ClusterControllerState::Leader(leader) => leader.on_leader_event(leader_event).await,
ClusterControllerState::Leader(leader) => {
leader
.on_leader_event(observed_cluster_state, leader_event)
.await
}
}
}

@@ -151,12 +159,7 @@ where
async fn from_service(service: &Service<T>) -> anyhow::Result<Leader<T>> {
let configuration = service.configuration.pinned();

let scheduler = Scheduler::init(
&configuration,
service.metadata_writer.metadata_store_client().clone(),
service.networking.clone(),
)
.await?;
let scheduler = Scheduler::new(service.metadata_writer.clone(), service.networking.clone());

let logs_controller =
LogsController::new(service.bifrost.clone(), service.metadata_writer.clone())?;
@@ -195,13 +198,14 @@ where
self.logs_controller.on_observed_cluster_state_update(
&nodes_config,
observed_cluster_state,
SchedulingPlanNodeSetSelectorHints::from(&self.scheduler),
Metadata::with_current(|m| {
PartitionTableNodeSetSelectorHints::from(m.partition_table_ref())
}),
)?;

self.scheduler
.on_observed_cluster_state(
observed_cluster_state,
Metadata::with_current(|m| m.partition_table_ref()).replication_strategy(),
&nodes_config,
LogsBasedPartitionProcessorPlacementHints::from(&self.logs_controller),
)
@@ -229,7 +233,6 @@ where
}
Ok(_) = self.logs_watcher.changed() => {
return Ok(LeaderEvent::LogsUpdate);

}
Ok(_) = self.partition_table_watcher.changed() => {
return Ok(LeaderEvent::PartitionTableUpdate);
@@ -238,42 +241,61 @@
}
}

pub async fn on_leader_event(&mut self, leader_event: LeaderEvent) -> anyhow::Result<()> {
pub async fn on_leader_event(
&mut self,
observed_cluster_state: &ObservedClusterState,
leader_event: LeaderEvent,
) -> anyhow::Result<()> {
match leader_event {
LeaderEvent::TrimLogs => {
self.trim_logs().await;
}
LeaderEvent::LogsUpdate => {
self.on_logs_update().await?;
self.on_logs_update(observed_cluster_state).await?;
}
LeaderEvent::PartitionTableUpdate => {
self.on_partition_table_update().await?;
self.on_partition_table_update(observed_cluster_state)
.await?;
}
}

Ok(())
}

async fn on_logs_update(&mut self) -> anyhow::Result<()> {
async fn on_logs_update(
&mut self,
observed_cluster_state: &ObservedClusterState,
) -> anyhow::Result<()> {
self.logs_controller
.on_logs_update(Metadata::with_current(|m| m.logs_ref()))?;

self.scheduler
.on_logs_update(
&Metadata::with_current(|m| m.logs_ref()),
&Metadata::with_current(|m| m.partition_table_ref()),
.on_observed_cluster_state(
observed_cluster_state,
&Metadata::with_current(|m| m.nodes_config_ref()),
LogsBasedPartitionProcessorPlacementHints::from(&self.logs_controller),
)
.await?;

Ok(())
}

async fn on_partition_table_update(&mut self) -> anyhow::Result<()> {
async fn on_partition_table_update(
&mut self,
observed_cluster_state: &ObservedClusterState,
) -> anyhow::Result<()> {
let partition_table = Metadata::with_current(|m| m.partition_table_ref());

self.logs_controller
.on_partition_table_update(&partition_table);

self.scheduler
.on_observed_cluster_state(
observed_cluster_state,
&Metadata::with_current(|m| m.nodes_config_ref()),
LogsBasedPartitionProcessorPlacementHints::from(&self.logs_controller),
)
.await?;

Ok(())
}

71 changes: 27 additions & 44 deletions crates/core/src/partitions.rs
@@ -14,19 +14,17 @@ use std::pin::pin;
use std::sync::Arc;

use arc_swap::ArcSwap;
use restate_types::net::metadata::MetadataKind;
use tokio::sync::mpsc;
use tokio::time::MissedTickBehavior;
use tracing::{debug, trace};
use xxhash_rust::xxh3::Xxh3Builder;

use restate_types::cluster_controller::SchedulingPlan;
use restate_types::config::Configuration;
use restate_types::identifiers::PartitionId;
use restate_types::metadata_store::keys::SCHEDULING_PLAN_KEY;
use restate_types::{NodeId, Version, Versioned};
use restate_types::{NodeId, Version};

use crate::metadata_store::MetadataStoreClient;
use crate::{cancellation_watcher, ShutdownError, TaskCenter, TaskHandle, TaskId, TaskKind};
use crate::{
cancellation_watcher, Metadata, ShutdownError, TaskCenter, TaskHandle, TaskId, TaskKind,
};

pub type CommandSender = mpsc::Sender<Command>;
pub type CommandReceiver = mpsc::Receiver<Command>;
@@ -118,16 +116,20 @@ struct PartitionToNodesRoutingTable {
pub struct PartitionRoutingRefresher {
sender: CommandSender,
receiver: CommandReceiver,
metadata_store_client: MetadataStoreClient,
inflight_refresh_task: Option<TaskHandle<()>>,
inner: Arc<ArcSwap<PartitionToNodesRoutingTable>>,
}

impl Default for PartitionRoutingRefresher {
fn default() -> Self {
Self::new()
}
}

impl PartitionRoutingRefresher {
pub fn new(metadata_store_client: MetadataStoreClient) -> Self {
fn new() -> Self {
let (sender, receiver) = mpsc::channel(1);
Self {
metadata_store_client,
receiver,
sender,
inflight_refresh_task: None,
@@ -149,13 +151,9 @@ impl PartitionRoutingRefresher {
async fn run(mut self) -> anyhow::Result<()> {
debug!("Routing information refresher started");

let update_interval = Configuration::pinned()
.common
.metadata_update_interval
.into();
let mut update_interval = tokio::time::interval(update_interval);
update_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
let mut cancel = pin!(cancellation_watcher());
let mut partition_table_watch =
Metadata::with_current(|m| m.watch(MetadataKind::PartitionTable));

loop {
tokio::select! {
Expand All @@ -166,32 +164,31 @@ impl PartitionRoutingRefresher {
Some(cmd) = self.receiver.recv() => {
match cmd {
Command::SyncRoutingInformation => {
self.spawn_sync_routing_information_task();
self.spawn_sync_routing_information_task(*partition_table_watch.borrow());
}
}
}
_ = update_interval.tick() => {
_ = partition_table_watch.changed() => {
trace!("Refreshing routing information...");
self.spawn_sync_routing_information_task();
self.spawn_sync_routing_information_task(*partition_table_watch.borrow());
}
}
}
Ok(())
}

fn spawn_sync_routing_information_task(&mut self) {
fn spawn_sync_routing_information_task(&mut self, version: Version) {
if !self
.inflight_refresh_task
.as_ref()
.is_some_and(|t| !t.is_finished())
{
let partition_to_node_mappings = self.inner.clone();
let metadata_store_client = self.metadata_store_client.clone();

let task = TaskCenter::spawn_unmanaged(
TaskKind::Disposable,
"refresh-routing-information",
sync_routing_information(partition_to_node_mappings, metadata_store_client),
sync_routing_information(partition_to_node_mappings, version),
);
self.inflight_refresh_task = task.ok();
} else {
@@ -212,43 +209,29 @@ pub fn spawn_partition_routing_refresher(

async fn sync_routing_information(
partition_to_node_mappings: Arc<ArcSwap<PartitionToNodesRoutingTable>>,
metadata_store_client: MetadataStoreClient,
version: Version,
) {
let result: Result<Option<SchedulingPlan>, _> =
metadata_store_client.get(SCHEDULING_PLAN_KEY.clone()).await;

let Ok(scheduling_plan) = result else {
debug!(
"Failed to fetch scheduling plan from metadata store: {:?}",
result
);
let Ok(partition_table) = Metadata::current().wait_for_partition_table(version).await else {
// just return on shutdown error
return;
};

let scheduling_plan = match scheduling_plan {
Some(plan) => plan,
None => {
debug!("No scheduling plan found in metadata store, unable to refresh partition routing information");
return;
}
};

let current_mappings = partition_to_node_mappings.load();
if scheduling_plan.version() <= current_mappings.version {
if partition_table.version() <= current_mappings.version {
return; // No need for update
}

let mut partition_nodes = HashMap::<PartitionId, NodeId, Xxh3Builder>::default();
for (partition_id, target_state) in scheduling_plan.iter() {
if let Some(leader) = target_state.leader {
partition_nodes.insert(*partition_id, leader.into());
for (partition_id, partition) in partition_table.partitions() {
if let Some(leader) = partition.replica_group.first() {
partition_nodes.insert(*partition_id, (*leader).into());
}
}

let _ = partition_to_node_mappings.compare_and_swap(
current_mappings,
Arc::new(PartitionToNodesRoutingTable {
version: scheduling_plan.version(),
version: partition_table.version(),
inner: partition_nodes,
}),
);
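The `PartitionRoutingRefresher` change above also swaps the fixed `metadata_update_interval` polling of the metadata store for a watch on partition-table metadata, so a refresh only runs when the table version moves forward. A reduced sketch of that loop shape, assuming a plain `tokio::sync::watch` channel of version numbers in place of `Metadata::watch(MetadataKind::PartitionTable)`:

```rust
use tokio::sync::watch;

// Not the actual restate code: the u32 version and the println! stand in for
// `restate_types::Version` and the real routing-table rebuild.
async fn refresh_loop(mut version_watch: watch::Receiver<u32>) {
    let mut applied = *version_watch.borrow();
    while version_watch.changed().await.is_ok() {
        let version = *version_watch.borrow();
        if version <= applied {
            continue; // stale wake-up, nothing new to apply
        }
        println!("refreshing routing information for version {version}");
        applied = version;
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = watch::channel(0u32);
    let task = tokio::spawn(refresh_loop(rx));

    // Each metadata update bumps the version; the watch coalesces intermediate
    // values, so only the latest version triggers work.
    tx.send(1).unwrap();
    tx.send(2).unwrap();
    drop(tx); // closing the sender ends the loop

    task.await.unwrap();
}
```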
