diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/IngestModeCaseConverter.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/IngestModeCaseConverter.java index 500f37e90fc..089428754cd 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/IngestModeCaseConverter.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/IngestModeCaseConverter.java @@ -54,11 +54,7 @@ import org.finos.legend.engine.persistence.components.ingestmode.versioning.AllVersionsStrategy; import org.finos.legend.engine.persistence.components.ingestmode.versioning.MaxVersionStrategy; -import java.util.Optional; -import java.util.List; -import java.util.Set; -import java.util.Map; -import java.util.HashMap; +import java.util.*; import java.util.function.Function; import java.util.stream.Collectors; @@ -118,6 +114,7 @@ public IngestMode visitUnitemporalSnapshot(UnitemporalSnapshotAbstract unitempor .transactionMilestoning(unitemporalSnapshot.transactionMilestoning().accept(new TransactionMilestoningCaseConverter())) .addAllPartitionFields(applyCase(unitemporalSnapshot.partitionFields())) .putAllPartitionValuesByField(applyCase(unitemporalSnapshot.partitionValuesByField())) + .addAllPartitionSpecList(applyCaseForListOfMap(unitemporalSnapshot.partitionSpecList())) .emptyDatasetHandling(unitemporalSnapshot.emptyDatasetHandling()) .deduplicationStrategy(unitemporalSnapshot.deduplicationStrategy()) 
.versioningStrategy(unitemporalSnapshot.versioningStrategy().accept(new VersionStrategyCaseConverter())) @@ -210,6 +207,21 @@ private Map> applyCase(Map> map) return caseAppliedMap; } + private List> applyCaseForListOfMap(List> listOfMap) + { + List> caseAppliedListOfMap = new ArrayList<>(); + for (Map map : listOfMap) + { + Map caseAppliedMap = new HashMap<>(); + for (Map.Entry entry : map.entrySet()) + { + caseAppliedMap.put(applyCase(entry.getKey()), entry.getValue()); + } + caseAppliedListOfMap.add(caseAppliedMap); + } + return caseAppliedListOfMap; + } + private class MergeStrategyCaseConverter implements MergeStrategyVisitor { @Override diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/UnitemporalSnapshotAbstract.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/UnitemporalSnapshotAbstract.java index 3ad65815215..3c86841a268 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/UnitemporalSnapshotAbstract.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/UnitemporalSnapshotAbstract.java @@ -26,10 +26,7 @@ import org.finos.legend.engine.persistence.components.ingestmode.versioning.DigestBasedResolverAbstract; import org.immutables.value.Value; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; +import java.util.*; import static org.immutables.value.Value.Derived; import static org.immutables.value.Value.Immutable; 
@@ -52,7 +49,9 @@ public interface UnitemporalSnapshotAbstract extends IngestMode, TransactionMile List partitionFields(); - Map> partitionValuesByField(); + List> partitionSpecList(); // [ {date: D1, Id: ID1, Name: N1}, {date: D2, Id: ID2, Name: N2}, ....] + + Map> partitionValuesByField(); // for Backward compatibility -- to be deprecated @Derived default boolean partitioned() @@ -75,6 +74,12 @@ default T accept(IngestModeVisitor visitor) @Value.Check default void validate() { + + if (!partitionValuesByField().isEmpty() && !partitionSpecList().isEmpty()) + { + throw new IllegalStateException("Can not build UnitemporalSnapshot, Provide either partitionValuesByField or partitionSpecList, both not supported together"); + } + // All the keys in partitionValuesByField must exactly match the fields in partitionFields if (!partitionValuesByField().isEmpty()) { @@ -89,6 +94,39 @@ default void validate() throw new IllegalStateException(String.format("Can not build UnitemporalSnapshot, partitionKey: [%s] not specified in partitionFields", partitionKey)); } } + int partitionKeysWithMoreThanOneValues = 0; + for (Set partitionValues: partitionValuesByField().values()) + { + if (partitionValues.size() > 1) + { + partitionKeysWithMoreThanOneValues++; + } + } + if (partitionKeysWithMoreThanOneValues > 1) + { + throw new IllegalStateException("Can not build UnitemporalSnapshot, in partitionValuesByField at most one of the partition keys can have more than one value, all other partition keys must have exactly one value"); + } + } + + if (!partitionSpecList().isEmpty()) + { + for (Map partitionSpec : partitionSpecList()) + { + if (partitionFields().size() != partitionSpec.size()) + { + throw new IllegalStateException("Can not build UnitemporalSnapshot, size of each partitionSpec must be same as size of partitionFields"); + } + } + for (Map partitionSpec : partitionSpecList()) + { + for (String partitionKey: partitionSpec.keySet()) + { + if 
(!partitionFields().contains(partitionKey)) + { + throw new IllegalStateException(String.format("Can not build UnitemporalSnapshot, partitionKey: [%s] not specified in partitionFields", partitionKey)); + } + } + } } // Allowed Versioning Strategy - NoVersioning, MaxVersioining diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/UnitemporalSnapshotPlanner.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/UnitemporalSnapshotPlanner.java index 2a1a6d22c37..6b5e9158e2b 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/UnitemporalSnapshotPlanner.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/UnitemporalSnapshotPlanner.java @@ -124,6 +124,10 @@ protected Insert sqlToUpsertRows() { whereClauseForNotInSink.add(LogicalPlanUtils.getPartitionColumnValueMatchInCondition(mainDataset(), ingestMode().partitionValuesByField())); } + else if (!ingestMode().partitionSpecList().isEmpty()) + { + whereClauseForNotInSink.add(LogicalPlanUtils.getPartitionSpecMatchCondition(mainDataset(), ingestMode().partitionSpecList())); + } else { whereClauseForNotInSink.add(LogicalPlanUtils.getPartitionColumnsMatchCondition(mainDataset(), stagingDataset(), ingestMode().partitionFields().toArray(new String[0]))); @@ -193,7 +197,15 @@ protected Update getSqlToMilestoneRows(List> values) if (ingestMode().partitioned()) { - if (ingestMode().partitionValuesByField().isEmpty()) + if 
(!ingestMode().partitionValuesByField().isEmpty()) + { + whereClauseForPartition.add(LogicalPlanUtils.getPartitionColumnValueMatchInCondition(mainDataset(), ingestMode().partitionValuesByField())); + } + else if (!ingestMode().partitionSpecList().isEmpty()) + { + whereClauseForPartition.add(LogicalPlanUtils.getPartitionSpecMatchCondition(mainDataset(), ingestMode().partitionSpecList())); + } + else { Condition partitionColumnCondition = Exists.of( Selection.builder() @@ -203,10 +215,6 @@ protected Update getSqlToMilestoneRows(List> values) .build()); whereClauseForPartition.add(partitionColumnCondition); } - else - { - whereClauseForPartition.add(LogicalPlanUtils.getPartitionColumnValueMatchInCondition(mainDataset(), ingestMode().partitionValuesByField())); - } } return UpdateAbstract.of(mainDataset(), values, And.of(whereClauseForPartition)); @@ -230,6 +238,10 @@ protected Update sqlToMilestoneAllRows(List> values) { conditions.add(LogicalPlanUtils.getPartitionColumnValueMatchInCondition(mainDataset(), ingestMode().partitionValuesByField())); } + else if (ingestMode().partitioned() && !ingestMode().partitionSpecList().isEmpty()) + { + conditions.add(LogicalPlanUtils.getPartitionSpecMatchCondition(mainDataset(), ingestMode().partitionSpecList())); + } return UpdateAbstract.of(mainDataset(), values, And.of(conditions)); } @@ -254,7 +266,7 @@ public LogicalPlan visitNoOp(NoOpAbstract noOpAbstract) public LogicalPlan visitDeleteTargetData(DeleteTargetDataAbstract deleteTargetDataAbstract) { List operations = new ArrayList<>(); - if (ingestMode().partitioned() && ingestMode().partitionValuesByField().isEmpty()) + if (ingestMode().partitioned() && ingestMode().partitionValuesByField().isEmpty() && ingestMode().partitionSpecList().isEmpty()) { return LogicalPlan.of(operations); } diff --git 
a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/util/LogicalPlanUtils.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/util/LogicalPlanUtils.java index 3293c1479d3..b258d76e704 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/util/LogicalPlanUtils.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/util/LogicalPlanUtils.java @@ -130,14 +130,24 @@ public static Condition getPartitionColumnsMatchCondition(Dataset mainDataSet, D return getColumnsMatchCondition(mainDataSet, stagingDataSet, partitionColumns); } - public static Condition getPartitionColumnsDoNotMatchCondition(Dataset mainDataSet, Dataset stagingDataSet, String[] partitionColumns) + public static Condition getPartitionColumnValueMatchInCondition(Dataset dataSet, Map> partitionFilter) { - return getColumnsDoNotMatchCondition(mainDataSet, stagingDataSet, partitionColumns); + return getColumnValueMatchInCondition(dataSet, partitionFilter); } - public static Condition getPartitionColumnValueMatchInCondition(Dataset dataSet, Map> partitionFilter) + // (key1 = val11 AND key2 = val21) OR (key1 = val12 AND key2 = val22) OR ... 
+ public static Condition getPartitionSpecMatchCondition(Dataset dataSet, List> partitionSpecList) { - return getColumnValueMatchInCondition(dataSet, partitionFilter); + return Or.of(partitionSpecList.stream() + .map(partitionSpec -> And.of( + partitionSpec.entrySet().stream() + .map(columnValuePair -> + Equals.of( + FieldValue.builder().datasetRef(dataSet.datasetReference()).fieldName(columnValuePair.getKey()).build(), + columnValuePair.getValue() instanceof Number ? ObjectValue.of(columnValuePair.getValue()) : StringValue.of((String) columnValuePair.getValue())) + ) + .collect(Collectors.toList())) + ).collect(Collectors.toList())); } private static Condition getColumnValueMatchInCondition(Dataset dataSet, Map> keyValuePair) diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/AnsiTestArtifacts.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/AnsiTestArtifacts.java index 87f5cdc9b91..79792688f90 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/AnsiTestArtifacts.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/AnsiTestArtifacts.java @@ -63,6 +63,10 @@ public class AnsiTestArtifacts "\"id\" INTEGER NOT NULL,\"name\" VARCHAR NOT NULL,\"amount\" DOUBLE,\"biz_date\" DATE,\"digest\" VARCHAR," + "\"batch_id_in\" INTEGER NOT NULL,\"batch_id_out\" INTEGER,PRIMARY KEY (\"id\", \"name\", \"batch_id_in\"))"; + public static String expectedMainTableWithMultiPartitionsCreateQuery = "CREATE TABLE IF NOT 
EXISTS \"mydb\".\"main\"(" + + "\"id\" INTEGER NOT NULL,\"name\" VARCHAR NOT NULL,\"amount\" DOUBLE,\"account_type\" INTEGER,\"biz_date\" DATE,\"digest\" VARCHAR," + + "\"batch_id_in\" INTEGER NOT NULL,\"batch_id_out\" INTEGER,PRIMARY KEY (\"id\", \"name\", \"batch_id_in\"))"; + public static String expectedMainTableBatchIdAndVersionBasedCreateQuery = "CREATE TABLE IF NOT EXISTS \"mydb\".\"main\"(" + "\"id\" INTEGER NOT NULL,\"name\" VARCHAR NOT NULL,\"amount\" DOUBLE,\"biz_date\" DATE,\"digest\" VARCHAR,\"version\" INTEGER," + "\"batch_id_in\" INTEGER NOT NULL,\"batch_id_out\" INTEGER,PRIMARY KEY (\"id\", \"name\", \"batch_id_in\"))"; @@ -75,6 +79,10 @@ public class AnsiTestArtifacts "(\"ID\" INTEGER NOT NULL,\"NAME\" VARCHAR NOT NULL,\"AMOUNT\" DOUBLE,\"BIZ_DATE\" DATE,\"DIGEST\" VARCHAR," + "\"BATCH_ID_IN\" INTEGER NOT NULL,\"BATCH_ID_OUT\" INTEGER,PRIMARY KEY (\"ID\", \"NAME\", \"BATCH_ID_IN\"))"; + public static String expectedMainTableMultiPartitionCreateQueryWithUpperCase = "CREATE TABLE IF NOT EXISTS \"MYDB\".\"MAIN\"" + + "(\"ID\" INTEGER NOT NULL,\"NAME\" VARCHAR NOT NULL,\"AMOUNT\" DOUBLE,\"ACCOUNT_TYPE\" INTEGER,\"BIZ_DATE\" DATE,\"DIGEST\" VARCHAR," + + "\"BATCH_ID_IN\" INTEGER NOT NULL,\"BATCH_ID_OUT\" INTEGER,PRIMARY KEY (\"ID\", \"NAME\", \"BATCH_ID_IN\"))"; + public static String expectedMainTableTimeBasedCreateQuery = "CREATE TABLE IF NOT EXISTS \"mydb\".\"main\"(" + "\"id\" INTEGER NOT NULL,\"name\" VARCHAR NOT NULL,\"amount\" DOUBLE,\"biz_date\" DATE,\"digest\" VARCHAR," + "\"batch_time_in\" DATETIME NOT NULL,\"batch_time_out\" DATETIME,PRIMARY KEY (\"id\", \"name\", \"batch_time_in\"))"; diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalSnapshotBatchIdBasedTest.java 
b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalSnapshotBatchIdBasedTest.java index 61bf58462be..7c8afbbe70d 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalSnapshotBatchIdBasedTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalSnapshotBatchIdBasedTest.java @@ -207,6 +207,95 @@ public void verifyUnitemporalSnapshotWithPartitionFiltersNoDedupNoVersion(Genera Assertions.assertEquals(getExpectedMetadataTableIngestQuery(), metadataIngestSql.get(0)); } + @Override + public void verifyUnitemporalSnapshotWithPartitionSpecListNoDedupNoVersion(GeneratorResult operations) + { + List preActionsSql = operations.preActionsSql(); + List milestoningSql = operations.ingestSql(); + List metadataIngestSql = operations.metadataIngestSql(); + + String expectedMilestoneQuery = "UPDATE \"mydb\".\"main\" as sink SET " + + "sink.\"batch_id_out\" = (SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MAIN')-1 " + + "WHERE (sink.\"batch_id_out\" = 999999999) AND " + + "(NOT (EXISTS (SELECT * FROM \"mydb\".\"staging\" as stage " + + "WHERE ((sink.\"id\" = stage.\"id\") AND (sink.\"name\" = stage.\"name\")) AND (sink.\"digest\" = stage.\"digest\")))) " + + "AND (((sink.\"biz_date\" = '2024-01-01') AND (sink.\"account_type\" = 1)) " + + "OR ((sink.\"biz_date\" = '2024-01-02') AND (sink.\"account_type\" = 1)) " + + "OR ((sink.\"biz_date\" = '2024-01-02') AND 
(sink.\"account_type\" = 2)))"; + + String expectedUpsertQuery = "INSERT INTO \"mydb\".\"main\" " + + "(\"id\", \"name\", \"amount\", \"account_type\", \"biz_date\", \"digest\", \"batch_id_in\", \"batch_id_out\") " + + "(SELECT stage.\"id\",stage.\"name\",stage.\"amount\",stage.\"account_type\",stage.\"biz_date\",stage.\"digest\"," + + "(SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MAIN'),999999999 " + + "FROM \"mydb\".\"staging\" as stage WHERE " + + "NOT (stage.\"digest\" IN (SELECT sink.\"digest\" FROM \"mydb\".\"main\" as sink WHERE " + + "(sink.\"batch_id_out\" = 999999999) AND " + + "(((sink.\"biz_date\" = '2024-01-01') AND (sink.\"account_type\" = 1)) " + + "OR ((sink.\"biz_date\" = '2024-01-02') AND (sink.\"account_type\" = 1)) " + + "OR ((sink.\"biz_date\" = '2024-01-02') AND (sink.\"account_type\" = 2))))))"; + + Assertions.assertEquals(AnsiTestArtifacts.expectedMainTableWithMultiPartitionsCreateQuery, preActionsSql.get(0)); + Assertions.assertEquals(getExpectedMetadataTableCreateQuery(), preActionsSql.get(1)); + + Assertions.assertEquals(expectedMilestoneQuery, milestoningSql.get(0)); + Assertions.assertEquals(expectedUpsertQuery, milestoningSql.get(1)); + Assertions.assertEquals(getExpectedMetadataTableIngestQuery(), metadataIngestSql.get(0)); + } + + @Override + public void verifyUnitemporalSnapshotWithPartitionSpecListNoDedupNoVersionInUpperCase(GeneratorResult operations) + { + List preActionsSql = operations.preActionsSql(); + List milestoningSql = operations.ingestSql(); + List metadataIngestSql = operations.metadataIngestSql(); + + String expectedMilestoneQuery = "UPDATE \"MYDB\".\"MAIN\" as sink " + + "SET sink.\"BATCH_ID_OUT\" = (SELECT COALESCE(MAX(BATCH_METADATA.\"TABLE_BATCH_ID\"),0)+1 FROM BATCH_METADATA as BATCH_METADATA WHERE UPPER(BATCH_METADATA.\"TABLE_NAME\") = 'MAIN')-1 " + + "WHERE (sink.\"BATCH_ID_OUT\" = 999999999) AND " + + "(NOT 
(EXISTS (SELECT * FROM \"MYDB\".\"STAGING\" as stage " + + "WHERE ((sink.\"ID\" = stage.\"ID\") AND (sink.\"NAME\" = stage.\"NAME\")) AND (sink.\"DIGEST\" = stage.\"DIGEST\")))) " + + "AND (((sink.\"ACCOUNT_TYPE\" = 1) AND (sink.\"BIZ_DATE\" = '2024-01-01')) " + + "OR ((sink.\"ACCOUNT_TYPE\" = 1) AND (sink.\"BIZ_DATE\" = '2024-01-02')) " + + "OR ((sink.\"ACCOUNT_TYPE\" = 2) AND (sink.\"BIZ_DATE\" = '2024-01-02')))"; + + String expectedUpsertQuery = "INSERT INTO \"MYDB\".\"MAIN\" " + + "(\"ID\", \"NAME\", \"AMOUNT\", \"ACCOUNT_TYPE\", \"BIZ_DATE\", \"DIGEST\", \"BATCH_ID_IN\", \"BATCH_ID_OUT\") " + + "(SELECT stage.\"ID\",stage.\"NAME\",stage.\"AMOUNT\",stage.\"ACCOUNT_TYPE\",stage.\"BIZ_DATE\",stage.\"DIGEST\"," + + "(SELECT COALESCE(MAX(BATCH_METADATA.\"TABLE_BATCH_ID\"),0)+1 FROM BATCH_METADATA as BATCH_METADATA WHERE UPPER(BATCH_METADATA.\"TABLE_NAME\") = 'MAIN')," + + "999999999 FROM \"MYDB\".\"STAGING\" as stage " + + "WHERE NOT (stage.\"DIGEST\" IN (SELECT sink.\"DIGEST\" FROM \"MYDB\".\"MAIN\" as sink WHERE (sink.\"BATCH_ID_OUT\" = 999999999) " + + "AND (((sink.\"ACCOUNT_TYPE\" = 1) AND (sink.\"BIZ_DATE\" = '2024-01-01')) " + + "OR ((sink.\"ACCOUNT_TYPE\" = 1) AND (sink.\"BIZ_DATE\" = '2024-01-02')) " + + "OR ((sink.\"ACCOUNT_TYPE\" = 2) AND (sink.\"BIZ_DATE\" = '2024-01-02'))))))"; + + Assertions.assertEquals(AnsiTestArtifacts.expectedMainTableMultiPartitionCreateQueryWithUpperCase, preActionsSql.get(0)); + Assertions.assertEquals(getExpectedMetadataTableCreateQueryWithUpperCase(), preActionsSql.get(1)); + Assertions.assertEquals(expectedMilestoneQuery, milestoningSql.get(0)); + Assertions.assertEquals(expectedUpsertQuery, milestoningSql.get(1)); + Assertions.assertEquals(getExpectedMetadataTableIngestQueryWithUpperCase(), metadataIngestSql.get(0)); + } + + @Override + public void verifyUnitemporalSnapshotWithPartitionSpecListWithEmptyBatchHandling(GeneratorResult operations) + { + List preActionsSql = operations.preActionsSql(); + List milestoningSql = 
operations.ingestSql(); + List metadataIngestSql = operations.metadataIngestSql(); + + String expectedMilestoneQuery = "UPDATE \"mydb\".\"main\" as sink SET " + + "sink.\"batch_id_out\" = (SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MAIN')-1 " + + "WHERE (sink.\"batch_id_out\" = 999999999) AND " + + "(((sink.\"biz_date\" = '2024-01-01') AND (sink.\"account_type\" = 1)) " + + "OR ((sink.\"biz_date\" = '2024-01-02') AND (sink.\"account_type\" = 1)) " + + "OR ((sink.\"biz_date\" = '2024-01-02') AND (sink.\"account_type\" = 2)))"; + + Assertions.assertEquals(AnsiTestArtifacts.expectedMainTableWithMultiPartitionsCreateQuery, preActionsSql.get(0)); + Assertions.assertEquals(getExpectedMetadataTableCreateQuery(), preActionsSql.get(1)); + + Assertions.assertEquals(expectedMilestoneQuery, milestoningSql.get(0)); + Assertions.assertEquals(getExpectedMetadataTableIngestQuery(), metadataIngestSql.get(0)); + } + @Override public void verifyUnitemporalSnapshotWithCleanStagingData(GeneratorResult operations) { diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/BigQueryTestArtifacts.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/BigQueryTestArtifacts.java index a9074fe9575..061e069cf30 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/BigQueryTestArtifacts.java +++ 
b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/BigQueryTestArtifacts.java @@ -201,6 +201,14 @@ public class BigQueryTestArtifacts "`id` INT64 NOT NULL,`name` STRING NOT NULL,`amount` FLOAT64,`biz_date` DATE,`digest` STRING," + "`batch_id_in` INT64 NOT NULL,`batch_id_out` INT64,PRIMARY KEY (`id`, `name`, `batch_id_in`) NOT ENFORCED)"; + public static String expectedMainTableWithMultiPartitionCreateQuery = "CREATE TABLE IF NOT EXISTS `mydb`.`main`(" + + "`id` INT64 NOT NULL,`name` STRING NOT NULL,`amount` FLOAT64,`account_type` INT64,`biz_date` DATE,`digest` STRING," + + "`batch_id_in` INT64 NOT NULL,`batch_id_out` INT64,PRIMARY KEY (`id`, `name`, `batch_id_in`) NOT ENFORCED)"; + + public static String expectedMainTableWithMultiPartitionCreateQueryUpperCase = "CREATE TABLE IF NOT EXISTS `MYDB`.`MAIN`" + + "(`ID` INT64 NOT NULL,`NAME` STRING NOT NULL,`AMOUNT` FLOAT64,`ACCOUNT_TYPE` INT64,`BIZ_DATE` DATE,`DIGEST` STRING," + + "`BATCH_ID_IN` INT64 NOT NULL,`BATCH_ID_OUT` INT64,PRIMARY KEY (`ID`, `NAME`, `BATCH_ID_IN`) NOT ENFORCED)"; + public static String expectedMetadataTableCreateQuery = "CREATE TABLE IF NOT EXISTS batch_metadata" + "(`table_name` STRING(255)," + "`batch_start_ts_utc` DATETIME," + diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/UnitemporalSnapshotBatchIdBasedTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/UnitemporalSnapshotBatchIdBasedTest.java index 09c2a5e08bf..52ea588eea9 100644 --- 
a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/UnitemporalSnapshotBatchIdBasedTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/UnitemporalSnapshotBatchIdBasedTest.java @@ -182,6 +182,89 @@ public void verifyUnitemporalSnapshotWithPartitionFiltersNoDedupNoVersion(Genera Assertions.assertEquals(getExpectedMetadataTableIngestQuery(), metadataIngestSql.get(0)); } + @Override + public void verifyUnitemporalSnapshotWithPartitionSpecListNoDedupNoVersion(GeneratorResult operations) + { + List preActionsSql = operations.preActionsSql(); + List milestoningSql = operations.ingestSql(); + List metadataIngestSql = operations.metadataIngestSql(); + + String expectedMilestoneQuery = "UPDATE `mydb`.`main` as sink SET " + + "sink.`batch_id_out` = (SELECT COALESCE(MAX(batch_metadata.`table_batch_id`),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.`table_name`) = 'MAIN')-1 " + + "WHERE (sink.`batch_id_out` = 999999999) AND " + + "(NOT (EXISTS (SELECT * FROM `mydb`.`staging` as stage WHERE " + + "((sink.`id` = stage.`id`) AND (sink.`name` = stage.`name`)) AND (sink.`digest` = stage.`digest`)))) AND " + + "(((sink.`biz_date` = '2024-01-01') AND (sink.`account_type` = 1)) " + + "OR ((sink.`biz_date` = '2024-01-02') AND (sink.`account_type` = 1)) " + + "OR ((sink.`biz_date` = '2024-01-02') AND (sink.`account_type` = 2)))"; + + String expectedUpsertQuery = "INSERT INTO `mydb`.`main` " + + "(`id`, `name`, `amount`, `account_type`, `biz_date`, `digest`, `batch_id_in`, `batch_id_out`) " + + "(SELECT stage.`id`,stage.`name`,stage.`amount`,stage.`account_type`,stage.`biz_date`,stage.`digest`," + + "(SELECT 
COALESCE(MAX(batch_metadata.`table_batch_id`),0)+1 FROM batch_metadata as batch_metadata " + + "WHERE UPPER(batch_metadata.`table_name`) = 'MAIN'),999999999 " + + "FROM `mydb`.`staging` as stage WHERE " + + "NOT (stage.`digest` IN (SELECT sink.`digest` FROM `mydb`.`main` as sink WHERE (sink.`batch_id_out` = 999999999) AND " + + "(((sink.`biz_date` = '2024-01-01') AND (sink.`account_type` = 1)) " + + "OR ((sink.`biz_date` = '2024-01-02') AND (sink.`account_type` = 1)) " + + "OR ((sink.`biz_date` = '2024-01-02') AND (sink.`account_type` = 2))))))"; + + Assertions.assertEquals(BigQueryTestArtifacts.expectedMainTableWithMultiPartitionCreateQuery, preActionsSql.get(0)); + Assertions.assertEquals(BigQueryTestArtifacts.expectedMetadataTableCreateQuery, preActionsSql.get(1)); + + Assertions.assertEquals(expectedMilestoneQuery, milestoningSql.get(0)); + Assertions.assertEquals(expectedUpsertQuery, milestoningSql.get(1)); + Assertions.assertEquals(getExpectedMetadataTableIngestQuery(), metadataIngestSql.get(0)); + } + + @Override + public void verifyUnitemporalSnapshotWithPartitionSpecListNoDedupNoVersionInUpperCase(GeneratorResult operations) + { + List preActionsSql = operations.preActionsSql(); + List milestoningSql = operations.ingestSql(); + List metadataIngestSql = operations.metadataIngestSql(); + + String expectedMilestoneQuery = "UPDATE `MYDB`.`MAIN` as sink SET " + + "sink.`BATCH_ID_OUT` = (SELECT COALESCE(MAX(BATCH_METADATA.`TABLE_BATCH_ID`),0)+1 FROM BATCH_METADATA as BATCH_METADATA WHERE UPPER(BATCH_METADATA.`TABLE_NAME`) = 'MAIN')-1 " + + "WHERE (sink.`BATCH_ID_OUT` = 999999999) AND (NOT (EXISTS (SELECT * FROM `MYDB`.`STAGING` as stage " + + "WHERE ((sink.`ID` = stage.`ID`) AND (sink.`NAME` = stage.`NAME`)) AND (sink.`DIGEST` = stage.`DIGEST`)))) " + + "AND (((sink.`ACCOUNT_TYPE` = 1) AND (sink.`BIZ_DATE` = '2024-01-01')) " + + "OR ((sink.`ACCOUNT_TYPE` = 1) AND (sink.`BIZ_DATE` = '2024-01-02')) " + + "OR ((sink.`ACCOUNT_TYPE` = 2) AND (sink.`BIZ_DATE` = 
'2024-01-02')))"; + + String expectedUpsertQuery = "INSERT INTO `MYDB`.`MAIN` " + + "(`ID`, `NAME`, `AMOUNT`, `ACCOUNT_TYPE`, `BIZ_DATE`, `DIGEST`, `BATCH_ID_IN`, `BATCH_ID_OUT`) " + + "(SELECT stage.`ID`,stage.`NAME`,stage.`AMOUNT`,stage.`ACCOUNT_TYPE`,stage.`BIZ_DATE`,stage.`DIGEST`," + + "(SELECT COALESCE(MAX(BATCH_METADATA.`TABLE_BATCH_ID`),0)+1 FROM BATCH_METADATA as BATCH_METADATA WHERE UPPER(BATCH_METADATA.`TABLE_NAME`) = 'MAIN')," + + "999999999 FROM `MYDB`.`STAGING` as stage WHERE " + + "NOT (stage.`DIGEST` IN (SELECT sink.`DIGEST` FROM `MYDB`.`MAIN` as sink WHERE (sink.`BATCH_ID_OUT` = 999999999) AND " + + "(((sink.`ACCOUNT_TYPE` = 1) AND (sink.`BIZ_DATE` = '2024-01-01')) " + + "OR ((sink.`ACCOUNT_TYPE` = 1) AND (sink.`BIZ_DATE` = '2024-01-02')) " + + "OR ((sink.`ACCOUNT_TYPE` = 2) AND (sink.`BIZ_DATE` = '2024-01-02'))))))"; + + Assertions.assertEquals(BigQueryTestArtifacts.expectedMainTableWithMultiPartitionCreateQueryUpperCase, preActionsSql.get(0)); + Assertions.assertEquals(BigQueryTestArtifacts.expectedMetadataTableCreateQueryWithUpperCase, preActionsSql.get(1)); + Assertions.assertEquals(expectedMilestoneQuery, milestoningSql.get(0)); + Assertions.assertEquals(expectedUpsertQuery, milestoningSql.get(1)); + Assertions.assertEquals(getExpectedMetadataTableIngestQueryWithUpperCase(), metadataIngestSql.get(0)); + } + + @Override + public void verifyUnitemporalSnapshotWithPartitionSpecListWithEmptyBatchHandling(GeneratorResult operations) + { + List preActionsSql = operations.preActionsSql(); + List milestoningSql = operations.ingestSql(); + List metadataIngestSql = operations.metadataIngestSql(); + + String expectedMilestoneQuery = "UPDATE `mydb`.`main` as sink SET sink.`batch_id_out` = (SELECT COALESCE(MAX(batch_metadata.`table_batch_id`),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.`table_name`) = 'MAIN')-1 WHERE (sink.`batch_id_out` = 999999999) AND (((sink.`biz_date` = '2024-01-01') AND (sink.`account_type` = 1)) OR 
((sink.`biz_date` = '2024-01-02') AND (sink.`account_type` = 1)) OR ((sink.`biz_date` = '2024-01-02') AND (sink.`account_type` = 2)))"; + + Assertions.assertEquals(BigQueryTestArtifacts.expectedMainTableWithMultiPartitionCreateQuery, preActionsSql.get(0)); + Assertions.assertEquals(BigQueryTestArtifacts.expectedMetadataTableCreateQuery, preActionsSql.get(1)); + + Assertions.assertEquals(expectedMilestoneQuery, milestoningSql.get(0)); + Assertions.assertEquals(getExpectedMetadataTableIngestQuery(), metadataIngestSql.get(0)); + } + @Override public void verifyUnitemporalSnapshotWithCleanStagingData(GeneratorResult operations) { diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/BaseTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/BaseTest.java index a1e11e31f88..5f762a42b93 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/BaseTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/BaseTest.java @@ -399,6 +399,16 @@ protected void loadStagingDataForWithPartition(String path) throws Exception h2Sink.executeStatement(loadSql); } + protected void loadStagingDataForWithMultiPartition(String path) throws Exception + { + validateFileExists(path); + String loadSql = "TRUNCATE TABLE \"TEST\".\"staging\";" + + "INSERT INTO \"TEST\".\"staging\"(date, accountNum, dimension, balance, digest) " + + "SELECT CONVERT( \"date\",DATE ), \"accountNum\", \"dimension\", CONVERT( \"balance\", BIGINT), \"digest\"" + + " 
FROM CSVREAD( '" + path + "', 'date, accountNum, dimension, balance, digest', NULL )"; + h2Sink.executeStatement(loadSql); + } + protected void loadStagingDataForWithPartitionWithVersion(String path) throws Exception { validateFileExists(path); diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/TestUtils.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/TestUtils.java index c84be98a2da..579ad050dc1 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/TestUtils.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/TestUtils.java @@ -116,6 +116,8 @@ public class TestUtils public static String dataSplitName = "data_split"; public static String batchName = "batch"; public static String ratingName = "rating"; + public static String accountNumName = "accountNum"; + public static String dimensionName = "dimension"; public static String COMMA_DELIMITER = ","; public static HashMap> partitionFilter = new HashMap>() @@ -169,7 +171,8 @@ public class TestUtils public static Field dataSplit = Field.builder().name(dataSplitName).type(FieldType.of(DataType.BIGINT, Optional.empty(), Optional.empty())).primaryKey(true).fieldAlias(dataSplitName).build(); public static Field batch = Field.builder().name(batchName).type(FieldType.of(DataType.INT, Optional.empty(), Optional.empty())).fieldAlias(batchName).primaryKey(true).build(); public static Field rating = Field.builder().name(ratingName).type(FieldType.of(DataType.INT, 
Optional.empty(), Optional.empty())).fieldAlias(ratingName).build(); - + public static Field accountNum = Field.builder().name(accountNumName).type(FieldType.of(DataType.VARCHAR, Optional.empty(), Optional.empty())).fieldAlias(accountNumName).primaryKey(true).build(); + public static Field dimension = Field.builder().name(dimensionName).type(FieldType.of(DataType.VARCHAR, Optional.empty(), Optional.empty())).fieldAlias(dimensionName).primaryKey(true).build(); public static DatasetDefinition getBasicMainTable() { diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalSnapshotWithBatchIdTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalSnapshotWithBatchIdTest.java index 6460bf31bd2..5b99807bf09 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalSnapshotWithBatchIdTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalSnapshotWithBatchIdTest.java @@ -26,28 +26,15 @@ import org.finos.legend.engine.persistence.components.ingestmode.versioning.NoVersioningStrategy; import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset; import org.finos.legend.engine.persistence.components.logicalplan.datasets.DatasetDefinition; +import org.finos.legend.engine.persistence.components.logicalplan.datasets.SchemaDefinition; import 
org.finos.legend.engine.persistence.components.planner.PlannerOptions; import org.finos.legend.engine.persistence.components.util.MetadataDataset; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import java.util.Collections; -import java.util.List; -import java.util.Map; - -import static org.finos.legend.engine.persistence.components.TestUtils.batchIdInName; -import static org.finos.legend.engine.persistence.components.TestUtils.batchIdOutName; -import static org.finos.legend.engine.persistence.components.TestUtils.priceName; -import static org.finos.legend.engine.persistence.components.TestUtils.dateName; -import static org.finos.legend.engine.persistence.components.TestUtils.digestName; -import static org.finos.legend.engine.persistence.components.TestUtils.expiryDateName; -import static org.finos.legend.engine.persistence.components.TestUtils.idName; -import static org.finos.legend.engine.persistence.components.TestUtils.incomeName; -import static org.finos.legend.engine.persistence.components.TestUtils.nameName; -import static org.finos.legend.engine.persistence.components.TestUtils.partitionFilter; -import static org.finos.legend.engine.persistence.components.TestUtils.startTimeName; -import static org.finos.legend.engine.persistence.components.TestUtils.entityName; -import static org.finos.legend.engine.persistence.components.TestUtils.volumeName; +import java.util.*; + +import static org.finos.legend.engine.persistence.components.TestUtils.*; class UnitemporalSnapshotWithBatchIdTest extends BaseTest { @@ -241,6 +228,122 @@ void testUnitemporalSnapshotMilestoningLogicWithPartitionFilter() throws Excepti executePlansAndVerifyResults(ingestModeWithDeleteTargetData, options, datasets, schema, expectedDataPass3, expectedStats); } + @Test + void testUnitemporalSnapshotMilestoningLogicWithMultiplePartitionValues() throws Exception + { + DatasetDefinition mainTable = DatasetDefinition.builder() + 
.group(testSchemaName).name(mainTableName) + .schema(SchemaDefinition.builder() + .addFields(date) + .addFields(accountNum) + .addFields(dimension) + .addFields(balance) + .addFields(digest) + .addFields(batchIdIn) + .addFields(batchIdOut) + .build()).build(); + + DatasetDefinition stagingTable = DatasetDefinition.builder() + .group(testSchemaName).name(stagingTableName) + .schema(SchemaDefinition.builder() + .addFields(date) + .addFields(accountNum) + .addFields(dimension) + .addFields(balance) + .addFields(digest) + .build()).build(); + + String[] schema = new String[]{dateName, accountNumName, dimensionName, balanceName, digestName, batchIdInName, batchIdOutName}; + + // Create staging table + createStagingTable(stagingTable); + + + List> partitionSpecList = new ArrayList<>(); + addPartitionSpec(partitionSpecList, "2024-01-01", "ACCOUNT_1"); + addPartitionSpec(partitionSpecList, "2024-01-01", "ACCOUNT_2"); + addPartitionSpec(partitionSpecList, "2024-01-02", "ACCOUNT_1"); + addPartitionSpec(partitionSpecList, "2024-01-02", "ACCOUNT_2"); + addPartitionSpec(partitionSpecList, "2024-01-03", "ACCOUNT_1"); + addPartitionSpec(partitionSpecList, "2024-01-03", "ACCOUNT_2"); + + UnitemporalSnapshot ingestMode = UnitemporalSnapshot.builder() + .digestField(digestName) + .transactionMilestoning(BatchId.builder() + .batchIdInName(batchIdInName) + .batchIdOutName(batchIdOutName) + .build()) + .addAllPartitionFields(Arrays.asList(dateName, accountNumName)) + .addAllPartitionSpecList(partitionSpecList) + .build(); + + PlannerOptions options = PlannerOptions.builder().collectStatistics(true).build(); + Datasets datasets = Datasets.of(mainTable, stagingTable); + + // ------------ Perform unitemporal snapshot milestoning Pass1 ------------------------ + String dataPass1 = basePathForInput + "with_multi_values_partition/staging_data_pass1.csv"; + String expectedDataPass1 = basePathForExpected + "with_multi_values_partition/expected_pass1.csv"; + // 1. 
Load staging table + loadStagingDataForWithMultiPartition(dataPass1); + // 2. Execute plans and verify results + Map expectedStats = createExpectedStatsMap(15, 0, 15, 0, 0); + executePlansAndVerifyResults(ingestMode, options, datasets, schema, expectedDataPass1, expectedStats); + + // ------------ Perform unitemporal snapshot milestoning Pass2 ------------------------ + String dataPass2 = basePathForInput + "with_multi_values_partition/staging_data_pass2.csv"; + String expectedDataPass2 = basePathForExpected + "with_multi_values_partition/expected_pass2.csv"; + + + partitionSpecList = new ArrayList<>(); + addPartitionSpec(partitionSpecList, "2024-01-01", "ACCOUNT_1"); + addPartitionSpec(partitionSpecList, "2024-01-01", "ACCOUNT_3"); + addPartitionSpec(partitionSpecList, "2024-01-02", "ACCOUNT_2"); + addPartitionSpec(partitionSpecList, "2024-01-04", "ACCOUNT_1"); + ingestMode = ingestMode.withPartitionSpecList(partitionSpecList); + + // 1. Load staging table + loadStagingDataForWithMultiPartition(dataPass2); + // 2. Execute plans and verify results + expectedStats = createExpectedStatsMap(5, 0, 2, 2, 3); + executePlansAndVerifyResults(ingestMode, options, datasets, schema, expectedDataPass2, expectedStats); + + // ------------ Perform unitemporal snapshot milestoning Pass3 (Empty Batch - No Op) ------------------------ + IngestMode ingestModeWithNoOpBatchHandling = ingestMode.withEmptyDatasetHandling(NoOp.builder().build()); + + String dataPass3 = "src/test/resources/data/empty_file.csv"; + String expectedDataPass3 = basePathForExpected + "with_multi_values_partition/expected_pass2.csv"; + // 1. Load Staging table + loadStagingDataForWithMultiPartition(dataPass3); + // 2. 
Execute plans and verify results + expectedStats = createExpectedStatsMap(0, 0, 0, 0, 0); + executePlansAndVerifyResults(ingestModeWithNoOpBatchHandling, options, datasets, schema, expectedDataPass3, expectedStats); + + // ------------ Perform unitemporal snapshot milestoning Pass3 (Empty Batch - Delete target Data) ------------------------ + + partitionSpecList = new ArrayList<>(); + addPartitionSpec(partitionSpecList, "2024-01-01", "ACCOUNT_1"); + addPartitionSpec(partitionSpecList, "2024-01-02", "ACCOUNT_2"); + IngestMode ingestModeWithDeleteTargetData = ingestMode.withPartitionSpecList(partitionSpecList).withEmptyDatasetHandling(DeleteTargetData.builder().build()); + dataPass3 = "src/test/resources/data/empty_file.csv"; + expectedDataPass3 = basePathForExpected + "with_multi_values_partition/expected_pass3.csv"; + // 1. Load Staging table + loadStagingDataForWithMultiPartition(dataPass3); + // 2. Execute plans and verify results + expectedStats = createExpectedStatsMap(0, 0, 0, 0, 3); + executePlansAndVerifyResults(ingestModeWithDeleteTargetData, options, datasets, schema, expectedDataPass3, expectedStats); + } + + private static void addPartitionSpec(List> partitionSpecList, String date, String accountNum) + { + partitionSpecList.add(new HashMap() + { + { + put(dateName, date); + put(accountNumName, accountNum); + } + }); + } + /* Scenario: Test milestoning Logic when staging data comes from CSV and has less columns than main dataset */ diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-snapshot-milestoning/expected/batch_id_based/with_multi_values_partition/expected_pass1.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-snapshot-milestoning/expected/batch_id_based/with_multi_values_partition/expected_pass1.csv new file mode 
100644 index 00000000000..3aedf92c4a1 --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-snapshot-milestoning/expected/batch_id_based/with_multi_values_partition/expected_pass1.csv @@ -0,0 +1,15 @@ +2024-01-01,ACCOUNT_1,DIMENSION_1,1100,DIGEST11,1,999999999 +2024-01-01,ACCOUNT_1,DIMENSION_2,1200,DIGEST12,1,999999999 +2024-01-01,ACCOUNT_1,DIMENSION_3,1300,DIGEST13,1,999999999 +2024-01-01,ACCOUNT_2,DIMENSION_1,1400,DIGEST14,1,999999999 +2024-01-01,ACCOUNT_2,DIMENSION_2,1500,DIGEST15,1,999999999 +2024-01-01,ACCOUNT_2,DIMENSION_3,1600,DIGEST16,1,999999999 +2024-01-02,ACCOUNT_1,DIMENSION_1,2100,DIGEST21,1,999999999 +2024-01-02,ACCOUNT_1,DIMENSION_2,2200,DIGEST22,1,999999999 +2024-01-02,ACCOUNT_1,DIMENSION_3,2300,DIGEST23,1,999999999 +2024-01-02,ACCOUNT_2,DIMENSION_1,2400,DIGEST24,1,999999999 +2024-01-02,ACCOUNT_2,DIMENSION_2,2500,DIGEST25,1,999999999 +2024-01-02,ACCOUNT_2,DIMENSION_3,2600,DIGEST26,1,999999999 +2024-01-03,ACCOUNT_1,DIMENSION_1,3100,DIGEST31,1,999999999 +2024-01-03,ACCOUNT_1,DIMENSION_2,3200,DIGEST32,1,999999999 +2024-01-03,ACCOUNT_2,DIMENSION_1,3300,DIGEST33,1,999999999 \ No newline at end of file diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-snapshot-milestoning/expected/batch_id_based/with_multi_values_partition/expected_pass2.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-snapshot-milestoning/expected/batch_id_based/with_multi_values_partition/expected_pass2.csv new file mode 100644 index 00000000000..5c088dd677a --- /dev/null +++ 
b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-snapshot-milestoning/expected/batch_id_based/with_multi_values_partition/expected_pass2.csv @@ -0,0 +1,19 @@ +2024-01-01,ACCOUNT_1,DIMENSION_1,1100,DIGEST11,1,1 +2024-01-01,ACCOUNT_1,DIMENSION_2,1200,DIGEST12,1,999999999 +2024-01-01,ACCOUNT_1,DIMENSION_3,1300,DIGEST13,1,1 +2024-01-01,ACCOUNT_2,DIMENSION_1,1400,DIGEST14,1,999999999 +2024-01-01,ACCOUNT_2,DIMENSION_2,1500,DIGEST15,1,999999999 +2024-01-01,ACCOUNT_2,DIMENSION_3,1600,DIGEST16,1,999999999 +2024-01-02,ACCOUNT_1,DIMENSION_1,2100,DIGEST21,1,999999999 +2024-01-02,ACCOUNT_1,DIMENSION_2,2200,DIGEST22,1,999999999 +2024-01-02,ACCOUNT_1,DIMENSION_3,2300,DIGEST23,1,999999999 +2024-01-02,ACCOUNT_2,DIMENSION_1,2400,DIGEST24,1,1 +2024-01-02,ACCOUNT_2,DIMENSION_2,2500,DIGEST25,1,1 +2024-01-02,ACCOUNT_2,DIMENSION_3,2600,DIGEST26,1,1 +2024-01-03,ACCOUNT_1,DIMENSION_1,3100,DIGEST31,1,999999999 +2024-01-03,ACCOUNT_1,DIMENSION_2,3200,DIGEST32,1,999999999 +2024-01-03,ACCOUNT_2,DIMENSION_1,3300,DIGEST33,1,999999999 +2024-01-01,ACCOUNT_1,DIMENSION_1,1700,DIGEST17,2,999999999 +2024-01-01,ACCOUNT_3,DIMENSION_1,1800,DIGEST18,2,999999999 +2024-01-02,ACCOUNT_2,DIMENSION_1,2700,DIGEST27,2,999999999 +2024-01-04,ACCOUNT_1,DIMENSION_1,4100,DIGEST41,2,999999999 \ No newline at end of file diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-snapshot-milestoning/expected/batch_id_based/with_multi_values_partition/expected_pass3.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-snapshot-milestoning/expected/batch_id_based/with_multi_values_partition/expected_pass3.csv new file mode 100644 index 00000000000..44ad84c7c93 --- /dev/null +++ 
b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-snapshot-milestoning/expected/batch_id_based/with_multi_values_partition/expected_pass3.csv @@ -0,0 +1,19 @@ +2024-01-01,ACCOUNT_1,DIMENSION_1,1100,DIGEST11,1,1 +2024-01-01,ACCOUNT_1,DIMENSION_2,1200,DIGEST12,1,3 +2024-01-01,ACCOUNT_1,DIMENSION_3,1300,DIGEST13,1,1 +2024-01-01,ACCOUNT_2,DIMENSION_1,1400,DIGEST14,1,999999999 +2024-01-01,ACCOUNT_2,DIMENSION_2,1500,DIGEST15,1,999999999 +2024-01-01,ACCOUNT_2,DIMENSION_3,1600,DIGEST16,1,999999999 +2024-01-02,ACCOUNT_1,DIMENSION_1,2100,DIGEST21,1,999999999 +2024-01-02,ACCOUNT_1,DIMENSION_2,2200,DIGEST22,1,999999999 +2024-01-02,ACCOUNT_1,DIMENSION_3,2300,DIGEST23,1,999999999 +2024-01-02,ACCOUNT_2,DIMENSION_1,2400,DIGEST24,1,1 +2024-01-02,ACCOUNT_2,DIMENSION_2,2500,DIGEST25,1,1 +2024-01-02,ACCOUNT_2,DIMENSION_3,2600,DIGEST26,1,1 +2024-01-03,ACCOUNT_1,DIMENSION_1,3100,DIGEST31,1,999999999 +2024-01-03,ACCOUNT_1,DIMENSION_2,3200,DIGEST32,1,999999999 +2024-01-03,ACCOUNT_2,DIMENSION_1,3300,DIGEST33,1,999999999 +2024-01-01,ACCOUNT_1,DIMENSION_1,1700,DIGEST17,2,3 +2024-01-01,ACCOUNT_3,DIMENSION_1,1800,DIGEST18,2,999999999 +2024-01-02,ACCOUNT_2,DIMENSION_1,2700,DIGEST27,2,3 +2024-01-04,ACCOUNT_1,DIMENSION_1,4100,DIGEST41,2,999999999 \ No newline at end of file diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-snapshot-milestoning/input/batch_id_based/with_multi_values_partition/staging_data_pass1.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-snapshot-milestoning/input/batch_id_based/with_multi_values_partition/staging_data_pass1.csv new file mode 100644 index 00000000000..2163f5cb0ed --- /dev/null +++ 
b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-snapshot-milestoning/input/batch_id_based/with_multi_values_partition/staging_data_pass1.csv @@ -0,0 +1,15 @@ +2024-01-01,ACCOUNT_1,DIMENSION_1,1100,DIGEST11 +2024-01-01,ACCOUNT_1,DIMENSION_2,1200,DIGEST12 +2024-01-01,ACCOUNT_1,DIMENSION_3,1300,DIGEST13 +2024-01-01,ACCOUNT_2,DIMENSION_1,1400,DIGEST14 +2024-01-01,ACCOUNT_2,DIMENSION_2,1500,DIGEST15 +2024-01-01,ACCOUNT_2,DIMENSION_3,1600,DIGEST16 +2024-01-02,ACCOUNT_1,DIMENSION_1,2100,DIGEST21 +2024-01-02,ACCOUNT_1,DIMENSION_2,2200,DIGEST22 +2024-01-02,ACCOUNT_1,DIMENSION_3,2300,DIGEST23 +2024-01-02,ACCOUNT_2,DIMENSION_1,2400,DIGEST24 +2024-01-02,ACCOUNT_2,DIMENSION_2,2500,DIGEST25 +2024-01-02,ACCOUNT_2,DIMENSION_3,2600,DIGEST26 +2024-01-03,ACCOUNT_1,DIMENSION_1,3100,DIGEST31 +2024-01-03,ACCOUNT_1,DIMENSION_2,3200,DIGEST32 +2024-01-03,ACCOUNT_2,DIMENSION_1,3300,DIGEST33 \ No newline at end of file diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-snapshot-milestoning/input/batch_id_based/with_multi_values_partition/staging_data_pass2.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-snapshot-milestoning/input/batch_id_based/with_multi_values_partition/staging_data_pass2.csv new file mode 100644 index 00000000000..90149ae650c --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-snapshot-milestoning/input/batch_id_based/with_multi_values_partition/staging_data_pass2.csv @@ -0,0 +1,6 @@ +2024-01-01,ACCOUNT_1,DIMENSION_1,1700,DIGEST17 +2024-01-01,ACCOUNT_1,DIMENSION_2,1200,DIGEST12 
+2024-01-01,ACCOUNT_3,DIMENSION_1,1800,DIGEST18 +2024-01-02,ACCOUNT_2,DIMENSION_1,2700,DIGEST27 +2024-01-04,ACCOUNT_1,DIMENSION_1,4100,DIGEST41 + diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-memsql/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/MemsqlTestArtifacts.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-memsql/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/MemsqlTestArtifacts.java index 68930aeb7bf..5cd8c30b403 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-memsql/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/MemsqlTestArtifacts.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-memsql/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/MemsqlTestArtifacts.java @@ -192,6 +192,10 @@ public class MemsqlTestArtifacts "`id` INTEGER NOT NULL,`name` VARCHAR(256) NOT NULL,`amount` DOUBLE,`biz_date` DATE,`digest` VARCHAR(256)," + "`batch_id_in` INTEGER NOT NULL,`batch_id_out` INTEGER,PRIMARY KEY (`id`, `name`, `batch_id_in`))"; + public static String expectedMainTableMultiPartitionKeysCreateQuery = "CREATE TABLE IF NOT EXISTS `mydb`.`main`(" + + "`id` INTEGER NOT NULL,`name` VARCHAR(256) NOT NULL,`amount` DOUBLE,`account_type` INTEGER,`biz_date` DATE,`digest` VARCHAR(256)," + + "`batch_id_in` INTEGER NOT NULL,`batch_id_out` INTEGER,PRIMARY KEY (`id`, `name`, `batch_id_in`))"; + public static String expectedMetadataTableCreateQuery = "CREATE TABLE IF NOT EXISTS batch_metadata" + "(`table_name` VARCHAR(255)," + "`batch_start_ts_utc` DATETIME," + @@ -214,6 +218,10 @@ public class MemsqlTestArtifacts "(`ID` INTEGER NOT NULL,`NAME` VARCHAR(256) NOT 
NULL,`AMOUNT` DOUBLE,`BIZ_DATE` DATE,`DIGEST` VARCHAR(256)," + "`BATCH_ID_IN` INTEGER NOT NULL,`BATCH_ID_OUT` INTEGER,PRIMARY KEY (`ID`, `NAME`, `BATCH_ID_IN`))"; + public static String expectedMainTableMultiPartitionsCreateQueryWithUpperCase = "CREATE TABLE IF NOT EXISTS `MYDB`.`MAIN`" + + "(`ID` INTEGER NOT NULL,`NAME` VARCHAR(256) NOT NULL,`AMOUNT` DOUBLE,`ACCOUNT_TYPE` INTEGER,`BIZ_DATE` DATE,`DIGEST` VARCHAR(256)," + + "`BATCH_ID_IN` INTEGER NOT NULL,`BATCH_ID_OUT` INTEGER,PRIMARY KEY (`ID`, `NAME`, `BATCH_ID_IN`))"; + public static String expectedMetadataTableIngestQuery = "INSERT INTO batch_metadata (`table_name`, `table_batch_id`, `batch_start_ts_utc`, `batch_end_ts_utc`, `batch_status`)" + " (SELECT 'main',(SELECT COALESCE(MAX(batch_metadata.`table_batch_id`),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.`table_name`) = 'MAIN'),'2000-01-01 00:00:00.000000',CURRENT_TIMESTAMP(),'DONE')"; diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-memsql/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/UnitemporalSnapshotBatchIdBasedTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-memsql/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/UnitemporalSnapshotBatchIdBasedTest.java index bb0498eb0ad..efa5a82352a 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-memsql/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/UnitemporalSnapshotBatchIdBasedTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-memsql/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/UnitemporalSnapshotBatchIdBasedTest.java @@ -14,6 +14,7 @@ package 
org.finos.legend.engine.persistence.components.ingestmode; +import org.finos.legend.engine.persistence.components.AnsiTestArtifacts; import org.finos.legend.engine.persistence.components.common.DedupAndVersionErrorSqlType; import org.finos.legend.engine.persistence.components.relational.RelationalSink; import org.finos.legend.engine.persistence.components.relational.api.GeneratorResult; @@ -182,6 +183,93 @@ public void verifyUnitemporalSnapshotWithPartitionFiltersNoDedupNoVersion(Genera Assertions.assertEquals(getExpectedMetadataTableIngestQuery(), metadataIngestSql.get(0)); } + @Override + public void verifyUnitemporalSnapshotWithPartitionSpecListNoDedupNoVersion(GeneratorResult operations) + { + List preActionsSql = operations.preActionsSql(); + List milestoningSql = operations.ingestSql(); + List metadataIngestSql = operations.metadataIngestSql(); + + String expectedMilestoneQuery = "UPDATE `mydb`.`main` as sink SET " + + "sink.`batch_id_out` = (SELECT COALESCE(MAX(batch_metadata.`table_batch_id`),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.`table_name`) = 'MAIN')-1 " + + "WHERE (sink.`batch_id_out` = 999999999) AND " + + "(NOT (EXISTS (SELECT * FROM `mydb`.`staging` as stage WHERE ((sink.`id` = stage.`id`) AND (sink.`name` = stage.`name`)) AND (sink.`digest` = stage.`digest`)))) " + + "AND (((sink.`biz_date` = '2024-01-01') AND (sink.`account_type` = 1)) " + + "OR ((sink.`biz_date` = '2024-01-02') AND (sink.`account_type` = 1)) " + + "OR ((sink.`biz_date` = '2024-01-02') AND (sink.`account_type` = 2)))"; + + String expectedUpsertQuery = "INSERT INTO `mydb`.`main` " + + "(`id`, `name`, `amount`, `account_type`, `biz_date`, `digest`, `batch_id_in`, `batch_id_out`) " + + "(SELECT stage.`id`,stage.`name`,stage.`amount`,stage.`account_type`,stage.`biz_date`,stage.`digest`," + + "(SELECT COALESCE(MAX(batch_metadata.`table_batch_id`),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.`table_name`) = 'MAIN'),999999999 " + + 
"FROM `mydb`.`staging` as stage WHERE " + + "NOT (stage.`digest` IN (SELECT sink.`digest` FROM `mydb`.`main` as sink WHERE " + + "(sink.`batch_id_out` = 999999999) AND " + + "(((sink.`biz_date` = '2024-01-01') AND (sink.`account_type` = 1)) " + + "OR ((sink.`biz_date` = '2024-01-02') AND (sink.`account_type` = 1)) " + + "OR ((sink.`biz_date` = '2024-01-02') AND (sink.`account_type` = 2))))))"; + + Assertions.assertEquals(MemsqlTestArtifacts.expectedMainTableMultiPartitionKeysCreateQuery, preActionsSql.get(0)); + Assertions.assertEquals(MemsqlTestArtifacts.expectedMetadataTableCreateQuery, preActionsSql.get(1)); + + Assertions.assertEquals(expectedMilestoneQuery, milestoningSql.get(0)); + Assertions.assertEquals(expectedUpsertQuery, milestoningSql.get(1)); + Assertions.assertEquals(getExpectedMetadataTableIngestQuery(), metadataIngestSql.get(0)); + } + + @Override + public void verifyUnitemporalSnapshotWithPartitionSpecListNoDedupNoVersionInUpperCase(GeneratorResult operations) + { + List preActionsSql = operations.preActionsSql(); + List milestoningSql = operations.ingestSql(); + List metadataIngestSql = operations.metadataIngestSql(); + + String expectedMilestoneQuery = "UPDATE `MYDB`.`MAIN` as sink SET " + + "sink.`BATCH_ID_OUT` = (SELECT COALESCE(MAX(BATCH_METADATA.`TABLE_BATCH_ID`),0)+1 FROM BATCH_METADATA as BATCH_METADATA WHERE UPPER(BATCH_METADATA.`TABLE_NAME`) = 'MAIN')-1 " + + "WHERE (sink.`BATCH_ID_OUT` = 999999999) AND (NOT (EXISTS (SELECT * FROM `MYDB`.`STAGING` as stage " + + "WHERE ((sink.`ID` = stage.`ID`) AND (sink.`NAME` = stage.`NAME`)) AND (sink.`DIGEST` = stage.`DIGEST`)))) " + + "AND (((sink.`ACCOUNT_TYPE` = 1) AND (sink.`BIZ_DATE` = '2024-01-01')) " + + "OR ((sink.`ACCOUNT_TYPE` = 1) AND (sink.`BIZ_DATE` = '2024-01-02')) " + + "OR ((sink.`ACCOUNT_TYPE` = 2) AND (sink.`BIZ_DATE` = '2024-01-02')))"; + + String expectedUpsertQuery = "INSERT INTO `MYDB`.`MAIN` " + + "(`ID`, `NAME`, `AMOUNT`, `ACCOUNT_TYPE`, `BIZ_DATE`, `DIGEST`, `BATCH_ID_IN`, 
`BATCH_ID_OUT`) " + + "(SELECT stage.`ID`,stage.`NAME`,stage.`AMOUNT`,stage.`ACCOUNT_TYPE`,stage.`BIZ_DATE`,stage.`DIGEST`," + + "(SELECT COALESCE(MAX(BATCH_METADATA.`TABLE_BATCH_ID`),0)+1 FROM BATCH_METADATA as BATCH_METADATA WHERE UPPER(BATCH_METADATA.`TABLE_NAME`) = 'MAIN')," + + "999999999 FROM `MYDB`.`STAGING` as stage WHERE " + + "NOT (stage.`DIGEST` IN (SELECT sink.`DIGEST` FROM `MYDB`.`MAIN` as sink WHERE (sink.`BATCH_ID_OUT` = 999999999) AND " + + "(((sink.`ACCOUNT_TYPE` = 1) AND (sink.`BIZ_DATE` = '2024-01-01')) " + + "OR ((sink.`ACCOUNT_TYPE` = 1) AND (sink.`BIZ_DATE` = '2024-01-02')) " + + "OR ((sink.`ACCOUNT_TYPE` = 2) AND (sink.`BIZ_DATE` = '2024-01-02'))))))"; + + Assertions.assertEquals(MemsqlTestArtifacts.expectedMainTableMultiPartitionsCreateQueryWithUpperCase, preActionsSql.get(0)); + Assertions.assertEquals(MemsqlTestArtifacts.expectedMetadataTableCreateQueryWithUpperCase, preActionsSql.get(1)); + Assertions.assertEquals(expectedMilestoneQuery, milestoningSql.get(0)); + Assertions.assertEquals(expectedUpsertQuery, milestoningSql.get(1)); + Assertions.assertEquals(getExpectedMetadataTableIngestQueryWithUpperCase(), metadataIngestSql.get(0)); + } + + @Override + public void verifyUnitemporalSnapshotWithPartitionSpecListWithEmptyBatchHandling(GeneratorResult operations) + { + List preActionsSql = operations.preActionsSql(); + List milestoningSql = operations.ingestSql(); + List metadataIngestSql = operations.metadataIngestSql(); + + String expectedMilestoneQuery = "UPDATE `mydb`.`main` as sink SET " + + "sink.`batch_id_out` = (SELECT COALESCE(MAX(batch_metadata.`table_batch_id`),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.`table_name`) = 'MAIN')-1 " + + "WHERE (sink.`batch_id_out` = 999999999) AND " + + "(((sink.`biz_date` = '2024-01-01') AND (sink.`account_type` = 1)) " + + "OR ((sink.`biz_date` = '2024-01-02') AND (sink.`account_type` = 1)) " + + "OR ((sink.`biz_date` = '2024-01-02') AND (sink.`account_type` = 2)))"; + + 
Assertions.assertEquals(MemsqlTestArtifacts.expectedMainTableMultiPartitionKeysCreateQuery, preActionsSql.get(0)); + Assertions.assertEquals(MemsqlTestArtifacts.expectedMetadataTableCreateQuery, preActionsSql.get(1)); + + Assertions.assertEquals(expectedMilestoneQuery, milestoningSql.get(0)); + Assertions.assertEquals(getExpectedMetadataTableIngestQuery(), metadataIngestSql.get(0)); + } + @Override public void verifyUnitemporalSnapshotWithCleanStagingData(GeneratorResult operations) { diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-postgres/src/test/java/org/finos/legend/engine/persistence/components/PostgresTestArtifacts.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-postgres/src/test/java/org/finos/legend/engine/persistence/components/PostgresTestArtifacts.java index e63e28735c0..64b88928ef1 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-postgres/src/test/java/org/finos/legend/engine/persistence/components/PostgresTestArtifacts.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-postgres/src/test/java/org/finos/legend/engine/persistence/components/PostgresTestArtifacts.java @@ -63,6 +63,14 @@ public class PostgresTestArtifacts "\"id\" INTEGER NOT NULL,\"name\" VARCHAR NOT NULL,\"amount\" DOUBLE PRECISION,\"biz_date\" DATE,\"digest\" VARCHAR," + "\"batch_id_in\" INTEGER NOT NULL,\"batch_id_out\" INTEGER,PRIMARY KEY (\"id\", \"name\", \"batch_id_in\"))"; + public static String expectedMainTableMultiPartitionsCreateQuery = "CREATE TABLE IF NOT EXISTS \"mydb\".\"main\"(" + + "\"id\" INTEGER NOT NULL,\"name\" VARCHAR NOT NULL,\"amount\" DOUBLE PRECISION,\"account_type\" INTEGER,\"biz_date\" DATE,\"digest\" VARCHAR," + + "\"batch_id_in\" INTEGER NOT 
NULL,\"batch_id_out\" INTEGER,PRIMARY KEY (\"id\", \"name\", \"batch_id_in\"))"; + + public static String expectedMainTableMultiPartitionsCreateQueryUpperCase = "CREATE TABLE IF NOT EXISTS \"MYDB\".\"MAIN\"" + + "(\"ID\" INTEGER NOT NULL,\"NAME\" VARCHAR NOT NULL,\"AMOUNT\" DOUBLE PRECISION,\"ACCOUNT_TYPE\" INTEGER,\"BIZ_DATE\" DATE," + + "\"DIGEST\" VARCHAR,\"BATCH_ID_IN\" INTEGER NOT NULL,\"BATCH_ID_OUT\" INTEGER,PRIMARY KEY (\"ID\", \"NAME\", \"BATCH_ID_IN\"))"; + public static String expectedMainTableBatchIdAndVersionBasedCreateQuery = "CREATE TABLE IF NOT EXISTS \"mydb\".\"main\"(" + "\"id\" INTEGER NOT NULL,\"name\" VARCHAR NOT NULL,\"amount\" DOUBLE,\"biz_date\" DATE,\"digest\" VARCHAR,\"version\" INTEGER," + "\"batch_id_in\" INTEGER NOT NULL,\"batch_id_out\" INTEGER,PRIMARY KEY (\"id\", \"name\", \"batch_id_in\"))"; diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-postgres/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalSnapshotBatchIdBasedTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-postgres/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalSnapshotBatchIdBasedTest.java index d1077618c0c..fec061feda5 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-postgres/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalSnapshotBatchIdBasedTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-postgres/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalSnapshotBatchIdBasedTest.java @@ -14,6 +14,7 @@ package 
org.finos.legend.engine.persistence.components.ingestmode.unitemporal; +import org.finos.legend.engine.persistence.components.AnsiTestArtifacts; import org.finos.legend.engine.persistence.components.PostgresTestArtifacts; import org.finos.legend.engine.persistence.components.common.DedupAndVersionErrorSqlType; import org.finos.legend.engine.persistence.components.relational.RelationalSink; @@ -207,6 +208,93 @@ public void verifyUnitemporalSnapshotWithPartitionFiltersNoDedupNoVersion(Genera Assertions.assertEquals(getExpectedMetadataTableIngestQuery(), metadataIngestSql.get(0)); } + @Override + public void verifyUnitemporalSnapshotWithPartitionSpecListNoDedupNoVersion(GeneratorResult operations) + { + List preActionsSql = operations.preActionsSql(); + List milestoningSql = operations.ingestSql(); + List metadataIngestSql = operations.metadataIngestSql(); + + String expectedMilestoneQuery = "UPDATE \"mydb\".\"main\" as sink SET " + + "\"batch_id_out\" = (SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MAIN')-1 " + + "WHERE (sink.\"batch_id_out\" = 999999999) AND " + + "(NOT (EXISTS (SELECT * FROM \"mydb\".\"staging\" as stage WHERE ((sink.\"id\" = stage.\"id\") AND (sink.\"name\" = stage.\"name\")) AND (sink.\"digest\" = stage.\"digest\")))) " + + "AND (((sink.\"biz_date\" = '2024-01-01') AND (sink.\"account_type\" = 1)) " + + "OR ((sink.\"biz_date\" = '2024-01-02') AND (sink.\"account_type\" = 1)) " + + "OR ((sink.\"biz_date\" = '2024-01-02') AND (sink.\"account_type\" = 2)))"; + + String expectedUpsertQuery = "INSERT INTO \"mydb\".\"main\" " + + "(\"id\", \"name\", \"amount\", \"account_type\", \"biz_date\", \"digest\", \"batch_id_in\", \"batch_id_out\") " + + "(SELECT stage.\"id\",stage.\"name\",stage.\"amount\",stage.\"account_type\",stage.\"biz_date\",stage.\"digest\"," + + "(SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as 
batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MAIN')," + + "999999999 FROM \"mydb\".\"staging\" as stage WHERE " + + "NOT (stage.\"digest\" IN (SELECT sink.\"digest\" FROM \"mydb\".\"main\" as sink WHERE (sink.\"batch_id_out\" = 999999999) " + + "AND (((sink.\"biz_date\" = '2024-01-01') AND (sink.\"account_type\" = 1)) " + + "OR ((sink.\"biz_date\" = '2024-01-02') AND (sink.\"account_type\" = 1)) " + + "OR ((sink.\"biz_date\" = '2024-01-02') AND (sink.\"account_type\" = 2))))))"; + + Assertions.assertEquals(PostgresTestArtifacts.expectedMainTableMultiPartitionsCreateQuery, preActionsSql.get(0)); + Assertions.assertEquals(getExpectedMetadataTableCreateQuery(), preActionsSql.get(1)); + + Assertions.assertEquals(expectedMilestoneQuery, milestoningSql.get(0)); + Assertions.assertEquals(expectedUpsertQuery, milestoningSql.get(1)); + Assertions.assertEquals(getExpectedMetadataTableIngestQuery(), metadataIngestSql.get(0)); + } + + @Override + public void verifyUnitemporalSnapshotWithPartitionSpecListNoDedupNoVersionInUpperCase(GeneratorResult operations) + { + List preActionsSql = operations.preActionsSql(); + List milestoningSql = operations.ingestSql(); + List metadataIngestSql = operations.metadataIngestSql(); + + String expectedMilestoneQuery = "UPDATE \"MYDB\".\"MAIN\" as sink SET \"BATCH_ID_OUT\" = " + + "(SELECT COALESCE(MAX(BATCH_METADATA.\"TABLE_BATCH_ID\"),0)+1 FROM BATCH_METADATA as BATCH_METADATA WHERE UPPER(BATCH_METADATA.\"TABLE_NAME\") = 'MAIN')-1 " + + "WHERE (sink.\"BATCH_ID_OUT\" = 999999999) AND " + + "(NOT (EXISTS (SELECT * FROM \"MYDB\".\"STAGING\" as stage " + + "WHERE ((sink.\"ID\" = stage.\"ID\") AND (sink.\"NAME\" = stage.\"NAME\")) AND (sink.\"DIGEST\" = stage.\"DIGEST\")))) AND " + + "(((sink.\"ACCOUNT_TYPE\" = 1) AND (sink.\"BIZ_DATE\" = '2024-01-01')) " + + "OR ((sink.\"ACCOUNT_TYPE\" = 1) AND (sink.\"BIZ_DATE\" = '2024-01-02')) " + + "OR ((sink.\"ACCOUNT_TYPE\" = 2) AND (sink.\"BIZ_DATE\" = '2024-01-02')))"; + + String 
expectedUpsertQuery = "INSERT INTO \"MYDB\".\"MAIN\" " + + "(\"ID\", \"NAME\", \"AMOUNT\", \"ACCOUNT_TYPE\", \"BIZ_DATE\", \"DIGEST\", \"BATCH_ID_IN\", \"BATCH_ID_OUT\") " + + "(SELECT stage.\"ID\",stage.\"NAME\",stage.\"AMOUNT\",stage.\"ACCOUNT_TYPE\",stage.\"BIZ_DATE\",stage.\"DIGEST\"," + + "(SELECT COALESCE(MAX(BATCH_METADATA.\"TABLE_BATCH_ID\"),0)+1 FROM BATCH_METADATA as BATCH_METADATA WHERE UPPER(BATCH_METADATA.\"TABLE_NAME\") = 'MAIN')," + + "999999999 FROM \"MYDB\".\"STAGING\" as stage " + + "WHERE NOT (stage.\"DIGEST\" IN (SELECT sink.\"DIGEST\" FROM \"MYDB\".\"MAIN\" as sink WHERE (sink.\"BATCH_ID_OUT\" = 999999999) " + + "AND (((sink.\"ACCOUNT_TYPE\" = 1) AND (sink.\"BIZ_DATE\" = '2024-01-01')) " + + "OR ((sink.\"ACCOUNT_TYPE\" = 1) AND (sink.\"BIZ_DATE\" = '2024-01-02')) " + + "OR ((sink.\"ACCOUNT_TYPE\" = 2) AND (sink.\"BIZ_DATE\" = '2024-01-02'))))))"; + + Assertions.assertEquals(PostgresTestArtifacts.expectedMainTableMultiPartitionsCreateQueryUpperCase, preActionsSql.get(0)); + Assertions.assertEquals(getExpectedMetadataTableCreateQueryWithUpperCase(), preActionsSql.get(1)); + Assertions.assertEquals(expectedMilestoneQuery, milestoningSql.get(0)); + Assertions.assertEquals(expectedUpsertQuery, milestoningSql.get(1)); + Assertions.assertEquals(getExpectedMetadataTableIngestQueryWithUpperCase(), metadataIngestSql.get(0)); + } + + @Override + public void verifyUnitemporalSnapshotWithPartitionSpecListWithEmptyBatchHandling(GeneratorResult operations) + { + List preActionsSql = operations.preActionsSql(); + List milestoningSql = operations.ingestSql(); + List metadataIngestSql = operations.metadataIngestSql(); + + String expectedMilestoneQuery = "UPDATE \"mydb\".\"main\" as sink " + + "SET \"batch_id_out\" = (SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MAIN')-1 " + + "WHERE (sink.\"batch_id_out\" = 999999999) AND " + + "(((sink.\"biz_date\" = '2024-01-01') 
AND (sink.\"account_type\" = 1)) " + + "OR ((sink.\"biz_date\" = '2024-01-02') AND (sink.\"account_type\" = 1)) " + + "OR ((sink.\"biz_date\" = '2024-01-02') AND (sink.\"account_type\" = 2)))"; + + Assertions.assertEquals(PostgresTestArtifacts.expectedMainTableMultiPartitionsCreateQuery, preActionsSql.get(0)); + Assertions.assertEquals(getExpectedMetadataTableCreateQuery(), preActionsSql.get(1)); + + Assertions.assertEquals(expectedMilestoneQuery, milestoningSql.get(0)); + Assertions.assertEquals(getExpectedMetadataTableIngestQuery(), metadataIngestSql.get(0)); + } + @Override public void verifyUnitemporalSnapshotWithCleanStagingData(GeneratorResult operations) { diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-test/src/test/java/org/finos/legend/engine/persistence/components/BaseTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-test/src/test/java/org/finos/legend/engine/persistence/components/BaseTest.java index fb5e9b2d025..a949b7d73fa 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-test/src/test/java/org/finos/legend/engine/persistence/components/BaseTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-test/src/test/java/org/finos/legend/engine/persistence/components/BaseTest.java @@ -104,11 +104,45 @@ public class BaseTest protected String validityThroughTargetField = "validity_through_target"; protected String[] partitionKeys = new String[]{"biz_date"}; + + protected String[] partitionKeysMulti = new String[]{"biz_date", "account_type"}; protected Map> partitionFilter = new HashMap>() {{ put("biz_date", new HashSet<>(Arrays.asList("2000-01-01 00:00:00", "2000-01-02 00:00:00"))); }}; + protected Map> 
partitionFilterWithMultiValuesForMultipleKeys = new HashMap>() + {{ + put("biz_date", new HashSet<>(Arrays.asList("2000-01-01 00:00:00", "2000-01-02 00:00:00"))); + put("account_type", new HashSet<>(Arrays.asList("1", "2"))); + }}; + + protected Map> partitionFilterWithMultiValuesForOneKey = new HashMap>() + {{ + put("biz_date", new HashSet<>(Arrays.asList("2000-01-01 00:00:00", "2000-01-02 00:00:00"))); + put("account_type", new HashSet<>(Arrays.asList("1"))); + }}; + + protected List> partitionSpecList() + { + List> partitionSpecList = new ArrayList<>(); + addPartitionSpec(partitionSpecList, "2024-01-01", 1); + addPartitionSpec(partitionSpecList, "2024-01-02", 1); + addPartitionSpec(partitionSpecList, "2024-01-02", 2); + return partitionSpecList; + } + + private static void addPartitionSpec(List> partitionSpecList, String date, Integer accountType) + { + partitionSpecList.add(new HashMap() + { + { + put("biz_date", date); + put("account_type", accountType); + } + }); + } + // Base Columns: Primary keys : id, name protected Field id = Field.builder().name("id").type(FieldType.of(DataType.INT, Optional.empty(), Optional.empty())).primaryKey(true).build(); protected Field tinyIntId = Field.builder().name("id").type(FieldType.of(DataType.TINYINT, Optional.empty(), Optional.empty())).primaryKey(true).build(); @@ -116,6 +150,7 @@ public class BaseTest protected Field name = Field.builder().name("name").type(FieldType.of(DataType.VARCHAR, Optional.empty(), Optional.empty())).primaryKey(true).build(); protected Field nameModified = Field.builder().name("name").type(FieldType.of(DataType.VARCHAR, 64, null)).primaryKey(true).build(); protected Field amount = Field.builder().name("amount").type(FieldType.of(DataType.DOUBLE, Optional.empty(), Optional.empty())).build(); + protected Field accountType = Field.builder().name("account_type").type(FieldType.of(DataType.INT, Optional.empty(), Optional.empty())).build(); protected Field floatAmount = 
Field.builder().name("amount").type(FieldType.of(DataType.FLOAT, Optional.empty(), Optional.empty())).build(); protected Field bizDate = Field.builder().name("biz_date").type(FieldType.of(DataType.DATE, Optional.empty(), Optional.empty())).build(); @@ -216,6 +251,17 @@ public class BaseTest .addFields(batchIdOut) .build(); + protected SchemaDefinition mainTableWithMultiPartitionsBasedSchema = SchemaDefinition.builder() + .addFields(id) + .addFields(name) + .addFields(amount) + .addFields(accountType) + .addFields(bizDate) + .addFields(digest) + .addFields(batchIdIn) + .addFields(batchIdOut) + .build(); + protected SchemaDefinition mainTableBatchIdAndVersionBasedSchema = SchemaDefinition.builder() .addFields(id) .addFields(name) @@ -265,6 +311,15 @@ public class BaseTest .addFields(digest) .build(); + protected SchemaDefinition stagingTableSchemaWithMultiplePartitions = SchemaDefinition.builder() + .addFields(id) + .addFields(name) + .addFields(amount) + .addFields(accountType) + .addFields(bizDate) + .addFields(digest) + .build(); + protected SchemaDefinition mainTableSchemaWithDigest = SchemaDefinition.builder() .addFields(id) .addFields(name) @@ -652,6 +707,11 @@ protected String enrichSqlWithDataSplits(String sql, DataSplitRange dataSplitRan .schema(stagingTableSchemaWithDigest) .build(); + protected Dataset stagingTableWithMultiPartitions = DatasetDefinition.builder() + .database(stagingDbName).name(stagingTableName).alias(stagingTableAlias) + .schema(stagingTableSchemaWithMultiplePartitions) + .build(); + protected Dataset stagingTableWithFilter = DerivedDataset.builder() .database(stagingDbName).name(stagingTableName).alias(stagingTableAlias) .schema(stagingTableSchemaWithDigest) @@ -747,6 +807,10 @@ protected String enrichSqlWithDataSplits(String sql, DataSplitRange dataSplitRan .schema(mainTableBatchIdBasedSchema) .build(); + protected Dataset mainTableMultiPartitionsBased = DatasetDefinition.builder() + 
.database(mainDbName).name(mainTableName).alias(mainTableAlias) + .schema(mainTableWithMultiPartitionsBasedSchema) + .build(); protected Dataset mainTableWithBatchIdAndVersionBasedSchema = DatasetDefinition.builder() .database(mainDbName).name(mainTableName).alias(mainTableAlias) diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-test/src/test/java/org/finos/legend/engine/persistence/components/scenarios/UnitemporalSnapshotBatchIdBasedScenarios.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-test/src/test/java/org/finos/legend/engine/persistence/components/scenarios/UnitemporalSnapshotBatchIdBasedScenarios.java index 7f0991b20d4..14ccbfed8b4 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-test/src/test/java/org/finos/legend/engine/persistence/components/scenarios/UnitemporalSnapshotBatchIdBasedScenarios.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-test/src/test/java/org/finos/legend/engine/persistence/components/scenarios/UnitemporalSnapshotBatchIdBasedScenarios.java @@ -17,6 +17,7 @@ import org.finos.legend.engine.persistence.components.BaseTest; import org.finos.legend.engine.persistence.components.ingestmode.UnitemporalSnapshot; import org.finos.legend.engine.persistence.components.ingestmode.deduplication.FailOnDuplicates; +import org.finos.legend.engine.persistence.components.ingestmode.emptyhandling.DeleteTargetData; import org.finos.legend.engine.persistence.components.ingestmode.emptyhandling.NoOp; import org.finos.legend.engine.persistence.components.ingestmode.transactionmilestoning.BatchId; @@ -94,4 +95,19 @@ public TestScenario BATCH_ID_BASED__WITH_PARTITION_FILTER__NO_DEDUP__NO_VERSION( .build(); return new 
TestScenario(mainTableWithBatchIdBasedSchema, stagingTableWithBaseSchemaAndDigest, ingestMode); } + + public TestScenario BATCH_ID_BASED__WITH_PARTITION_SPEC_LIST__NO_DEDUP__NO_VERSION() + { + UnitemporalSnapshot ingestMode = UnitemporalSnapshot.builder() + .digestField(digestField) + .transactionMilestoning(BatchId.builder() + .batchIdInName(batchIdInField) + .batchIdOutName(batchIdOutField) + .build()) + .addAllPartitionFields(Arrays.asList(partitionKeysMulti)) + .addAllPartitionSpecList(partitionSpecList()) + .emptyDatasetHandling(DeleteTargetData.builder().build()) + .build(); + return new TestScenario(mainTableMultiPartitionsBased, stagingTableWithMultiPartitions, ingestMode); + } } \ No newline at end of file diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-test/src/test/java/org/finos/legend/engine/persistence/components/testcases/ingestmode/unitemporal/UnitmemporalSnapshotBatchIdBasedTestCases.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-test/src/test/java/org/finos/legend/engine/persistence/components/testcases/ingestmode/unitemporal/UnitmemporalSnapshotBatchIdBasedTestCases.java index 3914a18362f..f9a32f6a404 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-test/src/test/java/org/finos/legend/engine/persistence/components/testcases/ingestmode/unitemporal/UnitmemporalSnapshotBatchIdBasedTestCases.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-test/src/test/java/org/finos/legend/engine/persistence/components/testcases/ingestmode/unitemporal/UnitmemporalSnapshotBatchIdBasedTestCases.java @@ -17,6 +17,7 @@ import org.finos.legend.engine.persistence.components.BaseTest; import 
org.finos.legend.engine.persistence.components.common.Datasets; import org.finos.legend.engine.persistence.components.ingestmode.UnitemporalSnapshot; +import org.finos.legend.engine.persistence.components.ingestmode.emptyhandling.DeleteTargetData; import org.finos.legend.engine.persistence.components.ingestmode.transactionmilestoning.BatchId; import org.finos.legend.engine.persistence.components.ingestmode.versioning.AllVersionsStrategy; import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset; @@ -139,6 +140,55 @@ void testUnitemporalSnapshotWithPartitionFiltersNoDedupNoVersion() public abstract void verifyUnitemporalSnapshotWithPartitionFiltersNoDedupNoVersion(GeneratorResult operations); + @Test + void testUnitemporalSnapshotWithPartitionSpecListNoDedupNoVersion() + { + TestScenario scenario = scenarios.BATCH_ID_BASED__WITH_PARTITION_SPEC_LIST__NO_DEDUP__NO_VERSION(); + RelationalGenerator generator = RelationalGenerator.builder() + .ingestMode(scenario.getIngestMode()) + .relationalSink(getRelationalSink()) + .executionTimestampClock(fixedClock_2000_01_01) + .collectStatistics(true) + .build(); + GeneratorResult operations = generator.generateOperations(scenario.getDatasets()); + verifyUnitemporalSnapshotWithPartitionSpecListNoDedupNoVersion(operations); + } + + public abstract void verifyUnitemporalSnapshotWithPartitionSpecListNoDedupNoVersion(GeneratorResult operations); + + @Test + void testUnitemporalSnapshotWithPartitionSpecListNoDedupNoVersionInUpperCase() + { + TestScenario scenario = scenarios.BATCH_ID_BASED__WITH_PARTITION_SPEC_LIST__NO_DEDUP__NO_VERSION(); + RelationalGenerator generator = RelationalGenerator.builder() + .ingestMode(scenario.getIngestMode()) + .relationalSink(getRelationalSink()) + .executionTimestampClock(fixedClock_2000_01_01) + .collectStatistics(true) + .caseConversion(CaseConversion.TO_UPPER) + .build(); + GeneratorResult operations = generator.generateOperations(scenario.getDatasets()); + 
verifyUnitemporalSnapshotWithPartitionSpecListNoDedupNoVersionInUpperCase(operations); + } + + public abstract void verifyUnitemporalSnapshotWithPartitionSpecListNoDedupNoVersionInUpperCase(GeneratorResult operations); + + @Test + void testUnitemporalSnapshotWithPartitionSpecListWithEmptyBatchHandling() + { + TestScenario scenario = scenarios.BATCH_ID_BASED__WITH_PARTITION_SPEC_LIST__NO_DEDUP__NO_VERSION(); + RelationalGenerator generator = RelationalGenerator.builder() + .ingestMode(scenario.getIngestMode()) + .relationalSink(getRelationalSink()) + .executionTimestampClock(fixedClock_2000_01_01) + .collectStatistics(true) + .build(); + GeneratorResult operations = generator.generateOperationsForEmptyBatch(scenario.getDatasets()); + verifyUnitemporalSnapshotWithPartitionSpecListWithEmptyBatchHandling(operations); + } + + public abstract void verifyUnitemporalSnapshotWithPartitionSpecListWithEmptyBatchHandling(GeneratorResult operations); + @Test void testUnitemporalSnapshotWithCleanStagingData() { @@ -239,6 +289,101 @@ void testUnitemporalSnapshotAllVersionValidation() } } + @Test + void testUnitemporalSnapshotPartitionFilterWithMultiValuesForMultipleKeysValidation() + { + try + { + UnitemporalSnapshot ingestMode = UnitemporalSnapshot.builder() + .digestField(digestField) + .transactionMilestoning(BatchId.builder() + .batchIdInName(batchIdInField) + .batchIdOutName(batchIdOutField) + .build()) + .addAllPartitionFields(Arrays.asList(partitionKeysMulti)) + .putAllPartitionValuesByField(partitionFilterWithMultiValuesForMultipleKeys) + .emptyDatasetHandling(DeleteTargetData.builder().build()) + .build(); + + Assertions.fail("Exception was not thrown"); + } + catch (Exception e) + { + Assertions.assertEquals("Can not build UnitemporalSnapshot, in partitionValuesByField at most one of the partition keys can have more than one value, all other partition keys must have exactly one value", e.getMessage()); + } + } + + @Test + void 
testUnitemporalSnapshotPartitionFilterWithMultiValuesForOneKeyValidation() + { + try + { + UnitemporalSnapshot ingestMode = UnitemporalSnapshot.builder() + .digestField(digestField) + .transactionMilestoning(BatchId.builder() + .batchIdInName(batchIdInField) + .batchIdOutName(batchIdOutField) + .build()) + .addAllPartitionFields(Arrays.asList(partitionKeysMulti)) + .putAllPartitionValuesByField(partitionFilterWithMultiValuesForOneKey) + .emptyDatasetHandling(DeleteTargetData.builder().build()) + .build(); + } + catch (Exception e) + { + Assertions.fail("No Exception expected for multi values for one key"); + } + } + + @Test + void testUnitemporalSnapshotBothPartitionFilterAndPartitionSpecListProvidedValidation() + { + try + { + UnitemporalSnapshot ingestMode = UnitemporalSnapshot.builder() + .digestField(digestField) + .transactionMilestoning(BatchId.builder() + .batchIdInName(batchIdInField) + .batchIdOutName(batchIdOutField) + .build()) + .addAllPartitionFields(Arrays.asList(partitionKeysMulti)) + .putAllPartitionValuesByField(partitionFilterWithMultiValuesForOneKey) + .addAllPartitionSpecList(partitionSpecList()) + .emptyDatasetHandling(DeleteTargetData.builder().build()) + .build(); + + Assertions.fail("Exception was not thrown"); + } + catch (Exception e) + { + Assertions.assertEquals("Can not build UnitemporalSnapshot, Provide either partitionValuesByField or partitionSpecList, both not supported together", e.getMessage()); + } + } + + @Test + void testUnitemporalSnapshotPartitionKeyMismatchValidation() + { + try + { + UnitemporalSnapshot ingestMode = UnitemporalSnapshot.builder() + .digestField(digestField) + .transactionMilestoning(BatchId.builder() + .batchIdInName(batchIdInField) + .batchIdOutName(batchIdOutField) + .build()) + .addAllPartitionFields(Arrays.asList(partitionKeys)) + .addAllPartitionSpecList(partitionSpecList()) + .emptyDatasetHandling(DeleteTargetData.builder().build()) + .build(); + + Assertions.fail("Exception was not thrown"); + } + 
catch (Exception e) + { + Assertions.assertEquals("Can not build UnitemporalSnapshot, size of each partitionSpec must be same as size of partitionFields", e.getMessage()); + } + } + public abstract RelationalSink getRelationalSink(); }