From 794bfd3f433814ce7be655e2bce8dbb0271558f1 Mon Sep 17 00:00:00 2001 From: Ashutosh Date: Mon, 31 Jul 2023 16:04:07 +0800 Subject: [PATCH 1/6] Provide new interfaces in Relational Executor to enable ingestion within same transaction and multi table ingestion --- .../components/e2e/BigQueryEndToEndTest.java | 6 +- .../api/RelationalIngestorAbstract.java | 354 +++++++++++------- .../persistence/components/BaseTest.java | 14 +- .../nontemporal/AppendOnlyTest.java | 2 +- .../extension/PersistenceTestRunner.java | 4 +- 5 files changed, 244 insertions(+), 136 deletions(-) diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/e2e/BigQueryEndToEndTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/e2e/BigQueryEndToEndTest.java index a6687703006..34f5615cba4 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/e2e/BigQueryEndToEndTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/e2e/BigQueryEndToEndTest.java @@ -156,7 +156,7 @@ protected IngestorResult ingestViaExecutorAndVerifyStagingFilters(IngestMode ing // Load csv data loadData(path, datasets.stagingDataset(), 1); RelationalConnection connection = BigQueryConnection.of(getBigQueryConnection()); - IngestorResult ingestorResult = ingestor.ingest(connection, datasets); + IngestorResult ingestorResult = ingestor.performFullIngestion(connection, datasets); verifyStagingFilters(ingestor, connection, datasets); return ingestorResult; 
@@ -383,7 +383,7 @@ public IngestorResult executePlansAndVerifyForCaseConversion(IngestMode ingestMo .caseConversion(CaseConversion.TO_UPPER) .build(); - IngestorResult result = ingestor.ingest(BigQueryConnection.of(getBigQueryConnection()), datasets); + IngestorResult result = ingestor.performFullIngestion(BigQueryConnection.of(getBigQueryConnection()), datasets); Map actualStats = result.statisticByName(); @@ -430,7 +430,7 @@ protected IngestorResult executePlansAndVerifyResults(IngestMode ingestMode, Pla .enableSchemaEvolution(options.enableSchemaEvolution()) .schemaEvolutionCapabilitySet(userCapabilitySet) .build(); - IngestorResult result = ingestor.ingest(BigQueryConnection.of(getBigQueryConnection()), datasets); + IngestorResult result = ingestor.performFullIngestion(BigQueryConnection.of(getBigQueryConnection()), datasets); Map actualStats = result.statisticByName(); diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/RelationalIngestorAbstract.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/RelationalIngestorAbstract.java index 2b0ad6a25f3..3f241bddd2e 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/RelationalIngestorAbstract.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/RelationalIngestorAbstract.java @@ -177,23 +177,101 @@ protected TransformOptions transformOptions() return builder.build(); } + // ---------- Private 
Fields ---------- + private IngestMode enrichedIngestMode; + private Datasets enrichedDatasets; + private Transformer transformer; + private Executor executor; + private Resources.Builder resourcesBuilder; + private GeneratorResult generatorResult; + boolean mainDatasetExists; + private Planner planner; + + // ---------- API ---------- - public IngestorResult ingest(RelationalConnection connection, Datasets datasets) + /* + - Initializes executor + - @return : The methods returns the Executor to the caller enabling them to handle their own transaction + */ + public Executor initExecutor(RelationalConnection connection) + { + this.executor = relationalSink().getRelationalExecutor(connection); + return executor; + } + + /* + - Initializes Datasets + - @return : The methods enriches the datasets and returns that to the user + - User is expected to invoke init before calling create/evolve/ingest + */ + public Datasets initDatasets(Datasets datasets) + { + init(datasets); + return enrichedDatasets; + } + + /* + - Create Datasets + */ + public void create() + { + validate(); + createAllDatasets(); + } + + /* + - Evolve Schema of Target table based on schema changes in staging table + */ + public void evolve() + { + validate(); + evolveSchema(); + } + + /* + - Perform ingestion from staging to main dataset based on the Ingest mode, executes in current transaction + */ + public IngestorResult ingest() { - return ingest(connection, datasets, null).stream().findFirst().orElseThrow(IllegalStateException::new); + validate(); + return ingest(Arrays.asList()).stream().findFirst().orElseThrow(IllegalStateException::new); } - public List ingestWithDataSplits(RelationalConnection connection, Datasets datasets, List dataSplitRanges) + /* + Perform full ingestion from Staging to Target table based on the Ingest mode + Full Ingestion covers: + 1. Export external dataset + 2. Create tables + 3. Evolves Schema + 4. 
Ingestion from staging to main dataset in a transaction + */ + public IngestorResult performFullIngestion(RelationalConnection connection, Datasets datasets) + { + return performFullIngestion(connection, datasets, null).stream().findFirst().orElseThrow(IllegalStateException::new); + } + + /* + Perform ingestion from Staging to Target table based on the Ingest mode, handling different datasplits in the staging data + Full Ingestion covers: + 1. Export external dataset + 2. Create tables + 3. Evolves Schema + 4. Ingestion from staging to main dataset in a transaction + */ + public List performFullIngestionWithDataSplits(RelationalConnection connection, Datasets datasets, List dataSplitRanges) { // Provide the default dataSplit ranges if missing if (dataSplitRanges == null || dataSplitRanges.isEmpty()) { dataSplitRanges = Arrays.asList(DataSplitRange.of(1,1)); } - return ingest(connection, datasets, dataSplitRanges); + return performFullIngestion(connection, datasets, dataSplitRanges); } + /* + Get the latest staging filters stored in the metadata tables for a dataset + */ public List getLatestStagingFilters(RelationalConnection connection, Datasets datasets) throws JsonProcessingException { MetadataDataset metadataDataset = datasets.metadataDataset().isPresent() @@ -212,125 +290,66 @@ public List getLatestStagingFilters(RelationalConnection connecti // ---------- UTILITY METHODS ---------- - private List ingest(RelationalConnection connection, Datasets datasets, List dataSplitRanges) + private void evolveSchema() { - IngestMode enrichedIngestMode = ApiUtils.applyCase(ingestMode(), caseConversion()); - Datasets enrichedDatasets = ApiUtils.applyCase(datasets, caseConversion()); - - Transformer transformer = new RelationalTransformer(relationalSink(), transformOptions()); - Executor executor = relationalSink().getRelationalExecutor(connection); + if (mainDatasetExists && generatorResult.schemaEvolutionDataset().isPresent()) + { + enrichedDatasets = 
enrichedDatasets.withMainDataset(generatorResult.schemaEvolutionDataset().get()); + generatorResult.schemaEvolutionSqlPlan().ifPresent(executor::executePhysicalPlan); + } + } - Resources.Builder resourcesBuilder = Resources.builder(); - Datasets updatedDatasets = enrichedDatasets; + private void createAllDatasets() + { + executor.executePhysicalPlan(generatorResult.preActionsSqlPlan()); + } - // import external dataset reference - if (updatedDatasets.stagingDataset() instanceof ExternalDatasetReference) + private List ingest(List dataSplitRanges) + { + if (enrichedIngestMode instanceof BulkLoad) { - // update staging dataset reference to imported dataset - updatedDatasets = importExternalDataset(enrichedIngestMode, updatedDatasets, transformer, executor); - resourcesBuilder.externalDatasetImported(true); + return performBulkLoad(enrichedDatasets, transformer, planner, executor, generatorResult, enrichedIngestMode); } - - // Check if staging dataset is empty - if (ingestMode().accept(IngestModeVisitors.NEED_TO_CHECK_STAGING_EMPTY) && executor.datasetExists(updatedDatasets.stagingDataset())) + else { - resourcesBuilder.stagingDataSetEmpty(datasetEmpty(updatedDatasets.stagingDataset(), transformer, executor)); + return performIngestion(enrichedDatasets, transformer, planner, executor, generatorResult, dataSplitRanges, enrichedIngestMode); } + } - boolean mainDatasetExists = executor.datasetExists(updatedDatasets.mainDataset()); - if (mainDatasetExists && enableSchemaEvolution()) + private void validate() + { + if (this.executor == null) { - updatedDatasets = updatedDatasets.withMainDataset(constructDatasetFromDatabase(executor, updatedDatasets.mainDataset())); + throw new IllegalStateException("Executor not initialized, invoke initExecutor first"); } - else + if (this.enrichedDatasets == null) { - updatedDatasets = updatedDatasets.withMainDataset(ApiUtils.deriveMainDatasetFromStaging(updatedDatasets, enrichedIngestMode)); + throw new IllegalStateException("Datasets 
not initialized, invoke initDatasets first"); } + } - // Add Optimization Columns if needed - enrichedIngestMode = enrichedIngestMode.accept(new IngestModeOptimizationColumnHandler(updatedDatasets)); - - // generate sql plans - RelationalGenerator generator = RelationalGenerator.builder() - .ingestMode(enrichedIngestMode) - .relationalSink(relationalSink()) - .cleanupStagingData(cleanupStagingData()) - .collectStatistics(collectStatistics()) - .createStagingDataset(createStagingDataset()) - .enableSchemaEvolution(enableSchemaEvolution()) - .addAllSchemaEvolutionCapabilitySet(schemaEvolutionCapabilitySet()) - .caseConversion(caseConversion()) - .executionTimestampClock(executionTimestampClock()) - .batchStartTimestampPattern(BATCH_START_TS_PATTERN) - .batchEndTimestampPattern(BATCH_END_TS_PATTERN) - .batchIdPattern(BATCH_ID_PATTERN) - .build(); - - Planner planner = Planners.get(updatedDatasets, enrichedIngestMode, plannerOptions()); - GeneratorResult generatorResult = generator.generateOperations(updatedDatasets, resourcesBuilder.build(), planner, enrichedIngestMode); + private List performFullIngestion(RelationalConnection connection, Datasets datasets, List dataSplitRanges) + { + // 1. init + initExecutor(connection); + init(datasets); - // Create Datasets + // 2. 
Create Datasets if (createDatasets()) { - executor.executePhysicalPlan(generatorResult.preActionsSqlPlan()); + createAllDatasets(); } - // The below boolean is created before the execution of pre-actions, hence it represents whether the main table has already existed before that - if (mainDatasetExists) - { - // Perform schema evolution - if (generatorResult.schemaEvolutionDataset().isPresent()) - { - updatedDatasets = updatedDatasets.withMainDataset(generatorResult.schemaEvolutionDataset().get()); - generatorResult.schemaEvolutionSqlPlan().ifPresent(executor::executePhysicalPlan); - } - } + // Evolve Schema + evolveSchema(); + // Perform Ingestion List result; - if (enrichedIngestMode instanceof BulkLoad) - { - result = performBulkLoad(updatedDatasets, transformer, planner, executor, generatorResult, enrichedIngestMode); - } - else - { - result = performIngestion(updatedDatasets, transformer, planner, executor, generatorResult, dataSplitRanges, enrichedIngestMode); - } - return result; - } - - private List performIngestion(Datasets datasets, Transformer transformer, Planner planner, Executor executor, GeneratorResult generatorResult, List dataSplitRanges, IngestMode ingestMode) - { try { - List results = new ArrayList<>(); executor.begin(); - int dataSplitIndex = 0; - int dataSplitsCount = (dataSplitRanges == null || dataSplitRanges.isEmpty()) ? 0 : dataSplitRanges.size(); - do - { - Optional dataSplitRange = Optional.ofNullable(dataSplitsCount == 0 ? 
null : dataSplitRanges.get(dataSplitIndex)); - // Extract the Placeholders values - Map placeHolderKeyValues = extractPlaceHolderKeyValues(datasets, executor, planner, transformer, ingestMode, dataSplitRange); - // Load main table, extract stats and update metadata table - Map statisticsResultMap = loadData(executor, generatorResult, placeHolderKeyValues); - IngestorResult result = IngestorResult.builder() - .putAllStatisticByName(statisticsResultMap) - .updatedDatasets(datasets) - .batchId(Optional.ofNullable(placeHolderKeyValues.containsKey(BATCH_ID_PATTERN) ? Integer.valueOf(placeHolderKeyValues.get(BATCH_ID_PATTERN)) : null)) - .dataSplitRange(dataSplitRange) - .schemaEvolutionSql(generatorResult.schemaEvolutionSql()) - .status(IngestStatus.SUCCEEDED) - .ingestionTimestampUTC(placeHolderKeyValues.get(BATCH_START_TS_PATTERN)) - .build(); - results.add(result); - dataSplitIndex++; - } - while (planner.dataSplitExecutionSupported() && dataSplitIndex < dataSplitsCount); - // Clean up - executor.executePhysicalPlan(generatorResult.postActionsSqlPlan()); + result = ingest(dataSplitRanges); executor.commit(); - return results; } catch (Exception e) { @@ -341,6 +360,100 @@ private List performIngestion(Datasets datasets, Transformer performIngestion(Datasets datasets, Transformer transformer, Planner planner, Executor executor, GeneratorResult generatorResult, List dataSplitRanges, IngestMode ingestMode) + { + List results = new ArrayList<>(); + int dataSplitIndex = 0; + int dataSplitsCount = (dataSplitRanges == null || dataSplitRanges.isEmpty()) ? 0 : dataSplitRanges.size(); + do + { + Optional dataSplitRange = Optional.ofNullable(dataSplitsCount == 0 ? 
null : dataSplitRanges.get(dataSplitIndex)); + // Extract the Placeholders values + Map placeHolderKeyValues = extractPlaceHolderKeyValues(datasets, executor, planner, transformer, ingestMode, dataSplitRange); + // Load main table, extract stats and update metadata table + Map statisticsResultMap = loadData(executor, generatorResult, placeHolderKeyValues); + IngestorResult result = IngestorResult.builder() + .putAllStatisticByName(statisticsResultMap) + .updatedDatasets(datasets) + .batchId(Optional.ofNullable(placeHolderKeyValues.containsKey(BATCH_ID_PATTERN) ? Integer.valueOf(placeHolderKeyValues.get(BATCH_ID_PATTERN)) : null)) + .dataSplitRange(dataSplitRange) + .schemaEvolutionSql(generatorResult.schemaEvolutionSql()) + .status(IngestStatus.SUCCEEDED) + .ingestionTimestampUTC(placeHolderKeyValues.get(BATCH_START_TS_PATTERN)) + .build(); + results.add(result); + dataSplitIndex++; + } + while (planner.dataSplitExecutionSupported() && dataSplitIndex < dataSplitsCount); + // Clean up + executor.executePhysicalPlan(generatorResult.postActionsSqlPlan()); + return results; } private Map loadData(Executor executor, GeneratorResult generatorResult, Map placeHolderKeyValues) @@ -366,37 +479,23 @@ private Map loadData(Executor performBulkLoad(Datasets datasets, Transformer transformer, Planner planner, Executor executor, GeneratorResult generatorResult, IngestMode ingestMode) { - try - { - List results = new ArrayList<>(); - executor.begin(); - Map placeHolderKeyValues = extractPlaceHolderKeyValues(datasets, executor, planner, transformer, ingestMode, Optional.empty()); - - // Execute ingest SqlPlan - IngestorResult result = relationalSink().performBulkLoad(datasets, executor, generatorResult.ingestSqlPlan(), placeHolderKeyValues); - // Execute metadata ingest SqlPlan - if (generatorResult.metadataIngestSqlPlan().isPresent()) - { - executor.executePhysicalPlan(generatorResult.metadataIngestSqlPlan().get(), placeHolderKeyValues); - } + List results = new ArrayList<>(); + 
Map placeHolderKeyValues = extractPlaceHolderKeyValues(datasets, executor, planner, transformer, ingestMode, Optional.empty()); - results.add(result); - executor.commit(); - return results; - } - catch (Exception e) - { - executor.revert(); - throw e; - } - finally + // Execute ingest SqlPlan + IngestorResult result = relationalSink().performBulkLoad(datasets, executor, generatorResult.ingestSqlPlan(), placeHolderKeyValues); + // Execute metadata ingest SqlPlan + if (generatorResult.metadataIngestSqlPlan().isPresent()) { - executor.close(); + executor.executePhysicalPlan(generatorResult.metadataIngestSqlPlan().get(), placeHolderKeyValues); } + + results.add(result); + return results; } - private Datasets importExternalDataset(IngestMode ingestMode, Datasets datasets, Transformer transformer, Executor executor) + private Datasets importExternalDataset(Datasets datasets) { ExternalDatasetReference externalDatasetReference = (ExternalDatasetReference) datasets.stagingDataset(); DatasetReference mainDataSetReference = datasets.mainDataset().datasetReference(); @@ -410,8 +509,8 @@ private Datasets importExternalDataset(IngestMode ingestMode, Datasets datasets, // TODO : Auto infer schema in future // Prepare DigestInfo - boolean hasDigestField = ingestMode.accept(IngestModeVisitors.DIGEST_REQUIRED); - Optional digestFieldOptional = ingestMode.accept(IngestModeVisitors.EXTRACT_DIGEST_FIELD); + boolean hasDigestField = enrichedIngestMode.accept(IngestModeVisitors.DIGEST_REQUIRED); + Optional digestFieldOptional = enrichedIngestMode.accept(IngestModeVisitors.EXTRACT_DIGEST_FIELD); boolean populateDigest = hasDigestField && externalDatasetReference.schema().fields().stream().noneMatch(field -> field.name().equalsIgnoreCase(digestFieldOptional.orElseThrow(IllegalStateException::new))); if (populateDigest) @@ -430,10 +529,11 @@ private Datasets importExternalDataset(IngestMode ingestMode, Datasets datasets, executor.executePhysicalPlan(tableCreationPhysicalPlan); // Load 
staging data - Set metaFields = ingestMode.accept(IngestModeVisitors.META_FIELDS_TO_EXCLUDE_FROM_DIGEST); + Set metaFields = enrichedIngestMode.accept(IngestModeVisitors.META_FIELDS_TO_EXCLUDE_FROM_DIGEST); DigestInfo digestInfo = DigestInfo.builder().populateDigest(populateDigest).digestField(digestFieldOptional.orElse(null)).addAllMetaFields(metaFields).build(); Importer importer = Importers.forExternalDatasetReference(externalDatasetReference, transformer, executor); importer.importData(externalDatasetReference, digestInfo); + resourcesBuilder.externalDatasetImported(true); return updatedDatasets; } diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/BaseTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/BaseTest.java index 372dab217fc..318824291cb 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/BaseTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/BaseTest.java @@ -17,6 +17,7 @@ import org.finos.legend.engine.persistence.components.common.DatasetFilter; import org.finos.legend.engine.persistence.components.common.Datasets; import org.finos.legend.engine.persistence.components.common.StatisticName; +import org.finos.legend.engine.persistence.components.executor.Executor; import org.finos.legend.engine.persistence.components.ingestmode.IngestMode; import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlan; import 
org.finos.legend.engine.persistence.components.logicalplan.LogicalPlanFactory; @@ -170,8 +171,8 @@ protected IngestorResult executePlansAndVerifyResults(IngestMode ingestMode, Pla .enableSchemaEvolution(options.enableSchemaEvolution()) .schemaEvolutionCapabilitySet(userCapabilitySet) .build(); - IngestorResult result = ingestor.ingest(JdbcConnection.of(h2Sink.connection()), datasets); + IngestorResult result = ingestor.performFullIngestion(JdbcConnection.of(h2Sink.connection()), datasets); Map actualStats = result.statisticByName(); // Verify the database data @@ -221,7 +222,7 @@ protected List executePlansAndVerifyResultsWithDataSplits(Ingest .enableSchemaEvolution(options.enableSchemaEvolution()) .build(); - List results = ingestor.ingestWithDataSplits(JdbcConnection.of(h2Sink.connection()), datasets, dataSplitRanges); + List results = ingestor.performFullIngestionWithDataSplits(JdbcConnection.of(h2Sink.connection()), datasets, dataSplitRanges); List> tableData = h2Sink.executeQuery("select * from \"TEST\".\"main\""); TestUtils.assertFileAndTableDataEquals(schema, expectedDataPath, tableData); @@ -267,7 +268,14 @@ public IngestorResult executePlansAndVerifyForCaseConversion(IngestMode ingestMo .caseConversion(CaseConversion.TO_UPPER) .build(); - IngestorResult result = ingestor.ingest(JdbcConnection.of(h2Sink.connection()), datasets); + Executor executor = ingestor.initExecutor(JdbcConnection.of(h2Sink.connection())); + datasets = ingestor.initDatasets(datasets); + ingestor.create(); + ingestor.evolve(); + executor.begin(); + IngestorResult result = ingestor.ingest(); + // Do more stuff + executor.commit(); Map actualStats = result.statisticByName(); diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/nontemporal/AppendOnlyTest.java 
b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/nontemporal/AppendOnlyTest.java index 6206e1070ae..30e8b98d60f 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/nontemporal/AppendOnlyTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/nontemporal/AppendOnlyTest.java @@ -388,7 +388,7 @@ void testAppendOnlyDoNotCreateTables() throws Exception .build(); try { - ingestor.ingest(JdbcConnection.of(h2Sink.connection()), datasets); + ingestor.performFullIngestion(JdbcConnection.of(h2Sink.connection()), datasets); Assertions.fail("Should not be successful"); } catch (Exception e) diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-test-runner/src/main/java/org/finos/legend/engine/testable/persistence/extension/PersistenceTestRunner.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-test-runner/src/main/java/org/finos/legend/engine/testable/persistence/extension/PersistenceTestRunner.java index 1bfd8a2fa0a..4c20e8cdee6 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-test-runner/src/main/java/org/finos/legend/engine/testable/persistence/extension/PersistenceTestRunner.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-test-runner/src/main/java/org/finos/legend/engine/testable/persistence/extension/PersistenceTestRunner.java @@ -194,7 +194,7 @@ private IngestorResult invokePersistence(Dataset targetDataset, Persistence pers .enableSchemaEvolution(SCHEMA_EVOLUTION_DEFAULT) .build(); - IngestorResult result = 
ingestor.ingest(JdbcConnection.of(connection), enrichedDatasets); + IngestorResult result = ingestor.performFullIngestion(JdbcConnection.of(connection), enrichedDatasets); return result; } @@ -212,7 +212,7 @@ private IngestorResult invokePersistence(Dataset targetDataset, ServiceOutputTar .enableSchemaEvolution(SCHEMA_EVOLUTION_DEFAULT) .build(); - IngestorResult result = ingestor.ingest(JdbcConnection.of(connection), enrichedDatasets); + IngestorResult result = ingestor.performFullIngestion(JdbcConnection.of(connection), enrichedDatasets); return result; } From 55c2265529f9cb3ef719fb2ef09c493e94ea122f Mon Sep 17 00:00:00 2001 From: Ashutosh Date: Tue, 1 Aug 2023 12:10:30 +0800 Subject: [PATCH 2/6] Fixes for Bitemp and fix interfaces to support multi table ingestion --- .../ingestmode/TempDatasetsEnricher.java | 92 +++++++++++++++++++ .../planner/BitemporalDeltaPlanner.java | 46 +--------- .../components/util/LogicalPlanUtils.java | 39 +++++++- .../api/RelationalIngestorAbstract.java | 49 ++++------ .../persistence/components/BaseTest.java | 25 +++-- 5 files changed, 169 insertions(+), 82 deletions(-) create mode 100644 legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/TempDatasetsEnricher.java diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/TempDatasetsEnricher.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/TempDatasetsEnricher.java new file mode 100644 index 00000000000..0f37a2baeed --- /dev/null +++ 
b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/TempDatasetsEnricher.java @@ -0,0 +1,92 @@ +// Copyright 2023 Goldman Sachs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.finos.legend.engine.persistence.components.ingestmode; + +import org.finos.legend.engine.persistence.components.common.Datasets; +import org.finos.legend.engine.persistence.components.ingestmode.merge.MergeStrategyVisitors; +import org.finos.legend.engine.persistence.components.ingestmode.validitymilestoning.derivation.SourceSpecifiesFromDateTime; +import org.finos.legend.engine.persistence.components.util.LogicalPlanUtils; + +import java.util.Optional; + +public class TempDatasetsEnricher implements IngestModeVisitor +{ + Datasets datasets; + + public TempDatasetsEnricher(Datasets datasets) + { + this.datasets = datasets; + } + + + @Override + public Datasets visitAppendOnly(AppendOnlyAbstract appendOnly) + { + return datasets; + } + + @Override + public Datasets visitNontemporalSnapshot(NontemporalSnapshotAbstract nontemporalSnapshot) + { + return datasets; + } + + @Override + public Datasets visitNontemporalDelta(NontemporalDeltaAbstract nontemporalDelta) + { + return datasets; + } + + @Override + public Datasets visitUnitemporalSnapshot(UnitemporalSnapshotAbstract unitemporalSnapshot) + { + return datasets; + } + + @Override 
+ public Datasets visitUnitemporalDelta(UnitemporalDeltaAbstract unitemporalDelta) + { + return datasets; + } + + @Override + public Datasets visitBitemporalSnapshot(BitemporalSnapshotAbstract bitemporalSnapshot) + { + return datasets; + } + + @Override + public Datasets visitBitemporalDelta(BitemporalDeltaAbstract bitemporalDelta) + { + Datasets enrichedDatasets = datasets; + if (bitemporalDelta.validityMilestoning().validityDerivation() instanceof SourceSpecifiesFromDateTime) + { + Optional deleteIndicatorField = bitemporalDelta.mergeStrategy().accept(MergeStrategyVisitors.EXTRACT_DELETE_FIELD); + enrichedDatasets = enrichedDatasets.withTempDataset(LogicalPlanUtils.getTempDataset(enrichedDatasets)); + if (deleteIndicatorField.isPresent()) + { + enrichedDatasets = enrichedDatasets.withTempDatasetWithDeleteIndicator(LogicalPlanUtils.getTempDatasetWithDeleteIndicator(enrichedDatasets, deleteIndicatorField.get())); + } + } + + return enrichedDatasets; + } + + @Override + public Datasets visitBulkLoad(BulkLoadAbstract bulkLoad) + { + return datasets; + } +} diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/BitemporalDeltaPlanner.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/BitemporalDeltaPlanner.java index 06a58231e01..fe6c7e7be97 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/BitemporalDeltaPlanner.java +++ 
b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/BitemporalDeltaPlanner.java @@ -35,16 +35,12 @@ import org.finos.legend.engine.persistence.components.logicalplan.conditions.Not; import org.finos.legend.engine.persistence.components.logicalplan.conditions.NotEquals; import org.finos.legend.engine.persistence.components.logicalplan.conditions.Or; -import org.finos.legend.engine.persistence.components.logicalplan.datasets.DataType; import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset; import org.finos.legend.engine.persistence.components.logicalplan.datasets.DatasetDefinition; -import org.finos.legend.engine.persistence.components.logicalplan.datasets.Field; -import org.finos.legend.engine.persistence.components.logicalplan.datasets.FieldType; import org.finos.legend.engine.persistence.components.logicalplan.datasets.Join; import org.finos.legend.engine.persistence.components.logicalplan.datasets.JoinOperation; import org.finos.legend.engine.persistence.components.logicalplan.datasets.Selection; import org.finos.legend.engine.persistence.components.logicalplan.operations.Create; -import org.finos.legend.engine.persistence.components.logicalplan.operations.Drop; import org.finos.legend.engine.persistence.components.logicalplan.operations.Insert; import org.finos.legend.engine.persistence.components.logicalplan.operations.Operation; import org.finos.legend.engine.persistence.components.logicalplan.operations.Delete; @@ -72,6 +68,7 @@ import static org.finos.legend.engine.persistence.components.common.StatisticName.ROWS_UPDATED; import static org.finos.legend.engine.persistence.components.common.StatisticName.ROWS_TERMINATED; import static org.finos.legend.engine.persistence.components.common.StatisticName.ROWS_INSERTED; +import static 
org.finos.legend.engine.persistence.components.util.LogicalPlanUtils.UNDERSCORE; class BitemporalDeltaPlanner extends BitemporalPlanner { @@ -79,8 +76,6 @@ class BitemporalDeltaPlanner extends BitemporalPlanner private static final String VALID_DATE_TIME_THRU_NAME = "legend_persistence_end_date"; private static final String LEFT_DATASET_IN_JOIN_ALIAS = "legend_persistence_x"; private static final String RIGHT_DATASET_IN_JOIN_ALIAS = "legend_persistence_y"; - private static final String TEMP_DATASET_BASE_NAME = "legend_persistence_temp"; - private static final String TEMP_DATASET_WITH_DELETE_INDICATOR_BASE_NAME = "legend_persistence_tempWithDeleteIndicator"; private static final String STAGE_DATASET_WITHOUT_DUPLICATES_BASE_NAME = "legend_persistence_stageWithoutDuplicates"; private final Optional deleteIndicatorField; @@ -177,57 +172,26 @@ class BitemporalDeltaPlanner extends BitemporalPlanner if (ingestMode().validityMilestoning().validityDerivation() instanceof SourceSpecifiesFromDateTime) { - this.tempDataset = getTempDataset(datasets); + this.tempDataset = LogicalPlanUtils.getTempDataset(datasets); if (deleteIndicatorField.isPresent()) { - this.tempDatasetWithDeleteIndicator = getTempDatasetWithDeleteIndicator(datasets); + this.tempDatasetWithDeleteIndicator = LogicalPlanUtils.getTempDatasetWithDeleteIndicator(datasets, deleteIndicatorField.get()); } } } private Dataset getStagingDatasetWithoutDuplicates(Datasets datasets) { + String tableName = stagingDataset().datasetReference().name().orElseThrow((IllegalStateException::new)); return datasets.stagingDatasetWithoutDuplicates().orElse(DatasetDefinition.builder() .schema(stagingDataset().schema()) .database(stagingDataset().datasetReference().database()) .group(stagingDataset().datasetReference().group()) - .name(LogicalPlanUtils.generateTableNameWithSuffix(stagingDataset().datasetReference().name().orElseThrow((IllegalStateException::new)), STAGE_DATASET_WITHOUT_DUPLICATES_BASE_NAME)) + .name(tableName + 
UNDERSCORE + STAGE_DATASET_WITHOUT_DUPLICATES_BASE_NAME) .alias(STAGE_DATASET_WITHOUT_DUPLICATES_BASE_NAME) .build()); } - private Dataset getTempDataset(Datasets datasets) - { - return datasets.tempDataset().orElse(DatasetDefinition.builder() - .schema(mainDataset().schema()) - .database(mainDataset().datasetReference().database()) - .group(mainDataset().datasetReference().group()) - .name(LogicalPlanUtils.generateTableNameWithSuffix(mainDataset().datasetReference().name().orElseThrow((IllegalStateException::new)), TEMP_DATASET_BASE_NAME)) - .alias(TEMP_DATASET_BASE_NAME) - .build()); - } - - private Dataset getTempDatasetWithDeleteIndicator(Datasets datasets) - { - if (datasets.tempDatasetWithDeleteIndicator().isPresent()) - { - return datasets.tempDatasetWithDeleteIndicator().get(); - } - else - { - Field deleteIndicator = Field.builder().name(deleteIndicatorField.orElseThrow((IllegalStateException::new))).type(FieldType.of(DataType.BOOLEAN, Optional.empty(), Optional.empty())).build(); - List mainFieldsPlusDeleteIndicator = new ArrayList<>(mainDataset().schema().fields()); - mainFieldsPlusDeleteIndicator.add(deleteIndicator); - return DatasetDefinition.builder() - .schema(mainDataset().schema().withFields(mainFieldsPlusDeleteIndicator)) - .database(mainDataset().datasetReference().database()) - .group(mainDataset().datasetReference().group()) - .name(LogicalPlanUtils.generateTableNameWithSuffix(mainDataset().datasetReference().name().orElseThrow((IllegalStateException::new)), TEMP_DATASET_WITH_DELETE_INDICATOR_BASE_NAME)) - .alias(TEMP_DATASET_WITH_DELETE_INDICATOR_BASE_NAME) - .build(); - } - } - @Override protected BitemporalDelta ingestMode() { diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/util/LogicalPlanUtils.java 
b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/util/LogicalPlanUtils.java index aa5aa230e79..0631fe882cf 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/util/LogicalPlanUtils.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/util/LogicalPlanUtils.java @@ -16,6 +16,7 @@ import java.util.UUID; +import org.finos.legend.engine.persistence.components.common.Datasets; import org.finos.legend.engine.persistence.components.logicalplan.conditions.And; import org.finos.legend.engine.persistence.components.logicalplan.conditions.Condition; import org.finos.legend.engine.persistence.components.logicalplan.conditions.Equals; @@ -31,6 +32,8 @@ import org.finos.legend.engine.persistence.components.logicalplan.datasets.DerivedDataset; import org.finos.legend.engine.persistence.components.logicalplan.datasets.Field; import org.finos.legend.engine.persistence.components.logicalplan.datasets.Selection; +import org.finos.legend.engine.persistence.components.logicalplan.datasets.DatasetDefinition; +import org.finos.legend.engine.persistence.components.logicalplan.datasets.FieldType; import org.finos.legend.engine.persistence.components.logicalplan.values.All; import org.finos.legend.engine.persistence.components.logicalplan.values.Array; import org.finos.legend.engine.persistence.components.logicalplan.values.FieldValue; @@ -73,7 +76,9 @@ public class LogicalPlanUtils public static final String DEFAULT_META_TABLE = "batch_metadata"; public static final String DATA_SPLIT_LOWER_BOUND_PLACEHOLDER = "{DATA_SPLIT_LOWER_BOUND_PLACEHOLDER}"; public static final String 
DATA_SPLIT_UPPER_BOUND_PLACEHOLDER = "{DATA_SPLIT_UPPER_BOUND_PLACEHOLDER}"; - private static final String UNDERSCORE = "_"; + public static final String UNDERSCORE = "_"; + public static final String TEMP_DATASET_BASE_NAME = "legend_persistence_temp"; + public static final String TEMP_DATASET_WITH_DELETE_INDICATOR_BASE_NAME = "legend_persistence_tempWithDeleteIndicator"; private LogicalPlanUtils() { @@ -397,6 +402,38 @@ public static List extractStagedFilesFieldValues(Dataset dataset) return stagedFilesFields; } + public static Dataset getTempDataset(Datasets datasets) + { + return datasets.tempDataset().orElse(DatasetDefinition.builder() + .schema(datasets.mainDataset().schema()) + .database(datasets.mainDataset().datasetReference().database()) + .group(datasets.mainDataset().datasetReference().group()) + .name(LogicalPlanUtils.generateTableNameWithSuffix(datasets.mainDataset().datasetReference().name().orElseThrow((IllegalStateException::new)), TEMP_DATASET_BASE_NAME)) + .alias(TEMP_DATASET_BASE_NAME) + .build()); + } + + public static Dataset getTempDatasetWithDeleteIndicator(Datasets datasets, String deleteIndicatorField) + { + if (datasets.tempDatasetWithDeleteIndicator().isPresent()) + { + return datasets.tempDatasetWithDeleteIndicator().get(); + } + else + { + Field deleteIndicator = Field.builder().name(deleteIndicatorField).type(FieldType.of(DataType.BOOLEAN, Optional.empty(), Optional.empty())).build(); + List mainFieldsPlusDeleteIndicator = new ArrayList<>(datasets.mainDataset().schema().fields()); + mainFieldsPlusDeleteIndicator.add(deleteIndicator); + return DatasetDefinition.builder() + .schema(datasets.mainDataset().schema().withFields(mainFieldsPlusDeleteIndicator)) + .database(datasets.mainDataset().datasetReference().database()) + .group(datasets.mainDataset().datasetReference().group()) + .name(LogicalPlanUtils.generateTableNameWithSuffix(datasets.mainDataset().datasetReference().name().orElseThrow((IllegalStateException::new)), 
TEMP_DATASET_WITH_DELETE_INDICATOR_BASE_NAME)) + .alias(TEMP_DATASET_WITH_DELETE_INDICATOR_BASE_NAME) + .build(); + } + } + public static Set SUPPORTED_DATA_TYPES_FOR_OPTIMIZATION_COLUMNS = new HashSet<>(Arrays.asList(INT, INTEGER, BIGINT, FLOAT, DOUBLE, DECIMAL, DATE)); diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/RelationalIngestorAbstract.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/RelationalIngestorAbstract.java index 3f241bddd2e..b9bfd975040 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/RelationalIngestorAbstract.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/RelationalIngestorAbstract.java @@ -41,6 +41,7 @@ import org.finos.legend.engine.persistence.components.logicalplan.datasets.DatasetReference; import org.finos.legend.engine.persistence.components.logicalplan.datasets.ExternalDatasetReference; import org.finos.legend.engine.persistence.components.logicalplan.datasets.Field; +import org.finos.legend.engine.persistence.components.ingestmode.TempDatasetsEnricher; import org.finos.legend.engine.persistence.components.logicalplan.values.StringValue; import org.finos.legend.engine.persistence.components.planner.Planner; import org.finos.legend.engine.persistence.components.planner.PlannerOptions; @@ -194,47 +195,38 @@ protected TransformOptions transformOptions() - Initializes executor - @return : The methods 
returns the Executor to the caller enabling them to handle their own transaction */ - public Executor initExecutor(RelationalConnection connection) + public Executor init(RelationalConnection connection) { this.executor = relationalSink().getRelationalExecutor(connection); return executor; } - /* - - Initializes Datasets - - @return : The methods enriches the datasets and returns that to the user - - User is expected to invoke init before calling create/evolve/ingest - */ - public Datasets initDatasets(Datasets datasets) - { - init(datasets); - return enrichedDatasets; - } - /* - Create Datasets */ - public void create() + public Datasets create(Datasets datasets) { - validate(); + init(datasets); createAllDatasets(); + return this.enrichedDatasets; } /* - Evolve Schema of Target table based on schema changes in staging table */ - public void evolve() + public Datasets evolve(Datasets datasets) { - validate(); + init(datasets); evolveSchema(); + return this.enrichedDatasets; } /* - Perform ingestion from staging to main dataset based on the Ingest mode, executes in current transaction */ - public IngestorResult ingest() + public IngestorResult ingest(Datasets datasets) { - validate(); + init(datasets); return ingest(Arrays.asList()).stream().findFirst().orElseThrow(IllegalStateException::new); } @@ -316,22 +308,10 @@ private List ingest(List dataSplitRanges) } } - private void validate() - { - if (this.executor == null) - { - throw new IllegalStateException("Executor not initialized, invoke initExecutor first"); - } - if (this.enrichedDatasets == null) - { - throw new IllegalStateException("Datasets not initialized, invoke initDatasets first"); - } - } - private List performFullIngestion(RelationalConnection connection, Datasets datasets, List dataSplitRanges) { // 1. init - initExecutor(connection); + init(connection); init(datasets); // 2. Create Datasets @@ -405,7 +385,10 @@ private void init(Datasets datasets) // 6. 
Add Optimization Columns if needed enrichedIngestMode = enrichedIngestMode.accept(new IngestModeOptimizationColumnHandler(enrichedDatasets)); - // 7. generate sql plans + // 7. Enrich temp Datasets + enrichedDatasets = enrichedIngestMode.accept(new TempDatasetsEnricher(enrichedDatasets)); + + // 8. generate sql plans RelationalGenerator generator = RelationalGenerator.builder() .ingestMode(enrichedIngestMode) .relationalSink(relationalSink()) @@ -524,7 +507,7 @@ private Datasets importExternalDataset(Datasets datasets) Datasets updatedDatasets = datasets.withStagingDataset(extractedStagingDatasetDefinition); // Create staging table - LogicalPlan tableCreationPlan = LogicalPlanFactory.getDatasetCreationPlan(extractedStagingDatasetDefinition, false); + LogicalPlan tableCreationPlan = LogicalPlanFactory.getDatasetCreationPlan(extractedStagingDatasetDefinition, true); SqlPlan tableCreationPhysicalPlan = transformer.generatePhysicalPlan(tableCreationPlan); executor.executePhysicalPlan(tableCreationPhysicalPlan); diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/BaseTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/BaseTest.java index 318824291cb..ab64c7edeaf 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/BaseTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/BaseTest.java @@ -172,7 +172,17 @@ protected IngestorResult executePlansAndVerifyResults(IngestMode ingestMode, Pla 
.schemaEvolutionCapabilitySet(userCapabilitySet) .build(); - IngestorResult result = ingestor.performFullIngestion(JdbcConnection.of(h2Sink.connection()), datasets); +// IngestorResult result = ingestor.performFullIngestion(JdbcConnection.of(h2Sink.connection()), datasets); + Executor executor = ingestor.init(JdbcConnection.of(h2Sink.connection())); + + datasets = ingestor.create(datasets); + datasets = ingestor.evolve(datasets); + + executor.begin(); + IngestorResult result = ingestor.ingest(datasets); + // Do more stuff if needed + executor.commit(); + Map actualStats = result.statisticByName(); // Verify the database data @@ -268,13 +278,14 @@ public IngestorResult executePlansAndVerifyForCaseConversion(IngestMode ingestMo .caseConversion(CaseConversion.TO_UPPER) .build(); - Executor executor = ingestor.initExecutor(JdbcConnection.of(h2Sink.connection())); - datasets = ingestor.initDatasets(datasets); - ingestor.create(); - ingestor.evolve(); + Executor executor = ingestor.init(JdbcConnection.of(h2Sink.connection())); + + datasets = ingestor.create(datasets); + datasets = ingestor.evolve(datasets); + executor.begin(); - IngestorResult result = ingestor.ingest(); - // Do more stuff + IngestorResult result = ingestor.ingest(datasets); + // Do more stuff if needed executor.commit(); Map actualStats = result.statisticByName(); From e0edbb7c1b0599deceb77b83776831067b329b7b Mon Sep 17 00:00:00 2001 From: Ashutosh Date: Wed, 2 Aug 2023 10:02:25 +0800 Subject: [PATCH 3/6] Adding test for multi table ingestion --- .../persistence/components/TestUtils.java | 3 + .../unitemporal/MultiTableIngestionTest.java | 223 ++++++++++++++++++ .../expected_dataset1_pass1.csv | 3 + .../expected_dataset1_pass2.csv | 5 + .../expected_dataset1_pass3.csv | 5 + .../staging_dataset1_pass1.csv | 3 + .../staging_dataset1_pass2.csv | 3 + .../staging_dataset2_pass1.csv | 3 + .../staging_dataset2_pass2.csv | 3 + .../staging_dataset_pass3.csv | 1 + 10 files changed, 252 insertions(+) create mode 
100644 legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/MultiTableIngestionTest.java create mode 100644 legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-incremental-milestoning/expected/batch_id_and_time_based/multi_table_ingestion/expected_dataset1_pass1.csv create mode 100644 legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-incremental-milestoning/expected/batch_id_and_time_based/multi_table_ingestion/expected_dataset1_pass2.csv create mode 100644 legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-incremental-milestoning/expected/batch_id_and_time_based/multi_table_ingestion/expected_dataset1_pass3.csv create mode 100644 legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-incremental-milestoning/input/batch_id_and_time_based/multi_table_ingestion/staging_dataset1_pass1.csv create mode 100644 legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-incremental-milestoning/input/batch_id_and_time_based/multi_table_ingestion/staging_dataset1_pass2.csv create mode 100644 legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-incremental-milestoning/input/batch_id_and_time_based/multi_table_ingestion/staging_dataset2_pass1.csv create mode 100644 
legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-incremental-milestoning/input/batch_id_and_time_based/multi_table_ingestion/staging_dataset2_pass2.csv create mode 100644 legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-incremental-milestoning/input/batch_id_and_time_based/multi_table_ingestion/staging_dataset_pass3.csv diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/TestUtils.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/TestUtils.java index 542bbbfaaa7..19992393678 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/TestUtils.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/TestUtils.java @@ -101,6 +101,7 @@ public class TestUtils public static String endDateTimeName = "end_datetime"; public static String dataSplitName = "data_split"; public static String batchName = "batch"; + public static String ratingName = "rating"; public static HashMap> partitionFilter = new HashMap>() {{ @@ -151,6 +152,8 @@ public class TestUtils public static Field endDateTime = Field.builder().name(endDateTimeName).type(FieldType.of(DataType.DATETIME, Optional.empty(), Optional.empty())).fieldAlias(endDateTimeName).build(); public static Field dataSplit = 
Field.builder().name(dataSplitName).type(FieldType.of(DataType.BIGINT, Optional.empty(), Optional.empty())).primaryKey(true).fieldAlias(dataSplitName).build(); public static Field batch = Field.builder().name(batchName).type(FieldType.of(DataType.INT, Optional.empty(), Optional.empty())).fieldAlias(batchName).primaryKey(true).build(); + public static Field rating = Field.builder().name(ratingName).type(FieldType.of(DataType.INT, Optional.empty(), Optional.empty())).fieldAlias(ratingName).build(); + public static DatasetDefinition getBasicMainTable() { diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/MultiTableIngestionTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/MultiTableIngestionTest.java new file mode 100644 index 00000000000..005ca02e2cd --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/MultiTableIngestionTest.java @@ -0,0 +1,223 @@ +// Copyright 2023 Goldman Sachs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package org.finos.legend.engine.persistence.components.ingestmode.unitemporal; + +import org.finos.legend.engine.persistence.components.BaseTest; +import org.finos.legend.engine.persistence.components.TestUtils; +import org.finos.legend.engine.persistence.components.common.Datasets; +import org.finos.legend.engine.persistence.components.common.StatisticName; +import org.finos.legend.engine.persistence.components.executor.Executor; +import org.finos.legend.engine.persistence.components.ingestmode.UnitemporalDelta; +import org.finos.legend.engine.persistence.components.ingestmode.transactionmilestoning.BatchIdAndDateTime; +import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset; +import org.finos.legend.engine.persistence.components.logicalplan.datasets.DatasetDefinition; +import org.finos.legend.engine.persistence.components.logicalplan.datasets.SchemaDefinition; +import org.finos.legend.engine.persistence.components.relational.api.IngestorResult; +import org.finos.legend.engine.persistence.components.relational.api.RelationalIngestor; +import org.finos.legend.engine.persistence.components.relational.h2.H2Sink; +import org.finos.legend.engine.persistence.components.relational.jdbc.JdbcConnection; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.finos.legend.engine.persistence.components.TestUtils.*; + +public class MultiTableIngestionTest extends BaseTest +{ + + private final String basePathForInput = "src/test/resources/data/unitemporal-incremental-milestoning/input/batch_id_and_time_based/"; + private final String basePathForExpected = "src/test/resources/data/unitemporal-incremental-milestoning/expected/batch_id_and_time_based/"; + + String[] datsetSchema1 = new String[]{idName, nameName, incomeName, startTimeName, expiryDateName, digestName, batchIdInName, batchIdOutName, 
batchTimeInName, batchTimeOutName}; + + String[] datsetSchema2 = new String[]{idName, nameName, ratingName, startTimeName, digestName, batchIdInName, batchIdOutName, batchTimeInName, batchTimeOutName}; + + private SchemaDefinition stagingDataset2Schema = + SchemaDefinition.builder() + .addFields(id) + .addFields(name) + .addFields(rating) + .addFields(startTime) + .addFields(digest) + .build(); + + @Test + public void testMultiTableIngestionSuccessCase() throws Exception + { + DatasetDefinition stagingTable1 = DatasetDefinition.builder() + .group(testSchemaName) + .name("staging1") + .schema(getStagingSchema()) + .build(); + + DatasetDefinition stagingTable2 = DatasetDefinition.builder() + .group(testSchemaName) + .name("staging2") + .schema(stagingDataset2Schema) + .build(); + + Dataset mainTable1 = DatasetDefinition.builder() + .group(testSchemaName) + .name("main1") + .schema(SchemaDefinition.builder().build()) + .build(); + + Dataset mainTable2 = DatasetDefinition.builder() + .group(testSchemaName) + .name("main2") + .schema(SchemaDefinition.builder().build()) + .build(); + + // Create staging tables + createStagingTable(stagingTable1); + createStagingTable(stagingTable2); + + Datasets datasets1 = Datasets.of(mainTable1, stagingTable1); + Datasets datasets2 = Datasets.of(mainTable2, stagingTable2); + + UnitemporalDelta ingestMode = org.finos.legend.engine.persistence.components.ingestmode.UnitemporalDelta.builder() + .digestField(digestName) + .transactionMilestoning(BatchIdAndDateTime.builder() + .batchIdInName(batchIdInName) + .batchIdOutName(batchIdOutName) + .dateTimeInName(batchTimeInName) + .dateTimeOutName(batchTimeOutName) + .build()) + .build(); + RelationalIngestor ingestor = RelationalIngestor.builder() + .ingestMode(ingestMode) + .relationalSink(H2Sink.get()) + .executionTimestampClock(fixedClock_2000_01_01) + .cleanupStagingData(true) + .collectStatistics(true) + .enableSchemaEvolution(false) + .build(); + + Executor executor = 
ingestor.init(JdbcConnection.of(h2Sink.connection())); + + // Create Main tables + datasets1 = ingestor.create(datasets1); + datasets2 = ingestor.create(datasets2); + + // Pass 1: + String dataset1Path = basePathForInput + "multi_table_ingestion/staging_dataset1_pass1.csv"; + String dataset2Path = basePathForInput + "multi_table_ingestion/staging_dataset2_pass1.csv"; + String expectedDataset1Path = basePathForExpected + "multi_table_ingestion/expected_dataset1_pass1.csv"; + String expectedDataset2Path = basePathForExpected + "multi_table_ingestion/expected_dataset2_pass1.csv"; + Map expectedStats = createExpectedStatsMap(3, 0, 3, 0, 0); + + loadStagingDataset1(dataset1Path); + loadStagingDataset2(dataset2Path); + + List result = ingestMultiTables(executor, ingestor, datasets1, datasets2); + verifyResults(1, datsetSchema1, expectedDataset1Path, "main1", result.get(0), expectedStats); + verifyResults(1, datsetSchema2, expectedDataset2Path, "main2", result.get(1), expectedStats); + + + // Pass 2: + dataset1Path = basePathForInput + "multi_table_ingestion/staging_dataset1_pass2.csv"; + dataset2Path = basePathForInput + "multi_table_ingestion/staging_dataset2_pass2.csv"; + expectedDataset1Path = basePathForExpected + "multi_table_ingestion/expected_dataset1_pass2.csv"; + expectedDataset2Path = basePathForExpected + "multi_table_ingestion/expected_dataset2_pass2.csv"; + expectedStats = createExpectedStatsMap(3, 0, 1, 1, 0); + + loadStagingDataset1(dataset1Path); + loadStagingDataset2(dataset2Path); + + result = ingestMultiTables(executor, ingestor, datasets1, datasets2); + verifyResults(2, datsetSchema1, expectedDataset1Path, "main1", result.get(0), expectedStats); + verifyResults(2, datsetSchema2, expectedDataset2Path, "main2", result.get(1), expectedStats); + + // Pass 3: + dataset1Path = basePathForInput + "multi_table_ingestion/staging_dataset_pass3.csv"; + dataset2Path = basePathForInput + "multi_table_ingestion/staging_dataset_pass3.csv"; + expectedDataset1Path = 
basePathForExpected + "multi_table_ingestion/expected_dataset1_pass3.csv"; + expectedDataset2Path = basePathForExpected + "multi_table_ingestion/expected_dataset2_pass3.csv"; + expectedStats = createExpectedStatsMap(0, 0, 0, 0, 0); + + loadStagingDataset1(dataset1Path); + loadStagingDataset2(dataset2Path); + + result = ingestMultiTables(executor, ingestor, datasets1, datasets2); + verifyResults(3, datsetSchema1, expectedDataset1Path, "main1", result.get(0), expectedStats); + verifyResults(3, datsetSchema2, expectedDataset2Path, "main2", result.get(1), expectedStats); + } + + private List ingestMultiTables(Executor executor, RelationalIngestor ingestor, Datasets... allDatasets) + { + List multiTableIngestionResult = new ArrayList<>(); + try + { + executor.begin(); + for (Datasets datasets: allDatasets) + { + IngestorResult result = ingestor.ingest(datasets); + multiTableIngestionResult.add(result); + } + executor.commit(); + + } + catch (Exception e) + { + executor.revert(); + throw e; + } + finally + { + executor.close(); + } + return multiTableIngestionResult; + } + + private void loadStagingDataset1(String path) throws Exception + { + validateFileExists(path); + String loadSql = "TRUNCATE TABLE \"TEST\".\"staging1\";" + + "INSERT INTO \"TEST\".\"staging1\"(id, name, income, start_time ,expiry_date, digest) " + + "SELECT CONVERT( \"id\",INT ), \"name\", CONVERT( \"income\", BIGINT), CONVERT( \"start_time\", DATETIME), CONVERT( \"expiry_date\", DATE), digest" + + " FROM CSVREAD( '" + path + "', 'id, name, income, start_time, expiry_date, digest', NULL )"; + h2Sink.executeStatement(loadSql); + } + + private void loadStagingDataset2(String path) throws Exception + { + validateFileExists(path); + String loadSql = "TRUNCATE TABLE \"TEST\".\"staging2\";" + + "INSERT INTO \"TEST\".\"staging2\"(id, name, rating, start_time , digest) " + + "SELECT CONVERT( \"id\",INT ), \"name\", CONVERT( \"rating\", INT), CONVERT( \"start_time\", DATETIME), digest" + + " FROM CSVREAD( '" 
+ path + "', 'id, name, rating, start_time, digest', NULL )"; + h2Sink.executeStatement(loadSql); + } + + + private void verifyResults(int batchId, String[] schema, String expectedDataPath, String tableName, IngestorResult result, Map expectedStats) throws IOException + { + Assertions.assertEquals(batchId, result.batchId().get()); + Assertions.assertEquals("2000-01-01 00:00:00", result.ingestionTimestampUTC()); + List> tableData = h2Sink.executeQuery(String.format("select * from \"TEST\".\"%s\"", tableName)); + TestUtils.assertFileAndTableDataEquals(schema, expectedDataPath, tableData); + Map actualStats = result.statisticByName(); + for (String statistic : expectedStats.keySet()) + { + Assertions.assertEquals(expectedStats.get(statistic).toString(), actualStats.get(StatisticName.valueOf(statistic)).toString()); + } + } + + +} diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-incremental-milestoning/expected/batch_id_and_time_based/multi_table_ingestion/expected_dataset1_pass1.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-incremental-milestoning/expected/batch_id_and_time_based/multi_table_ingestion/expected_dataset1_pass1.csv new file mode 100644 index 00000000000..5c9aa061073 --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-incremental-milestoning/expected/batch_id_and_time_based/multi_table_ingestion/expected_dataset1_pass1.csv @@ -0,0 +1,3 @@ +1,HARRY,1000,2020-01-01 00:00:00.0,2022-12-01,DIGEST1,1,999999999,2000-01-01 00:00:00.0,9999-12-31 23:59:59.0 +2,ROBERT,2000,2020-01-02 00:00:00.0,2022-12-02,DIGEST2,1,999999999,2000-01-01 00:00:00.0,9999-12-31 23:59:59.0 +3,ANDY,3000,2020-01-03 
00:00:00.0,2022-12-03,DIGEST3,1,999999999,2000-01-01 00:00:00.0,9999-12-31 23:59:59.0 diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-incremental-milestoning/expected/batch_id_and_time_based/multi_table_ingestion/expected_dataset1_pass2.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-incremental-milestoning/expected/batch_id_and_time_based/multi_table_ingestion/expected_dataset1_pass2.csv new file mode 100644 index 00000000000..2e97278a1ec --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-incremental-milestoning/expected/batch_id_and_time_based/multi_table_ingestion/expected_dataset1_pass2.csv @@ -0,0 +1,5 @@ +1,HARRY,1000,2020-01-01 00:00:00.0,2022-12-01,DIGEST1,1,999999999,2000-01-01 00:00:00.0,9999-12-31 23:59:59.0 +2,ROBERT,2000,2020-01-02 00:00:00.0,2022-12-02,DIGEST2,1,1,2000-01-01 00:00:00.0,2000-01-01 00:00:00.0 +3,ANDY,3000,2020-01-03 00:00:00.0,2022-12-03,DIGEST3,1,999999999,2000-01-01 00:00:00.0,9999-12-31 23:59:59.0 +2,ROBERT,4000,2020-01-02 00:00:00.0,2022-12-02,DIGEST2_UPDATED,2,999999999,2000-01-01 00:00:00.0,9999-12-31 23:59:59.0 +4,MATT,6000,2020-01-06 00:00:00.0,2022-12-06,DIGEST4,2,999999999,2000-01-01 00:00:00.0,9999-12-31 23:59:59.0 \ No newline at end of file diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-incremental-milestoning/expected/batch_id_and_time_based/multi_table_ingestion/expected_dataset1_pass3.csv 
b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-incremental-milestoning/expected/batch_id_and_time_based/multi_table_ingestion/expected_dataset1_pass3.csv new file mode 100644 index 00000000000..2e97278a1ec --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-incremental-milestoning/expected/batch_id_and_time_based/multi_table_ingestion/expected_dataset1_pass3.csv @@ -0,0 +1,5 @@ +1,HARRY,1000,2020-01-01 00:00:00.0,2022-12-01,DIGEST1,1,999999999,2000-01-01 00:00:00.0,9999-12-31 23:59:59.0 +2,ROBERT,2000,2020-01-02 00:00:00.0,2022-12-02,DIGEST2,1,1,2000-01-01 00:00:00.0,2000-01-01 00:00:00.0 +3,ANDY,3000,2020-01-03 00:00:00.0,2022-12-03,DIGEST3,1,999999999,2000-01-01 00:00:00.0,9999-12-31 23:59:59.0 +2,ROBERT,4000,2020-01-02 00:00:00.0,2022-12-02,DIGEST2_UPDATED,2,999999999,2000-01-01 00:00:00.0,9999-12-31 23:59:59.0 +4,MATT,6000,2020-01-06 00:00:00.0,2022-12-06,DIGEST4,2,999999999,2000-01-01 00:00:00.0,9999-12-31 23:59:59.0 \ No newline at end of file diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-incremental-milestoning/input/batch_id_and_time_based/multi_table_ingestion/staging_dataset1_pass1.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-incremental-milestoning/input/batch_id_and_time_based/multi_table_ingestion/staging_dataset1_pass1.csv new file mode 100644 index 00000000000..72cc5edbebc --- /dev/null +++ 
b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-incremental-milestoning/input/batch_id_and_time_based/multi_table_ingestion/staging_dataset1_pass1.csv @@ -0,0 +1,3 @@ +1,HARRY,1000,2020-01-01 00:00:00.0,2022-12-01,DIGEST1 +2,ROBERT,2000,2020-01-02 00:00:00.0,2022-12-02,DIGEST2 +3,ANDY,3000,2020-01-03 00:00:00.0,2022-12-03,DIGEST3 \ No newline at end of file diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-incremental-milestoning/input/batch_id_and_time_based/multi_table_ingestion/staging_dataset1_pass2.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-incremental-milestoning/input/batch_id_and_time_based/multi_table_ingestion/staging_dataset1_pass2.csv new file mode 100644 index 00000000000..0d58c6909b0 --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-incremental-milestoning/input/batch_id_and_time_based/multi_table_ingestion/staging_dataset1_pass2.csv @@ -0,0 +1,3 @@ +1,HARRY,1000,2020-01-01 00:00:00.0,2022-12-01,DIGEST1 +2,ROBERT,4000,2020-01-02 00:00:00.0,2022-12-02,DIGEST2_UPDATED +4,MATT,6000,2020-01-06 00:00:00.0,2022-12-06,DIGEST4 diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-incremental-milestoning/input/batch_id_and_time_based/multi_table_ingestion/staging_dataset2_pass1.csv 
b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-incremental-milestoning/input/batch_id_and_time_based/multi_table_ingestion/staging_dataset2_pass1.csv new file mode 100644 index 00000000000..9b4aeedf5fb --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-incremental-milestoning/input/batch_id_and_time_based/multi_table_ingestion/staging_dataset2_pass1.csv @@ -0,0 +1,3 @@ +1,PRODUCT1,10,2020-01-01 00:00:00.0,DIGEST1 +2,PRODUCT2,9,2021-01-01 00:00:00.0,DIGEST2 +3,PRODUCT3,8,2022-01-01 00:00:00.0,DIGEST3 \ No newline at end of file diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-incremental-milestoning/input/batch_id_and_time_based/multi_table_ingestion/staging_dataset2_pass2.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-incremental-milestoning/input/batch_id_and_time_based/multi_table_ingestion/staging_dataset2_pass2.csv new file mode 100644 index 00000000000..cfd6b13af1b --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-incremental-milestoning/input/batch_id_and_time_based/multi_table_ingestion/staging_dataset2_pass2.csv @@ -0,0 +1,3 @@ +1,PRODUCT1,10,2020-01-01 00:00:00.0,DIGEST1 +2,PRODUCT2,8,2021-01-01 00:00:00.0,DIGEST2_UPDATED +4,PRODUCT4,7,2023-01-01 00:00:00.0,DIGEST4 \ No newline at end of file diff --git 
a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-incremental-milestoning/input/batch_id_and_time_based/multi_table_ingestion/staging_dataset_pass3.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-incremental-milestoning/input/batch_id_and_time_based/multi_table_ingestion/staging_dataset_pass3.csv new file mode 100644 index 00000000000..8b137891791 --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-incremental-milestoning/input/batch_id_and_time_based/multi_table_ingestion/staging_dataset_pass3.csv @@ -0,0 +1 @@ + From e2f3987f8e5e7353a88ac074140eadaca84e7442 Mon Sep 17 00:00:00 2001 From: Ashutosh Date: Wed, 2 Aug 2023 10:03:27 +0800 Subject: [PATCH 4/6] Adding resources for multi table ingestion --- .../multi_table_ingestion/expected_dataset2_pass1.csv | 3 +++ .../multi_table_ingestion/expected_dataset2_pass2.csv | 5 +++++ .../multi_table_ingestion/expected_dataset2_pass3.csv | 5 +++++ 3 files changed, 13 insertions(+) create mode 100644 legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-incremental-milestoning/expected/batch_id_and_time_based/multi_table_ingestion/expected_dataset2_pass1.csv create mode 100644 legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-incremental-milestoning/expected/batch_id_and_time_based/multi_table_ingestion/expected_dataset2_pass2.csv create mode 100644 
legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-incremental-milestoning/expected/batch_id_and_time_based/multi_table_ingestion/expected_dataset2_pass3.csv diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-incremental-milestoning/expected/batch_id_and_time_based/multi_table_ingestion/expected_dataset2_pass1.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-incremental-milestoning/expected/batch_id_and_time_based/multi_table_ingestion/expected_dataset2_pass1.csv new file mode 100644 index 00000000000..de7a8a786f0 --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-incremental-milestoning/expected/batch_id_and_time_based/multi_table_ingestion/expected_dataset2_pass1.csv @@ -0,0 +1,3 @@ +1,PRODUCT1,10,2020-01-01 00:00:00.0,DIGEST1,1,999999999,2000-01-01 00:00:00.0,9999-12-31 23:59:59.0 +2,PRODUCT2,9,2021-01-01 00:00:00.0,DIGEST2,1,999999999,2000-01-01 00:00:00.0,9999-12-31 23:59:59.0 +3,PRODUCT3,8,2022-01-01 00:00:00.0,DIGEST3,1,999999999,2000-01-01 00:00:00.0,9999-12-31 23:59:59.0 \ No newline at end of file diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-incremental-milestoning/expected/batch_id_and_time_based/multi_table_ingestion/expected_dataset2_pass2.csv 
b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-incremental-milestoning/expected/batch_id_and_time_based/multi_table_ingestion/expected_dataset2_pass2.csv new file mode 100644 index 00000000000..7e16f7a5e94 --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-incremental-milestoning/expected/batch_id_and_time_based/multi_table_ingestion/expected_dataset2_pass2.csv @@ -0,0 +1,5 @@ +1,PRODUCT1,10,2020-01-01 00:00:00.0,DIGEST1,1,999999999,2000-01-01 00:00:00.0,9999-12-31 23:59:59.0 +2,PRODUCT2,9,2021-01-01 00:00:00.0,DIGEST2,1,1,2000-01-01 00:00:00.0,2000-01-01 00:00:00.0 +3,PRODUCT3,8,2022-01-01 00:00:00.0,DIGEST3,1,999999999,2000-01-01 00:00:00.0,9999-12-31 23:59:59.0 +2,PRODUCT2,8,2021-01-01 00:00:00.0,DIGEST2_UPDATED,2,999999999,2000-01-01 00:00:00.0,9999-12-31 23:59:59.0 +4,PRODUCT4,7,2023-01-01 00:00:00.0,DIGEST4,2,999999999,2000-01-01 00:00:00.0,9999-12-31 23:59:59.0 \ No newline at end of file diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-incremental-milestoning/expected/batch_id_and_time_based/multi_table_ingestion/expected_dataset2_pass3.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-incremental-milestoning/expected/batch_id_and_time_based/multi_table_ingestion/expected_dataset2_pass3.csv new file mode 100644 index 00000000000..7e16f7a5e94 --- /dev/null +++ 
b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/resources/data/unitemporal-incremental-milestoning/expected/batch_id_and_time_based/multi_table_ingestion/expected_dataset2_pass3.csv @@ -0,0 +1,5 @@ +1,PRODUCT1,10,2020-01-01 00:00:00.0,DIGEST1,1,999999999,2000-01-01 00:00:00.0,9999-12-31 23:59:59.0 +2,PRODUCT2,9,2021-01-01 00:00:00.0,DIGEST2,1,1,2000-01-01 00:00:00.0,2000-01-01 00:00:00.0 +3,PRODUCT3,8,2022-01-01 00:00:00.0,DIGEST3,1,999999999,2000-01-01 00:00:00.0,9999-12-31 23:59:59.0 +2,PRODUCT2,8,2021-01-01 00:00:00.0,DIGEST2_UPDATED,2,999999999,2000-01-01 00:00:00.0,9999-12-31 23:59:59.0 +4,PRODUCT4,7,2023-01-01 00:00:00.0,DIGEST4,2,999999999,2000-01-01 00:00:00.0,9999-12-31 23:59:59.0 \ No newline at end of file From 46e25124e6ae04848acb7aa19b5388cce36c954b Mon Sep 17 00:00:00 2001 From: Ashutosh Date: Wed, 2 Aug 2023 14:44:24 +0800 Subject: [PATCH 5/6] Exposing RelationalHelper from Executor --- .../components/executor/Executor.java | 2 + .../executor/RelationalExecutionHelper.java | 8 +- .../components/executor/TypeMapping.java | 20 +++ .../bigquery/executor/BigQueryExecutor.java | 7 + .../bigquery/executor/BigQueryHelper.java | 17 ++- .../components/relational/RelationalSink.java | 2 +- .../executor/RelationalExecutor.java | 7 + .../relational/jdbc/JdbcHelper.java | 17 ++- .../relational/sql/DataTypeMapping.java | 3 +- ...dbcPropertiesToLogicalDataTypeMapping.java | 3 +- .../persistence/components/TestUtils.java | 2 +- .../unitemporal/MultiTableIngestionTest.java | 140 +++++++++++++++--- .../relational/jdbc/JdbcHelperTest.java | 2 +- .../relational/memsql/MemSqlSink.java | 2 +- 14 files changed, 191 insertions(+), 41 deletions(-) rename legend-engine-xts-persistence/legend-engine-xt-persistence-component/{legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational => 
legend-engine-xt-persistence-component-physical-plan/src/main/java/org/finos/legend/engine/persistence/components}/executor/RelationalExecutionHelper.java (73%) create mode 100644 legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-physical-plan/src/main/java/org/finos/legend/engine/persistence/components/executor/TypeMapping.java diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-physical-plan/src/main/java/org/finos/legend/engine/persistence/components/executor/Executor.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-physical-plan/src/main/java/org/finos/legend/engine/persistence/components/executor/Executor.java index 55d8a1b4c2d..0548d5f0f12 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-physical-plan/src/main/java/org/finos/legend/engine/persistence/components/executor/Executor.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-physical-plan/src/main/java/org/finos/legend/engine/persistence/components/executor/Executor.java @@ -44,4 +44,6 @@ public interface Executor placeholderKeyValues, String sql) { String enrichedSql = sql; diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/main/java/org/finos/legend/engine/persistence/components/relational/bigquery/executor/BigQueryHelper.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/main/java/org/finos/legend/engine/persistence/components/relational/bigquery/executor/BigQueryHelper.java index 8e7168db320..867b7c7b096 100644 --- 
a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/main/java/org/finos/legend/engine/persistence/components/relational/bigquery/executor/BigQueryHelper.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/main/java/org/finos/legend/engine/persistence/components/relational/bigquery/executor/BigQueryHelper.java @@ -17,6 +17,7 @@ import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.FieldList; import com.google.cloud.bigquery.TableId; +import org.finos.legend.engine.persistence.components.executor.TypeMapping; import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlan; import org.finos.legend.engine.persistence.components.logicalplan.conditions.And; import org.finos.legend.engine.persistence.components.logicalplan.conditions.Equals; @@ -32,7 +33,7 @@ import org.finos.legend.engine.persistence.components.relational.SqlPlan; import org.finos.legend.engine.persistence.components.relational.bigquery.BigQuerySink; import org.finos.legend.engine.persistence.components.relational.bigquery.sqldom.constraints.columns.PKColumnConstraint; -import org.finos.legend.engine.persistence.components.relational.executor.RelationalExecutionHelper; +import org.finos.legend.engine.persistence.components.executor.RelationalExecutionHelper; import org.finos.legend.engine.persistence.components.relational.sql.DataTypeMapping; import org.finos.legend.engine.persistence.components.relational.sql.JdbcPropertiesToLogicalDataTypeMapping; import org.finos.legend.engine.persistence.components.relational.sqldom.constraints.column.ColumnConstraint; @@ -156,8 +157,13 @@ public boolean doesTableExist(Dataset dataset) return tableExists; } - public void validateDatasetSchema(Dataset dataset, DataTypeMapping datatypeMapping) + public void validateDatasetSchema(Dataset dataset, TypeMapping 
typeMapping) { + if (!(typeMapping instanceof DataTypeMapping)) + { + throw new IllegalStateException("Only DataTypeMapping allowed in validateDatasetSchema"); + } + DataTypeMapping datatypeMapping = (DataTypeMapping) typeMapping; String name = dataset.datasetReference().name().orElseThrow(IllegalStateException::new); String schema = dataset.datasetReference().group().orElse(null); @@ -201,8 +207,13 @@ public void validateDatasetSchema(Dataset dataset, DataTypeMapping datatypeMappi validateColumns(userColumns, dbColumns); } - public Dataset constructDatasetFromDatabase(String tableName, String schemaName, String databaseName, JdbcPropertiesToLogicalDataTypeMapping mapping) + public Dataset constructDatasetFromDatabase(String tableName, String schemaName, String databaseName, TypeMapping typeMapping) { + if (!(typeMapping instanceof JdbcPropertiesToLogicalDataTypeMapping)) + { + throw new IllegalStateException("Only JdbcPropertiesToLogicalDataTypeMapping allowed in constructDatasetFromDatabase"); + } + JdbcPropertiesToLogicalDataTypeMapping mapping = (JdbcPropertiesToLogicalDataTypeMapping) typeMapping; List primaryKeysInDb = this.fetchPrimaryKeys(tableName, schemaName, databaseName); com.google.cloud.bigquery.Table table = this.bigQuery.getTable(TableId.of(schemaName, tableName)); diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/RelationalSink.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/RelationalSink.java index 621828471af..f7808d4a0fb 100644 --- 
a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/RelationalSink.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/RelationalSink.java @@ -24,7 +24,7 @@ import org.finos.legend.engine.persistence.components.optimizer.Optimizer; import org.finos.legend.engine.persistence.components.relational.api.IngestorResult; import org.finos.legend.engine.persistence.components.relational.api.RelationalConnection; -import org.finos.legend.engine.persistence.components.relational.executor.RelationalExecutionHelper; +import org.finos.legend.engine.persistence.components.executor.RelationalExecutionHelper; import org.finos.legend.engine.persistence.components.relational.sql.TabularData; import org.finos.legend.engine.persistence.components.relational.sqldom.SqlGen; import org.finos.legend.engine.persistence.components.sink.Sink; diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/executor/RelationalExecutor.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/executor/RelationalExecutor.java index 62ab267f697..0c20f9a8067 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/executor/RelationalExecutor.java +++ 
b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/executor/RelationalExecutor.java @@ -15,6 +15,7 @@ package org.finos.legend.engine.persistence.components.relational.executor; import org.finos.legend.engine.persistence.components.executor.Executor; +import org.finos.legend.engine.persistence.components.executor.RelationalExecutionHelper; import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset; import org.finos.legend.engine.persistence.components.relational.RelationalSink; import org.finos.legend.engine.persistence.components.relational.SqlPlan; @@ -128,6 +129,12 @@ public void close() relationalExecutionHelper.closeTransactionManager(); } + @Override + public RelationalExecutionHelper getRelationalExecutionHelper() + { + return this.relationalExecutionHelper; + } + private String getEnrichedSql(Map placeholderKeyValues, String sql) { String enrichedSql = sql; diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/jdbc/JdbcHelper.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/jdbc/JdbcHelper.java index 0e8722d4d59..8d907c95a7d 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/jdbc/JdbcHelper.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/jdbc/JdbcHelper.java @@ -14,13 +14,14 
@@ package org.finos.legend.engine.persistence.components.relational.jdbc; +import org.finos.legend.engine.persistence.components.executor.TypeMapping; import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset; import org.finos.legend.engine.persistence.components.logicalplan.datasets.DatasetDefinition; import org.finos.legend.engine.persistence.components.logicalplan.datasets.Field; import org.finos.legend.engine.persistence.components.logicalplan.datasets.FieldType; import org.finos.legend.engine.persistence.components.logicalplan.datasets.Index; import org.finos.legend.engine.persistence.components.logicalplan.datasets.SchemaDefinition; -import org.finos.legend.engine.persistence.components.relational.executor.RelationalExecutionHelper; +import org.finos.legend.engine.persistence.components.executor.RelationalExecutionHelper; import org.finos.legend.engine.persistence.components.relational.sql.DataTypeMapping; import org.finos.legend.engine.persistence.components.relational.sql.JdbcPropertiesToLogicalDataTypeMapping; import org.finos.legend.engine.persistence.components.relational.sqldom.common.Clause; @@ -160,10 +161,15 @@ public boolean doesTableExist(Dataset dataset) } @Override - public void validateDatasetSchema(Dataset dataset, DataTypeMapping datatypeMapping) + public void validateDatasetSchema(Dataset dataset, TypeMapping typeMapping) { try { + if (!(typeMapping instanceof DataTypeMapping)) + { + throw new IllegalStateException("Only DataTypeMapping allowed in validateDatasetSchema"); + } + DataTypeMapping datatypeMapping = (DataTypeMapping) typeMapping; String name = dataset.datasetReference().name().orElseThrow(IllegalStateException::new); String database = dataset.datasetReference().database().orElse(null); String schema = dataset.datasetReference().group().orElse(null); @@ -250,10 +256,15 @@ public void validateDatasetSchema(Dataset dataset, DataTypeMapping datatypeMappi } @Override - public Dataset 
constructDatasetFromDatabase(String tableName, String schemaName, String databaseName, JdbcPropertiesToLogicalDataTypeMapping mapping) + public Dataset constructDatasetFromDatabase(String tableName, String schemaName, String databaseName, TypeMapping typeMapping) { try { + if (!(typeMapping instanceof JdbcPropertiesToLogicalDataTypeMapping)) + { + throw new IllegalStateException("Only JdbcPropertiesToLogicalDataTypeMapping allowed in constructDatasetFromDatabase"); + } + JdbcPropertiesToLogicalDataTypeMapping mapping = (JdbcPropertiesToLogicalDataTypeMapping) typeMapping; DatabaseMetaData dbMetaData = this.connection.getMetaData(); // Get primary keys diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/sql/DataTypeMapping.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/sql/DataTypeMapping.java index fba4f18bf42..5561c66f0a3 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/sql/DataTypeMapping.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/sql/DataTypeMapping.java @@ -14,10 +14,11 @@ package org.finos.legend.engine.persistence.components.relational.sql; +import org.finos.legend.engine.persistence.components.executor.TypeMapping; import org.finos.legend.engine.persistence.components.logicalplan.datasets.FieldType; import org.finos.legend.engine.persistence.components.relational.sqldom.schema.DataType; -public interface DataTypeMapping +public 
interface DataTypeMapping extends TypeMapping { DataType getDataType(FieldType type); } diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/sql/JdbcPropertiesToLogicalDataTypeMapping.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/sql/JdbcPropertiesToLogicalDataTypeMapping.java index 20193b12ae2..e641716ce4e 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/sql/JdbcPropertiesToLogicalDataTypeMapping.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/sql/JdbcPropertiesToLogicalDataTypeMapping.java @@ -14,9 +14,10 @@ package org.finos.legend.engine.persistence.components.relational.sql; +import org.finos.legend.engine.persistence.components.executor.TypeMapping; import org.finos.legend.engine.persistence.components.logicalplan.datasets.FieldType; -public interface JdbcPropertiesToLogicalDataTypeMapping +public interface JdbcPropertiesToLogicalDataTypeMapping extends TypeMapping { String NUMBER = "NUMBER"; String TINYINT = "TINYINT"; diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/TestUtils.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/TestUtils.java index 
19992393678..d5cae4280ad 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/TestUtils.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/TestUtils.java @@ -26,7 +26,7 @@ import org.finos.legend.engine.persistence.components.logicalplan.datasets.FieldType; import org.finos.legend.engine.persistence.components.logicalplan.datasets.JsonExternalDatasetReference; import org.finos.legend.engine.persistence.components.logicalplan.datasets.SchemaDefinition; -import org.finos.legend.engine.persistence.components.relational.executor.RelationalExecutionHelper; +import org.finos.legend.engine.persistence.components.executor.RelationalExecutionHelper; import org.finos.legend.engine.persistence.components.util.MetadataDataset; import org.junit.jupiter.api.Assertions; diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/MultiTableIngestionTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/MultiTableIngestionTest.java index 005ca02e2cd..515fd9ac080 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/MultiTableIngestionTest.java +++ 
b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/unitemporal/MultiTableIngestionTest.java @@ -57,33 +57,33 @@ public class MultiTableIngestionTest extends BaseTest .addFields(digest) .build(); + DatasetDefinition stagingTable1 = DatasetDefinition.builder() + .group(testSchemaName) + .name("staging1") + .schema(getStagingSchema()) + .build(); + + DatasetDefinition stagingTable2 = DatasetDefinition.builder() + .group(testSchemaName) + .name("staging2") + .schema(stagingDataset2Schema) + .build(); + + Dataset mainTable1 = DatasetDefinition.builder() + .group(testSchemaName) + .name("main1") + .schema(SchemaDefinition.builder().build()) + .build(); + + Dataset mainTable2 = DatasetDefinition.builder() + .group(testSchemaName) + .name("main2") + .schema(SchemaDefinition.builder().build()) + .build(); + @Test public void testMultiTableIngestionSuccessCase() throws Exception { - DatasetDefinition stagingTable1 = DatasetDefinition.builder() - .group(testSchemaName) - .name("staging1") - .schema(getStagingSchema()) - .build(); - - DatasetDefinition stagingTable2 = DatasetDefinition.builder() - .group(testSchemaName) - .name("staging2") - .schema(stagingDataset2Schema) - .build(); - - Dataset mainTable1 = DatasetDefinition.builder() - .group(testSchemaName) - .name("main1") - .schema(SchemaDefinition.builder().build()) - .build(); - - Dataset mainTable2 = DatasetDefinition.builder() - .group(testSchemaName) - .name("main2") - .schema(SchemaDefinition.builder().build()) - .build(); - // Create staging tables createStagingTable(stagingTable1); createStagingTable(stagingTable2); @@ -157,6 +157,10 @@ public void testMultiTableIngestionSuccessCase() throws Exception result = ingestMultiTables(executor, ingestor, datasets1, datasets2); verifyResults(3, datsetSchema1, expectedDataset1Path, "main1", result.get(0), expectedStats); 
verifyResults(3, datsetSchema2, expectedDataset2Path, "main2", result.get(1), expectedStats); + + // Verify if additional query data was written + List> tableData = h2Sink.executeQuery(String.format("select * from batch_metadata where table_name = 'new_test_table'")); + Assertions.assertEquals(3, tableData.size()); } private List ingestMultiTables(Executor executor, RelationalIngestor ingestor, Datasets... allDatasets) @@ -170,8 +174,95 @@ private List ingestMultiTables(Executor executor, RelationalInge IngestorResult result = ingestor.ingest(datasets); multiTableIngestionResult.add(result); } + + // Show how we can add more to same tx + executor.getRelationalExecutionHelper().executeStatement("insert into batch_metadata(table_name, batch_status) values ('new_test_table', 'test_tx')"); executor.commit(); + } + catch (Exception e) + { + executor.revert(); + throw e; + } + finally + { + executor.close(); + } + return multiTableIngestionResult; + } + + @Test + public void testMultiTableIngestionWithFailedTx() throws Exception + { + // Create staging tables + createStagingTable(stagingTable1); + createStagingTable(stagingTable2); + + Datasets datasets1 = Datasets.of(mainTable1, stagingTable1); + Datasets datasets2 = Datasets.of(mainTable2, stagingTable2); + + UnitemporalDelta ingestMode = org.finos.legend.engine.persistence.components.ingestmode.UnitemporalDelta.builder() + .digestField(digestName) + .transactionMilestoning(BatchIdAndDateTime.builder() + .batchIdInName(batchIdInName) + .batchIdOutName(batchIdOutName) + .dateTimeInName(batchTimeInName) + .dateTimeOutName(batchTimeOutName) + .build()) + .build(); + RelationalIngestor ingestor = RelationalIngestor.builder() + .ingestMode(ingestMode) + .relationalSink(H2Sink.get()) + .executionTimestampClock(fixedClock_2000_01_01) + .cleanupStagingData(true) + .collectStatistics(true) + .enableSchemaEvolution(false) + .build(); + + Executor executor = ingestor.init(JdbcConnection.of(h2Sink.connection())); + + // Create 
Main tables + datasets1 = ingestor.create(datasets1); + datasets2 = ingestor.create(datasets2); + + // Pass 1: + String dataset1Path = basePathForInput + "multi_table_ingestion/staging_dataset1_pass1.csv"; + String dataset2Path = basePathForInput + "multi_table_ingestion/staging_dataset2_pass1.csv"; + + loadStagingDataset1(dataset1Path); + loadStagingDataset2(dataset2Path); + try + { + List<IngestorResult> result = ingestMultiTablesWithBadQuery(executor, ingestor, datasets1, datasets2); + Assertions.fail("Should not reach here"); + } + catch (Exception e) + { + Assertions.assertTrue(e.getMessage().contains("Column \"unknown_column\" not found")); + // Verify that no data was written in the two datasets + List<Map<String, Object>> tableData1 = h2Sink.executeQuery(String.format("select * from \"TEST\".\"main1\"")); + List<Map<String, Object>> tableData2 = h2Sink.executeQuery(String.format("select * from \"TEST\".\"main2\"")); + Assertions.assertTrue(tableData1.isEmpty()); + Assertions.assertTrue(tableData2.isEmpty()); + } + } + + private List<IngestorResult> ingestMultiTablesWithBadQuery(Executor executor, RelationalIngestor ingestor, Datasets... 
allDatasets) + { + List<IngestorResult> multiTableIngestionResult = new ArrayList<>(); + try + { + executor.begin(); + for (Datasets datasets: allDatasets) + { + IngestorResult result = ingestor.ingest(datasets); + multiTableIngestionResult.add(result); + } + + // A bad query should revert the complete transaction + executor.getRelationalExecutionHelper().executeStatement("insert into batch_metadata(table_name, batch_status, unknown_column) values ('new_test_table', 'test_tx', 'XYZ')"); + executor.commit(); } catch (Exception e) { @@ -185,6 +276,7 @@ private List<IngestorResult> ingestMultiTables(Executor executor, RelationalInge return multiTableIngestionResult; } + private void loadStagingDataset1(String path) throws Exception { validateFileExists(path); diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/relational/jdbc/JdbcHelperTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/relational/jdbc/JdbcHelperTest.java index 3271a4819c0..c9413f67ec2 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/relational/jdbc/JdbcHelperTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/relational/jdbc/JdbcHelperTest.java @@ -14,7 +14,7 @@ package org.finos.legend.engine.persistence.components.relational.jdbc; -import org.finos.legend.engine.persistence.components.relational.executor.RelationalExecutionHelper; +import org.finos.legend.engine.persistence.components.executor.RelationalExecutionHelper; import 
org.finos.legend.engine.persistence.components.relational.h2.H2Sink; import org.junit.jupiter.api.Test; diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-memsql/src/main/java/org/finos/legend/engine/persistence/components/relational/memsql/MemSqlSink.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-memsql/src/main/java/org/finos/legend/engine/persistence/components/relational/memsql/MemSqlSink.java index 9eb35b96bbf..dc1085f9aba 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-memsql/src/main/java/org/finos/legend/engine/persistence/components/relational/memsql/MemSqlSink.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-memsql/src/main/java/org/finos/legend/engine/persistence/components/relational/memsql/MemSqlSink.java @@ -34,7 +34,7 @@ import org.finos.legend.engine.persistence.components.relational.ansi.optimizer.LowerCaseOptimizer; import org.finos.legend.engine.persistence.components.relational.ansi.optimizer.UpperCaseOptimizer; import org.finos.legend.engine.persistence.components.relational.api.RelationalConnection; -import org.finos.legend.engine.persistence.components.relational.executor.RelationalExecutionHelper; +import org.finos.legend.engine.persistence.components.executor.RelationalExecutionHelper; import org.finos.legend.engine.persistence.components.relational.executor.RelationalExecutor; import org.finos.legend.engine.persistence.components.relational.jdbc.JdbcConnection; import org.finos.legend.engine.persistence.components.relational.jdbc.JdbcHelper; From 9a9250c1b38814418df35009094ebe987cf40562 Mon Sep 17 00:00:00 2001 From: Ashutosh Date: Fri, 4 Aug 2023 09:10:27 +0800 Subject: [PATCH 6/6] Removing comments --- 
.../engine/persistence/components/BaseTest.java | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/BaseTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/BaseTest.java index ab64c7edeaf..507c99b8148 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/BaseTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/BaseTest.java @@ -172,16 +172,7 @@ protected IngestorResult executePlansAndVerifyResults(IngestMode ingestMode, Pla .schemaEvolutionCapabilitySet(userCapabilitySet) .build(); -// IngestorResult result = ingestor.performFullIngestion(JdbcConnection.of(h2Sink.connection()), datasets); - Executor executor = ingestor.init(JdbcConnection.of(h2Sink.connection())); - - datasets = ingestor.create(datasets); - datasets = ingestor.evolve(datasets); - - executor.begin(); - IngestorResult result = ingestor.ingest(datasets); - // Do more stuff if needed - executor.commit(); + IngestorResult result = ingestor.performFullIngestion(JdbcConnection.of(h2Sink.connection()), datasets); Map<String, Object> actualStats = result.statisticByName();