From c2ac9335ef0935d66615427f6f731341bad138fc Mon Sep 17 00:00:00 2001 From: prasar-ashutosh Date: Mon, 27 Nov 2023 15:10:09 +0800 Subject: [PATCH] Support for filepaths and filepatterns in Snowflake Load --- .../components/common/FileFormatType.java | 23 +++ .../StagedFilesDatasetProperties.java | 20 ++- .../logicalplan/operations/CopyAbstract.java | 5 +- .../components/planner/BulkLoadPlanner.java | 23 ++- .../components/planner/Planner.java | 2 +- ...yStagedFilesDatasetPropertiesAbstract.java | 7 +- .../StagedFilesDatasetReferenceVisitor.java | 40 +---- .../components/e2e/BulkLoadExecutorTest.java | 22 ++- .../components/e2e/BulkLoadGeneratorTest.java | 8 +- .../components/ingestmode/BulkLoadTest.java | 61 ++++---- .../api/RelationalGeneratorAbstract.java | 4 +- .../api/RelationalIngestorAbstract.java | 6 +- ...2StagedFilesDatasetPropertiesAbstract.java | 12 +- .../StagedFilesDatasetReferenceVisitor.java | 2 +- .../ingestmode/bulkload/BulkLoadTest.java | 64 ++++---- .../logicalplan/datasets}/FileFormat.java | 9 +- ...eStagedFilesDatasetPropertiesAbstract.java | 5 +- .../datasets/StandardFileFormatAbstract.java} | 35 ++--- .../UserDefinedFileFormatAbstract.java | 32 ++++ .../snowflake/sql/visitor/CopyVisitor.java | 44 +++++- .../StagedFilesDatasetReferenceVisitor.java | 2 - .../expressions/table/StagedFilesTable.java | 33 ---- .../schemaops/statements/CopyStatement.java | 142 +++++++++++++----- .../components/ingestmode/BulkLoadTest.java | 94 +++++++----- .../sqldom/schemaops/CopyStatementTest.java | 39 +++-- 25 files changed, 434 insertions(+), 300 deletions(-) create mode 100644 legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/common/FileFormatType.java rename legend-engine-xts-persistence/legend-engine-xt-persistence-component/{legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/common => legend-engine-xt-persistence-component-relational-snowflake/src/main/java/org/finos/legend/engine/persistence/components/relational/snowflake/logicalplan/datasets}/FileFormat.java (82%) rename legend-engine-xts-persistence/legend-engine-xt-persistence-component/{legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/common/LoadOptionsAbstract.java => legend-engine-xt-persistence-component-relational-snowflake/src/main/java/org/finos/legend/engine/persistence/components/relational/snowflake/logicalplan/datasets/StandardFileFormatAbstract.java} (53%) create mode 100644 legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/main/java/org/finos/legend/engine/persistence/components/relational/snowflake/logicalplan/datasets/UserDefinedFileFormatAbstract.java diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/common/FileFormatType.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/common/FileFormatType.java new file mode 100644 index 00000000000..f5242a06056 --- /dev/null +++ 
b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/common/FileFormatType.java @@ -0,0 +1,23 @@ +// Copyright 2023 Goldman Sachs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.finos.legend.engine.persistence.components.common; + +public enum FileFormatType +{ + CSV, + JSON, + AVRO, + PARQUET; +} diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/datasets/StagedFilesDatasetProperties.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/datasets/StagedFilesDatasetProperties.java index 248a7e98256..8dae01e0dc5 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/datasets/StagedFilesDatasetProperties.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/datasets/StagedFilesDatasetProperties.java @@ -14,14 +14,26 @@ package org.finos.legend.engine.persistence.components.logicalplan.datasets; -import org.finos.legend.engine.persistence.components.common.LoadOptions; +import org.immutables.value.Value; import java.util.List; -import java.util.Optional; public interface StagedFilesDatasetProperties { - List<String> files(); + List<String> filePaths(); - Optional<LoadOptions> loadOptions(); + List<String> filePatterns(); + + @Value.Check + default void validate() + { + if (filePatterns().size() > 0 && filePaths().size() > 0) + { + throw new IllegalArgumentException("Cannot build StagedFilesDatasetProperties, Only one out of filePatterns and filePaths should be provided"); + } + if (filePatterns().size() == 0 && filePaths().size() == 0) + { + throw new IllegalArgumentException("Cannot build StagedFilesDatasetProperties, Either one of filePatterns and filePaths must be provided"); + } + } } \ No newline at end of file diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/operations/CopyAbstract.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/operations/CopyAbstract.java index 36d1a3c4c4b..82e5876ed42 100644 ---
a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/operations/CopyAbstract.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/operations/CopyAbstract.java @@ -14,12 +14,11 @@ package org.finos.legend.engine.persistence.components.logicalplan.operations; -import org.finos.legend.engine.persistence.components.common.LoadOptions; import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset; +import org.finos.legend.engine.persistence.components.logicalplan.datasets.StagedFilesDatasetProperties; import org.finos.legend.engine.persistence.components.logicalplan.values.Value; import java.util.List; -import java.util.Optional; import static org.immutables.value.Value.Immutable; import static org.immutables.value.Value.Parameter; @@ -45,5 +44,5 @@ public interface CopyAbstract extends Operation List<Value> fields(); @Parameter(order = 3) - Optional<LoadOptions> loadOptions(); + StagedFilesDatasetProperties stagedFilesDatasetProperties(); } diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/BulkLoadPlanner.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/BulkLoadPlanner.java index 430e2c55383..0024e456d2e 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/BulkLoadPlanner.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/BulkLoadPlanner.java @@ -62,7 +62,7 @@ class BulkLoadPlanner extends Planner private Dataset tempDataset; private StagedFilesDataset stagedFilesDataset; private BulkLoadMetadataDataset bulkLoadMetadataDataset; - private Optional<String> bulkLoadTaskIdValue; + private Optional<String> bulkLoadEventIdValue; BulkLoadPlanner(Datasets datasets, BulkLoad ingestMode, PlannerOptions plannerOptions, Set<Capability> capabilities) { @@ -75,7 +75,7 @@ class BulkLoadPlanner extends Planner throw new IllegalArgumentException("Only StagedFilesDataset are allowed under Bulk Load"); } - bulkLoadTaskIdValue = plannerOptions.bulkLoadTaskIdValue(); + bulkLoadEventIdValue = plannerOptions.bulkLoadEventIdValue(); stagedFilesDataset = (StagedFilesDataset) datasets.stagingDataset(); bulkLoadMetadataDataset = bulkLoadMetadataDataset().orElseThrow(IllegalStateException::new); @@ -139,7 +139,7 @@ private LogicalPlan buildLogicalPlanForTransformWhileCopy(Resources resources) } Dataset selectStage = StagedFilesSelection.builder().source(stagedFilesDataset).addAllFields(fieldsToSelect).build(); - return LogicalPlan.of(Collections.singletonList(Copy.of(mainDataset(), selectStage, fieldsToInsert, stagedFilesDataset.stagedFilesDatasetProperties().loadOptions()))); + return LogicalPlan.of(Collections.singletonList(Copy.of(mainDataset(), selectStage, fieldsToInsert, stagedFilesDataset.stagedFilesDatasetProperties()))); } private LogicalPlan
buildLogicalPlanForCopyAndTransform(Resources resources) @@ -150,7 +150,7 @@ private LogicalPlan buildLogicalPlanForCopyAndTransform(Resources resources) // Operation 1: Copy into a temp table List<Value> fieldsToSelectFromStage = LogicalPlanUtils.extractStagedFilesFieldValues(stagingDataset()); Dataset selectStage = StagedFilesSelection.builder().source(stagedFilesDataset).addAllFields(fieldsToSelectFromStage).build(); - operations.add(Copy.of(tempDataset, selectStage, fieldsToSelectFromStage, stagedFilesDataset.stagedFilesDatasetProperties().loadOptions())); + operations.add(Copy.of(tempDataset, selectStage, fieldsToSelectFromStage, stagedFilesDataset.stagedFilesDatasetProperties())); // Operation 2: Transfer from temp table into target table, adding extra columns at the same time @@ -263,9 +263,18 @@ private Selection getRowsBasedOnAppendTimestamp(Dataset dataset, String field, S private String jsonifyBatchSourceInfo(StagedFilesDatasetProperties stagedFilesDatasetProperties) { Map<String, Object> batchSourceMap = new HashMap<String, Object>(); - List<String> files = stagedFilesDatasetProperties.files(); - batchSourceMap.put("files", files); - bulkLoadTaskIdValue.ifPresent(taskId -> batchSourceMap.put("task_id", taskId)); + List<String> filePaths = stagedFilesDatasetProperties.filePaths(); + List<String> filePatterns = stagedFilesDatasetProperties.filePatterns(); + + if (filePaths != null && !filePaths.isEmpty()) + { + batchSourceMap.put("file_paths", filePaths); + } + if (filePatterns != null && !filePatterns.isEmpty()) + { + batchSourceMap.put("file_patterns", filePatterns); + } + bulkLoadEventIdValue.ifPresent(taskId -> batchSourceMap.put("event_id", taskId)); ObjectMapper objectMapper = new ObjectMapper(); try { diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/Planner.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/Planner.java index 5ee4ae53977..e3625c81709 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/Planner.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/Planner.java @@ -109,7 +109,7 @@ default boolean enableConcurrentSafety() return false; } - Optional<String> bulkLoadTaskIdValue(); + Optional<String> bulkLoadEventIdValue(); } private final Datasets datasets; diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/main/java/org/finos/legend/engine/persistence/components/relational/bigquery/logicalplan/datasets/BigQueryStagedFilesDatasetPropertiesAbstract.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/main/java/org/finos/legend/engine/persistence/components/relational/bigquery/logicalplan/datasets/BigQueryStagedFilesDatasetPropertiesAbstract.java index 70e5a26c1ba..564e0a1f83e 100644 ---
a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/main/java/org/finos/legend/engine/persistence/components/relational/bigquery/logicalplan/datasets/BigQueryStagedFilesDatasetPropertiesAbstract.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/main/java/org/finos/legend/engine/persistence/components/relational/bigquery/logicalplan/datasets/BigQueryStagedFilesDatasetPropertiesAbstract.java @@ -15,10 +15,11 @@ package org.finos.legend.engine.persistence.components.relational.bigquery.logicalplan.datasets; -import org.finos.legend.engine.persistence.components.common.FileFormat; +import org.finos.legend.engine.persistence.components.common.FileFormatType; import org.finos.legend.engine.persistence.components.logicalplan.datasets.StagedFilesDatasetProperties; import org.immutables.value.Value; +import java.util.Map; @Value.Immutable @Value.Style( @@ -30,5 +31,7 @@ ) public interface BigQueryStagedFilesDatasetPropertiesAbstract extends StagedFilesDatasetProperties { - FileFormat fileFormat(); + FileFormatType fileFormat(); + + Map<String, Object> loadOptions(); } diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/main/java/org/finos/legend/engine/persistence/components/relational/bigquery/sql/visitor/StagedFilesDatasetReferenceVisitor.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/main/java/org/finos/legend/engine/persistence/components/relational/bigquery/sql/visitor/StagedFilesDatasetReferenceVisitor.java index 0c5e7d91bc5..a8cae186d4b 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/main/java/org/finos/legend/engine/persistence/components/relational/bigquery/sql/visitor/StagedFilesDatasetReferenceVisitor.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/main/java/org/finos/legend/engine/persistence/components/relational/bigquery/sql/visitor/StagedFilesDatasetReferenceVisitor.java @@ -14,8 +14,7 @@ package org.finos.legend.engine.persistence.components.relational.bigquery.sql.visitor; -import org.finos.legend.engine.persistence.components.common.FileFormat; -import org.finos.legend.engine.persistence.components.common.LoadOptions; +import org.finos.legend.engine.persistence.components.common.FileFormatType; import org.finos.legend.engine.persistence.components.logicalplan.datasets.StagedFilesDatasetReference; import org.finos.legend.engine.persistence.components.physicalplan.PhysicalPlanNode; import org.finos.legend.engine.persistence.components.relational.bigquery.logicalplan.datasets.BigQueryStagedFilesDatasetProperties; @@ -24,6 +23,7 @@ import org.finos.legend.engine.persistence.components.transformer.VisitorContext; import java.util.HashMap; +import java.util.List; import java.util.Map; @@ -38,39 +38,13 @@ public VisitorResult visit(PhysicalPlanNode prev, StagedFilesDatasetReference cu } BigQueryStagedFilesDatasetProperties datasetProperties = (BigQueryStagedFilesDatasetProperties) current.properties(); - Map<String, Object> loadOptionsMap = new HashMap<>(); - FileFormat fileFormat = datasetProperties.fileFormat(); - loadOptionsMap.put("format", fileFormat.name()); -
datasetProperties.loadOptions().ifPresent(options -> retrieveLoadOptions(fileFormat, options, loadOptionsMap)); - - StagedFilesTable stagedFilesTable = new StagedFilesTable(datasetProperties.files(), loadOptionsMap); + FileFormatType fileFormatType = datasetProperties.fileFormat(); + Map<String, Object> loadOptionsMap = new HashMap<>(datasetProperties.loadOptions()); + loadOptionsMap.put("format", fileFormatType.name()); + List<String> uris = datasetProperties.filePaths().isEmpty() ? datasetProperties.filePatterns() : datasetProperties.filePaths(); + StagedFilesTable stagedFilesTable = new StagedFilesTable(uris, loadOptionsMap); prev.push(stagedFilesTable); return new VisitorResult(null); } - - private void retrieveLoadOptions(FileFormat fileFormat, LoadOptions loadOptions, Map<String, Object> loadOptionsMap) - { - switch (fileFormat) - { - case CSV: - loadOptions.fieldDelimiter().ifPresent(property -> loadOptionsMap.put("field_delimiter", property)); - loadOptions.encoding().ifPresent(property -> loadOptionsMap.put("encoding", property)); - loadOptions.nullMarker().ifPresent(property -> loadOptionsMap.put("null_marker", property)); - loadOptions.quote().ifPresent(property -> loadOptionsMap.put("quote", property)); - loadOptions.skipLeadingRows().ifPresent(property -> loadOptionsMap.put("skip_leading_rows", property)); - loadOptions.maxBadRecords().ifPresent(property -> loadOptionsMap.put("max_bad_records", property)); - loadOptions.compression().ifPresent(property -> loadOptionsMap.put("compression", property)); - break; - case JSON: - loadOptions.maxBadRecords().ifPresent(property -> loadOptionsMap.put("max_bad_records", property)); - loadOptions.compression().ifPresent(property -> loadOptionsMap.put("compression", property)); - break; - case AVRO: - case PARQUET: - return; - default: - throw new IllegalStateException("Unrecognized file format: " + fileFormat); - } - } } diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/e2e/BulkLoadExecutorTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/e2e/BulkLoadExecutorTest.java index 559c2a0f5db..a608a8329e1 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/e2e/BulkLoadExecutorTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/e2e/BulkLoadExecutorTest.java @@ -15,8 +15,7 @@ package org.finos.legend.engine.persistence.components.e2e; import org.finos.legend.engine.persistence.components.common.Datasets; -import org.finos.legend.engine.persistence.components.common.FileFormat; -import org.finos.legend.engine.persistence.components.common.LoadOptions; +import org.finos.legend.engine.persistence.components.common.FileFormatType; import org.finos.legend.engine.persistence.components.ingestmode.BulkLoad; import org.finos.legend.engine.persistence.components.ingestmode.audit.DateTimeAuditing; import org.finos.legend.engine.persistence.components.ingestmode.digest.NoDigestGenStrategy; @@ -40,10 +39,7 @@ import org.junit.jupiter.api.Test; import java.io.IOException; -import java.util.Arrays;
-import java.util.List; -import java.util.Map; -import java.util.Optional; +import java.util.*; import static org.finos.legend.engine.persistence.components.common.StatisticName.ROWS_INSERTED; import static org.finos.legend.engine.persistence.components.common.StatisticName.ROWS_WITH_ERRORS; @@ -89,8 +85,8 @@ public void testMilestoning() throws IOException, InterruptedException Dataset stagedFilesDataset = StagedFilesDataset.builder() .stagedFilesDatasetProperties( BigQueryStagedFilesDatasetProperties.builder() - .fileFormat(FileFormat.CSV) - .addAllFiles(FILE_LIST).build()) + .fileFormat(FileFormatType.CSV) + .addAllFilePaths(FILE_LIST).build()) .schema(SchemaDefinition.builder().addAllFields(Arrays.asList(col1, col2, col3, col4)).build()) .build(); @@ -116,7 +112,7 @@ public void testMilestoning() throws IOException, InterruptedException .relationalSink(BigQuerySink.get()) .collectStatistics(true) .executionTimestampClock(fixedClock_2000_01_01) - .bulkLoadTaskIdValue(TASK_ID_VALUE) + .bulkLoadEventIdValue(TASK_ID_VALUE) .build(); RelationalConnection connection = BigQueryConnection.of(getBigQueryConnection()); @@ -144,12 +140,14 @@ public void testMilestoningFailure() throws IOException, InterruptedException .auditing(DateTimeAuditing.builder().dateTimeField(APPEND_TIME).build()) .build(); + Map<String, Object> loadOptions = new HashMap<>(); + loadOptions.put("max_bad_records", 10L); Dataset stagedFilesDataset = StagedFilesDataset.builder() .stagedFilesDatasetProperties( BigQueryStagedFilesDatasetProperties.builder() - .fileFormat(FileFormat.CSV) - .loadOptions(LoadOptions.builder().maxBadRecords(10L).build()) - .addAllFiles(BAD_FILE_LIST).build()) + .fileFormat(FileFormatType.CSV) + .putAllLoadOptions(loadOptions) + .addAllFilePaths(BAD_FILE_LIST).build()) .schema(SchemaDefinition.builder().addAllFields(Arrays.asList(col1, col2, col3, col4)).build()) .build(); diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/e2e/BulkLoadGeneratorTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/e2e/BulkLoadGeneratorTest.java index 410dabb69e6..c77230b7576 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/e2e/BulkLoadGeneratorTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/e2e/BulkLoadGeneratorTest.java @@ -15,7 +15,7 @@ package org.finos.legend.engine.persistence.components.e2e; import org.finos.legend.engine.persistence.components.common.Datasets; -import org.finos.legend.engine.persistence.components.common.FileFormat; +import org.finos.legend.engine.persistence.components.common.FileFormatType; import org.finos.legend.engine.persistence.components.ingestmode.BulkLoad; import org.finos.legend.engine.persistence.components.ingestmode.audit.DateTimeAuditing; import org.finos.legend.engine.persistence.components.ingestmode.digest.NoDigestGenStrategy; @@ -81,8 +81,8 @@ public void testMilestoning() throws IOException, InterruptedException Dataset stagedFilesDataset = StagedFilesDataset.builder()
.stagedFilesDatasetProperties( BigQueryStagedFilesDatasetProperties.builder() - .fileFormat(FileFormat.CSV) - .addAllFiles(FILE_LIST).build()) + .fileFormat(FileFormatType.CSV) + .addAllFilePaths(FILE_LIST).build()) .schema(SchemaDefinition.builder().addAllFields(Arrays.asList(col1, col2, col3, col4)).build()) .build(); @@ -108,7 +108,7 @@ public void testMilestoning() throws IOException, InterruptedException .relationalSink(BigQuerySink.get()) .collectStatistics(true) .executionTimestampClock(fixedClock_2000_01_01) - .bulkLoadTaskIdValue(TASK_ID_VALUE) + .bulkLoadEventIdValue(TASK_ID_VALUE) .bulkLoadBatchStatusPattern("{STATUS}") .build(); diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/BulkLoadTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/BulkLoadTest.java index d93400e5a66..801e8dcc7ac 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/BulkLoadTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/BulkLoadTest.java @@ -15,8 +15,7 @@ package org.finos.legend.engine.persistence.components.ingestmode; import org.finos.legend.engine.persistence.components.common.Datasets; -import org.finos.legend.engine.persistence.components.common.FileFormat; -import org.finos.legend.engine.persistence.components.common.LoadOptions; +import org.finos.legend.engine.persistence.components.common.FileFormatType; import org.finos.legend.engine.persistence.components.common.StatisticName; import org.finos.legend.engine.persistence.components.ingestmode.audit.DateTimeAuditing; import org.finos.legend.engine.persistence.components.ingestmode.audit.NoAuditing; @@ -40,10 +39,7 @@ import java.time.Clock; import java.time.ZoneOffset; import java.time.ZonedDateTime; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.Optional; +import java.util.*; import static org.finos.legend.engine.persistence.components.common.StatisticName.ROWS_DELETED; import static org.finos.legend.engine.persistence.components.common.StatisticName.ROWS_INSERTED; @@ -102,8 +98,8 @@ public void testBulkLoadWithDigestNotGeneratedAuditEnabledNoExtraOptions() Dataset stagedFilesDataset = StagedFilesDataset.builder() .stagedFilesDatasetProperties( BigQueryStagedFilesDatasetProperties.builder() - .fileFormat(FileFormat.CSV) - .addAllFiles(filesList).build()) + .fileFormat(FileFormatType.CSV) + .addAllFilePaths(filesList).build()) .schema(SchemaDefinition.builder().addAllFields(Arrays.asList(col1, col2, col3, col4, col5)).build()) .build(); @@ -117,7 +113,7 @@ public void testBulkLoadWithDigestNotGeneratedAuditEnabledNoExtraOptions() .relationalSink(BigQuerySink.get()) .collectStatistics(true) .executionTimestampClock(fixedClock_2000_01_01) - .bulkLoadTaskIdValue(TASK_ID_VALUE) + .bulkLoadEventIdValue(TASK_ID_VALUE) .batchIdPattern("{NEXT_BATCH_ID}") .build(); @@ -141,7 +137,7 @@ public void testBulkLoadWithDigestNotGeneratedAuditEnabledNoExtraOptions() "FROM 
`my_db`.`my_name_legend_persistence_temp` as legend_persistence_temp)"; String expectedMetadataIngestSql = "INSERT INTO bulk_load_batch_metadata (`batch_id`, `table_name`, `batch_start_ts_utc`, `batch_end_ts_utc`, `batch_status`, `batch_source_info`) " + - "(SELECT {NEXT_BATCH_ID},'my_name',PARSE_DATETIME('%Y-%m-%d %H:%M:%E6S','2000-01-01 00:00:00.000000'),CURRENT_DATETIME(),'{BULK_LOAD_BATCH_STATUS_PLACEHOLDER}',PARSE_JSON('{\"files\":[\"/path/xyz/file1.csv\",\"/path/xyz/file2.csv\"],\"task_id\":\"xyz123\"}'))"; + "(SELECT {NEXT_BATCH_ID},'my_name',PARSE_DATETIME('%Y-%m-%d %H:%M:%E6S','2000-01-01 00:00:00.000000'),CURRENT_DATETIME(),'{BULK_LOAD_BATCH_STATUS_PLACEHOLDER}',PARSE_JSON('{\"event_id\":\"xyz123\",\"file_paths\":[\"/path/xyz/file1.csv\",\"/path/xyz/file2.csv\"]}'))"; Assertions.assertEquals(expectedCreateTableSql, preActionsSql.get(0)); Assertions.assertEquals(expectedCopySql, ingestSql.get(0)); @@ -163,20 +159,21 @@ public void testBulkLoadWithDigestNotGeneratedAuditEnabledAllOptionsNoTaskId() .auditing(DateTimeAuditing.builder().dateTimeField(APPEND_TIME).build()) .build(); + Map loadOptions = new HashMap<>(); + loadOptions.put("encoding", "UTF8"); + loadOptions.put("max_bad_records", 100L); + loadOptions.put("null_marker", "NULL"); + loadOptions.put("quote", "'"); + loadOptions.put("compression", "GZIP"); + loadOptions.put("field_delimiter", ","); + loadOptions.put("skip_leading_rows", 1L); + Dataset stagedFilesDataset = StagedFilesDataset.builder() .stagedFilesDatasetProperties( BigQueryStagedFilesDatasetProperties.builder() - .fileFormat(FileFormat.CSV) - .loadOptions(LoadOptions.builder() - .encoding("UTF8") - .maxBadRecords(100L) - .nullMarker("NULL") - .quote("'") - .compression("GZIP") - .fieldDelimiter(",") - .skipLeadingRows(1L) - .build()) - .addAllFiles(filesList).build()) + .fileFormat(FileFormatType.CSV) + .putAllLoadOptions(loadOptions) + .addAllFilePaths(filesList).build()) .schema(SchemaDefinition.builder().addAllFields(Arrays.asList(col1, col2, col3, col4, col5)).build()) .build(); @@ -214,7 +211,7 @@ public void testBulkLoadWithDigestNotGeneratedAuditEnabledAllOptionsNoTaskId() String expectedMetadataIngestSql = "INSERT INTO bulk_load_batch_metadata (`batch_id`, `table_name`, `batch_start_ts_utc`, `batch_end_ts_utc`, `batch_status`, `batch_source_info`) " + "(SELECT (SELECT COALESCE(MAX(bulk_load_batch_metadata.`batch_id`),0)+1 FROM bulk_load_batch_metadata as bulk_load_batch_metadata WHERE UPPER(bulk_load_batch_metadata.`table_name`) = 'MY_NAME'),'my_name',PARSE_DATETIME('%Y-%m-%d %H:%M:%E6S','2000-01-01 00:00:00.000000'),CURRENT_DATETIME(),'{BULK_LOAD_BATCH_STATUS_PLACEHOLDER}'," + - "PARSE_JSON('{\"files\":[\"/path/xyz/file1.csv\",\"/path/xyz/file2.csv\"]}'))"; + "PARSE_JSON('{\"file_paths\":[\"/path/xyz/file1.csv\",\"/path/xyz/file2.csv\"]}'))"; Assertions.assertEquals(expectedCreateTableSql, preActionsSql.get(0)); Assertions.assertEquals(expectedCopySql, ingestSql.get(0)); @@ -239,8 +236,8 @@ public void testBulkLoadWithDigestNotGeneratedAuditDisabledNoExtraOptions() Dataset stagedFilesDataset = StagedFilesDataset.builder() .stagedFilesDatasetProperties( BigQueryStagedFilesDatasetProperties.builder() - .fileFormat(FileFormat.CSV) - .addAllFiles(filesList).build()) + .fileFormat(FileFormatType.CSV) + .addAllFilePaths(filesList).build()) .schema(SchemaDefinition.builder().addAllFields(Arrays.asList(col1, col2, col3, col4, col5)).build()) .build(); @@ -254,7 +251,7 @@ public void testBulkLoadWithDigestNotGeneratedAuditDisabledNoExtraOptions() 
.relationalSink(BigQuerySink.get()) .collectStatistics(true) .executionTimestampClock(fixedClock_2000_01_01) - .bulkLoadTaskIdValue(TASK_ID_VALUE) + .bulkLoadEventIdValue(TASK_ID_VALUE) .build(); GeneratorResult operations = generator.generateOperations(Datasets.of(mainDataset, stagedFilesDataset)); @@ -297,8 +294,8 @@ public void testBulkLoadWithDigestGeneratedAuditEnabledNoExtraOptions() Dataset stagedFilesDataset = StagedFilesDataset.builder() .stagedFilesDatasetProperties( BigQueryStagedFilesDatasetProperties.builder() - .fileFormat(FileFormat.CSV) - .addAllFiles(filesList).build()) + .fileFormat(FileFormatType.CSV) + .addAllFilePaths(filesList).build()) .schema(SchemaDefinition.builder().addAllFields(Arrays.asList(col1, col2, col3, col4, col5)).build()) .build(); @@ -312,7 +309,7 @@ public void testBulkLoadWithDigestGeneratedAuditEnabledNoExtraOptions() .relationalSink(BigQuerySink.get()) .collectStatistics(true) .executionTimestampClock(fixedClock_2000_01_01) - .bulkLoadTaskIdValue(TASK_ID_VALUE) + .bulkLoadEventIdValue(TASK_ID_VALUE) .build(); GeneratorResult operations = generator.generateOperations(Datasets.of(mainDataset, stagedFilesDataset)); @@ -355,8 +352,8 @@ public void testBulkLoadWithDigestGeneratedAuditEnabledNoExtraOptionsUpperCase() Dataset stagedFilesDataset = StagedFilesDataset.builder() .stagedFilesDatasetProperties( BigQueryStagedFilesDatasetProperties.builder() - .fileFormat(FileFormat.CSV) - .addAllFiles(filesList).build()) + .fileFormat(FileFormatType.CSV) + .addAllFilePaths(filesList).build()) .schema(SchemaDefinition.builder().addAllFields(Arrays.asList(col1, col2, col3, col4, col5)).build()) .build(); @@ -370,7 +367,7 @@ public void testBulkLoadWithDigestGeneratedAuditEnabledNoExtraOptionsUpperCase() .relationalSink(BigQuerySink.get()) .collectStatistics(true) .executionTimestampClock(fixedClock_2000_01_01) - .bulkLoadTaskIdValue(TASK_ID_VALUE) + .bulkLoadEventIdValue(TASK_ID_VALUE) .caseConversion(CaseConversion.TO_UPPER) .build(); @@ -462,7 +459,7 @@ public void testBulkLoadStagedFilesDatasetNotProvided() RelationalGenerator generator = RelationalGenerator.builder() .ingestMode(bulkLoad) .relationalSink(BigQuerySink.get()) - .bulkLoadTaskIdValue(TASK_ID_VALUE) + .bulkLoadEventIdValue(TASK_ID_VALUE) .collectStatistics(true) .executionTimestampClock(fixedClock_2000_01_01) .build(); diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/RelationalGeneratorAbstract.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/RelationalGeneratorAbstract.java index 2858ce87f80..afb9547d356 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/RelationalGeneratorAbstract.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/RelationalGeneratorAbstract.java @@ -114,7 +114,7 @@ public boolean enableConcurrentSafety() public abstract Optional infiniteBatchIdValue(); - public abstract Optional bulkLoadTaskIdValue(); + public abstract Optional 
bulkLoadEventIdValue(); @Default public String bulkLoadBatchStatusPattern() @@ -137,7 +137,7 @@ protected PlannerOptions plannerOptions() .enableSchemaEvolution(enableSchemaEvolution()) .createStagingDataset(createStagingDataset()) .enableConcurrentSafety(enableConcurrentSafety()) - .bulkLoadTaskIdValue(bulkLoadTaskIdValue()) + .bulkLoadEventIdValue(bulkLoadEventIdValue()) .build(); } diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/RelationalIngestorAbstract.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/RelationalIngestorAbstract.java index 12faa47d745..7bf2618518f 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/RelationalIngestorAbstract.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/api/RelationalIngestorAbstract.java @@ -140,7 +140,7 @@ public Set schemaEvolutionCapabilitySet() public abstract RelationalSink relationalSink(); - public abstract Optional bulkLoadTaskIdValue(); + public abstract Optional bulkLoadEventIdValue(); @Derived protected PlannerOptions plannerOptions() @@ -151,7 +151,7 @@ protected PlannerOptions plannerOptions() .enableSchemaEvolution(enableSchemaEvolution()) .createStagingDataset(createStagingDataset()) .enableConcurrentSafety(enableConcurrentSafety()) - .bulkLoadTaskIdValue(bulkLoadTaskIdValue()) + .bulkLoadEventIdValue(bulkLoadEventIdValue()) .build(); } @@ -512,7 +512,7 @@ private void init(Datasets datasets) .batchStartTimestampPattern(BATCH_START_TS_PATTERN) .batchEndTimestampPattern(BATCH_END_TS_PATTERN) .batchIdPattern(BATCH_ID_PATTERN) - .bulkLoadTaskIdValue(bulkLoadTaskIdValue()) + .bulkLoadEventIdValue(bulkLoadEventIdValue()) .build(); planner = Planners.get(enrichedDatasets, enrichedIngestMode, plannerOptions(), relationalSink().capabilities()); diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/main/java/org/finos/legend/engine/persistence/components/relational/h2/logicalplan/datasets/H2StagedFilesDatasetPropertiesAbstract.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/main/java/org/finos/legend/engine/persistence/components/relational/h2/logicalplan/datasets/H2StagedFilesDatasetPropertiesAbstract.java index ea69a121e66..f067057cd10 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/main/java/org/finos/legend/engine/persistence/components/relational/h2/logicalplan/datasets/H2StagedFilesDatasetPropertiesAbstract.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/main/java/org/finos/legend/engine/persistence/components/relational/h2/logicalplan/datasets/H2StagedFilesDatasetPropertiesAbstract.java @@ -15,7 +15,7 @@ package 
org.finos.legend.engine.persistence.components.relational.h2.logicalplan.datasets; -import org.finos.legend.engine.persistence.components.common.FileFormat; +import org.finos.legend.engine.persistence.components.common.FileFormatType; import org.finos.legend.engine.persistence.components.logicalplan.datasets.StagedFilesDatasetProperties; import org.immutables.value.Value; @@ -30,16 +30,20 @@ ) public interface H2StagedFilesDatasetPropertiesAbstract extends StagedFilesDatasetProperties { - FileFormat fileFormat(); + FileFormatType fileFormat(); @Value.Check default void validate() { - if (files().size() != 1) + if (filePatterns().size() > 0) + { + throw new IllegalArgumentException("Cannot build H2StagedFilesDatasetProperties, filePatterns not supported"); + } + if (filePaths().size() != 1) { throw new IllegalArgumentException("Cannot build H2StagedFilesDatasetProperties, only 1 file per load supported"); } - if (fileFormat() != FileFormat.CSV) + if (fileFormat() != FileFormatType.CSV) { throw new IllegalArgumentException("Cannot build H2StagedFilesDatasetProperties, only CSV file loading supported"); } diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/main/java/org/finos/legend/engine/persistence/components/relational/h2/sql/visitor/StagedFilesDatasetReferenceVisitor.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/main/java/org/finos/legend/engine/persistence/components/relational/h2/sql/visitor/StagedFilesDatasetReferenceVisitor.java index b697e4140ea..8f84482ee90 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/main/java/org/finos/legend/engine/persistence/components/relational/h2/sql/visitor/StagedFilesDatasetReferenceVisitor.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/main/java/org/finos/legend/engine/persistence/components/relational/h2/sql/visitor/StagedFilesDatasetReferenceVisitor.java @@ -32,7 +32,7 @@ public VisitorResult visit(PhysicalPlanNode prev, StagedFilesDatasetReference cu throw new IllegalStateException("Only H2StagedFilesDatasetProperties are supported for H2 Sink"); } H2StagedFilesDatasetProperties datasetProperties = (H2StagedFilesDatasetProperties) current.properties(); - CsvRead csvRead = new CsvRead(datasetProperties.files().get(0), String.join(",", current.columns()), null); + CsvRead csvRead = new CsvRead(datasetProperties.filePaths().get(0), String.join(",", current.columns()), null); prev.push(csvRead); return new VisitorResult(null); } diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/bulkload/BulkLoadTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/bulkload/BulkLoadTest.java index 4f09b145a66..d58eee0fe64 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/bulkload/BulkLoadTest.java +++ 
b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/bulkload/BulkLoadTest.java @@ -16,7 +16,7 @@ import org.finos.legend.engine.persistence.components.BaseTest; import org.finos.legend.engine.persistence.components.common.Datasets; -import org.finos.legend.engine.persistence.components.common.FileFormat; +import org.finos.legend.engine.persistence.components.common.FileFormatType; import org.finos.legend.engine.persistence.components.common.StatisticName; import org.finos.legend.engine.persistence.components.ingestmode.BulkLoad; import org.finos.legend.engine.persistence.components.ingestmode.IngestMode; @@ -104,8 +104,8 @@ public void testBulkLoadWithDigestNotGeneratedAuditEnabledNoTaskId() throws Exce Dataset stagedFilesDataset = StagedFilesDataset.builder() .stagedFilesDatasetProperties( H2StagedFilesDatasetProperties.builder() - .fileFormat(FileFormat.CSV) - .addAllFiles(Collections.singletonList(filePath)).build()) + .fileFormat(FileFormatType.CSV) + .addAllFilePaths(Collections.singletonList(filePath)).build()) .schema(SchemaDefinition.builder().addAllFields(Arrays.asList(col1, col2, col3, col4)).build()) .build(); @@ -177,8 +177,8 @@ public void testBulkLoadWithDigestNotGeneratedAuditDisabled() throws Exception Dataset stagedFilesDataset = StagedFilesDataset.builder() .stagedFilesDatasetProperties( H2StagedFilesDatasetProperties.builder() - .fileFormat(FileFormat.CSV) - .addAllFiles(Collections.singletonList(filePath)).build()) + .fileFormat(FileFormatType.CSV) + .addAllFilePaths(Collections.singletonList(filePath)).build()) .schema(SchemaDefinition.builder().addAllFields(Arrays.asList(col1, col2, col3, col4)).build()) .build(); @@ -195,7 +195,7 @@ public void testBulkLoadWithDigestNotGeneratedAuditDisabled() throws Exception .relationalSink(H2Sink.get()) .collectStatistics(true) .executionTimestampClock(fixedClock_2000_01_01) - .bulkLoadTaskIdValue(TASK_ID_VALUE_1) + .bulkLoadEventIdValue(TASK_ID_VALUE_1) .build(); GeneratorResult operations = generator.generateOperations(datasets); @@ -250,8 +250,8 @@ public void testBulkLoadWithDigestGeneratedAuditEnabled() throws Exception Dataset stagedFilesDataset = StagedFilesDataset.builder() .stagedFilesDatasetProperties( H2StagedFilesDatasetProperties.builder() - .fileFormat(FileFormat.CSV) - .addAllFiles(Collections.singletonList(filePath)).build()) + .fileFormat(FileFormatType.CSV) + .addAllFilePaths(Collections.singletonList(filePath)).build()) .schema(SchemaDefinition.builder().addAllFields(Arrays.asList(col1, col2, col3, col4)).build()) .build(); @@ -267,7 +267,7 @@ public void testBulkLoadWithDigestGeneratedAuditEnabled() throws Exception .ingestMode(bulkLoad) .relationalSink(H2Sink.get()) .collectStatistics(true) - .bulkLoadTaskIdValue(TASK_ID_VALUE_1) + .bulkLoadEventIdValue(TASK_ID_VALUE_1) .executionTimestampClock(fixedClock_2000_01_01) .build(); @@ -325,8 +325,8 @@ public void testBulkLoadWithDigestGeneratedAuditEnabledUpperCase() throws Except Dataset stagedFilesDataset = StagedFilesDataset.builder() .stagedFilesDatasetProperties( H2StagedFilesDatasetProperties.builder() - .fileFormat(FileFormat.CSV) - .addAllFiles(Collections.singletonList(filePath)).build()) + .fileFormat(FileFormatType.CSV) + .addAllFilePaths(Collections.singletonList(filePath)).build()) .schema(SchemaDefinition.builder().addAllFields(Arrays.asList(col1, col2, col3, col4)).build()) .build(); @@ -342,7 +342,7 @@ 
public void testBulkLoadWithDigestGeneratedAuditEnabledUpperCase() throws Except .ingestMode(bulkLoad) .relationalSink(H2Sink.get()) .collectStatistics(true) - .bulkLoadTaskIdValue(TASK_ID_VALUE_1) + .bulkLoadEventIdValue(TASK_ID_VALUE_1) .executionTimestampClock(fixedClock_2000_01_01) .caseConversion(CaseConversion.TO_UPPER) .build(); @@ -399,8 +399,8 @@ public void testBulkLoadWithDigestNotGeneratedAuditDisabledTwoBatches() throws E Dataset stagedFilesDataset = StagedFilesDataset.builder() .stagedFilesDatasetProperties( H2StagedFilesDatasetProperties.builder() - .fileFormat(FileFormat.CSV) - .addAllFiles(Collections.singletonList(filePath)).build()) + .fileFormat(FileFormatType.CSV) + .addAllFilePaths(Collections.singletonList(filePath)).build()) .schema(SchemaDefinition.builder().addAllFields(Arrays.asList(col1, col2, col3, col4)).build()) .build(); @@ -499,7 +499,7 @@ public void testBulkLoadStagedFilesDatasetNotProvided() RelationalGenerator generator = RelationalGenerator.builder() .ingestMode(bulkLoad) .relationalSink(H2Sink.get()) - .bulkLoadTaskIdValue(TASK_ID_VALUE_1) + .bulkLoadEventIdValue(TASK_ID_VALUE_1) .collectStatistics(true) .executionTimestampClock(fixedClock_2000_01_01) .build(); @@ -533,8 +533,8 @@ public void testBulkLoadStageHasPrimaryKey() Dataset stagedFilesDataset = StagedFilesDataset.builder() .stagedFilesDatasetProperties( H2StagedFilesDatasetProperties.builder() - .fileFormat(FileFormat.CSV) - .addAllFiles(Collections.singletonList("src/test/resources/data/bulk-load/input/staged_file1.csv")).build()) + .fileFormat(FileFormatType.CSV) + .addAllFilePaths(Collections.singletonList("src/test/resources/data/bulk-load/input/staged_file1.csv")).build()) .schema(SchemaDefinition.builder().addAllFields(Arrays.asList(col1, col2, col3, col4, pkCol)).build()) .build(); @@ -546,7 +546,7 @@ public void testBulkLoadStageHasPrimaryKey() RelationalGenerator generator = RelationalGenerator.builder() .ingestMode(bulkLoad) .relationalSink(H2Sink.get()) - .bulkLoadTaskIdValue(TASK_ID_VALUE_1) + .bulkLoadEventIdValue(TASK_ID_VALUE_1) .collectStatistics(true) .executionTimestampClock(fixedClock_2000_01_01) .build(); @@ -580,8 +580,8 @@ public void testBulkLoadMainHasPrimaryKey() Dataset stagedFilesDataset = StagedFilesDataset.builder() .stagedFilesDatasetProperties( H2StagedFilesDatasetProperties.builder() - .fileFormat(FileFormat.CSV) - .addAllFiles(Collections.singletonList("src/test/resources/data/bulk-load/input/staged_file1.csv")).build()) + .fileFormat(FileFormatType.CSV) + .addAllFilePaths(Collections.singletonList("src/test/resources/data/bulk-load/input/staged_file1.csv")).build()) .schema(SchemaDefinition.builder().addAllFields(Arrays.asList(col1, col2, col3, col4)).build()) .build(); @@ -593,7 +593,7 @@ public void testBulkLoadMainHasPrimaryKey() RelationalGenerator generator = RelationalGenerator.builder() .ingestMode(bulkLoad) .relationalSink(H2Sink.get()) - .bulkLoadTaskIdValue(TASK_ID_VALUE_1) + .bulkLoadEventIdValue(TASK_ID_VALUE_1) .collectStatistics(true) .executionTimestampClock(fixedClock_2000_01_01) .build(); @@ -615,8 +615,8 @@ public void testBulkLoadMoreThanOneFile() Dataset stagedFilesDataset = StagedFilesDataset.builder() .stagedFilesDatasetProperties( H2StagedFilesDatasetProperties.builder() - .fileFormat(FileFormat.CSV) - .addAllFiles(Arrays.asList("src/test/resources/data/bulk-load/input/staged_file1.csv", "src/test/resources/data/bulk-load/input/staged_file2.csv")).build()) + .fileFormat(FileFormatType.CSV) + 
.addAllFilePaths(Arrays.asList("src/test/resources/data/bulk-load/input/staged_file1.csv", "src/test/resources/data/bulk-load/input/staged_file2.csv")).build()) .schema(SchemaDefinition.builder().addAllFields(Arrays.asList(col1, col2, col3, col4)).build()) .build(); Assertions.fail("Exception was not thrown"); @@ -635,8 +635,8 @@ public void testBulkLoadNotCsvFile() Dataset stagedFilesDataset = StagedFilesDataset.builder() .stagedFilesDatasetProperties( H2StagedFilesDatasetProperties.builder() - .fileFormat(FileFormat.JSON) - .addAllFiles(Arrays.asList("src/test/resources/data/bulk-load/input/staged_file1.json")).build()) + .fileFormat(FileFormatType.JSON) + .addAllFilePaths(Arrays.asList("src/test/resources/data/bulk-load/input/staged_file1.json")).build()) .schema(SchemaDefinition.builder().addAllFields(Arrays.asList(col1, col2, col3, col4)).build()) .build(); Assertions.fail("Exception was not thrown"); @@ -655,7 +655,7 @@ RelationalIngestor getRelationalIngestor(IngestMode ingestMode, PlannerOptions o .executionTimestampClock(executionTimestampClock) .cleanupStagingData(options.cleanupStagingData()) .collectStatistics(options.collectStatistics()) - .bulkLoadTaskIdValue(taskId) + .bulkLoadEventIdValue(taskId) .enableConcurrentSafety(true) .caseConversion(caseConversion) .build(); @@ -668,14 +668,14 @@ private void verifyBulkLoadMetadata(Map appendMetadata, String f Assertions.assertEquals("main", appendMetadata.get("table_name")); Assertions.assertEquals("2000-01-01 00:00:00.0", appendMetadata.get("batch_start_ts_utc").toString()); Assertions.assertEquals("2000-01-01 00:00:00.0", appendMetadata.get("batch_end_ts_utc").toString()); - Assertions.assertTrue(appendMetadata.get("batch_source_info").toString().contains(String.format("\"files\":[\"%s\"]", fileName))); + Assertions.assertTrue(appendMetadata.get("batch_source_info").toString().contains(String.format("\"file_paths\":[\"%s\"]", fileName))); if (taskId.isPresent()) { - Assertions.assertTrue(appendMetadata.get("batch_source_info").toString().contains(String.format("\"task_id\":\"%s\"", taskId.get()))); + Assertions.assertTrue(appendMetadata.get("batch_source_info").toString().contains(String.format("\"event_id\":\"%s\"", taskId.get()))); } else { - Assertions.assertFalse(appendMetadata.get("batch_source_info").toString().contains("\"task_id\"")); + Assertions.assertFalse(appendMetadata.get("batch_source_info").toString().contains("\"event_id\"")); } } @@ -686,14 +686,14 @@ private void verifyBulkLoadMetadataForUpperCase(Map appendMetada Assertions.assertEquals("MAIN", appendMetadata.get("TABLE_NAME")); Assertions.assertEquals("2000-01-01 00:00:00.0", appendMetadata.get("BATCH_START_TS_UTC").toString()); Assertions.assertEquals("2000-01-01 00:00:00.0", appendMetadata.get("BATCH_END_TS_UTC").toString()); - Assertions.assertTrue(appendMetadata.get("BATCH_SOURCE_INFO").toString().contains(String.format("\"files\":[\"%s\"]", fileName))); + Assertions.assertTrue(appendMetadata.get("BATCH_SOURCE_INFO").toString().contains(String.format("\"file_paths\":[\"%s\"]", fileName))); if (taskId.isPresent()) { - Assertions.assertTrue(appendMetadata.get("BATCH_SOURCE_INFO").toString().contains(String.format("\"task_id\":\"%s\"", taskId.get()))); + Assertions.assertTrue(appendMetadata.get("BATCH_SOURCE_INFO").toString().contains(String.format("\"event_id\":\"%s\"", taskId.get()))); } else { - Assertions.assertFalse(appendMetadata.get("BATCH_SOURCE_INFO").toString().contains("\"task_id\"")); + 
Assertions.assertFalse(appendMetadata.get("BATCH_SOURCE_INFO").toString().contains("\"event_id\"")); } } diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/common/FileFormat.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/main/java/org/finos/legend/engine/persistence/components/relational/snowflake/logicalplan/datasets/FileFormat.java similarity index 82% rename from legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/common/FileFormat.java rename to legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/main/java/org/finos/legend/engine/persistence/components/relational/snowflake/logicalplan/datasets/FileFormat.java index 75cf32a3a55..afd3b7148dd 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/common/FileFormat.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/main/java/org/finos/legend/engine/persistence/components/relational/snowflake/logicalplan/datasets/FileFormat.java @@ -12,12 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -package org.finos.legend.engine.persistence.components.common; +package org.finos.legend.engine.persistence.components.relational.snowflake.logicalplan.datasets; -public enum FileFormat +public interface FileFormat { - CSV, - JSON, - AVRO, - PARQUET; + } diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/main/java/org/finos/legend/engine/persistence/components/relational/snowflake/logicalplan/datasets/SnowflakeStagedFilesDatasetPropertiesAbstract.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/main/java/org/finos/legend/engine/persistence/components/relational/snowflake/logicalplan/datasets/SnowflakeStagedFilesDatasetPropertiesAbstract.java index c70ae4ef4a5..751698e6ad2 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/main/java/org/finos/legend/engine/persistence/components/relational/snowflake/logicalplan/datasets/SnowflakeStagedFilesDatasetPropertiesAbstract.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/main/java/org/finos/legend/engine/persistence/components/relational/snowflake/logicalplan/datasets/SnowflakeStagedFilesDatasetPropertiesAbstract.java @@ -18,6 +18,7 @@ import org.finos.legend.engine.persistence.components.logicalplan.datasets.StagedFilesDatasetProperties; import org.immutables.value.Value; +import java.util.Map; import java.util.Optional; @Value.Immutable @@ -32,5 +33,7 @@ public interface SnowflakeStagedFilesDatasetPropertiesAbstract extends StagedFil { String location(); - Optional<String> fileFormat(); + Optional<FileFormat> fileFormat(); + + Map<String, Object>
copyOptions(); } diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/common/LoadOptionsAbstract.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/main/java/org/finos/legend/engine/persistence/components/relational/snowflake/logicalplan/datasets/StandardFileFormatAbstract.java similarity index 53% rename from legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/common/LoadOptionsAbstract.java rename to legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/main/java/org/finos/legend/engine/persistence/components/relational/snowflake/logicalplan/datasets/StandardFileFormatAbstract.java index bc809c6f541..a117e6ecbb4 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/common/LoadOptionsAbstract.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/main/java/org/finos/legend/engine/persistence/components/relational/snowflake/logicalplan/datasets/StandardFileFormatAbstract.java @@ -12,37 +12,24 @@ // See the License for the specific language governing permissions and // limitations under the License. -package org.finos.legend.engine.persistence.components.common; +package org.finos.legend.engine.persistence.components.relational.snowflake.logicalplan.datasets; +import org.finos.legend.engine.persistence.components.common.FileFormatType; import org.immutables.value.Value; -import java.util.Optional; +import java.util.Map; @Value.Immutable @Value.Style( - typeAbstract = "*Abstract", - typeImmutable = "*", - jdkOnly = true, - optionalAcceptNullable = true, - strictBuilder = true + typeAbstract = "*Abstract", + typeImmutable = "*", + jdkOnly = true, + optionalAcceptNullable = true, + strictBuilder = true ) -public interface LoadOptionsAbstract +public interface StandardFileFormatAbstract extends FileFormat { - Optional<String> fieldDelimiter(); + FileFormatType formatType(); - Optional<String> encoding(); - - Optional<String> nullMarker(); - - Optional<String> quote(); - - Optional<Long> skipLeadingRows(); - - Optional<Long> maxBadRecords(); - - Optional<String> compression(); - - Optional<Boolean> force(); - - Optional<String> onError(); + Map<String, Object> formatOptions(); } diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/main/java/org/finos/legend/engine/persistence/components/relational/snowflake/logicalplan/datasets/UserDefinedFileFormatAbstract.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/main/java/org/finos/legend/engine/persistence/components/relational/snowflake/logicalplan/datasets/UserDefinedFileFormatAbstract.java new file mode 100644 index 00000000000..b55a25112be --- /dev/null +++
b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/main/java/org/finos/legend/engine/persistence/components/relational/snowflake/logicalplan/datasets/UserDefinedFileFormatAbstract.java @@ -0,0 +1,32 @@ +// Copyright 2023 Goldman Sachs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + + +package org.finos.legend.engine.persistence.components.relational.snowflake.logicalplan.datasets; + +import org.immutables.value.Value; + +@Value.Immutable +@Value.Style( + typeAbstract = "*Abstract", + typeImmutable = "*", + jdkOnly = true, + optionalAcceptNullable = true, + strictBuilder = true +) +public interface UserDefinedFileFormatAbstract extends FileFormat { + @Value.Parameter(order = 0) + String formatName(); }
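For orientation, a minimal hedged sketch (imports elided, values illustrative) of how the two FileFormat flavours introduced above are constructed through the immutables-generated builders, mirroring what the Snowflake tests later in this patch do:

    // Built-in format type plus options: rendered as FILE_FORMAT = (FIELD_DELIMITER = ',', TYPE = 'CSV')
    FileFormat csvFormat = StandardFileFormat.builder()
            .formatType(FileFormatType.CSV)
            .putFormatOptions("FIELD_DELIMITER", ",")
            .build();

    // Pre-created named stage format: rendered as FILE_FORMAT = (FORMAT_NAME = 'my_file_format')
    FileFormat namedFormat = UserDefinedFileFormat.of("my_file_format");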
diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/main/java/org/finos/legend/engine/persistence/components/relational/snowflake/sql/visitor/CopyVisitor.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/main/java/org/finos/legend/engine/persistence/components/relational/snowflake/sql/visitor/CopyVisitor.java index e6b90d52e45..c476b8ec0c7 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/main/java/org/finos/legend/engine/persistence/components/relational/snowflake/sql/visitor/CopyVisitor.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/main/java/org/finos/legend/engine/persistence/components/relational/snowflake/sql/visitor/CopyVisitor.java @@ -14,10 +14,13 @@ package org.finos.legend.engine.persistence.components.relational.snowflake.sql.visitor; -import org.finos.legend.engine.persistence.components.common.LoadOptions; import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlanNode; import org.finos.legend.engine.persistence.components.logicalplan.operations.Copy; import org.finos.legend.engine.persistence.components.physicalplan.PhysicalPlanNode; +import org.finos.legend.engine.persistence.components.relational.snowflake.logicalplan.datasets.FileFormat; +import org.finos.legend.engine.persistence.components.relational.snowflake.logicalplan.datasets.SnowflakeStagedFilesDatasetProperties; +import org.finos.legend.engine.persistence.components.relational.snowflake.logicalplan.datasets.StandardFileFormat; +import org.finos.legend.engine.persistence.components.relational.snowflake.logicalplan.datasets.UserDefinedFileFormat; import org.finos.legend.engine.persistence.components.relational.snowflake.sqldom.schemaops.statements.CopyStatement; import org.finos.legend.engine.persistence.components.transformer.LogicalPlanVisitor; import org.finos.legend.engine.persistence.components.transformer.VisitorContext; @@ -26,16 +29,16 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; public class CopyVisitor implements LogicalPlanVisitor<Copy> { - @Override public VisitorResult visit(PhysicalPlanNode prev, Copy current, VisitorContext context) { - Map<String, Object> loadOptionsMap = new HashMap<>(); - current.loadOptions().ifPresent(options -> retrieveLoadOptions(options, loadOptionsMap)); - CopyStatement copyStatement = new CopyStatement(loadOptionsMap); + SnowflakeStagedFilesDatasetProperties properties = (SnowflakeStagedFilesDatasetProperties) current.stagedFilesDatasetProperties(); + CopyStatement copyStatement = new CopyStatement(); + setCopyStatementProperties(properties, copyStatement); prev.push(copyStatement); List<LogicalPlanNode> logicalPlanNodes = new ArrayList<>(); @@ -46,9 +49,34 @@ public VisitorResult visit(PhysicalPlanNode prev, Copy current, VisitorContext c return new VisitorResult(copyStatement, logicalPlanNodes); } - private void retrieveLoadOptions(LoadOptions loadOptions, Map<String, Object> loadOptionsMap) + private static void setCopyStatementProperties(SnowflakeStagedFilesDatasetProperties properties, CopyStatement copyStatement) { - loadOptions.onError().ifPresent(property -> loadOptionsMap.put("ON_ERROR", property)); - loadOptions.force().ifPresent(property -> loadOptionsMap.put("FORCE", property)); + copyStatement.setFilePatterns(properties.filePatterns()); + copyStatement.setFilePaths(properties.filePaths()); + + // Add default option into the map + Map<String, Object> copyOptions = new HashMap<>(properties.copyOptions()); + if (!copyOptions.containsKey("ON_ERROR") && !copyOptions.containsKey("on_error")) + { + copyOptions.put("ON_ERROR", "ABORT_STATEMENT"); + } + copyStatement.setCopyOptions(copyOptions); + + Optional<FileFormat> fileFormat = properties.fileFormat(); + if (fileFormat.isPresent()) + { + FileFormat format = properties.fileFormat().get(); + if (format instanceof UserDefinedFileFormat) + { + UserDefinedFileFormat userDefinedFileFormat = (UserDefinedFileFormat) format; + copyStatement.setUserDefinedFileFormatName(userDefinedFileFormat.formatName()); + } + else if (format instanceof StandardFileFormat) + { + StandardFileFormat standardFileFormat = (StandardFileFormat) format; + copyStatement.setFileFormatType(standardFileFormat.formatType()); + copyStatement.setFileFormatOptions(standardFileFormat.formatOptions()); + } + } } }
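To see what this visitor consumes, a hedged sketch of building the Snowflake staged-files properties (builder methods as exercised by the tests later in this patch; imports elided, values illustrative). When the caller supplies no ON_ERROR copy option, the visitor above defaults it to ABORT_STATEMENT:

    SnowflakeStagedFilesDatasetProperties properties = SnowflakeStagedFilesDatasetProperties.builder()
            .location("my_location")                                  // stage location the COPY reads from
            .fileFormat(UserDefinedFileFormat.of("my_file_format"))   // or a StandardFileFormat
            .putCopyOptions("FORCE", true)                            // ON_ERROR is defaulted when absent
            .addAllFilePatterns(Arrays.asList("/path/xyz/.*"))        // or addAllFilePaths(...), not both
            .build();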
diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/main/java/org/finos/legend/engine/persistence/components/relational/snowflake/sql/visitor/StagedFilesDatasetReferenceVisitor.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/main/java/org/finos/legend/engine/persistence/components/relational/snowflake/sql/visitor/StagedFilesDatasetReferenceVisitor.java index ccb15d743cb..286d60ae4af 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/main/java/org/finos/legend/engine/persistence/components/relational/snowflake/sql/visitor/StagedFilesDatasetReferenceVisitor.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/main/java/org/finos/legend/engine/persistence/components/relational/snowflake/sql/visitor/StagedFilesDatasetReferenceVisitor.java @@ -34,8 +34,6 @@ public VisitorResult visit(PhysicalPlanNode prev, StagedFilesDatasetReference cu } SnowflakeStagedFilesDatasetProperties datasetProperties = (SnowflakeStagedFilesDatasetProperties) current.properties(); StagedFilesTable stagedFiles = new StagedFilesTable(datasetProperties.location()); - datasetProperties.fileFormat().ifPresent(stagedFiles::setFileFormat); - stagedFiles.setFilePattern(datasetProperties.files().stream().map(s -> '(' + s + ')').collect(Collectors.joining("|"))); current.alias().ifPresent(stagedFiles::setAlias); prev.push(stagedFiles); return new VisitorResult(null);
diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/main/java/org/finos/legend/engine/persistence/components/relational/snowflake/sqldom/schemaops/expressions/table/StagedFilesTable.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/main/java/org/finos/legend/engine/persistence/components/relational/snowflake/sqldom/schemaops/expressions/table/StagedFilesTable.java index 2bb2eac9f63..946183301ca 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/main/java/org/finos/legend/engine/persistence/components/relational/snowflake/sqldom/schemaops/expressions/table/StagedFilesTable.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/main/java/org/finos/legend/engine/persistence/components/relational/snowflake/sqldom/schemaops/expressions/table/StagedFilesTable.java @@ -18,16 +18,9 @@ import org.finos.legend.engine.persistence.components.relational.sqldom.schemaops.expresssions.table.TableLike; import org.finos.legend.engine.persistence.components.relational.sqldom.utils.StringUtils; -import java.util.ArrayList; -import java.util.List; - -import static org.finos.legend.engine.persistence.components.relational.sqldom.utils.SqlGenUtils.*; - public class StagedFilesTable extends TableLike { private String location; - private String fileFormat; - private String filePattern; public StagedFilesTable(String location) { @@ -53,22 +46,6 @@ public void genSqlWithoutAlias(StringBuilder builder) throws SqlDomException { validate(); builder.append(location); - // Add FILE_FORMAT, PATTERN - if (StringUtils.notEmpty(fileFormat) || StringUtils.notEmpty(filePattern)) - { - builder.append(WHITE_SPACE + OPEN_PARENTHESIS); - List<String> options = new ArrayList<>(); - if (StringUtils.notEmpty(fileFormat)) - { - options.add(String.format("FILE_FORMAT => '%s'", fileFormat)); - } - if (StringUtils.notEmpty(filePattern)) - { - options.add(String.format("PATTERN => '%s'", filePattern)); - } - builder.append(String.join(COMMA + WHITE_SPACE, options)); - builder.append(CLOSING_PARENTHESIS); - } } @Override @@ -84,14 +61,4 @@ void validate() throws SqlDomException throw new SqlDomException("location is mandatory"); } } - - public void setFileFormat(String fileFormat) - { - this.fileFormat = fileFormat; - } - - public void setFilePattern(String filePattern) - { - this.filePattern = filePattern; - } }
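Since FILE_FORMAT and PATTERN now move onto the COPY statement itself, the staged-files table expression reduces to the bare stage location. A small hedged sketch of the resulting behaviour (stage name illustrative):

    StagedFilesTable stage = new StagedFilesTable("@my_stage");
    StringBuilder builder = new StringBuilder();
    stage.genSqlWithoutAlias(builder);   // appends just "@my_stage"; SqlDomException if location is empty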
diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/main/java/org/finos/legend/engine/persistence/components/relational/snowflake/sqldom/schemaops/statements/CopyStatement.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/main/java/org/finos/legend/engine/persistence/components/relational/snowflake/sqldom/schemaops/statements/CopyStatement.java index b8b5c6166e1..6560bbc0edb 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/main/java/org/finos/legend/engine/persistence/components/relational/snowflake/sqldom/schemaops/statements/CopyStatement.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/main/java/org/finos/legend/engine/persistence/components/relational/snowflake/sqldom/schemaops/statements/CopyStatement.java @@ -14,6 +14,7 @@ package org.finos.legend.engine.persistence.components.relational.snowflake.sqldom.schemaops.statements; +import org.finos.legend.engine.persistence.components.common.FileFormatType; import org.finos.legend.engine.persistence.components.relational.sqldom.SqlDomException; import org.finos.legend.engine.persistence.components.relational.sqldom.common.Clause; import org.finos.legend.engine.persistence.components.relational.sqldom.schemaops.expresssions.table.Table; @@ -21,6 +22,7 @@ import org.finos.legend.engine.persistence.components.relational.sqldom.schemaops.statements.SelectStatement; import org.finos.legend.engine.persistence.components.relational.sqldom.schemaops.values.Field; import org.finos.legend.engine.persistence.components.relational.sqldom.utils.SqlGenUtils; +import org.finos.legend.engine.persistence.components.relational.sqldom.utils.StringUtils; import java.util.ArrayList; import java.util.HashMap; @@ -39,12 +41,16 @@ public class CopyStatement implements DMLStatement private Table table; private final List<Field> columns; private SelectStatement selectStatement; - private final Map<String, Object> loadOptions; - - public CopyStatement(Map<String, Object> loadOptions) + private List<String> filePatterns; + private List<String> filePaths; + private String userDefinedFileFormatName; + private FileFormatType fileFormatType; + private Map<String, Object> fileFormatOptions; + private Map<String, Object> copyOptions; + + public CopyStatement() { this.columns = new ArrayList<>(); - this.loadOptions = loadOptions; } public CopyStatement(Table table, List<Field> columns, SelectStatement selectStatement) @@ -52,25 +58,20 @@ public CopyStatement(Table table, List<Field> columns, SelectStatement selectSta { this.table = table; this.columns = columns; this.selectStatement = selectStatement; - this.loadOptions = new HashMap<>(); - } - - public CopyStatement(Table table, List<Field> columns, SelectStatement selectStatement, Map<String, Object> loadOptions) - { - this.table = table; - this.columns = columns; - this.selectStatement = selectStatement; - this.loadOptions = loadOptions; } /* Copy GENERIC PLAN for Snowflake: - COPY INTO [<namespace>.]<table_name> (COLUMN_LIST) - FROM - ( SELECT [<alias>.]$<column_number>[:<field_name>] [ , [<alias>.]$<column_number>[:<field_name>] , ... ] - FROM { <internal_stage> | <external_stage> } - [ ( FILE_FORMAT => '<namespace>.<file_format_name>', PATTERN => '<regex_pattern>' ) ] [ <alias> ] ) - [ <load_options> ] + -------------------------------- + COPY INTO [<namespace>.]<table_name> (COLUMN_LIST) + FROM + ( SELECT [<alias>.]$<column_number>[.<element>] [ , [<alias>.]$<column_number>[.<element>] ... ] + FROM { internalStage | externalStage } ) + [ FILES = ( '<file_name>' [ , '<file_name>' ] [ , ... ] ) ] + [ PATTERN = '<regex_pattern>' ] + [ FILE_FORMAT = ( { FORMAT_NAME = '[<namespace>.]<file_format_name>' | + TYPE = { CSV | JSON | AVRO | ORC | PARQUET | XML } [ formatTypeOptions ] } ) ] + [ copyOptions ] */ @Override @@ -103,27 +104,66 @@ public void genSql(StringBuilder builder) throws SqlDomException builder.append(OPEN_PARENTHESIS); selectStatement.genSql(builder); builder.append(CLOSING_PARENTHESIS); - builder.append(WHITE_SPACE); - int ctr = 0; - loadOptions.putIfAbsent("ON_ERROR", "ABORT_STATEMENT"); // Add default option into the map - for (String option : loadOptions.keySet().stream().sorted().collect(Collectors.toList())) + // File Paths + if (filePaths != null && !filePaths.isEmpty()) { - ctr++; - builder.append(option); - builder.append(ASSIGNMENT_OPERATOR); - if (loadOptions.get(option) instanceof String) - { - builder.append(SqlGenUtils.singleQuote(loadOptions.get(option).toString().toUpperCase())); - } - else - { - builder.append(loadOptions.get(option).toString().toUpperCase()); - } + String filePathsStr = filePaths.stream().map(path -> SqlGenUtils.singleQuote(path)).collect(Collectors.joining(", ")); + builder.append(String.format(" FILES = (%s)", filePathsStr)); + } + // File Patterns + else if (filePatterns != null && !filePatterns.isEmpty()) + { + String filePatternStr = filePatterns.stream().map(s -> '(' + s + ')').collect(Collectors.joining("|")); + builder.append(String.format(" PATTERN = '%s'", filePatternStr)); + } + + // FILE_FORMAT + if (StringUtils.notEmpty(userDefinedFileFormatName)) + { + builder.append(String.format(" FILE_FORMAT = (FORMAT_NAME = '%s')", userDefinedFileFormatName)); + } + else if (fileFormatType != null) + { + builder.append(" FILE_FORMAT = "); + builder.append(OPEN_PARENTHESIS); + fileFormatOptions = new HashMap<>(fileFormatOptions); + fileFormatOptions.put("TYPE", fileFormatType.name()); + addOptions(fileFormatOptions, builder); + builder.append(CLOSING_PARENTHESIS); + } + + // Add copy Options + if (copyOptions != null && !copyOptions.isEmpty()) + { + builder.append(WHITE_SPACE); + addOptions(copyOptions, builder); + } + } + - if (ctr < loadOptions.size()) + private void addOptions(Map<String, Object> options, StringBuilder builder) + { + if (options != null && options.size() > 0) + { + int ctr = 0; + for (String option : options.keySet().stream().sorted().collect(Collectors.toList())) { - builder.append(COMMA + WHITE_SPACE); + ctr++; + builder.append(option); + builder.append(WHITE_SPACE + ASSIGNMENT_OPERATOR + WHITE_SPACE); + if (options.get(option) instanceof String) + { + builder.append(SqlGenUtils.singleQuote(options.get(option))); + } + else + { + builder.append(options.get(option)); + } + if (ctr < options.size()) + { + builder.append(COMMA + WHITE_SPACE); + } } } } @@ -157,4 +197,34 @@ void validate() throws SqlDomException throw new SqlDomException("table is mandatory for Copy Table Command"); } } + + public void setFilePatterns(List<String> filePatterns) + { + this.filePatterns = filePatterns; + } + + public void setFilePaths(List<String> filePaths) + { + this.filePaths = filePaths; + } + + public void setUserDefinedFileFormatName(String userDefinedFileFormatName) + { + this.userDefinedFileFormatName = userDefinedFileFormatName; + } + + public void setFileFormatType(FileFormatType fileFormatType) + { + this.fileFormatType = fileFormatType; + } + + public void setFileFormatOptions(Map<String, Object> fileFormatOptions) + { + this.fileFormatOptions = fileFormatOptions; + } + + public void setCopyOptions(Map<String, Object> copyOptions) + { + this.copyOptions = copyOptions; + } }
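To make the reworked surface concrete, a hedged usage sketch (imports elided; table, columns and selectStatement assembled as in the tests below; values illustrative):

    CopyStatement copy = new CopyStatement(table, columns, selectStatement);
    copy.setFilePaths(Arrays.asList("/path/xyz/file1.csv"));                      // FILES = ('/path/xyz/file1.csv')
    copy.setFileFormatType(FileFormatType.CSV);                                   // FILE_FORMAT = (TYPE = 'CSV')
    copy.setFileFormatOptions(Collections.singletonMap("FIELD_DELIMITER", ","));  // merged into FILE_FORMAT = (...)
    copy.setCopyOptions(Collections.singletonMap("ON_ERROR", "ABORT_STATEMENT"));
    StringBuilder builder = new StringBuilder();
    copy.genSql(builder);  // ... FILES = (...) FILE_FORMAT = (FIELD_DELIMITER = ',', TYPE = 'CSV') ON_ERROR = 'ABORT_STATEMENT'

diff --git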
a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/BulkLoadTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/BulkLoadTest.java index 982b720011e..54d26d645c9 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/BulkLoadTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/BulkLoadTest.java @@ -15,7 +15,7 @@ package org.finos.legend.engine.persistence.components.ingestmode; import org.finos.legend.engine.persistence.components.common.Datasets; -import org.finos.legend.engine.persistence.components.common.LoadOptions; +import org.finos.legend.engine.persistence.components.common.FileFormatType; import org.finos.legend.engine.persistence.components.common.StatisticName; import org.finos.legend.engine.persistence.components.ingestmode.audit.DateTimeAuditing; import org.finos.legend.engine.persistence.components.ingestmode.audit.NoAuditing; @@ -33,6 +33,8 @@ import org.finos.legend.engine.persistence.components.relational.api.GeneratorResult; import org.finos.legend.engine.persistence.components.relational.api.RelationalGenerator; import org.finos.legend.engine.persistence.components.relational.snowflake.SnowflakeSink; +import org.finos.legend.engine.persistence.components.relational.snowflake.logicalplan.datasets.StandardFileFormat; +import org.finos.legend.engine.persistence.components.relational.snowflake.logicalplan.datasets.UserDefinedFileFormat; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -87,8 +89,11 @@ public void testBulkLoadWithDigestNotGeneratedColumnNumbersDerived() .stagedFilesDatasetProperties( SnowflakeStagedFilesDatasetProperties.builder() .location("my_location") - .fileFormat("my_file_format") - .addAllFiles(filesList).build()) + .fileFormat(StandardFileFormat.builder() + .formatType(FileFormatType.CSV) + .putFormatOptions("FIELD_DELIMITER", ",") + .build()) + .addAllFilePatterns(filesList).build()) .schema(SchemaDefinition.builder().addAllFields(Arrays.asList(col1, col2)).build()) .build(); @@ -102,7 +107,7 @@ public void testBulkLoadWithDigestNotGeneratedColumnNumbersDerived() .relationalSink(SnowflakeSink.get()) .collectStatistics(true) .executionTimestampClock(fixedClock_2000_01_01) - .bulkLoadTaskIdValue("task123") + .bulkLoadEventIdValue("task123") .batchIdPattern("{NEXT_BATCH_ID}") .build(); @@ -118,11 +123,13 @@ public void testBulkLoadWithDigestNotGeneratedColumnNumbersDerived() "(\"col_int\", \"col_integer\", \"batch_id\", \"append_time\") " + "FROM " + "(SELECT legend_persistence_stage.$1 as \"col_int\",legend_persistence_stage.$2 as \"col_integer\",{NEXT_BATCH_ID},'2000-01-01 00:00:00.000000' " + - "FROM my_location (FILE_FORMAT => 'my_file_format', PATTERN => '(/path/xyz/file1.csv)|(/path/xyz/file2.csv)') as legend_persistence_stage)" + - " ON_ERROR='ABORT_STATEMENT'"; + "FROM my_location as legend_persistence_stage) " + + "PATTERN = '(/path/xyz/file1.csv)|(/path/xyz/file2.csv)' " + + "FILE_FORMAT = 
(FIELD_DELIMITER = ',', TYPE = 'CSV')" + + " ON_ERROR = 'ABORT_STATEMENT'"; String expectedMetadataIngestSql = "INSERT INTO bulk_load_batch_metadata (\"batch_id\", \"table_name\", \"batch_start_ts_utc\", \"batch_end_ts_utc\", \"batch_status\", \"batch_source_info\") " + - "(SELECT {NEXT_BATCH_ID},'my_name','2000-01-01 00:00:00.000000',SYSDATE(),'{BULK_LOAD_BATCH_STATUS_PLACEHOLDER}',PARSE_JSON('{\"files\":[\"/path/xyz/file1.csv\",\"/path/xyz/file2.csv\"],\"task_id\":\"task123\"}'))"; + "(SELECT {NEXT_BATCH_ID},'my_name','2000-01-01 00:00:00.000000',SYSDATE(),'{BULK_LOAD_BATCH_STATUS_PLACEHOLDER}',PARSE_JSON('{\"event_id\":\"task123\",\"file_patterns\":[\"/path/xyz/file1.csv\",\"/path/xyz/file2.csv\"]}'))"; Assertions.assertEquals(expectedCreateTableSql, preActionsSql.get(0)); Assertions.assertEquals(expectedIngestSql, ingestSql.get(0)); @@ -147,8 +154,8 @@ public void testBulkLoadWithDigestNotGeneratedColumnNumbersProvided() .stagedFilesDatasetProperties( SnowflakeStagedFilesDatasetProperties.builder() .location("my_location") - .fileFormat("my_file_format") - .addAllFiles(filesList).build()) + .fileFormat(StandardFileFormat.builder().formatType(FileFormatType.CSV).build()) + .addAllFilePaths(filesList).build()) .schema(SchemaDefinition.builder().addAllFields(Arrays.asList(col3, col4)).build()) .alias("t") .build(); @@ -162,7 +169,7 @@ public void testBulkLoadWithDigestNotGeneratedColumnNumbersProvided() .ingestMode(bulkLoad) .relationalSink(SnowflakeSink.get()) .collectStatistics(true) - .bulkLoadTaskIdValue("task123") + .bulkLoadEventIdValue("task123") .build(); GeneratorResult operations = generator.generateOperations(Datasets.of(mainDataset, stagedFilesDataset)); @@ -176,8 +183,10 @@ public void testBulkLoadWithDigestNotGeneratedColumnNumbersProvided() "(\"col_bigint\", \"col_variant\", \"batch_id\") " + "FROM " + "(SELECT t.$4 as \"col_bigint\",TO_VARIANT(PARSE_JSON(t.$5)) as \"col_variant\",(SELECT COALESCE(MAX(bulk_load_batch_metadata.\"batch_id\"),0)+1 FROM bulk_load_batch_metadata as bulk_load_batch_metadata WHERE UPPER(bulk_load_batch_metadata.\"table_name\") = 'MY_NAME') " + - "FROM my_location (FILE_FORMAT => 'my_file_format', PATTERN => '(/path/xyz/file1.csv)|(/path/xyz/file2.csv)') as t) " + - "ON_ERROR='ABORT_STATEMENT'"; + "FROM my_location as t) " + + "FILES = ('/path/xyz/file1.csv', '/path/xyz/file2.csv') " + + "FILE_FORMAT = (TYPE = 'CSV') " + + "ON_ERROR = 'ABORT_STATEMENT'"; Assertions.assertEquals(expectedCreateTableSql, preActionsSql.get(0)); Assertions.assertEquals(expectedIngestSql, ingestSql.get(0)); @@ -200,8 +209,8 @@ public void testBulkLoadWithUpperCaseConversionAndNoTaskId() .stagedFilesDatasetProperties( SnowflakeStagedFilesDatasetProperties.builder() .location("my_location") - .fileFormat("my_file_format") - .addAllFiles(filesList).build()) + .fileFormat(UserDefinedFileFormat.of("my_file_format")) + .addAllFilePaths(filesList).build()) .schema(SchemaDefinition.builder().addAllFields(Arrays.asList(col1, col2)).build()) .build(); @@ -233,13 +242,14 @@ public void testBulkLoadWithUpperCaseConversionAndNoTaskId() "(SELECT legend_persistence_stage.$1 as \"COL_INT\",legend_persistence_stage.$2 as \"COL_INTEGER\"," + "LAKEHOUSE_MD5(OBJECT_CONSTRUCT('COL_INT',legend_persistence_stage.$1,'COL_INTEGER',legend_persistence_stage.$2))," + "(SELECT COALESCE(MAX(BULK_LOAD_BATCH_METADATA.\"BATCH_ID\"),0)+1 FROM BULK_LOAD_BATCH_METADATA as BULK_LOAD_BATCH_METADATA WHERE UPPER(BULK_LOAD_BATCH_METADATA.\"TABLE_NAME\") = 'MY_NAME'),'2000-01-01 00:00:00.000000' " + - "FROM 
my_location (FILE_FORMAT => 'my_file_format', " + - "PATTERN => '(/path/xyz/file1.csv)|(/path/xyz/file2.csv)') as legend_persistence_stage) " + - "ON_ERROR='ABORT_STATEMENT'"; + "FROM my_location as legend_persistence_stage) " + + "FILES = ('/path/xyz/file1.csv', '/path/xyz/file2.csv') " + + "FILE_FORMAT = (FORMAT_NAME = 'my_file_format') " + + "ON_ERROR = 'ABORT_STATEMENT'"; String expectedMetadataIngestSql = "INSERT INTO BULK_LOAD_BATCH_METADATA (\"BATCH_ID\", \"TABLE_NAME\", \"BATCH_START_TS_UTC\", \"BATCH_END_TS_UTC\", \"BATCH_STATUS\", \"BATCH_SOURCE_INFO\") " + "(SELECT (SELECT COALESCE(MAX(BULK_LOAD_BATCH_METADATA.\"BATCH_ID\"),0)+1 FROM BULK_LOAD_BATCH_METADATA as BULK_LOAD_BATCH_METADATA WHERE UPPER(BULK_LOAD_BATCH_METADATA.\"TABLE_NAME\") = 'MY_NAME')," + - "'MY_NAME','2000-01-01 00:00:00.000000',SYSDATE(),'{BULK_LOAD_BATCH_STATUS_PLACEHOLDER}',PARSE_JSON('{\"files\":[\"/path/xyz/file1.csv\",\"/path/xyz/file2.csv\"]}'))"; + "'MY_NAME','2000-01-01 00:00:00.000000',SYSDATE(),'{BULK_LOAD_BATCH_STATUS_PLACEHOLDER}',PARSE_JSON('{\"file_paths\":[\"/path/xyz/file1.csv\",\"/path/xyz/file2.csv\"]}'))"; Assertions.assertEquals(expectedCreateTableSql, preActionsSql.get(0)); Assertions.assertEquals(expectedIngestSql, ingestSql.get(0)); @@ -313,7 +323,7 @@ public void testBulkLoadStagedFilesDatasetNotProvided() .relationalSink(SnowflakeSink.get()) .collectStatistics(true) .executionTimestampClock(fixedClock_2000_01_01) - .bulkLoadTaskIdValue("batch123") + .bulkLoadEventIdValue("batch123") .build(); GeneratorResult operations = generator.generateOperations(Datasets.of(mainDataset, stagingDataset)); @@ -338,8 +348,8 @@ public void testBulkLoadWithDigest() .stagedFilesDatasetProperties( SnowflakeStagedFilesDatasetProperties.builder() .location("my_location") - .fileFormat("my_file_format") - .addAllFiles(filesList).build()) + .fileFormat(UserDefinedFileFormat.of("my_file_format")) + .addAllFilePaths(filesList).build()) .schema(SchemaDefinition.builder().addAllFields(Arrays.asList(col1, col2)).build()) .build(); @@ -353,7 +363,7 @@ public void testBulkLoadWithDigest() .relationalSink(SnowflakeSink.get()) .collectStatistics(true) .executionTimestampClock(fixedClock_2000_01_01) - .bulkLoadTaskIdValue("task123") + .bulkLoadEventIdValue("task123") .build(); GeneratorResult operations = generator.generateOperations(Datasets.of(mainDataset, stagedFilesDataset)); @@ -370,9 +380,10 @@ public void testBulkLoadWithDigest() "(SELECT legend_persistence_stage.$1 as \"col_int\",legend_persistence_stage.$2 as \"col_integer\"," + "LAKEHOUSE_UDF(OBJECT_CONSTRUCT('col_int',legend_persistence_stage.$1,'col_integer',legend_persistence_stage.$2))," + "(SELECT COALESCE(MAX(bulk_load_batch_metadata.\"batch_id\"),0)+1 FROM bulk_load_batch_metadata as bulk_load_batch_metadata WHERE UPPER(bulk_load_batch_metadata.\"table_name\") = 'MY_NAME'),'2000-01-01 00:00:00.000000' " + - "FROM my_location (FILE_FORMAT => 'my_file_format', " + - "PATTERN => '(/path/xyz/file1.csv)|(/path/xyz/file2.csv)') as legend_persistence_stage) " + - "ON_ERROR='ABORT_STATEMENT'"; + "FROM my_location as legend_persistence_stage) " + + "FILES = ('/path/xyz/file1.csv', '/path/xyz/file2.csv') " + + "FILE_FORMAT = (FORMAT_NAME = 'my_file_format') " + + "ON_ERROR = 'ABORT_STATEMENT'"; Assertions.assertEquals(expectedCreateTableSql, preActionsSql.get(0)); Assertions.assertEquals(expectedIngestSql, ingestSql.get(0)); @@ -396,9 +407,9 @@ public void testBulkLoadWithDigestAndForceOption() .stagedFilesDatasetProperties( 
SnowflakeStagedFilesDatasetProperties.builder() .location("my_location") - .fileFormat("my_file_format") - .loadOptions(LoadOptions.builder().force(true).build()) - .addAllFiles(filesList).build()) + .fileFormat(UserDefinedFileFormat.of("my_file_format")) + .putCopyOptions("FORCE", true) + .addAllFilePatterns(filesList).build()) .schema(SchemaDefinition.builder().addAllFields(Arrays.asList(col1, col2)).build()) .build(); @@ -412,7 +423,7 @@ public void testBulkLoadWithDigestAndForceOption() .relationalSink(SnowflakeSink.get()) .collectStatistics(true) .executionTimestampClock(fixedClock_2000_01_01) - .bulkLoadTaskIdValue("task123") + .bulkLoadEventIdValue("task123") .build(); GeneratorResult operations = generator.generateOperations(Datasets.of(mainDataset, stagedFilesDataset)); @@ -429,9 +440,10 @@ public void testBulkLoadWithDigestAndForceOption() "(SELECT legend_persistence_stage.$1 as \"col_int\",legend_persistence_stage.$2 as \"col_integer\"," + "LAKEHOUSE_UDF(OBJECT_CONSTRUCT('col_int',legend_persistence_stage.$1,'col_integer',legend_persistence_stage.$2))," + "(SELECT COALESCE(MAX(bulk_load_batch_metadata.\"batch_id\"),0)+1 FROM bulk_load_batch_metadata as bulk_load_batch_metadata WHERE UPPER(bulk_load_batch_metadata.\"table_name\") = 'MY_NAME'),'2000-01-01 00:00:00.000000' " + - "FROM my_location (FILE_FORMAT => 'my_file_format', " + - "PATTERN => '(/path/xyz/file1.csv)|(/path/xyz/file2.csv)') as legend_persistence_stage) " + - "FORCE=TRUE, ON_ERROR='ABORT_STATEMENT'"; + "FROM my_location as legend_persistence_stage) " + + "PATTERN = '(/path/xyz/file1.csv)|(/path/xyz/file2.csv)' " + + "FILE_FORMAT = (FORMAT_NAME = 'my_file_format') " + + "FORCE = true, ON_ERROR = 'ABORT_STATEMENT'"; Assertions.assertEquals(expectedCreateTableSql, preActionsSql.get(0)); Assertions.assertEquals(expectedIngestSql, ingestSql.get(0)); @@ -455,9 +467,12 @@ public void testBulkLoadWithDigestAndForceOptionAndOnErrorOption() .stagedFilesDatasetProperties( SnowflakeStagedFilesDatasetProperties.builder() .location("my_location") - .fileFormat("my_file_format") - .loadOptions(LoadOptions.builder().force(false).onError("SKIP_FILE_10%").build()) - .addAllFiles(filesList).build()) + .fileFormat(StandardFileFormat.builder() + .formatType(FileFormatType.CSV) + .putFormatOptions("FIELD_DELIMITER", ",") + .build()) + .putCopyOptions("ON_ERROR", "SKIP_FILE") + .addAllFilePatterns(filesList).build()) .schema(SchemaDefinition.builder().addAllFields(Arrays.asList(col1, col2)).build()) .build(); @@ -471,7 +486,7 @@ public void testBulkLoadWithDigestAndForceOptionAndOnErrorOption() .relationalSink(SnowflakeSink.get()) .collectStatistics(true) .executionTimestampClock(fixedClock_2000_01_01) - .bulkLoadTaskIdValue("task123") + .bulkLoadEventIdValue("task123") .build(); GeneratorResult operations = generator.generateOperations(Datasets.of(mainDataset, stagedFilesDataset)); @@ -488,9 +503,10 @@ public void testBulkLoadWithDigestAndForceOptionAndOnErrorOption() "(SELECT legend_persistence_stage.$1 as \"col_int\",legend_persistence_stage.$2 as \"col_integer\"," + "LAKEHOUSE_UDF(OBJECT_CONSTRUCT('col_int',legend_persistence_stage.$1,'col_integer',legend_persistence_stage.$2))," + "(SELECT COALESCE(MAX(bulk_load_batch_metadata.\"batch_id\"),0)+1 FROM bulk_load_batch_metadata as bulk_load_batch_metadata WHERE UPPER(bulk_load_batch_metadata.\"table_name\") = 'MY_NAME'),'2000-01-01 00:00:00.000000' " + - "FROM my_location (FILE_FORMAT => 'my_file_format', " + - "PATTERN => '(/path/xyz/file1.csv)|(/path/xyz/file2.csv)') as 
legend_persistence_stage) " + - "FORCE=FALSE, ON_ERROR='SKIP_FILE_10%'"; + "FROM my_location as legend_persistence_stage) " + + "PATTERN = '(/path/xyz/file1.csv)|(/path/xyz/file2.csv)' " + + "FILE_FORMAT = (FIELD_DELIMITER = ',', TYPE = 'CSV') " + + "ON_ERROR = 'SKIP_FILE'"; Assertions.assertEquals(expectedCreateTableSql, preActionsSql.get(0)); Assertions.assertEquals(expectedIngestSql, ingestSql.get(0)); diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/test/java/org/finos/legend/engine/persistence/components/relational/snowflake/sqldom/schemaops/CopyStatementTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/test/java/org/finos/legend/engine/persistence/components/relational/snowflake/sqldom/schemaops/CopyStatementTest.java index 8ed6020752b..78877a8d46e 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/test/java/org/finos/legend/engine/persistence/components/relational/snowflake/sqldom/schemaops/CopyStatementTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/test/java/org/finos/legend/engine/persistence/components/relational/snowflake/sqldom/schemaops/CopyStatementTest.java @@ -14,6 +14,7 @@ package org.finos.legend.engine.persistence.components.relational.snowflake.sqldom.schemaops; +import org.finos.legend.engine.persistence.components.common.FileFormatType; import org.finos.legend.engine.persistence.components.relational.snowflake.sqldom.schemaops.expressions.table.StagedFilesTable; import org.finos.legend.engine.persistence.components.relational.snowflake.sqldom.schemaops.statements.CopyStatement; import org.finos.legend.engine.persistence.components.relational.snowflake.sqldom.schemaops.values.StagedFilesField; @@ -37,7 +38,7 @@ public class CopyStatementTest public static String QUOTE_IDENTIFIER = "\"%s\""; @Test - void testSelectStageStatement() throws SqlDomException + void testCopyStatementWithFilesAndStandardFileFormat() throws SqlDomException { StagedFilesTable stagedFiles = new StagedFilesTable("t","@my_stage"); List selectItems = Arrays.asList( @@ -57,20 +58,29 @@ void testSelectStageStatement() throws SqlDomException ); CopyStatement copyStatement = new CopyStatement(table, columns, selectStatement); + copyStatement.setFilePaths(Arrays.asList("path1", "path2")); + copyStatement.setFileFormatType(FileFormatType.CSV); + Map fileFormatOptions = new HashMap<>(); + fileFormatOptions.put("COMPRESSION", "AUTO"); + copyStatement.setFileFormatOptions(fileFormatOptions); + Map copyOptions = new HashMap<>(); + copyOptions.put("ON_ERROR", "ABORT_STATEMENT"); + copyStatement.setCopyOptions(copyOptions); + String sql1 = genSqlIgnoringErrors(copyStatement); assertEquals("COPY INTO \"mydb\".\"mytable1\" " + "(\"field1\", \"field2\", \"field3\", \"field4\") " + "FROM " + "(SELECT t.$1 as \"field1\",t.$2 as \"field2\",t.$3 as \"field3\",t.$4 as \"field4\" FROM @my_stage as t) " + - "ON_ERROR='ABORT_STATEMENT'", sql1); + "FILES = ('path1', 'path2') " + + "FILE_FORMAT = (COMPRESSION = 'AUTO', TYPE = 'CSV') " + + "ON_ERROR = 'ABORT_STATEMENT'", sql1); } @Test - void testSelectStageStatementWithPatternAndFileFormatAndForceOption() throws SqlDomException + void testCopyStatementWithPatternAndFileFormatAndForceOption() 
throws SqlDomException { StagedFilesTable stagedFiles = new StagedFilesTable("t","@my_stage"); - stagedFiles.setFileFormat("my_file_format"); - stagedFiles.setFilePattern("my_pattern"); List selectItems = Arrays.asList( new StagedFilesField(QUOTE_IDENTIFIER, 1, "t", "field1", "field1"), @@ -89,17 +99,24 @@ void testSelectStageStatementWithPatternAndFileFormatAndForceOption() throws Sql new Field("field4", QUOTE_IDENTIFIER) ); - Map loadOptions = new HashMap<>(); - loadOptions.put("FORCE", true); + Map copyOptions = new HashMap<>(); + copyOptions.put("FORCE", true); + copyOptions.put("ON_ERROR", "ABORT_STATEMENT"); + CopyStatement copyStatement = new CopyStatement(table, columns, selectStatement); + copyStatement.setFilePatterns(Arrays.asList("my_pattern1", "my_pattern2")); + copyStatement.setUserDefinedFileFormatName("my_file_format"); + copyStatement.setCopyOptions(copyOptions); - CopyStatement copyStatement = new CopyStatement(table, columns, selectStatement, loadOptions); String sql1 = genSqlIgnoringErrors(copyStatement); - assertEquals("COPY INTO \"mydb\".\"mytable1\" " + + String expectedStr = "COPY INTO \"mydb\".\"mytable1\" " + "(\"field1\", \"field2\", \"field3\", \"field4\") " + "FROM " + "(SELECT t.$1:field1 as \"field1\",t.$1:field2 as \"field2\",t.$1:field3 as \"field3\",t.$1:field4 as \"field4\" " + - "FROM @my_stage (FILE_FORMAT => 'my_file_format', PATTERN => 'my_pattern') as t) " + - "FORCE=TRUE, ON_ERROR='ABORT_STATEMENT'", sql1); + "FROM @my_stage as t) " + + "PATTERN = '(my_pattern1)|(my_pattern2)' " + + "FILE_FORMAT = (FORMAT_NAME = 'my_file_format') " + + "FORCE = true, ON_ERROR = 'ABORT_STATEMENT'"; + assertEquals(expectedStr, sql1); } public static String genSqlIgnoringErrors(SqlGen item)