Skip to content

Commit

Permalink
Merge pull request #4085 from systeminit/zack/install-module-gets-sch…
Browse files Browse the repository at this point in the history
…ema-id

feat(dal,sdf,module-index-server): use schema id when importing
  • Loading branch information
zacharyhamm authored Jul 5, 2024
2 parents 5ee6840 + 12a0d97 commit dcac307
Show file tree
Hide file tree
Showing 21 changed files with 400 additions and 122 deletions.
2 changes: 1 addition & 1 deletion app/web/.env
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ VITE_BACKEND_HOSTS=["/localhost/g","/si.keeb.dev/g","/app.systeminit.com/g","/to
# VITE_SI_CYPRESS_MULTIPLIER=1 # How many times to run each test in cypress, only changes modelling tests


# TODO: point this at public/deployed module index api
VITE_MODULE_INDEX_API_URL=https://module-index.systeminit.com
#VITE_MODULE_INDEX_API_URL=http://localhost:5157

# vars only used for local dev, but we keep them here anyway since they dont hurt anything
DEV_HOST=127.0.0.1 # set this to 0.0.0.0 to serve vite to external clients
Expand Down
38 changes: 38 additions & 0 deletions lib/dal/src/layer_db_types/content_types.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use si_events::ulid::Ulid;
use si_events::{CasValue, ContentHash};
use strum::EnumDiscriminants;
use thiserror::Error;
Expand Down Expand Up @@ -317,6 +318,16 @@ pub struct InputSocketContentV1 {
#[derive(Debug, Clone, EnumDiscriminants, Serialize, Deserialize, PartialEq)]
pub enum ModuleContent {
V1(ModuleContentV1),
V2(ModuleContentV2),
}

impl ModuleContent {
pub fn inner(&self) -> ModuleContentV2 {
match self {
ModuleContent::V1(inner) => inner.to_owned().into(),
ModuleContent::V2(inner) => inner.to_owned(),
}
}
}

#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
Expand All @@ -330,6 +341,33 @@ pub struct ModuleContentV1 {
pub created_at: DateTime<Utc>,
}

#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
pub struct ModuleContentV2 {
pub timestamp: Timestamp,
pub name: String,
pub root_hash: String,
pub version: String,
pub description: String,
pub created_by_email: String,
pub created_at: DateTime<Utc>,
pub schema_id: Option<Ulid>,
}

impl From<ModuleContentV1> for ModuleContentV2 {
fn from(value: ModuleContentV1) -> Self {
Self {
timestamp: value.timestamp,
name: value.name,
root_hash: value.root_hash,
version: value.version,
description: value.description,
created_by_email: value.created_by_email,
created_at: value.created_at,
schema_id: None,
}
}
}

#[derive(Debug, Clone, EnumDiscriminants, Serialize, Deserialize, PartialEq)]
pub enum OutputSocketContent {
V1(OutputSocketContentV1),
Expand Down
77 changes: 61 additions & 16 deletions lib/dal/src/module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@ use std::collections::HashMap;
use std::sync::Arc;

use serde::{Deserialize, Serialize};
use si_events::ulid::Ulid;
use si_events::ContentHash;
use thiserror::Error;
use tokio::sync::TryLockError;
use ulid::Ulid;

use si_layer_cache::LayerDbError;
use telemetry::prelude::*;

use crate::layer_db_types::{ModuleContent, ModuleContentV1};
use crate::layer_db_types::{ModuleContent, ModuleContentV2};
use crate::workspace_snapshot::content_address::{ContentAddress, ContentAddressDiscriminants};
use crate::workspace_snapshot::edge_weight::{
EdgeWeight, EdgeWeightError, EdgeWeightKind, EdgeWeightKindDiscriminants,
Expand Down Expand Up @@ -64,10 +64,11 @@ pub struct Module {
description: String,
created_by_email: String,
created_at: DateTime<Utc>,
schema_id: Option<Ulid>,
}

impl Module {
pub fn assemble(id: ModuleId, inner: ModuleContentV1) -> Self {
pub fn assemble(id: ModuleId, inner: ModuleContentV2) -> Self {
Self {
id,
timestamp: inner.timestamp,
Expand All @@ -77,6 +78,7 @@ impl Module {
description: inner.description,
created_by_email: inner.created_by_email,
created_at: inner.created_at,
schema_id: inner.schema_id,
}
}

Expand Down Expand Up @@ -104,6 +106,16 @@ impl Module {
self.created_at
}

/// This is the "module" schema id. It's a unique id that all variants of a
/// single schema get in the module index database. If this is the first
/// time installing the asset, the schema will get this, but this is not
/// guaranteed to be the id of the schema in workspaces that have assets
/// installed before this feature was added!
pub fn schema_id(&self) -> Option<Ulid> {
self.schema_id
}

#[allow(clippy::too_many_arguments)]
pub async fn new(
ctx: &DalContext,
name: impl Into<String>,
Expand All @@ -112,22 +124,24 @@ impl Module {
description: impl Into<String>,
created_by_email: impl Into<String>,
created_at: impl Into<DateTime<Utc>>,
schema_id: Option<Ulid>,
) -> ModuleResult<Self> {
let content = ModuleContentV1 {
let content = ModuleContentV2 {
timestamp: Timestamp::now(),
name: name.into(),
root_hash: root_hash.into(),
version: version.into(),
description: description.into(),
created_by_email: created_by_email.into(),
created_at: created_at.into(),
schema_id,
};

let (hash, _) = ctx
.layer_db()
.cas()
.write(
Arc::new(ModuleContent::V1(content.clone()).into()),
Arc::new(ModuleContent::V2(content.clone()).into()),
None,
ctx.events_tenancy(),
ctx.events_actor(),
Expand Down Expand Up @@ -171,15 +185,19 @@ impl Module {
.ok_or(WorkspaceSnapshotError::MissingContentFromStore(id.into()))?;

// Add any extra migrations here!
let ModuleContent::V1(inner) = content;
let inner = match content {
ModuleContent::V1(v1_inner) => v1_inner.into(),
ModuleContent::V2(inner) => inner,
};

Ok(Self::assemble(id, inner))
}

pub async fn find_by_root_hash(
ctx: &DalContext,
root_hash: impl AsRef<str>,
) -> ModuleResult<Option<Self>> {
pub async fn find<P>(ctx: &DalContext, predicate: P) -> ModuleResult<Option<Self>>
where
P: FnMut(&Module) -> bool,
{
let mut predicate = predicate;
let workspace_snapshot = ctx.workspace_snapshot()?;
let module_node_indices = {
let module_category_index_id = workspace_snapshot
Expand All @@ -200,14 +218,28 @@ impl Module {
.get_content_node_weight_of_kind(ContentAddressDiscriminants::Module)?;

let module: Module = Self::get_by_id(ctx, module_node_weight.id().into()).await?;
if module.root_hash == root_hash.as_ref() {
if predicate(&module) {
return Ok(Some(module));
}
}

Ok(None)
}

pub async fn find_by_root_hash(
ctx: &DalContext,
root_hash: impl AsRef<str>,
) -> ModuleResult<Option<Self>> {
Self::find(ctx, |module| module.root_hash() == root_hash.as_ref()).await
}

pub async fn find_for_module_schema_id(
ctx: &DalContext,
module_schema_id: Ulid,
) -> ModuleResult<Option<Self>> {
Self::find(ctx, |module| module.schema_id() == Some(module_schema_id)).await
}

pub async fn find_for_schema_id(
ctx: &DalContext,
schema_id: SchemaId,
Expand Down Expand Up @@ -284,6 +316,20 @@ impl Module {
Ok(all_schemas)
}

pub async fn find_matching_module(&self, ctx: &DalContext) -> ModuleResult<Option<Self>> {
let mut maybe_mod = None;

if let Some(module_schema_id) = self.schema_id() {
maybe_mod = Self::find_for_module_schema_id(ctx, module_schema_id).await?;
}

if maybe_mod.is_none() {
maybe_mod = Self::find_by_root_hash(ctx, self.root_hash()).await?;
}

Ok(maybe_mod)
}

pub async fn list_associated_schema_variants(
&self,
ctx: &DalContext,
Expand Down Expand Up @@ -341,11 +387,10 @@ impl Module {

for node_weight in node_weights {
match content_map.get(&node_weight.content_hash()) {
Some(module_content) => {
let ModuleContent::V1(inner) = module_content;

modules.push(Self::assemble(node_weight.id().into(), inner.to_owned()))
}
Some(module_content) => modules.push(Self::assemble(
node_weight.id().into(),
module_content.inner(),
)),
None => Err(WorkspaceSnapshotError::MissingContentFromStore(
node_weight.id(),
))?,
Expand Down
67 changes: 41 additions & 26 deletions lib/dal/src/pkg/import.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use chrono::NaiveDateTime;
use si_events::ulid::Ulid;
use si_pkg::{
SchemaVariantSpecPropRoot, SiPkg, SiPkgActionFunc, SiPkgAttrFuncInputView, SiPkgAuthFunc,
SiPkgComponent, SiPkgEdge, SiPkgError, SiPkgFunc, SiPkgFuncArgument, SiPkgFuncData, SiPkgKind,
Expand Down Expand Up @@ -65,6 +66,11 @@ pub struct ImportOptions {
/// Locked schema variants can't be edited directly. Setting this to `true` will create
/// editable components.
pub create_unlocked: bool,
/// The "schema id" for this asset, provided by the module index API
pub schema_id: Option<Ulid>,
/// A list of "past hashes" for this module, used to find the existing
/// schema if a schema_id is not provided
pub past_module_hashes: Option<Vec<String>>,
}

const SPECIAL_CASE_FUNCS: [&str; 2] = ["si:resourcePayloadToValue", "si:normalizeToArray"];
Expand All @@ -77,7 +83,7 @@ async fn import_change_set(
schemas: &[SiPkgSchema<'_>],
_components: &[SiPkgComponent<'_>],
_edges: &[SiPkgEdge<'_>],
installed_pkg: Option<Module>,
installed_module: Option<Module>,
thing_map: &mut ThingMap,
options: &ImportOptions,
) -> PkgResult<(
Expand All @@ -100,7 +106,7 @@ async fn import_change_set(
} else if let Some(func) = import_func(
ctx,
func_spec,
installed_pkg.clone(),
installed_module.clone(),
thing_map,
options.create_unlocked,
)
Expand All @@ -118,10 +124,8 @@ async fn import_change_set(
.as_ref()
.map(|skip_funcs| skip_funcs.get(&unique_id))
{
if let Some(installed_pkg) = installed_pkg.clone() {
installed_pkg
.create_association(ctx, func.id.into())
.await?;
if let Some(module) = installed_module.clone() {
module.create_association(ctx, func.id.into()).await?;
}

// We're not going to import this func but we need it in the map for lookups later
Expand All @@ -135,7 +139,7 @@ async fn import_change_set(
import_func(
ctx,
func_spec,
installed_pkg.clone(),
installed_module.clone(),
thing_map,
options.is_builtin,
)
Expand All @@ -145,7 +149,7 @@ async fn import_change_set(
if let Some(func) = func {
thing_map.insert(unique_id.to_owned(), Thing::Func(func.to_owned()));

if let Some(module) = installed_pkg.clone() {
if let Some(module) = installed_module.clone() {
module.create_association(ctx, func.id.into()).await?;
}

Expand Down Expand Up @@ -190,7 +194,7 @@ async fn import_change_set(
let (_, schema_variant_ids) = import_schema(
ctx,
schema_spec,
installed_pkg.clone(),
installed_module.clone(),
thing_map,
options.create_unlocked,
)
Expand Down Expand Up @@ -245,6 +249,7 @@ pub async fn import_pkg_from_pkg(
metadata.description(),
metadata.created_by(),
metadata.created_at(),
options.schema_id,
)
.await?,
)
Expand Down Expand Up @@ -423,14 +428,21 @@ async fn import_func_arguments(
Ok(())
}

async fn create_schema(ctx: &DalContext, schema_spec_data: &SiPkgSchemaData) -> PkgResult<Schema> {
let schema = Schema::new(ctx, schema_spec_data.name())
.await?
.modify(ctx, |schema| {
schema.ui_hidden = schema_spec_data.ui_hidden();
Ok(())
})
.await?;
async fn create_schema(
ctx: &DalContext,
maybe_existing_schema_id: Option<Ulid>,
schema_spec_data: &SiPkgSchemaData,
) -> PkgResult<Schema> {
let schema = match maybe_existing_schema_id {
Some(id) => Schema::new_with_id(ctx, id.into(), schema_spec_data.name()).await?,
None => Schema::new(ctx, schema_spec_data.name()).await?,
}
.modify(ctx, |schema| {
schema.ui_hidden = schema_spec_data.ui_hidden();
Ok(())
})
.await?;

Ok(schema)
}

Expand All @@ -443,16 +455,19 @@ async fn import_schema(
) -> PkgResult<(Option<SchemaId>, Vec<SchemaVariantId>)> {
let schema_and_category = {
let mut existing_schema: Option<Schema> = None;
if installed_module.is_some() {
let associated_schemas = Schema::list(ctx).await?;
let mut maybe_matching_schema: Vec<Schema> = associated_schemas
.into_iter()
.filter(|s| s.name.clone() == schema_spec.name())
.collect();
if let Some(matching_schema) = maybe_matching_schema.pop() {
existing_schema = Some(matching_schema);
let mut existing_schema_id = None;

if let Some(installed_module) = installed_module.as_ref() {
existing_schema_id = installed_module.schema_id();
if let Some(matching_module) = installed_module.find_matching_module(ctx).await? {
existing_schema = matching_module
.list_associated_schemas(ctx)
.await?
.into_iter()
.next();
}
}

let data = schema_spec
.data()
.ok_or(PkgError::DataNotFound("schema".into()))?;
Expand All @@ -462,7 +477,7 @@ async fn import_schema(
let category = data.category.clone();

let schema = match existing_schema {
None => create_schema(ctx, data).await?,
None => create_schema(ctx, existing_schema_id, data).await?,
Some(installed_schema_record) => installed_schema_record,
};

Expand Down
Loading

0 comments on commit dcac307

Please sign in to comment.