From c904d990236d34c03d0ee9346844522ee42b0960 Mon Sep 17 00:00:00 2001 From: prasar-ashutosh Date: Fri, 6 Oct 2023 14:39:14 +0800 Subject: [PATCH 1/2] Fix the sqls for dedup and versioning and add H2 tests --- .../DatasetDeduplicationHandler.java | 1 - .../versioning/DatasetVersioningHandler.java | 2 - .../components/planner/Planner.java | 2 +- .../components/util/LogicalPlanUtils.java | 9 +- .../components/AnsiTestArtifacts.java | 13 +- .../util/DatasetDeduplicationHandlerTest.java | 4 +- .../util/DatasetVersioningHandlerTest.java | 25 +- .../ingestmode/BigQueryTestArtifacts.java | 9 +- .../nontemporal/NontemporalSnapshotTest.java | 11 +- .../versioning/TestDedupAndVersioning.java | 225 ++++++++++++++++++ ..._expected_without_dups_and_max_version.csv | 4 + .../input/data_with_dups.csv | 6 + .../input/data_with_version.csv | 7 + .../input/with_data_splits/data_pass1.csv | 11 +- .../ingestmode/MemsqlTestArtifacts.java | 11 +- 15 files changed, 292 insertions(+), 48 deletions(-) create mode 100644 legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/versioning/TestDedupAndVersioning.java create mode 100644 legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/dedup-and-versioning/expected/data_expected_without_dups_and_max_version.csv create mode 100644 legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/dedup-and-versioning/input/data_with_dups.csv create mode 100644 legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/dedup-and-versioning/input/data_with_version.csv diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/deduplication/DatasetDeduplicationHandler.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/deduplication/DatasetDeduplicationHandler.java index 898a30d7b69..8f2217604d0 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/deduplication/DatasetDeduplicationHandler.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/deduplication/DatasetDeduplicationHandler.java @@ -61,7 +61,6 @@ private Dataset selectionWithGroupByAllColumns() Selection selectionWithGroupByAllColumns = Selection.builder() .source(stagingDataset) .addAllFields(allColumnsWithCount) - .alias(stagingDataset.datasetReference().alias()) .groupByFields(allColumns) .build(); return selectionWithGroupByAllColumns; diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/versioning/DatasetVersioningHandler.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/versioning/DatasetVersioningHandler.java index d0ce33971aa..dd6cbcb0f30 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/versioning/DatasetVersioningHandler.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/versioning/DatasetVersioningHandler.java @@ -79,7 +79,6 @@ public Dataset visitMaxVersionStrategy(MaxVersionStrategyAbstract maxVersionStra .source(selectionWithRank) .addAllFields(allColumns) .condition(rankFilterCondition) - .alias(dataset.datasetReference().alias()) .build(); return enrichedStagingDataset; @@ -111,7 +110,6 @@ public Dataset visitAllVersionsStrategy(AllVersionsStrategyAbstract allVersionsS Selection selectionWithRank = Selection.builder() .source(dataset) .addAllFields(allColumnsWithRank) - .alias(dataset.datasetReference().alias()) .build(); return selectionWithRank; } diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/Planner.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/Planner.java index e6d82b035f4..44f765d2050 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/Planner.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/Planner.java @@ -252,7 +252,7 @@ public LogicalPlan buildLogicalPlanForDeduplicationAndVersioning(Resources resou if (isTempTableNeededForStaging) { operations.add(Delete.builder().dataset(tempStagingDataset()).build()); - Dataset dedupAndVersionedDataset = LogicalPlanUtils.getTempStagingDataset(ingestMode(), originalStagingDataset(), primaryKeys); + Dataset dedupAndVersionedDataset = LogicalPlanUtils.getDedupedAndVersionedDataset(ingestMode.deduplicationStrategy(), ingestMode.versioningStrategy(), originalStagingDataset(), primaryKeys); List fieldsToInsert = new ArrayList<>(dedupAndVersionedDataset.schemaReference().fieldValues()); operations.add(Insert.of(tempStagingDataset(), dedupAndVersionedDataset, fieldsToInsert)); } diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/util/LogicalPlanUtils.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/util/LogicalPlanUtils.java index 67a670f7fb9..9d2a948a6a9 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/util/LogicalPlanUtils.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/util/LogicalPlanUtils.java @@ -457,11 +457,14 @@ public static Dataset getTempStagingDatasetDefinition(Dataset stagingDataset, In .build(); } - public static Dataset getTempStagingDataset(IngestMode ingestMode, Dataset stagingDataset, List primaryKeys) + public static Dataset getDedupedAndVersionedDataset(DeduplicationStrategy deduplicationStrategy, VersioningStrategy versioningStrategy, Dataset stagingDataset, List primaryKeys) { - DeduplicationStrategy deduplicationStrategy = ingestMode.deduplicationStrategy(); - VersioningStrategy versioningStrategy = ingestMode.versioningStrategy(); Dataset dedupedDataset = deduplicationStrategy.accept(new DatasetDeduplicationHandler(stagingDataset)); + if (dedupedDataset instanceof Selection) + { + Selection selection = (Selection) dedupedDataset; + dedupedDataset = selection.withAlias(stagingDataset.datasetReference().alias()); + } Dataset versionedDataset = versioningStrategy.accept(new DatasetVersioningHandler(dedupedDataset, primaryKeys)); return versionedDataset; } diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/AnsiTestArtifacts.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/AnsiTestArtifacts.java index 9806eed2692..173c2ed83d7 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/AnsiTestArtifacts.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/AnsiTestArtifacts.java @@ -415,11 +415,10 @@ public static String getDropTempTableQuery(String tableName) public static String expectedInsertIntoBaseTempStagingWithMaxVersionAndAllowDuplicates = "INSERT INTO \"mydb\".\"staging_legend_persistence_temp_staging\" " + "(\"id\", \"name\", \"amount\", \"biz_date\", \"legend_persistence_count\") " + - "((SELECT stage.\"id\",stage.\"name\",stage.\"amount\",stage.\"biz_date\",stage.\"legend_persistence_count\" as \"legend_persistence_count\" FROM " + - "(SELECT stage.\"id\",stage.\"name\",stage.\"amount\",stage.\"biz_date\",stage.\"legend_persistence_count\" as \"legend_persistence_count\"," + - "DENSE_RANK() OVER (PARTITION BY stage.\"id\",stage.\"name\" ORDER BY stage.\"biz_date\" DESC) as \"legend_persistence_rank\" " + - "FROM " + - "(SELECT stage.\"id\",stage.\"name\",stage.\"amount\",stage.\"biz_date\",COUNT(*) as \"legend_persistence_count\" " + - "FROM \"mydb\".\"staging\" as stage GROUP BY stage.\"id\", stage.\"name\", stage.\"amount\", stage.\"biz_date\") as stage) as stage " + - "WHERE stage.\"legend_persistence_rank\" = 1) as stage)"; + "(SELECT stage.\"id\",stage.\"name\",stage.\"amount\",stage.\"biz_date\",stage.\"legend_persistence_count\" as \"legend_persistence_count\" FROM " + + "(SELECT stage.\"id\",stage.\"name\",stage.\"amount\",stage.\"biz_date\",stage.\"legend_persistence_count\" as \"legend_persistence_count\",DENSE_RANK() OVER " + + "(PARTITION BY stage.\"id\",stage.\"name\" ORDER BY stage.\"biz_date\" DESC) as \"legend_persistence_rank\" FROM " + + "(SELECT stage.\"id\",stage.\"name\",stage.\"amount\",stage.\"biz_date\",COUNT(*) as \"legend_persistence_count\" FROM " + + "\"mydb\".\"staging\" as stage GROUP BY stage.\"id\", stage.\"name\", stage.\"amount\", stage.\"biz_date\") as stage) as stage " + + "WHERE stage.\"legend_persistence_rank\" = 1)"; } diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/util/DatasetDeduplicationHandlerTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/util/DatasetDeduplicationHandlerTest.java index b78df8aef3b..6e73d57a9b8 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/util/DatasetDeduplicationHandlerTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/util/DatasetDeduplicationHandlerTest.java @@ -42,9 +42,9 @@ public class DatasetDeduplicationHandlerTest extends IngestModeTest .schema(baseTableSchemaWithVersion) .build(); - String expectedSql = "(SELECT stage.\"id\",stage.\"name\",stage.\"version\",stage.\"biz_date\",COUNT(*) as \"legend_persistence_count\" " + + String expectedSql = "SELECT stage.\"id\",stage.\"name\",stage.\"version\",stage.\"biz_date\",COUNT(*) as \"legend_persistence_count\" " + "FROM \"my_db\".\"my_schema\".\"my_table\" as stage " + - "GROUP BY stage.\"id\", stage.\"name\", stage.\"version\", stage.\"biz_date\") as stage"; + "GROUP BY stage.\"id\", stage.\"name\", stage.\"version\", stage.\"biz_date\""; @Test public void testDatasetDeduplicationFailOnDuplicates() diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/util/DatasetVersioningHandlerTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/util/DatasetVersioningHandlerTest.java index 43fbb6c40d6..58255358e61 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/util/DatasetVersioningHandlerTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/util/DatasetVersioningHandlerTest.java @@ -18,10 +18,7 @@ import org.finos.legend.engine.persistence.components.common.DatasetFilter; import org.finos.legend.engine.persistence.components.common.FilterType; import org.finos.legend.engine.persistence.components.ingestmode.deduplication.*; -import org.finos.legend.engine.persistence.components.ingestmode.versioning.AllVersionsStrategy; -import org.finos.legend.engine.persistence.components.ingestmode.versioning.DatasetVersioningHandler; -import org.finos.legend.engine.persistence.components.ingestmode.versioning.MaxVersionStrategy; -import org.finos.legend.engine.persistence.components.ingestmode.versioning.NoVersioningStrategy; +import org.finos.legend.engine.persistence.components.ingestmode.versioning.*; import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlan; import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset; import org.finos.legend.engine.persistence.components.logicalplan.datasets.DatasetDefinition; @@ -77,10 +74,10 @@ public void testVersioningHandlerMaxVersionStrategy() LogicalPlan logicalPlan = LogicalPlan.builder().addOps(versionedSelection).build(); SqlPlan physicalPlan = transformer.generatePhysicalPlan(logicalPlan); List list = physicalPlan.getSqlList(); - String expectedSql = "(SELECT stage.\"id\",stage.\"name\",stage.\"version\",stage.\"biz_date\" " + + String expectedSql = "SELECT stage.\"id\",stage.\"name\",stage.\"version\",stage.\"biz_date\" " + "FROM (SELECT stage.\"id\",stage.\"name\",stage.\"version\",stage.\"biz_date\",DENSE_RANK() OVER " + "(PARTITION BY stage.\"id\",stage.\"name\" ORDER BY stage.\"version\" DESC) as \"legend_persistence_rank\" " + - "FROM \"my_db\".\"my_schema\".\"my_table\" as stage) as stage WHERE stage.\"legend_persistence_rank\" = 1) as stage"; + "FROM \"my_db\".\"my_schema\".\"my_table\" as stage) as stage WHERE stage.\"legend_persistence_rank\" = 1"; Assertions.assertEquals(expectedSql, list.get(0)); } @@ -93,28 +90,30 @@ public void testVersioningHandlerAllVersionsStrategy() LogicalPlan logicalPlan = LogicalPlan.builder().addOps(versionedSelection).build(); SqlPlan physicalPlan = transformer.generatePhysicalPlan(logicalPlan); List list = physicalPlan.getSqlList(); - String expectedSql = "(SELECT stage.\"id\",stage.\"name\",stage.\"version\",stage.\"biz_date\"," + + String expectedSql = "SELECT stage.\"id\",stage.\"name\",stage.\"version\",stage.\"biz_date\"," + "DENSE_RANK() OVER (PARTITION BY stage.\"id\",stage.\"name\" ORDER BY stage.\"version\" ASC) as \"legend_persistence_data_split\" " + - "FROM \"my_db\".\"my_schema\".\"my_table\" as stage) as stage"; + "FROM \"my_db\".\"my_schema\".\"my_table\" as stage"; Assertions.assertEquals(expectedSql, list.get(0)); } @Test public void testVersioningHandlerWithDeduplicationHandler() { - Dataset dedupedDataset = FailOnDuplicates.builder().build().accept(new DatasetDeduplicationHandler(derivedStagingDataset)); - Dataset versionedDataset = AllVersionsStrategy.builder().versioningField("version").build().accept(new DatasetVersioningHandler(dedupedDataset, primaryKeys)); - Selection versionedSelection = (Selection) versionedDataset; + DeduplicationStrategy deduplicationStrategy = FailOnDuplicates.builder().build(); + VersioningStrategy versioningStrategy = AllVersionsStrategy.builder().versioningField("version").build(); + Dataset dedupAndVersionedDataset = LogicalPlanUtils.getDedupedAndVersionedDataset(deduplicationStrategy, versioningStrategy, derivedStagingDataset, primaryKeys); + + Selection versionedSelection = (Selection) dedupAndVersionedDataset; RelationalTransformer transformer = new RelationalTransformer(AnsiSqlSink.get(), transformOptions); LogicalPlan logicalPlan = LogicalPlan.builder().addOps(versionedSelection).build(); SqlPlan physicalPlan = transformer.generatePhysicalPlan(logicalPlan); List list = physicalPlan.getSqlList(); - String expectedSql = "(SELECT stage.\"id\",stage.\"name\",stage.\"version\",stage.\"biz_date\"," + + String expectedSql = "SELECT stage.\"id\",stage.\"name\",stage.\"version\",stage.\"biz_date\"," + "stage.\"legend_persistence_count\" as \"legend_persistence_count\"," + "DENSE_RANK() OVER (PARTITION BY stage.\"id\",stage.\"name\" ORDER BY stage.\"version\" ASC) as \"legend_persistence_data_split\" " + "FROM (SELECT stage.\"id\",stage.\"name\",stage.\"version\",stage.\"biz_date\"," + "COUNT(*) as \"legend_persistence_count\" FROM \"my_db\".\"my_schema\".\"my_table\" as stage WHERE stage.\"bizDate\" = '2020-01-01' " + - "GROUP BY stage.\"id\", stage.\"name\", stage.\"version\", stage.\"biz_date\") as stage) as stage"; + "GROUP BY stage.\"id\", stage.\"name\", stage.\"version\", stage.\"biz_date\") as stage"; Assertions.assertEquals(expectedSql, list.get(0)); } } diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/BigQueryTestArtifacts.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/BigQueryTestArtifacts.java index 1aa748966c6..d4848e657ef 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/BigQueryTestArtifacts.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/BigQueryTestArtifacts.java @@ -391,11 +391,10 @@ public class BigQueryTestArtifacts public static String expectedInsertIntoBaseTempStagingWithMaxVersionAndAllowDuplicates = "INSERT INTO `mydb`.`staging_legend_persistence_temp_staging` " + "(`id`, `name`, `amount`, `biz_date`, `legend_persistence_count`) " + - "((SELECT stage.`id`,stage.`name`,stage.`amount`,stage.`biz_date`,stage.`legend_persistence_count` as `legend_persistence_count` FROM " + + "(SELECT stage.`id`,stage.`name`,stage.`amount`,stage.`biz_date`,stage.`legend_persistence_count` as `legend_persistence_count` FROM " + "(SELECT stage.`id`,stage.`name`,stage.`amount`,stage.`biz_date`,stage.`legend_persistence_count` as `legend_persistence_count`," + "DENSE_RANK() OVER (PARTITION BY stage.`id`,stage.`name` ORDER BY stage.`biz_date` DESC) as `legend_persistence_rank` " + - "FROM " + - "(SELECT stage.`id`,stage.`name`,stage.`amount`,stage.`biz_date`,COUNT(*) as `legend_persistence_count` " + - "FROM `mydb`.`staging` as stage GROUP BY stage.`id`, stage.`name`, stage.`amount`, stage.`biz_date`) as stage) as stage " + - "WHERE stage.`legend_persistence_rank` = 1) as stage)"; + "FROM (SELECT stage.`id`,stage.`name`,stage.`amount`,stage.`biz_date`,COUNT(*) as `legend_persistence_count` " + + "FROM `mydb`.`staging` as stage GROUP BY stage.`id`, stage.`name`, stage.`amount`, stage.`biz_date`) as stage) " + + "as stage WHERE stage.`legend_persistence_rank` = 1)"; } diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/nontemporal/NontemporalSnapshotTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/nontemporal/NontemporalSnapshotTest.java index ec287626ef0..1348375bef5 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/nontemporal/NontemporalSnapshotTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/nontemporal/NontemporalSnapshotTest.java @@ -20,6 +20,8 @@ import org.finos.legend.engine.persistence.components.ingestmode.NontemporalSnapshot; import org.finos.legend.engine.persistence.components.ingestmode.audit.DateTimeAuditing; import org.finos.legend.engine.persistence.components.ingestmode.audit.NoAuditing; +import org.finos.legend.engine.persistence.components.ingestmode.deduplication.FilterDuplicates; +import org.finos.legend.engine.persistence.components.ingestmode.versioning.MaxVersionStrategy; import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset; import org.finos.legend.engine.persistence.components.logicalplan.datasets.DatasetDefinition; import org.finos.legend.engine.persistence.components.planner.PlannerOptions; @@ -30,6 +32,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.logging.Filter; import static org.finos.legend.engine.persistence.components.TestUtils.batchUpdateTimeName; import static org.finos.legend.engine.persistence.components.TestUtils.dataSplitName; @@ -52,7 +55,8 @@ class NontemporalSnapshotTest extends BaseTest 4. No Auditing & import external CSV dataset 5. Staging has lesser columns than main dataset 6. Staging data cleanup - 7. Data Splits enabled + 7. With Auditing, Max Version, Filter Duplicates + 8. With Auditing, No Version, Fail on Duplicates */ /* @@ -258,7 +262,7 @@ void testNontemporalSnapshotWithCleanStagingData() throws Exception Scenario: Test Nontemporal Snapshot when data splits are enabled */ @Test - void testNontemporalSnapshotWithDataSplits() throws Exception + void testNontemporalSnapshotWithMaxVersionAndFilterDuplicates() throws Exception { DatasetDefinition mainTable = TestUtils.getDefaultMainTable(); String dataPass1 = basePath + "input/with_data_splits/data_pass1.csv"; @@ -267,7 +271,8 @@ void testNontemporalSnapshotWithDataSplits() throws Exception // Generate the milestoning object NontemporalSnapshot ingestMode = NontemporalSnapshot.builder() .auditing(NoAuditing.builder().build()) - //.dataSplitField(dataSplitName) + .versioningStrategy(MaxVersionStrategy.of("")) + .deduplicationStrategy(FilterDuplicates.builder().build()) .build(); PlannerOptions options = PlannerOptions.builder().collectStatistics(true).build(); diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/versioning/TestDedupAndVersioning.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/versioning/TestDedupAndVersioning.java new file mode 100644 index 00000000000..5acdb58ba4f --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/versioning/TestDedupAndVersioning.java @@ -0,0 +1,225 @@ +// Copyright 2023 Goldman Sachs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.finos.legend.engine.persistence.components.versioning; + +import org.finos.legend.engine.persistence.components.BaseTest; +import org.finos.legend.engine.persistence.components.TestUtils; +import org.finos.legend.engine.persistence.components.common.Datasets; +import org.finos.legend.engine.persistence.components.executor.Executor; +import org.finos.legend.engine.persistence.components.ingestmode.AppendOnly; +import org.finos.legend.engine.persistence.components.ingestmode.IngestMode; +import org.finos.legend.engine.persistence.components.ingestmode.NontemporalSnapshot; +import org.finos.legend.engine.persistence.components.ingestmode.audit.DateTimeAuditing; +import org.finos.legend.engine.persistence.components.ingestmode.audit.NoAuditing; +import org.finos.legend.engine.persistence.components.ingestmode.deduplication.FilterDuplicates; +import org.finos.legend.engine.persistence.components.ingestmode.versioning.AllVersionsStrategy; +import org.finos.legend.engine.persistence.components.ingestmode.versioning.MaxVersionStrategy; +import org.finos.legend.engine.persistence.components.logicalplan.datasets.*; +import org.finos.legend.engine.persistence.components.relational.api.RelationalIngestor; +import org.finos.legend.engine.persistence.components.relational.h2.H2Sink; +import org.finos.legend.engine.persistence.components.relational.jdbc.JdbcConnection; +import org.junit.jupiter.api.Test; + +import static org.finos.legend.engine.persistence.components.TestUtils.*; + +public class TestDedupAndVersioning extends BaseTest +{ + + /* Scenarios: + 1. No Dedup, NoVersion -> No tempStagingTable + 2. No Dedup, MaxVersion do not perform versioning -> No tempStagingTable + 3. No Dedup, MaxVersion with perform versioning -> tempStagingTable with only MaxVersioned Data [throw Error on Data errors] + 4. No Dedup, AllVersion do not perform versioning -> No tempStagingTable + 5. No Dedup, AllVersion with perform versioning -> tempStagingTable with Data splits [throw Error on Data errors] + + 6. Filter Dups, NoVersion -> tempStagingTable with count column + 7. Filter Dups, MaxVersion do not perform versioning -> tempStagingTable with count column + 8. Filter Dups, MaxVersion with perform versioning -> tempStagingTable with count column and only max version [throw Error on Data errors] + 9. Filter Dups, AllVersion do not perform versioning -> tempStagingTable with count column + 10. Filter Dups, AllVersion with perform versioning -> tempStagingTable with count column and Data splits [throw Error on Data errors] + + 11. Fail on Dups, NoVersion -> tempStagingTable with count column [Throw error on dups] + 12. Fail on Dups, MaxVersion do not perform versioning -> tempStagingTable with count column [Throw error on dups] + 13. Fail on Dups, MaxVersion with perform versioning -> tempStagingTable with count column and only max version [Throw error on dups, throw Error on Data errors] + 14. Fail on Dups, AllVersion do not perform versioning -> tempStagingTable with count column [Throw error on dups] + 15. Fail on Dups, AllVersion with perform versioning -> tempStagingTable with count column and Data splits [Throw error on dups, throw Error on Data errors] + */ + + private static Field name = Field.builder().name(nameName).type(FieldType.of(DataType.VARCHAR, 64, null)).nullable(false).primaryKey(true).fieldAlias(nameName).build(); + + // Base Schema : PK : id, name + public static SchemaDefinition baseSchemaWithoutVersion = + SchemaDefinition.builder() + .addFields(id) + .addFields(name) + .addFields(income) + .addFields(expiryDate) + .addFields(digest) + .build(); + + public static SchemaDefinition baseSchemaWithVersion = + SchemaDefinition.builder() + .addFields(id) + .addFields(name) + .addFields(version) + .addFields(income) + .addFields(expiryDate) + .addFields(digest) + .build(); + + @Test + void testNoDedupNoVersioning() throws Exception + { + // 1 + DatasetDefinition mainTable = TestUtils.getDefaultMainTable(); + DatasetDefinition stagingTable = getStagingTableWithoutVersion(); + Datasets datasets = Datasets.of(mainTable, stagingTable); + IngestMode ingestMode = NontemporalSnapshot.builder() + .auditing(NoAuditing.builder().build()) + .build(); + RelationalIngestor ingestor = RelationalIngestor.builder() + .ingestMode(ingestMode) + .relationalSink(H2Sink.get()) + .build(); + + // Create staging Table without PKS and load Data into it + + Executor executor = ingestor.init(JdbcConnection.of(h2Sink.connection())); + datasets = ingestor.create(datasets); + datasets = ingestor.dedupAndVersion(datasets); + } + + @Test + void testFilterDupsNoVersioning() throws Exception + { + // 6 + DatasetDefinition mainTable = TestUtils.getDefaultMainTable(); + DatasetDefinition stagingTable = getStagingTableWithoutVersion(); + Datasets datasets = Datasets.of(mainTable, stagingTable); + IngestMode ingestMode = NontemporalSnapshot.builder() + .auditing(NoAuditing.builder().build()) + .deduplicationStrategy(FilterDuplicates.builder().build()) + .build(); + RelationalIngestor ingestor = RelationalIngestor.builder() + .ingestMode(ingestMode) + .relationalSink(H2Sink.get()) + .build(); + + // Create staging Table without PKS and load Data into it + createStagingTableWithoutVersion(); + + Executor executor = ingestor.init(JdbcConnection.of(h2Sink.connection())); + datasets = ingestor.create(datasets); + datasets = ingestor.dedupAndVersion(datasets); + } + + + @Test + void testFilterDupsMaxVersion() throws Exception + { + // 8 + DatasetDefinition mainTable = TestUtils.getDefaultMainTable(); + DatasetDefinition stagingTable = getStagingTableWithVersion(); + Datasets datasets = Datasets.of(mainTable, stagingTable); + IngestMode ingestMode = NontemporalSnapshot.builder() + .auditing(NoAuditing.builder().build()) + .deduplicationStrategy(FilterDuplicates.builder().build()) + .versioningStrategy(MaxVersionStrategy.builder().versioningField("version").build()) + .build(); + RelationalIngestor ingestor = RelationalIngestor.builder() + .ingestMode(ingestMode) + .relationalSink(H2Sink.get()) + .build(); + + // Create staging Table without PKS and load Data into it + createStagingTableWithVersion(); + + Executor executor = ingestor.init(JdbcConnection.of(h2Sink.connection())); + datasets = ingestor.create(datasets); + datasets = ingestor.dedupAndVersion(datasets); + } + + @Test + void testFilterDupsAllVersion() throws Exception + { + // 10 + DatasetDefinition mainTable = TestUtils.getDefaultMainTable(); + DatasetDefinition stagingTable = getStagingTableWithVersion(); + Datasets datasets = Datasets.of(mainTable, stagingTable); + IngestMode ingestMode = AppendOnly.builder() + .auditing(DateTimeAuditing.builder().dateTimeField("append_time").build()) + .digestField("digest") + .deduplicationStrategy(FilterDuplicates.builder().build()) + .versioningStrategy(AllVersionsStrategy.builder().versioningField("version").build()) + .build(); + RelationalIngestor ingestor = RelationalIngestor.builder() + .ingestMode(ingestMode) + .relationalSink(H2Sink.get()) + .build(); + + // Create staging Table without PKS and load Data into it + createStagingTableWithVersion(); + + Executor executor = ingestor.init(JdbcConnection.of(h2Sink.connection())); + datasets = ingestor.create(datasets); + datasets = ingestor.dedupAndVersion(datasets); + } + + private DatasetDefinition getStagingTableWithoutVersion() + { + return DatasetDefinition.builder() + .group(testSchemaName) + .name(stagingTableName) + .schema(baseSchemaWithoutVersion) + .build(); + } + + private DatasetDefinition getStagingTableWithVersion() + { + return DatasetDefinition.builder() + .group(testSchemaName) + .name(stagingTableName) + .schema(baseSchemaWithVersion) + .build(); + } + + + private void createStagingTableWithoutVersion() + { + String createSql = "CREATE TABLE IF NOT EXISTS \"TEST\".\"staging\"" + + "(\"id\" INTEGER NOT NULL," + + "\"name\" VARCHAR(64) NOT NULL," + + "\"income\" BIGINT," + + "\"expiry_date\" DATE," + + "\"digest\" VARCHAR)"; + h2Sink.executeStatement(createSql); + } + + private void createStagingTableWithVersion() + { + String createSql = "CREATE TABLE IF NOT EXISTS \"TEST\".\"staging\"" + + "(\"id\" INTEGER NOT NULL," + + "\"name\" VARCHAR(64) NOT NULL," + + "\"version\" INTEGER," + + "\"income\" BIGINT," + + "\"expiry_date\" DATE," + + "\"digest\" VARCHAR)"; + h2Sink.executeStatement(createSql); + } + + + + + } diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/dedup-and-versioning/expected/data_expected_without_dups_and_max_version.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/dedup-and-versioning/expected/data_expected_without_dups_and_max_version.csv new file mode 100644 index 00000000000..52c061ced12 --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/dedup-and-versioning/expected/data_expected_without_dups_and_max_version.csv @@ -0,0 +1,4 @@ +1,Andy,3,3000,2012-01-03,digest3,1 +2,Becky,1,4000,2012-01-04,digest4,2 +3,Cathy,1,5000,2012-01-05,digest5,1 +3,Cathy,1,6000,2012-01-06,digest6,1 \ No newline at end of file diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/dedup-and-versioning/input/data_with_dups.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/dedup-and-versioning/input/data_with_dups.csv new file mode 100644 index 00000000000..5d2b28c46a9 --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/dedup-and-versioning/input/data_with_dups.csv @@ -0,0 +1,6 @@ +1,Andy,1000,2012-01-01,digest1 +1,Andy,1000,2012-01-01,digest1 +1,Andy,1000,2012-01-01,digest1 +2,Becky,2000,2012-01-02,digest2 +2,Becky,2000,2012-01-02,digest2 +3,Cathy,3000,2012-01-03,digest3 \ No newline at end of file diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/dedup-and-versioning/input/data_with_version.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/dedup-and-versioning/input/data_with_version.csv new file mode 100644 index 00000000000..d2402f02f2e --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/dedup-and-versioning/input/data_with_version.csv @@ -0,0 +1,7 @@ +1,Andy,1,1000,2012-01-01,digest1 +1,Andy,2,2000,2012-01-02,digest2 +1,Andy,3,3000,2012-01-03,digest3 +2,Becky,1,4000,2012-01-04,digest4 +2,Becky,1,4000,2012-01-04,digest4 +3,Cathy,1,5000,2012-01-05,digest5 +3,Cathy,1,6000,2012-01-06,digest6 \ No newline at end of file diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/snapshot-milestoning/input/with_data_splits/data_pass1.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/snapshot-milestoning/input/with_data_splits/data_pass1.csv index 26ff40ce596..74403d57fcd 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/snapshot-milestoning/input/with_data_splits/data_pass1.csv +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/snapshot-milestoning/input/with_data_splits/data_pass1.csv @@ -1,5 +1,6 @@ -1,HARRY,1000,2020-01-01 00:00:00.0,2022-12-01,DIGEST1,1 -2,ROBERT,2000,2020-01-02 00:00:00.0,2022-12-02,DIGEST2,1 -3,ANDY,3000,2020-01-03 00:00:00.0,2022-12-03,DIGEST3,1 -1,HARRISON,11000,2020-01-01 00:00:00.0,2022-12-01,DIGEST4,2 -2,ROBERTO,21000,2020-01-02 00:00:00.0,2022-12-02,DIGEST5,2 \ No newline at end of file +1,HARRY,1,1000,2020-01-01 00:00:00.0,2022-12-01,DIGEST1 +1,HARRY,1,1000,2020-01-01 00:00:00.0,2022-12-01,DIGEST1 +2,ROBERT,1,2000,2020-01-02 00:00:00.0,2022-12-02,DIGEST2 +3,ANDY,1,3000,2020-01-03 00:00:00.0,2022-12-03,DIGEST3 +1,HARRISON,2,11000,2020-01-01 00:00:00.0,2022-12-01,DIGEST4 +2,ROBERTO,2,21000,2020-01-02 00:00:00.0,2022-12-02,DIGEST5 \ No newline at end of file diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-memsql/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/MemsqlTestArtifacts.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-memsql/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/MemsqlTestArtifacts.java index 4d086e78b93..dba4fe464c7 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-memsql/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/MemsqlTestArtifacts.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-memsql/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/MemsqlTestArtifacts.java @@ -383,12 +383,11 @@ public class MemsqlTestArtifacts public static String expectedInsertIntoBaseTempStagingWithMaxVersionAndAllowDuplicates = "INSERT INTO `mydb`.`staging_legend_persistence_temp_staging` " + "(`id`, `name`, `amount`, `biz_date`, `legend_persistence_count`) " + - "((SELECT stage.`id`,stage.`name`,stage.`amount`,stage.`biz_date`,stage.`legend_persistence_count` as `legend_persistence_count` FROM " + + "(SELECT stage.`id`,stage.`name`,stage.`amount`,stage.`biz_date`,stage.`legend_persistence_count` as `legend_persistence_count` FROM " + "(SELECT stage.`id`,stage.`name`,stage.`amount`,stage.`biz_date`,stage.`legend_persistence_count` as `legend_persistence_count`," + - "DENSE_RANK() OVER (PARTITION BY stage.`id`,stage.`name` ORDER BY stage.`biz_date` DESC) as `legend_persistence_rank` " + - "FROM " + - "(SELECT stage.`id`,stage.`name`,stage.`amount`,stage.`biz_date`,COUNT(*) as `legend_persistence_count` " + - "FROM `mydb`.`staging` as stage GROUP BY stage.`id`, stage.`name`, stage.`amount`, stage.`biz_date`) as stage) as stage " + - "WHERE stage.`legend_persistence_rank` = 1) as stage)"; + "DENSE_RANK() OVER (PARTITION BY stage.`id`,stage.`name` ORDER BY stage.`biz_date` DESC) as `legend_persistence_rank` FROM " + + "(SELECT stage.`id`,stage.`name`,stage.`amount`,stage.`biz_date`,COUNT(*) as `legend_persistence_count` FROM " + + "`mydb`.`staging` as stage GROUP BY stage.`id`, stage.`name`, stage.`amount`, stage.`biz_date`) as stage) " + + "as stage WHERE stage.`legend_persistence_rank` = 1)"; } From 453f3efc707fd9d006150271c517e681754ae56c Mon Sep 17 00:00:00 2001 From: prasar-ashutosh Date: Fri, 6 Oct 2023 15:10:38 +0800 Subject: [PATCH 2/2] Fix the sqls for dedup and versioning and add H2 tests --- .../components/util/LogicalPlanUtils.java | 3 +- .../nontemporal/NontemporalSnapshotTest.java | 6 +- .../versioning/TestDedupAndVersioning.java | 73 ++++++++++--------- .../data_expected_with_data_splits.csv | 7 ++ .../data_expected_with_duplicates.csv | 6 ++ .../data_expected_with_max_version.csv | 5 ++ ...cted_without_dups_and_with_data_splits.csv | 6 ++ 7 files changed, 66 insertions(+), 40 deletions(-) create mode 100644 legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/dedup-and-versioning/expected/data_expected_with_data_splits.csv create mode 100644 legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/dedup-and-versioning/expected/data_expected_with_duplicates.csv create mode 100644 legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/dedup-and-versioning/expected/data_expected_with_max_version.csv create mode 100644 legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/dedup-and-versioning/expected/data_expected_without_dups_and_with_data_splits.csv diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/util/LogicalPlanUtils.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/util/LogicalPlanUtils.java index 9d2a948a6a9..c8f91cd30ff 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/util/LogicalPlanUtils.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/util/LogicalPlanUtils.java @@ -460,7 +460,8 @@ public static Dataset getTempStagingDatasetDefinition(Dataset stagingDataset, In public static Dataset getDedupedAndVersionedDataset(DeduplicationStrategy deduplicationStrategy, VersioningStrategy versioningStrategy, Dataset stagingDataset, List primaryKeys) { Dataset dedupedDataset = deduplicationStrategy.accept(new DatasetDeduplicationHandler(stagingDataset)); - if (dedupedDataset instanceof Selection) + boolean isTempTableNeededForVersioning = versioningStrategy.accept(VersioningVisitors.IS_TEMP_TABLE_NEEDED); + if (isTempTableNeededForVersioning && dedupedDataset instanceof Selection) { Selection selection = (Selection) dedupedDataset; dedupedDataset = selection.withAlias(stagingDataset.datasetReference().alias()); diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/nontemporal/NontemporalSnapshotTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/nontemporal/NontemporalSnapshotTest.java index 00ba03c3e5b..e0fdbc0ec8a 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/nontemporal/NontemporalSnapshotTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/nontemporal/NontemporalSnapshotTest.java @@ -86,9 +86,9 @@ public void verifyNontemporalSnapshotWithAuditingFilterDupsNoVersioning(Generato String cleanupTempStagingTableSql = "DELETE FROM \"mydb\".\"staging_legend_persistence_temp_staging\" as stage"; String insertTempStagingTableSql = "INSERT INTO \"mydb\".\"staging_legend_persistence_temp_staging\" " + "(\"id\", \"name\", \"amount\", \"biz_date\", \"legend_persistence_count\") " + - "((SELECT stage.\"id\",stage.\"name\",stage.\"amount\",stage.\"biz_date\",COUNT(*) as \"legend_persistence_count\" " + - "FROM \"mydb\".\"staging\" as stage GROUP BY stage.\"id\", stage.\"name\", stage.\"amount\", stage.\"biz_date\") as stage)"; - + "(SELECT stage.\"id\",stage.\"name\",stage.\"amount\",stage.\"biz_date\"," + + "COUNT(*) as \"legend_persistence_count\" FROM \"mydb\".\"staging\" as stage " + + "GROUP BY stage.\"id\", stage.\"name\", stage.\"amount\", stage.\"biz_date\")"; Assertions.assertEquals(AnsiTestArtifacts.expectedBaseTableWithAuditPkCreateQuery, preActionsSqlList.get(0)); Assertions.assertEquals(createTempStagingTable, preActionsSqlList.get(1)); diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/versioning/TestDedupAndVersioning.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/versioning/TestDedupAndVersioning.java index 5acdb58ba4f..5e02d54d484 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/versioning/TestDedupAndVersioning.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/versioning/TestDedupAndVersioning.java @@ -30,9 +30,11 @@ import org.finos.legend.engine.persistence.components.relational.api.RelationalIngestor; import org.finos.legend.engine.persistence.components.relational.h2.H2Sink; import org.finos.legend.engine.persistence.components.relational.jdbc.JdbcConnection; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import static org.finos.legend.engine.persistence.components.TestUtils.*; +import static org.finos.legend.engine.persistence.components.util.LogicalPlanUtils.TEMP_STAGING_DATASET_BASE_NAME; public class TestDedupAndVersioning extends BaseTest { @@ -89,16 +91,10 @@ void testNoDedupNoVersioning() throws Exception IngestMode ingestMode = NontemporalSnapshot.builder() .auditing(NoAuditing.builder().build()) .build(); - RelationalIngestor ingestor = RelationalIngestor.builder() - .ingestMode(ingestMode) - .relationalSink(H2Sink.get()) - .build(); - - // Create staging Table without PKS and load Data into it - Executor executor = ingestor.init(JdbcConnection.of(h2Sink.connection())); - datasets = ingestor.create(datasets); - datasets = ingestor.dedupAndVersion(datasets); + performDedupAndVersioining(datasets, ingestMode); + // Validate tempTableExists + Assertions.assertEquals(false, h2Sink.doesTableExist(getTempStagingDataset())); } @Test @@ -112,17 +108,12 @@ void testFilterDupsNoVersioning() throws Exception .auditing(NoAuditing.builder().build()) .deduplicationStrategy(FilterDuplicates.builder().build()) .build(); - RelationalIngestor ingestor = RelationalIngestor.builder() - .ingestMode(ingestMode) - .relationalSink(H2Sink.get()) - .build(); - - // Create staging Table without PKS and load Data into it createStagingTableWithoutVersion(); + // TODO LOAD DATA - Executor executor = ingestor.init(JdbcConnection.of(h2Sink.connection())); - datasets = ingestor.create(datasets); - datasets = ingestor.dedupAndVersion(datasets); + performDedupAndVersioining(datasets, ingestMode); + // Validate tempTableExists + Assertions.assertEquals(true, h2Sink.doesTableExist(getTempStagingDataset())); } @@ -138,17 +129,14 @@ void testFilterDupsMaxVersion() throws Exception .deduplicationStrategy(FilterDuplicates.builder().build()) .versioningStrategy(MaxVersionStrategy.builder().versioningField("version").build()) .build(); - RelationalIngestor ingestor = RelationalIngestor.builder() - .ingestMode(ingestMode) - .relationalSink(H2Sink.get()) - .build(); - // Create staging Table without PKS and load Data into it createStagingTableWithVersion(); + // TODO LOAD DATA - Executor executor = ingestor.init(JdbcConnection.of(h2Sink.connection())); - datasets = ingestor.create(datasets); - datasets = ingestor.dedupAndVersion(datasets); + performDedupAndVersioining(datasets, ingestMode); + + // Validate tempTableExists + Assertions.assertEquals(true, h2Sink.doesTableExist(getTempStagingDataset())); } @Test @@ -164,17 +152,14 @@ void testFilterDupsAllVersion() throws Exception .deduplicationStrategy(FilterDuplicates.builder().build()) .versioningStrategy(AllVersionsStrategy.builder().versioningField("version").build()) .build(); - RelationalIngestor ingestor = RelationalIngestor.builder() - .ingestMode(ingestMode) - .relationalSink(H2Sink.get()) - .build(); - // Create staging Table without PKS and load Data into it createStagingTableWithVersion(); + // TODO LOAD DATA - Executor executor = ingestor.init(JdbcConnection.of(h2Sink.connection())); - datasets = ingestor.create(datasets); - datasets = ingestor.dedupAndVersion(datasets); + performDedupAndVersioining(datasets, ingestMode); + + // Validate tempTableExists + Assertions.assertEquals(true, h2Sink.doesTableExist(getTempStagingDataset())); } private DatasetDefinition getStagingTableWithoutVersion() @@ -186,6 +171,14 @@ private DatasetDefinition getStagingTableWithoutVersion() .build(); } + private Dataset getTempStagingDataset() + { + return DatasetReferenceImpl.builder() + .group(testSchemaName) + .name(stagingTableName + "_" + TEMP_STAGING_DATASET_BASE_NAME) + .build(); + } + private DatasetDefinition getStagingTableWithVersion() { return DatasetDefinition.builder() @@ -219,7 +212,15 @@ private void createStagingTableWithVersion() h2Sink.executeStatement(createSql); } + private static void performDedupAndVersioining(Datasets datasets, IngestMode ingestMode) { + RelationalIngestor ingestor = RelationalIngestor.builder() + .ingestMode(ingestMode) + .relationalSink(H2Sink.get()) + .build(); - - + Executor executor = ingestor.init(JdbcConnection.of(h2Sink.connection())); + datasets = ingestor.create(datasets); + datasets = ingestor.dedupAndVersion(datasets); } + +} diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/dedup-and-versioning/expected/data_expected_with_data_splits.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/dedup-and-versioning/expected/data_expected_with_data_splits.csv new file mode 100644 index 00000000000..4ec38cb111f --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/dedup-and-versioning/expected/data_expected_with_data_splits.csv @@ -0,0 +1,7 @@ +1,Andy,1,1000,2012-01-01,digest1,1 +1,Andy,2,2000,2012-01-02,digest2,2 +1,Andy,3,3000,2012-01-03,digest3,3 +2,Becky,1,4000,2012-01-04,digest4,1 +2,Becky,1,4000,2012-01-04,digest4,1 +3,Cathy,1,5000,2012-01-05,digest5,1 +3,Cathy,1,6000,2012-01-06,digest6,1 \ No newline at end of file diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/dedup-and-versioning/expected/data_expected_with_duplicates.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/dedup-and-versioning/expected/data_expected_with_duplicates.csv new file mode 100644 index 00000000000..5d2b28c46a9 --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/dedup-and-versioning/expected/data_expected_with_duplicates.csv @@ -0,0 +1,6 @@ +1,Andy,1000,2012-01-01,digest1 +1,Andy,1000,2012-01-01,digest1 +1,Andy,1000,2012-01-01,digest1 +2,Becky,2000,2012-01-02,digest2 +2,Becky,2000,2012-01-02,digest2 +3,Cathy,3000,2012-01-03,digest3 \ No newline at end of file diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/dedup-and-versioning/expected/data_expected_with_max_version.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/dedup-and-versioning/expected/data_expected_with_max_version.csv new file mode 100644 index 00000000000..4f351fc2dda --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/dedup-and-versioning/expected/data_expected_with_max_version.csv @@ -0,0 +1,5 @@ +1,Andy,3,3000,2012-01-03,digest3 +2,Becky,1,4000,2012-01-04,digest4 +2,Becky,1,4000,2012-01-04,digest4 +3,Cathy,1,5000,2012-01-05,digest5 +3,Cathy,1,6000,2012-01-06,digest6 \ No newline at end of file diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/dedup-and-versioning/expected/data_expected_without_dups_and_with_data_splits.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/dedup-and-versioning/expected/data_expected_without_dups_and_with_data_splits.csv new file mode 100644 index 00000000000..0ccd0abe9a4 --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/dedup-and-versioning/expected/data_expected_without_dups_and_with_data_splits.csv @@ -0,0 +1,6 @@ +1,Andy,1,1000,2012-01-01,digest1,1,1 +1,Andy,2,2000,2012-01-02,digest2,1,2 +1,Andy,3,3000,2012-01-03,digest3,1,3 +2,Becky,1,4000,2012-01-04,digest4,2,1 +3,Cathy,1,5000,2012-01-05,digest5,1,1 +3,Cathy,1,6000,2012-01-06,digest6,1,1 \ No newline at end of file