Skip to content

Commit

Permalink
Persistence Component: Improve UDF-Based Digest Generation (#2700)
Browse files Browse the repository at this point in the history
  • Loading branch information
kumuwu authored Mar 26, 2024
1 parent c868832 commit 25bdd39
Show file tree
Hide file tree
Showing 44 changed files with 987 additions and 160 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ public DigestGenStrategy visitUDFBasedDigestGenStrategy(UDFBasedDigestGenStrateg
{
return UDFBasedDigestGenStrategy.builder()
.digestUdfName(udfBasedDigestGenStrategy.digestUdfName())
.putAllTypeConversionUdfNames(udfBasedDigestGenStrategy.typeConversionUdfNames())
.digestField(applyCase(udfBasedDigestGenStrategy.digestField()))
.addAllFieldsToExcludeFromDigest(udfBasedDigestGenStrategy.fieldsToExcludeFromDigest().stream().map(name -> applyCase(name)).collect(Collectors.toSet()))
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package org.finos.legend.engine.persistence.components.ingestmode.digest;

import org.finos.legend.engine.persistence.components.logicalplan.datasets.DataType;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset;
import org.finos.legend.engine.persistence.components.logicalplan.values.DigestUdf;
import org.finos.legend.engine.persistence.components.logicalplan.values.FieldValue;
Expand All @@ -23,18 +24,21 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class DigestGenerationHandler implements DigestGenStrategyVisitor<Void>
{
private List<Value> fieldsToSelect;
private List<Value> fieldsToInsert;
private List<DataType> fieldTypes;
private Dataset mainDataset;

public DigestGenerationHandler(Dataset mainDataset, List<Value> fieldsToSelect, List<Value> fieldsToInsert)
public DigestGenerationHandler(Dataset mainDataset, List<Value> fieldsToSelect, List<Value> fieldsToInsert, List<DataType> fieldTypes)
{
this.mainDataset = mainDataset;
this.fieldsToSelect = fieldsToSelect;
this.fieldsToInsert = fieldsToInsert;
this.fieldTypes = fieldTypes;
}

@Override
Expand All @@ -49,16 +53,34 @@ public Void visitUDFBasedDigestGenStrategy(UDFBasedDigestGenStrategyAbstract udf
Set<String> fieldsToExclude = udfBasedDigestGenStrategy.fieldsToExcludeFromDigest();
List<String> filteredStagingFieldNames = new ArrayList<>();
List<Value> filteredStagingFieldValues = new ArrayList<>();
List<DataType> filteredStagingFieldTypes = new ArrayList<>();

for (Value value: fieldsToSelect)
List<Value> sortedFieldsToSelect = fieldsToSelect.stream().sorted((o1, o2) ->
{
if (o1 instanceof FieldValue && o2 instanceof FieldValue)
{
return ((FieldValue) o1).fieldName().compareTo(((FieldValue) o2).fieldName());
}
else if (o1 instanceof StagedFilesFieldValue && o2 instanceof StagedFilesFieldValue)
{
return ((StagedFilesFieldValue) o1).fieldName().compareTo(((StagedFilesFieldValue) o2).fieldName());
}
return 0;
}).collect(Collectors.toList());

for (Value value : sortedFieldsToSelect)
{
int index = fieldsToSelect.indexOf(value);
DataType dataType = fieldTypes.get(index);

if (value instanceof FieldValue)
{
FieldValue fieldValue = (FieldValue) value;
if (!fieldsToExclude.contains(fieldValue.fieldName()))
{
filteredStagingFieldNames.add(fieldValue.fieldName());
filteredStagingFieldValues.add(fieldValue);
filteredStagingFieldTypes.add(dataType);
}
}
else if (value instanceof StagedFilesFieldValue)
Expand All @@ -68,6 +90,7 @@ else if (value instanceof StagedFilesFieldValue)
{
filteredStagingFieldNames.add(stagedFilesFieldValue.fieldName());
filteredStagingFieldValues.add(stagedFilesFieldValue);
filteredStagingFieldTypes.add(dataType);
}
}
else
Expand All @@ -81,6 +104,8 @@ else if (value instanceof StagedFilesFieldValue)
.udfName(udfBasedDigestGenStrategy.digestUdfName())
.addAllFieldNames(filteredStagingFieldNames)
.addAllValues(filteredStagingFieldValues)
.addAllFieldTypes(filteredStagingFieldTypes)
.putAllTypeConversionUdfNames(udfBasedDigestGenStrategy.typeConversionUdfNames())
.build();

String digestField = udfBasedDigestGenStrategy.digestField();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@

package org.finos.legend.engine.persistence.components.ingestmode.digest;

import org.finos.legend.engine.persistence.components.logicalplan.datasets.DataType;
import org.immutables.value.Value;

import java.util.Map;
import java.util.Set;

@Value.Immutable
Expand All @@ -34,6 +36,8 @@ public interface UDFBasedDigestGenStrategyAbstract extends DigestGenStrategy

Set<String> fieldsToExcludeFromDigest();

Map<DataType, String> typeConversionUdfNames();

@Override
default <T> T accept(DigestGenStrategyVisitor<T> visitor)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,10 @@ default SchemaReference schemaReference()
.collect(Collectors.toList()))
.build();
}

@Value.Derived
default SchemaDefinition schema()
{
return stagedFilesDataset().schema();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@

package org.finos.legend.engine.persistence.components.logicalplan.values;

import org.finos.legend.engine.persistence.components.logicalplan.datasets.DataType;

import java.util.List;
import java.util.Map;

@org.immutables.value.Value.Immutable
@org.immutables.value.Value.Style(
Expand All @@ -32,4 +35,8 @@ public interface DigestUdfAbstract extends Value
List<String> fieldNames();

List<Value> values();

List<DataType> fieldTypes();

Map<DataType, String> typeConversionUdfNames();
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public enum FunctionName
GENERATE_ARRAY,
PARSE_DATETIME,
OBJECT_CONSTRUCT,
ARRAY_CONSTRUCT,
TO_VARIANT,
TO_JSON,
CONVERT,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2023 Goldman Sachs
// Copyright 2024 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -12,9 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.relational.h2.logicalplan.values;
package org.finos.legend.engine.persistence.components.logicalplan.values;

import org.finos.legend.engine.persistence.components.logicalplan.values.Value;
import org.immutables.value.Value.Immutable;
import org.immutables.value.Value.Style;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2024 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.logicalplan.values;

import java.util.List;

@org.immutables.value.Value.Immutable
@org.immutables.value.Value.Style(
typeAbstract = "*Abstract",
typeImmutable = "*",
jdkOnly = true,
optionalAcceptNullable = true,
strictBuilder = true
)
public interface UdfAbstract extends Value
{

String udfName();

List<Value> parameters();
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package org.finos.legend.engine.persistence.components.planner;

import org.eclipse.collections.api.tuple.Pair;
import org.finos.legend.engine.persistence.components.common.Datasets;
import org.finos.legend.engine.persistence.components.common.Resources;
import org.finos.legend.engine.persistence.components.common.StatisticName;
Expand All @@ -30,6 +31,7 @@
import org.finos.legend.engine.persistence.components.logicalplan.conditions.Condition;
import org.finos.legend.engine.persistence.components.logicalplan.conditions.Exists;
import org.finos.legend.engine.persistence.components.logicalplan.conditions.Not;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.DataType;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Field;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Selection;
Expand Down Expand Up @@ -105,12 +107,13 @@ protected AppendOnly ingestMode()
@Override
public LogicalPlan buildLogicalPlanForIngest(Resources resources)
{
List<Value> dataFields = getDataFields();
Pair<List<Value>, List<DataType>> dataFieldsWithTypes = getDataFieldsWithTypes();
List<Value> dataFields = dataFieldsWithTypes.getOne();
List<Value> fieldsToSelect = new ArrayList<>(dataFields);
List<Value> fieldsToInsert = new ArrayList<>(dataFields);

// Add digest generation (if applicable)
ingestMode().digestGenStrategy().accept(new DigestGenerationHandler(mainDataset(), fieldsToSelect, fieldsToInsert));
ingestMode().digestGenStrategy().accept(new DigestGenerationHandler(mainDataset(), fieldsToSelect, fieldsToInsert, dataFieldsWithTypes.getTwo()));

// Add auditing (if applicable)
if (ingestMode().auditing().accept(AUDIT_ENABLED))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,8 @@ private LogicalPlan buildLogicalPlanForTransformWhileCopy(Resources resources)
List<Value> fieldsToInsert = new ArrayList<>(stagingDataset().schemaReference().fieldValues());

// Add digest
ingestMode().digestGenStrategy().accept(new DigestGenerationHandler(mainDataset(), fieldsToSelect, fieldsToInsert));
List<DataType> fieldTypes = stagingDataset().schema().fields().stream().map(field -> field.type().dataType()).collect(Collectors.toList());
ingestMode().digestGenStrategy().accept(new DigestGenerationHandler(mainDataset(), fieldsToSelect, fieldsToInsert, fieldTypes));

// Add batch_id field
fieldsToInsert.add(FieldValue.builder().datasetRef(mainDataset().datasetReference()).fieldName(ingestMode().batchIdField()).build());
Expand Down Expand Up @@ -303,7 +304,8 @@ private LogicalPlan buildLogicalPlanForCopyAndTransform(Resources resources)
List<Value> fieldsToInsertIntoMain = new ArrayList<>(externalDataset.schemaReference().fieldValues());

// Add digest
ingestMode().digestGenStrategy().accept(new DigestGenerationHandler(mainDataset(), fieldsToSelect, fieldsToInsertIntoMain));
List<DataType> fieldTypes = externalDataset.schema().fields().stream().map(field -> field.type().dataType()).collect(Collectors.toList());
ingestMode().digestGenStrategy().accept(new DigestGenerationHandler(mainDataset(), fieldsToSelect, fieldsToInsertIntoMain, fieldTypes));

// Add batch_id field
fieldsToInsertIntoMain.add(FieldValue.builder().datasetRef(mainDataset().datasetReference()).fieldName(ingestMode().batchIdField()).build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.stream.Collectors;

import org.eclipse.collections.api.tuple.Pair;
import org.eclipse.collections.impl.tuple.Tuples;
import org.finos.legend.engine.persistence.components.common.Datasets;
import org.finos.legend.engine.persistence.components.common.DedupAndVersionErrorSqlType;
import org.finos.legend.engine.persistence.components.common.Resources;
Expand All @@ -34,6 +35,7 @@
import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlanFactory;
import org.finos.legend.engine.persistence.components.logicalplan.conditions.Condition;
import org.finos.legend.engine.persistence.components.logicalplan.conditions.GreaterThan;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.DataType;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.DerivedDataset;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Field;
Expand Down Expand Up @@ -235,6 +237,29 @@ protected Dataset tempStagingDatasetWithoutPks()
return tempStagingDatasetWithoutPks.orElseThrow(IllegalStateException::new);
}

protected Pair<List<Value>, List<DataType>> getDataFieldsWithTypes()
{
List<Value> dataFields = new ArrayList<>();
List<DataType> fieldTypes = new ArrayList<>();

Optional<String> dedupField = ingestMode.deduplicationStrategy().accept(DeduplicationVisitors.EXTRACT_DEDUP_FIELD);
Optional<String> dataSplitField = ingestMode.dataSplitField();

for (int i = 0; i < stagingDataset().schemaReference().fieldValues().size(); i++)
{
FieldValue fieldValue = stagingDataset().schemaReference().fieldValues().get(i);
if ((dedupField.isPresent() && dedupField.get().equalsIgnoreCase(fieldValue.fieldName())) ||
(dataSplitField.isPresent() && dataSplitField.get().equalsIgnoreCase(fieldValue.fieldName())))
{
continue;
}
dataFields.add(fieldValue);
fieldTypes.add(stagingDataset().schema().fields().get(i).type().dataType());
}

return Tuples.pair(dataFields, fieldTypes);
}

protected List<Value> getDataFields()
{
List<Value> dataFields = new ArrayList<>(stagingDataset().schemaReference().fieldValues());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@

package org.finos.legend.engine.persistence.components.relational.ansi;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.finos.legend.engine.persistence.components.common.Datasets;
import org.finos.legend.engine.persistence.components.common.StatisticName;
import org.finos.legend.engine.persistence.components.executor.Executor;
Expand Down Expand Up @@ -82,6 +80,7 @@
import org.finos.legend.engine.persistence.components.logicalplan.values.StringValue;
import org.finos.legend.engine.persistence.components.logicalplan.values.SumBinaryValueOperator;
import org.finos.legend.engine.persistence.components.logicalplan.values.TabularValues;
import org.finos.legend.engine.persistence.components.logicalplan.values.Udf;
import org.finos.legend.engine.persistence.components.logicalplan.values.WindowFunction;
import org.finos.legend.engine.persistence.components.optimizer.Optimizer;
import org.finos.legend.engine.persistence.components.relational.CaseConversion;
Expand Down Expand Up @@ -148,6 +147,7 @@
import org.finos.legend.engine.persistence.components.relational.ansi.sql.visitors.TableModifierVisitor;
import org.finos.legend.engine.persistence.components.relational.ansi.sql.visitors.TabularValuesVisitor;
import org.finos.legend.engine.persistence.components.relational.ansi.sql.visitors.TruncateVisitor;
import org.finos.legend.engine.persistence.components.relational.ansi.sql.visitors.UdfVisitor;
import org.finos.legend.engine.persistence.components.relational.ansi.sql.visitors.WindowFunctionVisitor;
import org.finos.legend.engine.persistence.components.relational.api.DataError;
import org.finos.legend.engine.persistence.components.relational.api.ApiUtils;
Expand Down Expand Up @@ -220,6 +220,7 @@ public class AnsiSqlSink extends RelationalSink

logicalPlanVisitorByClass.put(Update.class, new SQLUpdateVisitor());
logicalPlanVisitorByClass.put(FunctionImpl.class, new FunctionVisitor());
logicalPlanVisitorByClass.put(Udf.class, new UdfVisitor());
logicalPlanVisitorByClass.put(HashFunction.class, new HashFunctionVisitor());
logicalPlanVisitorByClass.put(WindowFunction.class, new WindowFunctionVisitor());
logicalPlanVisitorByClass.put(ParseJsonFunction.class, new ParseJsonFunctionVisitor());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2024 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.relational.ansi.sql.visitors;

import org.finos.legend.engine.persistence.components.logicalplan.datasets.DataType;
import org.finos.legend.engine.persistence.components.logicalplan.values.DigestUdf;
import org.finos.legend.engine.persistence.components.logicalplan.values.StringValue;
import org.finos.legend.engine.persistence.components.logicalplan.values.ToArrayFunction;
import org.finos.legend.engine.persistence.components.logicalplan.values.Value;
import org.finos.legend.engine.persistence.components.physicalplan.PhysicalPlanNode;
import org.finos.legend.engine.persistence.components.relational.sqldom.schemaops.values.Udf;
import org.finos.legend.engine.persistence.components.transformer.LogicalPlanVisitor;
import org.finos.legend.engine.persistence.components.transformer.VisitorContext;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class DigestUdfVisitor implements LogicalPlanVisitor<DigestUdf>
{

@Override
public VisitorResult visit(PhysicalPlanNode prev, DigestUdf current, VisitorContext context)
{
Udf udf = new Udf(context.quoteIdentifier(), current.udfName());
prev.push(udf);
List<Value> columnNameList = new ArrayList<>();
List<Value> columnValueList = new ArrayList<>();
for (int i = 0; i < current.values().size(); i++)
{
columnNameList.add(StringValue.of(current.fieldNames().get(i)));
columnValueList.add(getColumnValueAsStringType(current.values().get(i), current.fieldTypes().get(i), current.typeConversionUdfNames()));
}

ToArrayFunction toArrayColumnNames = ToArrayFunction.builder().addAllValues(columnNameList).build();
ToArrayFunction toArrayColumnValues = ToArrayFunction.builder().addAllValues(columnValueList).build();

return new VisitorResult(udf, Arrays.asList(toArrayColumnNames, toArrayColumnValues));
}

protected Value getColumnValueAsStringType(Value value, DataType dataType, Map<DataType, String> typeConversionUdfNames)
{
throw new IllegalStateException("UDF is unsupported in ANSI sink");
}
}
Loading

0 comments on commit 25bdd39

Please sign in to comment.