From 8a6edf984490c73160127dfe9dbaba990b6cc913 Mon Sep 17 00:00:00 2001 From: prasar-ashutosh Date: Wed, 11 Oct 2023 12:00:05 +0800 Subject: [PATCH] Fix the logic for incoming records count --- .../ingestmode/IngestModeCaseConverter.java | 1 - .../components/planner/Planner.java | 29 ++++++++++++------- .../nontemporal/AppendOnlyTest.java | 10 +++---- .../nontemporal/NontemporalSnapshotTest.java | 4 +-- .../components/relational/api/ApiUtils.java | 3 +- .../nontemporal/AppendOnlyTest.java | 6 ++-- .../nontemporal/NontemporalSnapshotTest.java | 2 +- 7 files changed, 31 insertions(+), 24 deletions(-) diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/IngestModeCaseConverter.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/IngestModeCaseConverter.java index d94a4ccf528..942b63e20be 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/IngestModeCaseConverter.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/IngestModeCaseConverter.java @@ -162,7 +162,6 @@ public IngestMode visitBitemporalDelta(BitemporalDeltaAbstract bitemporalDelta) .validityMilestoning(bitemporalDelta.validityMilestoning().accept(new ValidityMilestoningCaseConverter())) .deduplicationStrategy(bitemporalDelta.deduplicationStrategy()) .mergeStrategy(bitemporalDelta.mergeStrategy().accept(new MergeStrategyCaseConverter())) - .deduplicationStrategy(bitemporalDelta.deduplicationStrategy()) 
.versioningStrategy(bitemporalDelta.versioningStrategy().accept(new VersionStrategyCaseConverter())) .build(); } diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/Planner.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/Planner.java index e678f1ff340..3f21095f169 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/Planner.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/Planner.java @@ -27,12 +27,7 @@ import org.finos.legend.engine.persistence.components.ingestmode.audit.NoAuditingAbstract; import org.finos.legend.engine.persistence.components.ingestmode.deduplication.DeduplicationVisitors; import org.finos.legend.engine.persistence.components.ingestmode.deduplication.FailOnDuplicates; -import org.finos.legend.engine.persistence.components.ingestmode.versioning.AllVersionsStrategyAbstract; -import org.finos.legend.engine.persistence.components.ingestmode.versioning.MaxVersionStrategyAbstract; -import org.finos.legend.engine.persistence.components.ingestmode.versioning.NoVersioningStrategyAbstract; -import org.finos.legend.engine.persistence.components.ingestmode.versioning.VersioningStrategyVisitor; -import org.finos.legend.engine.persistence.components.ingestmode.versioning.VersioningVisitors; -import org.finos.legend.engine.persistence.components.ingestmode.versioning.DeriveDataErrorCheckLogicalPlan; +import 
org.finos.legend.engine.persistence.components.ingestmode.versioning.*; import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlan; import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlanFactory; import org.finos.legend.engine.persistence.components.logicalplan.conditions.Condition; @@ -417,17 +412,29 @@ protected void addPreRunStatsForRowsDeleted(Map preR protected void addPostRunStatsForIncomingRecords(Map postRunStatisticsResult) { Optional filterCondition = Optional.empty(); - if (dataSplitExecutionSupported()) + Value countIncomingRecords = FunctionImpl.builder().functionName(FunctionName.COUNT).alias(INCOMING_RECORD_COUNT.get()).addValue(All.INSTANCE).build(); + Dataset dataset = originalStagingDataset(); + + // If the ingest mode has a data split field + if (ingestMode.dataSplitField().isPresent()) { - Optional dataSplitInRangeCondition = getDataSplitInRangeConditionForStatistics(); - if (dataSplitInRangeCondition.isPresent()) + dataset = stagingDataset(); + filterCondition = getDataSplitInRangeConditionForStatistics(); + Optional duplicateCountFieldName = ingestMode.deduplicationStrategy().accept(DeduplicationVisitors.EXTRACT_DEDUP_FIELD); + // if the deduplication has been performed + if (duplicateCountFieldName.isPresent()) { - filterCondition = Optional.of(dataSplitInRangeCondition.get()); + FieldValue duplicateCountField = FieldValue.builder().fieldName(duplicateCountFieldName.get()).datasetRef(dataset.datasetReference()).build(); + countIncomingRecords = FunctionImpl.builder().functionName(FunctionName.SUM).alias(INCOMING_RECORD_COUNT.get()).addValue(duplicateCountField).build(); } } LogicalPlan incomingRecordCountPlan = LogicalPlan.builder() - .addOps(LogicalPlanUtils.getRecordCount(stagingDataset(), INCOMING_RECORD_COUNT.get(), filterCondition)) + .addOps(Selection.builder() + .source(dataset) + .addFields(countIncomingRecords) + .condition(filterCondition) + .build()) .build(); postRunStatisticsResult.put(INCOMING_RECORD_COUNT, 
incomingRecordCountPlan); } diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/nontemporal/AppendOnlyTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/nontemporal/AppendOnlyTest.java index 3854284a796..8aa5cbe86d5 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/nontemporal/AppendOnlyTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/nontemporal/AppendOnlyTest.java @@ -82,7 +82,7 @@ public void verifyAppendOnlyFailOnDuplicatesWithAuditingAllVersionNoFilterExisti Assertions.assertEquals(2, generatorResults.size()); // Stats - String incomingRecordCount = "SELECT COUNT(*) as \"incomingRecordCount\" FROM \"mydb\".\"staging_legend_persistence_temp_staging\" as stage " + + String incomingRecordCount = "SELECT SUM(stage.\"legend_persistence_count\") as \"incomingRecordCount\" FROM \"mydb\".\"staging_legend_persistence_temp_staging\" as stage " + "WHERE (stage.\"data_split\" >= '{DATA_SPLIT_LOWER_BOUND_PLACEHOLDER}') AND (stage.\"data_split\" <= '{DATA_SPLIT_UPPER_BOUND_PLACEHOLDER}')"; String rowsInserted = "SELECT COUNT(*) as \"rowsInserted\" FROM \"mydb\".\"main\" as sink WHERE sink.\"batch_update_time\" = (SELECT MAX(sink.\"batch_update_time\") FROM \"mydb\".\"main\" as sink)"; @@ -120,7 +120,7 @@ public void verifyAppendOnlyFilterDuplicatesWithAuditingNoVersioningWithFilterEx assertIfListsAreSameIgnoringOrder(expectedSQL, postActionsSql); // 
Stats - String incomingRecordCount = "SELECT COUNT(*) as \"incomingRecordCount\" FROM \"mydb\".\"staging_legend_persistence_temp_staging\" as stage"; + String incomingRecordCount = "SELECT COUNT(*) as \"incomingRecordCount\" FROM \"mydb\".\"staging\" as stage"; String rowsInserted = "SELECT COUNT(*) as \"rowsInserted\" FROM \"mydb\".\"main\" as sink WHERE sink.\"batch_update_time\" = (SELECT MAX(sink.\"batch_update_time\") FROM \"mydb\".\"main\" as sink)"; Assertions.assertEquals(incomingRecordCount, queries.postIngestStatisticsSql().get(StatisticName.INCOMING_RECORD_COUNT)); Assertions.assertEquals(rowsUpdated, queries.postIngestStatisticsSql().get(StatisticName.ROWS_UPDATED)); @@ -152,7 +152,7 @@ public void verifyAppendOnlyFilterDuplicatesWithAuditingAllVersionWithFilterExis Assertions.assertEquals(2, operations.size()); // Stats - String incomingRecordCount = "SELECT COUNT(*) as \"incomingRecordCount\" FROM \"mydb\".\"staging_legend_persistence_temp_staging\" as stage " + + String incomingRecordCount = "SELECT SUM(stage.\"legend_persistence_count\") as \"incomingRecordCount\" FROM \"mydb\".\"staging_legend_persistence_temp_staging\" as stage " + "WHERE (stage.\"data_split\" >= '{DATA_SPLIT_LOWER_BOUND_PLACEHOLDER}') AND (stage.\"data_split\" <= '{DATA_SPLIT_UPPER_BOUND_PLACEHOLDER}')"; String rowsInserted = "SELECT COUNT(*) as \"rowsInserted\" FROM \"mydb\".\"main\" as sink WHERE sink.\"batch_update_time\" = (SELECT MAX(sink.\"batch_update_time\") FROM \"mydb\".\"main\" as sink)"; @@ -216,7 +216,7 @@ public void verifyAppendOnlyFailOnDuplicatesWithAuditingMaxVersionWithFilterExis Assertions.assertEquals(insertSql, milestoningSqlList.get(0)); // Stats - String incomingRecordCount = "SELECT COUNT(*) as \"incomingRecordCount\" FROM \"mydb\".\"staging_legend_persistence_temp_staging\" as stage"; + String incomingRecordCount = "SELECT COUNT(*) as \"incomingRecordCount\" FROM \"mydb\".\"staging\" as stage"; String rowsInserted = "SELECT COUNT(*) as \"rowsInserted\" 
FROM \"mydb\".\"main\" as sink WHERE sink.\"batch_update_time\" = (SELECT MAX(sink.\"batch_update_time\") FROM \"mydb\".\"main\" as sink)"; Assertions.assertEquals(incomingRecordCount, operations.postIngestStatisticsSql().get(StatisticName.INCOMING_RECORD_COUNT)); Assertions.assertEquals(rowsUpdated, operations.postIngestStatisticsSql().get(StatisticName.ROWS_UPDATED)); @@ -245,7 +245,7 @@ public void verifyAppendOnlyFilterDuplicatesWithAuditingMaxVersionNoFilterExisti Assertions.assertEquals(insertSql, milestoningSqlList.get(0)); // Stats - String incomingRecordCount = "SELECT COUNT(*) as \"incomingRecordCount\" FROM \"mydb\".\"staging_legend_persistence_temp_staging\" as stage"; + String incomingRecordCount = "SELECT COUNT(*) as \"incomingRecordCount\" FROM \"mydb\".\"staging\" as stage"; String rowsInserted = "SELECT COUNT(*) as \"rowsInserted\" FROM \"mydb\".\"main\" as sink WHERE sink.\"batch_update_time\" = (SELECT MAX(sink.\"batch_update_time\") FROM \"mydb\".\"main\" as sink)"; Assertions.assertEquals(incomingRecordCount, operations.postIngestStatisticsSql().get(StatisticName.INCOMING_RECORD_COUNT)); Assertions.assertEquals(rowsUpdated, operations.postIngestStatisticsSql().get(StatisticName.ROWS_UPDATED)); diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/nontemporal/NontemporalSnapshotTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/nontemporal/NontemporalSnapshotTest.java index f3690880e0a..968e1e321a2 100644 --- 
a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/nontemporal/NontemporalSnapshotTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/nontemporal/NontemporalSnapshotTest.java @@ -87,7 +87,7 @@ public void verifyNontemporalSnapshotWithAuditingFilterDupsNoVersioning(Generato Assertions.assertEquals(AnsiTestArtifacts.expectedInsertIntoBaseTempStagingWithFilterDuplicates, deduplicationAndVersioningSql.get(1)); // Stats - verifyStats(operations, "staging_legend_persistence_temp_staging"); + verifyStats(operations, "staging"); } @Override @@ -120,7 +120,7 @@ public void verifyNontemporalSnapshotWithAuditingFailOnDupMaxVersioning(Generato Assertions.assertEquals(AnsiTestArtifacts.expectedInsertIntoBaseTempStagingWithMaxVersionAndFilterDuplicates, deduplicationAndVersioningSql.get(1)); // Stats - verifyStats(operations, "staging_legend_persistence_temp_staging"); + verifyStats(operations, "staging"); } @Override diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/ApiUtils.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/ApiUtils.java index 4fc98ab9556..3ea9a5d4d9b 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/ApiUtils.java +++ 
b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/ApiUtils.java @@ -231,7 +231,8 @@ public static Optional getFirstColumnValue(Map row) Optional object = Optional.empty(); if (!row.isEmpty()) { - object = row.values().stream().findFirst(); + String key = row.keySet().stream().findFirst().orElseThrow(IllegalStateException::new); + object = Optional.ofNullable(row.get(key)); } return object; } diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/nontemporal/AppendOnlyTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/nontemporal/AppendOnlyTest.java index b4b9a962d9b..e6642250667 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/nontemporal/AppendOnlyTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/nontemporal/AppendOnlyTest.java @@ -172,7 +172,7 @@ void testAppendOnlyWithAuditingNoVersioningFilterDuplicatesFilterExistingRecords // 1. Load staging table loadBasicStagingData(dataPass2); // 2. 
Execute plans and verify results - expectedStats = createExpectedStatsMap(3, 0, 2, 0, 0); + expectedStats = createExpectedStatsMap(4, 0, 2, 0, 0); executePlansAndVerifyResults(ingestMode, options, datasets, schema, expectedDataPass2, expectedStats, fixedClock_2000_01_02); } @@ -261,7 +261,7 @@ void testAppendOnlyWithAuditingMaxVersionFilterDuplicatesNoFilterExistingRecords // 1. Load staging table loadStagingDataWithVersion(dataPass1); // 2. Execute plans and verify results - Map expectedStats = createExpectedStatsMap(3, 0, 3, 0, 0); + Map expectedStats = createExpectedStatsMap(4, 0, 3, 0, 0); executePlansAndVerifyResults(ingestMode, options, datasets, schema, expectedDataPass1, expectedStats, fixedClock_2000_01_01); // ------------ Perform incremental (append) milestoning Pass2 ------------------------ @@ -270,7 +270,7 @@ void testAppendOnlyWithAuditingMaxVersionFilterDuplicatesNoFilterExistingRecords // 1. Load staging table loadStagingDataWithVersion(dataPass2); // 2. Execute plans and verify results - expectedStats = createExpectedStatsMap(3, 0, 3, 0, 0); + expectedStats = createExpectedStatsMap(4, 0, 3, 0, 0); executePlansAndVerifyResults(ingestMode, options, datasets, schema, expectedDataPass2, expectedStats, fixedClock_2000_01_02); } diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/nontemporal/NontemporalSnapshotTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/nontemporal/NontemporalSnapshotTest.java index 162c7c6f9eb..260a5ef316b 100644 --- 
a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/nontemporal/NontemporalSnapshotTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/nontemporal/NontemporalSnapshotTest.java @@ -284,7 +284,7 @@ void testNontemporalSnapshotWithMaxVersionAndFilterDuplicates() throws Exception TestDedupAndVersioning.loadDataIntoStagingTableWithVersion(dataPass1); // 2. Execute plans and verify results - Map expectedStats = createExpectedStatsMap(3, 0, 3, 0, 0); + Map expectedStats = createExpectedStatsMap(6, 0, 3, 0, 0); executePlansAndVerifyResults(ingestMode, options, datasets, schema, expectedDataPass1, expectedStats); // ------------ Perform snapshot milestoning Pass2 ------------------------