Skip to content

Commit

Permalink
Merge branch 'all_versions' of https://github.com/prasar-ashutosh/legend-engine into all_versions
Browse files Browse the repository at this point in the history
  • Loading branch information
kumuwu committed Oct 6, 2023
2 parents ed6d37c + 453f3ef commit fd32f7c
Show file tree
Hide file tree
Showing 20 changed files with 321 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ private Dataset selectionWithGroupByAllColumns()
Selection selectionWithGroupByAllColumns = Selection.builder()
.source(stagingDataset)
.addAllFields(allColumnsWithCount)
.alias(stagingDataset.datasetReference().alias())
.groupByFields(allColumns)
.build();
return selectionWithGroupByAllColumns;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ public Dataset visitMaxVersionStrategy(MaxVersionStrategyAbstract maxVersionStra
.source(selectionWithRank)
.addAllFields(allColumns)
.condition(rankFilterCondition)
.alias(dataset.datasetReference().alias())
.build();

return enrichedStagingDataset;
Expand Down Expand Up @@ -111,7 +110,6 @@ public Dataset visitAllVersionsStrategy(AllVersionsStrategyAbstract allVersionsS
Selection selectionWithRank = Selection.builder()
.source(dataset)
.addAllFields(allColumnsWithRank)
.alias(dataset.datasetReference().alias())
.build();
return selectionWithRank;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ public LogicalPlan buildLogicalPlanForDeduplicationAndVersioning(Resources resou
if (isTempTableNeededForStaging)
{
operations.add(Delete.builder().dataset(tempStagingDataset()).build());
Dataset dedupAndVersionedDataset = LogicalPlanUtils.getTempStagingDataset(ingestMode(), originalStagingDataset(), primaryKeys);
Dataset dedupAndVersionedDataset = LogicalPlanUtils.getDedupedAndVersionedDataset(ingestMode.deduplicationStrategy(), ingestMode.versioningStrategy(), originalStagingDataset(), primaryKeys);
List<Value> fieldsToInsert = new ArrayList<>(dedupAndVersionedDataset.schemaReference().fieldValues());
operations.add(Insert.of(tempStagingDataset(), dedupAndVersionedDataset, fieldsToInsert));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -457,11 +457,15 @@ public static Dataset getTempStagingDatasetDefinition(Dataset stagingDataset, In
.build();
}

public static Dataset getTempStagingDataset(IngestMode ingestMode, Dataset stagingDataset, List<String> primaryKeys)
public static Dataset getDedupedAndVersionedDataset(DeduplicationStrategy deduplicationStrategy, VersioningStrategy versioningStrategy, Dataset stagingDataset, List<String> primaryKeys)
{
DeduplicationStrategy deduplicationStrategy = ingestMode.deduplicationStrategy();
VersioningStrategy versioningStrategy = ingestMode.versioningStrategy();
Dataset dedupedDataset = deduplicationStrategy.accept(new DatasetDeduplicationHandler(stagingDataset));
boolean isTempTableNeededForVersioning = versioningStrategy.accept(VersioningVisitors.IS_TEMP_TABLE_NEEDED);
if (isTempTableNeededForVersioning && dedupedDataset instanceof Selection)
{
Selection selection = (Selection) dedupedDataset;
dedupedDataset = selection.withAlias(stagingDataset.datasetReference().alias());
}
Dataset versionedDataset = versioningStrategy.accept(new DatasetVersioningHandler(dedupedDataset, primaryKeys));
return versionedDataset;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,11 +415,10 @@ public static String getDropTempTableQuery(String tableName)

public static String expectedInsertIntoBaseTempStagingWithMaxVersionAndAllowDuplicates = "INSERT INTO \"mydb\".\"staging_legend_persistence_temp_staging\" " +
"(\"id\", \"name\", \"amount\", \"biz_date\", \"legend_persistence_count\") " +
"((SELECT stage.\"id\",stage.\"name\",stage.\"amount\",stage.\"biz_date\",stage.\"legend_persistence_count\" as \"legend_persistence_count\" FROM " +
"(SELECT stage.\"id\",stage.\"name\",stage.\"amount\",stage.\"biz_date\",stage.\"legend_persistence_count\" as \"legend_persistence_count\"," +
"DENSE_RANK() OVER (PARTITION BY stage.\"id\",stage.\"name\" ORDER BY stage.\"biz_date\" DESC) as \"legend_persistence_rank\" " +
"FROM " +
"(SELECT stage.\"id\",stage.\"name\",stage.\"amount\",stage.\"biz_date\",COUNT(*) as \"legend_persistence_count\" " +
"FROM \"mydb\".\"staging\" as stage GROUP BY stage.\"id\", stage.\"name\", stage.\"amount\", stage.\"biz_date\") as stage) as stage " +
"WHERE stage.\"legend_persistence_rank\" = 1) as stage)";
"(SELECT stage.\"id\",stage.\"name\",stage.\"amount\",stage.\"biz_date\",stage.\"legend_persistence_count\" as \"legend_persistence_count\" FROM " +
"(SELECT stage.\"id\",stage.\"name\",stage.\"amount\",stage.\"biz_date\",stage.\"legend_persistence_count\" as \"legend_persistence_count\",DENSE_RANK() OVER " +
"(PARTITION BY stage.\"id\",stage.\"name\" ORDER BY stage.\"biz_date\" DESC) as \"legend_persistence_rank\" FROM " +
"(SELECT stage.\"id\",stage.\"name\",stage.\"amount\",stage.\"biz_date\",COUNT(*) as \"legend_persistence_count\" FROM " +
"\"mydb\".\"staging\" as stage GROUP BY stage.\"id\", stage.\"name\", stage.\"amount\", stage.\"biz_date\") as stage) as stage " +
"WHERE stage.\"legend_persistence_rank\" = 1)";
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ public void verifyNontemporalSnapshotWithAuditingFilterDupsNoVersioning(Generato
String cleanupTempStagingTableSql = "DELETE FROM \"mydb\".\"staging_legend_persistence_temp_staging\" as stage";
String insertTempStagingTableSql = "INSERT INTO \"mydb\".\"staging_legend_persistence_temp_staging\" " +
"(\"id\", \"name\", \"amount\", \"biz_date\", \"legend_persistence_count\") " +
"((SELECT stage.\"id\",stage.\"name\",stage.\"amount\",stage.\"biz_date\",COUNT(*) as \"legend_persistence_count\" " +
"FROM \"mydb\".\"staging\" as stage GROUP BY stage.\"id\", stage.\"name\", stage.\"amount\", stage.\"biz_date\") as stage)";

"(SELECT stage.\"id\",stage.\"name\",stage.\"amount\",stage.\"biz_date\"," +
"COUNT(*) as \"legend_persistence_count\" FROM \"mydb\".\"staging\" as stage " +
"GROUP BY stage.\"id\", stage.\"name\", stage.\"amount\", stage.\"biz_date\")";

Assertions.assertEquals(AnsiTestArtifacts.expectedBaseTableWithAuditPkCreateQuery, preActionsSqlList.get(0));
Assertions.assertEquals(createTempStagingTable, preActionsSqlList.get(1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ public class DatasetDeduplicationHandlerTest extends IngestModeTest
.schema(baseTableSchemaWithVersion)
.build();

String expectedSql = "(SELECT stage.\"id\",stage.\"name\",stage.\"version\",stage.\"biz_date\",COUNT(*) as \"legend_persistence_count\" " +
String expectedSql = "SELECT stage.\"id\",stage.\"name\",stage.\"version\",stage.\"biz_date\",COUNT(*) as \"legend_persistence_count\" " +
"FROM \"my_db\".\"my_schema\".\"my_table\" as stage " +
"GROUP BY stage.\"id\", stage.\"name\", stage.\"version\", stage.\"biz_date\") as stage";
"GROUP BY stage.\"id\", stage.\"name\", stage.\"version\", stage.\"biz_date\"";

@Test
public void testDatasetDeduplicationFailOnDuplicates()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@
import org.finos.legend.engine.persistence.components.common.DatasetFilter;
import org.finos.legend.engine.persistence.components.common.FilterType;
import org.finos.legend.engine.persistence.components.ingestmode.deduplication.*;
import org.finos.legend.engine.persistence.components.ingestmode.versioning.AllVersionsStrategy;
import org.finos.legend.engine.persistence.components.ingestmode.versioning.DatasetVersioningHandler;
import org.finos.legend.engine.persistence.components.ingestmode.versioning.MaxVersionStrategy;
import org.finos.legend.engine.persistence.components.ingestmode.versioning.NoVersioningStrategy;
import org.finos.legend.engine.persistence.components.ingestmode.versioning.*;
import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlan;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.DatasetDefinition;
Expand Down Expand Up @@ -77,10 +74,10 @@ public void testVersioningHandlerMaxVersionStrategy()
LogicalPlan logicalPlan = LogicalPlan.builder().addOps(versionedSelection).build();
SqlPlan physicalPlan = transformer.generatePhysicalPlan(logicalPlan);
List<String> list = physicalPlan.getSqlList();
String expectedSql = "(SELECT stage.\"id\",stage.\"name\",stage.\"version\",stage.\"biz_date\" " +
String expectedSql = "SELECT stage.\"id\",stage.\"name\",stage.\"version\",stage.\"biz_date\" " +
"FROM (SELECT stage.\"id\",stage.\"name\",stage.\"version\",stage.\"biz_date\",DENSE_RANK() OVER " +
"(PARTITION BY stage.\"id\",stage.\"name\" ORDER BY stage.\"version\" DESC) as \"legend_persistence_rank\" " +
"FROM \"my_db\".\"my_schema\".\"my_table\" as stage) as stage WHERE stage.\"legend_persistence_rank\" = 1) as stage";
"FROM \"my_db\".\"my_schema\".\"my_table\" as stage) as stage WHERE stage.\"legend_persistence_rank\" = 1";
Assertions.assertEquals(expectedSql, list.get(0));
}

Expand All @@ -93,28 +90,30 @@ public void testVersioningHandlerAllVersionsStrategy()
LogicalPlan logicalPlan = LogicalPlan.builder().addOps(versionedSelection).build();
SqlPlan physicalPlan = transformer.generatePhysicalPlan(logicalPlan);
List<String> list = physicalPlan.getSqlList();
String expectedSql = "(SELECT stage.\"id\",stage.\"name\",stage.\"version\",stage.\"biz_date\"," +
String expectedSql = "SELECT stage.\"id\",stage.\"name\",stage.\"version\",stage.\"biz_date\"," +
"DENSE_RANK() OVER (PARTITION BY stage.\"id\",stage.\"name\" ORDER BY stage.\"version\" ASC) as \"legend_persistence_data_split\" " +
"FROM \"my_db\".\"my_schema\".\"my_table\" as stage) as stage";
"FROM \"my_db\".\"my_schema\".\"my_table\" as stage";
Assertions.assertEquals(expectedSql, list.get(0));
}

@Test
public void testVersioningHandlerWithDeduplicationHandler()
{
Dataset dedupedDataset = FailOnDuplicates.builder().build().accept(new DatasetDeduplicationHandler(derivedStagingDataset));
Dataset versionedDataset = AllVersionsStrategy.builder().versioningField("version").build().accept(new DatasetVersioningHandler(dedupedDataset, primaryKeys));
Selection versionedSelection = (Selection) versionedDataset;
DeduplicationStrategy deduplicationStrategy = FailOnDuplicates.builder().build();
VersioningStrategy versioningStrategy = AllVersionsStrategy.builder().versioningField("version").build();
Dataset dedupAndVersionedDataset = LogicalPlanUtils.getDedupedAndVersionedDataset(deduplicationStrategy, versioningStrategy, derivedStagingDataset, primaryKeys);

Selection versionedSelection = (Selection) dedupAndVersionedDataset;
RelationalTransformer transformer = new RelationalTransformer(AnsiSqlSink.get(), transformOptions);
LogicalPlan logicalPlan = LogicalPlan.builder().addOps(versionedSelection).build();
SqlPlan physicalPlan = transformer.generatePhysicalPlan(logicalPlan);
List<String> list = physicalPlan.getSqlList();
String expectedSql = "(SELECT stage.\"id\",stage.\"name\",stage.\"version\",stage.\"biz_date\"," +
String expectedSql = "SELECT stage.\"id\",stage.\"name\",stage.\"version\",stage.\"biz_date\"," +
"stage.\"legend_persistence_count\" as \"legend_persistence_count\"," +
"DENSE_RANK() OVER (PARTITION BY stage.\"id\",stage.\"name\" ORDER BY stage.\"version\" ASC) as \"legend_persistence_data_split\" " +
"FROM (SELECT stage.\"id\",stage.\"name\",stage.\"version\",stage.\"biz_date\"," +
"COUNT(*) as \"legend_persistence_count\" FROM \"my_db\".\"my_schema\".\"my_table\" as stage WHERE stage.\"bizDate\" = '2020-01-01' " +
"GROUP BY stage.\"id\", stage.\"name\", stage.\"version\", stage.\"biz_date\") as stage) as stage";
"GROUP BY stage.\"id\", stage.\"name\", stage.\"version\", stage.\"biz_date\") as stage";
Assertions.assertEquals(expectedSql, list.get(0));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -391,11 +391,10 @@ public class BigQueryTestArtifacts

public static String expectedInsertIntoBaseTempStagingWithMaxVersionAndAllowDuplicates = "INSERT INTO `mydb`.`staging_legend_persistence_temp_staging` " +
"(`id`, `name`, `amount`, `biz_date`, `legend_persistence_count`) " +
"((SELECT stage.`id`,stage.`name`,stage.`amount`,stage.`biz_date`,stage.`legend_persistence_count` as `legend_persistence_count` FROM " +
"(SELECT stage.`id`,stage.`name`,stage.`amount`,stage.`biz_date`,stage.`legend_persistence_count` as `legend_persistence_count` FROM " +
"(SELECT stage.`id`,stage.`name`,stage.`amount`,stage.`biz_date`,stage.`legend_persistence_count` as `legend_persistence_count`," +
"DENSE_RANK() OVER (PARTITION BY stage.`id`,stage.`name` ORDER BY stage.`biz_date` DESC) as `legend_persistence_rank` " +
"FROM " +
"(SELECT stage.`id`,stage.`name`,stage.`amount`,stage.`biz_date`,COUNT(*) as `legend_persistence_count` " +
"FROM `mydb`.`staging` as stage GROUP BY stage.`id`, stage.`name`, stage.`amount`, stage.`biz_date`) as stage) as stage " +
"WHERE stage.`legend_persistence_rank` = 1) as stage)";
"FROM (SELECT stage.`id`,stage.`name`,stage.`amount`,stage.`biz_date`,COUNT(*) as `legend_persistence_count` " +
"FROM `mydb`.`staging` as stage GROUP BY stage.`id`, stage.`name`, stage.`amount`, stage.`biz_date`) as stage) " +
"as stage WHERE stage.`legend_persistence_rank` = 1)";
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.finos.legend.engine.persistence.components.ingestmode.NontemporalSnapshot;
import org.finos.legend.engine.persistence.components.ingestmode.audit.DateTimeAuditing;
import org.finos.legend.engine.persistence.components.ingestmode.audit.NoAuditing;
import org.finos.legend.engine.persistence.components.ingestmode.deduplication.FilterDuplicates;
import org.finos.legend.engine.persistence.components.ingestmode.versioning.MaxVersionStrategy;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.DatasetDefinition;
import org.finos.legend.engine.persistence.components.planner.PlannerOptions;
Expand All @@ -30,6 +32,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.logging.Filter;

import static org.finos.legend.engine.persistence.components.TestUtils.batchUpdateTimeName;
import static org.finos.legend.engine.persistence.components.TestUtils.dataSplitName;
Expand All @@ -52,7 +55,8 @@ class NontemporalSnapshotTest extends BaseTest
4. No Auditing & import external CSV dataset
5. Staging has lesser columns than main dataset
6. Staging data cleanup
7. Data Splits enabled
7. With Auditing, Max Version, Filter Duplicates
8. With Auditing, No Version, Fail on Duplicates
*/

/*
Expand Down Expand Up @@ -258,7 +262,7 @@ void testNontemporalSnapshotWithCleanStagingData() throws Exception
Scenario: Test Nontemporal Snapshot when data splits are enabled
*/
@Test
void testNontemporalSnapshotWithDataSplits() throws Exception
void testNontemporalSnapshotWithMaxVersionAndFilterDuplicates() throws Exception
{
DatasetDefinition mainTable = TestUtils.getDefaultMainTable();
String dataPass1 = basePath + "input/with_data_splits/data_pass1.csv";
Expand All @@ -267,7 +271,8 @@ void testNontemporalSnapshotWithDataSplits() throws Exception
// Generate the milestoning object
NontemporalSnapshot ingestMode = NontemporalSnapshot.builder()
.auditing(NoAuditing.builder().build())
//.dataSplitField(dataSplitName)
.versioningStrategy(MaxVersionStrategy.of(""))
.deduplicationStrategy(FilterDuplicates.builder().build())
.build();

PlannerOptions options = PlannerOptions.builder().collectStatistics(true).build();
Expand Down
Loading

0 comments on commit fd32f7c

Please sign in to comment.