Skip to content

Commit

Permalink
Fix the sqls for dedup and versioning and add H2 tests
Browse files Browse the repository at this point in the history
  • Loading branch information
prasar-ashutosh committed Oct 6, 2023
1 parent 9077d90 commit 69db789
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,14 @@ public SchemaDefinition visitMaxVersionStrategy(MaxVersionStrategyAbstract maxVe
/**
 * Builds the schema definition for the all-versions strategy.
 *
 * <p>When the strategy reports that versioning is performed, an INT-typed data split field
 * (named by {@code dataSplitFieldName()}) is appended to the schema fields; its primary-key
 * flag is taken from {@code anyPKInStaging}. When versioning is not performed, no data split
 * field is added.
 *
 * @param allVersionsStrategyAbstract the strategy supplying the versioning flag and the data split field name
 * @return the schema definition built from the accumulated schema fields
 */
@Override
public SchemaDefinition visitAllVersionsStrategy(AllVersionsStrategyAbstract allVersionsStrategyAbstract)
{
    // The data split field must be added exactly once, and only when versioning is performed.
    // Adding it unconditionally as well would redeclare `dataSplit` in the same scope
    // (a compile error) and register the column twice.
    if (allVersionsStrategyAbstract.performVersioning())
    {
        Field dataSplit = Field.builder().name(allVersionsStrategyAbstract.dataSplitFieldName())
            .type(FieldType.of(DataType.INT, Optional.empty(), Optional.empty()))
            .primaryKey(anyPKInStaging)
            .build();
        schemaFields.add(dataSplit);
    }
    return schemaDefBuilder.addAllFields(schemaFields).build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.Map;

import static org.finos.legend.engine.persistence.components.TestUtils.*;
import static org.finos.legend.engine.persistence.components.ingestmode.versioning.AllVersionsStrategyAbstract.DATA_SPLIT;
import static org.finos.legend.engine.persistence.components.util.LogicalPlanUtils.TEMP_STAGING_DATASET_BASE_NAME;

public class TestDedupAndVersioning extends BaseTest
Expand All @@ -54,9 +55,9 @@ public class TestDedupAndVersioning extends BaseTest
6. [DONE] Filter Dups, NoVersion -> tempStagingTable with count column
7. [DONE] Filter Dups, MaxVersion do not perform versioning -> tempStagingTable with count column
8. [DONE, throw error left] Filter Dups, MaxVersion with perform versioning -> tempStagingTable with count column and only max version [throw Error on Data errors]
9. [DONE] Filter Dups, AllVersion do not perform versioning -> tempStagingTable with count column
10. [DONE, throw error left] Filter Dups, AllVersion with perform versioning -> tempStagingTable with count column and Data splits [throw Error on Data errors]
11. Fail on Dups, NoVersion -> tempStagingTable with count column [Throw error on dups]
12. Fail on Dups, MaxVersion do not perform versioning -> tempStagingTable with count column [Throw error on dups]
Expand Down Expand Up @@ -92,6 +93,7 @@ public class TestDedupAndVersioning extends BaseTest
// Expected column names for results that include the "legend_persistence_count" column but no version column.
String[] schemaWithCount = new String[]{idName, nameName, incomeName, expiryDateName, digestName, "legend_persistence_count"};

// Expected column names including the version column and "legend_persistence_count"; passed to verifyResults alongside the expected CSV path.
String[] schemaWithVersionAndCount = new String[]{idName, nameName, versionName, incomeName, expiryDateName, digestName, "legend_persistence_count"};
// Same columns as schemaWithVersionAndCount with the DATA_SPLIT column appended last; used when verifying the all-versions scenario.
String[] schemaWithVersionCountAndDataSplit = new String[]{idName, nameName, versionName, incomeName, expiryDateName, digestName, "legend_persistence_count", DATA_SPLIT};


// Scenario 1
Expand Down Expand Up @@ -186,15 +188,13 @@ void testFilterDupsMaxVersionDoNotPerform() throws Exception
loadDataIntoStagingTableWithVersion(srcDataPath);

performDedupAndVersioining(datasets, ingestMode);
// Validate tempTableExists
verifyResults(expectedDataPath, schemaWithVersionAndCount);
}


// Scenario 8
@Test
void testFilterDupsMaxVersion()
{
void testFilterDupsMaxVersion() throws Exception {
DatasetDefinition mainTable = TestUtils.getDefaultMainTable();
DatasetDefinition stagingTable = getStagingTableWithVersion();
Datasets datasets = Datasets.of(mainTable, stagingTable);
Expand All @@ -205,12 +205,12 @@ void testFilterDupsMaxVersion()
.build();

createStagingTableWithVersion();
// TODO LOAD DATA
String srcDataPath = "src/test/resources/data/dedup-and-versioning/input/data2.csv";
String expectedDataPath = "src/test/resources/data/dedup-and-versioning/expected/expected_data2_filter_dups_max_versioning.csv";
loadDataIntoStagingTableWithVersion(srcDataPath);

performDedupAndVersioining(datasets, ingestMode);

// Validate tempTableExists
Assertions.assertEquals(true, h2Sink.doesTableExist(getTempStagingDataset()));
verifyResults(expectedDataPath, schemaWithVersionAndCount);
}

// Scenario 9
Expand All @@ -224,7 +224,8 @@ void testFilterDupsAllVersionDoNotPerform() throws Exception
.auditing(DateTimeAuditing.builder().dateTimeField("append_time").build())
.digestField("digest")
.deduplicationStrategy(FilterDuplicates.builder().build())
.versioningStrategy(AllVersionsStrategy.builder().versioningField("version").versioningComparator(VersioningComparator.ALWAYS).build())
.versioningStrategy(AllVersionsStrategy.builder().versioningField("version")
.versioningComparator(VersioningComparator.ALWAYS).performVersioning(false).build())
.build();

createStagingTableWithVersion();
Expand All @@ -239,7 +240,7 @@ void testFilterDupsAllVersionDoNotPerform() throws Exception

// Scenario 10
@Test
void testFilterDupsAllVersion()
void testFilterDupsAllVersion() throws Exception
{
DatasetDefinition mainTable = TestUtils.getDefaultMainTable();
DatasetDefinition stagingTable = getStagingTableWithVersion();
Expand All @@ -248,16 +249,18 @@ void testFilterDupsAllVersion()
.auditing(DateTimeAuditing.builder().dateTimeField("append_time").build())
.digestField("digest")
.deduplicationStrategy(FilterDuplicates.builder().build())
.versioningStrategy(AllVersionsStrategy.builder().versioningField("version").versioningComparator(VersioningComparator.ALWAYS).build())
.versioningStrategy(AllVersionsStrategy.builder().versioningField("version")
.versioningComparator(VersioningComparator.ALWAYS).performVersioning(true).build())
.build();

createStagingTableWithVersion();
// TODO LOAD DATA
String srcDataPath = "src/test/resources/data/dedup-and-versioning/input/data2.csv";
String expectedDataPath = "src/test/resources/data/dedup-and-versioning/expected/expected_data2_filter_dups_all_version.csv";
loadDataIntoStagingTableWithVersion(srcDataPath);

performDedupAndVersioining(datasets, ingestMode);

// Validate tempTableExists
Assertions.assertEquals(true, h2Sink.doesTableExist(getTempStagingDataset()));
verifyResults(expectedDataPath, schemaWithVersionCountAndDataSplit);
}

private DatasetDefinition getStagingTableWithoutVersion()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
1,Andy,1,1000,2012-01-01,digest1,1,1
1,Andy,2,2000,2012-01-02,digest2,1,2
1,Andy,3,3000,2012-01-03,digest3,1,3
2,Becky,1,4000,2012-01-04,digest4,2,1
3,Cathy,1,5000,2012-01-05,digest5,1,1
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
1,Andy,3,3000,2012-01-03,digest3,1
2,Becky,1,4000,2012-01-04,digest4,2
3,Cathy,1,5000,2012-01-05,digest5,1
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,4 @@
1,Andy,3,3000,2012-01-03,digest3
2,Becky,1,4000,2012-01-04,digest4
2,Becky,1,4000,2012-01-04,digest4
3,Cathy,1,5000,2012-01-05,digest5
3,Cathy,1,6000,2012-01-06,digest6
3,Cathy,1,5000,2012-01-05,digest5
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
1,Andy,1,1000,2012-01-01,digest1
1,Andy,2,2000,2012-01-02,digest2
1,Andy,3,3000,2012-01-03,digest3
2,Becky,1,4000,2012-01-04,digest4
2,Becky,1,4000,2012-01-04,digest4
3,Cathy,1,5000,2012-01-05,digest5
3,Cathy,1,6000,2012-01-06,digest6

0 comments on commit 69db789

Please sign in to comment.