Skip to content

Commit

Permalink
Persistence Component: Tests for unitemporal snapshot with partition …
Browse files Browse the repository at this point in the history
…without digest
  • Loading branch information
rengam32 committed Dec 24, 2024
1 parent 551506b commit 7b05140
Show file tree
Hide file tree
Showing 8 changed files with 138 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,16 @@ protected void loadStagingDataForWithPartition(String path) throws Exception
h2Sink.executeStatement(loadSql);
}

/**
 * Replaces the contents of the "TEST"."staging" table with the rows of a CSV file
 * that carries only the date, entity, price and volume columns (no digest column).
 *
 * @param path location of the CSV file; checked with {@code validateFileExists} before any SQL is run
 * @throws Exception if the file check or the statement execution fails
 */
protected void loadStagingDataForWithPartitionWithoutDigest(String path) throws Exception
{
    validateFileExists(path);

    // Truncate and reload are sent as a single statement string.
    String truncateStaging = "TRUNCATE TABLE \"TEST\".\"staging\";";
    String insertTarget = "INSERT INTO \"TEST\".\"staging\"(date, entity, price, volume) ";
    // CSV values are converted explicitly to the staging column types.
    String typedProjection = "SELECT CONVERT( \"date\",DATE ), \"entity\", CONVERT( \"price\", DECIMAL(20,2)), CONVERT( \"volume\", BIGINT)";
    // The column list is passed to CSVREAD explicitly rather than being read from the file.
    String csvSource = " FROM CSVREAD( '" + path + "', 'date, entity, price, volume', NULL )";

    h2Sink.executeStatement(truncateStaging + insertTarget + typedProjection + csvSource);
}

protected void loadStagingDataForWithMultiPartition(String path) throws Exception
{
validateFileExists(path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -797,6 +797,25 @@ public static DatasetDefinition getEntityPriceMainTable()
.build();
}

/**
 * Builds the main-table dataset definition for the entity/price model without a digest field.
 * The schema holds the four business fields (date, entity, price, volume) followed by the
 * batch-id in/out and batch-time in/out fields.
 *
 * @return dataset definition named {@code mainTableName} in group {@code testSchemaName}
 */
public static DatasetDefinition getEntityPriceMainTableWithoutDigest()
{
    SchemaDefinition mainSchemaWithoutDigest = SchemaDefinition.builder()
        .addFields(date)
        .addFields(entity)
        .addFields(price)
        .addFields(volume)
        .addFields(batchIdIn)
        .addFields(batchIdOut)
        .addFields(batchTimeIn)
        .addFields(batchTimeOut)
        .build();

    return DatasetDefinition.builder()
        .group(testSchemaName)
        .name(mainTableName)
        .schema(mainSchemaWithoutDigest)
        .build();
}

public static DatasetDefinition getEntityPriceIdBasedMainTable()
{
DatasetDefinition mainTable = DatasetDefinition.builder()
Expand Down Expand Up @@ -849,6 +868,21 @@ public static DatasetDefinition getEntityPriceStagingTable()
.build();
}

/**
 * Builds the staging-table dataset definition for the entity/price model without a digest field.
 * The schema holds only the four business fields: date, entity, price and volume.
 *
 * @return dataset definition named {@code stagingTableName} in group {@code testSchemaName}
 */
public static DatasetDefinition getEntityPriceStagingTableWithoutDigest()
{
    SchemaDefinition stagingSchemaWithoutDigest = SchemaDefinition.builder()
        .addFields(date)
        .addFields(entity)
        .addFields(price)
        .addFields(volume)
        .build();

    return DatasetDefinition.builder()
        .group(testSchemaName)
        .name(stagingTableName)
        .schema(stagingSchemaWithoutDigest)
        .build();
}

public static DatasetDefinition getEntityPriceWithVersionStagingTable()
{
return DatasetDefinition.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.finos.legend.engine.persistence.components.logicalplan.datasets.DatasetDefinition;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.FilteredDataset;
import org.finos.legend.engine.persistence.components.planner.PlannerOptions;
import org.finos.legend.engine.persistence.components.util.DeleteStrategy;
import org.finos.legend.engine.persistence.components.util.MetadataDataset;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -658,4 +659,64 @@ void testUnitemporalSnapshotMilestoningLogicMaxVersionWithPartitionFilterDuplica
expectedStats = createExpectedStatsMap(0, 0, 0, 0, 0);
executePlansAndVerifyForCaseConversion(ingestMode, options, datasets, schema, expectedDataPass3, expectedStats, fixedClock_2000_01_01);
}

/*
Scenario: Unitemporal snapshot milestoning partitioned on the date field, using the
main and staging datasets built without a digest field. Three passes are run:
an initial load, a second batch for one partition, and an empty batch.
Empty Batch Handling : Default (the ingest mode does not configure any)
*/
@Test
void testUnitemporalSnapshotMilestoningLogicWithPartitionNoDigest() throws Exception
{
    DatasetDefinition mainTable = TestUtils.getEntityPriceMainTableWithoutDigest();
    DatasetDefinition stagingTable = TestUtils.getEntityPriceStagingTableWithoutDigest();
    MetadataDataset metadataDataset = TestUtils.getMetadataDataset();

    // Columns of the main table compared against the expected CSV files.
    String[] schema = new String[]{dateName, entityName, priceName, volumeName, batchIdInName, batchIdOutName, batchTimeInName, batchTimeOutName};

    // Only the staging table is created up front.
    createStagingTable(stagingTable);

    BatchIdAndDateTime batchIdAndTimeMilestoning = BatchIdAndDateTime.builder()
        .batchIdInName(batchIdInName)
        .batchIdOutName(batchIdOutName)
        .dateTimeInName(batchTimeInName)
        .dateTimeOutName(batchTimeOutName)
        .build();
    Partitioning partitionByDate = Partitioning.builder()
        .addAllPartitionFields(Collections.singletonList(dateName))
        .deleteStrategy(DeleteStrategy.DELETE_ALL)
        .build();
    UnitemporalSnapshot ingestMode = UnitemporalSnapshot.builder()
        .transactionMilestoning(batchIdAndTimeMilestoning)
        .partitioningStrategy(partitionByDate)
        .build();

    PlannerOptions options = PlannerOptions.builder().collectStatistics(true).build();
    Datasets datasets = Datasets.of(mainTable, stagingTable);

    // ------------ Pass1: initial load ------------------------
    String pass1Input = basePathForInput + "with_partition/no_version_no_digest/staging_data_pass1.csv";
    String pass1Expected = basePathForExpected + "with_partition/no_version_no_digest/expected_pass1.csv";
    loadStagingDataForWithPartitionWithoutDigest(pass1Input);
    executePlansAndVerifyResults(ingestMode, options, datasets, schema, pass1Expected, createExpectedStatsMap(6, 0, 6, 0, 0), fixedClock_2000_01_01);

    // ------------ Pass2: new batch containing rows for a single date ------------------------
    String pass2Input = basePathForInput + "with_partition/no_version_no_digest/staging_data_pass2.csv";
    String pass2Expected = basePathForExpected + "with_partition/no_version_no_digest/expected_pass2.csv";
    loadStagingDataForWithPartitionWithoutDigest(pass2Input);
    executePlansAndVerifyResults(ingestMode, options, datasets, schema, pass2Expected, createExpectedStatsMap(3, 0, 1, 2, 1), fixedClock_2000_01_01);

    // ------------ Pass3: empty batch, run with staging cleanup switched on ------------------------
    PlannerOptions cleanupOptions = options.withCleanupStagingData(true);
    String pass3Input = "src/test/resources/data/empty_file.csv";
    String pass3Expected = basePathForExpected + "with_partition/no_version_no_digest/expected_pass3.csv";
    loadStagingDataForWithPartitionWithoutDigest(pass3Input);
    executePlansAndVerifyResults(ingestMode, cleanupOptions, datasets, schema, pass3Expected, createExpectedStatsMap(0, 0, 0, 0, 0), fixedClock_2000_01_01);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
2021-12-01,IBM,116.92,5958300,1,999999999,2000-01-01 00:00:00.0,9999-12-31 23:59:59.0
2021-12-01,JPM,161.00,12253400,1,999999999,2000-01-01 00:00:00.0,9999-12-31 23:59:59.0
2021-12-01,GS,383.82,2476000,1,999999999,2000-01-01 00:00:00.0,9999-12-31 23:59:59.0
2021-12-02,IBM,117.37,5267100,1,999999999,2000-01-01 00:00:00.0,9999-12-31 23:59:59.0
2021-12-02,JPMX,159.83,12969900,1,999999999,2000-01-01 00:00:00.0,9999-12-31 23:59:59.0
2021-12-02,GS,37800.00,3343700,1,999999999,2000-01-01 00:00:00.0,9999-12-31 23:59:59.0
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
2021-12-01,IBM,116.92,5958300,1,999999999,2000-01-01 00:00:00.0,9999-12-31 23:59:59.0
2021-12-01,JPM,161.00,12253400,1,999999999,2000-01-01 00:00:00.0,9999-12-31 23:59:59.0
2021-12-01,GS,383.82,2476000,1,999999999,2000-01-01 00:00:00.0,9999-12-31 23:59:59.0
2021-12-02,IBM,117.37,5267100,1,1,2000-01-01 00:00:00.0,2000-01-01 00:00:00.0
2021-12-02,JPMX,159.83,12969900,1,1,2000-01-01 00:00:00.0,2000-01-01 00:00:00.0
2021-12-02,GS,37800.00,3343700,1,1,2000-01-01 00:00:00.0,2000-01-01 00:00:00.0
2021-12-02,IBM,117.37,5267100,2,999999999,2000-01-01 00:00:00.0,9999-12-31 23:59:59.0
2021-12-02,JPM,159.83,12969900,2,999999999,2000-01-01 00:00:00.0,9999-12-31 23:59:59.0
2021-12-02,GS,378.00,3343700,2,999999999,2000-01-01 00:00:00.0,9999-12-31 23:59:59.0
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
2021-12-01,IBM,116.92,5958300,1,999999999,2000-01-01 00:00:00.0,9999-12-31 23:59:59.0
2021-12-01,JPM,161.00,12253400,1,999999999,2000-01-01 00:00:00.0,9999-12-31 23:59:59.0
2021-12-01,GS,383.82,2476000,1,999999999,2000-01-01 00:00:00.0,9999-12-31 23:59:59.0
2021-12-02,IBM,117.37,5267100,1,1,2000-01-01 00:00:00.0,2000-01-01 00:00:00.0
2021-12-02,JPMX,159.83,12969900,1,1,2000-01-01 00:00:00.0,2000-01-01 00:00:00.0
2021-12-02,GS,37800.00,3343700,1,1,2000-01-01 00:00:00.0,2000-01-01 00:00:00.0
2021-12-02,IBM,117.37,5267100,2,999999999,2000-01-01 00:00:00.0,9999-12-31 23:59:59.0
2021-12-02,JPM,159.83,12969900,2,999999999,2000-01-01 00:00:00.0,9999-12-31 23:59:59.0
2021-12-02,GS,378.00,3343700,2,999999999,2000-01-01 00:00:00.0,9999-12-31 23:59:59.0
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
2021-12-01,IBM,116.92,5958300
2021-12-01,JPM,161.00,12253400
2021-12-01,GS,383.82,2476000
2021-12-02,IBM,117.37,5267100
2021-12-02,JPMX,159.83,12969900
2021-12-02,GS,37800.00,3343700
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
2021-12-02,IBM,117.37,5267100
2021-12-02,JPM,159.83,12969900
2021-12-02,GS,378.00,3343700

0 comments on commit 7b05140

Please sign in to comment.