From 551506bd05225d94136d1235e7ad2cc9d331f5ef Mon Sep 17 00:00:00 2001 From: Mythreyi Date: Tue, 24 Dec 2024 10:54:34 +0800 Subject: [PATCH] Persistence Component: Tests for unitemporal snapshot with partition without digest --- .../ingestmode/IngestModeCaseConverter.java | 1 + .../UnitemporalSnapshotAbstract.java | 2 - .../planner/UnitemporalSnapshotPlanner.java | 27 ++-- .../components/AnsiTestArtifacts.java | 12 ++ .../UnitemporalSnapshotBatchIdBasedTest.java | 128 +++++++++++++++++ .../ingestmode/BigQueryTestArtifacts.java | 12 ++ .../UnitemporalSnapshotBatchIdBasedTest.java | 124 +++++++++++++++++ .../ingestmode/MemsqlTestArtifacts.java | 12 ++ .../UnitemporalSnapshotBatchIdBasedTest.java | 129 ++++++++++++++++++ .../components/PostgresTestArtifacts.java | 8 ++ .../UnitemporalSnapshotBatchIdBasedTest.java | 129 ++++++++++++++++++ .../persistence/components/BaseTest.java | 42 ++++++ ...temporalSnapshotBatchIdBasedScenarios.java | 41 ++++++ ...memporalSnapshotBatchIdBasedTestCases.java | 81 +++++++++++ 14 files changed, 730 insertions(+), 18 deletions(-) diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/IngestModeCaseConverter.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/IngestModeCaseConverter.java index 85d99bd3120..67cd3806209 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/IngestModeCaseConverter.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/IngestModeCaseConverter.java @@ -375,6 +375,7 @@ public PartitioningStrategy visitPartitioning(PartitioningAbstract partitionStra { return Partitioning.builder() .addAllPartitionFields(applyCase(partitionStrategy.partitionFields())) + .deleteStrategy(partitionStrategy.deleteStrategy()) .putAllPartitionValuesByField(applyCase(partitionStrategy.partitionValuesByField())) .addAllPartitionSpecList(applyCaseForListOfMap(partitionStrategy.partitionSpecList())) .derivePartitionSpec(partitionStrategy.derivePartitionSpec()) diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/UnitemporalSnapshotAbstract.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/UnitemporalSnapshotAbstract.java index f76521664f5..372c3863f49 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/UnitemporalSnapshotAbstract.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/UnitemporalSnapshotAbstract.java @@ -42,8 +42,6 @@ ) public interface UnitemporalSnapshotAbstract extends IngestMode, TransactionMilestoned { - Optional digestField(); - @Value.Default default PartitioningStrategy partitioningStrategy() { diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/UnitemporalSnapshotPlanner.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/UnitemporalSnapshotPlanner.java index b5bc5f0dc85..cf4068a6577 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/UnitemporalSnapshotPlanner.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/UnitemporalSnapshotPlanner.java @@ -210,24 +210,23 @@ else if (!partition.partitionSpecList().isEmpty()) */ protected Update getSqlToMilestoneRows(List> values) { - Condition notExistsWhereClause = Not.of(Exists.of( - Selection.builder() - .source(stagingDataset()) - .condition(And.builder().addConditions(primaryKeysMatchCondition, digestMatchCondition).build()) - .addAllFields(LogicalPlanUtils.ALL_COLUMNS()) - .build())); + List whereClause = new ArrayList<>(Arrays.asList(openRecordCondition)); - List whereClause = new ArrayList<>((Arrays.asList(openRecordCondition))); + if (!(partitioning.isPresent() && partitioning.get().deleteStrategy() == DeleteStrategy.DELETE_ALL)) + { + Condition notExistsWhereClause = Not.of(Exists.of( + Selection.builder() + .source(stagingDataset()) + .condition(And.builder().addConditions(primaryKeysMatchCondition, digestMatchCondition).build()) + .addAllFields(LogicalPlanUtils.ALL_COLUMNS()) + .build())); + whereClause.add(notExistsWhereClause); + } if (partitioning.isPresent()) { Partitioning partition = partitioning.get(); - if (partition.deleteStrategy() == DeleteStrategy.DELETE_UPDATED) - { - whereClause.add(notExistsWhereClause); - } - if (!partition.partitionValuesByField().isEmpty()) { whereClause.add(LogicalPlanUtils.getPartitionColumnValueMatchInCondition(mainDataset(), partition.partitionValuesByField())); @@ -247,10 +246,6 @@ else if (!partition.partitionSpecList().isEmpty()) whereClause.add(partitionColumnCondition); } } - else - { - whereClause.add(notExistsWhereClause); - } return UpdateAbstract.of(mainDataset(), values, And.of(whereClause)); } diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/AnsiTestArtifacts.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/AnsiTestArtifacts.java index faddf7743f6..5cfdbf4e981 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/AnsiTestArtifacts.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/AnsiTestArtifacts.java @@ -67,10 +67,18 @@ public class AnsiTestArtifacts "\"id\" INTEGER NOT NULL,\"name\" VARCHAR NOT NULL,\"amount\" DOUBLE,\"biz_date\" DATE,\"digest\" VARCHAR," + "\"batch_id_in\" INTEGER NOT NULL,\"batch_id_out\" INTEGER,PRIMARY KEY (\"id\", \"name\", \"batch_id_in\"))"; + public static String expectedMainTableBatchIdBasedWithoutDigestCreateQuery = "CREATE TABLE IF NOT EXISTS \"mydb\".\"main\"(" + + "\"id\" INTEGER NOT NULL,\"name\" VARCHAR NOT NULL,\"amount\" DOUBLE,\"biz_date\" DATE," + + "\"batch_id_in\" INTEGER NOT NULL,\"batch_id_out\" INTEGER,PRIMARY KEY (\"id\", \"name\", \"batch_id_in\"))"; + public static String expectedMainTableWithMultiPartitionsCreateQuery = "CREATE TABLE IF NOT EXISTS \"mydb\".\"main\"(" + "\"id\" INTEGER NOT NULL,\"name\" VARCHAR NOT NULL,\"amount\" DOUBLE,\"account_type\" INTEGER,\"biz_date\" DATE,\"digest\" VARCHAR," + "\"batch_id_in\" INTEGER NOT NULL,\"batch_id_out\" INTEGER,PRIMARY KEY (\"id\", \"name\", \"batch_id_in\"))"; + public static String expectedMainTableWithMultiPartitionsWithoutDigestCreateQuery = "CREATE TABLE IF NOT EXISTS \"mydb\".\"main\"(" + + "\"id\" INTEGER NOT NULL,\"name\" VARCHAR NOT NULL,\"amount\" DOUBLE,\"account_type\" INTEGER,\"biz_date\" DATE," + + "\"batch_id_in\" INTEGER NOT NULL,\"batch_id_out\" INTEGER,PRIMARY KEY (\"id\", \"name\", \"batch_id_in\"))"; + public static String expectedMainTableBatchIdAndVersionBasedCreateQuery = "CREATE TABLE IF NOT EXISTS \"mydb\".\"main\"(" + "\"id\" INTEGER NOT NULL,\"name\" VARCHAR NOT NULL,\"amount\" DOUBLE,\"biz_date\" DATE,\"digest\" VARCHAR,\"version\" INTEGER," + "\"batch_id_in\" INTEGER NOT NULL,\"batch_id_out\" INTEGER,PRIMARY KEY (\"id\", \"name\", \"batch_id_in\"))"; @@ -87,6 +95,10 @@ public class AnsiTestArtifacts "(\"ID\" INTEGER NOT NULL,\"NAME\" VARCHAR NOT NULL,\"AMOUNT\" DOUBLE,\"ACCOUNT_TYPE\" INTEGER,\"BIZ_DATE\" DATE,\"DIGEST\" VARCHAR," + "\"BATCH_ID_IN\" INTEGER NOT NULL,\"BATCH_ID_OUT\" INTEGER,PRIMARY KEY (\"ID\", \"NAME\", \"BATCH_ID_IN\"))"; + public static String expectedMainTableMultiPartitionWithoutDigestCreateQueryWithUpperCase = "CREATE TABLE IF NOT EXISTS \"MYDB\".\"MAIN\"" + + "(\"ID\" INTEGER NOT NULL,\"NAME\" VARCHAR NOT NULL,\"AMOUNT\" DOUBLE,\"ACCOUNT_TYPE\" INTEGER,\"BIZ_DATE\" DATE," + + "\"BATCH_ID_IN\" INTEGER NOT NULL,\"BATCH_ID_OUT\" INTEGER,PRIMARY KEY (\"ID\", \"NAME\", \"BATCH_ID_IN\"))"; + public static String expectedMainTableTimeBasedCreateQuery = "CREATE TABLE IF NOT EXISTS \"mydb\".\"main\"(" + "\"id\" INTEGER NOT NULL,\"name\" VARCHAR NOT NULL,\"amount\" DOUBLE,\"biz_date\" DATE,\"digest\" VARCHAR," + "\"batch_time_in\" DATETIME NOT NULL,\"batch_time_out\" DATETIME,PRIMARY KEY (\"id\", \"name\", \"batch_time_in\"))"; diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalSnapshotBatchIdBasedTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalSnapshotBatchIdBasedTest.java index 7c8afbbe70d..dac16c8401d 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalSnapshotBatchIdBasedTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalSnapshotBatchIdBasedTest.java @@ -303,6 +303,134 @@ public void verifyUnitemporalSnapshotWithCleanStagingData(GeneratorResult operat Assertions.assertEquals(AnsiTestArtifacts.expectedStagingCleanupQuery, postActionsSql.get(0)); } + @Override + public void verifyUnitemporalSnapshotWithPartitionNoDedupNoVersionNoDigest(GeneratorResult operations) + { + List preActionsSql = operations.preActionsSql(); + List milestoningSql = operations.ingestSql(); + List metadataIngestSql = operations.metadataIngestSql(); + + String expectedMilestoneQuery = "UPDATE \"mydb\".\"main\" as sink SET sink.\"batch_id_out\" = " + + "(SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE " + + "UPPER(batch_metadata.\"table_name\") = 'MAIN')-1 WHERE (sink.\"batch_id_out\" = 999999999) AND " + + "(EXISTS (SELECT * FROM \"mydb\".\"staging\" as stage WHERE sink.\"biz_date\" = stage.\"biz_date\"))"; + + String expectedUpsertQuery = "INSERT INTO \"mydb\".\"main\" (\"id\", \"name\", \"amount\", \"biz_date\", \"batch_id_in\", \"batch_id_out\") " + + "(SELECT stage.\"id\",stage.\"name\",stage.\"amount\",stage.\"biz_date\"," + + "(SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE " + + "UPPER(batch_metadata.\"table_name\") = 'MAIN'),999999999 FROM \"mydb\".\"staging\" as stage)"; + + Assertions.assertEquals(AnsiTestArtifacts.expectedMainTableBatchIdBasedWithoutDigestCreateQuery, preActionsSql.get(0)); + Assertions.assertEquals(getExpectedMetadataTableCreateQuery(), preActionsSql.get(1)); + + Assertions.assertEquals(expectedMilestoneQuery, milestoningSql.get(0)); + Assertions.assertEquals(expectedUpsertQuery, milestoningSql.get(1)); + Assertions.assertEquals(getExpectedMetadataTableIngestQuery(), metadataIngestSql.get(0)); + verifyStats(operations, incomingRecordCount, rowsUpdated, rowsDeleted, rowsInserted, rowsTerminated); + } + + @Override + public void verifyUnitemporalSnapshotWithPartitionFiltersNoDedupNoVersionNoDigest(GeneratorResult operations) + { + List preActionsSql = operations.preActionsSql(); + List milestoningSql = operations.ingestSql(); + List metadataIngestSql = operations.metadataIngestSql(); + + String expectedMilestoneQuery = "UPDATE \"mydb\".\"main\" as sink " + + "SET sink.\"batch_id_out\" = (SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MAIN')-1 " + + "WHERE (sink.\"batch_id_out\" = 999999999) " + + "AND (sink.\"biz_date\" IN ('2000-01-01 00:00:00','2000-01-02 00:00:00'))"; + + String expectedUpsertQuery = "INSERT INTO \"mydb\".\"main\" " + + "(\"id\", \"name\", \"amount\", \"biz_date\", \"batch_id_in\", \"batch_id_out\") " + + "(SELECT stage.\"id\",stage.\"name\",stage.\"amount\",stage.\"biz_date\"," + + "(SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MAIN'),999999999 " + + "FROM \"mydb\".\"staging\" as stage)"; + + Assertions.assertEquals(AnsiTestArtifacts.expectedMainTableBatchIdBasedWithoutDigestCreateQuery, preActionsSql.get(0)); + Assertions.assertEquals(getExpectedMetadataTableCreateQuery(), preActionsSql.get(1)); + + Assertions.assertEquals(expectedMilestoneQuery, milestoningSql.get(0)); + Assertions.assertEquals(expectedUpsertQuery, milestoningSql.get(1)); + Assertions.assertEquals(getExpectedMetadataTableIngestQuery(), metadataIngestSql.get(0)); + } + + @Override + public void verifyUnitemporalSnapshotWithPartitionSpecListNoDedupNoVersionNoDigest(GeneratorResult operations) + { + List preActionsSql = operations.preActionsSql(); + List milestoningSql = operations.ingestSql(); + List metadataIngestSql = operations.metadataIngestSql(); + + String expectedMilestoneQuery = "UPDATE \"mydb\".\"main\" as sink SET " + + "sink.\"batch_id_out\" = (SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MAIN')-1 " + + "WHERE (sink.\"batch_id_out\" = 999999999) " + + "AND (((sink.\"biz_date\" = '2024-01-01') AND (sink.\"account_type\" = 1)) " + + "OR ((sink.\"biz_date\" = '2024-01-02') AND (sink.\"account_type\" = 1)) " + + "OR ((sink.\"biz_date\" = '2024-01-02') AND (sink.\"account_type\" = 2)))"; + + String expectedUpsertQuery = "INSERT INTO \"mydb\".\"main\" " + + "(\"id\", \"name\", \"amount\", \"account_type\", \"biz_date\", \"batch_id_in\", \"batch_id_out\") " + + "(SELECT stage.\"id\",stage.\"name\",stage.\"amount\",stage.\"account_type\",stage.\"biz_date\"," + + "(SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MAIN'),999999999 " + + "FROM \"mydb\".\"staging\" as stage)"; + + Assertions.assertEquals(AnsiTestArtifacts.expectedMainTableWithMultiPartitionsWithoutDigestCreateQuery, preActionsSql.get(0)); + Assertions.assertEquals(getExpectedMetadataTableCreateQuery(), preActionsSql.get(1)); + + Assertions.assertEquals(expectedMilestoneQuery, milestoningSql.get(0)); + Assertions.assertEquals(expectedUpsertQuery, milestoningSql.get(1)); + Assertions.assertEquals(getExpectedMetadataTableIngestQuery(), metadataIngestSql.get(0)); + } + + @Override + public void verifyUnitemporalSnapshotWithPartitionSpecListNoDedupNoVersionInUpperCaseNoDigest(GeneratorResult operations) + { + List preActionsSql = operations.preActionsSql(); + List milestoningSql = operations.ingestSql(); + List metadataIngestSql = operations.metadataIngestSql(); + + String expectedMilestoneQuery = "UPDATE \"MYDB\".\"MAIN\" as sink " + + "SET sink.\"BATCH_ID_OUT\" = (SELECT COALESCE(MAX(BATCH_METADATA.\"TABLE_BATCH_ID\"),0)+1 FROM BATCH_METADATA as BATCH_METADATA WHERE UPPER(BATCH_METADATA.\"TABLE_NAME\") = 'MAIN')-1 " + + "WHERE (sink.\"BATCH_ID_OUT\" = 999999999) " + + "AND (((sink.\"ACCOUNT_TYPE\" = 1) AND (sink.\"BIZ_DATE\" = '2024-01-01')) " + + "OR ((sink.\"ACCOUNT_TYPE\" = 1) AND (sink.\"BIZ_DATE\" = '2024-01-02')) " + + "OR ((sink.\"ACCOUNT_TYPE\" = 2) AND (sink.\"BIZ_DATE\" = '2024-01-02')))"; + + String expectedUpsertQuery = "INSERT INTO \"MYDB\".\"MAIN\" " + + "(\"ID\", \"NAME\", \"AMOUNT\", \"ACCOUNT_TYPE\", \"BIZ_DATE\", \"BATCH_ID_IN\", \"BATCH_ID_OUT\") " + + "(SELECT stage.\"ID\",stage.\"NAME\",stage.\"AMOUNT\",stage.\"ACCOUNT_TYPE\",stage.\"BIZ_DATE\"," + + "(SELECT COALESCE(MAX(BATCH_METADATA.\"TABLE_BATCH_ID\"),0)+1 FROM BATCH_METADATA as BATCH_METADATA WHERE UPPER(BATCH_METADATA.\"TABLE_NAME\") = 'MAIN')," + + "999999999 FROM \"MYDB\".\"STAGING\" as stage)"; + + Assertions.assertEquals(AnsiTestArtifacts.expectedMainTableMultiPartitionWithoutDigestCreateQueryWithUpperCase, preActionsSql.get(0)); + Assertions.assertEquals(getExpectedMetadataTableCreateQueryWithUpperCase(), preActionsSql.get(1)); + Assertions.assertEquals(expectedMilestoneQuery, milestoningSql.get(0)); + Assertions.assertEquals(expectedUpsertQuery, milestoningSql.get(1)); + Assertions.assertEquals(getExpectedMetadataTableIngestQueryWithUpperCase(), metadataIngestSql.get(0)); + } + + @Override + public void verifyUnitemporalSnapshotWithPartitionSpecListWithEmptyBatchHandlingNoDigest(GeneratorResult operations) + { + List preActionsSql = operations.preActionsSql(); + List milestoningSql = operations.ingestSql(); + List metadataIngestSql = operations.metadataIngestSql(); + + String expectedMilestoneQuery = "UPDATE \"mydb\".\"main\" as sink SET " + + "sink.\"batch_id_out\" = (SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MAIN')-1 " + + "WHERE (sink.\"batch_id_out\" = 999999999) AND " + + "(((sink.\"biz_date\" = '2024-01-01') AND (sink.\"account_type\" = 1)) " + + "OR ((sink.\"biz_date\" = '2024-01-02') AND (sink.\"account_type\" = 1)) " + + "OR ((sink.\"biz_date\" = '2024-01-02') AND (sink.\"account_type\" = 2)))"; + + Assertions.assertEquals(AnsiTestArtifacts.expectedMainTableWithMultiPartitionsWithoutDigestCreateQuery, preActionsSql.get(0)); + Assertions.assertEquals(getExpectedMetadataTableCreateQuery(), preActionsSql.get(1)); + + Assertions.assertEquals(expectedMilestoneQuery, milestoningSql.get(0)); + Assertions.assertEquals(getExpectedMetadataTableIngestQuery(), metadataIngestSql.get(0)); + } + @Override public RelationalSink getRelationalSink() { diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/BigQueryTestArtifacts.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/BigQueryTestArtifacts.java index bfa026f8d97..6eee71421ec 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/BigQueryTestArtifacts.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/BigQueryTestArtifacts.java @@ -201,14 +201,26 @@ public class BigQueryTestArtifacts "`id` INT64 NOT NULL,`name` STRING NOT NULL,`amount` FLOAT64,`biz_date` DATE,`digest` STRING," + "`batch_id_in` INT64 NOT NULL,`batch_id_out` INT64,PRIMARY KEY (`id`, `name`, `batch_id_in`) NOT ENFORCED)"; + public static String expectedMainTableBatchIdWithoutDigestBasedCreateQuery = "CREATE TABLE IF NOT EXISTS `mydb`.`main`(" + + "`id` INT64 NOT NULL,`name` STRING NOT NULL,`amount` FLOAT64,`biz_date` DATE," + + "`batch_id_in` INT64 NOT NULL,`batch_id_out` INT64,PRIMARY KEY (`id`, `name`, `batch_id_in`) NOT ENFORCED)"; + public static String expectedMainTableWithMultiPartitionCreateQuery = "CREATE TABLE IF NOT EXISTS `mydb`.`main`(" + "`id` INT64 NOT NULL,`name` STRING NOT NULL,`amount` FLOAT64,`account_type` INT64,`biz_date` DATE,`digest` STRING," + "`batch_id_in` INT64 NOT NULL,`batch_id_out` INT64,PRIMARY KEY (`id`, `name`, `batch_id_in`) NOT ENFORCED)"; + public static String expectedMainTableWithMultiPartitionWithoutDigestCreateQuery = "CREATE TABLE IF NOT EXISTS `mydb`.`main`(" + + "`id` INT64 NOT NULL,`name` STRING NOT NULL,`amount` FLOAT64,`account_type` INT64,`biz_date` DATE," + + "`batch_id_in` INT64 NOT NULL,`batch_id_out` INT64,PRIMARY KEY (`id`, `name`, `batch_id_in`) NOT ENFORCED)"; + public static String expectedMainTableWithMultiPartitionCreateQueryUpperCase = "CREATE TABLE IF NOT EXISTS `MYDB`.`MAIN`" + "(`ID` INT64 NOT NULL,`NAME` STRING NOT NULL,`AMOUNT` FLOAT64,`ACCOUNT_TYPE` INT64,`BIZ_DATE` DATE,`DIGEST` STRING," + "`BATCH_ID_IN` INT64 NOT NULL,`BATCH_ID_OUT` INT64,PRIMARY KEY (`ID`, `NAME`, `BATCH_ID_IN`) NOT ENFORCED)"; + public static String expectedMainTableWithMultiPartitionWithoutDigestCreateQueryUpperCase = "CREATE TABLE IF NOT EXISTS `MYDB`.`MAIN`" + + "(`ID` INT64 NOT NULL,`NAME` STRING NOT NULL,`AMOUNT` FLOAT64,`ACCOUNT_TYPE` INT64,`BIZ_DATE` DATE," + + "`BATCH_ID_IN` INT64 NOT NULL,`BATCH_ID_OUT` INT64,PRIMARY KEY (`ID`, `NAME`, `BATCH_ID_IN`) NOT ENFORCED)"; + public static String expectedMetadataTableCreateQuery = "CREATE TABLE IF NOT EXISTS batch_metadata" + "(`table_name` STRING(255)," + "`batch_start_ts_utc` DATETIME," + diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/UnitemporalSnapshotBatchIdBasedTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/UnitemporalSnapshotBatchIdBasedTest.java index 52ea588eea9..ddb9dcb23ff 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/UnitemporalSnapshotBatchIdBasedTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/UnitemporalSnapshotBatchIdBasedTest.java @@ -272,6 +272,130 @@ public void verifyUnitemporalSnapshotWithCleanStagingData(GeneratorResult operat Assertions.assertEquals(BigQueryTestArtifacts.expectedStagingCleanupQuery, postActionsSql.get(0)); } + @Override + public void verifyUnitemporalSnapshotWithPartitionNoDedupNoVersionNoDigest(GeneratorResult operations) + { + List preActionsSql = operations.preActionsSql(); + List milestoningSql = operations.ingestSql(); + List metadataIngestSql = operations.metadataIngestSql(); + + String expectedMilestoneQuery = "UPDATE `mydb`.`main` as sink " + + "SET sink.`batch_id_out` = (SELECT COALESCE(MAX(batch_metadata.`table_batch_id`),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.`table_name`) = 'MAIN')-1 " + + "WHERE (sink.`batch_id_out` = 999999999) " + + "AND (EXISTS (SELECT * FROM `mydb`.`staging` as stage WHERE sink.`biz_date` = stage.`biz_date`))"; + + String expectedUpsertQuery = "INSERT INTO `mydb`.`main` " + + "(`id`, `name`, `amount`, `biz_date`, `batch_id_in`, `batch_id_out`) " + + "(SELECT stage.`id`,stage.`name`,stage.`amount`,stage.`biz_date`," + + "(SELECT COALESCE(MAX(batch_metadata.`table_batch_id`),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.`table_name`) = 'MAIN'),999999999 " + + "FROM `mydb`.`staging` as stage)"; + + Assertions.assertEquals(BigQueryTestArtifacts.expectedMainTableBatchIdWithoutDigestBasedCreateQuery, preActionsSql.get(0)); + Assertions.assertEquals(BigQueryTestArtifacts.expectedMetadataTableCreateQuery, preActionsSql.get(1)); + + Assertions.assertEquals(expectedMilestoneQuery, milestoningSql.get(0)); + Assertions.assertEquals(expectedUpsertQuery, milestoningSql.get(1)); + Assertions.assertEquals(getExpectedMetadataTableIngestQuery(), metadataIngestSql.get(0)); + verifyStats(operations, incomingRecordCount, rowsUpdated, rowsDeleted, rowsInserted, rowsTerminated); + } + + @Override + public void verifyUnitemporalSnapshotWithPartitionFiltersNoDedupNoVersionNoDigest(GeneratorResult operations) + { + List preActionsSql = operations.preActionsSql(); + List milestoningSql = operations.ingestSql(); + List metadataIngestSql = operations.metadataIngestSql(); + + String expectedMilestoneQuery = "UPDATE `mydb`.`main` as sink " + + "SET sink.`batch_id_out` = (SELECT COALESCE(MAX(batch_metadata.`table_batch_id`),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.`table_name`) = 'MAIN')-1 " + + "WHERE (sink.`batch_id_out` = 999999999) " + + "AND (sink.`biz_date` IN ('2000-01-01 00:00:00','2000-01-02 00:00:00'))"; + + String expectedUpsertQuery = "INSERT INTO `mydb`.`main` " + + "(`id`, `name`, `amount`, `biz_date`, `batch_id_in`, `batch_id_out`) " + + "(SELECT stage.`id`,stage.`name`,stage.`amount`,stage.`biz_date`," + + "(SELECT COALESCE(MAX(batch_metadata.`table_batch_id`),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.`table_name`) = 'MAIN'),999999999 " + + "FROM `mydb`.`staging` as stage)"; + Assertions.assertEquals(BigQueryTestArtifacts.expectedMainTableBatchIdWithoutDigestBasedCreateQuery, preActionsSql.get(0)); + Assertions.assertEquals(BigQueryTestArtifacts.expectedMetadataTableCreateQuery, preActionsSql.get(1)); + + Assertions.assertEquals(expectedMilestoneQuery, milestoningSql.get(0)); + Assertions.assertEquals(expectedUpsertQuery, milestoningSql.get(1)); + Assertions.assertEquals(getExpectedMetadataTableIngestQuery(), metadataIngestSql.get(0)); + } + + @Override + public void verifyUnitemporalSnapshotWithPartitionSpecListNoDedupNoVersionNoDigest(GeneratorResult operations) + { + List preActionsSql = operations.preActionsSql(); + List milestoningSql = operations.ingestSql(); + List metadataIngestSql = operations.metadataIngestSql(); + + String expectedMilestoneQuery = "UPDATE `mydb`.`main` as sink SET " + + "sink.`batch_id_out` = (SELECT COALESCE(MAX(batch_metadata.`table_batch_id`),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.`table_name`) = 'MAIN')-1 " + + "WHERE (sink.`batch_id_out` = 999999999) AND " + + "(((sink.`biz_date` = '2024-01-01') AND (sink.`account_type` = 1)) " + + "OR ((sink.`biz_date` = '2024-01-02') AND (sink.`account_type` = 1)) " + + "OR ((sink.`biz_date` = '2024-01-02') AND (sink.`account_type` = 2)))"; + + String expectedUpsertQuery = "INSERT INTO `mydb`.`main` " + + "(`id`, `name`, `amount`, `account_type`, `biz_date`, `batch_id_in`, `batch_id_out`) " + + "(SELECT stage.`id`,stage.`name`,stage.`amount`,stage.`account_type`,stage.`biz_date`," + + "(SELECT COALESCE(MAX(batch_metadata.`table_batch_id`),0)+1 FROM batch_metadata as batch_metadata " + + "WHERE UPPER(batch_metadata.`table_name`) = 'MAIN'),999999999 " + + "FROM `mydb`.`staging` as stage)"; + + Assertions.assertEquals(BigQueryTestArtifacts.expectedMainTableWithMultiPartitionWithoutDigestCreateQuery, preActionsSql.get(0)); + Assertions.assertEquals(BigQueryTestArtifacts.expectedMetadataTableCreateQuery, preActionsSql.get(1)); + + Assertions.assertEquals(expectedMilestoneQuery, milestoningSql.get(0)); + Assertions.assertEquals(expectedUpsertQuery, milestoningSql.get(1)); + Assertions.assertEquals(getExpectedMetadataTableIngestQuery(), metadataIngestSql.get(0)); + } + + @Override + public void verifyUnitemporalSnapshotWithPartitionSpecListNoDedupNoVersionInUpperCaseNoDigest(GeneratorResult operations) + { + List preActionsSql = operations.preActionsSql(); + List milestoningSql = operations.ingestSql(); + List metadataIngestSql = operations.metadataIngestSql(); + + String expectedMilestoneQuery = "UPDATE `MYDB`.`MAIN` as sink SET " + + "sink.`BATCH_ID_OUT` = (SELECT COALESCE(MAX(BATCH_METADATA.`TABLE_BATCH_ID`),0)+1 FROM BATCH_METADATA as BATCH_METADATA WHERE UPPER(BATCH_METADATA.`TABLE_NAME`) = 'MAIN')-1 " + + "WHERE (sink.`BATCH_ID_OUT` = 999999999) " + + "AND (((sink.`ACCOUNT_TYPE` = 1) AND (sink.`BIZ_DATE` = '2024-01-01')) " + + "OR ((sink.`ACCOUNT_TYPE` = 1) AND (sink.`BIZ_DATE` = '2024-01-02')) " + + "OR ((sink.`ACCOUNT_TYPE` = 2) AND (sink.`BIZ_DATE` = '2024-01-02')))"; + + String expectedUpsertQuery = "INSERT INTO `MYDB`.`MAIN` " + + "(`ID`, `NAME`, `AMOUNT`, `ACCOUNT_TYPE`, `BIZ_DATE`, `BATCH_ID_IN`, `BATCH_ID_OUT`) " + + "(SELECT stage.`ID`,stage.`NAME`,stage.`AMOUNT`,stage.`ACCOUNT_TYPE`,stage.`BIZ_DATE`," + + "(SELECT COALESCE(MAX(BATCH_METADATA.`TABLE_BATCH_ID`),0)+1 FROM BATCH_METADATA as BATCH_METADATA WHERE UPPER(BATCH_METADATA.`TABLE_NAME`) = 'MAIN')," + + "999999999 FROM `MYDB`.`STAGING` as stage)"; + + Assertions.assertEquals(BigQueryTestArtifacts.expectedMainTableWithMultiPartitionWithoutDigestCreateQueryUpperCase, preActionsSql.get(0)); + Assertions.assertEquals(BigQueryTestArtifacts.expectedMetadataTableCreateQueryWithUpperCase, preActionsSql.get(1)); + Assertions.assertEquals(expectedMilestoneQuery, milestoningSql.get(0)); + Assertions.assertEquals(expectedUpsertQuery, milestoningSql.get(1)); + Assertions.assertEquals(getExpectedMetadataTableIngestQueryWithUpperCase(), metadataIngestSql.get(0)); + } + + @Override + public void verifyUnitemporalSnapshotWithPartitionSpecListWithEmptyBatchHandlingNoDigest(GeneratorResult operations) + { + List preActionsSql = operations.preActionsSql(); + List milestoningSql = operations.ingestSql(); + List metadataIngestSql = operations.metadataIngestSql(); + + String expectedMilestoneQuery = "UPDATE `mydb`.`main` as sink SET sink.`batch_id_out` = (SELECT COALESCE(MAX(batch_metadata.`table_batch_id`),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.`table_name`) = 'MAIN')-1 WHERE (sink.`batch_id_out` = 999999999) AND (((sink.`biz_date` = '2024-01-01') AND (sink.`account_type` = 1)) OR ((sink.`biz_date` = '2024-01-02') AND (sink.`account_type` = 1)) OR ((sink.`biz_date` = '2024-01-02') AND (sink.`account_type` = 2)))"; + + Assertions.assertEquals(BigQueryTestArtifacts.expectedMainTableWithMultiPartitionWithoutDigestCreateQuery, preActionsSql.get(0)); + Assertions.assertEquals(BigQueryTestArtifacts.expectedMetadataTableCreateQuery, preActionsSql.get(1)); + + Assertions.assertEquals(expectedMilestoneQuery, milestoningSql.get(0)); + Assertions.assertEquals(getExpectedMetadataTableIngestQuery(), metadataIngestSql.get(0)); + } + @Override public RelationalSink getRelationalSink() { diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-memsql/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/MemsqlTestArtifacts.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-memsql/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/MemsqlTestArtifacts.java index 94a3cae44a9..a7fa539f275 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-memsql/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/MemsqlTestArtifacts.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-memsql/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/MemsqlTestArtifacts.java @@ -192,10 +192,18 @@ public class MemsqlTestArtifacts "`id` INTEGER NOT NULL,`name` VARCHAR(256) NOT NULL,`amount` DOUBLE,`biz_date` DATE,`digest` VARCHAR(256)," + "`batch_id_in` INTEGER NOT NULL,`batch_id_out` INTEGER,PRIMARY KEY (`id`, `name`, `batch_id_in`))"; + public static String expectedMainTableBatchIdWithoutDigestBasedCreateQuery = "CREATE TABLE IF NOT EXISTS `mydb`.`main`(" + + "`id` INTEGER NOT NULL,`name` VARCHAR(256) NOT NULL,`amount` DOUBLE,`biz_date` DATE," + + "`batch_id_in` INTEGER NOT NULL,`batch_id_out` INTEGER,PRIMARY KEY (`id`, `name`, `batch_id_in`))"; + public static String expectedMainTableMultiPartitionKeysCreateQuery = "CREATE TABLE IF NOT EXISTS `mydb`.`main`(" + "`id` INTEGER NOT NULL,`name` VARCHAR(256) NOT NULL,`amount` DOUBLE,`account_type` INTEGER,`biz_date` DATE,`digest` VARCHAR(256)," + "`batch_id_in` INTEGER NOT NULL,`batch_id_out` INTEGER,PRIMARY KEY (`id`, `name`, `batch_id_in`))"; + public static String expectedMainTableMultiPartitionKeysWithoutDigestCreateQuery = "CREATE TABLE IF NOT EXISTS `mydb`.`main`(" + + "`id` INTEGER NOT NULL,`name` VARCHAR(256) NOT NULL,`amount` DOUBLE,`account_type` INTEGER,`biz_date` DATE," + + "`batch_id_in` INTEGER NOT NULL,`batch_id_out` INTEGER,PRIMARY KEY (`id`, `name`, `batch_id_in`))"; + public static String expectedMetadataTableCreateQuery = "CREATE TABLE IF NOT EXISTS batch_metadata" + "(`table_name` VARCHAR(255)," + "`batch_start_ts_utc` DATETIME," + @@ -226,6 +234,10 @@ public class MemsqlTestArtifacts "(`ID` INTEGER NOT NULL,`NAME` VARCHAR(256) NOT NULL,`AMOUNT` DOUBLE,`ACCOUNT_TYPE` INTEGER,`BIZ_DATE` DATE,`DIGEST` VARCHAR(256)," + "`BATCH_ID_IN` INTEGER NOT NULL,`BATCH_ID_OUT` INTEGER,PRIMARY KEY (`ID`, `NAME`, `BATCH_ID_IN`))"; + public static String expectedMainTableMultiPartitionsWithoutDigestCreateQueryWithUpperCase = "CREATE TABLE IF NOT EXISTS `MYDB`.`MAIN`" + + "(`ID` INTEGER NOT NULL,`NAME` VARCHAR(256) NOT NULL,`AMOUNT` DOUBLE,`ACCOUNT_TYPE` INTEGER,`BIZ_DATE` DATE," + + "`BATCH_ID_IN` INTEGER NOT NULL,`BATCH_ID_OUT` INTEGER,PRIMARY KEY (`ID`, `NAME`, `BATCH_ID_IN`))"; + public static String expectedMetadataTableIngestQuery = "INSERT INTO batch_metadata (`table_name`, `table_batch_id`, `batch_start_ts_utc`, `batch_end_ts_utc`, `batch_status`)" + " (SELECT 'main',(SELECT COALESCE(MAX(batch_metadata.`table_batch_id`),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.`table_name`) = 'MAIN'),'2000-01-01 00:00:00.000000',CURRENT_TIMESTAMP(),'DONE')"; diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-memsql/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/UnitemporalSnapshotBatchIdBasedTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-memsql/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/UnitemporalSnapshotBatchIdBasedTest.java index efa5a82352a..2e003c57992 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-memsql/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/UnitemporalSnapshotBatchIdBasedTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-memsql/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/UnitemporalSnapshotBatchIdBasedTest.java @@ -277,6 +277,135 @@ public void verifyUnitemporalSnapshotWithCleanStagingData(GeneratorResult operat Assertions.assertEquals(MemsqlTestArtifacts.expectedStagingCleanupQuery, postActionsSql.get(0)); } + @Override + public void verifyUnitemporalSnapshotWithPartitionNoDedupNoVersionNoDigest(GeneratorResult operations) + { + List preActionsSql = operations.preActionsSql(); + List milestoningSql = operations.ingestSql(); + List metadataIngestSql = operations.metadataIngestSql(); + + String expectedMilestoneQuery = "UPDATE `mydb`.`main` as sink " + + "SET sink.`batch_id_out` = (SELECT COALESCE(MAX(batch_metadata.`table_batch_id`),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.`table_name`) = 'MAIN')-1 " + + "WHERE (sink.`batch_id_out` = 999999999) " + + "AND (EXISTS (SELECT * FROM `mydb`.`staging` as stage WHERE sink.`biz_date` = stage.`biz_date`))"; + + String expectedUpsertQuery = "INSERT INTO `mydb`.`main` " + + "(`id`, `name`, `amount`, `biz_date`, `batch_id_in`, `batch_id_out`) " + + "(SELECT stage.`id`,stage.`name`,stage.`amount`,stage.`biz_date`," + + "(SELECT COALESCE(MAX(batch_metadata.`table_batch_id`),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.`table_name`) = 'MAIN'),999999999 " + + "FROM `mydb`.`staging` as stage)"; + + Assertions.assertEquals(MemsqlTestArtifacts.expectedMainTableBatchIdWithoutDigestBasedCreateQuery, preActionsSql.get(0)); + Assertions.assertEquals(MemsqlTestArtifacts.expectedMetadataTableCreateQuery, preActionsSql.get(1)); + + Assertions.assertEquals(expectedMilestoneQuery, milestoningSql.get(0)); + Assertions.assertEquals(expectedUpsertQuery, milestoningSql.get(1)); + Assertions.assertEquals(getExpectedMetadataTableIngestQuery(), metadataIngestSql.get(0)); + verifyStats(operations, incomingRecordCount, rowsUpdated, rowsDeleted, rowsInserted, rowsTerminated); + } + + @Override + public void verifyUnitemporalSnapshotWithPartitionFiltersNoDedupNoVersionNoDigest(GeneratorResult operations) + { + List preActionsSql = operations.preActionsSql(); + List milestoningSql = operations.ingestSql(); + List metadataIngestSql = operations.metadataIngestSql(); + + String expectedMilestoneQuery = "UPDATE `mydb`.`main` as sink " + + "SET sink.`batch_id_out` = (SELECT COALESCE(MAX(batch_metadata.`table_batch_id`),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.`table_name`) = 'MAIN')-1 " + + "WHERE (sink.`batch_id_out` = 999999999) " + + "AND (sink.`biz_date` IN ('2000-01-01 00:00:00','2000-01-02 00:00:00'))"; + + String expectedUpsertQuery = "INSERT INTO `mydb`.`main` " + + "(`id`, `name`, `amount`, `biz_date`, `batch_id_in`, `batch_id_out`) " + + "(SELECT stage.`id`,stage.`name`,stage.`amount`,stage.`biz_date`," + + "(SELECT COALESCE(MAX(batch_metadata.`table_batch_id`),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.`table_name`) = 'MAIN'),999999999 " + + "FROM `mydb`.`staging` as stage)"; + + Assertions.assertEquals(MemsqlTestArtifacts.expectedMainTableBatchIdWithoutDigestBasedCreateQuery, preActionsSql.get(0)); + Assertions.assertEquals(MemsqlTestArtifacts.expectedMetadataTableCreateQuery, preActionsSql.get(1)); + + Assertions.assertEquals(expectedMilestoneQuery, milestoningSql.get(0)); + Assertions.assertEquals(expectedUpsertQuery, milestoningSql.get(1)); + Assertions.assertEquals(getExpectedMetadataTableIngestQuery(), metadataIngestSql.get(0)); + } + + @Override + public void verifyUnitemporalSnapshotWithPartitionSpecListNoDedupNoVersionNoDigest(GeneratorResult operations) + { + List preActionsSql = operations.preActionsSql(); + List milestoningSql = operations.ingestSql(); + List metadataIngestSql = operations.metadataIngestSql(); + + String expectedMilestoneQuery = "UPDATE `mydb`.`main` as sink SET " + + "sink.`batch_id_out` = (SELECT COALESCE(MAX(batch_metadata.`table_batch_id`),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.`table_name`) = 'MAIN')-1 " + + "WHERE (sink.`batch_id_out` = 999999999) " + + "AND (((sink.`biz_date` = '2024-01-01') AND (sink.`account_type` = 1)) " + + "OR ((sink.`biz_date` = '2024-01-02') AND (sink.`account_type` = 1)) " + + "OR ((sink.`biz_date` = '2024-01-02') AND (sink.`account_type` = 2)))"; + + String expectedUpsertQuery = "INSERT INTO `mydb`.`main` " + + "(`id`, `name`, `amount`, `account_type`, `biz_date`, `batch_id_in`, `batch_id_out`) " + + "(SELECT stage.`id`,stage.`name`,stage.`amount`,stage.`account_type`,stage.`biz_date`," + + "(SELECT COALESCE(MAX(batch_metadata.`table_batch_id`),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.`table_name`) = 'MAIN'),999999999 " + + "FROM `mydb`.`staging` as stage)"; + + Assertions.assertEquals(MemsqlTestArtifacts.expectedMainTableMultiPartitionKeysWithoutDigestCreateQuery, preActionsSql.get(0)); + Assertions.assertEquals(MemsqlTestArtifacts.expectedMetadataTableCreateQuery, preActionsSql.get(1)); + + Assertions.assertEquals(expectedMilestoneQuery, milestoningSql.get(0)); + Assertions.assertEquals(expectedUpsertQuery, milestoningSql.get(1)); + Assertions.assertEquals(getExpectedMetadataTableIngestQuery(), metadataIngestSql.get(0)); + } + + @Override + public void verifyUnitemporalSnapshotWithPartitionSpecListNoDedupNoVersionInUpperCaseNoDigest(GeneratorResult operations) + { + List preActionsSql = operations.preActionsSql(); + List milestoningSql = operations.ingestSql(); + List metadataIngestSql = operations.metadataIngestSql(); + + String expectedMilestoneQuery = "UPDATE `MYDB`.`MAIN` as sink SET " + + "sink.`BATCH_ID_OUT` = (SELECT COALESCE(MAX(BATCH_METADATA.`TABLE_BATCH_ID`),0)+1 FROM BATCH_METADATA as BATCH_METADATA WHERE UPPER(BATCH_METADATA.`TABLE_NAME`) = 'MAIN')-1 " + + "WHERE (sink.`BATCH_ID_OUT` = 999999999) " + + "AND (((sink.`ACCOUNT_TYPE` = 1) AND (sink.`BIZ_DATE` = '2024-01-01')) " + + "OR ((sink.`ACCOUNT_TYPE` = 1) AND (sink.`BIZ_DATE` = '2024-01-02')) " + + "OR ((sink.`ACCOUNT_TYPE` = 2) AND (sink.`BIZ_DATE` = '2024-01-02')))"; + + String expectedUpsertQuery = "INSERT INTO `MYDB`.`MAIN` " + + "(`ID`, `NAME`, `AMOUNT`, `ACCOUNT_TYPE`, `BIZ_DATE`, `BATCH_ID_IN`, `BATCH_ID_OUT`) " + + "(SELECT stage.`ID`,stage.`NAME`,stage.`AMOUNT`,stage.`ACCOUNT_TYPE`,stage.`BIZ_DATE`," + + "(SELECT COALESCE(MAX(BATCH_METADATA.`TABLE_BATCH_ID`),0)+1 FROM BATCH_METADATA as BATCH_METADATA WHERE UPPER(BATCH_METADATA.`TABLE_NAME`) = 'MAIN')," + + "999999999 FROM `MYDB`.`STAGING` as stage)"; + + Assertions.assertEquals(MemsqlTestArtifacts.expectedMainTableMultiPartitionsWithoutDigestCreateQueryWithUpperCase, preActionsSql.get(0)); + Assertions.assertEquals(MemsqlTestArtifacts.expectedMetadataTableCreateQueryWithUpperCase, preActionsSql.get(1)); + Assertions.assertEquals(expectedMilestoneQuery, milestoningSql.get(0)); + Assertions.assertEquals(expectedUpsertQuery, milestoningSql.get(1)); + Assertions.assertEquals(getExpectedMetadataTableIngestQueryWithUpperCase(), metadataIngestSql.get(0)); + } + + @Override + public void verifyUnitemporalSnapshotWithPartitionSpecListWithEmptyBatchHandlingNoDigest(GeneratorResult operations) + { + List preActionsSql = operations.preActionsSql(); + List milestoningSql = operations.ingestSql(); + List metadataIngestSql = operations.metadataIngestSql(); + + String expectedMilestoneQuery = "UPDATE `mydb`.`main` as sink SET " + + "sink.`batch_id_out` = (SELECT COALESCE(MAX(batch_metadata.`table_batch_id`),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.`table_name`) = 'MAIN')-1 " + + "WHERE (sink.`batch_id_out` = 999999999) AND " + + "(((sink.`biz_date` = '2024-01-01') AND (sink.`account_type` = 1)) " + + "OR ((sink.`biz_date` = '2024-01-02') AND (sink.`account_type` = 1)) " + + "OR ((sink.`biz_date` = '2024-01-02') AND (sink.`account_type` = 2)))"; + + Assertions.assertEquals(MemsqlTestArtifacts.expectedMainTableMultiPartitionKeysWithoutDigestCreateQuery, preActionsSql.get(0)); + Assertions.assertEquals(MemsqlTestArtifacts.expectedMetadataTableCreateQuery, preActionsSql.get(1)); + + Assertions.assertEquals(expectedMilestoneQuery, milestoningSql.get(0)); + Assertions.assertEquals(getExpectedMetadataTableIngestQuery(), metadataIngestSql.get(0)); + } + @Override public RelationalSink getRelationalSink() { diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-postgres/src/test/java/org/finos/legend/engine/persistence/components/PostgresTestArtifacts.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-postgres/src/test/java/org/finos/legend/engine/persistence/components/PostgresTestArtifacts.java index 77ccb40086c..701fd07549c 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-postgres/src/test/java/org/finos/legend/engine/persistence/components/PostgresTestArtifacts.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-postgres/src/test/java/org/finos/legend/engine/persistence/components/PostgresTestArtifacts.java @@ -67,10 +67,18 @@ public class PostgresTestArtifacts "\"id\" INTEGER NOT NULL,\"name\" VARCHAR NOT NULL,\"amount\" DOUBLE PRECISION,\"biz_date\" DATE,\"digest\" VARCHAR," + "\"batch_id_in\" INTEGER NOT NULL,\"batch_id_out\" INTEGER,PRIMARY KEY (\"id\", \"name\", \"batch_id_in\"))"; + public static String expectedMainTableBatchIdNoDigestBasedCreateQuery = "CREATE TABLE IF NOT EXISTS \"mydb\".\"main\"(" + + "\"id\" INTEGER NOT NULL,\"name\" VARCHAR NOT NULL,\"amount\" DOUBLE PRECISION,\"biz_date\" DATE," + + "\"batch_id_in\" INTEGER NOT NULL,\"batch_id_out\" INTEGER,PRIMARY KEY (\"id\", \"name\", \"batch_id_in\"))"; + public static String expectedMainTableMultiPartitionsCreateQuery = "CREATE TABLE IF NOT EXISTS \"mydb\".\"main\"(" + "\"id\" INTEGER NOT NULL,\"name\" VARCHAR NOT NULL,\"amount\" DOUBLE PRECISION,\"account_type\" INTEGER,\"biz_date\" DATE,\"digest\" VARCHAR," + "\"batch_id_in\" INTEGER NOT NULL,\"batch_id_out\" INTEGER,PRIMARY KEY (\"id\", \"name\", \"batch_id_in\"))"; + public static String expectedMainTableMultiPartitionsNoDigestCreateQuery = "CREATE TABLE IF NOT EXISTS \"mydb\".\"main\"(" + + "\"id\" INTEGER NOT NULL,\"name\" VARCHAR NOT NULL,\"amount\" DOUBLE PRECISION,\"account_type\" INTEGER,\"biz_date\" DATE," + + "\"batch_id_in\" INTEGER NOT NULL,\"batch_id_out\" INTEGER,PRIMARY KEY (\"id\", \"name\", \"batch_id_in\"))"; + public static String expectedMainTableMultiPartitionsCreateQueryUpperCase = "CREATE TABLE IF NOT EXISTS \"MYDB\".\"MAIN\"" + "(\"ID\" INTEGER NOT NULL,\"NAME\" VARCHAR NOT NULL,\"AMOUNT\" DOUBLE PRECISION,\"ACCOUNT_TYPE\" INTEGER,\"BIZ_DATE\" DATE," + "\"DIGEST\" VARCHAR,\"BATCH_ID_IN\" INTEGER NOT NULL,\"BATCH_ID_OUT\" INTEGER,PRIMARY KEY (\"ID\", \"NAME\", \"BATCH_ID_IN\"))"; diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-postgres/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/UnitemporalSnapshotBatchIdBasedTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-postgres/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/UnitemporalSnapshotBatchIdBasedTest.java index ce67091a24a..8d3cbbd06e6 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-postgres/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/UnitemporalSnapshotBatchIdBasedTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-postgres/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/UnitemporalSnapshotBatchIdBasedTest.java @@ -302,6 +302,135 @@ public void verifyUnitemporalSnapshotWithCleanStagingData(GeneratorResult operat Assertions.assertEquals(PostgresTestArtifacts.expectedStagingCleanupQuery, postActionsSql.get(0)); } + @Override + public void verifyUnitemporalSnapshotWithPartitionNoDedupNoVersionNoDigest(GeneratorResult operations) + { + List preActionsSql = operations.preActionsSql(); + List milestoningSql = operations.ingestSql(); + List metadataIngestSql = operations.metadataIngestSql(); + + String expectedMilestoneQuery = "UPDATE \"mydb\".\"main\" as sink " + + "SET \"batch_id_out\" = (SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MAIN')-1 " + + "WHERE (sink.\"batch_id_out\" = 999999999) " + + "AND (EXISTS (SELECT * FROM \"mydb\".\"staging\" as stage WHERE sink.\"biz_date\" = stage.\"biz_date\"))"; + + + String expectedUpsertQuery = "INSERT INTO \"mydb\".\"main\" " + + "(\"id\", \"name\", \"amount\", \"biz_date\", \"batch_id_in\", \"batch_id_out\") " + + "(SELECT stage.\"id\",stage.\"name\",stage.\"amount\",stage.\"biz_date\"," + + "(SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MAIN'),999999999 " + + "FROM \"mydb\".\"staging\" as stage)"; + + Assertions.assertEquals(PostgresTestArtifacts.expectedMainTableBatchIdNoDigestBasedCreateQuery, preActionsSql.get(0)); + Assertions.assertEquals(getExpectedMetadataTableCreateQuery(), preActionsSql.get(1)); + + Assertions.assertEquals(expectedMilestoneQuery, milestoningSql.get(0)); + Assertions.assertEquals(expectedUpsertQuery, milestoningSql.get(1)); + Assertions.assertEquals(getExpectedMetadataTableIngestQuery(), metadataIngestSql.get(0)); + verifyStats(operations, incomingRecordCount, rowsUpdated, rowsDeleted, rowsInserted, rowsTerminated); + } + + @Override + public void verifyUnitemporalSnapshotWithPartitionFiltersNoDedupNoVersionNoDigest(GeneratorResult operations) + { + List preActionsSql = operations.preActionsSql(); + List milestoningSql = operations.ingestSql(); + List metadataIngestSql = operations.metadataIngestSql(); + + String expectedMilestoneQuery = "UPDATE \"mydb\".\"main\" as sink " + + "SET \"batch_id_out\" = (SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MAIN')-1 " + + "WHERE (sink.\"batch_id_out\" = 999999999) " + + "AND (sink.\"biz_date\" IN ('2000-01-01 00:00:00','2000-01-02 00:00:00'))"; + + String expectedUpsertQuery = "INSERT INTO \"mydb\".\"main\" " + + "(\"id\", \"name\", \"amount\", \"biz_date\", \"batch_id_in\", \"batch_id_out\") " + + "(SELECT stage.\"id\",stage.\"name\",stage.\"amount\",stage.\"biz_date\"," + + "(SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MAIN'),999999999 " + + "FROM \"mydb\".\"staging\" as stage)"; + Assertions.assertEquals(PostgresTestArtifacts.expectedMainTableBatchIdNoDigestBasedCreateQuery, preActionsSql.get(0)); + Assertions.assertEquals(getExpectedMetadataTableCreateQuery(), preActionsSql.get(1)); + + Assertions.assertEquals(expectedMilestoneQuery, milestoningSql.get(0)); + Assertions.assertEquals(expectedUpsertQuery, milestoningSql.get(1)); + Assertions.assertEquals(getExpectedMetadataTableIngestQuery(), metadataIngestSql.get(0)); + } + + @Override + public void verifyUnitemporalSnapshotWithPartitionSpecListNoDedupNoVersionNoDigest(GeneratorResult operations) + { + List preActionsSql = operations.preActionsSql(); + List milestoningSql = operations.ingestSql(); + List metadataIngestSql = operations.metadataIngestSql(); + + String expectedMilestoneQuery = "UPDATE \"mydb\".\"main\" as sink SET " + + "\"batch_id_out\" = (SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MAIN')-1 " + + "WHERE (sink.\"batch_id_out\" = 999999999) " + + "AND (((sink.\"biz_date\" = '2024-01-01') AND (sink.\"account_type\" = 1)) " + + "OR ((sink.\"biz_date\" = '2024-01-02') AND (sink.\"account_type\" = 1)) " + + "OR ((sink.\"biz_date\" = '2024-01-02') AND (sink.\"account_type\" = 2)))"; + + String expectedUpsertQuery = "INSERT INTO \"mydb\".\"main\" " + + "(\"id\", \"name\", \"amount\", \"account_type\", \"biz_date\", \"batch_id_in\", \"batch_id_out\") " + + "(SELECT stage.\"id\",stage.\"name\",stage.\"amount\",stage.\"account_type\",stage.\"biz_date\"," + + "(SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MAIN')," + + "999999999 FROM \"mydb\".\"staging\" as stage)"; + + Assertions.assertEquals(PostgresTestArtifacts.expectedMainTableMultiPartitionsNoDigestCreateQuery, preActionsSql.get(0)); + Assertions.assertEquals(getExpectedMetadataTableCreateQuery(), preActionsSql.get(1)); + + Assertions.assertEquals(expectedMilestoneQuery, milestoningSql.get(0)); + Assertions.assertEquals(expectedUpsertQuery, milestoningSql.get(1)); + Assertions.assertEquals(getExpectedMetadataTableIngestQuery(), metadataIngestSql.get(0)); + } + + @Override + public void verifyUnitemporalSnapshotWithPartitionSpecListNoDedupNoVersionInUpperCaseNoDigest(GeneratorResult operations) + { + List preActionsSql = operations.preActionsSql(); + List milestoningSql = operations.ingestSql(); + List metadataIngestSql = operations.metadataIngestSql(); + + String expectedMilestoneQuery = "UPDATE \"MYDB\".\"MAIN\" as sink SET \"BATCH_ID_OUT\" = " + + "(SELECT COALESCE(MAX(BATCH_METADATA.\"TABLE_BATCH_ID\"),0)+1 FROM BATCH_METADATA as BATCH_METADATA WHERE UPPER(BATCH_METADATA.\"TABLE_NAME\") = 'MAIN')-1 " + + "WHERE (sink.\"BATCH_ID_OUT\" = 999999999) AND " + + "(((sink.\"ACCOUNT_TYPE\" = 1) AND (sink.\"BIZ_DATE\" = '2024-01-01')) " + + "OR ((sink.\"ACCOUNT_TYPE\" = 1) AND (sink.\"BIZ_DATE\" = '2024-01-02')) " + + "OR ((sink.\"ACCOUNT_TYPE\" = 2) AND (sink.\"BIZ_DATE\" = '2024-01-02')))"; + + String expectedUpsertQuery = "INSERT INTO \"MYDB\".\"MAIN\" " + + "(\"ID\", \"NAME\", \"AMOUNT\", \"ACCOUNT_TYPE\", \"BIZ_DATE\", \"BATCH_ID_IN\", \"BATCH_ID_OUT\") " + + "(SELECT stage.\"ID\",stage.\"NAME\",stage.\"AMOUNT\",stage.\"ACCOUNT_TYPE\",stage.\"BIZ_DATE\"," + + "(SELECT COALESCE(MAX(BATCH_METADATA.\"TABLE_BATCH_ID\"),0)+1 FROM BATCH_METADATA as BATCH_METADATA WHERE UPPER(BATCH_METADATA.\"TABLE_NAME\") = 'MAIN')," + + "999999999 FROM \"MYDB\".\"STAGING\" as stage)"; + + Assertions.assertEquals(PostgresTestArtifacts.expectedMainTableMultiPartitionsNoDigestCreateQuery.toUpperCase(), preActionsSql.get(0)); + Assertions.assertEquals(getExpectedMetadataTableCreateQueryWithUpperCase(), preActionsSql.get(1)); + Assertions.assertEquals(expectedMilestoneQuery, milestoningSql.get(0)); + Assertions.assertEquals(expectedUpsertQuery, milestoningSql.get(1)); + Assertions.assertEquals(getExpectedMetadataTableIngestQueryWithUpperCase(), metadataIngestSql.get(0)); + } + + @Override + public void verifyUnitemporalSnapshotWithPartitionSpecListWithEmptyBatchHandlingNoDigest(GeneratorResult operations) + { + List preActionsSql = operations.preActionsSql(); + List milestoningSql = operations.ingestSql(); + List metadataIngestSql = operations.metadataIngestSql(); + + String expectedMilestoneQuery = "UPDATE \"mydb\".\"main\" as sink " + + "SET \"batch_id_out\" = (SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MAIN')-1 " + + "WHERE (sink.\"batch_id_out\" = 999999999) AND " + + "(((sink.\"biz_date\" = '2024-01-01') AND (sink.\"account_type\" = 1)) " + + "OR ((sink.\"biz_date\" = '2024-01-02') AND (sink.\"account_type\" = 1)) " + + "OR ((sink.\"biz_date\" = '2024-01-02') AND (sink.\"account_type\" = 2)))"; + + Assertions.assertEquals(PostgresTestArtifacts.expectedMainTableMultiPartitionsNoDigestCreateQuery, preActionsSql.get(0)); + Assertions.assertEquals(getExpectedMetadataTableCreateQuery(), preActionsSql.get(1)); + + Assertions.assertEquals(expectedMilestoneQuery, milestoningSql.get(0)); + Assertions.assertEquals(getExpectedMetadataTableIngestQuery(), metadataIngestSql.get(0)); + } + @Override public RelationalSink getRelationalSink() { diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-test/src/test/java/org/finos/legend/engine/persistence/components/BaseTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-test/src/test/java/org/finos/legend/engine/persistence/components/BaseTest.java index 91e785f719e..6f22615cffd 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-test/src/test/java/org/finos/legend/engine/persistence/components/BaseTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-test/src/test/java/org/finos/legend/engine/persistence/components/BaseTest.java @@ -252,6 +252,25 @@ private static void addPartitionSpec(List> partitionSpecList .addFields(batchIdOut) .build(); + protected SchemaDefinition mainTableBatchIdBasedSchemaWithoutDigest = SchemaDefinition.builder() + .addFields(id) + .addFields(name) + .addFields(amount) + .addFields(bizDate) + .addFields(batchIdIn) + .addFields(batchIdOut) + .build(); + + protected SchemaDefinition mainTableWithMultiPartitionsBasedSchemaWithoutDigest = SchemaDefinition.builder() + .addFields(id) + .addFields(name) + .addFields(amount) + .addFields(accountType) + .addFields(bizDate) + .addFields(batchIdIn) + .addFields(batchIdOut) + .build(); + protected SchemaDefinition mainTableWithMultiPartitionsBasedSchema = SchemaDefinition.builder() .addFields(id) .addFields(name) @@ -321,6 +340,14 @@ private static void addPartitionSpec(List> partitionSpecList .addFields(digest) .build(); + protected SchemaDefinition stagingTableSchemaWithMultiplePartitionsWithoutDigest = SchemaDefinition.builder() + .addFields(id) + .addFields(name) + .addFields(amount) + .addFields(accountType) + .addFields(bizDate) + .build(); + protected SchemaDefinition mainTableSchemaWithDigest = SchemaDefinition.builder() .addFields(id) .addFields(name) @@ -713,6 +740,11 @@ protected String enrichSqlWithDataSplits(String sql, DataSplitRange dataSplitRan .schema(stagingTableSchemaWithMultiplePartitions) .build(); + protected Dataset stagingTableWithMultiPartitionsWithoutDigest = DatasetDefinition.builder() + .database(stagingDbName).name(stagingTableName).alias(stagingTableAlias) + .schema(stagingTableSchemaWithMultiplePartitionsWithoutDigest) + .build(); + protected Dataset stagingTableWithFilter = DerivedDataset.builder() .database(stagingDbName).name(stagingTableName).alias(stagingTableAlias) .schema(stagingTableSchemaWithDigest) @@ -808,6 +840,16 @@ protected String enrichSqlWithDataSplits(String sql, DataSplitRange dataSplitRan .schema(mainTableBatchIdBasedSchema) .build(); + protected Dataset mainTableWithBatchIdBasedSchemaWithoutDigest = DatasetDefinition.builder() + .database(mainDbName).name(mainTableName).alias(mainTableAlias) + .schema(mainTableBatchIdBasedSchemaWithoutDigest) + .build(); + + protected Dataset mainTableMultiPartitionsBasedWithoutDigest = DatasetDefinition.builder() + .database(mainDbName).name(mainTableName).alias(mainTableAlias) + .schema(mainTableWithMultiPartitionsBasedSchemaWithoutDigest) + .build(); + protected Dataset mainTableMultiPartitionsBased = DatasetDefinition.builder() .database(mainDbName).name(mainTableName).alias(mainTableAlias) .schema(mainTableWithMultiPartitionsBasedSchema) diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-test/src/test/java/org/finos/legend/engine/persistence/components/scenarios/UnitemporalSnapshotBatchIdBasedScenarios.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-test/src/test/java/org/finos/legend/engine/persistence/components/scenarios/UnitemporalSnapshotBatchIdBasedScenarios.java index 367f8969b84..affba0fe9e8 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-test/src/test/java/org/finos/legend/engine/persistence/components/scenarios/UnitemporalSnapshotBatchIdBasedScenarios.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-test/src/test/java/org/finos/legend/engine/persistence/components/scenarios/UnitemporalSnapshotBatchIdBasedScenarios.java @@ -22,6 +22,7 @@ import org.finos.legend.engine.persistence.components.ingestmode.partitioning.NoPartitioning; import org.finos.legend.engine.persistence.components.ingestmode.partitioning.Partitioning; import org.finos.legend.engine.persistence.components.ingestmode.transactionmilestoning.BatchId; +import org.finos.legend.engine.persistence.components.util.DeleteStrategy; import java.util.Arrays; @@ -42,6 +43,8 @@ public class UnitemporalSnapshotBatchIdBasedScenarios extends BaseTest 2) Without Partition, FailOnDups No Versioning 3) With Partition, No Dedup No Versioning 4) With Partition Filter, No Dedup No Versioning + 5) With Partition, No Dedup No Versioning No digest + 6) With Partition Filter, No Dedup No Versioning No digest */ public TestScenario BATCH_ID_BASED__WITHOUT_PARTITIONS__NO_DEDUP__NO_VERSION() @@ -111,4 +114,42 @@ public TestScenario BATCH_ID_BASED__WITH_PARTITION_SPEC_LIST__NO_DEDUP__NO_VERSI .build(); return new TestScenario(mainTableMultiPartitionsBased, stagingTableWithMultiPartitions, ingestMode); } + + public TestScenario BATCH_ID_BASED__WITH_PARTITIONS__NO_DEDUP__NO_VERSION__NO_DIGEST() + { + UnitemporalSnapshot ingestMode = UnitemporalSnapshot.builder() + .transactionMilestoning(BatchId.builder() + .batchIdInName(batchIdInField) + .batchIdOutName(batchIdOutField) + .build()) + .partitioningStrategy(Partitioning.builder().addAllPartitionFields(Arrays.asList(partitionKeys)).deleteStrategy(DeleteStrategy.DELETE_ALL).build()) + .build(); + return new TestScenario(mainTableWithBatchIdBasedSchemaWithoutDigest, stagingTableWithBaseSchema, ingestMode); + } + + public TestScenario BATCH_ID_BASED__WITH_PARTITION_FILTER__NO_DEDUP__NO_VERSION__NO_DIGEST() + { + UnitemporalSnapshot ingestMode = UnitemporalSnapshot.builder() + .partitioningStrategy(Partitioning.builder().addAllPartitionFields(Arrays.asList(partitionKeys)).putAllPartitionValuesByField(partitionFilter).deleteStrategy(DeleteStrategy.DELETE_ALL).build()) + .transactionMilestoning(BatchId.builder() + .batchIdInName(batchIdInField) + .batchIdOutName(batchIdOutField) + .build()) + .build(); + return new TestScenario(mainTableWithBatchIdBasedSchemaWithoutDigest, stagingTableWithBaseSchema, ingestMode); + } + + public TestScenario BATCH_ID_BASED__WITH_PARTITION_SPEC_LIST__NO_DEDUP__NO_VERSION__NO_DIGEST() + { + UnitemporalSnapshot ingestMode = UnitemporalSnapshot.builder() + .transactionMilestoning(BatchId.builder() + .batchIdInName(batchIdInField) + .batchIdOutName(batchIdOutField) + .build()) + .partitioningStrategy(Partitioning.builder().addAllPartitionFields(Arrays.asList(partitionKeysMulti)).addAllPartitionSpecList(partitionSpecList()).deleteStrategy(DeleteStrategy.DELETE_ALL).build()) + .emptyDatasetHandling(DeleteTargetData.builder().build()) + .build(); + return new TestScenario(mainTableMultiPartitionsBasedWithoutDigest, stagingTableWithMultiPartitionsWithoutDigest, ingestMode); + } + } \ No newline at end of file diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-test/src/test/java/org/finos/legend/engine/persistence/components/testcases/ingestmode/unitemporal/UnitmemporalSnapshotBatchIdBasedTestCases.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-test/src/test/java/org/finos/legend/engine/persistence/components/testcases/ingestmode/unitemporal/UnitmemporalSnapshotBatchIdBasedTestCases.java index 3babd9934a9..d1e18da5fc5 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-test/src/test/java/org/finos/legend/engine/persistence/components/testcases/ingestmode/unitemporal/UnitmemporalSnapshotBatchIdBasedTestCases.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-test/src/test/java/org/finos/legend/engine/persistence/components/testcases/ingestmode/unitemporal/UnitmemporalSnapshotBatchIdBasedTestCases.java @@ -207,6 +207,87 @@ void testUnitemporalSnapshotWithCleanStagingData() public abstract void verifyUnitemporalSnapshotWithCleanStagingData(GeneratorResult operations); + @Test + void testUnitemporalSnapshotWithPartitionNoDedupNoVersionNoDigest() + { + TestScenario scenario = scenarios.BATCH_ID_BASED__WITH_PARTITIONS__NO_DEDUP__NO_VERSION__NO_DIGEST(); + RelationalGenerator generator = RelationalGenerator.builder() + .ingestMode(scenario.getIngestMode()) + .relationalSink(getRelationalSink()) + .executionTimestampClock(fixedClock_2000_01_01) + .collectStatistics(true) + .build(); + GeneratorResult operations = generator.generateOperations(scenario.getDatasets()); + verifyUnitemporalSnapshotWithPartitionNoDedupNoVersionNoDigest(operations); + } + + public abstract void verifyUnitemporalSnapshotWithPartitionNoDedupNoVersionNoDigest(GeneratorResult operations); + + @Test + void testUnitemporalSnapshotWithPartitionFiltersNoDedupNoVersionNoDigest() + { + TestScenario scenario = scenarios.BATCH_ID_BASED__WITH_PARTITION_FILTER__NO_DEDUP__NO_VERSION__NO_DIGEST(); + RelationalGenerator generator = RelationalGenerator.builder() + .ingestMode(scenario.getIngestMode()) + .relationalSink(getRelationalSink()) + .executionTimestampClock(fixedClock_2000_01_01) + .collectStatistics(true) + .build(); + GeneratorResult operations = generator.generateOperations(scenario.getDatasets()); + verifyUnitemporalSnapshotWithPartitionFiltersNoDedupNoVersionNoDigest(operations); + } + + public abstract void verifyUnitemporalSnapshotWithPartitionFiltersNoDedupNoVersionNoDigest(GeneratorResult operations); + + @Test + void testUnitemporalSnapshotWithPartitionSpecListNoDedupNoVersionNoDigest() + { + TestScenario scenario = scenarios.BATCH_ID_BASED__WITH_PARTITION_SPEC_LIST__NO_DEDUP__NO_VERSION__NO_DIGEST(); + RelationalGenerator generator = RelationalGenerator.builder() + .ingestMode(scenario.getIngestMode()) + .relationalSink(getRelationalSink()) + .executionTimestampClock(fixedClock_2000_01_01) + .collectStatistics(true) + .build(); + GeneratorResult operations = generator.generateOperations(scenario.getDatasets()); + verifyUnitemporalSnapshotWithPartitionSpecListNoDedupNoVersionNoDigest(operations); + } + + public abstract void verifyUnitemporalSnapshotWithPartitionSpecListNoDedupNoVersionNoDigest(GeneratorResult operations); + + @Test + void testUnitemporalSnapshotWithPartitionSpecListNoDedupNoVersionInUpperCaseNoDigest() + { + TestScenario scenario = scenarios.BATCH_ID_BASED__WITH_PARTITION_SPEC_LIST__NO_DEDUP__NO_VERSION__NO_DIGEST(); + RelationalGenerator generator = RelationalGenerator.builder() + .ingestMode(scenario.getIngestMode()) + .relationalSink(getRelationalSink()) + .executionTimestampClock(fixedClock_2000_01_01) + .collectStatistics(true) + .caseConversion(CaseConversion.TO_UPPER) + .build(); + GeneratorResult operations = generator.generateOperations(scenario.getDatasets()); + verifyUnitemporalSnapshotWithPartitionSpecListNoDedupNoVersionInUpperCaseNoDigest(operations); + } + + public abstract void verifyUnitemporalSnapshotWithPartitionSpecListNoDedupNoVersionInUpperCaseNoDigest(GeneratorResult operations); + + @Test + void testUnitemporalSnapshotWithPartitionSpecListWithEmptyBatchHandlingNoDigest() + { + TestScenario scenario = scenarios.BATCH_ID_BASED__WITH_PARTITION_SPEC_LIST__NO_DEDUP__NO_VERSION__NO_DIGEST(); + RelationalGenerator generator = RelationalGenerator.builder() + .ingestMode(scenario.getIngestMode()) + .relationalSink(getRelationalSink()) + .executionTimestampClock(fixedClock_2000_01_01) + .collectStatistics(true) + .build(); + GeneratorResult operations = generator.generateOperationsForEmptyBatch(scenario.getDatasets()); + verifyUnitemporalSnapshotWithPartitionSpecListWithEmptyBatchHandlingNoDigest(operations); + } + + public abstract void verifyUnitemporalSnapshotWithPartitionSpecListWithEmptyBatchHandlingNoDigest(GeneratorResult operations); + @Test void testUnitemporalSnasphotValidationBatchIdInMissing() {