Skip to content

Commit

Permalink
Merge pull request #5021 from systeminit/chore-cleanup-migrations-code
Browse files Browse the repository at this point in the history
chore(sdf): Cleanup Migration of builtin workspace
  • Loading branch information
stack72 authored Nov 25, 2024
2 parents f027046 + 5b757b0 commit e2a0fd6
Showing 1 changed file with 4 additions and 168 deletions.
172 changes: 4 additions & 168 deletions lib/sdf-server/src/migrations.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,16 @@
use std::{future::IntoFuture as _, time::Duration};
use std::future::IntoFuture as _;

use audit_logs::database::{
AuditDatabaseContext, AuditDatabaseContextError, AuditDatabaseMigrationError,
};
use dal::{
builtins, cached_module::CachedModuleError, pkg::PkgError, slow_rt::SlowRuntimeError,
workspace_snapshot::migrator::SnapshotGraphMigrator, DalContext, ServicesContext, Workspace,
cached_module::CachedModuleError, slow_rt::SlowRuntimeError,
workspace_snapshot::migrator::SnapshotGraphMigrator, ServicesContext,
};
use module_index_client::{BuiltinsDetailsResponse, ModuleDetailsResponse, ModuleIndexClient};
use si_pkg::SiPkg;
use telemetry::prelude::*;
use thiserror::Error;
use tokio::{
task::{JoinError, JoinSet},
time::{self, Instant},
};
use tokio::task::JoinError;
use tokio_util::{sync::CancellationToken, task::TaskTracker};
use ulid::Ulid;

use crate::{init, Config};

Expand All @@ -31,8 +25,6 @@ pub enum MigratorError {
Join(#[from] JoinError),
#[error("error while migrating audit database: {0}")]
MigrateAuditDatabase(#[source] AuditDatabaseMigrationError),
#[error("error while migrating builtins from module index: {0}")]
MigrateBuiltins(#[source] Box<dyn std::error::Error + 'static + Sync + Send>),
#[error("error while migrating dal database: {0}")]
MigrateDalDatabase(#[source] dal::ModelError),
#[error("error while migrating layer db database: {0}")]
Expand All @@ -54,13 +46,6 @@ impl MigratorError {
{
Self::MigrateSnapshots(Box::new(err))
}

fn migrate_builtins<E>(err: E) -> Self
where
E: std::error::Error + 'static + Sync + Send,
{
Self::MigrateBuiltins(Box::new(err))
}
}

type MigratorResult<T> = std::result::Result<T, MigratorError>;
Expand Down Expand Up @@ -136,10 +121,6 @@ impl Migrator {
.await
.map_err(|err| span.record_err(err))?;

self.migrate_builtins_from_module_index()
.await
.map_err(|err| span.record_err(err))?;

span.record_ok();
Ok(())
}
Expand Down Expand Up @@ -189,149 +170,4 @@ impl Migrator {
.map_err(MigratorError::migrate_snapshots)?;
Ok(())
}

#[instrument(
name = "sdf.migrator.migrate_builtins_from_module_index",
level = "info",
skip_all
)]
async fn migrate_builtins_from_module_index(&self) -> MigratorResult<()> {
let mut interval = time::interval(Duration::from_secs(5));
let instant = Instant::now();

let mut dal_context = self.services_context.clone().into_builder(true);
dal_context.set_no_dependent_values();
let mut ctx = dal_context
.build_default()
.await
.map_err(MigratorError::migrate_builtins)?;
info!("setup builtin workspace");
Workspace::setup_builtin(&mut ctx)
.await
.map_err(MigratorError::migrate_builtins)?;

info!("migrating intrinsic functions");
builtins::func::migrate_intrinsics(&ctx)
.await
.map_err(MigratorError::migrate_builtins)?;

let module_index_url = self
.services_context
.module_index_url()
.ok_or(MigratorError::ModuleIndexNotSet)?;

let module_index_client = ModuleIndexClient::unauthenticated_client(
module_index_url
.try_into()
.map_err(MigratorError::migrate_builtins)?,
);
let module_list = module_index_client
.list_builtins()
.await
.map_err(MigratorError::migrate_builtins)?;
info!("builtins install starting");
let install_builtins = install_builtins(ctx, module_list, module_index_client);
tokio::pin!(install_builtins);
loop {
tokio::select! {
_ = interval.tick() => {
info!(elapsed = instant.elapsed().as_secs_f32(), "migrating in progress...");
}
result = &mut install_builtins => match result {
Ok(_) => {
info!(elapsed = instant.elapsed().as_secs_f32(), "migrating completed");
break;
}
Err(err) => return Err(err),
}
}
}

Ok(())
}
}

async fn install_builtins(
ctx: DalContext,
module_list: BuiltinsDetailsResponse,
module_index_client: ModuleIndexClient,
) -> MigratorResult<()> {
let dal = &ctx;
let client = &module_index_client.clone();
let modules: Vec<ModuleDetailsResponse> = module_list.modules;

let total = modules.len();

let mut join_set = JoinSet::new();
for module in modules {
let module = module.clone();
let client = client.clone();
join_set.spawn(async move {
(
module.name.to_owned(),
(module.to_owned(), fetch_builtin(&module, &client).await),
)
});
}

let mut count: usize = 0;
while let Some(res) = join_set.join_next().await {
let (pkg_name, (module, res)) = res.map_err(MigratorError::migrate_builtins)?;
match res {
Ok(pkg) => {
let instant = Instant::now();

match dal::pkg::import_pkg_from_pkg(
&ctx,
&pkg,
Some(dal::pkg::ImportOptions {
is_builtin: true,
schema_id: module.schema_id().map(Into::into),
past_module_hashes: module.past_hashes,
..Default::default()
}),
)
.await
{
Ok(_) => {
count += 1;
let elapsed = instant.elapsed().as_secs_f32();
debug!(
"pkg {pkg_name} install finished successfully and took {elapsed:.2} seconds ({count} of {total} installed)",
);
}
Err(PkgError::PackageAlreadyInstalled(hash)) => {
count += 1;
debug!(%hash, "skipping pkg {pkg_name}: already installed ({count} of {total} installed)");
}
Err(err) => error!(?err, "pkg {pkg_name} install failed"),
}
}
Err(err) => {
error!(?err, "pkg {pkg_name} install failed with server error");
}
}
}
dal.commit()
.await
.map_err(MigratorError::migrate_builtins)?;

let mut ctx = ctx.clone();
ctx.update_snapshot_to_visibility()
.await
.map_err(MigratorError::migrate_builtins)?;

Ok(())
}

async fn fetch_builtin(
module: &ModuleDetailsResponse,
module_index_client: &ModuleIndexClient,
) -> MigratorResult<SiPkg> {
let module = module_index_client
.get_builtin(Ulid::from_string(&module.id).unwrap_or_default())
.await
.map_err(MigratorError::migrate_builtins)?;

SiPkg::load_from_bytes(&module).map_err(MigratorError::migrate_builtins)
}

0 comments on commit e2a0fd6

Please sign in to comment.