diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/datasets/DatasetCaseConverter.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/datasets/DatasetCaseConverter.java index 4a5134f43c6..cd326ab46eb 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/datasets/DatasetCaseConverter.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/datasets/DatasetCaseConverter.java @@ -30,58 +30,7 @@ public Dataset applyCaseOnDataset(Dataset dataset, Function stra Optional newSchemaName = dataset.datasetReference().group().map(strategy); Optional newDatabaseName = dataset.datasetReference().database().map(strategy); - List newDatasetFields = new ArrayList<>(); - for (Field field : dataset.schema().fields()) - { - Field newField = field.withName(strategy.apply(field.name())); - newDatasetFields.add(newField); - } - - List newDatasetIndices = new ArrayList<>(); - for (Index index : dataset.schema().indexes()) - { - List indexColumnNames = new ArrayList<>(); - for (String columnName : index.columns()) - { - String newColumnName = strategy.apply(columnName); - indexColumnNames.add(newColumnName); - } - Index newIndex = index.withIndexName(strategy.apply(index.indexName())).withColumns(indexColumnNames); - newDatasetIndices.add(newIndex); - } - - ColumnStoreSpecification newColumnStoreSpecification = null; - if (dataset.schema().columnStoreSpecification().isPresent()) - { - 
ColumnStoreSpecification columnStoreSpecification = dataset.schema().columnStoreSpecification().get(); - List newColumnStoreKeys = new ArrayList<>(); - for (Field field : columnStoreSpecification.columnStoreKeys()) - { - Field newField = field.withName(strategy.apply(field.name())); - newColumnStoreKeys.add(newField); - } - newColumnStoreSpecification = columnStoreSpecification.withColumnStoreKeys(newColumnStoreKeys); - } - - ShardSpecification newShardSpecification = null; - if (dataset.schema().shardSpecification().isPresent()) - { - ShardSpecification shardSpecification = dataset.schema().shardSpecification().get(); - List newShardKeys = new ArrayList<>(); - for (Field field : shardSpecification.shardKeys()) - { - Field newField = field.withName(strategy.apply(field.name())); - newShardKeys.add(newField); - } - newShardSpecification = shardSpecification.withShardKeys(newShardKeys); - } - - SchemaDefinition schemaDefinition = SchemaDefinition.builder() - .addAllFields(newDatasetFields) - .addAllIndexes(newDatasetIndices) - .columnStoreSpecification(newColumnStoreSpecification) - .shardSpecification(newShardSpecification) - .build(); + SchemaDefinition schemaDefinition = applyCaseOnSchemaDefinition(dataset.schema(), strategy); if (dataset instanceof DatasetDefinition) { @@ -181,4 +130,60 @@ public LockInfoDataset applyCaseOnLockInfoDataset(LockInfoDataset lockInfoDatase .tableNameField(strategy.apply(lockInfoDataset.tableNameField())) .build(); } + + public SchemaDefinition applyCaseOnSchemaDefinition(SchemaDefinition schema, Function strategy) + { + List newDatasetFields = new ArrayList<>(); + for (Field field : schema.fields()) + { + Field newField = field.withName(strategy.apply(field.name())); + newDatasetFields.add(newField); + } + + List newDatasetIndices = new ArrayList<>(); + for (Index index : schema.indexes()) + { + List indexColumnNames = new ArrayList<>(); + for (String columnName : index.columns()) + { + String newColumnName = 
strategy.apply(columnName); + indexColumnNames.add(newColumnName); + } + Index newIndex = index.withIndexName(strategy.apply(index.indexName())).withColumns(indexColumnNames); + newDatasetIndices.add(newIndex); + } + + ColumnStoreSpecification newColumnStoreSpecification = null; + if (schema.columnStoreSpecification().isPresent()) + { + ColumnStoreSpecification columnStoreSpecification = schema.columnStoreSpecification().get(); + List newColumnStoreKeys = new ArrayList<>(); + for (Field field : columnStoreSpecification.columnStoreKeys()) + { + Field newField = field.withName(strategy.apply(field.name())); + newColumnStoreKeys.add(newField); + } + newColumnStoreSpecification = columnStoreSpecification.withColumnStoreKeys(newColumnStoreKeys); + } + + ShardSpecification newShardSpecification = null; + if (schema.shardSpecification().isPresent()) + { + ShardSpecification shardSpecification = schema.shardSpecification().get(); + List newShardKeys = new ArrayList<>(); + for (Field field : shardSpecification.shardKeys()) + { + Field newField = field.withName(strategy.apply(field.name())); + newShardKeys.add(newField); + } + newShardSpecification = shardSpecification.withShardKeys(newShardKeys); + } + + return SchemaDefinition.builder() + .addAllFields(newDatasetFields) + .addAllIndexes(newDatasetIndices) + .columnStoreSpecification(newColumnStoreSpecification) + .shardSpecification(newShardSpecification) + .build(); + } } diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/datasets/DatasetReference.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/datasets/DatasetReference.java index 08da5e8ba15..363cf9bf296 100644 --- 
a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/datasets/DatasetReference.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/datasets/DatasetReference.java @@ -39,4 +39,9 @@ default DatasetReference datasetReference() { return this; } + + default Optional datasetAdditionalProperties() + { + return Optional.empty(); + } } diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-physical-plan/src/main/java/org/finos/legend/engine/persistence/components/executor/RelationalExecutionHelper.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-physical-plan/src/main/java/org/finos/legend/engine/persistence/components/executor/RelationalExecutionHelper.java index 3a6ae90fe35..9ea0ce65b47 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-physical-plan/src/main/java/org/finos/legend/engine/persistence/components/executor/RelationalExecutionHelper.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-physical-plan/src/main/java/org/finos/legend/engine/persistence/components/executor/RelationalExecutionHelper.java @@ -36,7 +36,7 @@ public interface RelationalExecutionHelper void validateDatasetSchema(Dataset dataset, TypeMapping datatypeMapping); - Dataset constructDatasetFromDatabase(Dataset dataset, TypeMapping mapping); + Dataset constructDatasetFromDatabase(Dataset dataset, TypeMapping mapping, boolean escape); void executeStatement(String sql); diff --git 
a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-physical-plan/src/main/java/org/finos/legend/engine/persistence/components/schemaevolution/SchemaEvolution.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-physical-plan/src/main/java/org/finos/legend/engine/persistence/components/schemaevolution/SchemaEvolution.java index 8ce440285a7..cd5bc2af7de 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-physical-plan/src/main/java/org/finos/legend/engine/persistence/components/schemaevolution/SchemaEvolution.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-physical-plan/src/main/java/org/finos/legend/engine/persistence/components/schemaevolution/SchemaEvolution.java @@ -103,7 +103,7 @@ public SchemaEvolution(Sink sink, IngestMode ingestMode, Set operations = new ArrayList<>(); Set modifiedFields = new HashSet<>(); @@ -116,9 +116,9 @@ public SchemaEvolutionResult buildLogicalPlanForSchemaEvolution(Dataset mainData return SchemaEvolutionResult.of(LogicalPlan.of(operations), mainDataset.withSchema(evolvedSchema)); } - private void validatePrimaryKeys(Dataset mainDataset, Dataset stagingDataset) + private void validatePrimaryKeys(Dataset mainDataset, SchemaDefinition stagingDataset) { - List stagingFilteredFields = stagingDataset.schema().fields().stream().filter(field -> !(ingestMode.accept(STAGING_TABLE_FIELDS_TO_IGNORE).contains(field.name()))).collect(Collectors.toList()); + List stagingFilteredFields = stagingDataset.fields().stream().filter(field -> !(ingestMode.accept(STAGING_TABLE_FIELDS_TO_IGNORE).contains(field.name()))).collect(Collectors.toList()); Set stagingPkNames = stagingFilteredFields.stream().filter(Field::primaryKey).map(Field::name).collect(Collectors.toSet()); List mainFilteredFields = 
mainDataset.schema().fields().stream().filter(field -> !(ingestMode.accept(MAIN_TABLE_FIELDS_TO_IGNORE).contains(field.name()))).collect(Collectors.toList()); Set mainPkNames = mainFilteredFields.stream().filter(Field::primaryKey).map(Field::name).collect(Collectors.toSet()); @@ -129,14 +129,11 @@ private void validatePrimaryKeys(Dataset mainDataset, Dataset stagingDataset) } //Validate all columns (allowing exceptions) in staging dataset must have a matching column in main dataset - private List stagingToMainTableColumnMatch(Dataset mainDataset, - Dataset stagingDataset, - Set fieldsToIgnore, - Set modifiedFields) + private List stagingToMainTableColumnMatch(Dataset mainDataset, SchemaDefinition stagingDataset, Set fieldsToIgnore, Set modifiedFields) { List operations = new ArrayList<>(); List mainFields = mainDataset.schema().fields(); - List stagingFields = stagingDataset.schema().fields(); + List stagingFields = stagingDataset.fields(); List filteredFields = stagingFields.stream().filter(field -> !fieldsToIgnore.contains(field.name())).collect(Collectors.toList()); for (Field stagingField : filteredFields) { @@ -145,12 +142,19 @@ private List stagingToMainTableColumnMatch(Dataset mainDataset, if (matchedMainField == null) { // Add the new column in the main table if database supports ADD_COLUMN capability and - // if user capability supports ADD_COLUMN or is empty (since empty means no overriden preference) + // if user capability supports ADD_COLUMN if (sink.capabilities().contains(Capability.ADD_COLUMN) && (schemaEvolutionCapabilitySet.contains(SchemaEvolutionCapability.ADD_COLUMN))) { - operations.add(Alter.of(mainDataset, Alter.AlterOperation.ADD, stagingField, Optional.empty())); - modifiedFields.add(stagingField); + if (stagingField.nullable()) + { + operations.add(Alter.of(mainDataset, Alter.AlterOperation.ADD, stagingField, Optional.empty())); + modifiedFields.add(stagingField); + } + else + { + throw new 
IncompatibleSchemaChangeException(String.format("Non-nullable field \"%s\" in staging dataset cannot be added, as it is backward-incompatible change.", stagingFieldName)); + } } else { @@ -270,11 +274,11 @@ private void alterColumnWithNullable(Field newField, Dataset mainDataset, List mainToStagingTableColumnMatch(Dataset mainDataset, Dataset stagingDataset, Set fieldsToIgnore, Set modifiedFields) + private List mainToStagingTableColumnMatch(Dataset mainDataset, SchemaDefinition stagingDataset, Set fieldsToIgnore, Set modifiedFields) { List operations = new ArrayList<>(); List mainFields = mainDataset.schema().fields(); - Set stagingFieldNames = stagingDataset.schema().fields().stream().map(Field::name).collect(Collectors.toSet()); + Set stagingFieldNames = stagingDataset.fields().stream().map(Field::name).collect(Collectors.toSet()); for (Field mainField : mainFields.stream().filter(field -> !fieldsToIgnore.contains(field.name())).collect(Collectors.toList())) { String mainFieldName = mainField.name(); diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/schemaevolution/SchemaEvolutionTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/schemaevolution/SchemaEvolutionTest.java index c0ac0883216..60ad2512995 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/schemaevolution/SchemaEvolutionTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/schemaevolution/SchemaEvolutionTest.java @@ -100,7 
+100,7 @@ void testSnapshotMilestoningWithAddColumnAndUserProvidedSchemaEvolutionCapabilit schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.ADD_COLUMN); SchemaEvolution schemaEvolution = new SchemaEvolution(relationalSink, ingestMode, schemaEvolutionCapabilitySet); - SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable); + SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable.schema()); SqlPlan physicalPlanForSchemaEvolution = transformer.generatePhysicalPlan(result.logicalPlan()); // Use the planner utils to return the sql @@ -130,7 +130,7 @@ void testSnapshotMilestoningWithAddColumnEvolutionUpperCase() schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.ADD_COLUMN); SchemaEvolution schemaEvolution = new SchemaEvolution(relationalSink, ingestMode, schemaEvolutionCapabilitySet); - SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable); + SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable.schema()); SqlPlan physicalPlanForSchemaEvolution = transformer.generatePhysicalPlan(result.logicalPlan()); // Use the planner utils to return the sql @@ -159,7 +159,7 @@ void testSnapshotMilestoningWithAddColumnWithoutUserProvidedSchemaEvolutionCapab SchemaEvolution schemaEvolution = new SchemaEvolution(relationalSink, ingestMode, Collections.emptySet()); try { - SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable); + SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable.schema()); SqlPlan physicalPlanForSchemaEvolution = transformer.generatePhysicalPlan(result.logicalPlan()); // Use the planner utils to return the sql @@ -191,7 +191,7 @@ void testSnapshotMilestoningWithColumnLengthChangeEvolution() 
schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_SIZE_CHANGE); SchemaEvolution schemaEvolution = new SchemaEvolution(relationalSink, ingestMode, schemaEvolutionCapabilitySet); - SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable); + SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable.schema()); RelationalTransformer transformer = new RelationalTransformer(relationalSink); SqlPlan physicalPlanForSchemaEvolution = transformer.generatePhysicalPlan(result.logicalPlan()); @@ -221,7 +221,7 @@ void testSnapshotMilestoningWithColumnLengthChangeEvolutionWithUpperCase() schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_SIZE_CHANGE); SchemaEvolution schemaEvolution = new SchemaEvolution(relationalSink, ingestMode, schemaEvolutionCapabilitySet); - SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable); + SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable.schema()); SqlPlan physicalPlanForSchemaEvolution = transformer.generatePhysicalPlan(result.logicalPlan()); List sqlsForSchemaEvolution = physicalPlanForSchemaEvolution.getSqlList(); @@ -250,7 +250,7 @@ void testSnapshotMilestoningWithColumnLengthChangeEvolutionAndUserProvidedSchema try { - SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable); + SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable.schema()); RelationalTransformer transformer = new RelationalTransformer(relationalSink); SqlPlan physicalPlanForSchemaEvolution = transformer.generatePhysicalPlan(result.logicalPlan()); @@ -283,7 +283,7 @@ void testSnapshotMilestoningWithColumnScaleChangeEvolution() schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_SIZE_CHANGE); SchemaEvolution 
schemaEvolution = new SchemaEvolution(relationalSink, ingestMode, schemaEvolutionCapabilitySet); - SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable); + SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable.schema()); RelationalTransformer transformer = new RelationalTransformer(relationalSink); SqlPlan physicalPlanForSchemaEvolution = transformer.generatePhysicalPlan(result.logicalPlan()); @@ -313,7 +313,7 @@ void testSnapshotMilestoningWithColumnScaleChangeEvolutionAndUserProvidedSchemaE try { - SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable); + SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable.schema()); RelationalTransformer transformer = new RelationalTransformer(relationalSink); SqlPlan physicalPlanForSchemaEvolution = transformer.generatePhysicalPlan(result.logicalPlan()); Assertions.fail("Exception was not thrown"); @@ -341,7 +341,7 @@ void testSnapshotMilestoningWithImplicitDataTypeEvolution() NontemporalSnapshot ingestMode = NontemporalSnapshot.builder().auditing(NoAuditing.builder().build()).build(); SchemaEvolution schemaEvolution = new SchemaEvolution(relationalSink, ingestMode, Collections.emptySet()); - SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable); + SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable.schema()); RelationalTransformer transformer = new RelationalTransformer(relationalSink); SqlPlan physicalPlanForSchemaEvolution = transformer.generatePhysicalPlan(result.logicalPlan()); @@ -369,7 +369,7 @@ void testSnapshotMilestoningWithImplicitDataTypeEvolutionAndAlterNullability() schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.COLUMN_NULLABILITY_CHANGE); SchemaEvolution schemaEvolution = new 
SchemaEvolution(relationalSink, ingestMode, schemaEvolutionCapabilitySet); - SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable); + SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable.schema()); RelationalTransformer transformer = new RelationalTransformer(relationalSink); SqlPlan physicalPlanForSchemaEvolution = transformer.generatePhysicalPlan(result.logicalPlan()); @@ -399,7 +399,7 @@ void testSnapshotMilestoningWithAlterNullabilityWithoutUserCapability() SchemaEvolution schemaEvolution = new SchemaEvolution(relationalSink, ingestMode, schemaEvolutionCapabilitySet); try { - SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable); + SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable.schema()); RelationalTransformer transformer = new RelationalTransformer(relationalSink); SqlPlan physicalPlanForSchemaEvolution = transformer.generatePhysicalPlan(result.logicalPlan()); List sqlsForSchemaEvolution = physicalPlanForSchemaEvolution.getSqlList(); @@ -429,7 +429,7 @@ void testSnapshotMilestoningWithAlterNullabilityAndUserCapability() Set schemaEvolutionCapabilitySet = new HashSet<>(); schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.COLUMN_NULLABILITY_CHANGE); SchemaEvolution schemaEvolution = new SchemaEvolution(relationalSink, ingestMode, schemaEvolutionCapabilitySet); - SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable); + SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable.schema()); RelationalTransformer transformer = new RelationalTransformer(relationalSink); SqlPlan physicalPlanForSchemaEvolution = transformer.generatePhysicalPlan(result.logicalPlan()); @@ -460,7 +460,7 @@ void testSnapshotMilestoningWithNonBreakingDataTypeEvolution() 
schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_CONVERSION); SchemaEvolution schemaEvolution = new SchemaEvolution(relationalSink, ingestMode, schemaEvolutionCapabilitySet); - SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable); + SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable.schema()); RelationalTransformer transformer = new RelationalTransformer(relationalSink); SqlPlan physicalPlanForSchemaEvolution = transformer.generatePhysicalPlan(result.logicalPlan()); @@ -490,7 +490,7 @@ void testSnapshotMilestoningWithNonBreakingDataTypeEvolutionAndUserProvidedSchem try { - SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable); + SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable.schema()); RelationalTransformer transformer = new RelationalTransformer(relationalSink); SqlPlan physicalPlanForSchemaEvolution = transformer.generatePhysicalPlan(result.logicalPlan()); @@ -525,7 +525,7 @@ void testSnapshotMilestoningWithNonBreakingDataTypeEvolutionAndSizingChange() try { - SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable); + SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable.schema()); RelationalTransformer transformer = new RelationalTransformer(relationalSink); SqlPlan physicalPlanForSchemaEvolution = transformer.generatePhysicalPlan(result.logicalPlan()); @@ -562,7 +562,7 @@ void testSnapshotMilestoningWithNonBreakingDataTypeEvolutionAndSizingChangeAllow try { - SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable); + SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable.schema()); RelationalTransformer transformer = new 
RelationalTransformer(relationalSink); SqlPlan physicalPlanForSchemaEvolution = transformer.generatePhysicalPlan(result.logicalPlan()); @@ -595,7 +595,7 @@ void testSnapshotMilestoningWithBreakingDataTypeEvolution() try { - schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable); + schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable.schema()); Assertions.fail("Exception was not thrown"); } catch (IncompatibleSchemaChangeException e) @@ -622,7 +622,7 @@ void testSnapshotMilestoningWithColumnMissingInStagingTableAndUserCapability() Set schemaEvolutionCapabilitySet = new HashSet<>(); schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.COLUMN_NULLABILITY_CHANGE); SchemaEvolution schemaEvolution = new SchemaEvolution(relationalSink, ingestMode, schemaEvolutionCapabilitySet); - SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable); + SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable.schema()); RelationalTransformer transformer = new RelationalTransformer(relationalSink); SqlPlan physicalPlanForSchemaEvolution = transformer.generatePhysicalPlan(result.logicalPlan()); @@ -652,7 +652,7 @@ void testSnapshotMilestoningWithColumnMissingInStagingTableWithoutUserCapability SchemaEvolution schemaEvolution = new SchemaEvolution(relationalSink, ingestMode, schemaEvolutionCapabilitySet); try { - SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable); + SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable.schema()); RelationalTransformer transformer = new RelationalTransformer(relationalSink); SqlPlan physicalPlanForSchemaEvolution = transformer.generatePhysicalPlan(result.logicalPlan()); List sqlsForSchemaEvolution = physicalPlanForSchemaEvolution.getSqlList(); @@ -680,7 +680,7 @@ void 
testSnapshotMilestoningWithNullableColumnMissingInStagingTable() NontemporalSnapshot ingestMode = NontemporalSnapshot.builder().auditing(NoAuditing.builder().build()).build(); Set schemaEvolutionCapabilitySet = new HashSet<>(); SchemaEvolution schemaEvolution = new SchemaEvolution(relationalSink, ingestMode, schemaEvolutionCapabilitySet); - SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable); + SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable.schema()); RelationalTransformer transformer = new RelationalTransformer(relationalSink); SqlPlan physicalPlanForSchemaEvolution = transformer.generatePhysicalPlan(result.logicalPlan()); List sqlsForSchemaEvolution = physicalPlanForSchemaEvolution.getSqlList(); @@ -704,7 +704,7 @@ void testBitemporalDeltaSourceSpeciesBothFieldsSchemaEvolution() schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.ADD_COLUMN); SchemaEvolution schemaEvolution = new SchemaEvolution(relationalSink, ingestMode, schemaEvolutionCapabilitySet); - SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable); + SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable.schema()); SqlPlan physicalPlanForSchemaEvolution = transformer.generatePhysicalPlan(result.logicalPlan()); // Use the planner utils to return the sql @@ -728,7 +728,7 @@ void testBitemporalDeltaSourceSpeciesFromOnlyFieldsSchemaEvolution() schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.ADD_COLUMN); SchemaEvolution schemaEvolution = new SchemaEvolution(relationalSink, ingestMode, schemaEvolutionCapabilitySet); - SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable); + SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable.schema()); SqlPlan 
physicalPlanForSchemaEvolution = transformer.generatePhysicalPlan(result.logicalPlan()); // Use the planner utils to return the sql diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/main/java/org/finos/legend/engine/persistence/components/relational/bigquery/BigQuerySink.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/main/java/org/finos/legend/engine/persistence/components/relational/bigquery/BigQuerySink.java index 091a58c6552..ea092d38ce8 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/main/java/org/finos/legend/engine/persistence/components/relational/bigquery/BigQuerySink.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/main/java/org/finos/legend/engine/persistence/components/relational/bigquery/BigQuerySink.java @@ -170,7 +170,7 @@ private BigQuerySink() LOGICAL_PLAN_VISITOR_BY_CLASS, (executor, sink, dataset) -> sink.doesTableExist(dataset), (executor, sink, dataset) -> sink.validateDatasetSchema(dataset, new BigQueryDataTypeMapping()), - (executor, sink, dataset) -> sink.constructDatasetFromDatabase(dataset, new BigQueryDataTypeToLogicalDataTypeMapping())); + (executor, sink, dataset) -> sink.constructDatasetFromDatabase(dataset, new BigQueryDataTypeToLogicalDataTypeMapping(), false)); } @Override diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/main/java/org/finos/legend/engine/persistence/components/relational/bigquery/executor/BigQueryHelper.java 
b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/main/java/org/finos/legend/engine/persistence/components/relational/bigquery/executor/BigQueryHelper.java index be90222d9db..60d23a941df 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/main/java/org/finos/legend/engine/persistence/components/relational/bigquery/executor/BigQueryHelper.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/main/java/org/finos/legend/engine/persistence/components/relational/bigquery/executor/BigQueryHelper.java @@ -220,7 +220,7 @@ public void validateDatasetSchema(Dataset dataset, TypeMapping typeMapping) validateColumns(userColumns, dbColumns); } - public Dataset constructDatasetFromDatabase(Dataset dataset, TypeMapping typeMapping) + public Dataset constructDatasetFromDatabase(Dataset dataset, TypeMapping typeMapping, boolean escape) { String tableName = dataset.datasetReference().name().orElseThrow(IllegalStateException::new); String schemaName = dataset.datasetReference().group().orElse(null); diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/e2e/SchemaEvolutionTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/e2e/SchemaEvolutionTest.java index 5398884f267..494b8566fc3 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/e2e/SchemaEvolutionTest.java +++ 
b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/e2e/SchemaEvolutionTest.java @@ -588,7 +588,7 @@ public void testLengthEvolution() throws IOException private static void schemaEvolve(Executor relationalExecutor, RelationalTransformer transformer, SchemaEvolution schemaEvolution, Dataset datasetMain, Dataset datasetStage, Dataset datasetToAssert, List alterSqlsToAssert) { - SchemaEvolutionResult schemaEvolutionResult = schemaEvolution.buildLogicalPlanForSchemaEvolution(datasetMain, datasetStage); + SchemaEvolutionResult schemaEvolutionResult = schemaEvolution.buildLogicalPlanForSchemaEvolution(datasetMain, datasetStage.schema()); SqlPlan physicalPlan = transformer.generatePhysicalPlan(schemaEvolutionResult.logicalPlan()); Assertions.assertEquals(alterSqlsToAssert, physicalPlan.getSqlList()); relationalExecutor.executePhysicalPlan(physicalPlan); diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/ApiUtils.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/ApiUtils.java index fe2419e59de..c537f389558 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/ApiUtils.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/ApiUtils.java @@ -29,8 +29,11 @@ import 
org.finos.legend.engine.persistence.components.logicalplan.LogicalPlan; import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlanFactory; import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset; +import org.finos.legend.engine.persistence.components.logicalplan.datasets.DatasetCaseConverter; +import org.finos.legend.engine.persistence.components.logicalplan.datasets.DatasetReference; import org.finos.legend.engine.persistence.components.logicalplan.datasets.DatasetsCaseConverter; import org.finos.legend.engine.persistence.components.logicalplan.datasets.Field; +import org.finos.legend.engine.persistence.components.logicalplan.datasets.SchemaDefinition; import org.finos.legend.engine.persistence.components.logicalplan.values.FieldValue; import org.finos.legend.engine.persistence.components.planner.Planner; import org.finos.legend.engine.persistence.components.relational.CaseConversion; @@ -42,6 +45,7 @@ import org.finos.legend.engine.persistence.components.util.MetadataDataset; import java.util.*; +import java.util.function.Function; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -84,6 +88,49 @@ public static Datasets enrichAndApplyCase(Datasets datasets, CaseConversion case return enrichedDatasets; } + public static DatasetReference applyCase(DatasetReference datasetReference, CaseConversion caseConversion) + { + Function strategy; + if (caseConversion == CaseConversion.TO_UPPER) + { + strategy = String::toUpperCase; + } + else if (caseConversion == CaseConversion.TO_LOWER) + { + strategy = String::toLowerCase; + } + else + { + return datasetReference; + } + + datasetReference = datasetReference.withName(strategy.apply(datasetReference.name().orElseThrow(IllegalAccessError::new))); + if (datasetReference.database().isPresent()) + { + datasetReference = datasetReference.withDatabase(strategy.apply(datasetReference.database().get())); + } + if 
(datasetReference.group().isPresent()) + { + datasetReference = datasetReference.withGroup(strategy.apply(datasetReference.group().get())); + } + + return datasetReference; + } + + public static SchemaDefinition applyCase(SchemaDefinition schema, CaseConversion caseConversion) + { + DatasetCaseConverter converter = new DatasetCaseConverter(); + if (caseConversion == CaseConversion.TO_UPPER) + { + return converter.applyCaseOnSchemaDefinition(schema, String::toUpperCase); + } + if (caseConversion == CaseConversion.TO_LOWER) + { + return converter.applyCaseOnSchemaDefinition(schema, String::toLowerCase); + } + return schema; + } + public static IngestMode applyCase(IngestMode ingestMode, CaseConversion caseConversion) { if (caseConversion == CaseConversion.TO_UPPER) diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/RelationalGeneratorAbstract.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/RelationalGeneratorAbstract.java index b2588d34dbf..47a703f6bdf 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/RelationalGeneratorAbstract.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/RelationalGeneratorAbstract.java @@ -266,7 +266,7 @@ GeneratorResult generateOperations(Datasets datasets, Resources resources, Plann { // Get logical plan and physical plan for schema evolution and update datasets SchemaEvolution schemaEvolution = new 
SchemaEvolution(relationalSink(), ingestMode, schemaEvolutionCapabilitySet()); - SchemaEvolutionResult schemaEvolutionResult = schemaEvolution.buildLogicalPlanForSchemaEvolution(datasets.mainDataset(), datasets.stagingDataset()); + SchemaEvolutionResult schemaEvolutionResult = schemaEvolution.buildLogicalPlanForSchemaEvolution(datasets.mainDataset(), datasets.stagingDataset().schema()); LogicalPlan schemaEvolutionLogicalPlan = schemaEvolutionResult.logicalPlan(); schemaEvolutionSqlPlan = Optional.of(transformer.generatePhysicalPlan(schemaEvolutionLogicalPlan)); diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/RelationalIngestorAbstract.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/RelationalIngestorAbstract.java index 5dcfc3fc02f..3837c348964 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/RelationalIngestorAbstract.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/RelationalIngestorAbstract.java @@ -426,7 +426,7 @@ private List evolveMetadataDatasetSchema() Set schemaEvolutionCapabilitySet = new HashSet<>(); schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.ADD_COLUMN); SchemaEvolution schemaEvolution = new SchemaEvolution(relationalSink(), this.ingestMode(), schemaEvolutionCapabilitySet); - org.finos.legend.engine.persistence.components.schemaevolution.SchemaEvolutionResult schemaEvolutionResult = 
schemaEvolution.buildLogicalPlanForSchemaEvolution(existingMetadataDataset, desiredMetadataDataset); + org.finos.legend.engine.persistence.components.schemaevolution.SchemaEvolutionResult schemaEvolutionResult = schemaEvolution.buildLogicalPlanForSchemaEvolution(existingMetadataDataset, desiredMetadataDataset.schema()); LogicalPlan schemaEvolutionLogicalPlan = schemaEvolutionResult.logicalPlan(); Optional schemaEvolutionSqlPlan = Optional.of(transformer.generatePhysicalPlan(schemaEvolutionLogicalPlan)); if (schemaEvolutionSqlPlan.isPresent() && !schemaEvolutionSqlPlan.get().getSqlList().isEmpty()) diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/RelationalSchemaEvolutionServiceAbstract.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/RelationalSchemaEvolutionServiceAbstract.java new file mode 100644 index 00000000000..95571632f2d --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/RelationalSchemaEvolutionServiceAbstract.java @@ -0,0 +1,157 @@ +// Copyright 2024 Goldman Sachs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package org.finos.legend.engine.persistence.components.relational.api; + +import org.finos.legend.engine.persistence.components.executor.Executor; +import org.finos.legend.engine.persistence.components.ingestmode.IngestMode; +import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlan; +import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset; +import org.finos.legend.engine.persistence.components.logicalplan.datasets.DatasetReference; +import org.finos.legend.engine.persistence.components.logicalplan.datasets.SchemaDefinition; +import org.finos.legend.engine.persistence.components.logicalplan.operations.Operation; +import org.finos.legend.engine.persistence.components.relational.CaseConversion; +import org.finos.legend.engine.persistence.components.relational.RelationalSink; +import org.finos.legend.engine.persistence.components.relational.SqlPlan; +import org.finos.legend.engine.persistence.components.relational.sql.TabularData; +import org.finos.legend.engine.persistence.components.relational.sqldom.SqlGen; +import org.finos.legend.engine.persistence.components.relational.transformer.RelationalTransformer; +import org.finos.legend.engine.persistence.components.schemaevolution.SchemaEvolution; +import org.finos.legend.engine.persistence.components.schemaevolution.SchemaEvolutionResult; +import org.finos.legend.engine.persistence.components.transformer.TransformOptions; +import org.finos.legend.engine.persistence.components.transformer.Transformer; +import org.finos.legend.engine.persistence.components.util.SchemaEvolutionCapability; +import org.finos.legend.engine.persistence.components.util.SqlLogging; +import org.immutables.value.Value; +import org.immutables.value.Value.Default; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import 
java.util.List; +import java.util.Set; + +@Value.Immutable +@Value.Style( + typeAbstract = "*Abstract", + typeImmutable = "*", + jdkOnly = true, + optionalAcceptNullable = true, + strictBuilder = true +) +public abstract class RelationalSchemaEvolutionServiceAbstract +{ + //---------- FIELDS ---------- + public abstract RelationalSink relationalSink(); + + public abstract IngestMode ingestMode(); + + @Default + public Set schemaEvolutionCapabilitySet() + { + return Collections.emptySet(); + } + + @Default + public CaseConversion caseConversion() + { + return CaseConversion.NONE; + } + + @Default + public SqlLogging sqlLogging() + { + return SqlLogging.DISABLED; + } + + @Value.Derived + protected TransformOptions transformOptions() + { + TransformOptions.Builder builder = TransformOptions.builder(); + relationalSink().optimizerForCaseConversion(caseConversion()).ifPresent(builder::addOptimizers); + return builder.build(); + } + + private static final Logger LOGGER = LoggerFactory.getLogger(RelationalSchemaEvolutionService.class); + + // ------API----- + public SchemaEvolutionServiceResult evolve(DatasetReference mainDatasetReference, SchemaDefinition stagingSchema, RelationalConnection connection) + { + LOGGER.info("Invoked evolve method, will evolve the target dataset"); + + // 1. Initialize executor and transformer + LOGGER.info("Initializing executor and transformer"); + Executor executor = relationalSink().getRelationalExecutor(connection); + executor.setSqlLogging(sqlLogging()); + Transformer transformer = new RelationalTransformer(relationalSink(), transformOptions()); + + // 2. Handle case conversion + IngestMode ingestMode = ApiUtils.applyCase(ingestMode(), caseConversion()); + mainDatasetReference = ApiUtils.applyCase(mainDatasetReference, caseConversion()); + stagingSchema = ApiUtils.applyCase(stagingSchema, caseConversion()); + + // 3. 
Check if main dataset exists + LOGGER.info("Checking if target dataset exists"); + if (!executor.datasetExists(mainDatasetReference)) + { + return SchemaEvolutionServiceResult.builder() + .status(SchemaEvolutionStatus.FAILED) + .message("Dataset is not found: " + mainDatasetReference.datasetReference().name().orElseThrow(IllegalStateException::new)) + .build(); + } + + // 4. Derive main dataset schema + LOGGER.info("Constructing target dataset schema from database"); + Dataset mainDataset = executor.constructDatasetFromDatabase(mainDatasetReference); + + // 5. Generate schema evolution operations + LOGGER.info("Generating schema evolution operations"); + SchemaEvolution schemaEvolution = new SchemaEvolution(relationalSink(), ingestMode, schemaEvolutionCapabilitySet()); + SchemaEvolutionResult schemaEvolutionResult = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainDataset, stagingSchema); + LogicalPlan schemaEvolutionLogicalPlan = schemaEvolutionResult.logicalPlan(); + + // 6. Execute schema evolution operations + LOGGER.info("Starting schema evolution execution"); + List executedSqls = new ArrayList<>(); + if (!schemaEvolutionLogicalPlan.ops().isEmpty()) + { + for (Operation op : schemaEvolutionLogicalPlan.ops()) + { + // Recreating a logical plan per operation such that we can keep track of executed SQLs in case of partial failure + SqlPlan singleOperationAlterSqlPlan = transformer.generatePhysicalPlan(LogicalPlan.of(Collections.singletonList(op))); + try + { + executor.executePhysicalPlan(singleOperationAlterSqlPlan); + executedSqls.addAll(singleOperationAlterSqlPlan.getSqlList()); + } + catch (Exception e) + { + LOGGER.info("Encountered error in executing schema evolution"); + return SchemaEvolutionServiceResult.builder() + .status(executedSqls.isEmpty() ? 
SchemaEvolutionStatus.FAILED : SchemaEvolutionStatus.PARTIALLY_SUCCEEDED) + .addAllExecutedSchemaEvolutionSqls(executedSqls) + .message(e.getMessage()) + .build(); + } + } + } + + LOGGER.info("Schema evolution completed"); + return SchemaEvolutionServiceResult.builder() + .status(SchemaEvolutionStatus.SUCCEEDED) + .addAllExecutedSchemaEvolutionSqls(executedSqls) + .build(); + } +} diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/SchemaEvolutionServiceResultAbstract.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/SchemaEvolutionServiceResultAbstract.java new file mode 100644 index 00000000000..d8d9a06649f --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/SchemaEvolutionServiceResultAbstract.java @@ -0,0 +1,39 @@ +// Copyright 2024 Goldman Sachs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package org.finos.legend.engine.persistence.components.relational.api; + +import org.immutables.value.Value.Immutable; +import org.immutables.value.Value.Style; + +import java.util.List; +import java.util.Optional; + +@Immutable +@Style( + typeAbstract = "*Abstract", + typeImmutable = "*", + jdkOnly = true, + optionalAcceptNullable = true, + strictBuilder = true +) +public abstract class SchemaEvolutionServiceResultAbstract +{ + + public abstract SchemaEvolutionStatus status(); + + public abstract Optional message(); + + public abstract List executedSchemaEvolutionSqls(); +} diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/SchemaEvolutionStatus.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/SchemaEvolutionStatus.java new file mode 100644 index 00000000000..0c33915fb87 --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/SchemaEvolutionStatus.java @@ -0,0 +1,20 @@ +// Copyright 2024 Goldman Sachs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package org.finos.legend.engine.persistence.components.relational.api; + +public enum SchemaEvolutionStatus +{ + SUCCEEDED, FAILED, PARTIALLY_SUCCEEDED +} diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/jdbc/JdbcHelper.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/jdbc/JdbcHelper.java index fc1e4a7b53e..5bdbbaae6db 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/jdbc/JdbcHelper.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/jdbc/JdbcHelper.java @@ -259,11 +259,15 @@ public void validateDatasetSchema(Dataset dataset, TypeMapping typeMapping) } @Override - public Dataset constructDatasetFromDatabase(Dataset dataset, TypeMapping typeMapping) + public Dataset constructDatasetFromDatabase(Dataset dataset, TypeMapping typeMapping, boolean escape) { String tableName = dataset.datasetReference().name().orElseThrow(IllegalStateException::new); String schemaName = dataset.datasetReference().group().orElse(null); String databaseName = dataset.datasetReference().database().orElse(null); + + String escapedTableName = tableName.replace("_", "\\_"); + String escapedSchemaName = schemaName == null ? 
null : schemaName.replace("_", "\\_"); + try { if (!(typeMapping instanceof JdbcPropertiesToLogicalDataTypeMapping)) @@ -275,7 +279,7 @@ public Dataset constructDatasetFromDatabase(Dataset dataset, TypeMapping typeMap // Get primary keys Set primaryKeys = new HashSet<>(); - ResultSet primaryKeyResult = dbMetaData.getPrimaryKeys(databaseName, schemaName, tableName); + ResultSet primaryKeyResult = escape ? dbMetaData.getPrimaryKeys(databaseName, escapedSchemaName, escapedTableName) : dbMetaData.getPrimaryKeys(databaseName, schemaName, tableName); while (primaryKeyResult.next()) { primaryKeys.add(primaryKeyResult.getString(RelationalExecutionHelper.COLUMN_NAME)); @@ -324,7 +328,7 @@ public Dataset constructDatasetFromDatabase(Dataset dataset, TypeMapping typeMap // Get all columns List fields = new ArrayList<>(); - ResultSet columnResult = dbMetaData.getColumns(databaseName, schemaName, tableName, null); + ResultSet columnResult = escape ? dbMetaData.getColumns(databaseName, escapedSchemaName, escapedTableName, null) : dbMetaData.getColumns(databaseName, schemaName, tableName, null); while (columnResult.next()) { String columnName = columnResult.getString(RelationalExecutionHelper.COLUMN_NAME); diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/main/java/org/finos/legend/engine/persistence/components/relational/h2/H2Sink.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/main/java/org/finos/legend/engine/persistence/components/relational/h2/H2Sink.java index 9aaae1d47bd..0df11877d40 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/main/java/org/finos/legend/engine/persistence/components/relational/h2/H2Sink.java +++ 
b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/main/java/org/finos/legend/engine/persistence/components/relational/h2/H2Sink.java @@ -192,7 +192,7 @@ private H2Sink() LOGICAL_PLAN_VISITOR_BY_CLASS, (executor, sink, dataset) -> sink.doesTableExist(dataset), (executor, sink, dataset) -> sink.validateDatasetSchema(dataset, new H2DataTypeMapping()), - (executor, sink, dataset) -> sink.constructDatasetFromDatabase(dataset, new H2JdbcPropertiesToLogicalDataTypeMapping())); + (executor, sink, dataset) -> sink.constructDatasetFromDatabase(dataset, new H2JdbcPropertiesToLogicalDataTypeMapping(), false)); } @Override diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/RelationalSchemaEvolutionServiceTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/RelationalSchemaEvolutionServiceTest.java new file mode 100644 index 00000000000..bf393e51dac --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/RelationalSchemaEvolutionServiceTest.java @@ -0,0 +1,410 @@ +// Copyright 2024 Goldman Sachs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package org.finos.legend.engine.persistence.components; + +import org.finos.legend.engine.persistence.components.ingestmode.AppendOnly; +import org.finos.legend.engine.persistence.components.ingestmode.audit.DateTimeAuditing; +import org.finos.legend.engine.persistence.components.ingestmode.deduplication.FilterDuplicates; +import org.finos.legend.engine.persistence.components.ingestmode.digest.UserProvidedDigestGenStrategy; +import org.finos.legend.engine.persistence.components.logicalplan.datasets.DatasetDefinition; +import org.finos.legend.engine.persistence.components.logicalplan.datasets.DatasetReference; +import org.finos.legend.engine.persistence.components.logicalplan.datasets.DatasetReferenceImpl; +import org.finos.legend.engine.persistence.components.relational.CaseConversion; +import org.finos.legend.engine.persistence.components.relational.api.RelationalSchemaEvolutionService; +import org.finos.legend.engine.persistence.components.relational.api.SchemaEvolutionServiceResult; +import org.finos.legend.engine.persistence.components.relational.api.SchemaEvolutionStatus; +import org.finos.legend.engine.persistence.components.relational.h2.H2Sink; +import org.finos.legend.engine.persistence.components.relational.jdbc.JdbcConnection; +import org.finos.legend.engine.persistence.components.schemaevolution.IncompatibleSchemaChangeException; +import org.finos.legend.engine.persistence.components.util.SchemaEvolutionCapability; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.finos.legend.engine.persistence.components.TestUtils.batchIdName; +import static org.finos.legend.engine.persistence.components.TestUtils.batchUpdateTimeName; +import static 
org.finos.legend.engine.persistence.components.TestUtils.digestName; +import static org.finos.legend.engine.persistence.components.TestUtils.expiryDateName; +import static org.finos.legend.engine.persistence.components.TestUtils.getColumnDataTypeFromTable; +import static org.finos.legend.engine.persistence.components.TestUtils.getColumnDataTypeLengthFromTable; +import static org.finos.legend.engine.persistence.components.TestUtils.getColumnsFromTable; +import static org.finos.legend.engine.persistence.components.TestUtils.getIsColumnNullableFromTable; +import static org.finos.legend.engine.persistence.components.TestUtils.idName; +import static org.finos.legend.engine.persistence.components.TestUtils.incomeName; +import static org.finos.legend.engine.persistence.components.TestUtils.mainTableName; +import static org.finos.legend.engine.persistence.components.TestUtils.nameName; +import static org.finos.legend.engine.persistence.components.TestUtils.startTimeName; +import static org.finos.legend.engine.persistence.components.TestUtils.testDatabaseName; +import static org.finos.legend.engine.persistence.components.TestUtils.testSchemaName; + +class RelationalSchemaEvolutionServiceTest extends BaseTest +{ + @Test + void testAddColumn() throws Exception + { + DatasetDefinition mainTable = TestUtils.getSchemaEvolutionAddColumnMainTable(); + DatasetDefinition stagingTable = TestUtils.getBasicStagingTable(); + + // Create staging table + createStagingTable(stagingTable); + + // Create main table with old schema + createTempTable(mainTable); + + AppendOnly ingestMode = AppendOnly.builder() + .digestGenStrategy(UserProvidedDigestGenStrategy.builder().digestField(digestName).build()) + .deduplicationStrategy(FilterDuplicates.builder().build()) + .auditing(DateTimeAuditing.builder().dateTimeField(batchUpdateTimeName).build()) + .build(); + + Set schemaEvolutionCapabilitySet = new HashSet<>(); + schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.ADD_COLUMN); + + 
String[] schema = new String[]{idName, nameName, incomeName, startTimeName, expiryDateName, digestName, batchUpdateTimeName, batchIdName}; + + RelationalSchemaEvolutionService evolutionService = RelationalSchemaEvolutionService.builder() + .relationalSink(H2Sink.get()) + .schemaEvolutionCapabilitySet(schemaEvolutionCapabilitySet) + .ingestMode(ingestMode) + .build(); + + SchemaEvolutionServiceResult result = evolutionService.evolve(mainTable.datasetReference(), stagingTable.schema(), JdbcConnection.of(h2Sink.connection())); + + List actualSchema = getColumnsFromTable(h2Sink.connection(), null, testSchemaName, mainTableName); + List expectedSchema = Arrays.asList(schema); + Assertions.assertTrue(actualSchema.size() == expectedSchema.size() && actualSchema.containsAll(expectedSchema) && expectedSchema.containsAll(actualSchema)); + Assertions.assertEquals(SchemaEvolutionStatus.SUCCEEDED, result.status()); + Assertions.assertEquals(Arrays.asList("ALTER TABLE \"TEST\".\"main\" ADD COLUMN \"income\" BIGINT"), result.executedSchemaEvolutionSqls()); + } + + @Test + void testAddColumnUpperCase() throws Exception + { + DatasetDefinition mainTable = TestUtils.getSchemaEvolutionAddColumnMainTableUpperCase(); // This is only used to create a database table in upper case + DatasetDefinition stagingTable = TestUtils.getBasicStagingTable(); + DatasetReference mainTableDatasetReference = DatasetReferenceImpl.builder().group(testSchemaName).name(mainTableName).build(); // This is the model user has + + // Create staging table + createStagingTable(stagingTable); + + // Create main table with old schema + createTempTable(mainTable); + + AppendOnly ingestMode = AppendOnly.builder() + .digestGenStrategy(UserProvidedDigestGenStrategy.builder().digestField(digestName).build()) + .deduplicationStrategy(FilterDuplicates.builder().build()) + .auditing(DateTimeAuditing.builder().dateTimeField(batchUpdateTimeName).build()) + .build(); + + Set schemaEvolutionCapabilitySet = new HashSet<>(); + 
schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.ADD_COLUMN); + + String[] schema = new String[]{idName.toUpperCase(), nameName.toUpperCase(), incomeName.toUpperCase(), startTimeName.toUpperCase(), expiryDateName.toUpperCase(), digestName.toUpperCase(), batchUpdateTimeName.toUpperCase(), batchIdName.toUpperCase()}; + + RelationalSchemaEvolutionService evolutionService = RelationalSchemaEvolutionService.builder() + .relationalSink(H2Sink.get()) + .schemaEvolutionCapabilitySet(schemaEvolutionCapabilitySet) + .ingestMode(ingestMode) + .caseConversion(CaseConversion.TO_UPPER) + .build(); + + SchemaEvolutionServiceResult result = evolutionService.evolve(mainTableDatasetReference, stagingTable.schema(), JdbcConnection.of(h2Sink.connection())); + + List actualSchema = getColumnsFromTable(h2Sink.connection(), null, testSchemaName.toUpperCase(), mainTableName.toUpperCase()); + List expectedSchema = Arrays.asList(schema); + Assertions.assertTrue(actualSchema.size() == expectedSchema.size() && actualSchema.containsAll(expectedSchema) && expectedSchema.containsAll(actualSchema)); + Assertions.assertEquals(SchemaEvolutionStatus.SUCCEEDED, result.status()); + Assertions.assertEquals(Arrays.asList("ALTER TABLE \"TEST\".\"MAIN\" ADD COLUMN \"INCOME\" BIGINT"), result.executedSchemaEvolutionSqls()); + } + + @Test + void testDataTypeConversion() throws Exception + { + DatasetDefinition mainTable = TestUtils.getSchemaEvolutionDataTypeConversionMainTable(); + DatasetDefinition stagingTable = TestUtils.getBasicStagingTable(); + + // Create staging table + createStagingTable(stagingTable); + + // Create main table with old schema + createTempTable(mainTable); + + // Generate the milestoning object + AppendOnly ingestMode = AppendOnly.builder() + .digestGenStrategy(UserProvidedDigestGenStrategy.builder().digestField(digestName).build()) + .deduplicationStrategy(FilterDuplicates.builder().build()) + .auditing(DateTimeAuditing.builder().dateTimeField(batchUpdateTimeName).build()) 
+ .build(); + + Set schemaEvolutionCapabilitySet = new HashSet<>(); + schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_CONVERSION); + + String[] schema = new String[]{idName, nameName, incomeName, startTimeName, expiryDateName, digestName, batchUpdateTimeName, batchIdName}; + + RelationalSchemaEvolutionService evolutionService = RelationalSchemaEvolutionService.builder() + .relationalSink(H2Sink.get()) + .schemaEvolutionCapabilitySet(schemaEvolutionCapabilitySet) + .ingestMode(ingestMode) + .build(); + + SchemaEvolutionServiceResult result = evolutionService.evolve(mainTable.datasetReference(), stagingTable.schema(), JdbcConnection.of(h2Sink.connection())); + + List actualSchema = getColumnsFromTable(h2Sink.connection(), null, testSchemaName, mainTableName); + List expectedSchema = Arrays.asList(schema); + Assertions.assertTrue(actualSchema.size() == expectedSchema.size() && actualSchema.containsAll(expectedSchema) && expectedSchema.containsAll(actualSchema)); + Assertions.assertEquals("BIGINT", getColumnDataTypeFromTable(h2Sink.connection(), testDatabaseName, testSchemaName, mainTableName, incomeName)); + Assertions.assertEquals(SchemaEvolutionStatus.SUCCEEDED, result.status()); + Assertions.assertEquals(Arrays.asList("ALTER TABLE \"TEST\".\"main\" ALTER COLUMN \"income\" BIGINT"), result.executedSchemaEvolutionSqls()); + } + + @Test + void testDataTypeSizeChange() throws Exception + { + DatasetDefinition mainTable = TestUtils.getMainTableWithBatchUpdateTimeField(); + DatasetDefinition stagingTable = TestUtils.getSchemaEvolutionDataTypeSizeChangeStagingTable(); + + // Create staging table + createStagingTable(stagingTable); + + // Create main table with old schema + createTempTable(mainTable); + + // Generate the milestoning object + AppendOnly ingestMode = AppendOnly.builder() + .digestGenStrategy(UserProvidedDigestGenStrategy.builder().digestField(digestName).build()) + .deduplicationStrategy(FilterDuplicates.builder().build()) + 
.auditing(DateTimeAuditing.builder().dateTimeField(batchUpdateTimeName).build()) + .build(); + + Set schemaEvolutionCapabilitySet = new HashSet<>(); + schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_SIZE_CHANGE); + + String[] schema = new String[]{idName, nameName, incomeName, startTimeName, expiryDateName, digestName, batchUpdateTimeName, batchIdName}; + + RelationalSchemaEvolutionService evolutionService = RelationalSchemaEvolutionService.builder() + .relationalSink(H2Sink.get()) + .schemaEvolutionCapabilitySet(schemaEvolutionCapabilitySet) + .ingestMode(ingestMode) + .build(); + + SchemaEvolutionServiceResult result = evolutionService.evolve(mainTable.datasetReference(), stagingTable.schema(), JdbcConnection.of(h2Sink.connection())); + + List actualSchema = getColumnsFromTable(h2Sink.connection(), null, testSchemaName, mainTableName); + List expectedSchema = Arrays.asList(schema); + Assertions.assertTrue(actualSchema.size() == expectedSchema.size() && actualSchema.containsAll(expectedSchema) && expectedSchema.containsAll(actualSchema)); + Assertions.assertEquals("BIGINT", getColumnDataTypeFromTable(h2Sink.connection(), testDatabaseName, testSchemaName, mainTableName, incomeName)); + Assertions.assertEquals("VARCHAR", getColumnDataTypeFromTable(h2Sink.connection(), testDatabaseName, testSchemaName, mainTableName, nameName)); + Assertions.assertEquals(256, getColumnDataTypeLengthFromTable(h2Sink, mainTableName, nameName)); + Assertions.assertEquals(SchemaEvolutionStatus.SUCCEEDED, result.status()); + Assertions.assertEquals(Arrays.asList("ALTER TABLE \"TEST\".\"main\" ALTER COLUMN \"name\" VARCHAR(256) NOT NULL"), result.executedSchemaEvolutionSqls()); + } + + @Test + void testColumnNullabilityChange() throws Exception + { + DatasetDefinition mainTable = TestUtils.getMainTableWithBatchUpdateTimeField(); + DatasetDefinition stagingTable = TestUtils.getSchemaEvolutionColumnNullabilityChangeStagingTable(); + + // Create staging table + 
createStagingTable(stagingTable); + + // Create main table with old schema + createTempTable(mainTable); + + // Generate the milestoning object + AppendOnly ingestMode = AppendOnly.builder() + .digestGenStrategy(UserProvidedDigestGenStrategy.builder().digestField(digestName).build()) + .deduplicationStrategy(FilterDuplicates.builder().build()) + .auditing(DateTimeAuditing.builder().dateTimeField(batchUpdateTimeName).build()) + .build(); + + Set schemaEvolutionCapabilitySet = new HashSet<>(); + schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.COLUMN_NULLABILITY_CHANGE); + + String[] schema = new String[]{idName, nameName, incomeName, startTimeName, expiryDateName, digestName, batchUpdateTimeName, batchIdName}; + + RelationalSchemaEvolutionService evolutionService = RelationalSchemaEvolutionService.builder() + .relationalSink(H2Sink.get()) + .schemaEvolutionCapabilitySet(schemaEvolutionCapabilitySet) + .ingestMode(ingestMode) + .build(); + + SchemaEvolutionServiceResult result = evolutionService.evolve(mainTable.datasetReference(), stagingTable.schema(), JdbcConnection.of(h2Sink.connection())); + + List actualSchema = getColumnsFromTable(h2Sink.connection(), null, testSchemaName, mainTableName); + List expectedSchema = Arrays.asList(schema); + Assertions.assertTrue(actualSchema.size() == expectedSchema.size() && actualSchema.containsAll(expectedSchema) && expectedSchema.containsAll(actualSchema)); + Assertions.assertEquals("YES", getIsColumnNullableFromTable(h2Sink, mainTableName, nameName)); + Assertions.assertEquals(SchemaEvolutionStatus.SUCCEEDED, result.status()); + Assertions.assertEquals(Arrays.asList("ALTER TABLE \"TEST\".\"main\" ALTER COLUMN \"name\" SET NULL"), result.executedSchemaEvolutionSqls()); + } + + @Test + void testMakeMainColumnNullable() throws Exception + { + DatasetDefinition mainTable = TestUtils.getMainTableWithBatchUpdateTimeField(); + DatasetDefinition stagingTable = TestUtils.getSchemaEvolutionMakeMainColumnNullableStagingTable(); 
+ + // Create staging table + createStagingTable(stagingTable); + + // Create main table with old schema + createTempTable(mainTable); + + AppendOnly ingestMode = AppendOnly.builder() + .digestGenStrategy(UserProvidedDigestGenStrategy.builder().digestField(digestName).build()) + .deduplicationStrategy(FilterDuplicates.builder().build()) + .auditing(DateTimeAuditing.builder().dateTimeField(batchUpdateTimeName).build()) + .build(); + + Set schemaEvolutionCapabilitySet = new HashSet<>(); + schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.COLUMN_NULLABILITY_CHANGE); + + String[] schema = new String[]{idName, nameName, incomeName, startTimeName, expiryDateName, digestName, batchUpdateTimeName, batchIdName}; + + RelationalSchemaEvolutionService evolutionService = RelationalSchemaEvolutionService.builder() + .relationalSink(H2Sink.get()) + .schemaEvolutionCapabilitySet(schemaEvolutionCapabilitySet) + .ingestMode(ingestMode) + .build(); + + SchemaEvolutionServiceResult result = evolutionService.evolve(mainTable.datasetReference(), stagingTable.schema(), JdbcConnection.of(h2Sink.connection())); + + List actualSchema = getColumnsFromTable(h2Sink.connection(), null, testSchemaName, mainTableName); + List expectedSchema = Arrays.asList(schema); + Assertions.assertTrue(actualSchema.size() == expectedSchema.size() && actualSchema.containsAll(expectedSchema) && expectedSchema.containsAll(actualSchema)); + Assertions.assertEquals("YES", getIsColumnNullableFromTable(h2Sink, mainTableName, nameName)); + Assertions.assertEquals(SchemaEvolutionStatus.SUCCEEDED, result.status()); + Assertions.assertEquals(Arrays.asList("ALTER TABLE \"TEST\".\"main\" ALTER COLUMN \"name\" SET NULL"), result.executedSchemaEvolutionSqls()); + } + + @Test + void testAddColumnAndNullabilityChange() throws Exception + { + DatasetDefinition mainTable = TestUtils.getSchemaEvolutionAddColumnMainTable(); + DatasetDefinition stagingTable = 
TestUtils.getSchemaEvolutionColumnNullabilityChangeStagingTable(); + + // Create staging table + createStagingTable(stagingTable); + + // Create main table with old schema + createTempTable(mainTable); + + AppendOnly ingestMode = AppendOnly.builder() + .digestGenStrategy(UserProvidedDigestGenStrategy.builder().digestField(digestName).build()) + .deduplicationStrategy(FilterDuplicates.builder().build()) + .auditing(DateTimeAuditing.builder().dateTimeField(batchUpdateTimeName).build()) + .build(); + + Set schemaEvolutionCapabilitySet = new HashSet<>(); + schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.ADD_COLUMN); + schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.COLUMN_NULLABILITY_CHANGE); + + String[] schema = new String[]{idName, nameName, incomeName, startTimeName, expiryDateName, digestName, batchUpdateTimeName, batchIdName}; + + RelationalSchemaEvolutionService evolutionService = RelationalSchemaEvolutionService.builder() + .relationalSink(H2Sink.get()) + .schemaEvolutionCapabilitySet(schemaEvolutionCapabilitySet) + .ingestMode(ingestMode) + .build(); + + SchemaEvolutionServiceResult result = evolutionService.evolve(mainTable.datasetReference(), stagingTable.schema(), JdbcConnection.of(h2Sink.connection())); + + List actualSchema = getColumnsFromTable(h2Sink.connection(), null, testSchemaName, mainTableName); + List expectedSchema = Arrays.asList(schema); + Assertions.assertTrue(actualSchema.size() == expectedSchema.size() && actualSchema.containsAll(expectedSchema) && expectedSchema.containsAll(actualSchema)); + Assertions.assertEquals("YES", getIsColumnNullableFromTable(h2Sink, mainTableName, nameName)); + Assertions.assertEquals(SchemaEvolutionStatus.SUCCEEDED, result.status()); + Assertions.assertEquals(Arrays.asList("ALTER TABLE \"TEST\".\"main\" ALTER COLUMN \"name\" SET NULL", "ALTER TABLE \"TEST\".\"main\" ADD COLUMN \"income\" BIGINT"), result.executedSchemaEvolutionSqls()); + } + + @Test + void 
testSchemaEvolutionFailPKTypeDifferent() throws Exception + { + DatasetDefinition mainTable = TestUtils.getSchemaEvolutionPKTypeDifferentMainTable(); + DatasetDefinition stagingTable = TestUtils.getBasicStagingTable(); + + // Create staging table + createStagingTable(stagingTable); + + // Create main table with old schema + createTempTable(mainTable); + + // Generate the milestoning object + AppendOnly ingestMode = AppendOnly.builder() + .digestGenStrategy(UserProvidedDigestGenStrategy.builder().digestField(digestName).build()) + .deduplicationStrategy(FilterDuplicates.builder().build()) + .auditing(DateTimeAuditing.builder().dateTimeField(batchUpdateTimeName).build()) + .build(); + + Set schemaEvolutionCapabilitySet = new HashSet<>(); + schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_CONVERSION); + + String[] schema = new String[]{idName, nameName, incomeName, startTimeName, expiryDateName, digestName, batchUpdateTimeName}; + + RelationalSchemaEvolutionService evolutionService = RelationalSchemaEvolutionService.builder() + .relationalSink(H2Sink.get()) + .schemaEvolutionCapabilitySet(schemaEvolutionCapabilitySet) + .ingestMode(ingestMode) + .build(); + + try + { + evolutionService.evolve(mainTable.datasetReference(), stagingTable.schema(), JdbcConnection.of(h2Sink.connection())); + Assertions.fail("Exception was not thrown"); + } + catch (IncompatibleSchemaChangeException e) + { + Assertions.assertEquals("Primary keys for main table has changed which is not allowed", e.getMessage()); + } + } + + @Test + void testSchemaEvolutionDatasetNotFound() throws Exception + { + DatasetDefinition mainTable = TestUtils.getSchemaEvolutionAddColumnMainTable(); + DatasetDefinition stagingTable = TestUtils.getBasicStagingTable(); + + // Create staging table + createStagingTable(stagingTable); + + AppendOnly ingestMode = AppendOnly.builder() + .digestGenStrategy(UserProvidedDigestGenStrategy.builder().digestField(digestName).build()) + 
.deduplicationStrategy(FilterDuplicates.builder().build()) + .auditing(DateTimeAuditing.builder().dateTimeField(batchUpdateTimeName).build()) + .build(); + + Set schemaEvolutionCapabilitySet = new HashSet<>(); + schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.ADD_COLUMN); + + String[] schema = new String[]{idName, nameName, incomeName, startTimeName, expiryDateName, digestName, batchUpdateTimeName, batchIdName}; + + RelationalSchemaEvolutionService evolutionService = RelationalSchemaEvolutionService.builder() + .relationalSink(H2Sink.get()) + .schemaEvolutionCapabilitySet(schemaEvolutionCapabilitySet) + .ingestMode(ingestMode) + .build(); + + SchemaEvolutionServiceResult result = evolutionService.evolve(mainTable.datasetReference(), stagingTable.schema(), JdbcConnection.of(h2Sink.connection())); + + Assertions.assertEquals(SchemaEvolutionStatus.FAILED, result.status()); + Assertions.assertEquals("Dataset is not found: main", result.message().get()); + Assertions.assertTrue(result.executedSchemaEvolutionSqls().isEmpty()); + } +} \ No newline at end of file diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/TestUtils.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/TestUtils.java index 9048ad11848..c84be98a2da 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/TestUtils.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/TestUtils.java @@ -44,6 +44,7 @@ import java.io.IOException; import 
java.nio.file.Files; import java.nio.file.Paths; +import java.sql.DatabaseMetaData; import java.sql.JDBCType; import java.util.Arrays; import java.util.HashMap; @@ -1225,6 +1226,23 @@ public static DatasetDefinition getSchemaEvolutionAddColumnMainTable() .build(); } + public static DatasetDefinition getSchemaEvolutionAddColumnMainTableUpperCase() + { + return DatasetDefinition.builder() + .group(testSchemaName.toUpperCase()) + .name(mainTableName.toUpperCase()) + .schema(SchemaDefinition.builder() + .addFields(id.withName(idName.toUpperCase())) + .addFields(name.withName(nameName.toUpperCase())) + .addFields(startTime.withName(startTimeName.toUpperCase())) + .addFields(expiryDate.withName(expiryDateName.toUpperCase())) + .addFields(digest.withName(digestName.toUpperCase())) + .addFields(batchUpdateTimestamp.withName(batchUpdateTimeName.toUpperCase())) + .addFields(batchId.withName(batchIdName.toUpperCase())) + .build()) + .build(); + } + public static DatasetDefinition expectedMainTableSchema() { return DatasetDefinition.builder() @@ -1449,6 +1467,19 @@ public static void assertTableColumnsEquals(List expectedSchema, List getColumnsFromTable(Connection connection, String databaseName, String schemaName, String tableName) throws SQLException + { + DatabaseMetaData dbMetaData = connection.getMetaData(); + ResultSet columnResult = dbMetaData.getColumns(databaseName, schemaName, tableName, null); + List columnNames = new ArrayList<>(); + while (columnResult.next()) + { + columnNames.add(columnResult.getString(RelationalExecutionHelper.COLUMN_NAME)); + } + return columnNames; + } + // This is to check the actual database table - whether columns have the right nullability public static String getIsColumnNullableFromTable(RelationalExecutionHelper sink, String tableName, String columnName) { diff --git 
a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-postgres/src/main/java/org/finos/legend/engine/persistence/components/relational/postgres/PostgresSink.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-postgres/src/main/java/org/finos/legend/engine/persistence/components/relational/postgres/PostgresSink.java index ad96a2cbfba..3b364a02577 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-postgres/src/main/java/org/finos/legend/engine/persistence/components/relational/postgres/PostgresSink.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-postgres/src/main/java/org/finos/legend/engine/persistence/components/relational/postgres/PostgresSink.java @@ -132,7 +132,7 @@ private PostgresSink() LOGICAL_PLAN_VISITOR_BY_CLASS, (executor, sink, dataset) -> sink.doesTableExist(dataset), (executor, sink, dataset) -> sink.validateDatasetSchema(dataset, new PostgresDataTypeMapping()), - (executor, sink, dataset) -> sink.constructDatasetFromDatabase(dataset, new PostgresJdbcPropertiesToLogicalDataTypeMapping())); + (executor, sink, dataset) -> sink.constructDatasetFromDatabase(dataset, new PostgresJdbcPropertiesToLogicalDataTypeMapping(), false)); } @Override diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/main/java/org/finos/legend/engine/persistence/components/relational/snowflake/SnowflakeSink.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/main/java/org/finos/legend/engine/persistence/components/relational/snowflake/SnowflakeSink.java index 32236e29cf8..3faa2b5e4ae 100644 --- 
a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/main/java/org/finos/legend/engine/persistence/components/relational/snowflake/SnowflakeSink.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/main/java/org/finos/legend/engine/persistence/components/relational/snowflake/SnowflakeSink.java @@ -245,7 +245,7 @@ private SnowflakeSink() return results.size() > 0; }, (executor, sink, dataset) -> sink.validateDatasetSchema(dataset, new SnowflakeDataTypeMapping()), - (executor, sink, dataset) -> sink.constructDatasetFromDatabase(dataset, new SnowflakeJdbcPropertiesToLogicalDataTypeMapping())); + (executor, sink, dataset) -> sink.constructDatasetFromDatabase(dataset, new SnowflakeJdbcPropertiesToLogicalDataTypeMapping(), true)); } @Override