diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/IngestModeCaseConverter.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/IngestModeCaseConverter.java index 500f37e90fc..089428754cd 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/IngestModeCaseConverter.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/IngestModeCaseConverter.java @@ -54,11 +54,7 @@ import org.finos.legend.engine.persistence.components.ingestmode.versioning.AllVersionsStrategy; import org.finos.legend.engine.persistence.components.ingestmode.versioning.MaxVersionStrategy; -import java.util.Optional; -import java.util.List; -import java.util.Set; -import java.util.Map; -import java.util.HashMap; +import java.util.*; import java.util.function.Function; import java.util.stream.Collectors; @@ -118,6 +114,7 @@ public IngestMode visitUnitemporalSnapshot(UnitemporalSnapshotAbstract unitempor .transactionMilestoning(unitemporalSnapshot.transactionMilestoning().accept(new TransactionMilestoningCaseConverter())) .addAllPartitionFields(applyCase(unitemporalSnapshot.partitionFields())) .putAllPartitionValuesByField(applyCase(unitemporalSnapshot.partitionValuesByField())) + .addAllPartitionSpecList(applyCaseForListOfMap(unitemporalSnapshot.partitionSpecList())) .emptyDatasetHandling(unitemporalSnapshot.emptyDatasetHandling()) .deduplicationStrategy(unitemporalSnapshot.deduplicationStrategy()) .versioningStrategy(unitemporalSnapshot.versioningStrategy().accept(new VersionStrategyCaseConverter())) @@ -210,6 +207,21 @@ private Map> applyCase(Map> map) return caseAppliedMap; } + private List> applyCaseForListOfMap(List> listOfMap) + { + List> caseAppliedListOfMap = new ArrayList<>(); + for (Map map : listOfMap) + { + Map caseAppliedMap = new HashMap<>(); + for (Map.Entry entry : map.entrySet()) + { + caseAppliedMap.put(applyCase(entry.getKey()), entry.getValue()); + } + caseAppliedListOfMap.add(caseAppliedMap); + } + return caseAppliedListOfMap; + } + private class MergeStrategyCaseConverter implements MergeStrategyVisitor { @Override diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/UnitemporalSnapshotAbstract.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/UnitemporalSnapshotAbstract.java index 51797abc002..3c86841a268 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/UnitemporalSnapshotAbstract.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/UnitemporalSnapshotAbstract.java @@ -26,10 +26,7 @@ import org.finos.legend.engine.persistence.components.ingestmode.versioning.DigestBasedResolverAbstract; import org.immutables.value.Value; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; +import java.util.*; import static org.immutables.value.Value.Derived; import static org.immutables.value.Value.Immutable; @@ -52,10 +49,9 @@ public interface UnitemporalSnapshotAbstract extends IngestMode, TransactionMile List partitionFields(); - List> partitionSpecList(); // [ [date: D1, Id: ID1, Name: N1], [date: D2, Id: ID2, Name: N2], ....] + List> partitionSpecList(); // [ {date: D1, Id: ID1, Name: N1}, {date: D2, Id: ID2, Name: N2}, ....] - // for Backward compatibility -- to be deprecated - Map> partitionValuesByField(); + Map> partitionValuesByField(); // for Backward compatibility -- to be deprecated @Derived default boolean partitioned() @@ -108,19 +104,29 @@ default void validate() } if (partitionKeysWithMoreThanOneValues > 1) { - throw new IllegalStateException(String.format("Can not build UnitemporalSnapshot, partitionValuesByField does not support more than 1 value for more than one partition key")); + throw new IllegalStateException(String.format("Can not build UnitemporalSnapshot, in partitionValuesByField at most one of the partition keys can have more than one value, all other partition keys must have exactly one value")); } } if (!partitionSpecList().isEmpty()) { - for (Map partitionSpec : partitionSpecList()) + for (Map partitionSpec : partitionSpecList()) { if (partitionFields().size() != partitionSpec.size()) { throw new IllegalStateException("Can not build UnitemporalSnapshot, size of each partitionSpec must be same as size of partitionFields"); } } + for (Map partitionSpec : partitionSpecList()) + { + for (String partitionKey: partitionSpec.keySet()) + { + if (!partitionFields().contains(partitionKey)) + { + throw new IllegalStateException(String.format("Can not build UnitemporalSnapshot, partitionKey: [%s] not specified in partitionSpec", partitionKey)); + } + } + } } // Allowed Versioning Strategy - NoVersioning, MaxVersioining diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/util/LogicalPlanUtils.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/util/LogicalPlanUtils.java index 41c1c0cd2a7..84b602203af 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/util/LogicalPlanUtils.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/util/LogicalPlanUtils.java @@ -136,7 +136,7 @@ public static Condition getPartitionColumnValueMatchInCondition(Dataset dataSet, } // (key1 = val11 AND key2 = val21) OR (key1 = val12 AND key2 = val22) OR ... - public static Condition getPartitionSpecMatchCondition(Dataset dataSet, List> partitionSpecList) + public static Condition getPartitionSpecMatchCondition(Dataset dataSet, List> partitionSpecList) { return Or.of(partitionSpecList.stream() .map(partitionSpec -> And.of( @@ -144,7 +144,7 @@ public static Condition getPartitionSpecMatchCondition(Dataset dataSet, List Equals.of( FieldValue.builder().datasetRef(dataSet.datasetReference()).fieldName(columnValuePair.getKey()).build(), - StringValue.of(columnValuePair.getValue())) + StringValue.of((String) columnValuePair.getValue())) ) .collect(Collectors.toList())) ).collect(Collectors.toList())); diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/AnsiTestArtifacts.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/AnsiTestArtifacts.java index cc90393153e..07e51009668 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/AnsiTestArtifacts.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/AnsiTestArtifacts.java @@ -79,6 +79,10 @@ public class AnsiTestArtifacts "(\"ID\" INTEGER NOT NULL,\"NAME\" VARCHAR NOT NULL,\"AMOUNT\" DOUBLE,\"BIZ_DATE\" DATE,\"DIGEST\" VARCHAR," + "\"BATCH_ID_IN\" INTEGER NOT NULL,\"BATCH_ID_OUT\" INTEGER,PRIMARY KEY (\"ID\", \"NAME\", \"BATCH_ID_IN\"))"; + public static String expectedMainTableMultiPartitionCreateQueryWithUpperCase = "CREATE TABLE IF NOT EXISTS \"MYDB\".\"MAIN\"" + + "(\"ID\" INTEGER NOT NULL,\"NAME\" VARCHAR NOT NULL,\"AMOUNT\" DOUBLE,\"ACCOUNT_TYPE\" VARCHAR,\"BIZ_DATE\" DATE,\"DIGEST\" VARCHAR," + + "\"BATCH_ID_IN\" INTEGER NOT NULL,\"BATCH_ID_OUT\" INTEGER,PRIMARY KEY (\"ID\", \"NAME\", \"BATCH_ID_IN\"))"; + public static String expectedMainTableTimeBasedCreateQuery = "CREATE TABLE IF NOT EXISTS \"mydb\".\"main\"(" + "\"id\" INTEGER NOT NULL,\"name\" VARCHAR NOT NULL,\"amount\" DOUBLE,\"biz_date\" DATE,\"digest\" VARCHAR," + "\"batch_time_in\" DATETIME NOT NULL,\"batch_time_out\" DATETIME,PRIMARY KEY (\"id\", \"name\", \"batch_time_in\"))"; diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalSnapshotBatchIdBasedTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalSnapshotBatchIdBasedTest.java index 9dcba8a9a8f..25a6527c7f9 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalSnapshotBatchIdBasedTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalSnapshotBatchIdBasedTest.java @@ -242,6 +242,39 @@ public void verifyUnitemporalSnapshotWithPartitionSpecListNoDedupNoVersion(Gener Assertions.assertEquals(getExpectedMetadataTableIngestQuery(), metadataIngestSql.get(0)); } + @Override + public void verifyUnitemporalSnapshotWithPartitionSpecListNoDedupNoVersionInUpperCase(GeneratorResult operations) + { + List preActionsSql = operations.preActionsSql(); + List milestoningSql = operations.ingestSql(); + List metadataIngestSql = operations.metadataIngestSql(); + + String expectedMilestoneQuery = "UPDATE \"MYDB\".\"MAIN\" as sink " + + "SET sink.\"BATCH_ID_OUT\" = (SELECT COALESCE(MAX(BATCH_METADATA.\"TABLE_BATCH_ID\"),0)+1 FROM BATCH_METADATA as BATCH_METADATA WHERE UPPER(BATCH_METADATA.\"TABLE_NAME\") = 'MAIN')-1 " + + "WHERE (sink.\"BATCH_ID_OUT\" = 999999999) AND " + + "(NOT (EXISTS (SELECT * FROM \"MYDB\".\"STAGING\" as stage " + + "WHERE ((sink.\"ID\" = stage.\"ID\") AND (sink.\"NAME\" = stage.\"NAME\")) AND (sink.\"DIGEST\" = stage.\"DIGEST\")))) " + + "AND (((sink.\"ACCOUNT_TYPE\" = 'TYPE_1') AND (sink.\"BIZ_DATE\" = '2024-01-01')) " + + "OR ((sink.\"ACCOUNT_TYPE\" = 'TYPE_1') AND (sink.\"BIZ_DATE\" = '2024-01-02')) " + + "OR ((sink.\"ACCOUNT_TYPE\" = 'TYPE_2') AND (sink.\"BIZ_DATE\" = '2024-01-02')))"; + + String expectedUpsertQuery = "INSERT INTO \"MYDB\".\"MAIN\" " + + "(\"ID\", \"NAME\", \"AMOUNT\", \"ACCOUNT_TYPE\", \"BIZ_DATE\", \"DIGEST\", \"BATCH_ID_IN\", \"BATCH_ID_OUT\") " + + "(SELECT stage.\"ID\",stage.\"NAME\",stage.\"AMOUNT\",stage.\"ACCOUNT_TYPE\",stage.\"BIZ_DATE\",stage.\"DIGEST\"," + + "(SELECT COALESCE(MAX(BATCH_METADATA.\"TABLE_BATCH_ID\"),0)+1 FROM BATCH_METADATA as BATCH_METADATA WHERE UPPER(BATCH_METADATA.\"TABLE_NAME\") = 'MAIN')," + + "999999999 FROM \"MYDB\".\"STAGING\" as stage " + + "WHERE NOT (stage.\"DIGEST\" IN (SELECT sink.\"DIGEST\" FROM \"MYDB\".\"MAIN\" as sink WHERE (sink.\"BATCH_ID_OUT\" = 999999999) " + + "AND (((sink.\"ACCOUNT_TYPE\" = 'TYPE_1') AND (sink.\"BIZ_DATE\" = '2024-01-01')) " + + "OR ((sink.\"ACCOUNT_TYPE\" = 'TYPE_1') AND (sink.\"BIZ_DATE\" = '2024-01-02')) " + + "OR ((sink.\"ACCOUNT_TYPE\" = 'TYPE_2') AND (sink.\"BIZ_DATE\" = '2024-01-02'))))))"; + + Assertions.assertEquals(AnsiTestArtifacts.expectedMainTableMultiPartitionCreateQueryWithUpperCase, preActionsSql.get(0)); + Assertions.assertEquals(getExpectedMetadataTableCreateQueryWithUpperCase(), preActionsSql.get(1)); + Assertions.assertEquals(expectedMilestoneQuery, milestoningSql.get(0)); + Assertions.assertEquals(expectedUpsertQuery, milestoningSql.get(1)); + Assertions.assertEquals(getExpectedMetadataTableIngestQueryWithUpperCase(), metadataIngestSql.get(0)); + } + @Override public void verifyUnitemporalSnapshotWithPartitionSpecListWithEmptyBatchHandling(GeneratorResult operations) { diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/BigQueryTestArtifacts.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/BigQueryTestArtifacts.java index c6a6c6f7bef..721d0076c50 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/BigQueryTestArtifacts.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/BigQueryTestArtifacts.java @@ -205,6 +205,10 @@ public class BigQueryTestArtifacts "`id` INT64 NOT NULL,`name` STRING NOT NULL,`amount` FLOAT64,`account_type` STRING,`biz_date` DATE,`digest` STRING," + "`batch_id_in` INT64 NOT NULL,`batch_id_out` INT64,PRIMARY KEY (`id`, `name`, `batch_id_in`) NOT ENFORCED)"; + public static String expectedMainTableWithMultiPartitionCreateQueryUpperCase = "CREATE TABLE IF NOT EXISTS `MYDB`.`MAIN`" + + "(`ID` INT64 NOT NULL,`NAME` STRING NOT NULL,`AMOUNT` FLOAT64,`ACCOUNT_TYPE` STRING,`BIZ_DATE` DATE,`DIGEST` STRING," + + "`BATCH_ID_IN` INT64 NOT NULL,`BATCH_ID_OUT` INT64,PRIMARY KEY (`ID`, `NAME`, `BATCH_ID_IN`) NOT ENFORCED)"; + public static String expectedMetadataTableCreateQuery = "CREATE TABLE IF NOT EXISTS batch_metadata" + "(`table_name` STRING(255)," + "`batch_start_ts_utc` DATETIME," + diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/UnitemporalSnapshotBatchIdBasedTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/UnitemporalSnapshotBatchIdBasedTest.java index d0fc4a4d92b..0c99ac890ca 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/UnitemporalSnapshotBatchIdBasedTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/UnitemporalSnapshotBatchIdBasedTest.java @@ -217,6 +217,38 @@ public void verifyUnitemporalSnapshotWithPartitionSpecListNoDedupNoVersion(Gener Assertions.assertEquals(getExpectedMetadataTableIngestQuery(), metadataIngestSql.get(0)); } + @Override + public void verifyUnitemporalSnapshotWithPartitionSpecListNoDedupNoVersionInUpperCase(GeneratorResult operations) + { + List preActionsSql = operations.preActionsSql(); + List milestoningSql = operations.ingestSql(); + List metadataIngestSql = operations.metadataIngestSql(); + + String expectedMilestoneQuery = "UPDATE `MYDB`.`MAIN` as sink SET " + + "sink.`BATCH_ID_OUT` = (SELECT COALESCE(MAX(BATCH_METADATA.`TABLE_BATCH_ID`),0)+1 FROM BATCH_METADATA as BATCH_METADATA WHERE UPPER(BATCH_METADATA.`TABLE_NAME`) = 'MAIN')-1 " + + "WHERE (sink.`BATCH_ID_OUT` = 999999999) AND (NOT (EXISTS (SELECT * FROM `MYDB`.`STAGING` as stage " + + "WHERE ((sink.`ID` = stage.`ID`) AND (sink.`NAME` = stage.`NAME`)) AND (sink.`DIGEST` = stage.`DIGEST`)))) " + + "AND (((sink.`ACCOUNT_TYPE` = 'TYPE_1') AND (sink.`BIZ_DATE` = '2024-01-01')) " + + "OR ((sink.`ACCOUNT_TYPE` = 'TYPE_1') AND (sink.`BIZ_DATE` = '2024-01-02')) " + + "OR ((sink.`ACCOUNT_TYPE` = 'TYPE_2') AND (sink.`BIZ_DATE` = '2024-01-02')))"; + + String expectedUpsertQuery = "INSERT INTO `MYDB`.`MAIN` " + + "(`ID`, `NAME`, `AMOUNT`, `ACCOUNT_TYPE`, `BIZ_DATE`, `DIGEST`, `BATCH_ID_IN`, `BATCH_ID_OUT`) " + + "(SELECT stage.`ID`,stage.`NAME`,stage.`AMOUNT`,stage.`ACCOUNT_TYPE`,stage.`BIZ_DATE`,stage.`DIGEST`," + + "(SELECT COALESCE(MAX(BATCH_METADATA.`TABLE_BATCH_ID`),0)+1 FROM BATCH_METADATA as BATCH_METADATA WHERE UPPER(BATCH_METADATA.`TABLE_NAME`) = 'MAIN')," + + "999999999 FROM `MYDB`.`STAGING` as stage WHERE " + + "NOT (stage.`DIGEST` IN (SELECT sink.`DIGEST` FROM `MYDB`.`MAIN` as sink WHERE (sink.`BATCH_ID_OUT` = 999999999) AND " + + "(((sink.`ACCOUNT_TYPE` = 'TYPE_1') AND (sink.`BIZ_DATE` = '2024-01-01')) " + + "OR ((sink.`ACCOUNT_TYPE` = 'TYPE_1') AND (sink.`BIZ_DATE` = '2024-01-02')) " + + "OR ((sink.`ACCOUNT_TYPE` = 'TYPE_2') AND (sink.`BIZ_DATE` = '2024-01-02'))))))"; + + Assertions.assertEquals(BigQueryTestArtifacts.expectedMainTableWithMultiPartitionCreateQueryUpperCase, preActionsSql.get(0)); + Assertions.assertEquals(BigQueryTestArtifacts.expectedMetadataTableCreateQueryWithUpperCase, preActionsSql.get(1)); + Assertions.assertEquals(expectedMilestoneQuery, milestoningSql.get(0)); + Assertions.assertEquals(expectedUpsertQuery, milestoningSql.get(1)); + Assertions.assertEquals(getExpectedMetadataTableIngestQueryWithUpperCase(), metadataIngestSql.get(0)); + } + @Override public void verifyUnitemporalSnapshotWithPartitionSpecListWithEmptyBatchHandling(GeneratorResult operations) { diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalSnapshotWithBatchIdTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalSnapshotWithBatchIdTest.java index 38aa136457e..5b99807bf09 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalSnapshotWithBatchIdTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalSnapshotWithBatchIdTest.java @@ -259,7 +259,7 @@ void testUnitemporalSnapshotMilestoningLogicWithMultiplePartitionValues() throws createStagingTable(stagingTable); - List> partitionSpecList = new ArrayList<>(); + List> partitionSpecList = new ArrayList<>(); addPartitionSpec(partitionSpecList, "2024-01-01", "ACCOUNT_1"); addPartitionSpec(partitionSpecList, "2024-01-01", "ACCOUNT_2"); addPartitionSpec(partitionSpecList, "2024-01-02", "ACCOUNT_1"); @@ -333,9 +333,9 @@ void testUnitemporalSnapshotMilestoningLogicWithMultiplePartitionValues() throws executePlansAndVerifyResults(ingestModeWithDeleteTargetData, options, datasets, schema, expectedDataPass3, expectedStats); } - private static void addPartitionSpec(List> partitionSpecList, String date, String accountNum) + private static void addPartitionSpec(List> partitionSpecList, String date, String accountNum) { - partitionSpecList.add(new HashMap() + partitionSpecList.add(new HashMap() { { put(dateName, date); diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-memsql/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/MemsqlTestArtifacts.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-memsql/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/MemsqlTestArtifacts.java index 3ea872c2cdd..3bc013700ab 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-memsql/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/MemsqlTestArtifacts.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-memsql/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/MemsqlTestArtifacts.java @@ -218,6 +218,10 @@ public class MemsqlTestArtifacts "(`ID` INTEGER NOT NULL,`NAME` VARCHAR(256) NOT NULL,`AMOUNT` DOUBLE,`BIZ_DATE` DATE,`DIGEST` VARCHAR(256)," + "`BATCH_ID_IN` INTEGER NOT NULL,`BATCH_ID_OUT` INTEGER,PRIMARY KEY (`ID`, `NAME`, `BATCH_ID_IN`))"; + public static String expectedMainTableMultiPartitionsCreateQueryWithUpperCase = "CREATE TABLE IF NOT EXISTS `MYDB`.`MAIN`" + + "(`ID` INTEGER NOT NULL,`NAME` VARCHAR(256) NOT NULL,`AMOUNT` DOUBLE,`ACCOUNT_TYPE` VARCHAR(256),`BIZ_DATE` DATE,`DIGEST` VARCHAR(256)," + + "`BATCH_ID_IN` INTEGER NOT NULL,`BATCH_ID_OUT` INTEGER,PRIMARY KEY (`ID`, `NAME`, `BATCH_ID_IN`))"; + public static String expectedMetadataTableIngestQuery = "INSERT INTO batch_metadata (`table_name`, `table_batch_id`, `batch_start_ts_utc`, `batch_end_ts_utc`, `batch_status`)" + " (SELECT 'main',(SELECT COALESCE(MAX(batch_metadata.`table_batch_id`),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.`table_name`) = 'MAIN'),'2000-01-01 00:00:00.000000',CURRENT_TIMESTAMP(),'DONE')"; diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-memsql/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/UnitemporalSnapshotBatchIdBasedTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-memsql/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/UnitemporalSnapshotBatchIdBasedTest.java index de3e2072513..29743d9d76b 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-memsql/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/UnitemporalSnapshotBatchIdBasedTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-memsql/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/UnitemporalSnapshotBatchIdBasedTest.java @@ -14,6 +14,7 @@ package org.finos.legend.engine.persistence.components.ingestmode; +import org.finos.legend.engine.persistence.components.AnsiTestArtifacts; import org.finos.legend.engine.persistence.components.common.DedupAndVersionErrorSqlType; import org.finos.legend.engine.persistence.components.relational.RelationalSink; import org.finos.legend.engine.persistence.components.relational.api.GeneratorResult; @@ -216,6 +217,38 @@ public void verifyUnitemporalSnapshotWithPartitionSpecListNoDedupNoVersion(Gener Assertions.assertEquals(getExpectedMetadataTableIngestQuery(), metadataIngestSql.get(0)); } + @Override + public void verifyUnitemporalSnapshotWithPartitionSpecListNoDedupNoVersionInUpperCase(GeneratorResult operations) + { + List preActionsSql = operations.preActionsSql(); + List milestoningSql = operations.ingestSql(); + List metadataIngestSql = operations.metadataIngestSql(); + + String expectedMilestoneQuery = "UPDATE `MYDB`.`MAIN` as sink SET " + + "sink.`BATCH_ID_OUT` = (SELECT COALESCE(MAX(BATCH_METADATA.`TABLE_BATCH_ID`),0)+1 FROM BATCH_METADATA as BATCH_METADATA WHERE UPPER(BATCH_METADATA.`TABLE_NAME`) = 'MAIN')-1 " + + "WHERE (sink.`BATCH_ID_OUT` = 999999999) AND (NOT (EXISTS (SELECT * FROM `MYDB`.`STAGING` as stage " + + "WHERE ((sink.`ID` = stage.`ID`) AND (sink.`NAME` = stage.`NAME`)) AND (sink.`DIGEST` = stage.`DIGEST`)))) " + + "AND (((sink.`ACCOUNT_TYPE` = 'TYPE_1') AND (sink.`BIZ_DATE` = '2024-01-01')) " + + "OR ((sink.`ACCOUNT_TYPE` = 'TYPE_1') AND (sink.`BIZ_DATE` = '2024-01-02')) " + + "OR ((sink.`ACCOUNT_TYPE` = 'TYPE_2') AND (sink.`BIZ_DATE` = '2024-01-02')))"; + + String expectedUpsertQuery = "INSERT INTO `MYDB`.`MAIN` " + + "(`ID`, `NAME`, `AMOUNT`, `ACCOUNT_TYPE`, `BIZ_DATE`, `DIGEST`, `BATCH_ID_IN`, `BATCH_ID_OUT`) " + + "(SELECT stage.`ID`,stage.`NAME`,stage.`AMOUNT`,stage.`ACCOUNT_TYPE`,stage.`BIZ_DATE`,stage.`DIGEST`," + + "(SELECT COALESCE(MAX(BATCH_METADATA.`TABLE_BATCH_ID`),0)+1 FROM BATCH_METADATA as BATCH_METADATA WHERE UPPER(BATCH_METADATA.`TABLE_NAME`) = 'MAIN')," + + "999999999 FROM `MYDB`.`STAGING` as stage WHERE " + + "NOT (stage.`DIGEST` IN (SELECT sink.`DIGEST` FROM `MYDB`.`MAIN` as sink WHERE (sink.`BATCH_ID_OUT` = 999999999) AND " + + "(((sink.`ACCOUNT_TYPE` = 'TYPE_1') AND (sink.`BIZ_DATE` = '2024-01-01')) " + + "OR ((sink.`ACCOUNT_TYPE` = 'TYPE_1') AND (sink.`BIZ_DATE` = '2024-01-02')) " + + "OR ((sink.`ACCOUNT_TYPE` = 'TYPE_2') AND (sink.`BIZ_DATE` = '2024-01-02'))))))"; + + Assertions.assertEquals(MemsqlTestArtifacts.expectedMainTableMultiPartitionsCreateQueryWithUpperCase, preActionsSql.get(0)); + Assertions.assertEquals(MemsqlTestArtifacts.expectedMetadataTableCreateQueryWithUpperCase, preActionsSql.get(1)); + Assertions.assertEquals(expectedMilestoneQuery, milestoningSql.get(0)); + Assertions.assertEquals(expectedUpsertQuery, milestoningSql.get(1)); + Assertions.assertEquals(getExpectedMetadataTableIngestQueryWithUpperCase(), metadataIngestSql.get(0)); + } + @Override public void verifyUnitemporalSnapshotWithPartitionSpecListWithEmptyBatchHandling(GeneratorResult operations) { diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-postgres/src/test/java/org/finos/legend/engine/persistence/components/PostgresTestArtifacts.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-postgres/src/test/java/org/finos/legend/engine/persistence/components/PostgresTestArtifacts.java index 8b2634336bf..5ccf0e9c411 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-postgres/src/test/java/org/finos/legend/engine/persistence/components/PostgresTestArtifacts.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-postgres/src/test/java/org/finos/legend/engine/persistence/components/PostgresTestArtifacts.java @@ -67,6 +67,10 @@ public class PostgresTestArtifacts "\"id\" INTEGER NOT NULL,\"name\" VARCHAR NOT NULL,\"amount\" DOUBLE PRECISION,\"account_type\" VARCHAR,\"biz_date\" DATE,\"digest\" VARCHAR," + "\"batch_id_in\" INTEGER NOT NULL,\"batch_id_out\" INTEGER,PRIMARY KEY (\"id\", \"name\", \"batch_id_in\"))"; + public static String expectedMainTableMultiPartitionsCreateQueryUpperCase = "CREATE TABLE IF NOT EXISTS \"MYDB\".\"MAIN\"" + + "(\"ID\" INTEGER NOT NULL,\"NAME\" VARCHAR NOT NULL,\"AMOUNT\" DOUBLE PRECISION,\"ACCOUNT_TYPE\" VARCHAR,\"BIZ_DATE\" DATE," + + "\"DIGEST\" VARCHAR,\"BATCH_ID_IN\" INTEGER NOT NULL,\"BATCH_ID_OUT\" INTEGER,PRIMARY KEY (\"ID\", \"NAME\", \"BATCH_ID_IN\"))"; + public static String expectedMainTableBatchIdAndVersionBasedCreateQuery = "CREATE TABLE IF NOT EXISTS \"mydb\".\"main\"(" + "\"id\" INTEGER NOT NULL,\"name\" VARCHAR NOT NULL,\"amount\" DOUBLE,\"biz_date\" DATE,\"digest\" VARCHAR,\"version\" INTEGER," + "\"batch_id_in\" INTEGER NOT NULL,\"batch_id_out\" INTEGER,PRIMARY KEY (\"id\", \"name\", \"batch_id_in\"))"; diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-postgres/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalSnapshotBatchIdBasedTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-postgres/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalSnapshotBatchIdBasedTest.java index 34985352601..627cbdfba12 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-postgres/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalSnapshotBatchIdBasedTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-postgres/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalSnapshotBatchIdBasedTest.java @@ -14,6 +14,7 @@ package org.finos.legend.engine.persistence.components.ingestmode.unitemporal; +import org.finos.legend.engine.persistence.components.AnsiTestArtifacts; import org.finos.legend.engine.persistence.components.PostgresTestArtifacts; import org.finos.legend.engine.persistence.components.common.DedupAndVersionErrorSqlType; import org.finos.legend.engine.persistence.components.relational.RelationalSink; @@ -240,6 +241,39 @@ public void verifyUnitemporalSnapshotWithPartitionSpecListNoDedupNoVersion(Gener Assertions.assertEquals(getExpectedMetadataTableIngestQuery(), metadataIngestSql.get(0)); } + @Override + public void verifyUnitemporalSnapshotWithPartitionSpecListNoDedupNoVersionInUpperCase(GeneratorResult operations) + { + List preActionsSql = operations.preActionsSql(); + List milestoningSql = operations.ingestSql(); + List metadataIngestSql = operations.metadataIngestSql(); + + String expectedMilestoneQuery = "UPDATE \"MYDB\".\"MAIN\" as sink SET \"BATCH_ID_OUT\" = " + + "(SELECT COALESCE(MAX(BATCH_METADATA.\"TABLE_BATCH_ID\"),0)+1 FROM BATCH_METADATA as BATCH_METADATA WHERE UPPER(BATCH_METADATA.\"TABLE_NAME\") = 'MAIN')-1 " + + "WHERE (sink.\"BATCH_ID_OUT\" = 999999999) AND " + + "(NOT (EXISTS (SELECT * FROM \"MYDB\".\"STAGING\" as stage " + + "WHERE ((sink.\"ID\" = stage.\"ID\") AND (sink.\"NAME\" = stage.\"NAME\")) AND (sink.\"DIGEST\" = stage.\"DIGEST\")))) AND " + + "(((sink.\"ACCOUNT_TYPE\" = 'TYPE_1') AND (sink.\"BIZ_DATE\" = '2024-01-01')) " + + "OR ((sink.\"ACCOUNT_TYPE\" = 'TYPE_1') AND (sink.\"BIZ_DATE\" = '2024-01-02')) " + + "OR ((sink.\"ACCOUNT_TYPE\" = 'TYPE_2') AND (sink.\"BIZ_DATE\" = '2024-01-02')))"; + + String expectedUpsertQuery = "INSERT INTO \"MYDB\".\"MAIN\" " + + "(\"ID\", \"NAME\", \"AMOUNT\", \"ACCOUNT_TYPE\", \"BIZ_DATE\", \"DIGEST\", \"BATCH_ID_IN\", \"BATCH_ID_OUT\") " + + "(SELECT stage.\"ID\",stage.\"NAME\",stage.\"AMOUNT\",stage.\"ACCOUNT_TYPE\",stage.\"BIZ_DATE\",stage.\"DIGEST\"," + + "(SELECT COALESCE(MAX(BATCH_METADATA.\"TABLE_BATCH_ID\"),0)+1 FROM BATCH_METADATA as BATCH_METADATA WHERE UPPER(BATCH_METADATA.\"TABLE_NAME\") = 'MAIN')," + + "999999999 FROM \"MYDB\".\"STAGING\" as stage " + + "WHERE NOT (stage.\"DIGEST\" IN (SELECT sink.\"DIGEST\" FROM \"MYDB\".\"MAIN\" as sink WHERE (sink.\"BATCH_ID_OUT\" = 999999999) " + + "AND (((sink.\"ACCOUNT_TYPE\" = 'TYPE_1') AND (sink.\"BIZ_DATE\" = '2024-01-01')) " + + "OR ((sink.\"ACCOUNT_TYPE\" = 'TYPE_1') AND (sink.\"BIZ_DATE\" = '2024-01-02')) " + + "OR ((sink.\"ACCOUNT_TYPE\" = 'TYPE_2') AND (sink.\"BIZ_DATE\" = '2024-01-02'))))))"; + + Assertions.assertEquals(PostgresTestArtifacts.expectedMainTableMultiPartitionsCreateQueryUpperCase, preActionsSql.get(0)); + Assertions.assertEquals(getExpectedMetadataTableCreateQueryWithUpperCase(), preActionsSql.get(1)); + Assertions.assertEquals(expectedMilestoneQuery, milestoningSql.get(0)); + Assertions.assertEquals(expectedUpsertQuery, milestoningSql.get(1)); + Assertions.assertEquals(getExpectedMetadataTableIngestQueryWithUpperCase(), metadataIngestSql.get(0)); + } + @Override public void verifyUnitemporalSnapshotWithPartitionSpecListWithEmptyBatchHandling(GeneratorResult operations) { diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-test/src/test/java/org/finos/legend/engine/persistence/components/BaseTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-test/src/test/java/org/finos/legend/engine/persistence/components/BaseTest.java index ba8dd71a27e..777779426fb 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-test/src/test/java/org/finos/legend/engine/persistence/components/BaseTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-test/src/test/java/org/finos/legend/engine/persistence/components/BaseTest.java @@ -123,18 +123,18 @@ public class BaseTest put("account_type", new HashSet<>(Arrays.asList("TYPE_1"))); }}; - protected List> partitionSpecList() + protected List> partitionSpecList() { - List> partitionSpecList = new ArrayList<>(); + List> partitionSpecList = new ArrayList<>(); addPartitionSpec(partitionSpecList, "2024-01-01", "TYPE_1"); addPartitionSpec(partitionSpecList, "2024-01-02", "TYPE_1"); addPartitionSpec(partitionSpecList, "2024-01-02", "TYPE_2"); return partitionSpecList; } - private static void addPartitionSpec(List> partitionSpecList, String date, String accountType) + private static void addPartitionSpec(List> partitionSpecList, String date, String accountType) { - partitionSpecList.add(new HashMap() + partitionSpecList.add(new HashMap() { { put("biz_date", date); diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-test/src/test/java/org/finos/legend/engine/persistence/components/testcases/ingestmode/unitemporal/UnitmemporalSnapshotBatchIdBasedTestCases.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-test/src/test/java/org/finos/legend/engine/persistence/components/testcases/ingestmode/unitemporal/UnitmemporalSnapshotBatchIdBasedTestCases.java index c2f0a8665cb..f9a32f6a404 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-test/src/test/java/org/finos/legend/engine/persistence/components/testcases/ingestmode/unitemporal/UnitmemporalSnapshotBatchIdBasedTestCases.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-test/src/test/java/org/finos/legend/engine/persistence/components/testcases/ingestmode/unitemporal/UnitmemporalSnapshotBatchIdBasedTestCases.java @@ -156,6 +156,23 @@ void testUnitemporalSnapshotWithPartitionSpecListNoDedupNoVersion() public abstract void verifyUnitemporalSnapshotWithPartitionSpecListNoDedupNoVersion(GeneratorResult operations); + @Test + void testUnitemporalSnapshotWithPartitionSpecListNoDedupNoVersionInUpperCase() + { + TestScenario scenario = scenarios.BATCH_ID_BASED__WITH_PARTITION_SPEC_LIST__NO_DEDUP__NO_VERSION(); + RelationalGenerator generator = RelationalGenerator.builder() + .ingestMode(scenario.getIngestMode()) + .relationalSink(getRelationalSink()) + .executionTimestampClock(fixedClock_2000_01_01) + .collectStatistics(true) + .caseConversion(CaseConversion.TO_UPPER) + .build(); + GeneratorResult operations = generator.generateOperations(scenario.getDatasets()); + verifyUnitemporalSnapshotWithPartitionSpecListNoDedupNoVersionInUpperCase(operations); + } + + public abstract void verifyUnitemporalSnapshotWithPartitionSpecListNoDedupNoVersionInUpperCase(GeneratorResult operations); + @Test void testUnitemporalSnapshotWithPartitionSpecListWithEmptyBatchHandling() { @@ -292,7 +309,7 @@ void testUnitemporalSnapshotPartitionFilterWithMultiValuesForMultipleKeysValidat } catch (Exception e) { - Assertions.assertEquals("Can not build UnitemporalSnapshot, partitionValuesByField does not support more than 1 value for more than one partition key", e.getMessage()); + Assertions.assertEquals("Can not build UnitemporalSnapshot, in partitionValuesByField at most one of the partition keys can have more than one value, all other partition keys must have exactly one value", e.getMessage()); } }