From 75f9270fad8fad2ffe74875a8ddc3769b3715f60 Mon Sep 17 00:00:00 2001 From: prasar-ashutosh Date: Tue, 10 Oct 2023 22:36:32 +0800 Subject: [PATCH] Change interface for ingest to return List of Result --- .../logicalplan/LogicalPlanFactory.java | 9 + .../components/planner/Planner.java | 2 +- .../components/e2e/BigQueryEndToEndTest.java | 6 +- .../components/e2e/BulkLoadExecutorTest.java | 4 +- .../components/relational/api/ApiUtils.java | 145 +++++++++++++- .../api/RelationalIngestorAbstract.java | 186 +++--------------- .../persistence/components/BaseTest.java | 4 +- .../ingestmode/mixed/MixedIngestModeTest.java | 6 +- .../mixed/UnitemporalDeltaRunner.java | 2 +- .../unitemporal/MultiTableIngestionTest.java | 4 +- .../versioning/TestDedupAndVersioning.java | 34 ++-- .../extension/PersistenceTestRunner.java | 4 +- 12 files changed, 211 insertions(+), 195 deletions(-) diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/LogicalPlanFactory.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/LogicalPlanFactory.java index b16e938fbc8..f841210323d 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/LogicalPlanFactory.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/LogicalPlanFactory.java @@ -122,4 +122,13 @@ public static LogicalPlan getLogicalPlanForMinAndMaxForField(Dataset dataset, St .source(dataset).build(); return LogicalPlan.builder().addOps(selection).build(); } + + public static LogicalPlan getLogicalPlanForMaxOfField(Dataset dataset, String fieldName) + { + FieldValue field = FieldValue.builder().datasetRef(dataset.datasetReference()).fieldName(fieldName).build(); + Selection selection = Selection.builder() + .addFields(FunctionImpl.builder().functionName(FunctionName.MAX).addValue(field).alias(MAX_OF_FIELD).build()) + .source(dataset).build(); + return LogicalPlan.builder().addOps(selection).build(); + } } diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/Planner.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/Planner.java index ccfb3436f62..e678f1ff340 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/Planner.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/Planner.java @@ -170,7 +170,7 @@ protected Dataset mainDataset() return datasets.mainDataset(); } - protected Dataset stagingDataset() + public Dataset stagingDataset() { return effectiveStagingDataset; } diff --git 
a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/e2e/BigQueryEndToEndTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/e2e/BigQueryEndToEndTest.java index f212f81aa25..b4655ac3a60 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/e2e/BigQueryEndToEndTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/e2e/BigQueryEndToEndTest.java @@ -156,7 +156,7 @@ protected IngestorResult ingestViaExecutorAndVerifyStagingFilters(IngestMode ing // Load csv data loadData(path, datasets.stagingDataset(), 1); RelationalConnection connection = BigQueryConnection.of(getBigQueryConnection()); - IngestorResult ingestorResult = ingestor.performFullIngestion(connection, datasets); + IngestorResult ingestorResult = ingestor.performFullIngestion(connection, datasets).get(0); verifyStagingFilters(ingestor, connection, datasets); return ingestorResult; @@ -383,7 +383,7 @@ public IngestorResult executePlansAndVerifyForCaseConversion(IngestMode ingestMo .caseConversion(CaseConversion.TO_UPPER) .build(); - IngestorResult result = ingestor.performFullIngestion(BigQueryConnection.of(getBigQueryConnection()), datasets); + IngestorResult result = ingestor.performFullIngestion(BigQueryConnection.of(getBigQueryConnection()), datasets).get(0); Map actualStats = result.statisticByName(); @@ -430,7 +430,7 @@ protected IngestorResult executePlansAndVerifyResults(IngestMode ingestMode, Pla .enableSchemaEvolution(options.enableSchemaEvolution()) .schemaEvolutionCapabilitySet(userCapabilitySet) .build(); - IngestorResult result = ingestor.performFullIngestion(BigQueryConnection.of(getBigQueryConnection()), datasets); + IngestorResult result = ingestor.performFullIngestion(BigQueryConnection.of(getBigQueryConnection()), datasets).get(0); Map actualStats = result.statisticByName(); diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/e2e/BulkLoadExecutorTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/e2e/BulkLoadExecutorTest.java index a8efc6b0c04..559c2a0f5db 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/e2e/BulkLoadExecutorTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/e2e/BulkLoadExecutorTest.java @@ -120,7 +120,7 @@ public void testMilestoning() throws IOException, InterruptedException .build(); RelationalConnection connection = BigQueryConnection.of(getBigQueryConnection()); - IngestorResult ingestorResult = 
ingestor.performFullIngestion(connection, datasets); + IngestorResult ingestorResult = ingestor.performFullIngestion(connection, datasets).get(0); // Verify List> tableData = runQuery("select * from `demo`.`append_log` order by col_int asc"); @@ -178,7 +178,7 @@ public void testMilestoningFailure() throws IOException, InterruptedException .build(); RelationalConnection connection = BigQueryConnection.of(getBigQueryConnection()); - IngestorResult ingestorResult = ingestor.performFullIngestion(connection, datasets); + IngestorResult ingestorResult = ingestor.performFullIngestion(connection, datasets).get(0); // Verify List> tableData = runQuery("select * from `demo`.`append_log` order by col_int asc"); diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/ApiUtils.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/ApiUtils.java index 5a8c413207b..4fc98ab9556 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/ApiUtils.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/ApiUtils.java @@ -14,19 +14,37 @@ package org.finos.legend.engine.persistence.components.relational.api; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.eclipse.collections.api.tuple.Pair; +import org.eclipse.collections.impl.tuple.Tuples; +import org.finos.legend.engine.persistence.components.common.DatasetFilter; import org.finos.legend.engine.persistence.components.common.Datasets; -import org.finos.legend.engine.persistence.components.ingestmode.DeriveMainDatasetSchemaFromStaging; -import org.finos.legend.engine.persistence.components.ingestmode.IngestMode; -import org.finos.legend.engine.persistence.components.ingestmode.IngestModeCaseConverter; +import org.finos.legend.engine.persistence.components.common.FilterType; +import org.finos.legend.engine.persistence.components.common.OptimizationFilter; +import org.finos.legend.engine.persistence.components.executor.Executor; +import org.finos.legend.engine.persistence.components.ingestmode.*; +import org.finos.legend.engine.persistence.components.ingestmode.versioning.AllVersionsStrategy; +import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlan; +import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlanFactory; import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset; import org.finos.legend.engine.persistence.components.logicalplan.datasets.DatasetsCaseConverter; import org.finos.legend.engine.persistence.components.logicalplan.datasets.Field; +import org.finos.legend.engine.persistence.components.planner.Planner; import org.finos.legend.engine.persistence.components.relational.CaseConversion; +import org.finos.legend.engine.persistence.components.relational.SqlPlan; +import org.finos.legend.engine.persistence.components.relational.sql.TabularData; +import 
org.finos.legend.engine.persistence.components.relational.sqldom.SqlGen; +import org.finos.legend.engine.persistence.components.transformer.Transformer; import org.finos.legend.engine.persistence.components.util.BulkLoadMetadataDataset; import org.finos.legend.engine.persistence.components.util.LockInfoDataset; import org.finos.legend.engine.persistence.components.util.MetadataDataset; -import java.util.List; +import java.util.*; + +import static org.finos.legend.engine.persistence.components.logicalplan.LogicalPlanFactory.MAX_OF_FIELD; +import static org.finos.legend.engine.persistence.components.logicalplan.LogicalPlanFactory.MIN_OF_FIELD; public class ApiUtils { @@ -98,4 +116,123 @@ private static LockInfoDataset getLockInfoDataset(Datasets datasets) return lockInfoDataset; } + public static Optional getNextBatchId(Datasets datasets, Executor executor, + Transformer transformer, IngestMode ingestMode) + { + if (ingestMode.accept(IngestModeVisitors.IS_INGEST_MODE_TEMPORAL) || ingestMode instanceof BulkLoad) + { + LogicalPlan logicalPlanForNextBatchId = LogicalPlanFactory.getLogicalPlanForNextBatchId(datasets, ingestMode); + List tabularData = executor.executePhysicalPlanAndGetResults(transformer.generatePhysicalPlan(logicalPlanForNextBatchId)); + Optional nextBatchId = getFirstColumnValue(getFirstRowForFirstResult(tabularData)); + if (nextBatchId.isPresent()) + { + return retrieveValueAsLong(nextBatchId.get()); + } + } + return Optional.empty(); + } + + public static Optional>> getOptimizationFilterBounds(Datasets datasets, Executor executor, + Transformer transformer, IngestMode ingestMode) + { + List filters = ingestMode.accept(IngestModeVisitors.RETRIEVE_OPTIMIZATION_FILTERS); + if (!filters.isEmpty()) + { + Map> map = new HashMap<>(); + for (OptimizationFilter filter : filters) + { + LogicalPlan logicalPlanForMinAndMaxForField = LogicalPlanFactory.getLogicalPlanForMinAndMaxForField(datasets.stagingDataset(), filter.fieldName()); + List tabularData = executor.executePhysicalPlanAndGetResults(transformer.generatePhysicalPlan(logicalPlanForMinAndMaxForField)); + Map resultMap = getFirstRowForFirstResult(tabularData); + // Put into map only when not null + Object lower = resultMap.get(MIN_OF_FIELD); + Object upper = resultMap.get(MAX_OF_FIELD); + if (lower != null && upper != null) + { + map.put(filter, Tuples.pair(lower, upper)); + } + } + return Optional.of(map); + } + return Optional.empty(); + } + + public static List extractDatasetFilters(MetadataDataset metadataDataset, Executor executor, SqlPlan physicalPlan) throws JsonProcessingException + { + List datasetFilters = new ArrayList<>(); + List results = executor.executePhysicalPlanAndGetResults(physicalPlan); + Optional stagingFilters = results.stream() + .findFirst() + .map(TabularData::getData) + .flatMap(t -> t.stream().findFirst()) + .map(stringObjectMap -> (String) stringObjectMap.get(metadataDataset.stagingFiltersField())); + + // Convert map of Filters to List of Filters + if (stagingFilters.isPresent()) + { + Map> datasetFiltersMap = new ObjectMapper().readValue(stagingFilters.get(), new TypeReference>>() {}); + for (Map.Entry> filtersMapEntry : datasetFiltersMap.entrySet()) + { + for (Map.Entry filterEntry : filtersMapEntry.getValue().entrySet()) + { + DatasetFilter datasetFilter = DatasetFilter.of(filtersMapEntry.getKey(), FilterType.fromName(filterEntry.getKey()), filterEntry.getValue()); + datasetFilters.add(datasetFilter); + } + } + } + return datasetFilters; + } + + public static List getDataSplitRanges(Executor 
executor, Planner planner, + Transformer transformer, IngestMode ingestMode) + { + List dataSplitRanges = new ArrayList<>(); + if (ingestMode.versioningStrategy() instanceof AllVersionsStrategy) + { + Dataset stagingDataset = planner.stagingDataset(); + String dataSplitField = ingestMode.dataSplitField().get(); + LogicalPlan logicalPlanForMaxOfField = LogicalPlanFactory.getLogicalPlanForMaxOfField(stagingDataset, dataSplitField); + List tabularData = executor.executePhysicalPlanAndGetResults(transformer.generatePhysicalPlan(logicalPlanForMaxOfField)); + Map row = getFirstRowForFirstResult(tabularData); + Long maxDataSplit = retrieveValueAsLong(row.get(MAX_OF_FIELD)).orElseThrow(IllegalStateException::new); + for (int i = 1; i <= maxDataSplit; i++) + { + dataSplitRanges.add(DataSplitRange.of(i, i)); + } + } + return dataSplitRanges; + } + + public static Optional retrieveValueAsLong(Object obj) + { + if (obj instanceof Integer) + { + return Optional.of(Long.valueOf((Integer) obj)); + } + else if (obj instanceof Long) + { + return Optional.of((Long) obj); + } + return Optional.empty(); + } + + public static Map getFirstRowForFirstResult(List tabularData) + { + Map resultMap = tabularData.stream() + .findFirst() + .map(TabularData::getData) + .flatMap(t -> t.stream().findFirst()) + .orElse(Collections.emptyMap()); + return resultMap; + } + + public static Optional getFirstColumnValue(Map row) + { + Optional object = Optional.empty(); + if (!row.isEmpty()) + { + object = row.values().stream().findFirst(); + } + return object; + } } diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/RelationalIngestorAbstract.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/RelationalIngestorAbstract.java index 4a80514876d..0c96a34f450 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/RelationalIngestorAbstract.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/RelationalIngestorAbstract.java @@ -15,29 +15,16 @@ package org.finos.legend.engine.persistence.components.relational.api; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; import org.eclipse.collections.api.tuple.Pair; -import org.eclipse.collections.impl.tuple.Tuples; import org.finos.legend.engine.persistence.components.common.*; import org.finos.legend.engine.persistence.components.executor.DigestInfo; import org.finos.legend.engine.persistence.components.executor.Executor; import org.finos.legend.engine.persistence.components.importer.Importer; import org.finos.legend.engine.persistence.components.importer.Importers; -import org.finos.legend.engine.persistence.components.ingestmode.DeriveMainDatasetSchemaFromStaging; -import org.finos.legend.engine.persistence.components.ingestmode.IngestMode; -import org.finos.legend.engine.persistence.components.ingestmode.IngestModeOptimizationColumnHandler; -import 
org.finos.legend.engine.persistence.components.ingestmode.IngestModeVisitors; -import org.finos.legend.engine.persistence.components.ingestmode.BulkLoad; -import org.finos.legend.engine.persistence.components.ingestmode.deduplication.FailOnDuplicates; +import org.finos.legend.engine.persistence.components.ingestmode.*; import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlan; import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlanFactory; -import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset; -import org.finos.legend.engine.persistence.components.logicalplan.datasets.Selection; -import org.finos.legend.engine.persistence.components.logicalplan.datasets.DatasetReference; -import org.finos.legend.engine.persistence.components.logicalplan.datasets.ExternalDatasetReference; -import org.finos.legend.engine.persistence.components.logicalplan.datasets.Field; -import org.finos.legend.engine.persistence.components.ingestmode.TempDatasetsEnricher; +import org.finos.legend.engine.persistence.components.logicalplan.datasets.*; import org.finos.legend.engine.persistence.components.logicalplan.values.StringValue; import org.finos.legend.engine.persistence.components.planner.Planner; import org.finos.legend.engine.persistence.components.planner.PlannerOptions; @@ -58,25 +45,16 @@ import org.immutables.value.Value.Derived; import org.immutables.value.Value.Immutable; import org.immutables.value.Value.Style; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import java.sql.Date; import java.time.Clock; import java.time.LocalDateTime; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Arrays; -import java.util.Set; -import java.util.stream.Collectors; - -import static org.finos.legend.engine.persistence.components.logicalplan.LogicalPlanFactory.MAX_OF_FIELD; -import static org.finos.legend.engine.persistence.components.logicalplan.LogicalPlanFactory.MIN_OF_FIELD; +import java.util.*; + import static org.finos.legend.engine.persistence.components.logicalplan.LogicalPlanFactory.TABLE_IS_NON_EMPTY; +import static org.finos.legend.engine.persistence.components.relational.api.ApiUtils.*; import static org.finos.legend.engine.persistence.components.relational.api.RelationalGeneratorAbstract.BULK_LOAD_BATCH_STATUS_PATTERN; import static org.finos.legend.engine.persistence.components.transformer.Transformer.TransformOptionsAbstract.DATE_TIME_FORMATTER; @@ -250,11 +228,12 @@ public Datasets dedupAndVersion(Datasets datasets) /* - Perform ingestion from staging to main dataset based on the Ingest mode, executes in current transaction */ - public IngestorResult ingest(Datasets datasets) + public List ingest(Datasets datasets) { LOGGER.info("Invoked ingest method, will perform the ingestion"); init(datasets); - IngestorResult result = ingest(Arrays.asList()).stream().findFirst().orElseThrow(IllegalStateException::new); + List dataSplitRanges = ApiUtils.getDataSplitRanges(executor, planner, transformer, ingestMode()); + List result = ingest(dataSplitRanges); LOGGER.info("Ingestion completed"); return result; } @@ -279,10 +258,11 @@ public Datasets cleanUp(Datasets datasets) 4. Ingestion from staging to main dataset in a transaction 5. 
Clean up of temporary tables */ - public IngestorResult performFullIngestion(RelationalConnection connection, Datasets datasets) + public List performFullIngestion(RelationalConnection connection, Datasets datasets) { LOGGER.info("Invoked performFullIngestion method"); - return performFullIngestion(connection, datasets, null).stream().findFirst().orElseThrow(IllegalStateException::new); + List dataSplitRanges = ApiUtils.getDataSplitRanges(executor, planner, transformer, ingestMode()); + return performFullIngestion(connection, datasets, dataSplitRanges); } /* @@ -321,7 +301,7 @@ public List getLatestStagingFilters(RelationalConnection connecti Transformer transformer = new RelationalTransformer(relationalSink(), transformOptions()); Executor executor = relationalSink().getRelationalExecutor(connection); SqlPlan physicalPlan = transformer.generatePhysicalPlan(logicalPlan); - return extractDatasetFilters(metadataDataset, executor, physicalPlan); + return ApiUtils.extractDatasetFilters(metadataDataset, executor, physicalPlan); } // ---------- UTILITY METHODS ---------- @@ -660,14 +640,8 @@ private boolean datasetEmpty(Dataset dataset, Transformer trans LogicalPlan checkIsDatasetEmptyLogicalPlan = LogicalPlanFactory.getLogicalPlanForIsDatasetEmpty(dataset); SqlPlan physicalPlanForCheckIsDataSetEmpty = transformer.generatePhysicalPlan(checkIsDatasetEmptyLogicalPlan); List results = executor.executePhysicalPlanAndGetResults(physicalPlanForCheckIsDataSetEmpty); - - String value = String.valueOf(results.stream() - .findFirst() - .map(TabularData::getData) - .flatMap(t -> t.stream().findFirst()) - .map(Map::values) - .flatMap(t -> t.stream().findFirst()) - .orElseThrow(IllegalStateException::new)); + Optional obj = getFirstColumnValue(getFirstRowForFirstResult(results)); + String value = String.valueOf(obj.orElseThrow(IllegalStateException::new)); return !value.equals(TABLE_IS_NON_EMPTY); } @@ -675,18 +649,15 @@ private Map executeStatisticsPhysicalPlan(Executor statisticsSqlPlan, Map placeHolderKeyValues) { - return statisticsSqlPlan.keySet() - .stream() - .collect(Collectors.toMap( - k -> k, - k -> executor.executePhysicalPlanAndGetResults(statisticsSqlPlan.get(k), placeHolderKeyValues) - .stream() - .findFirst() - .map(TabularData::getData) - .flatMap(t -> t.stream().findFirst()) - .map(Map::values) - .flatMap(t -> t.stream().findFirst()) - .orElseThrow(IllegalStateException::new))); + Map results = new HashMap<>(); + for (Map.Entry entry: statisticsSqlPlan.entrySet()) + { + List result = executor.executePhysicalPlanAndGetResults(entry.getValue(), placeHolderKeyValues); + Optional obj = getFirstColumnValue(getFirstRowForFirstResult(result)); + Object value = obj.orElse(null); + results.put(entry.getKey(), value); + } + return results; } private Map executeDeduplicationAndVersioningErrorChecks(Executor executor, @@ -695,20 +666,9 @@ private Map executeDeduplicationAndVersioningErrorCheck Map results = new HashMap<>(); for (Map.Entry entry: errorChecksPlan.entrySet()) { - Object value = null; List result = executor.executePhysicalPlanAndGetResults(entry.getValue()); - if (!result.isEmpty()) - { - List> data = result.get(0).getData(); - if (!data.isEmpty()) - { - Map row = data.get(0); - if (!row.isEmpty()) - { - value = row.get(entry.getKey().name()); - } - } - } + Optional obj = getFirstColumnValue(getFirstRowForFirstResult(result)); + Object value = obj.orElse(null); results.put(entry.getKey(), value); } return results; @@ -719,8 +679,8 @@ private Map extractPlaceHolderKeyValues(Datasets 
datasets, Execu Optional dataSplitRange) { Map placeHolderKeyValues = new HashMap<>(); - Optional nextBatchId = getNextBatchId(datasets, executor, transformer, ingestMode); - Optional>> optimizationFilters = getOptimizationFilterBounds(datasets, executor, transformer, ingestMode); + Optional nextBatchId = ApiUtils.getNextBatchId(datasets, executor, transformer, ingestMode); + Optional>> optimizationFilters = ApiUtils.getOptimizationFilterBounds(datasets, executor, transformer, ingestMode); if (nextBatchId.isPresent()) { LOGGER.info(String.format("Obtained the next Batch id: %s", nextBatchId.get())); @@ -757,94 +717,4 @@ else if (lowerBound instanceof Number) return placeHolderKeyValues; } - private Optional getNextBatchId(Datasets datasets, Executor executor, - Transformer transformer, IngestMode ingestMode) - { - if (ingestMode.accept(IngestModeVisitors.IS_INGEST_MODE_TEMPORAL) || ingestMode instanceof BulkLoad) - { - LogicalPlan logicalPlanForNextBatchId = LogicalPlanFactory.getLogicalPlanForNextBatchId(datasets, ingestMode); - List tabularData = executor.executePhysicalPlanAndGetResults(transformer.generatePhysicalPlan(logicalPlanForNextBatchId)); - Optional nextBatchId = Optional.ofNullable(tabularData.stream() - .findFirst() - .map(TabularData::getData) - .flatMap(t -> t.stream().findFirst()) - .map(Map::values) - .flatMap(t -> t.stream().findFirst()) - .orElseThrow(IllegalStateException::new)); - if (nextBatchId.isPresent()) - { - return retrieveValueAsLong(nextBatchId.get()); - } - } - return Optional.empty(); - } - - private Optional>> getOptimizationFilterBounds(Datasets datasets, Executor executor, - Transformer transformer, IngestMode ingestMode) - { - List filters = ingestMode.accept(IngestModeVisitors.RETRIEVE_OPTIMIZATION_FILTERS); - if (!filters.isEmpty()) - { - Map> map = new HashMap<>(); - for (OptimizationFilter filter : filters) - { - LogicalPlan logicalPlanForMinAndMaxForField = LogicalPlanFactory.getLogicalPlanForMinAndMaxForField(datasets.stagingDataset(), filter.fieldName()); - List tabularData = executor.executePhysicalPlanAndGetResults(transformer.generatePhysicalPlan(logicalPlanForMinAndMaxForField)); - Map resultMap = tabularData.stream() - .findFirst() - .map(TabularData::getData) - .flatMap(t -> t.stream().findFirst()) - .orElseThrow(IllegalStateException::new); - // Put into map only when not null - Object lower = resultMap.get(MIN_OF_FIELD); - Object upper = resultMap.get(MAX_OF_FIELD); - if (lower != null && upper != null) - { - map.put(filter, Tuples.pair(lower, upper)); - } - } - return Optional.of(map); - } - return Optional.empty(); - } - - private List extractDatasetFilters(MetadataDataset metadataDataset, Executor executor, SqlPlan physicalPlan) throws JsonProcessingException - { - List datasetFilters = new ArrayList<>(); - List results = executor.executePhysicalPlanAndGetResults(physicalPlan); - Optional stagingFilters = results.stream() - .findFirst() - .map(TabularData::getData) - .flatMap(t -> t.stream().findFirst()) - .map(stringObjectMap -> (String) stringObjectMap.get(metadataDataset.stagingFiltersField())); - - // Convert map of Filters to List of Filters - if (stagingFilters.isPresent()) - { - Map> datasetFiltersMap = new ObjectMapper().readValue(stagingFilters.get(), new TypeReference>>() {}); - for (Map.Entry> filtersMapEntry : datasetFiltersMap.entrySet()) - { - for (Map.Entry filterEntry : filtersMapEntry.getValue().entrySet()) - { - DatasetFilter datasetFilter = DatasetFilter.of(filtersMapEntry.getKey(), 
FilterType.fromName(filterEntry.getKey()), filterEntry.getValue()); - datasetFilters.add(datasetFilter); - } - } - } - return datasetFilters; - } - - private Optional retrieveValueAsLong(Object obj) - { - if (obj instanceof Integer) - { - return Optional.of(Long.valueOf((Integer) obj)); - } - else if (obj instanceof Long) - { - return Optional.of((Long) obj); - } - return Optional.empty(); - } - } diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/BaseTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/BaseTest.java index 40d5ad78050..0276fee23c2 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/BaseTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/BaseTest.java @@ -192,7 +192,7 @@ protected IngestorResult executePlansAndVerifyResults(RelationalIngestor ingesto String expectedDataPath, Map expectedStats, boolean verifyStagingFilters) throws Exception { // Execute physical plans - IngestorResult result = ingestor.performFullIngestion(JdbcConnection.of(h2Sink.connection()), datasets); + IngestorResult result = ingestor.performFullIngestion(JdbcConnection.of(h2Sink.connection()), datasets).get(0); Map actualStats = result.statisticByName(); @@ -300,7 +300,7 @@ public IngestorResult executePlansAndVerifyForCaseConversion(RelationalIngestor datasets = ingestor.dedupAndVersion(datasets); executor.begin(); - IngestorResult result = ingestor.ingest(datasets); + IngestorResult result = ingestor.ingest(datasets).get(0); // Do more stuff if needed executor.commit(); diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/mixed/MixedIngestModeTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/mixed/MixedIngestModeTest.java index ef070aaa5b0..95135656318 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/mixed/MixedIngestModeTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/mixed/MixedIngestModeTest.java @@ -87,7 +87,7 @@ public void testMultiIngestionTypes() throws Exception .enableConcurrentSafety(true) .build(); - IngestorResult result = ingestor.performFullIngestion(JdbcConnection.of(h2Sink.connection()), datasets); + IngestorResult result = ingestor.performFullIngestion(JdbcConnection.of(h2Sink.connection()), datasets).get(0); MultiTableIngestionTest.verifyResults(1, schema, expectedPath, "main", result, expectedStats); // Pass 2 : unitemporalDelta @@ -106,7 +106,7 @@ public void testMultiIngestionTypes() throws 
Exception .enableConcurrentSafety(true) .build(); - result = ingestor.performFullIngestion(JdbcConnection.of(h2Sink.connection()), datasets); + result = ingestor.performFullIngestion(JdbcConnection.of(h2Sink.connection()), datasets).get(0); MultiTableIngestionTest.verifyResults(2, schema, expectedPath, "main", result, expectedStats); // Pass 3 : unitemporalSnapshot @@ -125,7 +125,7 @@ public void testMultiIngestionTypes() throws Exception .enableConcurrentSafety(true) .build(); - result = ingestor.performFullIngestion(JdbcConnection.of(h2Sink.connection()), datasets); + result = ingestor.performFullIngestion(JdbcConnection.of(h2Sink.connection()), datasets).get(0); MultiTableIngestionTest.verifyResults(3, schema, expectedPath, "main", result, expectedStats); } diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/mixed/UnitemporalDeltaRunner.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/mixed/UnitemporalDeltaRunner.java index 28c7995ade2..ca85039087d 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/mixed/UnitemporalDeltaRunner.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/mixed/UnitemporalDeltaRunner.java @@ -90,7 +90,7 @@ public void run() .executionTimestampClock(clock) .build(); - IngestorResult result = ingestor.performFullIngestion(JdbcConnection.of(h2Sink.connection()), datasets); + IngestorResult result = ingestor.performFullIngestion(JdbcConnection.of(h2Sink.connection()), datasets).get(0); if (maxBatchIdCounter.get() < result.batchId().get()) { maxBatchIdCounter.set(result.batchId().get()); diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/MultiTableIngestionTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/MultiTableIngestionTest.java index 9a2b8fc4f0f..8eb252e90f6 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/MultiTableIngestionTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/MultiTableIngestionTest.java @@ -171,7 +171,7 @@ private List ingestMultiTables(Executor executor, RelationalInge executor.begin(); for (Datasets datasets: allDatasets) { - IngestorResult result = ingestor.ingest(datasets); + IngestorResult result = ingestor.ingest(datasets).get(0); multiTableIngestionResult.add(result); } @@ -256,7 +256,7 @@ private List ingestMultiTablesWithBadQuery(Executor executor, Re executor.begin(); 
for (Datasets datasets: allDatasets) { - IngestorResult result = ingestor.ingest(datasets); + IngestorResult result = ingestor.ingest(datasets).get(0); multiTableIngestionResult.add(result); } diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/versioning/TestDedupAndVersioning.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/versioning/TestDedupAndVersioning.java index 2c7aac0ae12..11626826936 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/versioning/TestDedupAndVersioning.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/versioning/TestDedupAndVersioning.java @@ -48,23 +48,23 @@ public class TestDedupAndVersioning extends BaseTest { /* Scenarios: - 1. [DONE] No Dedup, NoVersion -> No tempStagingTable - 2. [DONE] No Dedup, MaxVersion do not perform versioning -> No tempStagingTable - 3. [DONE] No Dedup, MaxVersion with perform versioning -> tempStagingTable with only MaxVersioned Data [throw Error on Data errors] - 4. [DONE] No Dedup, AllVersion do not perform versioning -> No tempStagingTable - 5. [DONE] No Dedup, AllVersion with perform versioning -> tempStagingTable with Data splits [throw Error on Data errors] - - 6. [DONE] Filter Dups, NoVersion -> tempStagingTable with count column - 7. [DONE] Filter Dups, MaxVersion do not perform versioning -> tempStagingTable with count column - 8. [DONE, throw error left] Filter Dups, MaxVersion with perform versioning -> tempStagingTable with count column and only max version [throw Error on Data errors] - 9. [DONE] Filter Dups, AllVersion do not perform versioning -> tempStagingTable with count column - 10. [DONE, throw error left] Filter Dups, AllVersion with perform versioning -> tempStagingTable with count column and Data splits [throw Error on Data errors] - - 11. [DONE] Fail on Dups, NoVersion -> tempStagingTable with count column [Throw error on dups] - 12. [DONE] Fail on Dups, MaxVersion do not perform versioning -> tempStagingTable with count column [Throw error on dups] - 13. [DONE] Fail on Dups, MaxVersion with perform versioning -> tempStagingTable with count column and only max version [Throw error on dups, throw Error on Data errors] - 14. [DONE] Fail on Dups, AllVersion do not perform versioning -> tempStagingTable with count column [Throw error on dups] - 15. [DONE] Fail on Dups, AllVersion with perform versioning -> tempStagingTable with count column and Data splits [Throw error on dups, throw Error on Data errors] + 1. No Dedup, NoVersion -> No tempStagingTable + 2. No Dedup, MaxVersion do not perform versioning -> No tempStagingTable + 3. No Dedup, MaxVersion with perform versioning -> tempStagingTable with only MaxVersioned Data [throw Error on Data errors] + 4. No Dedup, AllVersion do not perform versioning -> No tempStagingTable + 5. No Dedup, AllVersion with perform versioning -> tempStagingTable with Data splits [throw Error on Data errors] + + 6. Filter Dups, NoVersion -> tempStagingTable with count column + 7. 
Filter Dups, MaxVersion do not perform versioning -> tempStagingTable with count column + 8. throw error left] Filter Dups, MaxVersion with perform versioning -> tempStagingTable with count column and only max version [throw Error on Data errors] + 9. Filter Dups, AllVersion do not perform versioning -> tempStagingTable with count column + 10. throw error left] Filter Dups, AllVersion with perform versioning -> tempStagingTable with count column and Data splits [throw Error on Data errors] + + 11.Fail on Dups, NoVersion -> tempStagingTable with count column [Throw error on dups] + 12.Fail on Dups, MaxVersion do not perform versioning -> tempStagingTable with count column [Throw error on dups] + 13.Fail on Dups, MaxVersion with perform versioning -> tempStagingTable with count column and only max version [Throw error on dups, throw Error on Data errors] + 14.Fail on Dups, AllVersion do not perform versioning -> tempStagingTable with count column [Throw error on dups] + 15. Fail on Dups, AllVersion with perform versioning -> tempStagingTable with count column and Data splits [Throw error on dups, throw Error on Data errors] */ private static Field name = Field.builder().name(nameName).type(FieldType.of(DataType.VARCHAR, 64, null)).nullable(false).primaryKey(true).fieldAlias(nameName).build(); diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-test-runner/src/main/java/org/finos/legend/engine/testable/persistence/extension/PersistenceTestRunner.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-test-runner/src/main/java/org/finos/legend/engine/testable/persistence/extension/PersistenceTestRunner.java index 4c20e8cdee6..68989b4fdc9 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-test-runner/src/main/java/org/finos/legend/engine/testable/persistence/extension/PersistenceTestRunner.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-test-runner/src/main/java/org/finos/legend/engine/testable/persistence/extension/PersistenceTestRunner.java @@ -194,7 +194,7 @@ private IngestorResult invokePersistence(Dataset targetDataset, Persistence pers .enableSchemaEvolution(SCHEMA_EVOLUTION_DEFAULT) .build(); - IngestorResult result = ingestor.performFullIngestion(JdbcConnection.of(connection), enrichedDatasets); + IngestorResult result = ingestor.performFullIngestion(JdbcConnection.of(connection), enrichedDatasets).get(0); return result; } @@ -212,7 +212,7 @@ private IngestorResult invokePersistence(Dataset targetDataset, ServiceOutputTar .enableSchemaEvolution(SCHEMA_EVOLUTION_DEFAULT) .build(); - IngestorResult result = ingestor.performFullIngestion(JdbcConnection.of(connection), enrichedDatasets); + IngestorResult result = ingestor.performFullIngestion(JdbcConnection.of(connection), enrichedDatasets).get(0); return result; }
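
For downstream callers, the interface change in this patch means ingest(Datasets) and performFullIngestion(RelationalConnection, Datasets) now return List<IngestorResult> rather than a single IngestorResult; the updated tests adapt by calling .get(0). Below is a minimal caller-side sketch of that migration. The import paths and the one-result-per-data-split behaviour are inferred from the diff above rather than confirmed elsewhere, and ingestor, connection and datasets stand for whatever the caller already builds:

    import java.util.List;

    import org.finos.legend.engine.persistence.components.common.Datasets;
    import org.finos.legend.engine.persistence.components.relational.api.IngestorResult;
    import org.finos.legend.engine.persistence.components.relational.api.RelationalConnection;
    import org.finos.legend.engine.persistence.components.relational.api.RelationalIngestor;

    public class IngestCallerSketch
    {
        static void runIngestion(RelationalIngestor ingestor, RelationalConnection connection, Datasets datasets)
        {
            // New contract: a list of results, presumably one per data-split range when
            // AllVersionsStrategy produces splits, and a single-element list otherwise.
            List<IngestorResult> results = ingestor.performFullIngestion(connection, datasets);

            // Callers that previously consumed a single IngestorResult keep the old
            // behaviour by taking the first element, as the updated tests do.
            IngestorResult first = results.get(0);
            System.out.println(first.statisticByName());

            // Callers that enable data splits should iterate instead of assuming one result.
            for (IngestorResult result : results)
            {
                System.out.println(result.statisticByName());
            }
        }
    }

The same pattern applies to the transactional ingest(Datasets) entry point: iterate the returned list when data splits are possible, or take the first element to preserve the pre-change single-result behaviour.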