diff --git a/crates/admin/src/cluster_controller/logs_controller.rs b/crates/admin/src/cluster_controller/logs_controller.rs index 5935181af..c522dadb0 100644 --- a/crates/admin/src/cluster_controller/logs_controller.rs +++ b/crates/admin/src/cluster_controller/logs_controller.rs @@ -19,16 +19,13 @@ use std::time::Duration; use futures::never::Never; use rand::prelude::IteratorRandom; use rand::thread_rng; -use restate_types::config::Configuration; use tokio::sync::Semaphore; use tokio::task::JoinSet; use tracing::{debug, error, trace, trace_span, Instrument}; use xxhash_rust::xxh3::Xxh3Builder; use restate_bifrost::{Bifrost, Error as BifrostError}; -use restate_core::metadata_store::{ - retry_on_network_error, Precondition, ReadWriteError, WriteError, -}; +use restate_core::metadata_store::{Precondition, ReadWriteError, WriteError}; use restate_core::{Metadata, MetadataWriter, ShutdownError, TaskCenterFutureExt}; use restate_types::errors::GenericError; use restate_types::identifiers::PartitionId; @@ -930,26 +927,7 @@ pub struct LogsController { } impl LogsController { - pub async fn init( - configuration: &Configuration, - bifrost: Bifrost, - metadata_writer: MetadataWriter, - ) -> Result { - // obtain the latest logs or init it with an empty logs variant - let logs = retry_on_network_error( - configuration.common.network_error_retry_policy.clone(), - || { - metadata_writer - .metadata_store_client() - .get_or_insert(BIFROST_CONFIG_KEY.clone(), || { - Logs::from_configuration(configuration) - }) - }, - ) - .await?; - - metadata_writer.update(Arc::new(logs)).await?; - + pub async fn init(bifrost: Bifrost, metadata_writer: MetadataWriter) -> Result { //todo(azmy): make configurable let retry_policy = RetryPolicy::exponential( Duration::from_millis(10), diff --git a/crates/admin/src/cluster_controller/service/state.rs b/crates/admin/src/cluster_controller/service/state.rs index 430ff8124..fada9e129 100644 --- a/crates/admin/src/cluster_controller/service/state.rs +++ b/crates/admin/src/cluster_controller/service/state.rs @@ -158,12 +158,8 @@ where ) .await?; - let logs_controller = LogsController::init( - &configuration, - service.bifrost.clone(), - service.metadata_writer.clone(), - ) - .await?; + let logs_controller = + LogsController::init(service.bifrost.clone(), service.metadata_writer.clone()).await?; let (log_trim_interval, log_trim_threshold) = create_log_trim_interval(&configuration.admin); diff --git a/crates/bifrost/src/bifrost_admin.rs b/crates/bifrost/src/bifrost_admin.rs index ba0c09493..94d91503a 100644 --- a/crates/bifrost/src/bifrost_admin.rs +++ b/crates/bifrost/src/bifrost_admin.rs @@ -316,6 +316,7 @@ impl<'a> BifrostAdmin<'a> { .metadata_writer .metadata_store_client() .get_or_insert(BIFROST_CONFIG_KEY.clone(), || { + debug!("Attempting to initialize logs metadata in metadata store"); Logs::from_configuration(&Configuration::pinned()) }) }) diff --git a/crates/bifrost/src/service.rs b/crates/bifrost/src/service.rs index ec6718aa7..03059c774 100644 --- a/crates/bifrost/src/service.rs +++ b/crates/bifrost/src/service.rs @@ -11,7 +11,6 @@ use std::collections::HashMap; use std::sync::Arc; -use anyhow::Context; use enum_map::EnumMap; use tracing::{debug, error, trace}; @@ -81,11 +80,10 @@ impl BifrostService { /// /// This requires to run within a task_center context. pub async fn start(self) -> anyhow::Result<()> { - // Perform an initial metadata sync. - self.inner - .sync_metadata() - .await - .context("Initial bifrost metadata sync has failed!")?; + // Make sure we have v1 metadata written to metadata store with the default + // configuration. If metadata is already initialized, this will make sure we have the + // latest version set in metadata manager. + self.bifrost.admin().init_metadata().await?; // initialize all enabled providers. if self.factories.is_empty() {