diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/common/DedupAndVersionErrorSqlType.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/common/DedupAndVersionErrorSqlType.java index adda9e48e60..585fb200ce6 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/common/DedupAndVersionErrorSqlType.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/common/DedupAndVersionErrorSqlType.java @@ -18,6 +18,8 @@ public enum DedupAndVersionErrorSqlType { MAX_DUPLICATES, DUPLICATE_ROWS, + MAX_PK_DUPLICATES, + PK_DUPLICATE_ROWS, MAX_DATA_ERRORS, DATA_ERROR_ROWS; } diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/versioning/DeriveDuplicatePkRowsLogicalPlan.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/versioning/DeriveDuplicatePkRowsLogicalPlan.java new file mode 100644 index 00000000000..72f0a1f03a0 --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/versioning/DeriveDuplicatePkRowsLogicalPlan.java @@ -0,0 +1,87 @@ +// Copyright 2024 Goldman Sachs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.finos.legend.engine.persistence.components.ingestmode.versioning; + +import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlan; +import org.finos.legend.engine.persistence.components.logicalplan.conditions.GreaterThan; +import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset; +import org.finos.legend.engine.persistence.components.logicalplan.datasets.Selection; +import org.finos.legend.engine.persistence.components.logicalplan.values.All; +import org.finos.legend.engine.persistence.components.logicalplan.values.FieldValue; +import org.finos.legend.engine.persistence.components.logicalplan.values.FunctionImpl; +import org.finos.legend.engine.persistence.components.logicalplan.values.FunctionName; +import org.finos.legend.engine.persistence.components.logicalplan.values.ObjectValue; +import org.finos.legend.engine.persistence.components.logicalplan.values.Value; + +import java.util.List; +import java.util.stream.Collectors; + +public class DeriveDuplicatePkRowsLogicalPlan implements VersioningStrategyVisitor +{ + private List primaryKeys; + private Dataset tempStagingDataset; + private int sampleRowCount; + + public static final String DUPLICATE_PK_COUNT = "legend_persistence_pk_count"; + + public DeriveDuplicatePkRowsLogicalPlan(List primaryKeys, Dataset tempStagingDataset, int sampleRowCount) + { + this.primaryKeys = primaryKeys; + this.tempStagingDataset = tempStagingDataset; + this.sampleRowCount = sampleRowCount; + } + + @Override + public LogicalPlan visitNoVersioningStrategy(NoVersioningStrategyAbstract noVersioningStrategy) + { + if (noVersioningStrategy.failOnDuplicatePrimaryKeys()) + { + List pks = primaryKeys.stream().map(pkName -> FieldValue.builder().fieldName(pkName).build()).collect(Collectors.toList()); + + FunctionImpl count = FunctionImpl.builder() + .functionName(FunctionName.COUNT) + .addValue(All.INSTANCE) + .alias(DUPLICATE_PK_COUNT) + .build(); + + Selection selectDuplicatePks = Selection.builder() + .source(tempStagingDataset) + .groupByFields(pks) + .addAllFields(pks) + .addFields(count) + .havingCondition(GreaterThan.of(FieldValue.builder().fieldName(DUPLICATE_PK_COUNT).build(), ObjectValue.of(1))) + .limit(sampleRowCount) + .build(); + + return LogicalPlan.builder().addOps(selectDuplicatePks).build(); + } + else + { + return null; + } + } + + @Override + public LogicalPlan visitMaxVersionStrategy(MaxVersionStrategyAbstract maxVersionStrategy) + { + return null; + } + + @Override + public LogicalPlan visitAllVersionsStrategy(AllVersionsStrategyAbstract allVersionsStrategyAbstract) + { + return null; + } +} \ No newline at end of file diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/versioning/DeriveMaxDuplicatePkCountLogicalPlan.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/versioning/DeriveMaxDuplicatePkCountLogicalPlan.java new file mode 100644 index 00000000000..4ad014f451b --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/versioning/DeriveMaxDuplicatePkCountLogicalPlan.java @@ -0,0 +1,95 @@ +// Copyright 2024 Goldman Sachs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.finos.legend.engine.persistence.components.ingestmode.versioning; + +import org.finos.legend.engine.persistence.components.common.DedupAndVersionErrorSqlType; +import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlan; +import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset; +import org.finos.legend.engine.persistence.components.logicalplan.datasets.Selection; +import org.finos.legend.engine.persistence.components.logicalplan.values.All; +import org.finos.legend.engine.persistence.components.logicalplan.values.FieldValue; +import org.finos.legend.engine.persistence.components.logicalplan.values.FunctionImpl; +import org.finos.legend.engine.persistence.components.logicalplan.values.FunctionName; +import org.finos.legend.engine.persistence.components.logicalplan.values.Value; + +import java.util.List; +import java.util.stream.Collectors; + +public class DeriveMaxDuplicatePkCountLogicalPlan implements VersioningStrategyVisitor +{ + + List primaryKeys; + Dataset tempStagingDataset; + + public DeriveMaxDuplicatePkCountLogicalPlan(List primaryKeys, Dataset tempStagingDataset) + { + this.primaryKeys = primaryKeys; + this.tempStagingDataset = tempStagingDataset; + } + + @Override + public LogicalPlan visitNoVersioningStrategy(NoVersioningStrategyAbstract noVersioningStrategy) + { + if (noVersioningStrategy.failOnDuplicatePrimaryKeys()) + { + String maxPkCountAlias = DedupAndVersionErrorSqlType.MAX_PK_DUPLICATES.name(); + String pkCountAlias = "legend_persistence_pk_count"; + + List pks = primaryKeys.stream().map(pkName -> FieldValue.builder().fieldName(pkName).build()).collect(Collectors.toList()); + + FunctionImpl count = FunctionImpl.builder() + .functionName(FunctionName.COUNT) + .addValue(All.INSTANCE) + .alias(pkCountAlias) + .build(); + + Selection selectPkCount = Selection.builder() + .source(tempStagingDataset) + .groupByFields(pks) + .addFields(count) + .alias(tempStagingDataset.datasetReference().alias()) + .build(); + + FunctionImpl maxCount = FunctionImpl.builder() + .functionName(FunctionName.MAX) + .addValue(FieldValue.builder().fieldName(pkCountAlias).build()) + .alias(maxPkCountAlias) + .build(); + + Selection selectMaxPkCountCount = Selection.builder() + .source(selectPkCount) + .addFields(maxCount) + .build(); + + return LogicalPlan.builder().addOps(selectMaxPkCountCount).build(); + } + else + { + return null; + } + } + + @Override + public LogicalPlan visitMaxVersionStrategy(MaxVersionStrategyAbstract maxVersionStrategy) + { + return null; + } + + @Override + public LogicalPlan visitAllVersionsStrategy(AllVersionsStrategyAbstract allVersionsStrategyAbstract) + { + return null; + } +} \ No newline at end of file diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/versioning/DeriveTempStagingSchemaDefinition.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/versioning/DeriveTempStagingSchemaDefinition.java index b7892f3e7ef..cbeab3aca81 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/versioning/DeriveTempStagingSchemaDefinition.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/versioning/DeriveTempStagingSchemaDefinition.java @@ -50,7 +50,6 @@ public DeriveTempStagingSchemaDefinition(SchemaDefinition stagingSchema, Dedupli @Override public SchemaDefinition visitNoVersioningStrategy(NoVersioningStrategyAbstract noVersioningStrategy) { - return schemaDefBuilder.addAllFields(schemaFields).build(); } diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/versioning/NoVersioningStrategyAbstract.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/versioning/NoVersioningStrategyAbstract.java index ec3ae32ae8e..cc5cb0c1d1e 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/versioning/NoVersioningStrategyAbstract.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/versioning/NoVersioningStrategyAbstract.java @@ -26,6 +26,12 @@ ) public interface NoVersioningStrategyAbstract extends VersioningStrategy { + @Value.Default + default boolean failOnDuplicatePrimaryKeys() + { + return false; + } + @Override default T accept(VersioningStrategyVisitor visitor) { diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/versioning/VersioningVisitors.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/versioning/VersioningVisitors.java index 9276d441340..b93820cb576 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/versioning/VersioningVisitors.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/versioning/VersioningVisitors.java @@ -14,6 +14,9 @@ package org.finos.legend.engine.persistence.components.ingestmode.versioning; +import org.finos.legend.engine.persistence.components.ingestmode.deduplication.DeduplicationStrategy; +import org.finos.legend.engine.persistence.components.ingestmode.deduplication.FailOnDuplicates; + import java.util.Optional; public class VersioningVisitors @@ -62,6 +65,28 @@ public Boolean visitAllVersionsStrategy(AllVersionsStrategyAbstract allVersionsS } }; + public static final VersioningStrategyVisitor IS_DUPLICATE_PK_CHECK_NEEDED = new VersioningStrategyVisitor() + { + + @Override + public Boolean visitNoVersioningStrategy(NoVersioningStrategyAbstract noVersioningStrategy) + { + return noVersioningStrategy.failOnDuplicatePrimaryKeys(); + } + + @Override + public Boolean visitMaxVersionStrategy(MaxVersionStrategyAbstract maxVersionStrategy) + { + return false; + } + + @Override + public Boolean visitAllVersionsStrategy(AllVersionsStrategyAbstract allVersionsStrategyAbstract) + { + return false; + } + }; + public static final VersioningStrategyVisitor> EXTRACT_VERSIONING_FIELD = new VersioningStrategyVisitor>() { @Override @@ -83,6 +108,35 @@ public Optional visitAllVersionsStrategy(AllVersionsStrategyAbstract all } }; + public static class ValidateDedupAndVersioningCombination implements VersioningStrategyVisitor + { + final DeduplicationStrategy deduplicationStrategy; + public ValidateDedupAndVersioningCombination(DeduplicationStrategy deduplicationStrategy) + { + this.deduplicationStrategy = deduplicationStrategy; + } + @Override + public Void visitNoVersioningStrategy(NoVersioningStrategyAbstract noVersioningStrategy) + { + if (noVersioningStrategy.failOnDuplicatePrimaryKeys() && !(this.deduplicationStrategy instanceof FailOnDuplicates)) + { + throw new IllegalStateException("For failOnDuplicatePrimaryKeys, FailOnDuplicates must be selected as the DeduplicationStrategy"); + } + return null; + } + + @Override + public Void visitMaxVersionStrategy(MaxVersionStrategyAbstract maxVersionStrategy) + { + return null; + } + + @Override + public Void visitAllVersionsStrategy(AllVersionsStrategyAbstract allVersionsStrategyAbstract) + { + return null; + } + } } diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/Planner.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/Planner.java index 954e94afdb4..7b079a1b235 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/Planner.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/Planner.java @@ -178,11 +178,13 @@ default String ingestRunId() this.batchEndTimestamp = BatchEndTimestamp.INSTANCE; // Validation - // 1. MaxVersion & AllVersion strategies must have primary keys + // 1. Validate if the combination of deduplication and versioning is valid + ingestMode.versioningStrategy().accept(new VersioningVisitors.ValidateDedupAndVersioningCombination(ingestMode.deduplicationStrategy())); + // 2. MaxVersion & AllVersion strategies must have primary keys ingestMode.versioningStrategy().accept(new ValidatePrimaryKeysForVersioningStrategy(primaryKeys, this::validatePrimaryKeysNotEmpty)); - // 2. Validate if the versioningField is comparable if a versioningStrategy is present + // 3. Validate if the versioningField is comparable if a versioningStrategy is present validateVersioningField(ingestMode().versioningStrategy(), stagingDataset()); - // 3. cleanupStagingData must be turned off when using DerivedDataset or FilteredDataset + // 4. cleanupStagingData must be turned off when using DerivedDataset or FilteredDataset validateCleanUpStagingData(plannerOptions, originalStagingDataset()); } @@ -445,6 +447,7 @@ public Map buildLogicalPlanForDeduplic { Map dedupAndVersioningErrorChecks = new HashMap<>(); addMaxDuplicatesErrorCheck(dedupAndVersioningErrorChecks); + addMaxPkDuplicatesErrorCheck(dedupAndVersioningErrorChecks); addDataErrorCheck(dedupAndVersioningErrorChecks); return dedupAndVersioningErrorChecks; } @@ -485,6 +488,24 @@ protected void addMaxDuplicatesErrorCheck(Map dedupAndVersioningErrorChecks) + { + if (ingestMode.versioningStrategy().accept(VersioningVisitors.IS_DUPLICATE_PK_CHECK_NEEDED)) + { + LogicalPlan logicalPlanForMaxDuplicatePkCount = ingestMode.versioningStrategy().accept(new DeriveMaxDuplicatePkCountLogicalPlan(primaryKeys, stagingDataset())); + if (logicalPlanForMaxDuplicatePkCount != null) + { + dedupAndVersioningErrorChecks.put(DedupAndVersionErrorSqlType.MAX_PK_DUPLICATES, logicalPlanForMaxDuplicatePkCount); + } + + LogicalPlan logicalPlanForDuplicatePkRows = ingestMode.versioningStrategy().accept(new DeriveDuplicatePkRowsLogicalPlan(primaryKeys, stagingDataset(), options().sampleRowCount())); + if (logicalPlanForDuplicatePkRows != null) + { + dedupAndVersioningErrorChecks.put(DedupAndVersionErrorSqlType.PK_DUPLICATE_ROWS, logicalPlanForDuplicatePkRows); + } + } + } + protected void addDataErrorCheck(Map dedupAndVersioningErrorChecks) { List remainingColumns = getDigestOrRemainingColumns(); @@ -667,6 +688,10 @@ static class ValidatePrimaryKeysForVersioningStrategy implements VersioningStrat @Override public Void visitNoVersioningStrategy(NoVersioningStrategyAbstract noVersioningStrategy) { + if (noVersioningStrategy.failOnDuplicatePrimaryKeys()) + { + validatePrimaryKeysNotEmpty.accept(primaryKeys); + } return null; } diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/AnsiTestArtifacts.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/AnsiTestArtifacts.java index 36297acf02e..87f5cdc9b91 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/AnsiTestArtifacts.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/AnsiTestArtifacts.java @@ -608,6 +608,12 @@ public static String getDropTempTableQuery(String tableName) public static String dupRowsSql = "SELECT \"id\",\"name\",\"legend_persistence_count\" FROM \"mydb\".\"staging_temp_staging_lp_yosulf\" as stage " + "WHERE stage.\"legend_persistence_count\" > 1 LIMIT 20"; + public static String maxPkDupsErrorCheckSql = "SELECT MAX(\"legend_persistence_pk_count\") as \"MAX_PK_DUPLICATES\" FROM " + + "(SELECT COUNT(*) as \"legend_persistence_pk_count\" FROM \"mydb\".\"staging_temp_staging_lp_yosulf\" as stage GROUP BY \"id\", \"name\") as stage"; + + public static String dupPkRowsSql = "SELECT \"id\",\"name\",COUNT(*) as \"legend_persistence_pk_count\" FROM " + + "\"mydb\".\"staging_temp_staging_lp_yosulf\" as stage GROUP BY \"id\", \"name\" HAVING \"legend_persistence_pk_count\" > 1 LIMIT 20"; + public static String dataErrorCheckSqlWithBizDateVersion = "SELECT MAX(\"legend_persistence_distinct_rows\") as \"MAX_DATA_ERRORS\" FROM " + "(SELECT COUNT(DISTINCT(\"digest\")) as \"legend_persistence_distinct_rows\" FROM " + "\"mydb\".\"staging_temp_staging_lp_yosulf\" as stage GROUP BY \"id\", \"name\", \"biz_date\") as stage"; diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalDeltaBatchIdDateTimeBasedTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalDeltaBatchIdDateTimeBasedTest.java index 68fc66e0996..7969ef81920 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalDeltaBatchIdDateTimeBasedTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalDeltaBatchIdDateTimeBasedTest.java @@ -15,6 +15,7 @@ package org.finos.legend.engine.persistence.components.ingestmode.unitemporal; import org.finos.legend.engine.persistence.components.AnsiTestArtifacts; +import org.finos.legend.engine.persistence.components.common.DedupAndVersionErrorSqlType; import org.finos.legend.engine.persistence.components.relational.RelationalSink; import org.finos.legend.engine.persistence.components.relational.ansi.AnsiSqlSink; import org.finos.legend.engine.persistence.components.relational.api.DataSplitRange; @@ -24,6 +25,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; import static org.finos.legend.engine.persistence.components.AnsiTestArtifacts.*; import static org.finos.legend.engine.persistence.components.common.DedupAndVersionErrorSqlType.*; @@ -131,13 +133,15 @@ public void verifyUnitemporalDeltaWithDeleteIndMultiValuesNoDedupNoVersion(Gener List preActionsSql = operations.preActionsSql(); List milestoningSql = operations.ingestSql(); List metadataIngestSql = operations.metadataIngestSql(); + List dedupAndVersioningSql = operations.deduplicationAndVersioningSql(); + Map dedupAndVersionErrorSqlTypeStringMap = operations.deduplicationAndVersioningErrorChecksSql(); String expectedMilestoneQuery = "UPDATE \"mydb\".\"main\" as sink SET sink.\"batch_id_out\" = " + "(SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MAIN')-1," + "sink.\"batch_time_out\" = '2000-01-01 00:00:00.000000' " + "WHERE " + "(sink.\"batch_id_out\" = 999999999) AND " + - "(EXISTS (SELECT * FROM \"mydb\".\"staging\" as stage " + + "(EXISTS (SELECT * FROM \"mydb\".\"staging_temp_staging_lp_yosulf\" as stage " + "WHERE ((sink.\"id\" = stage.\"id\") AND (sink.\"name\" = stage.\"name\")) " + "AND ((sink.\"digest\" <> stage.\"digest\") OR (stage.\"delete_indicator\" IN ('yes','1','true')))))"; @@ -146,18 +150,30 @@ public void verifyUnitemporalDeltaWithDeleteIndMultiValuesNoDedupNoVersion(Gener "\"batch_time_in\", \"batch_time_out\") " + "(SELECT stage.\"id\",stage.\"name\",stage.\"amount\",stage.\"biz_date\",stage.\"digest\"," + "(SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MAIN')," + - "999999999,'2000-01-01 00:00:00.000000','9999-12-31 23:59:59' FROM \"mydb\".\"staging\" as stage " + + "999999999,'2000-01-01 00:00:00.000000','9999-12-31 23:59:59' FROM \"mydb\".\"staging_temp_staging_lp_yosulf\" as stage " + "WHERE (NOT (EXISTS (SELECT * FROM \"mydb\".\"main\" as sink " + "WHERE (sink.\"batch_id_out\" = 999999999) AND (sink.\"digest\" = stage.\"digest\") " + "AND ((sink.\"id\" = stage.\"id\") AND (sink.\"name\" = stage.\"name\"))))) AND " + "(stage.\"delete_indicator\" NOT IN ('yes','1','true')))"; + String expectedInsertIntoBaseTempStagingWithFilterDuplicates = "INSERT INTO \"mydb\".\"staging_temp_staging_lp_yosulf\" " + + "(\"id\", \"name\", \"amount\", \"biz_date\", \"digest\", \"delete_indicator\", \"legend_persistence_count\") " + + "(SELECT stage.\"id\",stage.\"name\",stage.\"amount\",stage.\"biz_date\",stage.\"digest\",stage.\"delete_indicator\"," + + "COUNT(*) as \"legend_persistence_count\" FROM \"mydb\".\"staging\" as stage " + + "GROUP BY stage.\"id\", stage.\"name\", stage.\"amount\", stage.\"biz_date\", stage.\"digest\", stage.\"delete_indicator\")"; + Assertions.assertEquals(AnsiTestArtifacts.expectedMainTableCreateQuery, preActionsSql.get(0)); Assertions.assertEquals(getExpectedMetadataTableCreateQuery(), preActionsSql.get(1)); Assertions.assertEquals(expectedMilestoneQuery, milestoningSql.get(0)); Assertions.assertEquals(expectedUpsertQuery, milestoningSql.get(1)); Assertions.assertEquals(getExpectedMetadataTableIngestQuery(), metadataIngestSql.get(0)); + Assertions.assertEquals(AnsiTestArtifacts.expectedTempStagingCleanupQuery, dedupAndVersioningSql.get(0)); + Assertions.assertEquals(expectedInsertIntoBaseTempStagingWithFilterDuplicates, dedupAndVersioningSql.get(1)); + Assertions.assertEquals(AnsiTestArtifacts.maxDupsErrorCheckSql, dedupAndVersionErrorSqlTypeStringMap.get(MAX_DUPLICATES)); + Assertions.assertEquals(AnsiTestArtifacts.dupRowsSql, dedupAndVersionErrorSqlTypeStringMap.get(DUPLICATE_ROWS)); + Assertions.assertEquals(AnsiTestArtifacts.maxPkDupsErrorCheckSql, dedupAndVersionErrorSqlTypeStringMap.get(MAX_PK_DUPLICATES)); + Assertions.assertEquals(AnsiTestArtifacts.dupPkRowsSql, dedupAndVersionErrorSqlTypeStringMap.get(PK_DUPLICATE_ROWS)); // Stats String incomingRecordCount = "SELECT COUNT(*) as \"incomingRecordCount\" FROM \"mydb\".\"staging\" as stage"; diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/BigQueryTestArtifacts.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/BigQueryTestArtifacts.java index 27b0bc59f63..a9074fe9575 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/BigQueryTestArtifacts.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/BigQueryTestArtifacts.java @@ -525,6 +525,12 @@ public class BigQueryTestArtifacts "FROM (SELECT stage.`id`,stage.`name`,stage.`amount`,stage.`biz_date`,COUNT(*) as `legend_persistence_count` FROM `mydb`.`staging` as stage " + "GROUP BY stage.`id`, stage.`name`, stage.`amount`, stage.`biz_date`) as stage)"; + public static String maxPkDupsErrorCheckSql = "SELECT MAX(`legend_persistence_pk_count`) as `MAX_PK_DUPLICATES` FROM " + + "(SELECT COUNT(*) as `legend_persistence_pk_count` FROM `mydb`.`staging_temp_staging_lp_yosulf` as stage GROUP BY `id`, `name`) as stage"; + + public static String dupPkRowsSql = "SELECT `id`,`name`,COUNT(*) as `legend_persistence_pk_count` FROM " + + "`mydb`.`staging_temp_staging_lp_yosulf` as stage GROUP BY `id`, `name` HAVING `legend_persistence_pk_count` > 1 LIMIT 20"; + public static String maxDupsErrorCheckSql = "SELECT MAX(stage.`legend_persistence_count`) as `MAX_DUPLICATES` FROM " + "`mydb`.`staging_temp_staging_lp_yosulf` as stage"; diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/UnitemporalDeltaBatchIdDateTimeBasedTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/UnitemporalDeltaBatchIdDateTimeBasedTest.java index 8f22f55b79a..466a1987c02 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/UnitemporalDeltaBatchIdDateTimeBasedTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/UnitemporalDeltaBatchIdDateTimeBasedTest.java @@ -14,6 +14,7 @@ package org.finos.legend.engine.persistence.components.ingestmode; +import org.finos.legend.engine.persistence.components.common.DedupAndVersionErrorSqlType; import org.finos.legend.engine.persistence.components.relational.RelationalSink; import org.finos.legend.engine.persistence.components.relational.api.DataSplitRange; import org.finos.legend.engine.persistence.components.relational.api.GeneratorResult; @@ -23,6 +24,12 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; + +import static org.finos.legend.engine.persistence.components.common.DedupAndVersionErrorSqlType.DUPLICATE_ROWS; +import static org.finos.legend.engine.persistence.components.common.DedupAndVersionErrorSqlType.MAX_DUPLICATES; +import static org.finos.legend.engine.persistence.components.common.DedupAndVersionErrorSqlType.MAX_PK_DUPLICATES; +import static org.finos.legend.engine.persistence.components.common.DedupAndVersionErrorSqlType.PK_DUPLICATE_ROWS; public class UnitemporalDeltaBatchIdDateTimeBasedTest extends UnitmemporalDeltaBatchIdDateTimeBasedTestCases { @@ -116,13 +123,15 @@ public void verifyUnitemporalDeltaWithDeleteIndMultiValuesNoDedupNoVersion(Gener List preActionsSql = operations.preActionsSql(); List milestoningSql = operations.ingestSql(); List metadataIngestSql = operations.metadataIngestSql(); + List dedupAndVersioningSql = operations.deduplicationAndVersioningSql(); + Map dedupAndVersionErrorSqlTypeStringMap = operations.deduplicationAndVersioningErrorChecksSql(); String expectedMilestoneQuery = "UPDATE `mydb`.`main` as sink SET sink.`batch_id_out` = " + "(SELECT COALESCE(MAX(batch_metadata.`table_batch_id`),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.`table_name`) = 'MAIN')-1," + "sink.`batch_time_out` = PARSE_DATETIME('%Y-%m-%d %H:%M:%E6S','2000-01-01 00:00:00.000000') " + "WHERE " + "(sink.`batch_id_out` = 999999999) AND " + - "(EXISTS (SELECT * FROM `mydb`.`staging` as stage " + + "(EXISTS (SELECT * FROM `mydb`.`staging_temp_staging_lp_yosulf` as stage " + "WHERE ((sink.`id` = stage.`id`) AND (sink.`name` = stage.`name`)) " + "AND ((sink.`digest` <> stage.`digest`) OR (stage.`delete_indicator` IN ('yes','1','true')))))"; @@ -131,18 +140,30 @@ public void verifyUnitemporalDeltaWithDeleteIndMultiValuesNoDedupNoVersion(Gener "`batch_time_in`, `batch_time_out`) " + "(SELECT stage.`id`,stage.`name`,stage.`amount`,stage.`biz_date`,stage.`digest`," + "(SELECT COALESCE(MAX(batch_metadata.`table_batch_id`),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.`table_name`) = 'MAIN')," + - "999999999,PARSE_DATETIME('%Y-%m-%d %H:%M:%E6S','2000-01-01 00:00:00.000000'),PARSE_DATETIME('%Y-%m-%d %H:%M:%E6S','9999-12-31 23:59:59') FROM `mydb`.`staging` as stage " + + "999999999,PARSE_DATETIME('%Y-%m-%d %H:%M:%E6S','2000-01-01 00:00:00.000000'),PARSE_DATETIME('%Y-%m-%d %H:%M:%E6S','9999-12-31 23:59:59') FROM `mydb`.`staging_temp_staging_lp_yosulf` as stage " + "WHERE (NOT (EXISTS (SELECT * FROM `mydb`.`main` as sink " + "WHERE (sink.`batch_id_out` = 999999999) AND (sink.`digest` = stage.`digest`) " + "AND ((sink.`id` = stage.`id`) AND (sink.`name` = stage.`name`))))) AND " + "(stage.`delete_indicator` NOT IN ('yes','1','true')))"; + String expectedInsertIntoBaseTempStagingWithFilterDuplicates = "INSERT INTO `mydb`.`staging_temp_staging_lp_yosulf` " + + "(`id`, `name`, `amount`, `biz_date`, `digest`, `delete_indicator`, `legend_persistence_count`) " + + "(SELECT stage.`id`,stage.`name`,stage.`amount`,stage.`biz_date`,stage.`digest`,stage.`delete_indicator`," + + "COUNT(*) as `legend_persistence_count` FROM `mydb`.`staging` as stage " + + "GROUP BY stage.`id`, stage.`name`, stage.`amount`, stage.`biz_date`, stage.`digest`, stage.`delete_indicator`)"; + Assertions.assertEquals(BigQueryTestArtifacts.expectedMainTableCreateQuery, preActionsSql.get(0)); Assertions.assertEquals(BigQueryTestArtifacts.expectedMetadataTableCreateQuery, preActionsSql.get(1)); Assertions.assertEquals(expectedMilestoneQuery, milestoningSql.get(0)); Assertions.assertEquals(expectedUpsertQuery, milestoningSql.get(1)); Assertions.assertEquals(getExpectedMetadataTableIngestQuery(), metadataIngestSql.get(0)); + Assertions.assertEquals(BigQueryTestArtifacts.expectedTempStagingCleanupQuery, dedupAndVersioningSql.get(0)); + Assertions.assertEquals(expectedInsertIntoBaseTempStagingWithFilterDuplicates, dedupAndVersioningSql.get(1)); + Assertions.assertEquals(BigQueryTestArtifacts.maxDupsErrorCheckSql, dedupAndVersionErrorSqlTypeStringMap.get(MAX_DUPLICATES)); + Assertions.assertEquals(BigQueryTestArtifacts.dupRowsSql, dedupAndVersionErrorSqlTypeStringMap.get(DUPLICATE_ROWS)); + Assertions.assertEquals(BigQueryTestArtifacts.maxPkDupsErrorCheckSql, dedupAndVersionErrorSqlTypeStringMap.get(MAX_PK_DUPLICATES)); + Assertions.assertEquals(BigQueryTestArtifacts.dupPkRowsSql, dedupAndVersionErrorSqlTypeStringMap.get(PK_DUPLICATE_ROWS)); // Stats String incomingRecordCount = "SELECT COUNT(*) as `incomingRecordCount` FROM `mydb`.`staging` as stage"; diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/DataErrorAbstract.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/DataErrorAbstract.java index 69620c2f6ae..dc4e1c6c886 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/DataErrorAbstract.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/DataErrorAbstract.java @@ -36,6 +36,7 @@ public interface DataErrorAbstract public static final String COLUMN_NAME = "column_name"; public static final String CHARACTER_POSITION = "character_position"; public static final String NUM_DUPLICATES = "num_duplicates"; + public static final String NUM_PK_DUPLICATES = "num_pk_duplicates"; public static final String NUM_DATA_VERSION_ERRORS = "num_data_version_errors"; String errorMessage(); diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/ErrorCategory.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/ErrorCategory.java index 9c12bf39d9a..b5ebabd19c8 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/ErrorCategory.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/ErrorCategory.java @@ -23,6 +23,7 @@ public enum ErrorCategory FILE_NOT_FOUND("File not found in specified location"), UNKNOWN("Unknown error"), DUPLICATES("Duplicate rows found"), + DUPLICATE_PRIMARY_KEYS("Multiple rows with duplicate primary keys found"), DATA_VERSION_ERROR("Data errors (same PK, same version but different data)"); private final String defaultErrorMessage; diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/RelationalIngestorAbstract.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/RelationalIngestorAbstract.java index 80246cf8cb8..5dcfc3fc02f 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/RelationalIngestorAbstract.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/RelationalIngestorAbstract.java @@ -25,6 +25,7 @@ import org.finos.legend.engine.persistence.components.ingestmode.*; import org.finos.legend.engine.persistence.components.ingestmode.deduplication.DatasetDeduplicationHandler; import org.finos.legend.engine.persistence.components.ingestmode.versioning.DeriveDataErrorRowsLogicalPlan; +import org.finos.legend.engine.persistence.components.ingestmode.versioning.DeriveDuplicatePkRowsLogicalPlan; import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlan; import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlanFactory; import org.finos.legend.engine.persistence.components.logicalplan.datasets.*; @@ -66,6 +67,7 @@ import static org.finos.legend.engine.persistence.components.relational.api.ApiUtils.retrieveValueAsLong; import static org.finos.legend.engine.persistence.components.relational.api.DataErrorAbstract.NUM_DATA_VERSION_ERRORS; import static org.finos.legend.engine.persistence.components.relational.api.DataErrorAbstract.NUM_DUPLICATES; +import static org.finos.legend.engine.persistence.components.relational.api.DataErrorAbstract.NUM_PK_DUPLICATES; import static org.finos.legend.engine.persistence.components.relational.api.RelationalGeneratorAbstract.BULK_LOAD_BATCH_STATUS_PATTERN; import static org.finos.legend.engine.persistence.components.transformer.Transformer.TransformOptionsAbstract.DATE_TIME_FORMATTER; @@ -470,6 +472,24 @@ public void dedupAndVersion() } } + // Error Check for PK Duplicates: if versioning = No Versioning (fail on pk dups), Fail the job if count > 1 + if (dedupAndVersionErrorSqlTypeSqlPlanMap.containsKey(MAX_PK_DUPLICATES)) + { + List result = executor.executePhysicalPlanAndGetResults(dedupAndVersionErrorSqlTypeSqlPlanMap.get(MAX_PK_DUPLICATES)); + Optional obj = getFirstColumnValue(getFirstRowForFirstResult(result)); + Optional maxPkDuplicatesValue = retrieveValueAsLong(obj.orElse(null)); + if (maxPkDuplicatesValue.isPresent() && maxPkDuplicatesValue.get() > 1) + { + // Find the pk-duplicate rows + TabularData duplicatePkRows = executor.executePhysicalPlanAndGetResults(dedupAndVersionErrorSqlTypeSqlPlanMap.get(PK_DUPLICATE_ROWS)).get(0); + String errorMessage = "Encountered multiple rows with duplicate primary keys, Failing the batch as Fail on Duplicate Primary Keys is selected"; + LOGGER.error(errorMessage); + List dataErrors = ApiUtils.constructDataQualityErrors(enrichedDatasets.stagingDataset(), duplicatePkRows.getData(), + ErrorCategory.DUPLICATE_PRIMARY_KEYS, caseConversion(), DeriveDuplicatePkRowsLogicalPlan.DUPLICATE_PK_COUNT, NUM_PK_DUPLICATES); + throw new DataQualityException(errorMessage, dataErrors); + } + } + // Error Check for Data Error: If versioning = Max Version/ All Versioning, Check for data error if (dedupAndVersionErrorSqlTypeSqlPlanMap.containsKey(MAX_DATA_ERRORS)) { diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/nontemporal/NontemporalDeltaTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/nontemporal/NontemporalDeltaTest.java index 376e177cab5..4bd62189820 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/nontemporal/NontemporalDeltaTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/nontemporal/NontemporalDeltaTest.java @@ -31,6 +31,7 @@ import org.finos.legend.engine.persistence.components.ingestmode.versioning.AllVersionsStrategy; import org.finos.legend.engine.persistence.components.ingestmode.versioning.DigestBasedResolver; import org.finos.legend.engine.persistence.components.ingestmode.versioning.MaxVersionStrategy; +import org.finos.legend.engine.persistence.components.ingestmode.versioning.NoVersioningStrategy; import org.finos.legend.engine.persistence.components.ingestmode.versioning.VersionColumnBasedResolver; import org.finos.legend.engine.persistence.components.logicalplan.conditions.Equals; import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset; @@ -241,6 +242,7 @@ void testNonTemporalDeltaWithCleanStagingDataWithFailOnDups() throws Exception .digestField(digestName) .auditing(NoAuditing.builder().build()) .deduplicationStrategy(FailOnDuplicates.builder().build()) + .versioningStrategy(NoVersioningStrategy.builder().failOnDuplicatePrimaryKeys(true).build()) .build(); PlannerOptions options = PlannerOptions.builder().cleanupStagingData(true).collectStatistics(true).build(); diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/nontemporal/NontemporalSnapshotTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/nontemporal/NontemporalSnapshotTest.java index ed329f49ada..99c2a45f696 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/nontemporal/NontemporalSnapshotTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/nontemporal/NontemporalSnapshotTest.java @@ -356,7 +356,7 @@ void testNontemporalSnapshotWithFailOnDupsNoVersioning() throws Exception // Generate the milestoning object NontemporalSnapshot ingestMode = NontemporalSnapshot.builder() .auditing(NoAuditing.builder().build()) - .versioningStrategy(NoVersioningStrategy.builder().build()) + .versioningStrategy(NoVersioningStrategy.builder().failOnDuplicatePrimaryKeys(true).build()) .deduplicationStrategy(FailOnDuplicates.builder().build()) .build(); diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalDeltaTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalDeltaTest.java index 01adb8d1f12..58d76d3f19d 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalDeltaTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalDeltaTest.java @@ -25,6 +25,7 @@ import org.finos.legend.engine.persistence.components.ingestmode.versioning.MaxVersionStrategy; import org.finos.legend.engine.persistence.components.ingestmode.merge.DeleteIndicatorMergeStrategy; import org.finos.legend.engine.persistence.components.ingestmode.transactionmilestoning.BatchIdAndDateTime; +import org.finos.legend.engine.persistence.components.ingestmode.versioning.NoVersioningStrategy; import org.finos.legend.engine.persistence.components.ingestmode.versioning.VersionColumnBasedResolver; import org.finos.legend.engine.persistence.components.ingestmode.versioning.VersionComparator; import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset; @@ -125,14 +126,13 @@ void testMilestoning() throws Exception @Test void testMilestoningWithDeleteIndicator() throws Exception { - DatasetDefinition mainTable = TestUtils.getDefaultMainTable(); DatasetDefinition stagingTable = TestUtils.getStagingTableWithDeleteIndicator(); String[] schema = new String[]{idName, nameName, incomeName, startTimeName, expiryDateName, digestName, batchIdInName, batchIdOutName, batchTimeInName, batchTimeOutName}; // Create staging table - createStagingTable(stagingTable); + createStagingTableWithoutPks(stagingTable); UnitemporalDelta ingestMode = UnitemporalDelta.builder() .digestField(digestName) @@ -146,6 +146,8 @@ void testMilestoningWithDeleteIndicator() throws Exception .deleteField(deleteIndicatorName) .addAllDeleteValues(Arrays.asList(deleteIndicatorValues)) .build()) + .versioningStrategy(NoVersioningStrategy.builder().failOnDuplicatePrimaryKeys(true).build()) + .deduplicationStrategy(FailOnDuplicates.builder().build()) .build(); PlannerOptions options = PlannerOptions.builder().collectStatistics(true).build(); @@ -177,6 +179,20 @@ void testMilestoningWithDeleteIndicator() throws Exception // 2. Execute plans and verify results expectedStats = createExpectedStatsMap(0, 0, 0, 0, 0); executePlansAndVerifyResults(ingestMode, options, datasets, schema, expectedDataPass3, expectedStats); + + // ------------ Perform Pass4 (Duplicate PKs) ------------------------- + String dataPass4 = basePathForInput + "with_delete_ind/staging_data_pass3.csv"; + // 1. Load staging table + loadStagingDataWithDeleteInd(dataPass4); + try + { + executePlansAndVerifyResults(ingestMode, options, datasets, schema, expectedDataPass3, expectedStats); + Assertions.fail("Should not succeed"); + } + catch (Exception e) + { + Assertions.assertEquals("Encountered multiple rows with duplicate primary keys, Failing the batch as Fail on Duplicate Primary Keys is selected", e.getMessage()); + } } @Test diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalDeltaWithBatchTimeTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalDeltaWithBatchTimeTest.java index fcc3eec3c89..228edd890c9 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalDeltaWithBatchTimeTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalDeltaWithBatchTimeTest.java @@ -21,6 +21,7 @@ import org.finos.legend.engine.persistence.components.ingestmode.deduplication.FailOnDuplicates; import org.finos.legend.engine.persistence.components.ingestmode.merge.DeleteIndicatorMergeStrategy; import org.finos.legend.engine.persistence.components.ingestmode.transactionmilestoning.TransactionDateTime; +import org.finos.legend.engine.persistence.components.ingestmode.versioning.NoVersioningStrategy; import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset; import org.finos.legend.engine.persistence.components.logicalplan.datasets.DatasetDefinition; import org.finos.legend.engine.persistence.components.planner.PlannerOptions; @@ -68,6 +69,7 @@ void testMilestoning() throws Exception .dateTimeOutName(batchTimeOutName) .build()) .deduplicationStrategy(FailOnDuplicates.builder().build()) + .versioningStrategy(NoVersioningStrategy.builder().failOnDuplicatePrimaryKeys(true).build()) .build(); PlannerOptions options = PlannerOptions.builder().cleanupStagingData(false).collectStatistics(true).build(); diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalSnapshotTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalSnapshotTest.java index f4642b4039a..1c78255eb6e 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalSnapshotTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalSnapshotTest.java @@ -18,12 +18,14 @@ import org.finos.legend.engine.persistence.components.TestUtils; import org.finos.legend.engine.persistence.components.common.Datasets; import org.finos.legend.engine.persistence.components.ingestmode.UnitemporalSnapshot; +import org.finos.legend.engine.persistence.components.ingestmode.deduplication.FailOnDuplicates; import org.finos.legend.engine.persistence.components.ingestmode.deduplication.FilterDuplicates; import org.finos.legend.engine.persistence.components.ingestmode.emptyhandling.FailEmptyBatch; import org.finos.legend.engine.persistence.components.ingestmode.emptyhandling.NoOp; import org.finos.legend.engine.persistence.components.ingestmode.transactionmilestoning.BatchIdAndDateTime; import org.finos.legend.engine.persistence.components.ingestmode.versioning.DigestBasedResolver; import org.finos.legend.engine.persistence.components.ingestmode.versioning.MaxVersionStrategy; +import org.finos.legend.engine.persistence.components.ingestmode.versioning.NoVersioningStrategy; import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset; import org.finos.legend.engine.persistence.components.logicalplan.datasets.DatasetDefinition; import org.finos.legend.engine.persistence.components.logicalplan.datasets.FilteredDataset; @@ -125,6 +127,8 @@ void testUnitemporalSnapshotMilestoningLogicWithoutPartitionWithCaseConversion() .dateTimeInName(batchTimeInName) .dateTimeOutName(batchTimeOutName) .build()) + .deduplicationStrategy(FailOnDuplicates.builder().build()) + .versioningStrategy(NoVersioningStrategy.builder().failOnDuplicatePrimaryKeys(true).build()) .build(); PlannerOptions options = PlannerOptions.builder().cleanupStagingData(false).collectStatistics(true).build(); diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalSnapshotWithBatchIdTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalSnapshotWithBatchIdTest.java index 56741c184ec..6460bf31bd2 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalSnapshotWithBatchIdTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalSnapshotWithBatchIdTest.java @@ -19,9 +19,11 @@ import org.finos.legend.engine.persistence.components.common.Datasets; import org.finos.legend.engine.persistence.components.ingestmode.IngestMode; import org.finos.legend.engine.persistence.components.ingestmode.UnitemporalSnapshot; +import org.finos.legend.engine.persistence.components.ingestmode.deduplication.FailOnDuplicates; import org.finos.legend.engine.persistence.components.ingestmode.emptyhandling.DeleteTargetData; import org.finos.legend.engine.persistence.components.ingestmode.emptyhandling.NoOp; import org.finos.legend.engine.persistence.components.ingestmode.transactionmilestoning.BatchId; +import org.finos.legend.engine.persistence.components.ingestmode.versioning.NoVersioningStrategy; import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset; import org.finos.legend.engine.persistence.components.logicalplan.datasets.DatasetDefinition; import org.finos.legend.engine.persistence.components.planner.PlannerOptions; @@ -75,6 +77,8 @@ void testUnitemporalSnapshotMilestoningLogicWithoutPartition() throws Exception .batchIdOutName(batchIdOutName) .build()) .emptyDatasetHandling(DeleteTargetData.builder().build()) + .deduplicationStrategy(FailOnDuplicates.builder().build()) + .versioningStrategy(NoVersioningStrategy.builder().failOnDuplicatePrimaryKeys(true).build()) .build(); PlannerOptions options = PlannerOptions.builder().cleanupStagingData(false).collectStatistics(true).build(); diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalSnapshotWithBatchTimeTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalSnapshotWithBatchTimeTest.java index fe429653740..f884fac1ac2 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalSnapshotWithBatchTimeTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalSnapshotWithBatchTimeTest.java @@ -18,9 +18,11 @@ import org.finos.legend.engine.persistence.components.TestUtils; import org.finos.legend.engine.persistence.components.common.Datasets; import org.finos.legend.engine.persistence.components.ingestmode.UnitemporalSnapshot; +import org.finos.legend.engine.persistence.components.ingestmode.deduplication.FailOnDuplicates; import org.finos.legend.engine.persistence.components.ingestmode.emptyhandling.FailEmptyBatch; import org.finos.legend.engine.persistence.components.ingestmode.emptyhandling.NoOp; import org.finos.legend.engine.persistence.components.ingestmode.transactionmilestoning.TransactionDateTime; +import org.finos.legend.engine.persistence.components.ingestmode.versioning.NoVersioningStrategy; import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset; import org.finos.legend.engine.persistence.components.logicalplan.datasets.DatasetDefinition; import org.finos.legend.engine.persistence.components.planner.PlannerOptions; @@ -211,6 +213,8 @@ void testUnitemporalSnapshotMilestoningLogicWithLessColumnsInStaging() throws Ex .dateTimeInName(batchTimeInName) .dateTimeOutName(batchTimeOutName) .build()) + .versioningStrategy(NoVersioningStrategy.builder().failOnDuplicatePrimaryKeys(true).build()) + .deduplicationStrategy(FailOnDuplicates.builder().build()) .build(); PlannerOptions options = PlannerOptions.builder().collectStatistics(true).build(); diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/versioning/TestDedupAndVersioning.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/versioning/TestDedupAndVersioning.java index 26859a93fbb..eed9230f85c 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/versioning/TestDedupAndVersioning.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/versioning/TestDedupAndVersioning.java @@ -32,7 +32,9 @@ import org.finos.legend.engine.persistence.components.ingestmode.versioning.AllVersionsStrategy; import org.finos.legend.engine.persistence.components.ingestmode.versioning.DigestBasedResolver; import org.finos.legend.engine.persistence.components.ingestmode.versioning.MaxVersionStrategy; +import org.finos.legend.engine.persistence.components.ingestmode.versioning.NoVersioningStrategy; import org.finos.legend.engine.persistence.components.logicalplan.datasets.*; +import org.finos.legend.engine.persistence.components.logicalplan.datasets.DatasetDefinition; import org.finos.legend.engine.persistence.components.logicalplan.datasets.SchemaDefinition; import org.finos.legend.engine.persistence.components.relational.api.DataError; import org.finos.legend.engine.persistence.components.relational.api.ErrorCategory; @@ -72,6 +74,8 @@ public class TestDedupAndVersioning extends BaseTest 13.Fail on Dups, MaxVersion with perform versioning -> tempStagingTable with count column and only max version [Throw error on dups, throw Error on Data errors] 14.Fail on Dups, AllVersion do not perform versioning -> tempStagingTable with count column [Throw error on dups] 15.Fail on Dups, AllVersion with perform versioning -> tempStagingTable with count column and Data splits [Throw error on dups, throw Error on Data errors] + + 16.Fail on Dups, NoVersion with fail on duplicate PKs -> tempStagingTable with count column and pk_count column [Throw error on pk dups] */ private static Field name = Field.builder().name(nameName).type(FieldType.of(DataType.VARCHAR, 64, null)).nullable(false).primaryKey(true).fieldAlias(nameName).build(); @@ -627,6 +631,80 @@ void testFailOnDupsAllVersion() throws Exception } } + // Scenario 16 + @Test + void testFailOnDupsNoVersionFailOnDupPks() throws Exception + { + DatasetDefinition mainTable = TestUtils.getDefaultMainTable(); + DatasetDefinition stagingTable = getStagingTableWithoutVersion(); + Datasets datasets = Datasets.of(mainTable, stagingTable); + IngestMode ingestMode = AppendOnly.builder() + .auditing(DateTimeAuditing.builder().dateTimeField("append_time").build()) + .digestGenStrategy(UserProvidedDigestGenStrategy.builder().digestField("digest").build()) + .deduplicationStrategy(FailOnDuplicates.builder().build()) + .versioningStrategy(NoVersioningStrategy.builder().failOnDuplicatePrimaryKeys(true).build()) + .build(); + + // Happy scenario + createStagingTableWithoutPks(stagingTable); + String srcDataPath1 = "src/test/resources/data/dedup-and-versioning/input/data5_without_dups.csv"; + loadDataIntoStagingTableWithoutVersion(srcDataPath1); + + String expectedDataPath = "src/test/resources/data/dedup-and-versioning/expected/expected_data5_fail_on_dups_no_versioning_fail_on_dups_pk.csv"; + String ingestRunId = performDedupAndVersioining(datasets, ingestMode); + // Validate tempTableExists + verifyResults(expectedDataPath, schemaWithCount, ingestRunId); + + + // Duplicate PK scenario, should throw error + String srcDataPath2 = "src/test/resources/data/dedup-and-versioning/input/data6_with_dups_pk.csv"; + loadDataIntoStagingTableWithoutVersion(srcDataPath2); + try + { + ingestRunId = performDedupAndVersioining(datasets, ingestMode); + Assertions.fail("Should not succeed"); + } + catch (DataQualityException e) + { + Map row1 = new HashMap<>(); + row1.put("name", "Andy"); + row1.put("id", 1); + + Map row2 = new HashMap<>(); + row2.put("name", "Cathy"); + row2.put("id", 3); + + DataError dataError1 = buildDataError(ErrorCategory.DUPLICATE_PRIMARY_KEYS, row1, buildErrorDetailsMap("num_pk_duplicates", 3L)); + DataError dataError2 = buildDataError(ErrorCategory.DUPLICATE_PRIMARY_KEYS, row2, buildErrorDetailsMap("num_pk_duplicates", 2L)); + Assertions.assertEquals("Encountered multiple rows with duplicate primary keys, Failing the batch as Fail on Duplicate Primary Keys is selected", e.getMessage()); + Assertions.assertEquals(Arrays.asList(dataError1, dataError2), e.getDataErrors()); + } + } + + @Test + void testInvalidCombination() + { + DatasetDefinition mainTable = TestUtils.getDefaultMainTable(); + DatasetDefinition stagingTable = getStagingTableWithoutVersion(); + Datasets datasets = Datasets.of(mainTable, stagingTable); + IngestMode ingestMode = AppendOnly.builder() + .auditing(DateTimeAuditing.builder().dateTimeField("append_time").build()) + .digestGenStrategy(UserProvidedDigestGenStrategy.builder().digestField("digest").build()) + .deduplicationStrategy(AllowDuplicates.builder().build()) + .versioningStrategy(NoVersioningStrategy.builder().failOnDuplicatePrimaryKeys(true).build()) + .build(); + + try + { + performDedupAndVersioining(datasets, ingestMode); + Assertions.fail("Should not succeed"); + } + catch (Exception e) + { + Assertions.assertEquals("For failOnDuplicatePrimaryKeys, FailOnDuplicates must be selected as the DeduplicationStrategy", e.getMessage()); + } + } + public static DatasetDefinition getStagingTableWithoutVersion() { diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/dedup-and-versioning/expected/expected_data5_fail_on_dups_no_versioning_fail_on_dups_pk.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/dedup-and-versioning/expected/expected_data5_fail_on_dups_no_versioning_fail_on_dups_pk.csv new file mode 100644 index 00000000000..ef480b9124d --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/dedup-and-versioning/expected/expected_data5_fail_on_dups_no_versioning_fail_on_dups_pk.csv @@ -0,0 +1,3 @@ +1,Andy,1000,2012-01-01,digest1,1 +2,Becky,2000,2012-01-02,digest2,1 +3,Cathy,3000,2012-01-03,digest3,1 \ No newline at end of file diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/dedup-and-versioning/input/data6_with_dups_pk.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/dedup-and-versioning/input/data6_with_dups_pk.csv new file mode 100644 index 00000000000..55556a8dcba --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/dedup-and-versioning/input/data6_with_dups_pk.csv @@ -0,0 +1,6 @@ +1,Andy,1000,2012-01-01,digest1 +1,Andy,1100,2012-01-01,digest1_1 +1,Andy,1200,2012-01-01,digest1_2 +2,Becky,2000,2012-01-02,digest2 +3,Cathy,3000,2012-01-03,digest3 +3,Cathy,3100,2012-01-03,digest3_1 \ No newline at end of file diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-incremental-milestoning/input/batch_id_and_time_based/with_delete_ind/staging_data_pass3.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-incremental-milestoning/input/batch_id_and_time_based/with_delete_ind/staging_data_pass3.csv new file mode 100644 index 00000000000..0abd68d3ff0 --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-incremental-milestoning/input/batch_id_and_time_based/with_delete_ind/staging_data_pass3.csv @@ -0,0 +1,3 @@ +5,MATT,6100,2020-01-06 00:00:00.0,2022-12-06,DIGEST5_UPDATED,0 +6,NANCY,7000,2020-01-07 00:00:00.0,2022-12-07,DIGEST6_1,0 +6,NANCY,7100,2020-01-07 00:00:00.0,2022-12-07,DIGEST6_2,0 diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-memsql/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/MemsqlTestArtifacts.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-memsql/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/MemsqlTestArtifacts.java index e8fa3a61038..68930aeb7bf 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-memsql/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/MemsqlTestArtifacts.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-memsql/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/MemsqlTestArtifacts.java @@ -500,6 +500,12 @@ public class MemsqlTestArtifacts public static String maxDupsErrorCheckSql = "SELECT MAX(stage.`legend_persistence_count`) as `MAX_DUPLICATES` FROM " + "`mydb`.`staging_temp_staging_lp_yosulf` as stage"; + public static String maxPkDupsErrorCheckSql = "SELECT MAX(`legend_persistence_pk_count`) as `MAX_PK_DUPLICATES` FROM " + + "(SELECT COUNT(*) as `legend_persistence_pk_count` FROM `mydb`.`staging_temp_staging_lp_yosulf` as stage GROUP BY `id`, `name`) as stage"; + + public static String dupPkRowsSql = "SELECT `id`,`name`,COUNT(*) as `legend_persistence_pk_count` FROM " + + "`mydb`.`staging_temp_staging_lp_yosulf` as stage GROUP BY `id`, `name` HAVING `legend_persistence_pk_count` > 1 LIMIT 20"; + public static String dataErrorCheckSqlForBizDateAsVersion = "SELECT MAX(`legend_persistence_distinct_rows`) as `MAX_DATA_ERRORS` FROM " + "(SELECT COUNT(DISTINCT(`digest`)) as `legend_persistence_distinct_rows` FROM " + "`mydb`.`staging_temp_staging_lp_yosulf` as stage GROUP BY `id`, `name`, `biz_date`) as stage"; diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-memsql/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/UnitemporalDeltaBatchIdDateTimeBasedTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-memsql/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/UnitemporalDeltaBatchIdDateTimeBasedTest.java index 726ff04cca4..8885747fac1 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-memsql/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/UnitemporalDeltaBatchIdDateTimeBasedTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-memsql/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/UnitemporalDeltaBatchIdDateTimeBasedTest.java @@ -14,6 +14,7 @@ package org.finos.legend.engine.persistence.components.ingestmode; +import org.finos.legend.engine.persistence.components.common.DedupAndVersionErrorSqlType; import org.finos.legend.engine.persistence.components.relational.RelationalSink; import org.finos.legend.engine.persistence.components.relational.api.DataSplitRange; import org.finos.legend.engine.persistence.components.relational.api.GeneratorResult; @@ -23,6 +24,12 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; + +import static org.finos.legend.engine.persistence.components.common.DedupAndVersionErrorSqlType.DUPLICATE_ROWS; +import static org.finos.legend.engine.persistence.components.common.DedupAndVersionErrorSqlType.MAX_DUPLICATES; +import static org.finos.legend.engine.persistence.components.common.DedupAndVersionErrorSqlType.MAX_PK_DUPLICATES; +import static org.finos.legend.engine.persistence.components.common.DedupAndVersionErrorSqlType.PK_DUPLICATE_ROWS; public class UnitemporalDeltaBatchIdDateTimeBasedTest extends UnitmemporalDeltaBatchIdDateTimeBasedTestCases { @@ -116,13 +123,15 @@ public void verifyUnitemporalDeltaWithDeleteIndMultiValuesNoDedupNoVersion(Gener List preActionsSql = operations.preActionsSql(); List milestoningSql = operations.ingestSql(); List metadataIngestSql = operations.metadataIngestSql(); + List dedupAndVersioningSql = operations.deduplicationAndVersioningSql(); + Map dedupAndVersionErrorSqlTypeStringMap = operations.deduplicationAndVersioningErrorChecksSql(); String expectedMilestoneQuery = "UPDATE `mydb`.`main` as sink SET sink.`batch_id_out` = " + "(SELECT COALESCE(MAX(batch_metadata.`table_batch_id`),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.`table_name`) = 'MAIN')-1," + "sink.`batch_time_out` = '2000-01-01 00:00:00.000000' " + "WHERE " + "(sink.`batch_id_out` = 999999999) AND " + - "(EXISTS (SELECT * FROM `mydb`.`staging` as stage " + + "(EXISTS (SELECT * FROM `mydb`.`staging_temp_staging_lp_yosulf` as stage " + "WHERE ((sink.`id` = stage.`id`) AND (sink.`name` = stage.`name`)) " + "AND ((sink.`digest` <> stage.`digest`) OR (stage.`delete_indicator` IN ('yes','1','true')))))"; @@ -131,18 +140,30 @@ public void verifyUnitemporalDeltaWithDeleteIndMultiValuesNoDedupNoVersion(Gener "`batch_time_in`, `batch_time_out`) " + "(SELECT stage.`id`,stage.`name`,stage.`amount`,stage.`biz_date`,stage.`digest`," + "(SELECT COALESCE(MAX(batch_metadata.`table_batch_id`),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.`table_name`) = 'MAIN')," + - "999999999,'2000-01-01 00:00:00.000000','9999-12-31 23:59:59' FROM `mydb`.`staging` as stage " + + "999999999,'2000-01-01 00:00:00.000000','9999-12-31 23:59:59' FROM `mydb`.`staging_temp_staging_lp_yosulf` as stage " + "WHERE (NOT (EXISTS (SELECT * FROM `mydb`.`main` as sink " + "WHERE (sink.`batch_id_out` = 999999999) AND (sink.`digest` = stage.`digest`) " + "AND ((sink.`id` = stage.`id`) AND (sink.`name` = stage.`name`))))) AND " + "(stage.`delete_indicator` NOT IN ('yes','1','true')))"; + String expectedInsertIntoBaseTempStagingWithFilterDuplicates = "INSERT INTO `mydb`.`staging_temp_staging_lp_yosulf` " + + "(`id`, `name`, `amount`, `biz_date`, `digest`, `delete_indicator`, `legend_persistence_count`) " + + "(SELECT stage.`id`,stage.`name`,stage.`amount`,stage.`biz_date`,stage.`digest`,stage.`delete_indicator`," + + "COUNT(*) as `legend_persistence_count` FROM `mydb`.`staging` as stage " + + "GROUP BY stage.`id`, stage.`name`, stage.`amount`, stage.`biz_date`, stage.`digest`, stage.`delete_indicator`)"; + Assertions.assertEquals(MemsqlTestArtifacts.expectedMainTableCreateQuery, preActionsSql.get(0)); Assertions.assertEquals(MemsqlTestArtifacts.expectedMetadataTableCreateQuery, preActionsSql.get(1)); Assertions.assertEquals(expectedMilestoneQuery, milestoningSql.get(0)); Assertions.assertEquals(expectedUpsertQuery, milestoningSql.get(1)); Assertions.assertEquals(getExpectedMetadataTableIngestQuery(), metadataIngestSql.get(0)); + Assertions.assertEquals(MemsqlTestArtifacts.expectedTempStagingCleanupQuery, dedupAndVersioningSql.get(0)); + Assertions.assertEquals(expectedInsertIntoBaseTempStagingWithFilterDuplicates, dedupAndVersioningSql.get(1)); + Assertions.assertEquals(MemsqlTestArtifacts.maxDupsErrorCheckSql, dedupAndVersionErrorSqlTypeStringMap.get(MAX_DUPLICATES)); + Assertions.assertEquals(MemsqlTestArtifacts.dupRowsSql, dedupAndVersionErrorSqlTypeStringMap.get(DUPLICATE_ROWS)); + Assertions.assertEquals(MemsqlTestArtifacts.maxPkDupsErrorCheckSql, dedupAndVersionErrorSqlTypeStringMap.get(MAX_PK_DUPLICATES)); + Assertions.assertEquals(MemsqlTestArtifacts.dupPkRowsSql, dedupAndVersionErrorSqlTypeStringMap.get(PK_DUPLICATE_ROWS)); // Stats String incomingRecordCount = "SELECT COUNT(*) as `incomingRecordCount` FROM `mydb`.`staging` as stage"; diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-test/src/test/java/org/finos/legend/engine/persistence/components/scenarios/UnitemporalDeltaBatchIdDateTimeBasedScenarios.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-test/src/test/java/org/finos/legend/engine/persistence/components/scenarios/UnitemporalDeltaBatchIdDateTimeBasedScenarios.java index 283aa64a426..62c640acdbf 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-test/src/test/java/org/finos/legend/engine/persistence/components/scenarios/UnitemporalDeltaBatchIdDateTimeBasedScenarios.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-test/src/test/java/org/finos/legend/engine/persistence/components/scenarios/UnitemporalDeltaBatchIdDateTimeBasedScenarios.java @@ -22,6 +22,7 @@ import org.finos.legend.engine.persistence.components.ingestmode.transactionmilestoning.BatchIdAndDateTime; import org.finos.legend.engine.persistence.components.ingestmode.versioning.AllVersionsStrategy; import org.finos.legend.engine.persistence.components.ingestmode.versioning.DigestBasedResolver; +import org.finos.legend.engine.persistence.components.ingestmode.versioning.NoVersioningStrategy; import java.util.Arrays; @@ -82,6 +83,8 @@ public TestScenario BATCH_ID_AND_TIME_BASED__WITH_DEL_IND_MULTI_VALUES__NO_DEDUP .deleteField(deleteIndicatorField) .addAllDeleteValues(Arrays.asList(deleteIndicatorValues)) .build()) + .versioningStrategy(NoVersioningStrategy.builder().failOnDuplicatePrimaryKeys(true).build()) + .deduplicationStrategy(FailOnDuplicates.builder().build()) .build(); return new TestScenario(mainTableWithBatchIdAndTime, stagingTableWithDeleteIndicator, ingestMode); diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-test/src/test/java/org/finos/legend/engine/persistence/components/testcases/ingestmode/unitemporal/UnitmemporalDeltaBatchIdDateTimeBasedTestCases.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-test/src/test/java/org/finos/legend/engine/persistence/components/testcases/ingestmode/unitemporal/UnitmemporalDeltaBatchIdDateTimeBasedTestCases.java index 0cb41f6eb10..46a19a1ef86 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-test/src/test/java/org/finos/legend/engine/persistence/components/testcases/ingestmode/unitemporal/UnitmemporalDeltaBatchIdDateTimeBasedTestCases.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-test/src/test/java/org/finos/legend/engine/persistence/components/testcases/ingestmode/unitemporal/UnitmemporalDeltaBatchIdDateTimeBasedTestCases.java @@ -79,6 +79,7 @@ void testUnitemporalDeltaWithDeleteIndMultiValuesNoDedupNoVersion() .relationalSink(getRelationalSink()) .executionTimestampClock(fixedClock_2000_01_01) .collectStatistics(true) + .ingestRunId(ingestRunId) .build(); GeneratorResult operations = generator.generateOperations(scenario.getDatasets()); verifyUnitemporalDeltaWithDeleteIndMultiValuesNoDedupNoVersion(operations);