Support for filepaths and filepatterns in Snowflake Load
prasar-ashutosh committed Nov 27, 2023
1 parent 06cf988 commit c2ac933
Showing 25 changed files with 434 additions and 300 deletions.
@@ -0,0 +1,23 @@
// Copyright 2023 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.common;

public enum FileFormatType
{
CSV,
JSON,
AVRO,
PARQUET;
}
@@ -14,14 +14,26 @@

package org.finos.legend.engine.persistence.components.logicalplan.datasets;

import org.finos.legend.engine.persistence.components.common.LoadOptions;
import org.immutables.value.Value;

import java.util.List;
import java.util.Optional;

public interface StagedFilesDatasetProperties
{
List<String> files();
List<String> filePaths();

Optional<LoadOptions> loadOptions();
List<String> filePatterns();

@Value.Check
default void validate()
{
if (filePatterns().size() > 0 && filePaths().size() > 0)
{
throw new IllegalArgumentException("Cannot build StagedFilesDatasetProperties, Only one out of filePatterns and filePaths should be provided");
}
if (filePatterns().size() == 0 && filePaths().size() == 0)
{
throw new IllegalArgumentException("Cannot build StagedFilesDatasetProperties, Either one of filePatterns and filePaths must be provided");
}
}
}
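A minimal sketch of the new mutual-exclusion check, reusing the BigQuery builder that appears later in this commit; addAllFilePatterns is assumed to be the Immutables-generated counterpart of addAllFilePaths, and the GCS paths are illustrative.

import java.util.Arrays;

import org.finos.legend.engine.persistence.components.common.FileFormatType;
import org.finos.legend.engine.persistence.components.relational.bigquery.logicalplan.datasets.BigQueryStagedFilesDatasetProperties;

public class StagedFilesPropertiesValidationSketch
{
    public static void main(String[] args)
    {
        // Valid: exactly one of filePaths / filePatterns is populated
        BigQueryStagedFilesDatasetProperties byPath = BigQueryStagedFilesDatasetProperties.builder()
                .fileFormat(FileFormatType.CSV)
                .addAllFilePaths(Arrays.asList("gs://my_bucket/staging/file1.csv"))
                .build();
        System.out.println(byPath.filePaths());

        // Invalid: populating both lists trips the @Value.Check and throws IllegalArgumentException
        try
        {
            BigQueryStagedFilesDatasetProperties.builder()
                    .fileFormat(FileFormatType.CSV)
                    .addAllFilePaths(Arrays.asList("gs://my_bucket/staging/file1.csv"))
                    .addAllFilePatterns(Arrays.asList("staging/part-*.csv"))
                    .build();
        }
        catch (IllegalArgumentException e)
        {
            System.out.println(e.getMessage());
        }
    }
}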
@@ -14,12 +14,11 @@

package org.finos.legend.engine.persistence.components.logicalplan.operations;

import org.finos.legend.engine.persistence.components.common.LoadOptions;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.StagedFilesDatasetProperties;
import org.finos.legend.engine.persistence.components.logicalplan.values.Value;

import java.util.List;
import java.util.Optional;

import static org.immutables.value.Value.Immutable;
import static org.immutables.value.Value.Parameter;
@@ -45,5 +44,5 @@ public interface CopyAbstract extends Operation
List<Value> fields();

@Parameter(order = 3)
Optional<LoadOptions> loadOptions();
StagedFilesDatasetProperties stagedFilesDatasetProperties();
}
@@ -62,7 +62,7 @@ class BulkLoadPlanner extends Planner
private Dataset tempDataset;
private StagedFilesDataset stagedFilesDataset;
private BulkLoadMetadataDataset bulkLoadMetadataDataset;
private Optional<String> bulkLoadTaskIdValue;
private Optional<String> bulkLoadEventIdValue;

BulkLoadPlanner(Datasets datasets, BulkLoad ingestMode, PlannerOptions plannerOptions, Set<Capability> capabilities)
{
@@ -75,7 +75,7 @@ class BulkLoadPlanner extends Planner
throw new IllegalArgumentException("Only StagedFilesDataset are allowed under Bulk Load");
}

bulkLoadTaskIdValue = plannerOptions.bulkLoadTaskIdValue();
bulkLoadEventIdValue = plannerOptions.bulkLoadEventIdValue();
stagedFilesDataset = (StagedFilesDataset) datasets.stagingDataset();
bulkLoadMetadataDataset = bulkLoadMetadataDataset().orElseThrow(IllegalStateException::new);

@@ -139,7 +139,7 @@ private LogicalPlan buildLogicalPlanForTransformWhileCopy(Resources resources)
}

Dataset selectStage = StagedFilesSelection.builder().source(stagedFilesDataset).addAllFields(fieldsToSelect).build();
return LogicalPlan.of(Collections.singletonList(Copy.of(mainDataset(), selectStage, fieldsToInsert, stagedFilesDataset.stagedFilesDatasetProperties().loadOptions())));
return LogicalPlan.of(Collections.singletonList(Copy.of(mainDataset(), selectStage, fieldsToInsert, stagedFilesDataset.stagedFilesDatasetProperties())));
}

private LogicalPlan buildLogicalPlanForCopyAndTransform(Resources resources)
@@ -150,7 +150,7 @@ private LogicalPlan buildLogicalPlanForCopyAndTransform(Resources resources)
// Operation 1: Copy into a temp table
List<Value> fieldsToSelectFromStage = LogicalPlanUtils.extractStagedFilesFieldValues(stagingDataset());
Dataset selectStage = StagedFilesSelection.builder().source(stagedFilesDataset).addAllFields(fieldsToSelectFromStage).build();
operations.add(Copy.of(tempDataset, selectStage, fieldsToSelectFromStage, stagedFilesDataset.stagedFilesDatasetProperties().loadOptions()));
operations.add(Copy.of(tempDataset, selectStage, fieldsToSelectFromStage, stagedFilesDataset.stagedFilesDatasetProperties()));


// Operation 2: Transfer from temp table into target table, adding extra columns at the same time
@@ -263,9 +263,18 @@ private Selection getRowsBasedOnAppendTimestamp(Dataset dataset, String field, S
private String jsonifyBatchSourceInfo(StagedFilesDatasetProperties stagedFilesDatasetProperties)
{
Map<String, Object> batchSourceMap = new HashMap();
List<String> files = stagedFilesDatasetProperties.files();
batchSourceMap.put("files", files);
bulkLoadTaskIdValue.ifPresent(taskId -> batchSourceMap.put("task_id", taskId));
List<String> filePaths = stagedFilesDatasetProperties.filePaths();
List<String> filePatterns = stagedFilesDatasetProperties.filePatterns();

if (filePaths != null && !filePaths.isEmpty())
{
batchSourceMap.put("file_paths", filePaths);
}
if (filePatterns != null && !filePatterns.isEmpty())
{
batchSourceMap.put("file_patterns", filePatterns);
}
bulkLoadEventIdValue.ifPresent(taskId -> batchSourceMap.put("event_id", taskId));
ObjectMapper objectMapper = new ObjectMapper();
try
{
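A minimal sketch of the batch-source metadata the reworked jsonifyBatchSourceInfo now emits, using Jackson's ObjectMapper as above; the file paths and event id are illustrative values.

import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class BatchSourceInfoSketch
{
    public static void main(String[] args) throws Exception
    {
        Map<String, Object> batchSourceMap = new HashMap<>();
        batchSourceMap.put("file_paths", Arrays.asList("/staging/file1.csv", "/staging/file2.csv"));
        batchSourceMap.put("event_id", "event_123");
        // With patterns instead of paths, the key would be "file_patterns"
        System.out.println(new ObjectMapper().writeValueAsString(batchSourceMap));
        // Example output: {"file_paths":["/staging/file1.csv","/staging/file2.csv"],"event_id":"event_123"}
    }
}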
@@ -109,7 +109,7 @@ default boolean enableConcurrentSafety()
return false;
}

Optional<String> bulkLoadTaskIdValue();
Optional<String> bulkLoadEventIdValue();
}

private final Datasets datasets;
@@ -15,10 +15,11 @@

package org.finos.legend.engine.persistence.components.relational.bigquery.logicalplan.datasets;

import org.finos.legend.engine.persistence.components.common.FileFormat;
import org.finos.legend.engine.persistence.components.common.FileFormatType;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.StagedFilesDatasetProperties;
import org.immutables.value.Value;

import java.util.Map;

@Value.Immutable
@Value.Style(
@@ -30,5 +31,7 @@
)
public interface BigQueryStagedFilesDatasetPropertiesAbstract extends StagedFilesDatasetProperties
{
FileFormat fileFormat();
FileFormatType fileFormat();

Map<String, Object> loadOptions();
}
@@ -14,8 +14,7 @@

package org.finos.legend.engine.persistence.components.relational.bigquery.sql.visitor;

import org.finos.legend.engine.persistence.components.common.FileFormat;
import org.finos.legend.engine.persistence.components.common.LoadOptions;
import org.finos.legend.engine.persistence.components.common.FileFormatType;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.StagedFilesDatasetReference;
import org.finos.legend.engine.persistence.components.physicalplan.PhysicalPlanNode;
import org.finos.legend.engine.persistence.components.relational.bigquery.logicalplan.datasets.BigQueryStagedFilesDatasetProperties;
@@ -24,6 +23,7 @@
import org.finos.legend.engine.persistence.components.transformer.VisitorContext;

import java.util.HashMap;
import java.util.List;
import java.util.Map;


@@ -38,39 +38,13 @@ public VisitorResult visit(PhysicalPlanNode prev, StagedFilesDatasetReference cu
}
BigQueryStagedFilesDatasetProperties datasetProperties = (BigQueryStagedFilesDatasetProperties) current.properties();

Map<String, Object> loadOptionsMap = new HashMap<>();
FileFormat fileFormat = datasetProperties.fileFormat();
loadOptionsMap.put("format", fileFormat.name());
datasetProperties.loadOptions().ifPresent(options -> retrieveLoadOptions(fileFormat, options, loadOptionsMap));

StagedFilesTable stagedFilesTable = new StagedFilesTable(datasetProperties.files(), loadOptionsMap);
FileFormatType fileFormatType = datasetProperties.fileFormat();
Map<String, Object> loadOptionsMap = new HashMap<>(datasetProperties.loadOptions());
loadOptionsMap.put("format", fileFormatType.name());
List<String> uris = datasetProperties.filePaths().isEmpty() ? datasetProperties.filePatterns() : datasetProperties.filePaths();
StagedFilesTable stagedFilesTable = new StagedFilesTable(uris, loadOptionsMap);
prev.push(stagedFilesTable);

return new VisitorResult(null);
}

private void retrieveLoadOptions(FileFormat fileFormat, LoadOptions loadOptions, Map<String, Object> loadOptionsMap)
{
switch (fileFormat)
{
case CSV:
loadOptions.fieldDelimiter().ifPresent(property -> loadOptionsMap.put("field_delimiter", property));
loadOptions.encoding().ifPresent(property -> loadOptionsMap.put("encoding", property));
loadOptions.nullMarker().ifPresent(property -> loadOptionsMap.put("null_marker", property));
loadOptions.quote().ifPresent(property -> loadOptionsMap.put("quote", property));
loadOptions.skipLeadingRows().ifPresent(property -> loadOptionsMap.put("skip_leading_rows", property));
loadOptions.maxBadRecords().ifPresent(property -> loadOptionsMap.put("max_bad_records", property));
loadOptions.compression().ifPresent(property -> loadOptionsMap.put("compression", property));
break;
case JSON:
loadOptions.maxBadRecords().ifPresent(property -> loadOptionsMap.put("max_bad_records", property));
loadOptions.compression().ifPresent(property -> loadOptionsMap.put("compression", property));
break;
case AVRO:
case PARQUET:
return;
default:
throw new IllegalStateException("Unrecognized file format: " + fileFormat);
}
}
}
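A minimal sketch of the visitor's new selection logic in isolation: the format name is merged into the caller-supplied load-options map, and file patterns are used as URIs only when no explicit file paths are given; the option keys and pattern values are illustrative.

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.finos.legend.engine.persistence.components.common.FileFormatType;

public class StagedFilesUriSelectionSketch
{
    public static void main(String[] args)
    {
        // Caller-supplied load options (e.g. set via putAllLoadOptions on the BigQuery builder)
        Map<String, Object> loadOptionsMap = new HashMap<>();
        loadOptionsMap.put("max_bad_records", 10L);
        loadOptionsMap.put("format", FileFormatType.CSV.name());

        List<String> filePaths = Collections.emptyList();
        List<String> filePatterns = Arrays.asList("gs://my_bucket/staging/part-*.csv");

        // Same choice as in the visitor: explicit paths win, otherwise patterns become the URIs
        List<String> uris = filePaths.isEmpty() ? filePatterns : filePaths;
        System.out.println(uris + " with options " + loadOptionsMap);
    }
}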
@@ -15,8 +15,7 @@
package org.finos.legend.engine.persistence.components.e2e;

import org.finos.legend.engine.persistence.components.common.Datasets;
import org.finos.legend.engine.persistence.components.common.FileFormat;
import org.finos.legend.engine.persistence.components.common.LoadOptions;
import org.finos.legend.engine.persistence.components.common.FileFormatType;
import org.finos.legend.engine.persistence.components.ingestmode.BulkLoad;
import org.finos.legend.engine.persistence.components.ingestmode.audit.DateTimeAuditing;
import org.finos.legend.engine.persistence.components.ingestmode.digest.NoDigestGenStrategy;
@@ -40,10 +39,7 @@
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.*;

import static org.finos.legend.engine.persistence.components.common.StatisticName.ROWS_INSERTED;
import static org.finos.legend.engine.persistence.components.common.StatisticName.ROWS_WITH_ERRORS;
@@ -89,8 +85,8 @@ public void testMilestoning() throws IOException, InterruptedException
Dataset stagedFilesDataset = StagedFilesDataset.builder()
.stagedFilesDatasetProperties(
BigQueryStagedFilesDatasetProperties.builder()
.fileFormat(FileFormat.CSV)
.addAllFiles(FILE_LIST).build())
.fileFormat(FileFormatType.CSV)
.addAllFilePaths(FILE_LIST).build())
.schema(SchemaDefinition.builder().addAllFields(Arrays.asList(col1, col2, col3, col4)).build())
.build();

@@ -116,7 +112,7 @@ public void testMilestoning() throws IOException, InterruptedException
.relationalSink(BigQuerySink.get())
.collectStatistics(true)
.executionTimestampClock(fixedClock_2000_01_01)
.bulkLoadTaskIdValue(TASK_ID_VALUE)
.bulkLoadEventIdValue(TASK_ID_VALUE)
.build();

RelationalConnection connection = BigQueryConnection.of(getBigQueryConnection());
@@ -144,12 +140,14 @@ public void testMilestoningFailure() throws IOException, InterruptedException
.auditing(DateTimeAuditing.builder().dateTimeField(APPEND_TIME).build())
.build();

Map<String, Object> loadOptions = new HashMap<>();
loadOptions.put("max_bad_records", 10L);
Dataset stagedFilesDataset = StagedFilesDataset.builder()
.stagedFilesDatasetProperties(
BigQueryStagedFilesDatasetProperties.builder()
.fileFormat(FileFormat.CSV)
.loadOptions(LoadOptions.builder().maxBadRecords(10L).build())
.addAllFiles(BAD_FILE_LIST).build())
.fileFormat(FileFormatType.CSV)
.putAllLoadOptions(loadOptions)
.addAllFilePaths(BAD_FILE_LIST).build())
.schema(SchemaDefinition.builder().addAllFields(Arrays.asList(col1, col2, col3, col4)).build())
.build();

@@ -15,7 +15,7 @@
package org.finos.legend.engine.persistence.components.e2e;

import org.finos.legend.engine.persistence.components.common.Datasets;
import org.finos.legend.engine.persistence.components.common.FileFormat;
import org.finos.legend.engine.persistence.components.common.FileFormatType;
import org.finos.legend.engine.persistence.components.ingestmode.BulkLoad;
import org.finos.legend.engine.persistence.components.ingestmode.audit.DateTimeAuditing;
import org.finos.legend.engine.persistence.components.ingestmode.digest.NoDigestGenStrategy;
@@ -81,8 +81,8 @@ public void testMilestoning() throws IOException, InterruptedException
Dataset stagedFilesDataset = StagedFilesDataset.builder()
.stagedFilesDatasetProperties(
BigQueryStagedFilesDatasetProperties.builder()
.fileFormat(FileFormat.CSV)
.addAllFiles(FILE_LIST).build())
.fileFormat(FileFormatType.CSV)
.addAllFilePaths(FILE_LIST).build())
.schema(SchemaDefinition.builder().addAllFields(Arrays.asList(col1, col2, col3, col4)).build())
.build();

@@ -108,7 +108,7 @@ public void testMilestoning() throws IOException, InterruptedException
.relationalSink(BigQuerySink.get())
.collectStatistics(true)
.executionTimestampClock(fixedClock_2000_01_01)
.bulkLoadTaskIdValue(TASK_ID_VALUE)
.bulkLoadEventIdValue(TASK_ID_VALUE)
.bulkLoadBatchStatusPattern("{STATUS}")
.build();
