Fix the SQLs for dedup and versioning and add H2 tests
prasar-ashutosh committed Oct 6, 2023
1 parent c904d99 commit 453f3ef
Showing 7 changed files with 66 additions and 40 deletions.
@@ -460,7 +460,8 @@ public static Dataset getTempStagingDatasetDefinition(Dataset stagingDataset, In
public static Dataset getDedupedAndVersionedDataset(DeduplicationStrategy deduplicationStrategy, VersioningStrategy versioningStrategy, Dataset stagingDataset, List<String> primaryKeys)
{
Dataset dedupedDataset = deduplicationStrategy.accept(new DatasetDeduplicationHandler(stagingDataset));
if (dedupedDataset instanceof Selection)
boolean isTempTableNeededForVersioning = versioningStrategy.accept(VersioningVisitors.IS_TEMP_TABLE_NEEDED);
if (isTempTableNeededForVersioning && dedupedDataset instanceof Selection)
{
Selection selection = (Selection) dedupedDataset;
dedupedDataset = selection.withAlias(stagingDataset.datasetReference().alias());
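The new flag asks the versioning strategy, through a visitor, whether the deduplicated/versioned result will be materialised into a temp staging table, and only then re-aliases the deduplicated selection to the staging dataset's alias. Below is a minimal standalone sketch of that kind of visitor; the interface names and the boolean each strategy returns are illustrative assumptions, not the actual VersioningVisitors.IS_TEMP_TABLE_NEEDED implementation.

```java
// Illustrative sketch only: a visitor that lets each versioning strategy
// report whether dedup/versioning output must be staged in a temp table.
// The real decision lives in VersioningVisitors.IS_TEMP_TABLE_NEEDED; the
// return values below are assumptions made for the sake of the example.
interface VersioningStrategyExample
{
    <T> T accept(VersioningStrategyVisitorExample<T> visitor);
}

interface VersioningStrategyVisitorExample<T>
{
    T visitNoVersioning();
    T visitMaxVersion();
    T visitAllVersions();
}

class IsTempTableNeededExample implements VersioningStrategyVisitorExample<Boolean>
{
    @Override
    public Boolean visitNoVersioning()
    {
        return false; // assumed: nothing to materialise beyond the staging table
    }

    @Override
    public Boolean visitMaxVersion()
    {
        return true; // assumed: max-version rows are computed into a temp table
    }

    @Override
    public Boolean visitAllVersions()
    {
        return true; // assumed: per-version data splits are computed into a temp table
    }
}
```

With a visitor shaped like this, the check above reduces to strategy.accept(visitor) returning a boolean, keeping the strategy-specific knowledge out of getDedupedAndVersionedDataset.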
@@ -86,9 +86,9 @@ public void verifyNontemporalSnapshotWithAuditingFilterDupsNoVersioning(Generato
String cleanupTempStagingTableSql = "DELETE FROM \"mydb\".\"staging_legend_persistence_temp_staging\" as stage";
String insertTempStagingTableSql = "INSERT INTO \"mydb\".\"staging_legend_persistence_temp_staging\" " +
"(\"id\", \"name\", \"amount\", \"biz_date\", \"legend_persistence_count\") " +
"((SELECT stage.\"id\",stage.\"name\",stage.\"amount\",stage.\"biz_date\",COUNT(*) as \"legend_persistence_count\" " +
"FROM \"mydb\".\"staging\" as stage GROUP BY stage.\"id\", stage.\"name\", stage.\"amount\", stage.\"biz_date\") as stage)";

"(SELECT stage.\"id\",stage.\"name\",stage.\"amount\",stage.\"biz_date\"," +
"COUNT(*) as \"legend_persistence_count\" FROM \"mydb\".\"staging\" as stage " +
"GROUP BY stage.\"id\", stage.\"name\", stage.\"amount\", stage.\"biz_date\")";

Assertions.assertEquals(AnsiTestArtifacts.expectedBaseTableWithAuditPkCreateQuery, preActionsSqlList.get(0));
Assertions.assertEquals(createTempStagingTable, preActionsSqlList.get(1));
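The corrected INSERT above drops the old double nesting ("(( ... ) as stage)") and feeds the grouped SELECT to the INSERT directly. Below is a small, self-contained H2 sketch of the same deduplication pattern, grouping on every business column and keeping COUNT(*) as the duplicate count per distinct row; the table and column names are illustrative, not the planner-generated ones.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class DedupCountDemo
{
    public static void main(String[] args) throws Exception
    {
        // Requires the H2 driver on the classpath (com.h2database:h2).
        try (Connection conn = DriverManager.getConnection("jdbc:h2:mem:dedup");
             Statement stmt = conn.createStatement())
        {
            stmt.execute("CREATE TABLE staging (id INT, name VARCHAR(64), amount DOUBLE, biz_date DATE)");
            stmt.execute("CREATE TABLE staging_temp (id INT, name VARCHAR(64), amount DOUBLE, biz_date DATE, legend_persistence_count INT)");
            stmt.execute("INSERT INTO staging VALUES " +
                    "(1, 'Andy', 1000, DATE '2012-01-01'), " +
                    "(1, 'Andy', 1000, DATE '2012-01-01'), " +
                    "(2, 'Becky', 2000, DATE '2012-01-02')");

            // Same shape as the fixed SQL: a single grouped SELECT feeding the INSERT,
            // with no redundant outer sub-select or alias around it.
            stmt.execute("INSERT INTO staging_temp (id, name, amount, biz_date, legend_persistence_count) " +
                    "SELECT stage.id, stage.name, stage.amount, stage.biz_date, COUNT(*) " +
                    "FROM staging AS stage GROUP BY stage.id, stage.name, stage.amount, stage.biz_date");

            try (ResultSet rs = stmt.executeQuery("SELECT * FROM staging_temp ORDER BY id"))
            {
                while (rs.next())
                {
                    System.out.println(rs.getInt("id") + "," + rs.getString("name") + "," + rs.getInt("legend_persistence_count"));
                }
            }
        }
    }
}
```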
@@ -30,9 +30,11 @@
import org.finos.legend.engine.persistence.components.relational.api.RelationalIngestor;
import org.finos.legend.engine.persistence.components.relational.h2.H2Sink;
import org.finos.legend.engine.persistence.components.relational.jdbc.JdbcConnection;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import static org.finos.legend.engine.persistence.components.TestUtils.*;
import static org.finos.legend.engine.persistence.components.util.LogicalPlanUtils.TEMP_STAGING_DATASET_BASE_NAME;

public class TestDedupAndVersioning extends BaseTest
{
@@ -89,16 +91,10 @@ void testNoDedupNoVersioning() throws Exception
IngestMode ingestMode = NontemporalSnapshot.builder()
.auditing(NoAuditing.builder().build())
.build();
RelationalIngestor ingestor = RelationalIngestor.builder()
.ingestMode(ingestMode)
.relationalSink(H2Sink.get())
.build();

// Create staging Table without PKS and load Data into it

Executor executor = ingestor.init(JdbcConnection.of(h2Sink.connection()));
datasets = ingestor.create(datasets);
datasets = ingestor.dedupAndVersion(datasets);
performDedupAndVersioning(datasets, ingestMode);
// Validate tempTableExists
Assertions.assertEquals(false, h2Sink.doesTableExist(getTempStagingDataset()));
}

@Test
@@ -112,17 +108,12 @@ void testFilterDupsNoVersioning() throws Exception
.auditing(NoAuditing.builder().build())
.deduplicationStrategy(FilterDuplicates.builder().build())
.build();
RelationalIngestor ingestor = RelationalIngestor.builder()
.ingestMode(ingestMode)
.relationalSink(H2Sink.get())
.build();

// Create staging Table without PKS and load Data into it
createStagingTableWithoutVersion();
// TODO LOAD DATA

Executor executor = ingestor.init(JdbcConnection.of(h2Sink.connection()));
datasets = ingestor.create(datasets);
datasets = ingestor.dedupAndVersion(datasets);
performDedupAndVersioning(datasets, ingestMode);
// Validate tempTableExists
Assertions.assertEquals(true, h2Sink.doesTableExist(getTempStagingDataset()));
}


@@ -138,17 +129,14 @@ void testFilterDupsMaxVersion() throws Exception
.deduplicationStrategy(FilterDuplicates.builder().build())
.versioningStrategy(MaxVersionStrategy.builder().versioningField("version").build())
.build();
RelationalIngestor ingestor = RelationalIngestor.builder()
.ingestMode(ingestMode)
.relationalSink(H2Sink.get())
.build();

// Create staging Table without PKS and load Data into it
createStagingTableWithVersion();
// TODO LOAD DATA

Executor executor = ingestor.init(JdbcConnection.of(h2Sink.connection()));
datasets = ingestor.create(datasets);
datasets = ingestor.dedupAndVersion(datasets);
performDedupAndVersioning(datasets, ingestMode);

// Validate tempTableExists
Assertions.assertEquals(true, h2Sink.doesTableExist(getTempStagingDataset()));
}

@Test
@@ -164,17 +152,14 @@ void testFilterDupsAllVersion() throws Exception
.deduplicationStrategy(FilterDuplicates.builder().build())
.versioningStrategy(AllVersionsStrategy.builder().versioningField("version").build())
.build();
RelationalIngestor ingestor = RelationalIngestor.builder()
.ingestMode(ingestMode)
.relationalSink(H2Sink.get())
.build();

// Create staging Table without PKS and load Data into it
createStagingTableWithVersion();
// TODO LOAD DATA

Executor executor = ingestor.init(JdbcConnection.of(h2Sink.connection()));
datasets = ingestor.create(datasets);
datasets = ingestor.dedupAndVersion(datasets);
performDedupAndVersioning(datasets, ingestMode);

// Validate tempTableExists
Assertions.assertEquals(true, h2Sink.doesTableExist(getTempStagingDataset()));
}

private DatasetDefinition getStagingTableWithoutVersion()
@@ -186,6 +171,14 @@ private DatasetDefinition getStagingTableWithoutVersion()
.build();
}

private Dataset getTempStagingDataset()
{
return DatasetReferenceImpl.builder()
.group(testSchemaName)
.name(stagingTableName + "_" + TEMP_STAGING_DATASET_BASE_NAME)
.build();
}

private DatasetDefinition getStagingTableWithVersion()
{
return DatasetDefinition.builder()
@@ -219,7 +212,15 @@ private void createStagingTableWithVersion()
h2Sink.executeStatement(createSql);
}

private static void performDedupAndVersioning(Datasets datasets, IngestMode ingestMode)
{
RelationalIngestor ingestor = RelationalIngestor.builder()
.ingestMode(ingestMode)
.relationalSink(H2Sink.get())
.build();

Executor executor = ingestor.init(JdbcConnection.of(h2Sink.connection()));
datasets = ingestor.create(datasets);
datasets = ingestor.dedupAndVersion(datasets);
}

}
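The LOAD DATA steps in these tests are still marked TODO in this commit, and the new CSV resources below have no header row. One possible way to fill those gaps is H2's built-in CSVREAD with an explicit column list, sketched below as a helper inside this test class; the column list and CSV path are assumptions, not taken from this commit.

```java
// Hypothetical helper: loads a headerless CSV resource into the staging table
// created by the tests above. The column list must match the CSV layout being
// loaded; names here are illustrative, not the suite's actual column names.
private void loadStagingData(String csvPath, String columnList)
{
    String loadSql = "INSERT INTO \"" + testSchemaName + "\".\"" + stagingTableName + "\" " +
            "SELECT * FROM CSVREAD('" + csvPath + "', '" + columnList + "', NULL)";
    h2Sink.executeStatement(loadSql);
}
```

A test could call such a helper right after creating the staging table and before performDedupAndVersioning(...), passing a column list that matches whichever of the new CSV layouts it loads.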
@@ -0,0 +1,7 @@
1,Andy,1,1000,2012-01-01,digest1,1
1,Andy,2,2000,2012-01-02,digest2,2
1,Andy,3,3000,2012-01-03,digest3,3
2,Becky,1,4000,2012-01-04,digest4,1
2,Becky,1,4000,2012-01-04,digest4,1
3,Cathy,1,5000,2012-01-05,digest5,1
3,Cathy,1,6000,2012-01-06,digest6,1
@@ -0,0 +1,6 @@
1,Andy,1000,2012-01-01,digest1
1,Andy,1000,2012-01-01,digest1
1,Andy,1000,2012-01-01,digest1
2,Becky,2000,2012-01-02,digest2
2,Becky,2000,2012-01-02,digest2
3,Cathy,3000,2012-01-03,digest3
@@ -0,0 +1,5 @@
1,Andy,3,3000,2012-01-03,digest3
2,Becky,1,4000,2012-01-04,digest4
2,Becky,1,4000,2012-01-04,digest4
3,Cathy,1,5000,2012-01-05,digest5
3,Cathy,1,6000,2012-01-06,digest6
@@ -0,0 +1,6 @@
1,Andy,1,1000,2012-01-01,digest1,1,1
1,Andy,2,2000,2012-01-02,digest2,1,2
1,Andy,3,3000,2012-01-03,digest3,1,3
2,Becky,1,4000,2012-01-04,digest4,2,1
3,Cathy,1,5000,2012-01-05,digest5,1,1
3,Cathy,1,6000,2012-01-06,digest6,1,1
