Skip to content

Commit

Permalink
Add h2 tests
Browse files Browse the repository at this point in the history
  • Loading branch information
kumuwu committed Nov 23, 2023
1 parent 953b744 commit 06cf988
Show file tree
Hide file tree
Showing 28 changed files with 641 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@
import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlanFactory;
import org.finos.legend.engine.persistence.components.logicalplan.conditions.Condition;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.DerivedDataset;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Field;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.FilteredDataset;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Selection;
import org.finos.legend.engine.persistence.components.logicalplan.operations.*;
import org.finos.legend.engine.persistence.components.logicalplan.values.*;
Expand Down Expand Up @@ -137,6 +139,8 @@ default boolean enableConcurrentSafety()
ingestMode.versioningStrategy().accept(new ValidatePrimaryKeysForVersioningStrategy(primaryKeys, this::validatePrimaryKeysNotEmpty));
// 2. Validate if the versioningField is comparable if a versioningStrategy is present
validateVersioningField(ingestMode().versioningStrategy(), stagingDataset());
// 3. cleanupStagingData must be turned off when using DerivedDataset or FilteredDataset
validateCleanUpStagingData(plannerOptions, originalStagingDataset());
}

private Optional<Dataset> getTempStagingDataset()
Expand Down Expand Up @@ -498,6 +502,14 @@ protected void validateVersioningField(VersioningStrategy versioningStrategy, Da
}
}

/**
 * Rejects the combination of staging-data cleanup with a {@link DerivedDataset}
 * or {@link FilteredDataset}.
 *
 * @param plannerOptions options whose {@code cleanupStagingData()} flag is inspected
 * @param dataset the dataset whose concrete type is inspected
 * @throws IllegalStateException if cleanup is enabled and {@code dataset} is a
 *         {@code DerivedDataset} or a {@code FilteredDataset}
 */
protected void validateCleanUpStagingData(PlannerOptions plannerOptions, Dataset dataset)
{
    if (!plannerOptions.cleanupStagingData())
    {
        // Cleanup is off, so the dataset type is irrelevant.
        return;
    }

    boolean derivedOrFiltered = dataset instanceof DerivedDataset || dataset instanceof FilteredDataset;
    if (derivedOrFiltered)
    {
        throw new IllegalStateException("cleanupStagingData cannot be turned on when using DerivedDataset or FilteredDataset");
    }
}

// auditing visitor

protected static final AuditEnabled AUDIT_ENABLED = new AuditEnabled();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ public DeleteStatement(Table table, Condition condition)
}

/*
DELETE GENERIC PLAN:
DELETE FROM table-Name [[AS] correlation-Name] [WHERE clause]
*/
DELETE GENERIC PLAN:
DELETE FROM table-Name [[AS] correlation-Name] [WHERE clause]
*/
@Override
public void genSql(StringBuilder builder) throws SqlDomException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,27 @@
import com.opencsv.CSVReader;
import org.finos.legend.engine.persistence.components.common.DatasetFilter;
import org.finos.legend.engine.persistence.components.common.FilterType;
import org.finos.legend.engine.persistence.components.logicalplan.conditions.And;
import org.finos.legend.engine.persistence.components.logicalplan.conditions.Equals;
import org.finos.legend.engine.persistence.components.logicalplan.conditions.GreaterThan;
import org.finos.legend.engine.persistence.components.logicalplan.conditions.GreaterThanEqualTo;
import org.finos.legend.engine.persistence.components.logicalplan.conditions.LessThan;
import org.finos.legend.engine.persistence.components.logicalplan.conditions.LessThanEqualTo;
import org.finos.legend.engine.persistence.components.logicalplan.conditions.Or;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.CsvExternalDatasetReference;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.DataType;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.DatasetDefinition;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.DerivedDataset;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Field;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.FieldType;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.FilteredDataset;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.JsonExternalDatasetReference;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.SchemaDefinition;
import org.finos.legend.engine.persistence.components.executor.RelationalExecutionHelper;
import org.finos.legend.engine.persistence.components.logicalplan.values.FieldValue;
import org.finos.legend.engine.persistence.components.logicalplan.values.NumericalValue;
import org.finos.legend.engine.persistence.components.logicalplan.values.StringValue;
import org.finos.legend.engine.persistence.components.util.MetadataDataset;
import org.junit.jupiter.api.Assertions;

Expand Down Expand Up @@ -368,6 +379,23 @@ public static DatasetDefinition getStagingTableWithNoPks()
.build();
}

/**
 * Builds a filtered staging dataset over the schema returned by {@code getSchemaWithNoPKs()}.
 * The filter is: {@code incomeName > 1000 AND (expiryDateName >= "2022-12-03" OR expiryDateName <= "2022-12-01")}.
 *
 * @return the filtered staging dataset, aliased with the staging table name
 */
public static FilteredDataset getFilteredStagingTableWithComplexFilter()
{
    FieldValue incomeField = FieldValue.builder().fieldName(incomeName).datasetRefAlias(stagingTableName).build();
    FieldValue expiryDateField = FieldValue.builder().fieldName(expiryDateName).datasetRefAlias(stagingTableName).build();

    // Either bound on the expiry date satisfies the inner condition.
    Or expiryDateCondition = Or.builder()
        .addConditions(GreaterThanEqualTo.of(expiryDateField, StringValue.of("2022-12-03")))
        .addConditions(LessThanEqualTo.of(expiryDateField, StringValue.of("2022-12-01")))
        .build();

    And stagingFilter = And.builder()
        .addConditions(GreaterThan.of(incomeField, NumericalValue.of(1000L)))
        .addConditions(expiryDateCondition)
        .build();

    return FilteredDataset.builder()
        .group(testSchemaName)
        .name(stagingTableName)
        .schema(getSchemaWithNoPKs())
        .alias(stagingTableName)
        .filter(stagingFilter)
        .build();
}

public static DatasetDefinition getBasicStagingTableWithExpiryDatePk()
{
return DatasetDefinition.builder()
Expand Down Expand Up @@ -433,6 +461,20 @@ public static DerivedDataset getDerivedStagingTableWithFilter()
.build();
}

/**
 * Builds a filtered staging dataset over {@code getStagingSchema()} with the
 * filter {@code batchName = 1}.
 *
 * @return the filtered staging dataset, aliased with the staging table name
 */
public static FilteredDataset getFilteredStagingTable()
{
    FieldValue batchField = FieldValue.builder()
        .fieldName(batchName)
        .datasetRefAlias(stagingTableName)
        .build();
    Equals batchFilter = Equals.of(batchField, NumericalValue.of(1L));

    return FilteredDataset.builder()
        .group(testSchemaName)
        .name(stagingTableName)
        .schema(getStagingSchema())
        .alias(stagingTableName)
        .filter(batchFilter)
        .build();
}

public static DerivedDataset getStagingTableWithFilterSecondPass()
{
return DerivedDataset.builder()
Expand All @@ -445,6 +487,20 @@ public static DerivedDataset getStagingTableWithFilterSecondPass()
.build();
}

/**
 * Builds a filtered staging dataset over {@code getStagingSchema()} with the
 * filter {@code batchName > 1}.
 *
 * @return the filtered staging dataset, aliased with the staging table name
 */
public static FilteredDataset getFilteredStagingTableSecondPass()
{
    FieldValue batchField = FieldValue.builder()
        .fieldName(batchName)
        .datasetRefAlias(stagingTableName)
        .build();
    GreaterThan batchFilter = GreaterThan.of(batchField, NumericalValue.of(1L));

    return FilteredDataset.builder()
        .group(testSchemaName)
        .name(stagingTableName)
        .schema(getStagingSchema())
        .alias(stagingTableName)
        .filter(batchFilter)
        .build();
}

public static DerivedDataset getDerivedStagingTableWithFilterWithVersion()
{
return DerivedDataset.builder()
Expand All @@ -456,6 +512,20 @@ public static DerivedDataset getDerivedStagingTableWithFilterWithVersion()
.build();
}

/**
 * Builds a filtered staging dataset over {@code getStagingSchemaWithVersion()}
 * with the filter {@code batchName >= 2}.
 *
 * @return the filtered staging dataset, aliased with the staging table name
 */
public static FilteredDataset getFilteredStagingTableWithVersion()
{
    FieldValue batchField = FieldValue.builder()
        .fieldName(batchName)
        .datasetRefAlias(stagingTableName)
        .build();
    GreaterThanEqualTo batchFilter = GreaterThanEqualTo.of(batchField, NumericalValue.of(2L));

    return FilteredDataset.builder()
        .group(testSchemaName)
        .name(stagingTableName)
        .schema(getStagingSchemaWithVersion())
        .alias(stagingTableName)
        .filter(batchFilter)
        .build();
}

public static DerivedDataset getStagingTableWithFilterWithVersionSecondPass()
{
return DerivedDataset.builder()
Expand All @@ -467,6 +537,20 @@ public static DerivedDataset getStagingTableWithFilterWithVersionSecondPass()
.build();
}

/**
 * Builds a filtered staging dataset over {@code getStagingSchemaWithVersion()}
 * with the filter {@code batchName >= 3}.
 *
 * @return the filtered staging dataset, aliased with the staging table name
 */
public static FilteredDataset getFilteredStagingTableWithVersionSecondPass()
{
    FieldValue batchField = FieldValue.builder()
        .fieldName(batchName)
        .datasetRefAlias(stagingTableName)
        .build();
    GreaterThanEqualTo batchFilter = GreaterThanEqualTo.of(batchField, NumericalValue.of(3L));

    return FilteredDataset.builder()
        .group(testSchemaName)
        .name(stagingTableName)
        .schema(getStagingSchemaWithVersion())
        .alias(stagingTableName)
        .filter(batchFilter)
        .build();
}

public static JsonExternalDatasetReference getBasicJsonDatasetReferenceTable(String dataPath)
{
return JsonExternalDatasetReference.builder()
Expand Down Expand Up @@ -741,6 +825,50 @@ public static DatasetDefinition getEntityPriceWithVersionStagingTable()
.build();
}

/**
 * Builds a filtered staging dataset whose schema holds the {@code date}, {@code entity},
 * {@code price}, {@code volume}, {@code digest} and {@code version} fields (in that order),
 * with the filter {@code volumeName >= 100}.
 *
 * @return the filtered staging dataset, aliased with the staging table name
 */
public static FilteredDataset getEntityPriceWithVersionFilteredStagingTable()
{
    SchemaDefinition entityPriceSchema = SchemaDefinition.builder()
        .addFields(date)
        .addFields(entity)
        .addFields(price)
        .addFields(volume)
        .addFields(digest)
        .addFields(version)
        .build();

    FieldValue volumeField = FieldValue.builder()
        .fieldName(volumeName)
        .datasetRefAlias(stagingTableName)
        .build();
    GreaterThanEqualTo volumeFilter = GreaterThanEqualTo.of(volumeField, NumericalValue.of(100L));

    return FilteredDataset.builder()
        .group(testSchemaName)
        .name(stagingTableName)
        .alias(stagingTableName)
        .schema(entityPriceSchema)
        .filter(volumeFilter)
        .build();
}

/**
 * Builds a filtered staging dataset whose schema holds the {@code date}, {@code entity},
 * {@code price}, {@code volume}, {@code digest} and {@code version} fields (in that order),
 * with the filter {@code volumeName >= 500}.
 *
 * @return the filtered staging dataset, aliased with the staging table name
 */
public static FilteredDataset getEntityPriceWithVersionFilteredStagingTableSecondPass()
{
    SchemaDefinition entityPriceSchema = SchemaDefinition.builder()
        .addFields(date)
        .addFields(entity)
        .addFields(price)
        .addFields(volume)
        .addFields(digest)
        .addFields(version)
        .build();

    FieldValue volumeField = FieldValue.builder()
        .fieldName(volumeName)
        .datasetRefAlias(stagingTableName)
        .build();
    GreaterThanEqualTo volumeFilter = GreaterThanEqualTo.of(volumeField, NumericalValue.of(500L));

    return FilteredDataset.builder()
        .group(testSchemaName)
        .name(stagingTableName)
        .alias(stagingTableName)
        .schema(entityPriceSchema)
        .filter(volumeFilter)
        .build();
}

public static DatasetDefinition getBitemporalMainTable()
{
return DatasetDefinition.builder()
Expand Down Expand Up @@ -950,6 +1078,46 @@ public static DatasetDefinition getBitemporalFromOnlyStagingTableIdBased()
.build();
}

/**
 * Builds a filtered staging dataset whose schema holds the {@code index}, {@code dateTime},
 * {@code balance} and {@code digest} fields (in that order), with the filter
 * {@code balanceName <= 3}.
 *
 * @return the filtered staging dataset, aliased with the staging table name
 */
public static FilteredDataset getBitemporalFromOnlyFilteredStagingTableIdBased()
{
    SchemaDefinition bitemporalSchema = SchemaDefinition.builder()
        .addFields(index)
        .addFields(dateTime)
        .addFields(balance)
        .addFields(digest)
        .build();

    FieldValue balanceField = FieldValue.builder()
        .fieldName(balanceName)
        .datasetRefAlias(stagingTableName)
        .build();
    LessThanEqualTo balanceFilter = LessThanEqualTo.of(balanceField, NumericalValue.of(3L));

    return FilteredDataset.builder()
        .group(testSchemaName)
        .name(stagingTableName)
        .schema(bitemporalSchema)
        .alias(stagingTableName)
        .filter(balanceFilter)
        .build();
}

/**
 * Builds a filtered staging dataset whose schema holds the {@code index}, {@code dateTime},
 * {@code balance} and {@code digest} fields (in that order), with the filter
 * {@code balanceName <= 20}.
 *
 * @return the filtered staging dataset, aliased with the staging table name
 */
public static FilteredDataset getBitemporalFromOnlyFilteredStagingTableIdBasedSecondPass()
{
    SchemaDefinition bitemporalSchema = SchemaDefinition.builder()
        .addFields(index)
        .addFields(dateTime)
        .addFields(balance)
        .addFields(digest)
        .build();

    FieldValue balanceField = FieldValue.builder()
        .fieldName(balanceName)
        .datasetRefAlias(stagingTableName)
        .build();
    LessThanEqualTo balanceFilter = LessThanEqualTo.of(balanceField, NumericalValue.of(20L));

    return FilteredDataset.builder()
        .group(testSchemaName)
        .name(stagingTableName)
        .schema(bitemporalSchema)
        .alias(stagingTableName)
        .filter(balanceFilter)
        .build();
}

public static DatasetDefinition getBitemporalFromOnlyStagingTableWithoutDuplicatesIdBased()
{
return DatasetDefinition.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.finos.legend.engine.persistence.components.ingestmode.versioning.DigestBasedResolver;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.DatasetDefinition;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.FilteredDataset;
import org.finos.legend.engine.persistence.components.planner.PlannerOptions;
import org.finos.legend.engine.persistence.components.relational.api.DataSplitRange;
import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -849,6 +850,59 @@ void testMilestoningSourceSpecifiesFromSet5WithDataSplitFilterDuplicates() throw
executePlansAndVerifyResultsWithSpecifiedDataSplits(ingestMode, options, datasets, schema, expectedDataPass6, expectedStats, dataSplitRanges);
}

/*
Scenario: Test milestoning Logic with only validity from time specified, reading the
pre-populated staging table through a FilteredDataset whose filter changes between passes
*/
@Test
void testMilestoningSourceSpecifiesFromSet6WithStagingFilter() throws Exception
{
    DatasetDefinition mainDataset = TestUtils.getDefaultMainTable();
    FilteredDataset filteredStagingPass1 = TestUtils.getBitemporalFromOnlyFilteredStagingTableIdBased();

    String[] schema = new String[] {indexName, balanceName, digestName, startDateTimeName, endDateTimeName, batchIdInName, batchIdOutName};

    // The physical staging table is created from the unfiltered definition
    createStagingTable(TestUtils.getBitemporalFromOnlyStagingTableIdBased());

    BatchId transactionMilestoning = BatchId.builder()
        .batchIdInName(batchIdInName)
        .batchIdOutName(batchIdOutName)
        .build();
    ValidDateTime validityMilestoning = ValidDateTime.builder()
        .dateTimeFromName(startDateTimeName)
        .dateTimeThruName(endDateTimeName)
        .validityDerivation(SourceSpecifiesFromDateTime.builder()
            .sourceDateTimeFromField(dateTimeName)
            .build())
        .build();
    BitemporalDelta ingestMode = BitemporalDelta.builder()
        .digestField(digestName)
        .transactionMilestoning(transactionMilestoning)
        .validityMilestoning(validityMilestoning)
        .build();

    // cleanupStagingData is explicitly switched off for the filtered staging dataset
    PlannerOptions options = PlannerOptions.builder().cleanupStagingData(false).collectStatistics(true).build();
    Datasets datasetsPass1 = Datasets.builder().mainDataset(mainDataset).stagingDataset(filteredStagingPass1).build();

    // ------------ Perform Pass1 ------------------------
    String dataPass1 = basePathForInput + "source_specifies_from/without_delete_ind/set_6_with_staging_filter/staging_data_pass1.csv";
    String expectedDataPass1 = basePathForExpected + "source_specifies_from/without_delete_ind/set_6_with_staging_filter/expected_pass1.csv";
    // 1. Load Staging table
    loadStagingDataForBitemporalFromOnly(dataPass1);
    // 2. Execute Plan and Verify Results
    Map<String, Object> expectedStatsPass1 = createExpectedStatsMap(3, 0, 3, 0, 0);
    executePlansAndVerifyResults(ingestMode, options, datasetsPass1, schema, expectedDataPass1, expectedStatsPass1);

    // ------------ Perform Pass2 ------------------------
    // 0. Create new filter
    Datasets datasetsPass2 = Datasets.of(mainDataset, TestUtils.getBitemporalFromOnlyFilteredStagingTableIdBasedSecondPass());
    String dataPass2 = basePathForInput + "source_specifies_from/without_delete_ind/set_6_with_staging_filter/staging_data_pass2.csv";
    String expectedDataPass2 = basePathForExpected + "source_specifies_from/without_delete_ind/set_6_with_staging_filter/expected_pass2.csv";
    // 1. Load Staging table
    loadStagingDataForBitemporalFromOnly(dataPass2);
    // 2. Execute Plan and Verify Results
    Map<String, Object> expectedStatsPass2 = createExpectedStatsMap(8, 0, 6, 3, 0);
    executePlansAndVerifyResults(ingestMode, options, datasetsPass2, schema, expectedDataPass2, expectedStatsPass2);
}

/*
Scenario: Test milestoning Logic with only validity from time specified when staging table pre populated
*/
Expand Down
Loading

0 comments on commit 06cf988

Please sign in to comment.