Skip to content

Commit

Permalink
[Bifrost] Let providers propose their own parameters
Browse files (browse the repository at this point in the history)
- BifrostAdmin can now initialize an empty logs configuration
- BifrostAdmin can now auto-extend the chain
- Providers now have control over suggesting a new segment configuration
- Introduces a temporary copy of NodeSetSelector into bifrost until log-controller is removed
  • Loading branch information
AhmedSoliman committed Dec 24, 2024
1 parent 1ac1f70 commit 834c9a2
Show file tree
Hide file tree
Showing 21 changed files with 912 additions and 113 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

58 changes: 33 additions & 25 deletions crates/admin/src/cluster_controller/logs_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,16 @@

mod nodeset_selection;

use futures::never::Never;
use rand::prelude::IteratorRandom;
use rand::thread_rng;
use std::collections::HashMap;
use std::iter;
use std::ops::Deref;
use std::sync::Arc;
use std::time::Duration;

use futures::never::Never;
use rand::prelude::IteratorRandom;
use rand::thread_rng;
use restate_types::config::Configuration;
use tokio::sync::Semaphore;
use tokio::task::JoinSet;
use tracing::{debug, error, trace, trace_span, Instrument};
Expand All @@ -28,14 +30,13 @@ use restate_core::metadata_store::{
retry_on_network_error, MetadataStoreClient, Precondition, ReadWriteError, WriteError,
};
use restate_core::{Metadata, MetadataWriter, ShutdownError, TaskCenterFutureExt};
use restate_types::config::Configuration;
use restate_types::errors::GenericError;
use restate_types::identifiers::PartitionId;
use restate_types::live::Pinned;
use restate_types::logs::builder::LogsBuilder;
use restate_types::logs::metadata::{
Chain, DefaultProvider, LogletConfig, LogletParams, Logs, LogsConfiguration,
NodeSetSelectionStrategy, ProviderKind, ReplicatedLogletConfig, SegmentIndex,
Chain, LogletConfig, LogletParams, Logs, LogsConfiguration, NodeSetSelectionStrategy,
ProviderConfiguration, ProviderKind, ReplicatedLogletConfig, SegmentIndex,
};
use restate_types::logs::{LogId, LogletId, Lsn, TailState};
use restate_types::metadata_store::keys::BIFROST_CONFIG_KEY;
Expand Down Expand Up @@ -320,17 +321,17 @@ fn try_provisioning(
node_set_selector_hints: impl NodeSetSelectorHints,
) -> Option<LogletConfiguration> {
match logs_configuration.default_provider {
DefaultProvider::Local => {
ProviderConfiguration::Local => {
let log_id = LogletId::new(log_id, SegmentIndex::OLDEST);
Some(LogletConfiguration::Local(log_id.into()))
}
#[cfg(any(test, feature = "memory-loglet"))]
DefaultProvider::InMemory => {
ProviderConfiguration::InMemory => {
let log_id = LogletId::new(log_id, SegmentIndex::OLDEST);
Some(LogletConfiguration::Memory(log_id.into()))
}
#[cfg(feature = "replicated-loglet")]
DefaultProvider::Replicated(ref config) => build_new_replicated_loglet_configuration(
ProviderConfiguration::Replicated(ref config) => build_new_replicated_loglet_configuration(
config,
LogletId::new(log_id, SegmentIndex::OLDEST),
&Metadata::with_current(|m| m.nodes_config_ref()),
Expand Down Expand Up @@ -436,10 +437,10 @@ impl LogletConfiguration {
) -> bool {
match (self, &logs_configuration.default_provider) {
#[cfg(any(test, feature = "memory-loglet"))]
(Self::Memory(_), DefaultProvider::InMemory) => false,
(Self::Local(_), DefaultProvider::Local) => false,
(Self::Memory(_), ProviderConfiguration::InMemory) => false,
(Self::Local(_), ProviderConfiguration::Local) => false,
#[cfg(feature = "replicated-loglet")]
(Self::Replicated(params), DefaultProvider::Replicated(config)) => {
(Self::Replicated(params), ProviderConfiguration::Replicated(config)) => {
let sequencer_change_required = !observed_cluster_state
.is_node_alive(params.sequencer)
&& !observed_cluster_state.alive_nodes.is_empty();
Expand Down Expand Up @@ -468,9 +469,10 @@ impl LogletConfiguration {

sequencer_change_required || nodeset_improvement_possible
}
_ => {
(x, y) => {
debug!(
"Changing provider type is not supporter at the moment. Ignoring reconfigure"
"Changing provider type from {} to {} is not supporter at the moment. Ignoring reconfigure",
x.as_provider(), y.kind(),
);
false
}
Expand Down Expand Up @@ -501,10 +503,14 @@ impl LogletConfiguration {

match logs_configuration.default_provider {
#[cfg(any(test, feature = "memory-loglet"))]
DefaultProvider::InMemory => Some(LogletConfiguration::Memory(loglet_id.next().into())),
DefaultProvider::Local => Some(LogletConfiguration::Local(loglet_id.next().into())),
ProviderConfiguration::InMemory => {
Some(LogletConfiguration::Memory(loglet_id.next().into()))
}
ProviderConfiguration::Local => {
Some(LogletConfiguration::Local(loglet_id.next().into()))
}
#[cfg(feature = "replicated-loglet")]
DefaultProvider::Replicated(ref config) => {
ProviderConfiguration::Replicated(ref config) => {
let previous_params = match self {
Self::Replicated(previous_params) => Some(previous_params),
_ => None,
Expand Down Expand Up @@ -639,9 +645,9 @@ struct LogsControllerInner {
}

impl LogsControllerInner {
fn new(configuration: LogsConfiguration, retry_policy: RetryPolicy) -> Self {
fn new(current_logs: Arc<Logs>, retry_policy: RetryPolicy) -> Self {
Self {
current_logs: Arc::new(Logs::with_logs_configuration(configuration)),
current_logs,
logs_state: HashMap::with_hasher(Xxh3Builder::default()),
logs_write_in_progress: None,
retry_policy,
Expand Down Expand Up @@ -942,7 +948,6 @@ impl LogsController {
)
.await?;

let logs_configuration = logs.configuration().clone();
metadata_writer.update(Arc::new(logs)).await?;

//todo(azmy): make configurable
Expand All @@ -955,7 +960,10 @@ impl LogsController {

let mut this = Self {
effects: Some(Vec::new()),
inner: LogsControllerInner::new(logs_configuration, retry_policy),
inner: LogsControllerInner::new(
Metadata::with_current(|m| m.logs_snapshot()),
retry_policy,
),
bifrost,
metadata_store_client,
metadata_writer,
Expand Down Expand Up @@ -1279,7 +1287,7 @@ pub mod tests {

use enumset::{enum_set, EnumSet};
use restate_types::logs::metadata::{
DefaultProvider, LogsConfiguration, NodeSetSelectionStrategy, ReplicatedLogletConfig,
LogsConfiguration, NodeSetSelectionStrategy, ProviderConfiguration, ReplicatedLogletConfig,
};
use restate_types::logs::LogletId;
use restate_types::nodes_config::{
Expand Down Expand Up @@ -1452,7 +1460,7 @@ pub mod tests {

fn logs_configuration(replication_factor: u8) -> LogsConfiguration {
LogsConfiguration {
default_provider: DefaultProvider::Replicated(ReplicatedLogletConfig {
default_provider: ProviderConfiguration::Replicated(ReplicatedLogletConfig {
replication_property: ReplicationProperty::new(
NonZeroU8::new(replication_factor).expect("must be non zero"),
),
Expand Down Expand Up @@ -1537,7 +1545,7 @@ pub mod tests {
&nodes.observed_state
));

let DefaultProvider::Replicated(ref replicated_loglet_config) =
let ProviderConfiguration::Replicated(ref replicated_loglet_config) =
logs_config.default_provider
else {
unreachable!()
Expand Down Expand Up @@ -1571,7 +1579,7 @@ pub mod tests {

let logs_config = logs_configuration(2);

let DefaultProvider::Replicated(ref replicated_loglet_config) =
let ProviderConfiguration::Replicated(ref replicated_loglet_config) =
logs_config.default_provider
else {
unreachable!()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ use std::cmp::{max, Ordering};
use itertools::Itertools;
use rand::prelude::IteratorRandom;
use rand::Rng;
use restate_types::logs::metadata::NodeSetSelectionStrategy;
use tracing::trace;

use restate_types::logs::metadata::NodeSetSelectionStrategy;
use restate_types::nodes_config::NodesConfiguration;
use restate_types::replicated_loglet::{LocationScope, NodeSet, ReplicationProperty};

Expand Down
30 changes: 19 additions & 11 deletions crates/admin/src/cluster_controller/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use tracing::{debug, info};
use restate_metadata_store::ReadModifyWriteError;
use restate_types::cluster_controller::SchedulingPlan;
use restate_types::logs::metadata::{
DefaultProvider, LogletParams, Logs, LogsConfiguration, ProviderKind, SegmentIndex,
LogletParams, Logs, LogsConfiguration, ProviderConfiguration, ProviderKind, SegmentIndex,
};
use restate_types::metadata_store::keys::{
BIFROST_CONFIG_KEY, PARTITION_TABLE_KEY, SCHEDULING_PLAN_KEY,
Expand Down Expand Up @@ -104,6 +104,10 @@ where
metadata_writer: MetadataWriter,
metadata_store_client: MetadataStoreClient,
) -> Self {
println!(
"CONFIGURATION DEFaAULT IS: {}",
configuration.live_load().bifrost.default_provider
);
let (command_tx, command_rx) = mpsc::channel(2);

let cluster_state_refresher =
Expand Down Expand Up @@ -183,7 +187,7 @@ enum ClusterControllerCommand {
UpdateClusterConfiguration {
num_partitions: NonZeroU16,
replication_strategy: ReplicationStrategy,
default_provider: DefaultProvider,
default_provider: ProviderConfiguration,
response_tx: oneshot::Sender<anyhow::Result<()>>,
},
SealAndExtendChain {
Expand Down Expand Up @@ -249,7 +253,7 @@ impl ClusterControllerHandle {
&self,
num_partitions: NonZeroU16,
replication_strategy: ReplicationStrategy,
default_provider: DefaultProvider,
default_provider: ProviderConfiguration,
) -> Result<anyhow::Result<()>, ShutdownError> {
let (response_tx, response_rx) = oneshot::channel();

Expand Down Expand Up @@ -439,7 +443,7 @@ impl<T: TransportConnect> Service<T> {
&self,
num_partitions: u16,
replication_strategy: ReplicationStrategy,
default_provider: DefaultProvider,
default_provider: ProviderConfiguration,
) -> anyhow::Result<()> {
let logs = self
.metadata_store_client
Expand All @@ -457,8 +461,7 @@ impl<T: TransportConnect> Service<T> {

// we can only change the default provider
if logs.version() != Version::INVALID
&& logs.configuration().default_provider.as_provider_kind()
!= default_provider.as_provider_kind()
&& logs.configuration().default_provider.kind() != default_provider.kind()
{
{
return Err(
Expand Down Expand Up @@ -786,16 +789,16 @@ impl SealAndExtendTask {

let (provider, params) = match &logs.configuration().default_provider {
#[cfg(any(test, feature = "memory-loglet"))]
DefaultProvider::InMemory => (
ProviderConfiguration::InMemory => (
ProviderKind::InMemory,
u64::from(loglet_id.next()).to_string().into(),
),
DefaultProvider::Local => (
ProviderConfiguration::Local => (
ProviderKind::Local,
u64::from(loglet_id.next()).to_string().into(),
),
#[cfg(feature = "replicated-loglet")]
DefaultProvider::Replicated(config) => {
ProviderConfiguration::Replicated(config) => {
let schedule_plan = self
.metadata_store_client
.get::<SchedulingPlan>(SCHEDULING_PLAN_KEY.clone())
Expand Down Expand Up @@ -833,6 +836,7 @@ mod tests {

use googletest::assert_that;
use googletest::matchers::eq;
use restate_types::logs::metadata::ProviderKind;
use test_log::test;

use restate_bifrost::providers::memory_loglet;
Expand All @@ -843,7 +847,7 @@ mod tests {
use restate_core::test_env::NoOpMessageHandler;
use restate_core::{TaskCenter, TaskKind, TestCoreEnv, TestCoreEnvBuilder};
use restate_types::cluster::cluster_state::PartitionProcessorStatus;
use restate_types::config::{AdminOptions, Configuration};
use restate_types::config::{AdminOptions, BifrostOptions, Configuration};
use restate_types::health::HealthStatus;
use restate_types::identifiers::PartitionId;
use restate_types::live::Live;
Expand Down Expand Up @@ -1086,8 +1090,11 @@ mod tests {
admin_options.log_trim_threshold = 0;
let interval_duration = Duration::from_secs(10);
admin_options.log_trim_interval = Some(interval_duration.into());
let mut bifrost_options = BifrostOptions::default();
bifrost_options.default_provider = ProviderKind::InMemory;
let config = Configuration {
admin: admin_options,
bifrost: bifrost_options,
..Default::default()
};

Expand Down Expand Up @@ -1136,14 +1143,15 @@ mod tests {
where
F: FnMut(TestCoreEnvBuilder<FailingConnector>) -> TestCoreEnvBuilder<FailingConnector>,
{
restate_types::config::set_current_config(config);
let mut builder = TestCoreEnvBuilder::with_incoming_only_connector();
let bifrost_svc = BifrostService::new().with_factory(memory_loglet::Factory::default());
let bifrost = bifrost_svc.handle();

let mut server_builder = NetworkServerBuilder::default();

let svc = Service::new(
Live::from_value(config),
Configuration::updateable(),
HealthStatus::default(),
bifrost.clone(),
builder.networking.clone(),
Expand Down
1 change: 1 addition & 0 deletions crates/bifrost/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ derive_more = { workspace = true }
enum-map = { workspace = true, features = ["serde"] }
futures = { workspace = true }
googletest = { workspace = true, features = ["anyhow"], optional = true }
itertools = { workspace = true }
metrics = { workspace = true }
parking_lot = { workspace = true }
pin-project = { workspace = true }
Expand Down
6 changes: 3 additions & 3 deletions crates/bifrost/src/appender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ impl Appender {
info!(
attempt = attempt,
segment_index = %loglet.segment_index(),
"Append batch will be retried (loglet being sealed), waiting for tail to be determined"
"Append batch will be retried (loglet is being sealed), waiting for tail to be determined"
);
let new_loglet = Self::wait_next_unsealed_loglet(
self.log_id,
Expand All @@ -131,7 +131,7 @@ impl Appender {
Err(AppendError::Other(err)) if err.retryable() => {
if let Some(retry_dur) = retry_iter.next() {
info!(
?err,
%err,
attempt = attempt,
segment_index = %loglet.segment_index(),
"Failed to append this batch. Since underlying error is retryable, will retry in {:?}",
Expand All @@ -140,7 +140,7 @@ impl Appender {
tokio::time::sleep(retry_dur).await;
} else {
warn!(
?err,
%err,
attempt = attempt,
segment_index = %loglet.segment_index(),
"Failed to append this batch and exhausted all attempts to retry",
Expand Down
Loading

0 comments on commit 834c9a2

Please sign in to comment.