diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/Planner.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/Planner.java index 98b1ecf7a11..5ee4ae53977 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/Planner.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/Planner.java @@ -32,7 +32,9 @@ import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlanFactory; import org.finos.legend.engine.persistence.components.logicalplan.conditions.Condition; import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset; +import org.finos.legend.engine.persistence.components.logicalplan.datasets.DerivedDataset; import org.finos.legend.engine.persistence.components.logicalplan.datasets.Field; +import org.finos.legend.engine.persistence.components.logicalplan.datasets.FilteredDataset; import org.finos.legend.engine.persistence.components.logicalplan.datasets.Selection; import org.finos.legend.engine.persistence.components.logicalplan.operations.*; import org.finos.legend.engine.persistence.components.logicalplan.values.*; @@ -137,6 +139,8 @@ default boolean enableConcurrentSafety() ingestMode.versioningStrategy().accept(new ValidatePrimaryKeysForVersioningStrategy(primaryKeys, this::validatePrimaryKeysNotEmpty)); // 2. Validate if the versioningField is comparable if a versioningStrategy is present validateVersioningField(ingestMode().versioningStrategy(), stagingDataset()); + // 3. cleanupStagingData must be turned off when using DerivedDataset or FilteredDataset + validateCleanUpStagingData(plannerOptions, originalStagingDataset()); } private Optional getTempStagingDataset() @@ -498,6 +502,14 @@ protected void validateVersioningField(VersioningStrategy versioningStrategy, Da } } + protected void validateCleanUpStagingData(PlannerOptions plannerOptions, Dataset dataset) + { + if (plannerOptions.cleanupStagingData() && (dataset instanceof DerivedDataset || dataset instanceof FilteredDataset)) + { + throw new IllegalStateException("cleanupStagingData cannot be turned on when using DerivedDataset or FilteredDataset"); + } + } + // auditing visitor protected static final AuditEnabled AUDIT_ENABLED = new AuditEnabled(); diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/sqldom/schemaops/statements/DeleteStatement.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/sqldom/schemaops/statements/DeleteStatement.java index e85e00c7f95..9a2b3540898 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/sqldom/schemaops/statements/DeleteStatement.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/sqldom/schemaops/statements/DeleteStatement.java @@ -37,9 +37,9 @@ public DeleteStatement(Table table, Condition condition) } /* - DELETE GENERIC PLAN: - DELETE FROM table-Name [[AS] correlation-Name] [WHERE clause] - */ + DELETE GENERIC PLAN: + DELETE FROM table-Name [[AS] correlation-Name] [WHERE clause] + */ @Override public void genSql(StringBuilder builder) throws SqlDomException { diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/TestUtils.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/TestUtils.java index 27df482271b..02bd2c12e9a 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/TestUtils.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/TestUtils.java @@ -17,6 +17,13 @@ import com.opencsv.CSVReader; import org.finos.legend.engine.persistence.components.common.DatasetFilter; import org.finos.legend.engine.persistence.components.common.FilterType; +import org.finos.legend.engine.persistence.components.logicalplan.conditions.And; +import org.finos.legend.engine.persistence.components.logicalplan.conditions.Equals; +import org.finos.legend.engine.persistence.components.logicalplan.conditions.GreaterThan; +import org.finos.legend.engine.persistence.components.logicalplan.conditions.GreaterThanEqualTo; +import org.finos.legend.engine.persistence.components.logicalplan.conditions.LessThan; +import org.finos.legend.engine.persistence.components.logicalplan.conditions.LessThanEqualTo; +import org.finos.legend.engine.persistence.components.logicalplan.conditions.Or; import org.finos.legend.engine.persistence.components.logicalplan.datasets.CsvExternalDatasetReference; import org.finos.legend.engine.persistence.components.logicalplan.datasets.DataType; import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset; @@ -24,9 +31,13 @@ import org.finos.legend.engine.persistence.components.logicalplan.datasets.DerivedDataset; import org.finos.legend.engine.persistence.components.logicalplan.datasets.Field; import org.finos.legend.engine.persistence.components.logicalplan.datasets.FieldType; +import org.finos.legend.engine.persistence.components.logicalplan.datasets.FilteredDataset; import org.finos.legend.engine.persistence.components.logicalplan.datasets.JsonExternalDatasetReference; import org.finos.legend.engine.persistence.components.logicalplan.datasets.SchemaDefinition; import org.finos.legend.engine.persistence.components.executor.RelationalExecutionHelper; +import org.finos.legend.engine.persistence.components.logicalplan.values.FieldValue; +import org.finos.legend.engine.persistence.components.logicalplan.values.NumericalValue; +import org.finos.legend.engine.persistence.components.logicalplan.values.StringValue; import org.finos.legend.engine.persistence.components.util.MetadataDataset; import org.junit.jupiter.api.Assertions; @@ -368,6 +379,23 @@ public static DatasetDefinition getStagingTableWithNoPks() .build(); } + public static FilteredDataset getFilteredStagingTableWithComplexFilter() + { + return FilteredDataset.builder() + .group(testSchemaName) + .name(stagingTableName) + .schema(getSchemaWithNoPKs()) + .alias(stagingTableName) + .filter(And.builder() + .addConditions(GreaterThan.of(FieldValue.builder().fieldName(incomeName).datasetRefAlias(stagingTableName).build(), NumericalValue.of(1000L))) + .addConditions(Or.builder() + .addConditions(GreaterThanEqualTo.of(FieldValue.builder().fieldName(expiryDateName).datasetRefAlias(stagingTableName).build(), StringValue.of("2022-12-03"))) + .addConditions(LessThanEqualTo.of(FieldValue.builder().fieldName(expiryDateName).datasetRefAlias(stagingTableName).build(), StringValue.of("2022-12-01"))) + .build()) + .build()) + .build(); + } + public static DatasetDefinition getBasicStagingTableWithExpiryDatePk() { return DatasetDefinition.builder() @@ -433,6 +461,20 @@ public static DerivedDataset getDerivedStagingTableWithFilter() .build(); } + public static FilteredDataset getFilteredStagingTable() + { + return FilteredDataset.builder() + .group(testSchemaName) + .name(stagingTableName) + .schema(getStagingSchema()) + .alias(stagingTableName) + .filter(Equals.of(FieldValue.builder() + .fieldName(batchName) + .datasetRefAlias(stagingTableName) + .build(), NumericalValue.of(1L))) + .build(); + } + public static DerivedDataset getStagingTableWithFilterSecondPass() { return DerivedDataset.builder() @@ -445,6 +487,20 @@ public static DerivedDataset getStagingTableWithFilterSecondPass() .build(); } + public static FilteredDataset getFilteredStagingTableSecondPass() + { + return FilteredDataset.builder() + .group(testSchemaName) + .name(stagingTableName) + .schema(getStagingSchema()) + .alias(stagingTableName) + .filter(GreaterThan.of(FieldValue.builder() + .fieldName(batchName) + .datasetRefAlias(stagingTableName) + .build(), NumericalValue.of(1L))) + .build(); + } + public static DerivedDataset getDerivedStagingTableWithFilterWithVersion() { return DerivedDataset.builder() @@ -456,6 +512,20 @@ public static DerivedDataset getDerivedStagingTableWithFilterWithVersion() .build(); } + public static FilteredDataset getFilteredStagingTableWithVersion() + { + return FilteredDataset.builder() + .group(testSchemaName) + .name(stagingTableName) + .schema(getStagingSchemaWithVersion()) + .alias(stagingTableName) + .filter(GreaterThanEqualTo.of(FieldValue.builder() + .fieldName(batchName) + .datasetRefAlias(stagingTableName) + .build(), NumericalValue.of(2L))) + .build(); + } + public static DerivedDataset getStagingTableWithFilterWithVersionSecondPass() { return DerivedDataset.builder() @@ -467,6 +537,20 @@ public static DerivedDataset getStagingTableWithFilterWithVersionSecondPass() .build(); } + public static FilteredDataset getFilteredStagingTableWithVersionSecondPass() + { + return FilteredDataset.builder() + .group(testSchemaName) + .name(stagingTableName) + .schema(getStagingSchemaWithVersion()) + .alias(stagingTableName) + .filter(GreaterThanEqualTo.of(FieldValue.builder() + .fieldName(batchName) + .datasetRefAlias(stagingTableName) + .build(), NumericalValue.of(3L))) + .build(); + } + public static JsonExternalDatasetReference getBasicJsonDatasetReferenceTable(String dataPath) { return JsonExternalDatasetReference.builder() @@ -741,6 +825,50 @@ public static DatasetDefinition getEntityPriceWithVersionStagingTable() .build(); } + public static FilteredDataset getEntityPriceWithVersionFilteredStagingTable() + { + return FilteredDataset.builder() + .group(testSchemaName) + .name(stagingTableName) + .alias(stagingTableName) + .schema(SchemaDefinition.builder() + .addFields(date) + .addFields(entity) + .addFields(price) + .addFields(volume) + .addFields(digest) + .addFields(version) + .build() + ) + .filter(GreaterThanEqualTo.of(FieldValue.builder() + .fieldName(volumeName) + .datasetRefAlias(stagingTableName) + .build(), NumericalValue.of(100L))) + .build(); + } + + public static FilteredDataset getEntityPriceWithVersionFilteredStagingTableSecondPass() + { + return FilteredDataset.builder() + .group(testSchemaName) + .name(stagingTableName) + .alias(stagingTableName) + .schema(SchemaDefinition.builder() + .addFields(date) + .addFields(entity) + .addFields(price) + .addFields(volume) + .addFields(digest) + .addFields(version) + .build() + ) + .filter(GreaterThanEqualTo.of(FieldValue.builder() + .fieldName(volumeName) + .datasetRefAlias(stagingTableName) + .build(), NumericalValue.of(500L))) + .build(); + } + public static DatasetDefinition getBitemporalMainTable() { return DatasetDefinition.builder() @@ -950,6 +1078,46 @@ public static DatasetDefinition getBitemporalFromOnlyStagingTableIdBased() .build(); } + public static FilteredDataset getBitemporalFromOnlyFilteredStagingTableIdBased() + { + return FilteredDataset.builder() + .group(testSchemaName) + .name(stagingTableName) + .schema(SchemaDefinition.builder() + .addFields(index) + .addFields(dateTime) + .addFields(balance) + .addFields(digest) + .build() + ) + .alias(stagingTableName) + .filter(LessThanEqualTo.of(FieldValue.builder() + .fieldName(balanceName) + .datasetRefAlias(stagingTableName) + .build(), NumericalValue.of(3L))) + .build(); + } + + public static FilteredDataset getBitemporalFromOnlyFilteredStagingTableIdBasedSecondPass() + { + return FilteredDataset.builder() + .group(testSchemaName) + .name(stagingTableName) + .schema(SchemaDefinition.builder() + .addFields(index) + .addFields(dateTime) + .addFields(balance) + .addFields(digest) + .build() + ) + .alias(stagingTableName) + .filter(LessThanEqualTo.of(FieldValue.builder() + .fieldName(balanceName) + .datasetRefAlias(stagingTableName) + .build(), NumericalValue.of(20L))) + .build(); + } + public static DatasetDefinition getBitemporalFromOnlyStagingTableWithoutDuplicatesIdBased() { return DatasetDefinition.builder() diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/bitemporal/BitemporalDeltaWithBatchIdTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/bitemporal/BitemporalDeltaWithBatchIdTest.java index f45582ddb34..fa339e39789 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/bitemporal/BitemporalDeltaWithBatchIdTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/bitemporal/BitemporalDeltaWithBatchIdTest.java @@ -27,6 +27,7 @@ import org.finos.legend.engine.persistence.components.ingestmode.versioning.DigestBasedResolver; import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset; import org.finos.legend.engine.persistence.components.logicalplan.datasets.DatasetDefinition; +import org.finos.legend.engine.persistence.components.logicalplan.datasets.FilteredDataset; import org.finos.legend.engine.persistence.components.planner.PlannerOptions; import org.finos.legend.engine.persistence.components.relational.api.DataSplitRange; import org.junit.jupiter.api.Assertions; @@ -849,6 +850,59 @@ void testMilestoningSourceSpecifiesFromSet5WithDataSplitFilterDuplicates() throw executePlansAndVerifyResultsWithSpecifiedDataSplits(ingestMode, options, datasets, schema, expectedDataPass6, expectedStats, dataSplitRanges); } + /* + Scenario: Test milestoning Logic with only validity from time specified when staging table pre populated + */ + @Test + void testMilestoningSourceSpecifiesFromSet6WithStagingFilter() throws Exception + { + DatasetDefinition mainTable = TestUtils.getDefaultMainTable(); + FilteredDataset stagingTable = TestUtils.getBitemporalFromOnlyFilteredStagingTableIdBased(); + + String[] schema = new String[] {indexName, balanceName, digestName, startDateTimeName, endDateTimeName, batchIdInName, batchIdOutName}; + + // Create staging table + createStagingTable(TestUtils.getBitemporalFromOnlyStagingTableIdBased()); + + BitemporalDelta ingestMode = BitemporalDelta.builder() + .digestField(digestName) + .transactionMilestoning(BatchId.builder() + .batchIdInName(batchIdInName) + .batchIdOutName(batchIdOutName) + .build()) + .validityMilestoning(ValidDateTime.builder() + .dateTimeFromName(startDateTimeName) + .dateTimeThruName(endDateTimeName) + .validityDerivation(SourceSpecifiesFromDateTime.builder() + .sourceDateTimeFromField(dateTimeName) + .build()) + .build()) + .build(); + + PlannerOptions options = PlannerOptions.builder().cleanupStagingData(false).collectStatistics(true).build(); + Datasets datasets = Datasets.builder().mainDataset(mainTable).stagingDataset(stagingTable).build(); + + // ------------ Perform Pass1 ------------------------ + String dataPass1 = basePathForInput + "source_specifies_from/without_delete_ind/set_6_with_staging_filter/staging_data_pass1.csv"; + String expectedDataPass1 = basePathForExpected + "source_specifies_from/without_delete_ind/set_6_with_staging_filter/expected_pass1.csv"; + // 1. Load Staging table + loadStagingDataForBitemporalFromOnly(dataPass1); + // 2. Execute Plan and Verify Results + Map expectedStats = createExpectedStatsMap(3, 0, 3, 0, 0); + executePlansAndVerifyResults(ingestMode, options, datasets, schema, expectedDataPass1, expectedStats); + + // ------------ Perform Pass2 ------------------------ + // 0. Create new filter + datasets = Datasets.of(mainTable, TestUtils.getBitemporalFromOnlyFilteredStagingTableIdBasedSecondPass()); + String dataPass2 = basePathForInput + "source_specifies_from/without_delete_ind/set_6_with_staging_filter/staging_data_pass2.csv"; + String expectedDataPass2 = basePathForExpected + "source_specifies_from/without_delete_ind/set_6_with_staging_filter/expected_pass2.csv"; + // 1. Load Staging table + loadStagingDataForBitemporalFromOnly(dataPass2); + // 2. Execute Plan and Verify Results + expectedStats = createExpectedStatsMap(8, 0, 6, 3, 0); + executePlansAndVerifyResults(ingestMode, options, datasets, schema, expectedDataPass2, expectedStats); + } + /* Scenario: Test milestoning Logic with only validity from time specified when staging table pre populated */ diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/nontemporal/AppendOnlyTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/nontemporal/AppendOnlyTest.java index 519b64811fb..893bc6c1c75 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/nontemporal/AppendOnlyTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/nontemporal/AppendOnlyTest.java @@ -32,6 +32,7 @@ import org.finos.legend.engine.persistence.components.ingestmode.versioning.NoVersioningStrategy; import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset; import org.finos.legend.engine.persistence.components.logicalplan.datasets.DatasetDefinition; +import org.finos.legend.engine.persistence.components.logicalplan.datasets.FilteredDataset; import org.finos.legend.engine.persistence.components.planner.PlannerOptions; import org.finos.legend.engine.persistence.components.relational.CaseConversion; import org.finos.legend.engine.persistence.components.relational.api.GeneratorResult; @@ -137,6 +138,59 @@ void testAppendOnlyVanillaUpperCase() throws Exception Assertions.assertEquals(stagingTableList.size(), 0); } + /* + Scenario: Test Append Only vanilla case + staging table is cleaned up in the end with upper case (2) + */ + @Test + void testAppendOnlyVanillaUpperCaseWithFilteredDataset() throws Exception + { + DatasetDefinition mainTable = TestUtils.getDefaultMainTable(); + FilteredDataset stagingTable = TestUtils.getFilteredStagingTableWithComplexFilter(); + + // Create staging table + h2Sink.executeStatement("CREATE TABLE IF NOT EXISTS \"TEST\".\"STAGING\"(\"NAME\" VARCHAR(64) NOT NULL,\"INCOME\" BIGINT,\"EXPIRY_DATE\" DATE)"); + + // Generate the milestoning object + AppendOnly ingestMode = AppendOnly.builder() + .digestGenStrategy(UserProvidedDigestGenStrategy.builder().digestField(digestName).build()) + .deduplicationStrategy(AllowDuplicates.builder().build()) + .versioningStrategy(NoVersioningStrategy.builder().build()) + .auditing(NoAuditing.builder().build()) + .filterExistingRecords(false) + .build(); + + PlannerOptions options = PlannerOptions.builder().cleanupStagingData(false).collectStatistics(true).build(); + Datasets datasets = Datasets.of(mainTable, stagingTable); + + String[] schema = new String[]{nameName.toUpperCase(), incomeName.toUpperCase(), expiryDateName.toUpperCase()}; + + // ------------ Perform incremental (append) milestoning With Clean Staging Table ------------------------ + String dataPass1 = basePath + "input/with_staging_filter/data_pass1.csv"; + String expectedDataPass1 = basePath + "expected/with_staging_filter/expected_pass1.csv"; + // 1. Load staging table + loadStagingDataWithNoPkInUpperCase(dataPass1); + // 2. Execute plans and verify results + Map expectedStats = new HashMap<>(); + expectedStats.put(StatisticName.INCOMING_RECORD_COUNT.name(), 2); + expectedStats.put(StatisticName.ROWS_DELETED.name(), 0); + expectedStats.put(StatisticName.ROWS_UPDATED.name(), 0); + expectedStats.put(StatisticName.ROWS_TERMINATED.name(), 0); + executePlansAndVerifyForCaseConversion(ingestMode, options, datasets, schema, expectedDataPass1, expectedStats); + + // ------------ Perform incremental (append) milestoning With Clean Staging Table ------------------------ + String dataPass2 = basePath + "input/with_staging_filter/data_pass2.csv"; + String expectedDataPass2 = basePath + "expected/with_staging_filter/expected_pass2.csv"; + // 1. Load staging table + loadStagingDataWithNoPkInUpperCase(dataPass2); + // 2. Execute plans and verify results + expectedStats = new HashMap<>(); + expectedStats.put(StatisticName.INCOMING_RECORD_COUNT.name(), 2); + expectedStats.put(StatisticName.ROWS_DELETED.name(), 0); + expectedStats.put(StatisticName.ROWS_UPDATED.name(), 0); + expectedStats.put(StatisticName.ROWS_TERMINATED.name(), 0); + executePlansAndVerifyForCaseConversion(ingestMode, options, datasets, schema, expectedDataPass2, expectedStats); + } + /* Scenario: Test Append Only with auditing, no versioning, filter duplicates and filter existing records (1) */ diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/nontemporal/NontemporalDeltaTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/nontemporal/NontemporalDeltaTest.java index b0dfc3d65a8..5eb28d99c2c 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/nontemporal/NontemporalDeltaTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/nontemporal/NontemporalDeltaTest.java @@ -32,9 +32,13 @@ import org.finos.legend.engine.persistence.components.ingestmode.versioning.DigestBasedResolver; import org.finos.legend.engine.persistence.components.ingestmode.versioning.MaxVersionStrategy; import org.finos.legend.engine.persistence.components.ingestmode.versioning.VersionColumnBasedResolver; +import org.finos.legend.engine.persistence.components.logicalplan.conditions.Equals; import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset; import org.finos.legend.engine.persistence.components.logicalplan.datasets.DatasetDefinition; import org.finos.legend.engine.persistence.components.logicalplan.datasets.DerivedDataset; +import org.finos.legend.engine.persistence.components.logicalplan.datasets.FilteredDataset; +import org.finos.legend.engine.persistence.components.logicalplan.values.FieldValue; +import org.finos.legend.engine.persistence.components.logicalplan.values.NumericalValue; import org.finos.legend.engine.persistence.components.planner.PlannerOptions; import org.finos.legend.engine.persistence.components.relational.api.DataSplitRange; import org.finos.legend.engine.persistence.components.versioning.TestDedupAndVersioning; @@ -45,7 +49,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.logging.Filter; import static org.finos.legend.engine.persistence.components.TestUtils.*; @@ -1007,4 +1010,79 @@ void testNonTemporalDeltaWithAllVersionDigestBasedAndStagingFilters() throws Exc executePlansAndVerifyResultsWithDerivedDataSplits(ingestMode, options, datasets, schema, expectedDataPass2, expectedStatsList, fixedClock_2000_01_01); } + @Test + void testNonTemporalDeltaWithAllVersionDigestBasedAndFilteredDataset() throws Exception + { + DatasetDefinition mainTable = TestUtils.getDefaultMainTable(); + DatasetDefinition stagingDataset = DatasetDefinition.builder() + .group(testSchemaName) + .name(stagingTableName) + .schema(TestDedupAndVersioning.baseSchemaWithVersionAndBatch) + .build(); + + createStagingTableWithoutPks(stagingDataset); + FilteredDataset stagingTable = FilteredDataset.builder() + .group(testSchemaName) + .name(stagingTableName) + .schema(TestDedupAndVersioning.baseSchemaWithVersion) + .filter(Equals.of(FieldValue.builder() + .fieldName(batchName) + .datasetRefAlias(stagingTableName) + .build(), NumericalValue.of(1L))) + .build(); + String path = "src/test/resources/data/incremental-delta-milestoning/input/with_staging_filter/with_all_version/digest_based/data1.csv"; + TestDedupAndVersioning.loadDataIntoStagingTableWithVersionAndBatch(path); + + // Generate the milestoning object + NontemporalDelta ingestMode = NontemporalDelta.builder() + .digestField(digestName) + .auditing(NoAuditing.builder().build()) + .versioningStrategy(AllVersionsStrategy.builder() + .versioningField(versionName) + .mergeDataVersionResolver(DigestBasedResolver.INSTANCE) + .performStageVersioning(true) + .build()) + .build(); + + PlannerOptions options = PlannerOptions.builder().cleanupStagingData(false).collectStatistics(true).build(); + Datasets datasets = Datasets.of(mainTable, stagingTable); + + String[] schema = new String[]{idName, nameName, versionName, incomeName, expiryDateName, digestName}; + + // ------------ Perform incremental (delta) milestoning Pass1 ------------------------ + String expectedDataPass1 = basePath + "expected/with_staging_filter/with_all_version/digest_based/expected_pass1.csv"; + // 2. Execute plans and verify results + List> expectedStatsList = new ArrayList<>(); + Map expectedStats1 = new HashMap<>(); + expectedStats1.put(StatisticName.INCOMING_RECORD_COUNT.name(), 3); + expectedStats1.put(StatisticName.ROWS_TERMINATED.name(), 0); + expectedStats1.put(StatisticName.ROWS_DELETED.name(), 0); + Map expectedStats2 = new HashMap<>(); + expectedStats2.put(StatisticName.INCOMING_RECORD_COUNT.name(), 1); + expectedStats2.put(StatisticName.ROWS_TERMINATED.name(), 0); + expectedStats2.put(StatisticName.ROWS_DELETED.name(), 0); + Map expectedStats3 = new HashMap<>(); + expectedStats3.put(StatisticName.INCOMING_RECORD_COUNT.name(), 1); + expectedStats3.put(StatisticName.ROWS_TERMINATED.name(), 0); + expectedStats3.put(StatisticName.ROWS_DELETED.name(), 0); + expectedStatsList.add(expectedStats1); + expectedStatsList.add(expectedStats2); + expectedStatsList.add(expectedStats3); + executePlansAndVerifyResultsWithDerivedDataSplits(ingestMode, options, datasets, schema, expectedDataPass1, expectedStatsList, fixedClock_2000_01_01); + + // ------------ Perform incremental (delta) milestoning Pass2 Filter Duplicates ------------------------ + String expectedDataPass2 = basePath + "expected/with_staging_filter/with_all_version/digest_based/expected_pass2.csv"; + expectedStatsList = new ArrayList<>(); + Map expectedStats4 = new HashMap<>(); + expectedStats4.put(StatisticName.INCOMING_RECORD_COUNT.name(), 3); + expectedStats4.put(StatisticName.ROWS_TERMINATED.name(), 0); + expectedStats4.put(StatisticName.ROWS_DELETED.name(), 0); + expectedStatsList.add(expectedStats4); + stagingTable = stagingTable.withFilter(Equals.of(FieldValue.builder() + .fieldName(batchName) + .datasetRefAlias(stagingTableName) + .build(), NumericalValue.of(2L))); + datasets = Datasets.of(mainTable, stagingTable); + executePlansAndVerifyResultsWithDerivedDataSplits(ingestMode, options, datasets, schema, expectedDataPass2, expectedStatsList, fixedClock_2000_01_01); + } } \ No newline at end of file diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/nontemporal/NontemporalSnapshotTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/nontemporal/NontemporalSnapshotTest.java index 9e1ce6ca59b..61b05cc6966 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/nontemporal/NontemporalSnapshotTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/nontemporal/NontemporalSnapshotTest.java @@ -26,6 +26,7 @@ import org.finos.legend.engine.persistence.components.ingestmode.versioning.NoVersioningStrategy; import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset; import org.finos.legend.engine.persistence.components.logicalplan.datasets.DatasetDefinition; +import org.finos.legend.engine.persistence.components.logicalplan.datasets.FilteredDataset; import org.finos.legend.engine.persistence.components.planner.PlannerOptions; import org.finos.legend.engine.persistence.components.versioning.TestDedupAndVersioning; import org.junit.jupiter.api.Assertions; @@ -95,6 +96,48 @@ void testNontemporalSnapshotNoAuditing() throws Exception executePlansAndVerifyResults(ingestMode, options, datasets, schema, expectedDataPass2, expectedStats); } + /* + Scenario: Test Nontemporal Snapshot with no auditing + */ + @Test + void testNontemporalSnapshotNoAuditingWithFilteredDataset() throws Exception + { + DatasetDefinition mainTable = TestUtils.getDefaultMainTable(); + FilteredDataset stagingTable = TestUtils.getFilteredStagingTable(); + + // Create staging table + DatasetDefinition stagingTableForDB = TestUtils.getStagingTableWithFilterForDB(); + createStagingTable(stagingTableForDB); + + // Generate the milestoning object + NontemporalSnapshot ingestMode = NontemporalSnapshot.builder().auditing(NoAuditing.builder().build()).build(); + PlannerOptions options = PlannerOptions.builder().cleanupStagingData(false).collectStatistics(true).build(); + Datasets datasets = Datasets.of(mainTable, stagingTable); + + String[] schema = new String[]{idName, nameName, incomeName, startTimeName, expiryDateName, digestName}; + + // ------------ Perform snapshot milestoning Pass1 ------------------------ + String dataPass1 = basePath + "input/with_staging_filter/data_pass1.csv"; + String expectedDataPass1 = basePath + "expected/with_staging_filter/expected_pass1.csv"; + // 1. Load staging table + loadStagingDataWithFilter(dataPass1); + // 2. Execute plans and verify results + Map expectedStats = createExpectedStatsMap(5, 0, 5, 0, 0); + executePlansAndVerifyResults(ingestMode, options, datasets, schema, expectedDataPass1, expectedStats); + + // ------------ Perform snapshot milestoning Pass2 ------------------------ + // 0. Create new filter + datasets = Datasets.of(mainTable, TestUtils.getFilteredStagingTableSecondPass()); + String dataPass2 = basePath + "input/with_staging_filter/data_pass2.csv"; + String expectedDataPass2 = basePath + "expected/with_staging_filter/expected_pass2.csv"; + // 1. Load staging table + loadStagingDataWithFilter(dataPass2); + // 2. Execute plans and verify results + expectedStats.clear(); + expectedStats = createExpectedStatsMap(3, 5, 3, 0, 0); + executePlansAndVerifyResults(ingestMode, options, datasets, schema, expectedDataPass2, expectedStats); + } + /* Scenario: Test Nontemporal Snapshot when auditing is enabled */ @@ -352,6 +395,4 @@ void testNontemporalSnapshotWithFailOnDupsNoVersioning() throws Exception Assertions.assertEquals("Encountered Duplicates, Failing the batch as Fail on Duplicates is set as Deduplication strategy", e.getMessage()); } } - - } diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalDeltaTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalDeltaTest.java index c5a6709584f..01adb8d1f12 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalDeltaTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalDeltaTest.java @@ -30,6 +30,7 @@ import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset; import org.finos.legend.engine.persistence.components.logicalplan.datasets.DatasetDefinition; import org.finos.legend.engine.persistence.components.logicalplan.datasets.DerivedDataset; +import org.finos.legend.engine.persistence.components.logicalplan.datasets.FilteredDataset; import org.finos.legend.engine.persistence.components.planner.PlannerOptions; import org.finos.legend.engine.persistence.components.relational.api.IngestorResult; import org.junit.jupiter.api.Assertions; @@ -886,6 +887,65 @@ void testMilestoningWithFilterStagingTableWithMaxVersioningGreaterThanWithDedupW executePlansAndVerifyForCaseConversion(ingestMode, options, datasets, schema, expectedDataPass3, expectedStats); } + @Test + void testMilestoningWithMaxVersioningGreaterThanWithDedupWithFilteredDatasetWithUpperCase() throws Exception + { + DatasetDefinition mainTable = TestUtils.getUnitemporalMainTableWithVersion(); + FilteredDataset stagingTable = TestUtils.getFilteredStagingTableWithVersion(); + + String[] schema = new String[]{idName.toUpperCase(), nameName.toUpperCase(), incomeName.toUpperCase(), startTimeName.toUpperCase(), expiryDateName.toUpperCase(), digestName.toUpperCase(), versionName.toUpperCase(), batchIdInName.toUpperCase(), batchIdOutName.toUpperCase(), batchTimeInName.toUpperCase(), batchTimeOutName.toUpperCase()}; + + // Create staging table + h2Sink.executeStatement("CREATE TABLE IF NOT EXISTS \"TEST\".\"STAGING\"(\"ID\" INTEGER NOT NULL,\"NAME\" VARCHAR(64) NOT NULL,\"INCOME\" BIGINT,\"START_TIME\" TIMESTAMP NOT NULL,\"EXPIRY_DATE\" DATE,\"DIGEST\" VARCHAR,\"VERSION\" INT,\"BATCH\" INT,PRIMARY KEY (\"ID\", \"START_TIME\", \"VERSION\", \"BATCH\"))"); + + UnitemporalDelta ingestMode = UnitemporalDelta.builder() + .digestField(digestName) + .transactionMilestoning(BatchIdAndDateTime.builder() + .batchIdInName(batchIdInName) + .batchIdOutName(batchIdOutName) + .dateTimeInName(batchTimeInName) + .dateTimeOutName(batchTimeOutName) + .build()) + .versioningStrategy(MaxVersionStrategy.builder() + .versioningField(versionName) + .mergeDataVersionResolver(VersionColumnBasedResolver.of(VersionComparator.GREATER_THAN)) + .performStageVersioning(true) + .build()) + .build(); + + PlannerOptions options = PlannerOptions.builder().cleanupStagingData(false).collectStatistics(true).build(); + Datasets datasets = Datasets.of(mainTable, stagingTable); + + // ------------ Perform Pass1 ------------------------ + String dataPass1 = basePathForInput + "with_staging_filter/with_max_versioning/greater_than/with_dedup/staging_data_pass1.csv"; + String expectedDataPass1 = basePathForExpected + "with_staging_filter/with_max_versioning/greater_than/with_dedup/expected_pass1.csv"; + // 1. Load staging table + loadStagingDataWithFilterWithVersionInUpperCase(dataPass1); + // 2. Execute plans and verify results + Map expectedStats = createExpectedStatsMap(3, 0, 3, 0, 0); + executePlansAndVerifyForCaseConversion(ingestMode, options, datasets, schema, expectedDataPass1, expectedStats, fixedClock_2000_01_01); + + // ------------ Perform Pass2 ------------------------ + // 0. Create new filter + datasets = Datasets.of(mainTable, TestUtils.getFilteredStagingTableWithVersionSecondPass()); + String dataPass2 = basePathForInput + "with_staging_filter/with_max_versioning/greater_than/with_dedup/staging_data_pass2.csv"; + String expectedDataPass2 = basePathForExpected + "with_staging_filter/with_max_versioning/greater_than/with_dedup/expected_pass2.csv"; + // 1. Load staging table + loadStagingDataWithFilterWithVersionInUpperCase(dataPass2); + // 2. Execute plans and verify results + expectedStats = createExpectedStatsMap(9, 0, 1, 1, 0); + executePlansAndVerifyForCaseConversion(ingestMode, options, datasets, schema, expectedDataPass2, expectedStats, fixedClock_2000_01_01); + + // ------------ Perform Pass3 empty batch (No Impact) ------------------------- + String dataPass3 = "src/test/resources/data/empty_file.csv"; + String expectedDataPass3 = basePathForExpected + "with_staging_filter/with_max_versioning/greater_than/with_dedup/expected_pass3.csv"; + // 1. Load staging table + loadStagingDataWithFilterWithVersionInUpperCase(dataPass3); + // 2. Execute plans and verify results + expectedStats = createExpectedStatsMap(0, 0, 0, 0, 0); + executePlansAndVerifyForCaseConversion(ingestMode, options, datasets, schema, expectedDataPass3, expectedStats); + } + @Test void testMilestoningWithMaxVersioningFail() throws Exception { diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalSnapshotTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalSnapshotTest.java index 8ab15d09f9f..f4642b4039a 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalSnapshotTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/UnitemporalSnapshotTest.java @@ -26,6 +26,7 @@ import org.finos.legend.engine.persistence.components.ingestmode.versioning.MaxVersionStrategy; import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset; import org.finos.legend.engine.persistence.components.logicalplan.datasets.DatasetDefinition; +import org.finos.legend.engine.persistence.components.logicalplan.datasets.FilteredDataset; import org.finos.legend.engine.persistence.components.planner.PlannerOptions; import org.finos.legend.engine.persistence.components.util.MetadataDataset; import org.junit.jupiter.api.Assertions; @@ -526,6 +527,70 @@ void testUnitemporalSnapshotMilestoningLogicMaxVersionWithPartitionFilterDuplica executePlansAndVerifyResults(ingestMode, options, datasets, schema, expectedDataPass3, expectedStats, fixedClock_2000_01_01); } + /* + Scenario: Test milestoning Logic with max version and with Partition when staging table pre populated + */ + @Test + void testUnitemporalSnapshotMilestoningLogicMaxVersionWithPartitionFilterDuplicatesWithFilteredDataset() throws Exception + { + DatasetDefinition mainTable = TestUtils.getDefaultMainTable(); + FilteredDataset stagingTable = TestUtils.getEntityPriceWithVersionFilteredStagingTable(); + + String[] schema = new String[]{dateName, entityName, priceName, volumeName, digestName, versionName, batchIdInName, batchIdOutName, batchTimeInName, batchTimeOutName}; + + // Create staging table + createStagingTableWithoutPks(TestUtils.getEntityPriceWithVersionStagingTable()); + + UnitemporalSnapshot ingestMode = UnitemporalSnapshot.builder() + .digestField(digestName) + .transactionMilestoning(BatchIdAndDateTime.builder() + .batchIdInName(batchIdInName) + .batchIdOutName(batchIdOutName) + .dateTimeInName(batchTimeInName) + .dateTimeOutName(batchTimeOutName) + .build()) + .addAllPartitionFields(Collections.singletonList(dateName)) + .versioningStrategy(MaxVersionStrategy.builder() + .versioningField(versionName) + .mergeDataVersionResolver(DigestBasedResolver.builder().build()) + .performStageVersioning(true) + .build()) + .deduplicationStrategy(FilterDuplicates.builder().build()) + .build(); + + PlannerOptions options = PlannerOptions.builder().collectStatistics(true).cleanupStagingData(false).build(); + Datasets datasets = Datasets.of(mainTable, stagingTable); + + // ------------ Perform unitemporal snapshot milestoning Pass1 ------------------------ + String dataPass1 = basePathForInput + "with_staging_filter/staging_data_pass1.csv"; + String expectedDataPass1 = basePathForExpected + "with_staging_filter/expected_pass1.csv"; + // 1. Load staging table + loadStagingDataForWithPartitionWithVersion(dataPass1); + // 2. Execute plans and verify results + Map expectedStats = createExpectedStatsMap(6, 0, 4, 0, 0); + executePlansAndVerifyResults(ingestMode, options, datasets, schema, expectedDataPass1, expectedStats, fixedClock_2000_01_01); + + // ------------ Perform unitemporal snapshot milestoning Pass2 ------------------------ + // 0. Create new filter + datasets = Datasets.of(mainTable, TestUtils.getEntityPriceWithVersionFilteredStagingTableSecondPass()); + String dataPass2 = basePathForInput + "with_staging_filter/staging_data_pass2.csv"; + String expectedDataPass2 = basePathForExpected + "with_staging_filter/expected_pass2.csv"; + // 1. Load staging table + loadStagingDataForWithPartitionWithVersion(dataPass2); + // 2. Execute plans and verify results + expectedStats = createExpectedStatsMap(1, 0, 1, 0, 2); + executePlansAndVerifyResults(ingestMode, options, datasets, schema, expectedDataPass2, expectedStats, fixedClock_2000_01_01); + + // ------------ Perform unitemporal snapshot milestoning Pass3 (Empty Batch) ------------------------ + String dataPass3 = "src/test/resources/data/empty_file.csv"; + String expectedDataPass3 = basePathForExpected + "with_staging_filter/expected_pass3.csv"; + // 1. Load Staging table + loadStagingDataForWithPartitionWithVersion(dataPass3); + // 2. Execute plans and verify results + expectedStats = createExpectedStatsMap(0, 0, 0, 0, 0); + executePlansAndVerifyResults(ingestMode, options, datasets, schema, expectedDataPass3, expectedStats, fixedClock_2000_01_01); + } + /* Scenario: Test milestoning Logic with max version and with Partition when staging table pre populated with upper case */ diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/bitemporal-incremental-milestoning/expected/batch_id_based/source_specifies_from/without_delete_ind/set_6_with_staging_filter/expected_pass1.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/bitemporal-incremental-milestoning/expected/batch_id_based/source_specifies_from/without_delete_ind/set_6_with_staging_filter/expected_pass1.csv new file mode 100644 index 00000000000..203200dfdd5 --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/bitemporal-incremental-milestoning/expected/batch_id_based/source_specifies_from/without_delete_ind/set_6_with_staging_filter/expected_pass1.csv @@ -0,0 +1,3 @@ +1,1,DIGEST1,2022-01-02 00:00:00.0,2022-01-05 00:00:00.0,1,999999999 +1,2,DIGEST2,2022-01-05 00:00:00.0,9999-12-31 23:59:59.0,1,999999999 +2,3,DIGEST3,2022-01-02 00:00:00.0,9999-12-31 23:59:59.0,1,999999999 diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/bitemporal-incremental-milestoning/expected/batch_id_based/source_specifies_from/without_delete_ind/set_6_with_staging_filter/expected_pass2.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/bitemporal-incremental-milestoning/expected/batch_id_based/source_specifies_from/without_delete_ind/set_6_with_staging_filter/expected_pass2.csv new file mode 100644 index 00000000000..ca1c9be0742 --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/bitemporal-incremental-milestoning/expected/batch_id_based/source_specifies_from/without_delete_ind/set_6_with_staging_filter/expected_pass2.csv @@ -0,0 +1,12 @@ +1,1,DIGEST1,2022-01-02 00:00:00.0,2022-01-05 00:00:00.0,1,1 +1,2,DIGEST2,2022-01-05 00:00:00.0,9999-12-31 23:59:59.0,1,1 +2,3,DIGEST3,2022-01-02 00:00:00.0,9999-12-31 23:59:59.0,1,1 +1,4,DIGEST4,2022-01-01 00:00:00.0,2022-01-02 00:00:00.0,2,999999999 +1,5,DIGEST5,2022-01-03 00:00:00.0,2022-01-04 00:00:00.0,2,999999999 +1,6,DIGEST6,2022-01-04 00:00:00.0,2022-01-05 00:00:00.0,2,999999999 +1,11,DIGEST11,2022-01-05 00:00:00.0,2022-01-07 00:00:00.0,2,999999999 +1,7,DIGEST7,2022-01-07 00:00:00.0,9999-12-31 23:59:59.0,2,999999999 +2,8,DIGEST8,2022-01-02 00:00:00.0,2022-01-05 00:00:00.0,2,999999999 +2,10,DIGEST10,2022-01-05 00:00:00.0,9999-12-31 23:59:59.0,2,999999999 +3,9,DIGEST9,2022-01-07 00:00:00.0,9999-12-31 23:59:59.0,2,999999999 +1,1,DIGEST1,2022-01-02 00:00:00.0,2022-01-03 00:00:00.0,2,999999999 diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/bitemporal-incremental-milestoning/input/batch_id_based/source_specifies_from/without_delete_ind/set_6_with_staging_filter/staging_data_pass1.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/bitemporal-incremental-milestoning/input/batch_id_based/source_specifies_from/without_delete_ind/set_6_with_staging_filter/staging_data_pass1.csv new file mode 100644 index 00000000000..27a901a4005 --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/bitemporal-incremental-milestoning/input/batch_id_based/source_specifies_from/without_delete_ind/set_6_with_staging_filter/staging_data_pass1.csv @@ -0,0 +1,5 @@ +1,2022-01-02 00:00:00.0,1,DIGEST1 +1,2022-01-05 00:00:00.0,2,DIGEST2 +2,2022-01-02 00:00:00.0,3,DIGEST3 +2,2022-01-05 00:00:00.0,4,DIGEST4 +3,2022-01-02 00:00:00.0,5,DIGEST5 diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/bitemporal-incremental-milestoning/input/batch_id_based/source_specifies_from/without_delete_ind/set_6_with_staging_filter/staging_data_pass2.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/bitemporal-incremental-milestoning/input/batch_id_based/source_specifies_from/without_delete_ind/set_6_with_staging_filter/staging_data_pass2.csv new file mode 100644 index 00000000000..365aef453f6 --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/bitemporal-incremental-milestoning/input/batch_id_based/source_specifies_from/without_delete_ind/set_6_with_staging_filter/staging_data_pass2.csv @@ -0,0 +1,10 @@ +1,2022-01-01 00:00:00.0,4,DIGEST4 +1,2022-01-02 00:00:00.0,100,DIGEST12 +1,2022-01-03 00:00:00.0,5,DIGEST5 +1,2022-01-04 00:00:00.0,6,DIGEST6 +1,2022-01-05 00:00:00.0,11,DIGEST11 +1,2022-01-07 00:00:00.0,7,DIGEST7 +2,2022-01-02 00:00:00.0,8,DIGEST8 +2,2022-01-05 00:00:00.0,10,DIGEST10 +2,2022-01-06 00:00:00.0,21,DIGEST13 +3,2022-01-07 00:00:00.0,9,DIGEST9 diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/incremental-append-milestoning/expected/with_staging_filter/expected_pass1.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/incremental-append-milestoning/expected/with_staging_filter/expected_pass1.csv new file mode 100644 index 00000000000..95012897702 --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/incremental-append-milestoning/expected/with_staging_filter/expected_pass1.csv @@ -0,0 +1,2 @@ +ANDY,3000,2022-12-03 +MINDY,4000,2022-12-04 diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/incremental-append-milestoning/expected/with_staging_filter/expected_pass2.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/incremental-append-milestoning/expected/with_staging_filter/expected_pass2.csv new file mode 100644 index 00000000000..bd89a58f93e --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/incremental-append-milestoning/expected/with_staging_filter/expected_pass2.csv @@ -0,0 +1,4 @@ +ANDY,3000,2022-12-03 +MINDY,4000,2022-12-04 +ROBERT,2000,2022-12-05 +MATT,4000,2022-12-05 \ No newline at end of file diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/incremental-append-milestoning/input/with_staging_filter/data_pass1.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/incremental-append-milestoning/input/with_staging_filter/data_pass1.csv new file mode 100644 index 00000000000..bd64f3e7b12 --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/incremental-append-milestoning/input/with_staging_filter/data_pass1.csv @@ -0,0 +1,4 @@ +HARRY,1000,2022-12-01 +ROBERT,2000,2022-12-02 +ANDY,3000,2022-12-03 +MINDY,4000,2022-12-04 \ No newline at end of file diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/incremental-append-milestoning/input/with_staging_filter/data_pass2.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/incremental-append-milestoning/input/with_staging_filter/data_pass2.csv new file mode 100644 index 00000000000..df1fb53595a --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/incremental-append-milestoning/input/with_staging_filter/data_pass2.csv @@ -0,0 +1,3 @@ +ROBERT,2000,2022-12-05 +ANDY,999,2022-12-03 +MATT,4000,2022-12-05 \ No newline at end of file diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/snapshot-milestoning/expected/with_staging_filter/expected_pass1.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/snapshot-milestoning/expected/with_staging_filter/expected_pass1.csv new file mode 100644 index 00000000000..f51b12ce482 --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/snapshot-milestoning/expected/with_staging_filter/expected_pass1.csv @@ -0,0 +1,5 @@ +1,HARRY,1000,2020-01-01 00:00:00.0,2022-12-01,DIGEST1 +2,ROBERT,2000,2020-01-02 00:00:00.0,2022-12-02,DIGEST2 +3,ANDY,3000,2020-01-03 00:00:00.0,2022-12-03,DIGEST3 +4,MICHEL,4000,2020-01-04 00:00:00.0,2022-12-04,DIGEST4 +5,LIZA,5000,2020-01-05 00:00:00.0,2022-12-05,DIGEST5 diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/snapshot-milestoning/expected/with_staging_filter/expected_pass2.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/snapshot-milestoning/expected/with_staging_filter/expected_pass2.csv new file mode 100644 index 00000000000..bf3890397c4 --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/snapshot-milestoning/expected/with_staging_filter/expected_pass2.csv @@ -0,0 +1,3 @@ +4,MICHEL,4001,2020-01-04 00:00:00.0,2022-12-04,DIGEST4_UPDATED +5,LIZA,5001,2020-01-05 00:00:00.0,2022-12-05,DIGEST5_UPDATED +6,MINDY,6001,2020-01-06 00:00:00.0,2022-12-06,DIGEST6 \ No newline at end of file diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/snapshot-milestoning/input/with_staging_filter/data_pass1.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/snapshot-milestoning/input/with_staging_filter/data_pass1.csv new file mode 100644 index 00000000000..4d5f3874a63 --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/snapshot-milestoning/input/with_staging_filter/data_pass1.csv @@ -0,0 +1,8 @@ +1,HARRY,1000,2020-01-01 00:00:00.0,2022-12-01,DIGEST1,1 +2,ROBERT,2000,2020-01-02 00:00:00.0,2022-12-02,DIGEST2,1 +3,ANDY,3000,2020-01-03 00:00:00.0,2022-12-03,DIGEST3,1 +4,MICHEL,4000,2020-01-04 00:00:00.0,2022-12-04,DIGEST4,1 +5,LIZA,5000,2020-01-05 00:00:00.0,2022-12-05,DIGEST5,1 +4,MICHEL,4001,2020-01-04 00:00:00.0,2022-12-04,DIGEST4_UPDATED,2 +5,LIZA,5001,2020-01-05 00:00:00.0,2022-12-05,DIGEST5_UPDATED,2 +6,MINDY,6001,2020-01-06 00:00:00.0,2022-12-06,DIGEST6,2 diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/snapshot-milestoning/input/with_staging_filter/data_pass2.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/snapshot-milestoning/input/with_staging_filter/data_pass2.csv new file mode 100644 index 00000000000..4d58ebb9356 --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/snapshot-milestoning/input/with_staging_filter/data_pass2.csv @@ -0,0 +1,8 @@ +1,HARRY,1000,2020-01-01 00:00:00.0,2022-12-01,DIGEST1,1 +2,ROBERT,2000,2020-01-02 00:00:00.0,2022-12-02,DIGEST2,1 +3,ANDY,3000,2020-01-03 00:00:00.0,2022-12-03,DIGEST3,1 +4,MICHEL,4000,2020-01-04 00:00:00.0,2022-12-04,DIGEST4,1 +5,LIZA,5000,2020-01-05 00:00:00.0,2022-12-05,DIGEST5,1 +4,MICHEL,4001,2020-01-04 00:00:00.0,2022-12-04,DIGEST4_UPDATED,2 +5,LIZA,5001,2020-01-05 00:00:00.0,2022-12-05,DIGEST5_UPDATED,2 +6,MINDY,6001,2020-01-06 00:00:00.0,2022-12-06,DIGEST6,2 \ No newline at end of file diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-snapshot-milestoning/expected/batch_id_and_time_based/with_staging_filter/expected_pass1.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-snapshot-milestoning/expected/batch_id_and_time_based/with_staging_filter/expected_pass1.csv new file mode 100644 index 00000000000..b7c23ba34bd --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-snapshot-milestoning/expected/batch_id_and_time_based/with_staging_filter/expected_pass1.csv @@ -0,0 +1,4 @@ +2021-12-01,GS,383.82,300,DIGEST3_UPDATED2,3,1,999999999,2000-01-01 00:00:00.0,9999-12-31 23:59:59.0 +2021-12-01,JPM,161.00,100,DIGEST2,1,1,999999999,2000-01-01 00:00:00.0,9999-12-31 23:59:59.0 +2021-12-02,GS,37800.00,999,DIGEST6,1,1,999999999,2000-01-01 00:00:00.0,9999-12-31 23:59:59.0 +2021-12-02,JPMX,159.83,200,DIGEST5_UPDATED,2,1,999999999,2000-01-01 00:00:00.0,9999-12-31 23:59:59.0 diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-snapshot-milestoning/expected/batch_id_and_time_based/with_staging_filter/expected_pass2.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-snapshot-milestoning/expected/batch_id_and_time_based/with_staging_filter/expected_pass2.csv new file mode 100644 index 00000000000..71d5fc848e3 --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-snapshot-milestoning/expected/batch_id_and_time_based/with_staging_filter/expected_pass2.csv @@ -0,0 +1,5 @@ +2021-12-01,GS,383.82,300,DIGEST3_UPDATED2,3,1,999999999,2000-01-01 00:00:00.0,9999-12-31 23:59:59.0 +2021-12-01,JPM,161.00,100,DIGEST2,1,1,999999999,2000-01-01 00:00:00.0,9999-12-31 23:59:59.0 +2021-12-02,GS,37800.00,999,DIGEST6,1,1,1,2000-01-01 00:00:00.0,2000-01-01 00:00:00.0 +2021-12-02,JPMX,159.83,200,DIGEST5_UPDATED,2,1,1,2000-01-01 00:00:00.0,2000-01-01 00:00:00.0 +2021-12-02,JPM,159.83,1000,DIGEST7,1,2,999999999,2000-01-01 00:00:00.0,9999-12-31 23:59:59.0 diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-snapshot-milestoning/expected/batch_id_and_time_based/with_staging_filter/expected_pass3.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-snapshot-milestoning/expected/batch_id_and_time_based/with_staging_filter/expected_pass3.csv new file mode 100644 index 00000000000..71d5fc848e3 --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-snapshot-milestoning/expected/batch_id_and_time_based/with_staging_filter/expected_pass3.csv @@ -0,0 +1,5 @@ +2021-12-01,GS,383.82,300,DIGEST3_UPDATED2,3,1,999999999,2000-01-01 00:00:00.0,9999-12-31 23:59:59.0 +2021-12-01,JPM,161.00,100,DIGEST2,1,1,999999999,2000-01-01 00:00:00.0,9999-12-31 23:59:59.0 +2021-12-02,GS,37800.00,999,DIGEST6,1,1,1,2000-01-01 00:00:00.0,2000-01-01 00:00:00.0 +2021-12-02,JPMX,159.83,200,DIGEST5_UPDATED,2,1,1,2000-01-01 00:00:00.0,2000-01-01 00:00:00.0 +2021-12-02,JPM,159.83,1000,DIGEST7,1,2,999999999,2000-01-01 00:00:00.0,9999-12-31 23:59:59.0 diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-snapshot-milestoning/input/batch_id_and_time_based/with_staging_filter/staging_data_pass1.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-snapshot-milestoning/input/batch_id_and_time_based/with_staging_filter/staging_data_pass1.csv new file mode 100644 index 00000000000..699aa27b114 --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-snapshot-milestoning/input/batch_id_and_time_based/with_staging_filter/staging_data_pass1.csv @@ -0,0 +1,9 @@ +2021-12-01,IBM,116.92,10,DIGEST1,1 +2021-12-01,JPM,161.00,100,DIGEST2,1 +2021-12-01,GS,383.82,10,DIGEST3,1 +2021-12-01,GS,383.82,200,DIGEST3_UPDATED1,2 +2021-12-01,GS,383.82,300,DIGEST3_UPDATED2,3 +2021-12-02,IBM,117.37,10,DIGEST4,1 +2021-12-02,JPMX,159.83,100,DIGEST5,1 +2021-12-02,JPMX,159.83,200,DIGEST5_UPDATED,2 +2021-12-02,GS,37800.00,999,DIGEST6,1 diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-snapshot-milestoning/input/batch_id_and_time_based/with_staging_filter/staging_data_pass2.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-snapshot-milestoning/input/batch_id_and_time_based/with_staging_filter/staging_data_pass2.csv new file mode 100644 index 00000000000..2a3a853b910 --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-snapshot-milestoning/input/batch_id_and_time_based/with_staging_filter/staging_data_pass2.csv @@ -0,0 +1,4 @@ +2021-12-02,IBM,117.37,10,DIGEST4,1 +2021-12-02,JPM,159.83,1000,DIGEST7,1 +2021-12-02,GS,378.00,0,DIGEST8,2 +2021-12-02,GS,378.00,0,DIGEST8,2 diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-test/src/test/java/org/finos/legend/engine/persistence/components/testcases/ingestmode/nontemporal/NontemporalDeltaTestCases.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-test/src/test/java/org/finos/legend/engine/persistence/components/testcases/ingestmode/nontemporal/NontemporalDeltaTestCases.java index f2df472a701..520434e8786 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-test/src/test/java/org/finos/legend/engine/persistence/components/testcases/ingestmode/nontemporal/NontemporalDeltaTestCases.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-test/src/test/java/org/finos/legend/engine/persistence/components/testcases/ingestmode/nontemporal/NontemporalDeltaTestCases.java @@ -234,6 +234,7 @@ void testNontemporalDeltaWithNoVersionAndStagingFilter() RelationalGenerator generator = RelationalGenerator.builder() .ingestMode(testScenario.getIngestMode()) .relationalSink(getRelationalSink()) + .cleanupStagingData(false) .collectStatistics(true) .build(); @@ -250,6 +251,7 @@ void testNontemporalDeltaWithNoVersionAndFilteredDataset() RelationalGenerator generator = RelationalGenerator.builder() .ingestMode(testScenario.getIngestMode()) .relationalSink(getRelationalSink()) + .cleanupStagingData(false) .collectStatistics(true) .build(); @@ -266,6 +268,7 @@ void testNontemporalDeltaWithFilterDupsMaxVersionWithStagingFilters() RelationalGenerator generator = RelationalGenerator.builder() .ingestMode(testScenario.getIngestMode()) .relationalSink(getRelationalSink()) + .cleanupStagingData(false) .collectStatistics(true) .build(); @@ -282,6 +285,7 @@ void testNontemporalDeltaWithNoDedupMaxVersioningWithoutPerformWithStagingFilter RelationalGenerator generator = RelationalGenerator.builder() .ingestMode(testScenario.getIngestMode()) .relationalSink(getRelationalSink()) + .cleanupStagingData(false) .collectStatistics(true) .build(); diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-test/src/test/java/org/finos/legend/engine/persistence/components/testcases/ingestmode/unitemporal/UnitmemporalDeltaBatchIdBasedTestCases.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-test/src/test/java/org/finos/legend/engine/persistence/components/testcases/ingestmode/unitemporal/UnitmemporalDeltaBatchIdBasedTestCases.java index 6dd0ce2331a..e5b9e0ffdf9 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-test/src/test/java/org/finos/legend/engine/persistence/components/testcases/ingestmode/unitemporal/UnitmemporalDeltaBatchIdBasedTestCases.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-test/src/test/java/org/finos/legend/engine/persistence/components/testcases/ingestmode/unitemporal/UnitmemporalDeltaBatchIdBasedTestCases.java @@ -269,7 +269,7 @@ void testUnitemporalDeltaWithNoVersioningAndStagingFilters() .ingestMode(scenario.getIngestMode()) .relationalSink(getRelationalSink()) .executionTimestampClock(fixedClock_2000_01_01) - .cleanupStagingData(true) + .cleanupStagingData(false) .build(); GeneratorResult operations = generator.generateOperations(scenario.getDatasets()); verifyUnitemporalDeltaWithNoVersionAndStagingFilter(operations); @@ -285,7 +285,7 @@ void testUnitemporalDeltaWithNoVersioningAndFilteredDataset() .ingestMode(scenario.getIngestMode()) .relationalSink(getRelationalSink()) .executionTimestampClock(fixedClock_2000_01_01) - .cleanupStagingData(true) + .cleanupStagingData(false) .build(); GeneratorResult operations = generator.generateOperations(scenario.getDatasets()); verifyUnitemporalDeltaWithNoVersionAndFilteredDataset(operations); @@ -302,7 +302,7 @@ void testUnitemporalDeltaWithFilterDupsMaxVersionWithStagingFilter() .ingestMode(scenario.getIngestMode()) .relationalSink(getRelationalSink()) .executionTimestampClock(fixedClock_2000_01_01) - .cleanupStagingData(true) + .cleanupStagingData(false) .build(); GeneratorResult operations = generator.generateOperations(scenario.getDatasets()); this.verifyUnitemporalDeltaWithFilterDupsMaxVersionWithStagingFilter(operations); @@ -319,7 +319,7 @@ void testUnitemporalDeltaWithFilterDupsMaxVersionWithFilteredDataset() .ingestMode(scenario.getIngestMode()) .relationalSink(getRelationalSink()) .executionTimestampClock(fixedClock_2000_01_01) - .cleanupStagingData(true) + .cleanupStagingData(false) .build(); GeneratorResult operations = generator.generateOperations(scenario.getDatasets()); this.verifyUnitemporalDeltaWithFilterDupsMaxVersionWithFilteredDataset(operations); @@ -336,7 +336,7 @@ void testUnitemporalDeltaWithNoDedupMaxVersionWithoutPerformAndStagingFilters() .ingestMode(scenario.getIngestMode()) .relationalSink(getRelationalSink()) .executionTimestampClock(fixedClock_2000_01_01) - .cleanupStagingData(true) + .cleanupStagingData(false) .collectStatistics(true) .build(); GeneratorResult operations = generator.generateOperations(scenario.getDatasets());