Skip to content

Commit

Permalink
Fix the logic for incoming records count
Browse files Browse the repository at this point in the history
  • Loading branch information
prasar-ashutosh committed Oct 11, 2023
1 parent 944920e commit 8a6edf9
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ public IngestMode visitBitemporalDelta(BitemporalDeltaAbstract bitemporalDelta)
.validityMilestoning(bitemporalDelta.validityMilestoning().accept(new ValidityMilestoningCaseConverter()))
.deduplicationStrategy(bitemporalDelta.deduplicationStrategy())
.mergeStrategy(bitemporalDelta.mergeStrategy().accept(new MergeStrategyCaseConverter()))
.deduplicationStrategy(bitemporalDelta.deduplicationStrategy())
.versioningStrategy(bitemporalDelta.versioningStrategy().accept(new VersionStrategyCaseConverter()))
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,7 @@
import org.finos.legend.engine.persistence.components.ingestmode.audit.NoAuditingAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.deduplication.DeduplicationVisitors;
import org.finos.legend.engine.persistence.components.ingestmode.deduplication.FailOnDuplicates;
import org.finos.legend.engine.persistence.components.ingestmode.versioning.AllVersionsStrategyAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.versioning.MaxVersionStrategyAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.versioning.NoVersioningStrategyAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.versioning.VersioningStrategyVisitor;
import org.finos.legend.engine.persistence.components.ingestmode.versioning.VersioningVisitors;
import org.finos.legend.engine.persistence.components.ingestmode.versioning.DeriveDataErrorCheckLogicalPlan;
import org.finos.legend.engine.persistence.components.ingestmode.versioning.*;
import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlan;
import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlanFactory;
import org.finos.legend.engine.persistence.components.logicalplan.conditions.Condition;
Expand Down Expand Up @@ -417,17 +412,29 @@ protected void addPreRunStatsForRowsDeleted(Map<StatisticName, LogicalPlan> preR
protected void addPostRunStatsForIncomingRecords(Map<StatisticName, LogicalPlan> postRunStatisticsResult)
{
Optional<Condition> filterCondition = Optional.empty();
if (dataSplitExecutionSupported())
Value countIncomingRecords = FunctionImpl.builder().functionName(FunctionName.COUNT).alias(INCOMING_RECORD_COUNT.get()).addValue(All.INSTANCE).build();
Dataset dataset = originalStagingDataset();

// If a data split field is present
if (ingestMode.dataSplitField().isPresent())
{
Optional<Condition> dataSplitInRangeCondition = getDataSplitInRangeConditionForStatistics();
if (dataSplitInRangeCondition.isPresent())
dataset = stagingDataset();
filterCondition = getDataSplitInRangeConditionForStatistics();
Optional<String> duplicateCountFieldName = ingestMode.deduplicationStrategy().accept(DeduplicationVisitors.EXTRACT_DEDUP_FIELD);
// If deduplication has been performed (a duplicate count field is present)
if (duplicateCountFieldName.isPresent())
{
filterCondition = Optional.of(dataSplitInRangeCondition.get());
FieldValue duplicateCountField = FieldValue.builder().fieldName(duplicateCountFieldName.get()).datasetRef(dataset.datasetReference()).build();
countIncomingRecords = FunctionImpl.builder().functionName(FunctionName.SUM).alias(INCOMING_RECORD_COUNT.get()).addValue(duplicateCountField).build();
}
}

LogicalPlan incomingRecordCountPlan = LogicalPlan.builder()
.addOps(LogicalPlanUtils.getRecordCount(stagingDataset(), INCOMING_RECORD_COUNT.get(), filterCondition))
.addOps(Selection.builder()
.source(dataset)
.addFields(countIncomingRecords)
.condition(filterCondition)
.build())
.build();
postRunStatisticsResult.put(INCOMING_RECORD_COUNT, incomingRecordCountPlan);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void verifyAppendOnlyFailOnDuplicatesWithAuditingAllVersionNoFilterExisti
Assertions.assertEquals(2, generatorResults.size());

// Stats
String incomingRecordCount = "SELECT COUNT(*) as \"incomingRecordCount\" FROM \"mydb\".\"staging_legend_persistence_temp_staging\" as stage " +
String incomingRecordCount = "SELECT SUM(stage.\"legend_persistence_count\") as \"incomingRecordCount\" FROM \"mydb\".\"staging_legend_persistence_temp_staging\" as stage " +
"WHERE (stage.\"data_split\" >= '{DATA_SPLIT_LOWER_BOUND_PLACEHOLDER}') AND (stage.\"data_split\" <= '{DATA_SPLIT_UPPER_BOUND_PLACEHOLDER}')";
String rowsInserted = "SELECT COUNT(*) as \"rowsInserted\" FROM \"mydb\".\"main\" as sink WHERE sink.\"batch_update_time\" = (SELECT MAX(sink.\"batch_update_time\") FROM \"mydb\".\"main\" as sink)";

Expand Down Expand Up @@ -120,7 +120,7 @@ public void verifyAppendOnlyFilterDuplicatesWithAuditingNoVersioningWithFilterEx
assertIfListsAreSameIgnoringOrder(expectedSQL, postActionsSql);

// Stats
String incomingRecordCount = "SELECT COUNT(*) as \"incomingRecordCount\" FROM \"mydb\".\"staging_legend_persistence_temp_staging\" as stage";
String incomingRecordCount = "SELECT COUNT(*) as \"incomingRecordCount\" FROM \"mydb\".\"staging\" as stage";
String rowsInserted = "SELECT COUNT(*) as \"rowsInserted\" FROM \"mydb\".\"main\" as sink WHERE sink.\"batch_update_time\" = (SELECT MAX(sink.\"batch_update_time\") FROM \"mydb\".\"main\" as sink)";
Assertions.assertEquals(incomingRecordCount, queries.postIngestStatisticsSql().get(StatisticName.INCOMING_RECORD_COUNT));
Assertions.assertEquals(rowsUpdated, queries.postIngestStatisticsSql().get(StatisticName.ROWS_UPDATED));
Expand Down Expand Up @@ -152,7 +152,7 @@ public void verifyAppendOnlyFilterDuplicatesWithAuditingAllVersionWithFilterExis
Assertions.assertEquals(2, operations.size());

// Stats
String incomingRecordCount = "SELECT COUNT(*) as \"incomingRecordCount\" FROM \"mydb\".\"staging_legend_persistence_temp_staging\" as stage " +
String incomingRecordCount = "SELECT SUM(stage.\"legend_persistence_count\") as \"incomingRecordCount\" FROM \"mydb\".\"staging_legend_persistence_temp_staging\" as stage " +
"WHERE (stage.\"data_split\" >= '{DATA_SPLIT_LOWER_BOUND_PLACEHOLDER}') AND (stage.\"data_split\" <= '{DATA_SPLIT_UPPER_BOUND_PLACEHOLDER}')";
String rowsInserted = "SELECT COUNT(*) as \"rowsInserted\" FROM \"mydb\".\"main\" as sink WHERE sink.\"batch_update_time\" = (SELECT MAX(sink.\"batch_update_time\") FROM \"mydb\".\"main\" as sink)";

Expand Down Expand Up @@ -216,7 +216,7 @@ public void verifyAppendOnlyFailOnDuplicatesWithAuditingMaxVersionWithFilterExis
Assertions.assertEquals(insertSql, milestoningSqlList.get(0));

// Stats
String incomingRecordCount = "SELECT COUNT(*) as \"incomingRecordCount\" FROM \"mydb\".\"staging_legend_persistence_temp_staging\" as stage";
String incomingRecordCount = "SELECT COUNT(*) as \"incomingRecordCount\" FROM \"mydb\".\"staging\" as stage";
String rowsInserted = "SELECT COUNT(*) as \"rowsInserted\" FROM \"mydb\".\"main\" as sink WHERE sink.\"batch_update_time\" = (SELECT MAX(sink.\"batch_update_time\") FROM \"mydb\".\"main\" as sink)";
Assertions.assertEquals(incomingRecordCount, operations.postIngestStatisticsSql().get(StatisticName.INCOMING_RECORD_COUNT));
Assertions.assertEquals(rowsUpdated, operations.postIngestStatisticsSql().get(StatisticName.ROWS_UPDATED));
Expand Down Expand Up @@ -245,7 +245,7 @@ public void verifyAppendOnlyFilterDuplicatesWithAuditingMaxVersionNoFilterExisti
Assertions.assertEquals(insertSql, milestoningSqlList.get(0));

// Stats
String incomingRecordCount = "SELECT COUNT(*) as \"incomingRecordCount\" FROM \"mydb\".\"staging_legend_persistence_temp_staging\" as stage";
String incomingRecordCount = "SELECT COUNT(*) as \"incomingRecordCount\" FROM \"mydb\".\"staging\" as stage";
String rowsInserted = "SELECT COUNT(*) as \"rowsInserted\" FROM \"mydb\".\"main\" as sink WHERE sink.\"batch_update_time\" = (SELECT MAX(sink.\"batch_update_time\") FROM \"mydb\".\"main\" as sink)";
Assertions.assertEquals(incomingRecordCount, operations.postIngestStatisticsSql().get(StatisticName.INCOMING_RECORD_COUNT));
Assertions.assertEquals(rowsUpdated, operations.postIngestStatisticsSql().get(StatisticName.ROWS_UPDATED));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public void verifyNontemporalSnapshotWithAuditingFilterDupsNoVersioning(Generato
Assertions.assertEquals(AnsiTestArtifacts.expectedInsertIntoBaseTempStagingWithFilterDuplicates, deduplicationAndVersioningSql.get(1));

// Stats
verifyStats(operations, "staging_legend_persistence_temp_staging");
verifyStats(operations, "staging");
}

@Override
Expand Down Expand Up @@ -120,7 +120,7 @@ public void verifyNontemporalSnapshotWithAuditingFailOnDupMaxVersioning(Generato
Assertions.assertEquals(AnsiTestArtifacts.expectedInsertIntoBaseTempStagingWithMaxVersionAndFilterDuplicates, deduplicationAndVersioningSql.get(1));

// Stats
verifyStats(operations, "staging_legend_persistence_temp_staging");
verifyStats(operations, "staging");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,8 @@ public static Optional<Object> getFirstColumnValue(Map<String, Object> row)
Optional<Object> object = Optional.empty();
if (!row.isEmpty())
{
object = row.values().stream().findFirst();
String key = row.keySet().stream().findFirst().orElseThrow(IllegalStateException::new);
object = Optional.ofNullable(row.get(key));
}
return object;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ void testAppendOnlyWithAuditingNoVersioningFilterDuplicatesFilterExistingRecords
// 1. Load staging table
loadBasicStagingData(dataPass2);
// 2. Execute plans and verify results
expectedStats = createExpectedStatsMap(3, 0, 2, 0, 0);
expectedStats = createExpectedStatsMap(4, 0, 2, 0, 0);
executePlansAndVerifyResults(ingestMode, options, datasets, schema, expectedDataPass2, expectedStats, fixedClock_2000_01_02);
}

Expand Down Expand Up @@ -261,7 +261,7 @@ void testAppendOnlyWithAuditingMaxVersionFilterDuplicatesNoFilterExistingRecords
// 1. Load staging table
loadStagingDataWithVersion(dataPass1);
// 2. Execute plans and verify results
Map<String, Object> expectedStats = createExpectedStatsMap(3, 0, 3, 0, 0);
Map<String, Object> expectedStats = createExpectedStatsMap(4, 0, 3, 0, 0);
executePlansAndVerifyResults(ingestMode, options, datasets, schema, expectedDataPass1, expectedStats, fixedClock_2000_01_01);

// ------------ Perform incremental (append) milestoning Pass2 ------------------------
Expand All @@ -270,7 +270,7 @@ void testAppendOnlyWithAuditingMaxVersionFilterDuplicatesNoFilterExistingRecords
// 1. Load staging table
loadStagingDataWithVersion(dataPass2);
// 2. Execute plans and verify results
expectedStats = createExpectedStatsMap(3, 0, 3, 0, 0);
expectedStats = createExpectedStatsMap(4, 0, 3, 0, 0);
executePlansAndVerifyResults(ingestMode, options, datasets, schema, expectedDataPass2, expectedStats, fixedClock_2000_01_02);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ void testNontemporalSnapshotWithMaxVersionAndFilterDuplicates() throws Exception
TestDedupAndVersioning.loadDataIntoStagingTableWithVersion(dataPass1);
// 2. Execute plans and verify results

Map<String, Object> expectedStats = createExpectedStatsMap(3, 0, 3, 0, 0);
Map<String, Object> expectedStats = createExpectedStatsMap(6, 0, 3, 0, 0);
executePlansAndVerifyResults(ingestMode, options, datasets, schema, expectedDataPass1, expectedStats);

// ------------ Perform snapshot milestoning Pass2 ------------------------
Expand Down

0 comments on commit 8a6edf9

Please sign in to comment.