From 5b757b09ade540d57de0e9e0c06a0c63c8533871 Mon Sep 17 00:00:00 2001 From: stack72 Date: Mon, 25 Nov 2024 16:30:17 +0000 Subject: [PATCH] chore(sdf): Cleanup Migration of builtin workspace --- lib/sdf-server/src/migrations.rs | 172 +------------------------------ 1 file changed, 4 insertions(+), 168 deletions(-) diff --git a/lib/sdf-server/src/migrations.rs b/lib/sdf-server/src/migrations.rs index 58e7317210..a999991568 100644 --- a/lib/sdf-server/src/migrations.rs +++ b/lib/sdf-server/src/migrations.rs @@ -1,22 +1,16 @@ -use std::{future::IntoFuture as _, time::Duration}; +use std::future::IntoFuture as _; use audit_logs::database::{ AuditDatabaseContext, AuditDatabaseContextError, AuditDatabaseMigrationError, }; use dal::{ - builtins, cached_module::CachedModuleError, pkg::PkgError, slow_rt::SlowRuntimeError, - workspace_snapshot::migrator::SnapshotGraphMigrator, DalContext, ServicesContext, Workspace, + cached_module::CachedModuleError, slow_rt::SlowRuntimeError, + workspace_snapshot::migrator::SnapshotGraphMigrator, ServicesContext, }; -use module_index_client::{BuiltinsDetailsResponse, ModuleDetailsResponse, ModuleIndexClient}; -use si_pkg::SiPkg; use telemetry::prelude::*; use thiserror::Error; -use tokio::{ - task::{JoinError, JoinSet}, - time::{self, Instant}, -}; +use tokio::task::JoinError; use tokio_util::{sync::CancellationToken, task::TaskTracker}; -use ulid::Ulid; use crate::{init, Config}; @@ -31,8 +25,6 @@ pub enum MigratorError { Join(#[from] JoinError), #[error("error while migrating audit database: {0}")] MigrateAuditDatabase(#[source] AuditDatabaseMigrationError), - #[error("error while migrating builtins from module index: {0}")] - MigrateBuiltins(#[source] Box), #[error("error while migrating dal database: {0}")] MigrateDalDatabase(#[source] dal::ModelError), #[error("error while migrating layer db database: {0}")] @@ -54,13 +46,6 @@ impl MigratorError { { Self::MigrateSnapshots(Box::new(err)) } - - fn migrate_builtins(err: E) -> Self - where - E: std::error::Error + 'static + Sync + Send, - { - Self::MigrateBuiltins(Box::new(err)) - } } type MigratorResult = std::result::Result; @@ -136,10 +121,6 @@ impl Migrator { .await .map_err(|err| span.record_err(err))?; - self.migrate_builtins_from_module_index() - .await - .map_err(|err| span.record_err(err))?; - span.record_ok(); Ok(()) } @@ -189,149 +170,4 @@ impl Migrator { .map_err(MigratorError::migrate_snapshots)?; Ok(()) } - - #[instrument( - name = "sdf.migrator.migrate_builtins_from_module_index", - level = "info", - skip_all - )] - async fn migrate_builtins_from_module_index(&self) -> MigratorResult<()> { - let mut interval = time::interval(Duration::from_secs(5)); - let instant = Instant::now(); - - let mut dal_context = self.services_context.clone().into_builder(true); - dal_context.set_no_dependent_values(); - let mut ctx = dal_context - .build_default() - .await - .map_err(MigratorError::migrate_builtins)?; - info!("setup builtin workspace"); - Workspace::setup_builtin(&mut ctx) - .await - .map_err(MigratorError::migrate_builtins)?; - - info!("migrating intrinsic functions"); - builtins::func::migrate_intrinsics(&ctx) - .await - .map_err(MigratorError::migrate_builtins)?; - - let module_index_url = self - .services_context - .module_index_url() - .ok_or(MigratorError::ModuleIndexNotSet)?; - - let module_index_client = ModuleIndexClient::unauthenticated_client( - module_index_url - .try_into() - .map_err(MigratorError::migrate_builtins)?, - ); - let module_list = module_index_client - .list_builtins() - .await - .map_err(MigratorError::migrate_builtins)?; - info!("builtins install starting"); - let install_builtins = install_builtins(ctx, module_list, module_index_client); - tokio::pin!(install_builtins); - loop { - tokio::select! { - _ = interval.tick() => { - info!(elapsed = instant.elapsed().as_secs_f32(), "migrating in progress..."); - } - result = &mut install_builtins => match result { - Ok(_) => { - info!(elapsed = instant.elapsed().as_secs_f32(), "migrating completed"); - break; - } - Err(err) => return Err(err), - } - } - } - - Ok(()) - } -} - -async fn install_builtins( - ctx: DalContext, - module_list: BuiltinsDetailsResponse, - module_index_client: ModuleIndexClient, -) -> MigratorResult<()> { - let dal = &ctx; - let client = &module_index_client.clone(); - let modules: Vec = module_list.modules; - - let total = modules.len(); - - let mut join_set = JoinSet::new(); - for module in modules { - let module = module.clone(); - let client = client.clone(); - join_set.spawn(async move { - ( - module.name.to_owned(), - (module.to_owned(), fetch_builtin(&module, &client).await), - ) - }); - } - - let mut count: usize = 0; - while let Some(res) = join_set.join_next().await { - let (pkg_name, (module, res)) = res.map_err(MigratorError::migrate_builtins)?; - match res { - Ok(pkg) => { - let instant = Instant::now(); - - match dal::pkg::import_pkg_from_pkg( - &ctx, - &pkg, - Some(dal::pkg::ImportOptions { - is_builtin: true, - schema_id: module.schema_id().map(Into::into), - past_module_hashes: module.past_hashes, - ..Default::default() - }), - ) - .await - { - Ok(_) => { - count += 1; - let elapsed = instant.elapsed().as_secs_f32(); - debug!( - "pkg {pkg_name} install finished successfully and took {elapsed:.2} seconds ({count} of {total} installed)", - ); - } - Err(PkgError::PackageAlreadyInstalled(hash)) => { - count += 1; - debug!(%hash, "skipping pkg {pkg_name}: already installed ({count} of {total} installed)"); - } - Err(err) => error!(?err, "pkg {pkg_name} install failed"), - } - } - Err(err) => { - error!(?err, "pkg {pkg_name} install failed with server error"); - } - } - } - dal.commit() - .await - .map_err(MigratorError::migrate_builtins)?; - - let mut ctx = ctx.clone(); - ctx.update_snapshot_to_visibility() - .await - .map_err(MigratorError::migrate_builtins)?; - - Ok(()) -} - -async fn fetch_builtin( - module: &ModuleDetailsResponse, - module_index_client: &ModuleIndexClient, -) -> MigratorResult { - let module = module_index_client - .get_builtin(Ulid::from_string(&module.id).unwrap_or_default()) - .await - .map_err(MigratorError::migrate_builtins)?; - - SiPkg::load_from_bytes(&module).map_err(MigratorError::migrate_builtins) }