Fix over-placement of partition processors
This commit fixes the incorrectly computed number of partition processors
that need to be removed in the case of over-placement.
tillrohrmann committed Dec 16, 2024
1 parent 59a8351 commit 0cd6be3
Showing 1 changed file with 100 additions and 5 deletions.
crates/admin/src/cluster_controller/scheduler.rs
@@ -381,7 +381,7 @@ impl<T: TransportConnect> Scheduler<T> {
.cloned()
.choose_multiple(
&mut rng,
- replication_factor - target_state.node_set.len(),
+ target_state.node_set.len() - replication_factor,
)
{
target_state.node_set.remove(&node_id);
@@ -396,7 +396,7 @@ impl<T: TransportConnect> Scheduler<T> {
.cloned()
.choose_multiple(
&mut rng,
- replication_factor - target_state.node_set.len(),
+ target_state.node_set.len() - replication_factor,
)
{
target_state.node_set.remove(&node_id);
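
In both hunks the operands of the subtraction are swapped. This branch only runs when the node set is over-placed, i.e. when node_set.len() exceeds the replication factor, so the number of processors to remove is the excess node_set.len() - replication_factor; the previous expression computed replication_factor - node_set.len(), which yields the wrong count and, for unsigned arithmetic, underflows. A minimal sketch of the corrected trimming step, with simplified types rather than the scheduler's own structures (the real code operates on the target state's node set and uses rand's choose_multiple, as shown above):

use std::collections::HashSet;

use rand::seq::IteratorRandom;

// Sketch only: trim an over-placed node set down to the replication factor
// by removing randomly chosen excess nodes, mirroring the fixed computation.
fn trim_over_placed(node_set: &mut HashSet<u64>, replication_factor: usize) {
    let mut rng = rand::thread_rng();
    if node_set.len() > replication_factor {
        // The removal count is the excess over the replication factor;
        // subtracting the other way around would underflow here because
        // `node_set.len() > replication_factor` holds in this branch.
        let excess = node_set.len() - replication_factor;
        for node_id in node_set.iter().cloned().choose_multiple(&mut rng, excess) {
            node_set.remove(&node_id);
        }
    }
}

fn main() {
    // Three placed processors but a replication factor of two: one node
    // should be removed, analogous to the new over-placement test below.
    let mut node_set: HashSet<u64> = (0..3).collect();
    trim_over_placed(&mut node_set, 2);
    assert_eq!(node_set.len(), 2);
}
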
@@ -593,6 +593,7 @@ mod tests {
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

+ use crate::cluster_controller::logs_controller::tests::MockNodes;
use crate::cluster_controller::observed_cluster_state::ObservedClusterState;
use crate::cluster_controller::scheduler::{
HashSet, PartitionProcessorPlacementHints, Scheduler,
@@ -602,14 +603,18 @@ mod tests {
use restate_types::cluster::cluster_state::{
AliveNode, ClusterState, DeadNode, NodeState, PartitionProcessorStatus, RunMode,
};
- use restate_types::cluster_controller::SchedulingPlan;
+ use restate_types::cluster_controller::{
+ SchedulingPlan, SchedulingPlanBuilder, TargetPartitionState,
+ };
use restate_types::config::Configuration;
- use restate_types::identifiers::PartitionId;
+ use restate_types::identifiers::{PartitionId, PartitionKey};
use restate_types::metadata_store::keys::SCHEDULING_PLAN_KEY;
use restate_types::net::codec::WireDecode;
use restate_types::net::partition_processor_manager::{ControlProcessors, ProcessorCommand};
use restate_types::net::{AdvertisedAddress, TargetName};
- use restate_types::nodes_config::{LogServerConfig, NodeConfig, NodesConfiguration, Role};
+ use restate_types::nodes_config::{
+ LogServerConfig, NodeConfig, NodesConfiguration, Role, StorageState,
+ };
use restate_types::partition_table::{PartitionTable, ReplicationStrategy};
use restate_types::time::MillisSinceEpoch;
use restate_types::{GenerationalNodeId, PlainNodeId, Version};
@@ -824,6 +829,96 @@ mod tests {
Ok(())
}

#[test(restate_core::test)]
async fn handle_too_few_placed_partition_processors() -> googletest::Result<()> {
let mut scheduling_plan_builder = SchedulingPlanBuilder::default();
let num_partition_processors = NonZero::new(2).expect("non-zero");
let mut target_partition_state = TargetPartitionState::new(0..=PartitionKey::MAX);

// add one too few nodes to the target state
target_partition_state.add_node(PlainNodeId::from(0), true);

let partition_id = PartitionId::from(0);
scheduling_plan_builder.insert_partition(partition_id, target_partition_state);

let scheduling_plan = run_ensure_replication_test(
scheduling_plan_builder,
ReplicationStrategy::Factor(num_partition_processors),
)
.await?;
let partition = scheduling_plan.get(&partition_id).expect("must be present");

assert_eq!(
partition.node_set.len(),
num_partition_processors.get() as usize
);
Ok(())
}

#[test(restate_core::test)]
async fn handle_too_many_placed_partition_processors() -> googletest::Result<()> {
let mut scheduling_plan_builder = SchedulingPlanBuilder::default();
let num_partition_processors = NonZero::new(2).expect("non-zero");
let mut target_partition_state = TargetPartitionState::new(0..=PartitionKey::MAX);

// add one too many nodes to the target state
target_partition_state.add_node(PlainNodeId::from(0), true);
target_partition_state.add_node(PlainNodeId::from(1), false);
target_partition_state.add_node(PlainNodeId::from(2), false);

let partition_id = PartitionId::from(0);
scheduling_plan_builder.insert_partition(partition_id, target_partition_state);

let scheduling_plan = run_ensure_replication_test(
scheduling_plan_builder,
ReplicationStrategy::Factor(num_partition_processors),
)
.await?;
let partition = scheduling_plan.get(&partition_id).expect("must be present");

assert_eq!(
partition.node_set.len(),
num_partition_processors.get() as usize
);

Ok(())
}

async fn run_ensure_replication_test(
mut scheduling_plan_builder: SchedulingPlanBuilder,
replication_strategy: ReplicationStrategy,
) -> googletest::Result<SchedulingPlan> {
let env = TestCoreEnv::create_with_single_node(0, 0).await;

let scheduler = Scheduler::init(
&Configuration::pinned(),
env.metadata_store_client.clone(),
env.networking.clone(),
)
.await?;
let alive_workers = vec![
PlainNodeId::from(0),
PlainNodeId::from(1),
PlainNodeId::from(2),
]
.into_iter()
.collect();
let nodes_config = MockNodes::builder()
.with_nodes([0, 1, 2], Role::Worker.into(), StorageState::ReadWrite)
.build()
.nodes_config;

scheduler.ensure_replication(
&mut scheduling_plan_builder,
&alive_workers,
replication_strategy,
&nodes_config,
NoPlacementHints,
);

Ok(scheduling_plan_builder.build())
}

fn matches_scheduling_plan(scheduling_plan: &SchedulingPlan) -> SchedulingPlanMatcher<'_> {
SchedulingPlanMatcher { scheduling_plan }
}
