Persistence Component: Implement type conversion logic in Snowflake for digest generation (#2849)
kumuwu authored May 21, 2024
1 parent e930439 commit db8d281
Showing 14 changed files with 225 additions and 35 deletions.
@@ -14,8 +14,8 @@

package org.finos.legend.engine.persistence.components.ingestmode.digest;

import org.finos.legend.engine.persistence.components.logicalplan.datasets.DataType;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.FieldType;
import org.finos.legend.engine.persistence.components.logicalplan.values.DigestUdf;
import org.finos.legend.engine.persistence.components.logicalplan.values.FieldValue;
import org.finos.legend.engine.persistence.components.logicalplan.values.StagedFilesFieldValue;
@@ -30,10 +30,10 @@ public class DigestGenerationHandler implements DigestGenStrategyVisitor<Void>
{
private List<Value> fieldsToSelect;
private List<Value> fieldsToInsert;
private List<DataType> fieldTypes;
private List<FieldType> fieldTypes;
private Dataset mainDataset;

public DigestGenerationHandler(Dataset mainDataset, List<Value> fieldsToSelect, List<Value> fieldsToInsert, List<DataType> fieldTypes)
public DigestGenerationHandler(Dataset mainDataset, List<Value> fieldsToSelect, List<Value> fieldsToInsert, List<FieldType> fieldTypes)
{
this.mainDataset = mainDataset;
this.fieldsToSelect = fieldsToSelect;
@@ -53,7 +53,7 @@ public Void visitUDFBasedDigestGenStrategy(UDFBasedDigestGenStrategyAbstract udf
Set<String> fieldsToExclude = udfBasedDigestGenStrategy.fieldsToExcludeFromDigest();
List<String> filteredStagingFieldNames = new ArrayList<>();
List<Value> filteredStagingFieldValues = new ArrayList<>();
List<DataType> filteredStagingFieldTypes = new ArrayList<>();
List<FieldType> filteredStagingFieldTypes = new ArrayList<>();

List<Value> sortedFieldsToSelect = fieldsToSelect.stream().sorted((o1, o2) ->
{
@@ -71,7 +71,7 @@ else if (o1 instanceof StagedFilesFieldValue && o2 instanceof StagedFilesFieldVa
for (Value value : sortedFieldsToSelect)
{
int index = fieldsToSelect.indexOf(value);
DataType dataType = fieldTypes.get(index);
FieldType dataType = fieldTypes.get(index);

if (value instanceof FieldValue)
{
@@ -0,0 +1,38 @@
// Copyright 2024 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.logicalplan.values;

import org.finos.legend.engine.persistence.components.logicalplan.datasets.FieldType;

import static org.immutables.value.Value.Immutable;
import static org.immutables.value.Value.Parameter;
import static org.immutables.value.Value.Style;

@Immutable
@Style(
typeAbstract = "*Abstract",
typeImmutable = "*",
jdkOnly = true,
optionalAcceptNullable = true,
strictBuilder = true
)
public interface CastFunctionAbstract extends Value
{
@Parameter(order = 0)
Value field();

@Parameter(order = 1)
FieldType type();
}
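The new CastFunction value pairs a column with the full FieldType it should be cast to, so a sink can render length and scale in the generated CAST. A minimal sketch of how the Immutables-generated builder would be used, assuming the FieldValue and FieldType builders follow the same Immutables style as the rest of this module (the column name and type are hypothetical, not part of this commit):

    // Minimal sketch only — FieldValue/FieldType builder calls and the column name are assumptions.
    FieldType decimalType = FieldType.builder()
            .dataType(DataType.DECIMAL)
            .length(18)
            .scale(2)
            .build();

    Value amountColumn = FieldValue.builder().fieldName("amount").build();

    // Same shape as the usage in the Snowflake DigestUdfVisitor below: CAST(field AS <type>)
    CastFunction cast = CastFunction.builder()
            .field(amountColumn)
            .type(decimalType)
            .build();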
@@ -15,6 +15,7 @@
package org.finos.legend.engine.persistence.components.logicalplan.values;

import org.finos.legend.engine.persistence.components.logicalplan.datasets.DataType;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.FieldType;

import java.util.List;
import java.util.Map;
@@ -36,7 +37,7 @@ public interface DigestUdfAbstract extends Value

List<Value> values();

List<DataType> fieldTypes();
List<FieldType> fieldTypes();

Map<DataType, String> typeConversionUdfNames();
}
@@ -31,9 +31,9 @@
import org.finos.legend.engine.persistence.components.logicalplan.conditions.Condition;
import org.finos.legend.engine.persistence.components.logicalplan.conditions.Exists;
import org.finos.legend.engine.persistence.components.logicalplan.conditions.Not;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.DataType;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Field;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.FieldType;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Selection;
import org.finos.legend.engine.persistence.components.logicalplan.operations.Insert;
import org.finos.legend.engine.persistence.components.logicalplan.values.All;
@@ -107,7 +107,7 @@ protected AppendOnly ingestMode()
@Override
public LogicalPlan buildLogicalPlanForIngest(Resources resources)
{
Pair<List<Value>, List<DataType>> dataFieldsWithTypes = getDataFieldsWithTypes();
Pair<List<Value>, List<FieldType>> dataFieldsWithTypes = getDataFieldsWithTypes();
List<Value> dataFields = dataFieldsWithTypes.getOne();
List<Value> fieldsToSelect = new ArrayList<>(dataFields);
List<Value> fieldsToInsert = new ArrayList<>(dataFields);
@@ -275,7 +275,7 @@ private LogicalPlan buildLogicalPlanForTransformWhileCopy(Resources resources)
List<Value> fieldsToInsert = new ArrayList<>(stagingDataset().schemaReference().fieldValues());

// Add digest
List<DataType> fieldTypes = stagingDataset().schema().fields().stream().map(field -> field.type().dataType()).collect(Collectors.toList());
List<FieldType> fieldTypes = stagingDataset().schema().fields().stream().map(field -> field.type()).collect(Collectors.toList());
ingestMode().digestGenStrategy().accept(new DigestGenerationHandler(mainDataset(), fieldsToSelect, fieldsToInsert, fieldTypes));

// Add batch_id field
@@ -304,7 +304,7 @@ private LogicalPlan buildLogicalPlanForCopyAndTransform(Resources resources)
List<Value> fieldsToInsertIntoMain = new ArrayList<>(externalDataset.schemaReference().fieldValues());

// Add digest
List<DataType> fieldTypes = externalDataset.schema().fields().stream().map(field -> field.type().dataType()).collect(Collectors.toList());
List<FieldType> fieldTypes = externalDataset.schema().fields().stream().map(field -> field.type()).collect(Collectors.toList());
ingestMode().digestGenStrategy().accept(new DigestGenerationHandler(mainDataset(), fieldsToSelect, fieldsToInsertIntoMain, fieldTypes));

// Add batch_id field
@@ -35,10 +35,10 @@
import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlanFactory;
import org.finos.legend.engine.persistence.components.logicalplan.conditions.Condition;
import org.finos.legend.engine.persistence.components.logicalplan.conditions.GreaterThan;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.DataType;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.DerivedDataset;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Field;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.FieldType;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.FilteredDataset;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Selection;
import org.finos.legend.engine.persistence.components.logicalplan.operations.*;
@@ -239,10 +239,10 @@ protected Dataset tempStagingDatasetWithoutPks()
return tempStagingDatasetWithoutPks.orElseThrow(IllegalStateException::new);
}

protected Pair<List<Value>, List<DataType>> getDataFieldsWithTypes()
protected Pair<List<Value>, List<FieldType>> getDataFieldsWithTypes()
{
List<Value> dataFields = new ArrayList<>();
List<DataType> fieldTypes = new ArrayList<>();
List<FieldType> fieldTypes = new ArrayList<>();

Optional<String> dedupField = ingestMode.deduplicationStrategy().accept(DeduplicationVisitors.EXTRACT_DEDUP_FIELD);
Optional<String> dataSplitField = ingestMode.dataSplitField();
@@ -256,7 +256,7 @@ protected Pair<List<Value>, List<DataType>> getDataFieldsWithTypes()
continue;
}
dataFields.add(fieldValue);
fieldTypes.add(stagingDataset().schema().fields().get(i).type().dataType());
fieldTypes.add(stagingDataset().schema().fields().get(i).type());
}

return Tuples.pair(dataFields, fieldTypes);
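This is why the planner now carries FieldType rather than the bare DataType enum: FieldType keeps length and scale, which the Snowflake sink needs to render a precise CAST. A small sketch of the distinction for an assumed DECIMAL(10, 4) staging column (builder calls are assumptions based on the Immutables style of this module):

    // Sketch only — illustrates what FieldType carries beyond the DataType enum.
    Field amount = Field.builder()
            .name("amount")
            .type(FieldType.builder().dataType(DataType.DECIMAL).length(10).scale(4).build())
            .build();

    FieldType fullType = amount.type();            // DECIMAL plus length 10 and scale 4
    DataType enumOnly = amount.type().dataType();  // DataType.DECIMAL alone — precision and scale are lost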
@@ -15,6 +15,7 @@
package org.finos.legend.engine.persistence.components.relational.ansi.sql.visitors;

import org.finos.legend.engine.persistence.components.logicalplan.datasets.DataType;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.FieldType;
import org.finos.legend.engine.persistence.components.logicalplan.values.DigestUdf;
import org.finos.legend.engine.persistence.components.logicalplan.values.StringValue;
import org.finos.legend.engine.persistence.components.logicalplan.values.ToArrayFunction;
@@ -51,7 +52,7 @@ public VisitorResult visit(PhysicalPlanNode prev, DigestUdf current, VisitorCont
return new VisitorResult(udf, Arrays.asList(toArrayColumnNames, toArrayColumnValues));
}

protected Value getColumnValueAsStringType(Value value, DataType dataType, Map<DataType, String> typeConversionUdfNames)
protected Value getColumnValueAsStringType(Value value, FieldType dataType, Map<DataType, String> typeConversionUdfNames)
{
throw new IllegalStateException("UDF is unsupported in ANSI sink");
}
@@ -15,6 +15,7 @@
package org.finos.legend.engine.persistence.components.relational.bigquery.sql.visitor;

import org.finos.legend.engine.persistence.components.logicalplan.datasets.DataType;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.FieldType;
import org.finos.legend.engine.persistence.components.logicalplan.values.Value;

import java.util.Map;
@@ -23,11 +24,11 @@ public class DigestUdfVisitor extends org.finos.legend.engine.persistence.compon
{

@Override
protected Value getColumnValueAsStringType(Value value, DataType dataType, Map<DataType, String> typeConversionUdfNames)
protected Value getColumnValueAsStringType(Value value, FieldType dataType, Map<DataType, String> typeConversionUdfNames)
{
if (typeConversionUdfNames.containsKey(dataType))
if (typeConversionUdfNames.containsKey(dataType.dataType()))
{
return org.finos.legend.engine.persistence.components.logicalplan.values.Udf.builder().udfName(typeConversionUdfNames.get(dataType)).addParameters(value).build();
return org.finos.legend.engine.persistence.components.logicalplan.values.Udf.builder().udfName(typeConversionUdfNames.get(dataType.dataType())).addParameters(value).build();
}
else
{
@@ -15,6 +15,7 @@
package org.finos.legend.engine.persistence.components.relational.h2.sql.visitor;

import org.finos.legend.engine.persistence.components.logicalplan.datasets.DataType;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.FieldType;
import org.finos.legend.engine.persistence.components.logicalplan.values.FieldValue;
import org.finos.legend.engine.persistence.components.logicalplan.values.FunctionImpl;
import org.finos.legend.engine.persistence.components.logicalplan.values.FunctionName;
@@ -27,7 +28,7 @@
public class DigestUdfVisitor extends org.finos.legend.engine.persistence.components.relational.ansi.sql.visitors.DigestUdfVisitor
{

protected Value getColumnValueAsStringType(Value value, DataType dataType, Map<DataType, String> typeConversionUdfNames)
protected Value getColumnValueAsStringType(Value value, FieldType dataType, Map<DataType, String> typeConversionUdfNames)
{
if (value instanceof StagedFilesFieldValue)
{
@@ -37,9 +38,9 @@ protected Value getColumnValueAsStringType(Value value, DataType dataType, Map<D
}

// Else need to convert the field into a String
if (typeConversionUdfNames.containsKey(dataType))
if (typeConversionUdfNames.containsKey(dataType.dataType()))
{
return org.finos.legend.engine.persistence.components.logicalplan.values.Udf.builder().udfName(typeConversionUdfNames.get(dataType)).addParameters(value).build();
return org.finos.legend.engine.persistence.components.logicalplan.values.Udf.builder().udfName(typeConversionUdfNames.get(dataType.dataType())).addParameters(value).build();
}
else
{
@@ -38,6 +38,7 @@
import org.finos.legend.engine.persistence.components.logicalplan.operations.Create;
import org.finos.legend.engine.persistence.components.logicalplan.operations.Show;
import org.finos.legend.engine.persistence.components.logicalplan.values.BatchEndTimestamp;
import org.finos.legend.engine.persistence.components.logicalplan.values.CastFunction;
import org.finos.legend.engine.persistence.components.logicalplan.values.DigestUdf;
import org.finos.legend.engine.persistence.components.logicalplan.values.FieldValue;
import org.finos.legend.engine.persistence.components.logicalplan.values.MetadataFileNameField;
@@ -67,6 +68,7 @@
import org.finos.legend.engine.persistence.components.relational.snowflake.sql.SnowflakeJdbcPropertiesToLogicalDataTypeMapping;
import org.finos.legend.engine.persistence.components.relational.snowflake.sql.visitor.AlterVisitor;
import org.finos.legend.engine.persistence.components.relational.snowflake.sql.visitor.BatchEndTimestampVisitor;
import org.finos.legend.engine.persistence.components.relational.snowflake.sql.visitor.CastFunctionVisitor;
import org.finos.legend.engine.persistence.components.relational.snowflake.sql.visitor.ClusterKeyVisitor;
import org.finos.legend.engine.persistence.components.relational.snowflake.sql.visitor.MetadataFileNameFieldVisitor;
import org.finos.legend.engine.persistence.components.relational.snowflake.sql.visitor.MetadataRowNumberFieldVisitor;
@@ -178,6 +180,7 @@ public class SnowflakeSink extends AnsiSqlSink
logicalPlanVisitorByClass.put(StagedFilesFieldValue.class, new StagedFilesFieldValueVisitor());
logicalPlanVisitorByClass.put(StagedFilesSelection.class, new StagedFilesSelectionVisitor());
logicalPlanVisitorByClass.put(DigestUdf.class, new DigestUdfVisitor());
logicalPlanVisitorByClass.put(CastFunction.class, new CastFunctionVisitor());
logicalPlanVisitorByClass.put(TryCastFunction.class, new TryCastFunctionVisitor());
logicalPlanVisitorByClass.put(MetadataFileNameField.class, new MetadataFileNameFieldVisitor());
logicalPlanVisitorByClass.put(MetadataRowNumberField.class, new MetadataRowNumberFieldVisitor());
@@ -0,0 +1,49 @@
// Copyright 2024 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.relational.snowflake.sql.visitor;

import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlanNode;
import org.finos.legend.engine.persistence.components.logicalplan.values.CastFunction;
import org.finos.legend.engine.persistence.components.optimizer.Optimizer;
import org.finos.legend.engine.persistence.components.physicalplan.PhysicalPlanNode;
import org.finos.legend.engine.persistence.components.relational.snowflake.sql.SnowflakeDataTypeMapping;
import org.finos.legend.engine.persistence.components.relational.sqldom.schema.DataType;
import org.finos.legend.engine.persistence.components.transformer.LogicalPlanVisitor;
import org.finos.legend.engine.persistence.components.transformer.VisitorContext;

import java.util.ArrayList;
import java.util.List;

public class CastFunctionVisitor implements LogicalPlanVisitor<CastFunction>
{
@Override
public VisitorResult visit(PhysicalPlanNode prev, CastFunction current, VisitorContext context)
{
DataType dataType = new SnowflakeDataTypeMapping().getDataType(current.type());

org.finos.legend.engine.persistence.components.relational.snowflake.sqldom.schemaops.values.CastFunction castFunction =
new org.finos.legend.engine.persistence.components.relational.snowflake.sqldom.schemaops.values.CastFunction(dataType, context.quoteIdentifier());
for (Optimizer optimizer : context.optimizers())
{
castFunction = (org.finos.legend.engine.persistence.components.relational.snowflake.sqldom.schemaops.values.CastFunction) optimizer.optimize(castFunction);
}
prev.push(castFunction);

List<LogicalPlanNode> logicalPlanNodeList = new ArrayList<>();
logicalPlanNodeList.add(current.field());

return new VisitorResult(castFunction, logicalPlanNodeList);
}
}
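This visitor resolves the logical FieldType into a Snowflake SQL-DOM data type via SnowflakeDataTypeMapping, wraps it in the SQL-DOM CastFunction, and leaves the child field() to be visited and pushed underneath it. A sketch of the resolution step for an assumed TIMESTAMP field (builder call assumed; the exact Snowflake type name it resolves to depends on the mapping):

    // Sketch only — the type-resolution step this visitor performs for a hypothetical TIMESTAMP field.
    FieldType timestampType = FieldType.builder()
            .dataType(org.finos.legend.engine.persistence.components.logicalplan.datasets.DataType.TIMESTAMP)
            .build();

    // Resolves to the corresponding Snowflake SQL-DOM data type, which the SQL-DOM
    // CastFunction then renders as CAST(<field> AS <resolved type>).
    org.finos.legend.engine.persistence.components.relational.sqldom.schema.DataType sqlType =
            new SnowflakeDataTypeMapping().getDataType(timestampType);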
@@ -15,21 +15,42 @@
package org.finos.legend.engine.persistence.components.relational.snowflake.sql.visitor;

import org.finos.legend.engine.persistence.components.logicalplan.datasets.DataType;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.FieldType;
import org.finos.legend.engine.persistence.components.logicalplan.values.CastFunction;
import org.finos.legend.engine.persistence.components.logicalplan.values.StagedFilesFieldValue;
import org.finos.legend.engine.persistence.components.logicalplan.values.Value;

import java.util.Map;

public class DigestUdfVisitor extends org.finos.legend.engine.persistence.components.relational.ansi.sql.visitors.DigestUdfVisitor
{
protected Value getColumnValueAsStringType(Value value, DataType dataType, Map<DataType, String> typeConversionUdfNames)
protected Value getColumnValueAsStringType(Value value, FieldType dataType, Map<DataType, String> typeConversionUdfNames)
{
if (typeConversionUdfNames.containsKey(dataType))
if (value instanceof StagedFilesFieldValue)
{
return org.finos.legend.engine.persistence.components.logicalplan.values.Udf.builder().udfName(typeConversionUdfNames.get(dataType)).addParameters(value).build();
if (typeConversionUdfNames.containsKey(dataType.dataType()))
{
// TO_STRING(CAST(field))
return org.finos.legend.engine.persistence.components.logicalplan.values.Udf.builder().udfName(typeConversionUdfNames.get(dataType.dataType())).addParameters(CastFunction.builder().field(value).type(dataType).build()).build();
}
else
{
// CAST(field)
return CastFunction.builder().field(value).type(dataType).build();
}
}
else
{
return value;
if (typeConversionUdfNames.containsKey(dataType.dataType()))
{
// TO_STRING(field)
return org.finos.legend.engine.persistence.components.logicalplan.values.Udf.builder().udfName(typeConversionUdfNames.get(dataType.dataType())).addParameters(value).build();
}
else
{
// field
return value;
}
}
}
}
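Taken together, the branching above produces four shapes of digest input, matching the inline comments: TO_STRING(CAST(field)), CAST(field), TO_STRING(field), or the bare field. The helper below is an illustrative summary only — it mirrors the cases with plain strings rather than logical-plan values, the UDF name and column references are hypothetical, and staged-files columns are cast first presumably because a staged-file reference carries no column type of its own:

    // Illustrative summary of the four cases handled above (plain strings, not logical-plan values).
    static String digestInput(String column, boolean isStagedFilesField, String conversionUdfOrNull, String sqlType)
    {
        if (isStagedFilesField)
        {
            String cast = "CAST(" + column + " AS " + sqlType + ")";  // CAST(field)
            return conversionUdfOrNull != null
                    ? conversionUdfOrNull + "(" + cast + ")"          // TO_STRING(CAST(field))
                    : cast;
        }
        return conversionUdfOrNull != null
                ? conversionUdfOrNull + "(" + column + ")"            // TO_STRING(field)
                : column;                                             // field
    }

For example, digestInput("$1:amount", true, "TO_STRING_UDF", "NUMBER(18, 2)") would yield TO_STRING_UDF(CAST($1:amount AS NUMBER(18, 2))), the TO_STRING(CAST(field)) case; a regular column whose type has no registered conversion UDF passes through unchanged.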