From 12a0d97512ba74e511d96a75d7cc2943401ed28d Mon Sep 17 00:00:00 2001 From: Zachary Hamm Date: Wed, 3 Jul 2024 17:22:04 -0500 Subject: [PATCH] feat(dal,sdf,module-index-server): use schema id when importing Installing an asset that is a variant of an existing asset will use the module "schema id" (along with the hashes, if the schema id is not available) to try and install the asset as a variant of the existing schema. --- app/web/.env | 2 +- lib/dal/src/layer_db_types/content_types.rs | 38 +++++++++ lib/dal/src/module.rs | 77 +++++++++++++++---- lib/dal/src/pkg/import.rs | 67 +++++++++------- lib/dal/src/schema.rs | 15 +++- lib/dal/src/schema/variant/authoring.rs | 9 ++- lib/dal/tests/integration_test/pkg/mod.rs | 15 +++- lib/module-index-client/src/client.rs | 40 ++++++++-- lib/module-index-client/src/lib.rs | 7 +- lib/module-index-client/src/types.rs | 19 +++++ .../src/migrations/U0008__add_indexes.sql | 2 + .../src/models/si_module.rs | 48 ++++++++++-- .../src/routes/list_builtins_route.rs | 16 ++-- .../src/routes/list_modules_route.rs | 14 +++- .../src/routes/promote_builtin_route.rs | 26 +++++-- .../src/routes/reject_module_route.rs | 16 ++-- .../src/routes/upsert_module_route.rs | 74 ++++++++++++------ .../src/routes/upsert_workspace_route.rs | 2 +- lib/sdf-server/src/server/server.rs | 8 +- .../server/service/module/export_module.rs | 14 ++-- .../server/service/module/install_module.rs | 13 +++- 21 files changed, 400 insertions(+), 122 deletions(-) create mode 100644 lib/module-index-server/src/migrations/U0008__add_indexes.sql diff --git a/app/web/.env b/app/web/.env index f6a554a306..55b821db32 100644 --- a/app/web/.env +++ b/app/web/.env @@ -22,8 +22,8 @@ VITE_BACKEND_HOSTS=["/localhost/g","/si.keeb.dev/g","/app.systeminit.com/g","/to # VITE_SI_CYPRESS_MULTIPLIER=1 # How many times to run each test in cypress, only changes modelling tests -# TODO: point this at public/deployed module index api VITE_MODULE_INDEX_API_URL=https://module-index.systeminit.com +#VITE_MODULE_INDEX_API_URL=http://localhost:5157 # vars only used for local dev, but we keep them here anyway since they dont hurt anything DEV_HOST=127.0.0.1 # set this to 0.0.0.0 to serve vite to external clients diff --git a/lib/dal/src/layer_db_types/content_types.rs b/lib/dal/src/layer_db_types/content_types.rs index 509b1733d1..83bc087bb5 100644 --- a/lib/dal/src/layer_db_types/content_types.rs +++ b/lib/dal/src/layer_db_types/content_types.rs @@ -1,5 +1,6 @@ use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; +use si_events::ulid::Ulid; use si_events::{CasValue, ContentHash}; use strum::EnumDiscriminants; use thiserror::Error; @@ -317,6 +318,16 @@ pub struct InputSocketContentV1 { #[derive(Debug, Clone, EnumDiscriminants, Serialize, Deserialize, PartialEq)] pub enum ModuleContent { V1(ModuleContentV1), + V2(ModuleContentV2), +} + +impl ModuleContent { + pub fn inner(&self) -> ModuleContentV2 { + match self { + ModuleContent::V1(inner) => inner.to_owned().into(), + ModuleContent::V2(inner) => inner.to_owned(), + } + } } #[derive(Debug, Clone, Deserialize, Serialize, PartialEq)] @@ -330,6 +341,33 @@ pub struct ModuleContentV1 { pub created_at: DateTime, } +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)] +pub struct ModuleContentV2 { + pub timestamp: Timestamp, + pub name: String, + pub root_hash: String, + pub version: String, + pub description: String, + pub created_by_email: String, + pub created_at: DateTime, + pub schema_id: Option, +} + +impl From for ModuleContentV2 { + fn from(value: ModuleContentV1) -> Self { + Self { + timestamp: value.timestamp, + name: value.name, + root_hash: value.root_hash, + version: value.version, + description: value.description, + created_by_email: value.created_by_email, + created_at: value.created_at, + schema_id: None, + } + } +} + #[derive(Debug, Clone, EnumDiscriminants, Serialize, Deserialize, PartialEq)] pub enum OutputSocketContent { V1(OutputSocketContentV1), diff --git a/lib/dal/src/module.rs b/lib/dal/src/module.rs index fdbc911302..a1f9ee0246 100644 --- a/lib/dal/src/module.rs +++ b/lib/dal/src/module.rs @@ -3,15 +3,15 @@ use std::collections::HashMap; use std::sync::Arc; use serde::{Deserialize, Serialize}; +use si_events::ulid::Ulid; use si_events::ContentHash; use thiserror::Error; use tokio::sync::TryLockError; -use ulid::Ulid; use si_layer_cache::LayerDbError; use telemetry::prelude::*; -use crate::layer_db_types::{ModuleContent, ModuleContentV1}; +use crate::layer_db_types::{ModuleContent, ModuleContentV2}; use crate::workspace_snapshot::content_address::{ContentAddress, ContentAddressDiscriminants}; use crate::workspace_snapshot::edge_weight::{ EdgeWeight, EdgeWeightError, EdgeWeightKind, EdgeWeightKindDiscriminants, @@ -64,10 +64,11 @@ pub struct Module { description: String, created_by_email: String, created_at: DateTime, + schema_id: Option, } impl Module { - pub fn assemble(id: ModuleId, inner: ModuleContentV1) -> Self { + pub fn assemble(id: ModuleId, inner: ModuleContentV2) -> Self { Self { id, timestamp: inner.timestamp, @@ -77,6 +78,7 @@ impl Module { description: inner.description, created_by_email: inner.created_by_email, created_at: inner.created_at, + schema_id: inner.schema_id, } } @@ -104,6 +106,16 @@ impl Module { self.created_at } + /// This is the "module" schema id. It's a unique id that all variants of a + /// single schema get in the module index database. If this is the first + /// time installing the asset, the schema will get this, but this is not + /// guaranteed to be the id of the schema in workspaces that have assets + /// installed before this feature was added! + pub fn schema_id(&self) -> Option { + self.schema_id + } + + #[allow(clippy::too_many_arguments)] pub async fn new( ctx: &DalContext, name: impl Into, @@ -112,8 +124,9 @@ impl Module { description: impl Into, created_by_email: impl Into, created_at: impl Into>, + schema_id: Option, ) -> ModuleResult { - let content = ModuleContentV1 { + let content = ModuleContentV2 { timestamp: Timestamp::now(), name: name.into(), root_hash: root_hash.into(), @@ -121,13 +134,14 @@ impl Module { description: description.into(), created_by_email: created_by_email.into(), created_at: created_at.into(), + schema_id, }; let (hash, _) = ctx .layer_db() .cas() .write( - Arc::new(ModuleContent::V1(content.clone()).into()), + Arc::new(ModuleContent::V2(content.clone()).into()), None, ctx.events_tenancy(), ctx.events_actor(), @@ -171,15 +185,19 @@ impl Module { .ok_or(WorkspaceSnapshotError::MissingContentFromStore(id.into()))?; // Add any extra migrations here! - let ModuleContent::V1(inner) = content; + let inner = match content { + ModuleContent::V1(v1_inner) => v1_inner.into(), + ModuleContent::V2(inner) => inner, + }; Ok(Self::assemble(id, inner)) } - pub async fn find_by_root_hash( - ctx: &DalContext, - root_hash: impl AsRef, - ) -> ModuleResult> { + pub async fn find

(ctx: &DalContext, predicate: P) -> ModuleResult> + where + P: FnMut(&Module) -> bool, + { + let mut predicate = predicate; let workspace_snapshot = ctx.workspace_snapshot()?; let module_node_indices = { let module_category_index_id = workspace_snapshot @@ -200,7 +218,7 @@ impl Module { .get_content_node_weight_of_kind(ContentAddressDiscriminants::Module)?; let module: Module = Self::get_by_id(ctx, module_node_weight.id().into()).await?; - if module.root_hash == root_hash.as_ref() { + if predicate(&module) { return Ok(Some(module)); } } @@ -208,6 +226,20 @@ impl Module { Ok(None) } + pub async fn find_by_root_hash( + ctx: &DalContext, + root_hash: impl AsRef, + ) -> ModuleResult> { + Self::find(ctx, |module| module.root_hash() == root_hash.as_ref()).await + } + + pub async fn find_for_module_schema_id( + ctx: &DalContext, + module_schema_id: Ulid, + ) -> ModuleResult> { + Self::find(ctx, |module| module.schema_id() == Some(module_schema_id)).await + } + pub async fn find_for_schema_id( ctx: &DalContext, schema_id: SchemaId, @@ -284,6 +316,20 @@ impl Module { Ok(all_schemas) } + pub async fn find_matching_module(&self, ctx: &DalContext) -> ModuleResult> { + let mut maybe_mod = None; + + if let Some(module_schema_id) = self.schema_id() { + maybe_mod = Self::find_for_module_schema_id(ctx, module_schema_id).await?; + } + + if maybe_mod.is_none() { + maybe_mod = Self::find_by_root_hash(ctx, self.root_hash()).await?; + } + + Ok(maybe_mod) + } + pub async fn list_associated_schema_variants( &self, ctx: &DalContext, @@ -341,11 +387,10 @@ impl Module { for node_weight in node_weights { match content_map.get(&node_weight.content_hash()) { - Some(module_content) => { - let ModuleContent::V1(inner) = module_content; - - modules.push(Self::assemble(node_weight.id().into(), inner.to_owned())) - } + Some(module_content) => modules.push(Self::assemble( + node_weight.id().into(), + module_content.inner(), + )), None => Err(WorkspaceSnapshotError::MissingContentFromStore( node_weight.id(), ))?, diff --git a/lib/dal/src/pkg/import.rs b/lib/dal/src/pkg/import.rs index 226e74c26d..171e1f8182 100644 --- a/lib/dal/src/pkg/import.rs +++ b/lib/dal/src/pkg/import.rs @@ -1,4 +1,5 @@ use chrono::NaiveDateTime; +use si_events::ulid::Ulid; use si_pkg::{ SchemaVariantSpecPropRoot, SiPkg, SiPkgActionFunc, SiPkgAttrFuncInputView, SiPkgAuthFunc, SiPkgComponent, SiPkgEdge, SiPkgError, SiPkgFunc, SiPkgFuncArgument, SiPkgFuncData, SiPkgKind, @@ -65,6 +66,11 @@ pub struct ImportOptions { /// Locked schema variants can't be edited directly. Setting this to `true` will create /// editable components. pub create_unlocked: bool, + /// The "schema id" for this asset, provided by the module index API + pub schema_id: Option, + /// A list of "past hashes" for this module, used to find the existing + /// schema if a schema_id is not provided + pub past_module_hashes: Option>, } const SPECIAL_CASE_FUNCS: [&str; 2] = ["si:resourcePayloadToValue", "si:normalizeToArray"]; @@ -77,7 +83,7 @@ async fn import_change_set( schemas: &[SiPkgSchema<'_>], _components: &[SiPkgComponent<'_>], _edges: &[SiPkgEdge<'_>], - installed_pkg: Option, + installed_module: Option, thing_map: &mut ThingMap, options: &ImportOptions, ) -> PkgResult<( @@ -100,7 +106,7 @@ async fn import_change_set( } else if let Some(func) = import_func( ctx, func_spec, - installed_pkg.clone(), + installed_module.clone(), thing_map, options.create_unlocked, ) @@ -118,10 +124,8 @@ async fn import_change_set( .as_ref() .map(|skip_funcs| skip_funcs.get(&unique_id)) { - if let Some(installed_pkg) = installed_pkg.clone() { - installed_pkg - .create_association(ctx, func.id.into()) - .await?; + if let Some(module) = installed_module.clone() { + module.create_association(ctx, func.id.into()).await?; } // We're not going to import this func but we need it in the map for lookups later @@ -135,7 +139,7 @@ async fn import_change_set( import_func( ctx, func_spec, - installed_pkg.clone(), + installed_module.clone(), thing_map, options.is_builtin, ) @@ -145,7 +149,7 @@ async fn import_change_set( if let Some(func) = func { thing_map.insert(unique_id.to_owned(), Thing::Func(func.to_owned())); - if let Some(module) = installed_pkg.clone() { + if let Some(module) = installed_module.clone() { module.create_association(ctx, func.id.into()).await?; } @@ -190,7 +194,7 @@ async fn import_change_set( let (_, schema_variant_ids) = import_schema( ctx, schema_spec, - installed_pkg.clone(), + installed_module.clone(), thing_map, options.create_unlocked, ) @@ -245,6 +249,7 @@ pub async fn import_pkg_from_pkg( metadata.description(), metadata.created_by(), metadata.created_at(), + options.schema_id, ) .await?, ) @@ -423,14 +428,21 @@ async fn import_func_arguments( Ok(()) } -async fn create_schema(ctx: &DalContext, schema_spec_data: &SiPkgSchemaData) -> PkgResult { - let schema = Schema::new(ctx, schema_spec_data.name()) - .await? - .modify(ctx, |schema| { - schema.ui_hidden = schema_spec_data.ui_hidden(); - Ok(()) - }) - .await?; +async fn create_schema( + ctx: &DalContext, + maybe_existing_schema_id: Option, + schema_spec_data: &SiPkgSchemaData, +) -> PkgResult { + let schema = match maybe_existing_schema_id { + Some(id) => Schema::new_with_id(ctx, id.into(), schema_spec_data.name()).await?, + None => Schema::new(ctx, schema_spec_data.name()).await?, + } + .modify(ctx, |schema| { + schema.ui_hidden = schema_spec_data.ui_hidden(); + Ok(()) + }) + .await?; + Ok(schema) } @@ -443,16 +455,19 @@ async fn import_schema( ) -> PkgResult<(Option, Vec)> { let schema_and_category = { let mut existing_schema: Option = None; - if installed_module.is_some() { - let associated_schemas = Schema::list(ctx).await?; - let mut maybe_matching_schema: Vec = associated_schemas - .into_iter() - .filter(|s| s.name.clone() == schema_spec.name()) - .collect(); - if let Some(matching_schema) = maybe_matching_schema.pop() { - existing_schema = Some(matching_schema); + let mut existing_schema_id = None; + + if let Some(installed_module) = installed_module.as_ref() { + existing_schema_id = installed_module.schema_id(); + if let Some(matching_module) = installed_module.find_matching_module(ctx).await? { + existing_schema = matching_module + .list_associated_schemas(ctx) + .await? + .into_iter() + .next(); } } + let data = schema_spec .data() .ok_or(PkgError::DataNotFound("schema".into()))?; @@ -462,7 +477,7 @@ async fn import_schema( let category = data.category.clone(); let schema = match existing_schema { - None => create_schema(ctx, data).await?, + None => create_schema(ctx, existing_schema_id, data).await?, Some(installed_schema_record) => installed_schema_record, }; diff --git a/lib/dal/src/schema.rs b/lib/dal/src/schema.rs index c79e5477ce..57f06d13c7 100644 --- a/lib/dal/src/schema.rs +++ b/lib/dal/src/schema.rs @@ -126,6 +126,15 @@ impl Schema { ); pub async fn new(ctx: &DalContext, name: impl Into) -> SchemaResult { + let id = ctx.change_set()?.generate_ulid()?; + Self::new_with_id(ctx, id.into(), name).await + } + + pub async fn new_with_id( + ctx: &DalContext, + id: SchemaId, + name: impl Into, + ) -> SchemaResult { let content = SchemaContentV1 { timestamp: Timestamp::now(), name: name.into(), @@ -145,8 +154,8 @@ impl Schema { .await?; let change_set = ctx.change_set()?; - let id = change_set.generate_ulid()?; - let node_weight = NodeWeight::new_content(change_set, id, ContentAddress::Schema(hash))?; + let node_weight = + NodeWeight::new_content(change_set, id.into(), ContentAddress::Schema(hash))?; let workspace_snapshot = ctx.workspace_snapshot()?; workspace_snapshot.add_node(node_weight).await?; @@ -162,7 +171,7 @@ impl Schema { ) .await?; - Ok(Self::assemble(id.into(), content)) + Ok(Self::assemble(id, content)) } pub async fn get_default_schema_variant_id( diff --git a/lib/dal/src/schema/variant/authoring.rs b/lib/dal/src/schema/variant/authoring.rs index 2da78a2190..d712d788eb 100644 --- a/lib/dal/src/schema/variant/authoring.rs +++ b/lib/dal/src/schema/variant/authoring.rs @@ -6,6 +6,7 @@ use base64::Engine; use chrono::Utc; use convert_case::{Case, Casing}; use pkg::import::import_schema_variant; +use si_events::ulid::Ulid; use si_layer_cache::LayerDbError; use si_pkg::{ FuncSpec, FuncSpecBackendKind, FuncSpecBackendResponseType, FuncSpecData, MergeSkip, PkgSpec, @@ -168,6 +169,7 @@ impl VariantAuthoringClient { asset_func.clone(), )])), create_unlocked: true, + schema_id: Some(Ulid::new()), ..Default::default() }), ) @@ -181,7 +183,11 @@ impl VariantAuthoringClient { Ok(SchemaVariant::get_by_id(ctx, schema_variant_id).await?) } - #[instrument(name = "variant.authoring.clone_variant", level = "info", skip_all)] + #[instrument( + name = "variant.authoring.new_schema_with_cloned_variant", + level = "info", + skip_all + )] pub async fn new_schema_with_cloned_variant( ctx: &DalContext, schema_variant_id: SchemaVariantId, @@ -229,6 +235,7 @@ impl VariantAuthoringClient { cloned_func.clone(), )])), create_unlocked: true, + schema_id: Some(Ulid::new()), ..Default::default() }), ) diff --git a/lib/dal/tests/integration_test/pkg/mod.rs b/lib/dal/tests/integration_test/pkg/mod.rs index 36481db74b..39c777413a 100644 --- a/lib/dal/tests/integration_test/pkg/mod.rs +++ b/lib/dal/tests/integration_test/pkg/mod.rs @@ -1,5 +1,5 @@ use dal::pkg::export::PkgExporter; -use dal::pkg::import_pkg_from_pkg; +use dal::pkg::{import_pkg_from_pkg, ImportOptions}; use dal::schema::variant::authoring::VariantAuthoringClient; use dal::{DalContext, FuncBackendKind, FuncBackendResponseType}; use dal_test::test; @@ -87,9 +87,16 @@ async fn import_pkg_from_pkg_set_latest_default(ctx: &mut DalContext) { let pkg = SiPkg::load_from_spec(pkg_spec).expect("should load from spec"); // import and get add variants - let (_, mut variants, _) = import_pkg_from_pkg(ctx, &pkg, None) - .await - .expect("should import"); + let (_, mut variants, _) = import_pkg_from_pkg( + ctx, + &pkg, + Some(ImportOptions { + schema_id: Some(schema.id().into()), + ..Default::default() + }), + ) + .await + .expect("should import"); assert_eq!(variants.len(), 1); let default_schema_variant = schema diff --git a/lib/module-index-client/src/client.rs b/lib/module-index-client/src/client.rs index 4ea352dac1..d5af18a5e4 100644 --- a/lib/module-index-client/src/client.rs +++ b/lib/module-index-client/src/client.rs @@ -5,8 +5,8 @@ use url::Url; use crate::types::{BuiltinsDetailsResponse, ModulePromotedResponse, ModuleRejectionResponse}; use crate::{ - IndexClientError, IndexClientResult, ModuleDetailsResponse, MODULE_BASED_ON_HASH_NAME, - MODULE_BUNDLE_FIELD_NAME, + IndexClientError, IndexClientResult, ModuleDetailsResponse, MODULE_BASED_ON_HASH_FIELD_NAME, + MODULE_BUNDLE_FIELD_NAME, MODULE_SCHEMA_ID_FIELD_NAME, }; #[derive(Debug, Clone)] @@ -84,6 +84,7 @@ impl IndexClient { module_name: &str, module_version: &str, module_based_on_hash: Option, + module_schema_id: Option, module_bytes: Vec, ) -> IndexClientResult { let module_upload_part = reqwest::multipart::Part::bytes(module_bytes) @@ -94,11 +95,18 @@ impl IndexClient { if let Some(module_based_on_hash) = module_based_on_hash { multipart_form = multipart_form.part( - MODULE_BASED_ON_HASH_NAME, + MODULE_BASED_ON_HASH_FIELD_NAME, reqwest::multipart::Part::text(module_based_on_hash), ); } + if let Some(schema_id) = module_schema_id { + multipart_form = multipart_form.part( + MODULE_SCHEMA_ID_FIELD_NAME, + reqwest::multipart::Part::text(schema_id), + ); + } + let upload_url = self.base_url.join("modules")?; let upload_response = reqwest::Client::new() .post(upload_url) @@ -159,12 +167,32 @@ impl IndexClient { Ok(builtins) } + pub async fn module_details( + &self, + module_id: Ulid, + ) -> IndexClientResult { + let details_url = self + .base_url + .join("modules/")? + .join(&format!("{}", module_id))?; + + Ok(reqwest::Client::new() + .get(details_url) + .bearer_auth(&self.auth_token) + .send() + .await? + .error_for_status()? + .json() + .await?) + } + pub async fn get_builtin(&self, module_id: Ulid) -> IndexClientResult> { let download_url = self .base_url .join("modules/")? .join(&format!("{}/", module_id.to_string()))? .join("download_builtin")?; + let mut response = reqwest::Client::new().get(download_url).send().await?; if response.status() == StatusCode::NOT_FOUND @@ -176,10 +204,8 @@ impl IndexClient { .join(&format!("{}/", module_id.to_string()))? .join("download_builtin")?; - let prod_response = reqwest::Client::new().get(url).send().await?; - - response = prod_response - } + response = reqwest::Client::new().get(url).send().await?; + }; let bytes = response.error_for_status()?.bytes().await?; diff --git a/lib/module-index-client/src/lib.rs b/lib/module-index-client/src/lib.rs index 5f853523bc..9747758ba5 100644 --- a/lib/module-index-client/src/lib.rs +++ b/lib/module-index-client/src/lib.rs @@ -2,8 +2,11 @@ pub mod client; pub mod types; pub use client::IndexClient; -pub use types::{FuncMetadata, IndexClientError, IndexClientResult, ModuleDetailsResponse}; +pub use types::{ + ExtraMetadata, FuncMetadata, IndexClientError, IndexClientResult, ModuleDetailsResponse, +}; pub const DEFAULT_URL: &str = "http://localhost:5157"; pub const MODULE_BUNDLE_FIELD_NAME: &str = "module_bundle"; -pub const MODULE_BASED_ON_HASH_NAME: &str = "based_on_hash"; +pub const MODULE_BASED_ON_HASH_FIELD_NAME: &str = "based_on_hash"; +pub const MODULE_SCHEMA_ID_FIELD_NAME: &str = "schema_id"; diff --git a/lib/module-index-client/src/types.rs b/lib/module-index-client/src/types.rs index b06f989197..e37d1d91e1 100644 --- a/lib/module-index-client/src/types.rs +++ b/lib/module-index-client/src/types.rs @@ -1,6 +1,7 @@ use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use thiserror::Error; +use ulid::Ulid; #[remain::sorted] #[derive(Debug, Error)] @@ -47,6 +48,16 @@ pub struct ModuleDetailsResponse { pub latest_hash: String, pub latest_hash_created_at: DateTime, pub created_at: DateTime, + pub schema_id: Option, + pub past_hashes: Option>, +} + +impl ModuleDetailsResponse { + pub fn schema_id(&self) -> Option { + self.schema_id + .as_deref() + .and_then(|schema_id| Ulid::from_string(schema_id).ok()) + } } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -56,3 +67,11 @@ pub struct FuncMetadata { pub display_name: Option, pub description: Option, } + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ExtraMetadata { + pub version: String, + pub schemas: Vec, + pub funcs: Vec, +} diff --git a/lib/module-index-server/src/migrations/U0008__add_indexes.sql b/lib/module-index-server/src/migrations/U0008__add_indexes.sql new file mode 100644 index 0000000000..671d6d696f --- /dev/null +++ b/lib/module-index-server/src/migrations/U0008__add_indexes.sql @@ -0,0 +1,2 @@ +CREATE INDEX ON modules (latest_hash); +CREATE INDEX ON modules (schema_id); \ No newline at end of file diff --git a/lib/module-index-server/src/models/si_module.rs b/lib/module-index-server/src/models/si_module.rs index 7f19645239..b88a7a5bfb 100644 --- a/lib/module-index-server/src/models/si_module.rs +++ b/lib/module-index-server/src/models/si_module.rs @@ -1,3 +1,4 @@ +use module_index_client::ModuleDetailsResponse; use sea_orm::{entity::prelude::*, sea_query, TryGetError}; use serde::{Deserialize, Serialize}; use std::str::FromStr; @@ -103,14 +104,49 @@ pub struct Model { } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] -pub enum Relation {} +pub enum Relation { + #[sea_orm( + belongs_to = "Entity", + from = "Column::SchemaId", + to = "Column::SchemaId" + on_condition = r#"Column::SchemaId.is_not_null()"# + )] + SchemaIdReference, +} -impl ActiveModelBehavior for ActiveModel {} +pub struct SchemaIdReferenceLink; -impl TryInto for Model { - type Error = crate::routes::upsert_module_route::UpsertModuleError; +impl Linked for SchemaIdReferenceLink { + type FromEntity = Entity; + type ToEntity = Entity; + + fn link(&self) -> Vec { + vec![Relation::SchemaIdReference.def()] + } +} + +impl ActiveModelBehavior for ActiveModel {} - fn try_into(self) -> Result { - Ok(serde_json::from_value(serde_json::to_value(self)?)?) +pub fn make_module_details_response( + module: Model, + linked_modules: Vec, +) -> ModuleDetailsResponse { + ModuleDetailsResponse { + id: module.id.to_string(), + name: module.name, + description: module.description, + owner_user_id: module.owner_user_id.to_string(), + owner_display_name: module.owner_display_name, + metadata: module.metadata, + latest_hash: module.latest_hash, + latest_hash_created_at: module.latest_hash_created_at.into(), + created_at: module.created_at.into(), + schema_id: module.schema_id.map(|schema_id| schema_id.to_string()), + past_hashes: Some( + linked_modules + .into_iter() + .map(|module| module.latest_hash) + .collect(), + ), } } diff --git a/lib/module-index-server/src/routes/list_builtins_route.rs b/lib/module-index-server/src/routes/list_builtins_route.rs index e8906bf5b8..1ee8f0fa99 100644 --- a/lib/module-index-server/src/routes/list_builtins_route.rs +++ b/lib/module-index-server/src/routes/list_builtins_route.rs @@ -4,11 +4,12 @@ use axum::{ Json, }; use hyper::StatusCode; +use module_index_client::ModuleDetailsResponse; use sea_orm::{ColumnTrait, DbErr, EntityTrait, QueryFilter}; use serde::{Deserialize, Serialize}; use thiserror::Error; -use crate::models::si_module::ModuleKind; +use crate::models::si_module::{make_module_details_response, ModuleKind, SchemaIdReferenceLink}; use crate::{app_state::AppState, extract::DbConnection, models::si_module, whoami::WhoamiError}; #[remain::sorted] @@ -43,7 +44,7 @@ pub struct ListBuiltinsRequest { #[derive(Deserialize, Serialize, Debug)] #[serde(rename_all = "camelCase")] pub struct ListBuiltinsResponse { - modules: Vec, + modules: Vec, } pub async fn list_builtins_route( @@ -54,14 +55,19 @@ pub async fn list_builtins_route( let query = si_module::Entity::find(); // filters - let query = query.filter(si_module::Column::IsBuiltinAt.is_not_null()); - let query = query + .find_with_linked(SchemaIdReferenceLink) + .filter(si_module::Column::IsBuiltinAt.is_not_null()) .filter(si_module::Column::RejectedAt.is_null()) .filter(si_module::Column::Kind.eq(ModuleKind::Module)); // This should give us a list of builtin modules that are not rejected - let modules: Vec = query.all(&txn).await?; + let modules = query + .all(&txn) + .await? + .into_iter() + .map(|(module, linked_modules)| make_module_details_response(module, linked_modules)) + .collect(); Ok(Json(ListBuiltinsResponse { modules })) } diff --git a/lib/module-index-server/src/routes/list_modules_route.rs b/lib/module-index-server/src/routes/list_modules_route.rs index 34e8542daa..6f4fd3dbea 100644 --- a/lib/module-index-server/src/routes/list_modules_route.rs +++ b/lib/module-index-server/src/routes/list_modules_route.rs @@ -4,6 +4,7 @@ use axum::{ Json, }; use hyper::StatusCode; +use module_index_client::ModuleDetailsResponse; use sea_orm::{ColumnTrait, DbErr, EntityTrait, QueryFilter, QueryOrder}; use serde::{Deserialize, Serialize}; use thiserror::Error; @@ -11,7 +12,7 @@ use thiserror::Error; use crate::{ app_state::AppState, extract::{Authorization, DbConnection}, - models::si_module, + models::si_module::{self, make_module_details_response, SchemaIdReferenceLink}, whoami::{is_systeminit_auth_token, WhoamiError}, }; @@ -48,7 +49,7 @@ pub struct ListModulesRequest { #[derive(Deserialize, Serialize, Debug)] #[serde(rename_all = "camelCase")] pub struct ListModulesResponse { - modules: Vec, + modules: Vec, } pub async fn list_module_route( @@ -73,7 +74,6 @@ pub async fn list_module_route( .filter(si_module::Column::Kind.eq(kind.to_db_kind())); let query = if !su { let user_id = user_claim.user_pk.to_string(); - dbg!(&user_id); query.filter(si_module::Column::OwnerUserId.eq(user_id)) } else { query @@ -92,7 +92,13 @@ pub async fn list_module_route( .order_by_desc(si_module::Column::OwnerUserId) .order_by_desc(si_module::Column::CreatedAt); - let modules: Vec = query.all(&txn).await?; + let modules = query + .find_with_linked(SchemaIdReferenceLink) + .all(&txn) + .await? + .into_iter() + .map(|(module, linked_modules)| make_module_details_response(module, linked_modules)) + .collect(); Ok(Json(ListModulesResponse { modules })) } diff --git a/lib/module-index-server/src/routes/promote_builtin_route.rs b/lib/module-index-server/src/routes/promote_builtin_route.rs index 4c1860ce5d..524e7e611a 100644 --- a/lib/module-index-server/src/routes/promote_builtin_route.rs +++ b/lib/module-index-server/src/routes/promote_builtin_route.rs @@ -1,3 +1,4 @@ +use axum::extract::multipart::MultipartError; use axum::extract::{Multipart, State}; use axum::http::StatusCode; use axum::response::{IntoResponse, Response}; @@ -5,11 +6,12 @@ use axum::{extract::Path, Json}; use chrono::{DateTime, FixedOffset, Offset, Utc}; use module_index_client::ModuleDetailsResponse; use sea_orm::ActiveValue::Set; -use sea_orm::{ActiveModelTrait, DbErr, EntityTrait}; +use sea_orm::{ActiveModelTrait, DbErr, EntityTrait, QuerySelect}; use telemetry::prelude::info; use thiserror::Error; use crate::app_state::AppState; +use crate::models::si_module::make_module_details_response; use crate::routes::upsert_module_route::UpsertModuleError; use crate::whoami::{is_systeminit_auth_token, WhoamiError}; use crate::{ @@ -22,6 +24,8 @@ use crate::{ pub enum PromoteModuleError { #[error("db error: {0}")] DbErr(#[from] DbErr), + #[error("multipart decode error: {0}")] + Multipart(#[from] MultipartError), #[error(r#"Module "{0}" not found"#)] NotFound(ModuleId), #[error("error rejecting module: {0}")] @@ -60,18 +64,22 @@ pub async fn promote_builtin_route( } info!("Promote to builtin"); - let field = match multipart.next_field().await.unwrap() { + let field = match multipart.next_field().await? { Some(f) => f, None => return Err(PromoteModuleError::UserSupplied()), }; info!("Found multipart field"); - let data = dbg!(field.text().await.unwrap()); + let data = field.text().await?; info!("Got part data"); - let module = match si_module::Entity::find_by_id(module_id).one(&txn).await? { - Some(module) => module, - _ => return Err(PromoteModuleError::NotFound(module_id)), - }; + let (module, linked_modules) = si_module::Entity::find_by_id(module_id) + .limit(1) + .find_with_linked(si_module::SchemaIdReferenceLink) + .all(&txn) + .await? + .first() + .cloned() + .ok_or(PromoteModuleError::NotFound(module_id))?; let active_module = si_module::ActiveModel { id: Set(module.id), @@ -98,5 +106,7 @@ pub async fn promote_builtin_route( txn.commit().await?; - Ok(Json(Some(updated_module.try_into()?))) + let response = make_module_details_response(updated_module, linked_modules); + + Ok(Json(Some(response))) } diff --git a/lib/module-index-server/src/routes/reject_module_route.rs b/lib/module-index-server/src/routes/reject_module_route.rs index 814762d026..7a1f29bb24 100644 --- a/lib/module-index-server/src/routes/reject_module_route.rs +++ b/lib/module-index-server/src/routes/reject_module_route.rs @@ -10,6 +10,7 @@ use telemetry::prelude::info; use thiserror::Error; use crate::app_state::AppState; +use crate::models::si_module::make_module_details_response; use crate::routes::upsert_module_route::UpsertModuleError; use crate::whoami::{is_systeminit_auth_token, WhoamiError}; use crate::{ @@ -68,10 +69,13 @@ pub async fn reject_module( let data = dbg!(field.text().await.unwrap()); info!("Got part data"); - let module = match si_module::Entity::find_by_id(module_id).one(&txn).await? { - Some(module) => module, - _ => return Err(RejectModuleError::NotFound(module_id)), - }; + let (module, linked_modules) = si_module::Entity::find_by_id(module_id) + .find_with_linked(si_module::SchemaIdReferenceLink) + .all(&txn) + .await? + .first() + .cloned() + .ok_or(RejectModuleError::NotFound(module_id))?; let active_module = si_module::ActiveModel { id: Set(module.id), @@ -98,5 +102,7 @@ pub async fn reject_module( txn.commit().await?; - Ok(Json(Some(updated_module.try_into()?))) + let response = make_module_details_response(updated_module, linked_modules); + + Ok(Json(Some(response))) } diff --git a/lib/module-index-server/src/routes/upsert_module_route.rs b/lib/module-index-server/src/routes/upsert_module_route.rs index 51c53ff613..1897f55321 100644 --- a/lib/module-index-server/src/routes/upsert_module_route.rs +++ b/lib/module-index-server/src/routes/upsert_module_route.rs @@ -6,7 +6,8 @@ use axum::{ use chrono::{DateTime, FixedOffset, Offset, Utc}; use hyper::StatusCode; use module_index_client::{ - FuncMetadata, ModuleDetailsResponse, MODULE_BASED_ON_HASH_NAME, MODULE_BUNDLE_FIELD_NAME, + ExtraMetadata, FuncMetadata, ModuleDetailsResponse, MODULE_BASED_ON_HASH_FIELD_NAME, + MODULE_BUNDLE_FIELD_NAME, MODULE_SCHEMA_ID_FIELD_NAME, }; use s3::error::S3Error; use sea_orm::{ActiveModelTrait, ColumnTrait, DbErr, EntityTrait, QueryFilter, QuerySelect, Set}; @@ -18,7 +19,7 @@ use ulid::Ulid; use crate::{ extract::{Authorization, DbConnection, ExtractedS3Bucket}, - models::si_module::{self, ModuleKind, SchemaId}, + models::si_module::{self, make_module_details_response, ModuleId, ModuleKind, SchemaId}, }; #[derive(Deserialize, Serialize, Debug)] @@ -36,12 +37,16 @@ pub enum UpsertModuleError { IoError(#[from] std::io::Error), #[error("multipart decode error: {0}")] Multipart(#[from] MultipartError), + #[error("module with {0} could not be found after insert!")] + NotFoundAfterInsert(ModuleId), #[error("s3 error: {0}")] S3Error(#[from] S3Error), #[error("JSON serialization/deserialization error: {0}")] SerdeJson(#[from] serde_json::Error), #[error("module parsing error: {0}")] SiPkgError(#[from] SiPkgError), + #[error("Ulid decode error: {0}")] + UlidDecode(#[from] ulid::DecodeError), #[error("upload is required")] UploadRequiredError, } @@ -70,14 +75,18 @@ pub async fn upsert_module_route( ) -> Result, UpsertModuleError> { let mut module_data = None; let mut module_based_on_hash = None; + let mut module_schema_id = None; while let Some(field) = multipart.next_field().await? { match field.name() { Some(MODULE_BUNDLE_FIELD_NAME) => { module_data = Some(field.bytes().await?); } - Some(MODULE_BASED_ON_HASH_NAME) => { + Some(MODULE_BASED_ON_HASH_FIELD_NAME) => { module_based_on_hash = Some(field.text().await?); } + Some(MODULE_SCHEMA_ID_FIELD_NAME) => { + module_schema_id = Some(field.text().await?); + } _ => debug!("Unknown multipart form field on module upload, skipping..."), } } @@ -89,8 +98,8 @@ pub async fn upsert_module_route( let module_metadata = loaded_module.metadata()?; info!( - "upserting module: {:?} based on hash: {:?}", - &module_metadata, &module_based_on_hash + "upserting module: {:?} based on hash: {:?} with provided schema id of {:?}", + &module_metadata, &module_based_on_hash, &module_schema_id ); let version = module_metadata.version().to_owned(); @@ -99,18 +108,37 @@ pub async fn upsert_module_route( SiPkgKind::Module => ModuleKind::Module, }; + let new_schema_id = Some(SchemaId(Ulid::new())); let schema_id = match module_kind { ModuleKind::WorkspaceBackup => None, - ModuleKind::Module => match module_based_on_hash { - None => Some(SchemaId(Ulid::new())), - Some(based_on_hash) => si_module::Entity::find() - .filter(si_module::Column::Kind.eq(ModuleKind::Module)) - .filter(si_module::Column::LatestHash.eq(based_on_hash)) - .limit(1) - .all(&txn) - .await? - .first() - .and_then(|module| module.schema_id), + ModuleKind::Module => match module_schema_id { + Some(schema_id_string) => Some(SchemaId(Ulid::from_string(&schema_id_string)?)), + None => match module_based_on_hash { + None => new_schema_id, + Some(based_on_hash) => { + match si_module::Entity::find() + .filter(si_module::Column::Kind.eq(ModuleKind::Module)) + .filter(si_module::Column::LatestHash.eq(based_on_hash)) + .limit(1) + .all(&txn) + .await? + .first() + { + None => new_schema_id, + Some(module) => match module.schema_id { + some @ Some(_) => some, + None => { + // If we found matching past hash but it has no schema id, backfill it to match the one we're generating + let mut active: si_module::ActiveModel = module.to_owned().into(); + active.schema_id = Set(new_schema_id); + active.update(&txn).await?; + + new_schema_id + } + }, + } + } + }, }, }; @@ -161,15 +189,15 @@ pub async fn upsert_module_route( .await?; let new_module: si_module::Model = new_module.insert(&txn).await?; + let (module, linked_modules) = si_module::Entity::find_by_id(new_module.id) + .find_with_linked(si_module::SchemaIdReferenceLink) + .all(&txn) + .await? + .first() + .cloned() + .ok_or(UpsertModuleError::NotFoundAfterInsert(new_module.id))?; txn.commit().await?; - Ok(Json(new_module.try_into()?)) -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct ExtraMetadata { - pub version: String, - pub schemas: Vec, - pub funcs: Vec, + Ok(Json(make_module_details_response(module, linked_modules))) } diff --git a/lib/module-index-server/src/routes/upsert_workspace_route.rs b/lib/module-index-server/src/routes/upsert_workspace_route.rs index 85c517043b..f3b17a9c19 100644 --- a/lib/module-index-server/src/routes/upsert_workspace_route.rs +++ b/lib/module-index-server/src/routes/upsert_workspace_route.rs @@ -14,11 +14,11 @@ use telemetry::prelude::*; use thiserror::Error; use crate::models::si_module::ModuleKind; -use crate::routes::upsert_module_route::ExtraMetadata; use crate::{ extract::{Authorization, DbConnection, ExtractedS3Bucket}, models::si_module, }; +use module_index_client::ExtraMetadata; #[derive(Deserialize, Serialize, Debug)] #[serde(rename_all = "camelCase")] diff --git a/lib/sdf-server/src/server/server.rs b/lib/sdf-server/src/server/server.rs index 0f711e00db..b5b539d9e6 100644 --- a/lib/sdf-server/src/server/server.rs +++ b/lib/sdf-server/src/server/server.rs @@ -407,14 +407,14 @@ async fn install_builtins( join_set.spawn(async move { ( module.name.to_owned(), - fetch_builtin(&module, &client).await, + (module.to_owned(), fetch_builtin(&module, &client).await), ) }); } let mut count: usize = 0; while let Some(res) = join_set.join_next().await { - let (pkg_name, res) = res?; + let (pkg_name, (module, res)) = res?; match res { Ok(pkg) => { let instant = Instant::now(); @@ -424,6 +424,8 @@ async fn install_builtins( &pkg, Some(dal::pkg::ImportOptions { is_builtin: true, + schema_id: module.schema_id().map(Into::into), + past_module_hashes: module.past_hashes, ..Default::default() }), ) @@ -461,7 +463,7 @@ async fn fetch_builtin( module_index_client: &IndexClient, ) -> Result { let module = module_index_client - .get_builtin(Ulid::from_string(module.id.as_str()).unwrap_or_default()) + .get_builtin(Ulid::from_string(&module.id).unwrap_or_default()) .await?; Ok(SiPkg::load_from_bytes(module)?) diff --git a/lib/sdf-server/src/server/service/module/export_module.rs b/lib/sdf-server/src/server/service/module/export_module.rs index aa49d87452..5e27e7bbcc 100644 --- a/lib/sdf-server/src/server/service/module/export_module.rs +++ b/lib/sdf-server/src/server/service/module/export_module.rs @@ -78,15 +78,16 @@ pub async fn export_module( schema_ids.push(schema.id()); } - let based_on_hash = if schema_ids.len() == 1 { + let (based_on_hash, module_schema_id) = if schema_ids.len() == 1 { match schema_ids.first().copied() { - None => None, - Some(schema_id) => Module::find_for_schema_id(&ctx, schema_id) - .await? - .map(|module| module.root_hash().to_string()), + None => (None, None), + Some(schema_id) => match Module::find_for_schema_id(&ctx, schema_id).await? { + Some(module) => (Some(module.root_hash().to_string()), module.schema_id()), + None => (None, None), + }, } } else { - None + (None, None) }; let mut exporter = PkgExporter::new_module_exporter( @@ -106,6 +107,7 @@ pub async fn export_module( request.name.trim(), request.version.trim(), based_on_hash, + module_schema_id.map(|id| id.to_string()), module_payload, ) .await?; diff --git a/lib/sdf-server/src/server/service/module/install_module.rs b/lib/sdf-server/src/server/service/module/install_module.rs index 53f63a92d9..3653fc02a8 100644 --- a/lib/sdf-server/src/server/service/module/install_module.rs +++ b/lib/sdf-server/src/server/service/module/install_module.rs @@ -1,6 +1,7 @@ use axum::extract::OriginalUri; use axum::http::Uri; use axum::{response::IntoResponse, Json}; +use dal::pkg::ImportOptions; use serde::{Deserialize, Serialize}; use ulid::Ulid; @@ -96,11 +97,21 @@ async fn install_module_inner( }; let module_index_client = IndexClient::new(module_index_url.try_into()?, &raw_access_token); + let module_details = module_index_client.module_details(request.id).await?; let pkg_data = module_index_client.download_module(request.id).await?; let pkg = SiPkg::load_from_bytes(pkg_data)?; let metadata = pkg.metadata()?; - let (_, svs, _import_skips) = import_pkg_from_pkg(ctx, &pkg, None).await?; + let (_, svs, _) = import_pkg_from_pkg( + ctx, + &pkg, + Some(ImportOptions { + schema_id: module_details.schema_id().map(Into::into), + past_module_hashes: module_details.past_hashes, + ..Default::default() + }), + ) + .await?; track( &posthog_client,