Skip to content

Commit

Permalink
Add tests for H2 nontemporal snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
prasar-ashutosh committed Oct 10, 2023
1 parent 75f9270 commit 944920e
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ protected void loadStagingDataForWithoutName(String path) throws Exception
h2Sink.executeStatement(loadSql);
}

protected void validateFileExists(String path) throws Exception
protected static void validateFileExists(String path) throws Exception
{
File f = new File(path);
if (!f.exists())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
import org.finos.legend.engine.persistence.components.ingestmode.audit.NoAuditing;
import org.finos.legend.engine.persistence.components.ingestmode.deduplication.FilterDuplicates;
import org.finos.legend.engine.persistence.components.ingestmode.versioning.MaxVersionStrategy;
import org.finos.legend.engine.persistence.components.ingestmode.versioning.VersioningComparator;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.DatasetDefinition;
import org.finos.legend.engine.persistence.components.planner.PlannerOptions;
import org.finos.legend.engine.persistence.components.relational.api.DataSplitRange;
import org.finos.legend.engine.persistence.components.versioning.TestDedupAndVersioning;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

Expand All @@ -34,14 +36,7 @@
import java.util.Map;
import java.util.logging.Filter;

import static org.finos.legend.engine.persistence.components.TestUtils.batchUpdateTimeName;
import static org.finos.legend.engine.persistence.components.TestUtils.dataSplitName;
import static org.finos.legend.engine.persistence.components.TestUtils.digestName;
import static org.finos.legend.engine.persistence.components.TestUtils.expiryDateName;
import static org.finos.legend.engine.persistence.components.TestUtils.idName;
import static org.finos.legend.engine.persistence.components.TestUtils.incomeName;
import static org.finos.legend.engine.persistence.components.TestUtils.nameName;
import static org.finos.legend.engine.persistence.components.TestUtils.startTimeName;
import static org.finos.legend.engine.persistence.components.TestUtils.*;

class NontemporalSnapshotTest extends BaseTest
{
Expand Down Expand Up @@ -259,39 +254,54 @@ void testNontemporalSnapshotWithCleanStagingData() throws Exception
}

/*
Scenario: Test Nontemporal Snapshot when data splits are enabled
Scenario: Test Nontemporal Snapshot when MaxVersion and FilterDuplicates are enabled
*/
@Test
void testNontemporalSnapshotWithMaxVersionAndFilterDuplicates() throws Exception
{
DatasetDefinition mainTable = TestUtils.getDefaultMainTable();
String dataPass1 = basePath + "input/with_data_splits/data_pass1.csv";
Dataset stagingTable = TestUtils.getBasicCsvDatasetReferenceTableWithDataSplits(dataPass1);
DatasetDefinition stagingTable = TestDedupAndVersioning.getStagingTableWithVersion();

// Create staging table
TestDedupAndVersioning.createStagingTableWithVersion();

// Generate the milestoning object
NontemporalSnapshot ingestMode = NontemporalSnapshot.builder()
.auditing(NoAuditing.builder().build())
.versioningStrategy(MaxVersionStrategy.of(""))
.versioningStrategy(MaxVersionStrategy.builder().versioningField("version").versioningComparator(VersioningComparator.ALWAYS).build())
.deduplicationStrategy(FilterDuplicates.builder().build())
.build();

PlannerOptions options = PlannerOptions.builder().collectStatistics(true).build();
PlannerOptions options = PlannerOptions.builder().cleanupStagingData(false).collectStatistics(true).build();
Datasets datasets = Datasets.of(mainTable, stagingTable);

String[] schema = new String[]{idName, nameName, incomeName, startTimeName, expiryDateName, digestName};
String[] schema = new String[]{idName, nameName, versionName, incomeName, expiryDateName, digestName};

// ------------ Perform incremental (append) milestoning Pass1 ------------------------
String expectedDataPass1 = basePath + "expected/with_data_splits/expected_pass1.csv";
// Execute plans and verify results
List<DataSplitRange> dataSplitRanges = new ArrayList<>();
dataSplitRanges.add(DataSplitRange.of(1, 1));
dataSplitRanges.add(DataSplitRange.of(2, 2));
dataSplitRanges.add(DataSplitRange.of(3, 3));

List<Map<String, Object>> expectedStatsList = new ArrayList<>();
Map<String, Object> expectedStats = createExpectedStatsMap(5, 0, 3, 0, 0);
expectedStatsList.add(expectedStats);
executePlansAndVerifyResultsWithDataSplits(ingestMode, options, datasets, schema, expectedDataPass1, expectedStatsList, dataSplitRanges);
// ------------ Perform snapshot milestoning Pass1 ------------------------
String dataPass1 = "src/test/resources/data/dedup-and-versioning/input/data2_with_dups_no_data_error.csv";
String expectedDataPass1 = basePath + "expected/max_version_filter_duplicates/expected_pass1.csv";
// 1. Load staging table
TestDedupAndVersioning.loadDataIntoStagingTableWithVersion(dataPass1);
// 2. Execute plans and verify results

Map<String, Object> expectedStats = createExpectedStatsMap(3, 0, 3, 0, 0);
executePlansAndVerifyResults(ingestMode, options, datasets, schema, expectedDataPass1, expectedStats);

// ------------ Perform snapshot milestoning Pass2 ------------------------
// Throw Data Error
String dataPass2 = "src/test/resources/data/dedup-and-versioning/input/data3_with_dups_and_data_error.csv";
// 1. Load staging table
TestDedupAndVersioning.loadDataIntoStagingTableWithVersion(dataPass2);
// 2. Execute plans and verify results
try
{
executePlansAndVerifyResults(ingestMode, options, datasets, schema, expectedDataPass1, expectedStats);
Assertions.fail("Should not succeed");
}
catch (Exception e)
{
Assertions.assertEquals("Encountered Data errors (same PK, same version but different data), hence failing the batch", e.getMessage());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,15 @@ public class TestDedupAndVersioning extends BaseTest
6. Filter Dups, NoVersion -> tempStagingTable with count column
7. Filter Dups, MaxVersion do not perform versioning -> tempStagingTable with count column
8. throw error left] Filter Dups, MaxVersion with perform versioning -> tempStagingTable with count column and only max version [throw Error on Data errors]
8. Filter Dups, MaxVersion with perform versioning -> tempStagingTable with count column and only max version [throw Error on Data errors]
9. Filter Dups, AllVersion do not perform versioning -> tempStagingTable with count column
10. throw error left] Filter Dups, AllVersion with perform versioning -> tempStagingTable with count column and Data splits [throw Error on Data errors]
10. Filter Dups, AllVersion with perform versioning -> tempStagingTable with count column and Data splits [throw Error on Data errors]
11.Fail on Dups, NoVersion -> tempStagingTable with count column [Throw error on dups]
12.Fail on Dups, MaxVersion do not perform versioning -> tempStagingTable with count column [Throw error on dups]
13.Fail on Dups, MaxVersion with perform versioning -> tempStagingTable with count column and only max version [Throw error on dups, throw Error on Data errors]
14.Fail on Dups, AllVersion do not perform versioning -> tempStagingTable with count column [Throw error on dups]
15. Fail on Dups, AllVersion with perform versioning -> tempStagingTable with count column and Data splits [Throw error on dups, throw Error on Data errors]
15.Fail on Dups, AllVersion with perform versioning -> tempStagingTable with count column and Data splits [Throw error on dups, throw Error on Data errors]
*/

private static Field name = Field.builder().name(nameName).type(FieldType.of(DataType.VARCHAR, 64, null)).nullable(false).primaryKey(true).fieldAlias(nameName).build();
Expand Down Expand Up @@ -566,7 +566,7 @@ private Dataset getTempStagingDataset()
.build();
}

private DatasetDefinition getStagingTableWithVersion()
public static DatasetDefinition getStagingTableWithVersion()
{
return DatasetDefinition.builder()
.group(testSchemaName)
Expand All @@ -587,7 +587,7 @@ private void createStagingTableWithoutVersion()
h2Sink.executeStatement(createSql);
}

private void createStagingTableWithVersion()
public static void createStagingTableWithVersion()
{
String createSql = "CREATE TABLE IF NOT EXISTS \"TEST\".\"staging\"" +
"(\"id\" INTEGER NOT NULL," +
Expand Down Expand Up @@ -621,7 +621,7 @@ protected void loadDataIntoStagingTableWithoutVersion(String path) throws Except
h2Sink.executeStatement(loadSql);
}

protected void loadDataIntoStagingTableWithVersion(String path) throws Exception
public static void loadDataIntoStagingTableWithVersion(String path) throws Exception
{
validateFileExists(path);
String loadSql = "TRUNCATE TABLE \"TEST\".\"staging\";" +
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
1,Andy,3,3000,2012-01-03,digest3
2,Becky,1,4000,2012-01-04,digest4
3,Cathy,1,5000,2012-01-05,digest5

This file was deleted.

This file was deleted.

0 comments on commit 944920e

Please sign in to comment.