Skip to content

Commit

Permalink
Persistence Component: Add support for digest generation for DuckDB (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
kumuwu authored Dec 1, 2024
1 parent 8fda381 commit 539c610
Show file tree
Hide file tree
Showing 28 changed files with 947 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ public DigestGenStrategy visitUDFBasedDigestGenStrategy(UDFBasedDigestGenStrateg
return UDFBasedDigestGenStrategy.builder()
.digestUdfName(udfBasedDigestGenStrategy.digestUdfName())
.putAllTypeConversionUdfNames(udfBasedDigestGenStrategy.typeConversionUdfNames())
.columnNameValueConcatUdfName(udfBasedDigestGenStrategy.columnNameValueConcatUdfName())
.digestField(applyCase(udfBasedDigestGenStrategy.digestField()))
.addAllFieldsToExcludeFromDigest(udfBasedDigestGenStrategy.fieldsToExcludeFromDigest().stream().map(name -> applyCase(name)).collect(Collectors.toSet()))
.build();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2024 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.logicalplan.values;

import org.immutables.value.Value.Immutable;
import org.immutables.value.Value.Style;

import java.util.List;

/**
 * Logical-plan value describing a concatenation of several {@link Value}s.
 *
 * <p>This is the abstract definition processed by the Immutables annotation
 * processor; per the {@code @Style} below ({@code typeAbstract = "*Abstract"},
 * {@code typeImmutable = "*"}) the generated implementation is named
 * {@code ConcatFunction}.
 */
@Immutable
@Style(
typeAbstract = "*Abstract",
typeImmutable = "*",
jdkOnly = true,
optionalAcceptNullable = true,
strictBuilder = true
)
public interface ConcatFunctionAbstract extends Value
{
/**
 * The values to be concatenated.
 *
 * @return the operands of the concatenation
 */
List<Value> values();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.finos.legend.engine.persistence.components.logicalplan.datasets.DataType;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.FieldType;
import org.finos.legend.engine.persistence.components.logicalplan.values.*;
import org.finos.legend.engine.persistence.components.logicalplan.values.ConcatFunction;
import org.finos.legend.engine.persistence.components.physicalplan.PhysicalPlanNode;
import org.finos.legend.engine.persistence.components.relational.sqldom.schemaops.values.Udf;
import org.finos.legend.engine.persistence.components.transformer.LogicalPlanVisitor;
Expand Down Expand Up @@ -57,7 +58,7 @@ public VisitorResult visit(PhysicalPlanNode prev, DigestUdf current, VisitorCont

protected Value mergeColumnsFunction(List<Value> columns)
{
FunctionImpl concatFunction = FunctionImpl.builder().functionName(FunctionName.CONCAT).addAllValue(columns).build();
ConcatFunction concatFunction = ConcatFunction.builder().addAllValues(columns).build();
return concatFunction;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,10 @@ public boolean doesTableExist(Dataset dataset)
String name = dataset.datasetReference().name().orElseThrow(IllegalStateException::new);
String database = dataset.datasetReference().database().orElse(null);
String schema = dataset.datasetReference().group().orElse(null);
ResultSet result = this.connection.getMetaData().getTables(database, schema, name, new String[] {Clause.TABLE.get()});
return result.isBeforeFirst(); // This method returns true if ResultSet is not empty
try (ResultSet result = this.connection.getMetaData().getTables(database, schema, name, new String[] {Clause.TABLE.get()}))
{
return result.isBeforeFirst(); // This method returns true if ResultSet is not empty
}
}
catch (SQLException e)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2024 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.relational.duckdb;

import org.finos.legend.engine.persistence.components.relational.jdbc.JdbcHelper;

/**
 * Static helpers that register the SQL functions used for digest generation
 * on a DuckDB sink.
 *
 * <p>Each {@code register*} method issues a single
 * {@code CREATE OR REPLACE FUNCTION <name>(<params>) AS <expression>} statement
 * through the supplied {@link JdbcHelper}. The function name is concatenated
 * into the statement text as-is.
 */
public class DuckDBDigestUtil
{
    // MD5 over the hex form of A starting at character 9; a NULL or empty-blob A
    // is first replaced by the marker bytes DADB2C00.
    private static final String CUSTOM_MD5_UDF = "MD5(SUBSTRING(HEX(IFNULL(NULLIF(A, FROM_HEX('')), FROM_HEX('DADB2C00'))), 9))";
    // marker || encoded column name || marker || encoded column value; a NULL
    // result is replaced by an empty blob.
    private static final String CUSTOM_COLUMN_UDF = "IFNULL((FROM_HEX('DADB2C00') || ENCODE(COLUMN_NAME) || FROM_HEX('DADB2C00') || ENCODE(COLUMN_VALUE)), FROM_HEX(''))";
    // Plain MD5 of the single argument.
    private static final String MD5_UDF = "MD5(A)";

    /**
     * Registers the custom MD5 function, taking one argument {@code A}.
     *
     * @param sink    helper used to execute the statement
     * @param UdfName name under which the function is created or replaced
     */
    public static void registerMD5Udf(JdbcHelper sink, String UdfName)
    {
        createOrReplaceFunction(sink, UdfName, "(A)", CUSTOM_MD5_UDF);
    }

    /**
     * Registers the column name/value concatenation function, taking the
     * arguments {@code COLUMN_NAME} and {@code COLUMN_VALUE}.
     *
     * @param sink    helper used to execute the statement
     * @param UdfName name under which the function is created or replaced
     */
    public static void registerColumnUdf(JdbcHelper sink, String UdfName)
    {
        createOrReplaceFunction(sink, UdfName, "(COLUMN_NAME, COLUMN_VALUE)", CUSTOM_COLUMN_UDF);
    }

    /**
     * Registers a function that is a plain {@code MD5(A)} of its single argument.
     *
     * @param sink    helper used to execute the statement
     * @param UdfName name under which the function is created or replaced
     */
    public static void registerBasicMD5Udf(JdbcHelper sink, String UdfName)
    {
        createOrReplaceFunction(sink, UdfName, "(A)", MD5_UDF);
    }

    // Builds and executes: CREATE OR REPLACE FUNCTION <functionName><parameterList> AS <expression>
    private static void createOrReplaceFunction(JdbcHelper sink, String functionName, String parameterList, String expression)
    {
        String statement = "CREATE OR REPLACE FUNCTION " + functionName + parameterList + " AS " + expression;
        sink.executeStatement(statement);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
import org.finos.legend.engine.persistence.components.logicalplan.datasets.StagedFilesSelection;
import org.finos.legend.engine.persistence.components.logicalplan.operations.Copy;
import org.finos.legend.engine.persistence.components.logicalplan.operations.Update;
import org.finos.legend.engine.persistence.components.logicalplan.values.CastFunction;
import org.finos.legend.engine.persistence.components.logicalplan.values.ConcatFunction;
import org.finos.legend.engine.persistence.components.logicalplan.values.DigestUdf;
import org.finos.legend.engine.persistence.components.logicalplan.values.ParseJsonFunction;
import org.finos.legend.engine.persistence.components.logicalplan.values.StagedFilesFieldValue;
import org.finos.legend.engine.persistence.components.relational.RelationalSink;
Expand All @@ -47,7 +50,10 @@
import org.finos.legend.engine.persistence.components.relational.duckdb.jdbc.DuckDBJdbcHelper;
import org.finos.legend.engine.persistence.components.relational.duckdb.sql.DuckDBDataTypeMapping;
import org.finos.legend.engine.persistence.components.relational.duckdb.sql.DuckDBJdbcPropertiesToLogicalDataTypeMapping;
import org.finos.legend.engine.persistence.components.relational.duckdb.sql.visitor.CastFunctionVisitor;
import org.finos.legend.engine.persistence.components.relational.duckdb.sql.visitor.ConcatFunctionVisitor;
import org.finos.legend.engine.persistence.components.relational.duckdb.sql.visitor.CopyVisitor;
import org.finos.legend.engine.persistence.components.relational.duckdb.sql.visitor.DigestUdfVisitor;
import org.finos.legend.engine.persistence.components.relational.duckdb.sql.visitor.ParseJsonFunctionVisitor;
import org.finos.legend.engine.persistence.components.relational.duckdb.sql.visitor.SQLUpdateVisitor;
import org.finos.legend.engine.persistence.components.relational.duckdb.sql.visitor.StagedFilesDatasetReferenceVisitor;
Expand Down Expand Up @@ -113,6 +119,9 @@ public class DuckDBSink extends AnsiSqlSink
logicalPlanVisitorByClass.put(StagedFilesDataset.class, new StagedFilesDatasetVisitor());
logicalPlanVisitorByClass.put(StagedFilesFieldValue.class, new StagedFilesFieldValueVisitor());
logicalPlanVisitorByClass.put(StagedFilesSelection.class, new StagedFilesSelectionVisitor());
logicalPlanVisitorByClass.put(ConcatFunction.class, new ConcatFunctionVisitor());
logicalPlanVisitorByClass.put(CastFunction.class, new CastFunctionVisitor());
logicalPlanVisitorByClass.put(DigestUdf.class, new DigestUdfVisitor());
LOGICAL_PLAN_VISITOR_BY_CLASS = Collections.unmodifiableMap(logicalPlanVisitorByClass);

// TODO: These two mappings have not been confirmed, to do with schema evolution
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,10 @@ public boolean doesTableExist(Dataset dataset)
String name = dataset.datasetReference().name().orElseThrow(IllegalStateException::new);
String database = dataset.datasetReference().database().orElse(null);
String schema = dataset.datasetReference().group().orElse(null);
ResultSet result = this.connection.getMetaData().getTables(database, schema, name, null);
return result.next(); // This method returns true if ResultSet is not empty
try (ResultSet result = this.connection.getMetaData().getTables(database, schema, name, null))
{
return result.next(); // This method returns true if ResultSet is not empty
}
}
catch (SQLException e)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2024 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.relational.duckdb.sql.visitor;

import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlanNode;
import org.finos.legend.engine.persistence.components.logicalplan.values.CastFunction;
import org.finos.legend.engine.persistence.components.optimizer.Optimizer;
import org.finos.legend.engine.persistence.components.physicalplan.PhysicalPlanNode;
import org.finos.legend.engine.persistence.components.relational.duckdb.sql.DuckDBDataTypeMapping;
import org.finos.legend.engine.persistence.components.relational.sqldom.schema.DataType;
import org.finos.legend.engine.persistence.components.transformer.LogicalPlanVisitor;
import org.finos.legend.engine.persistence.components.transformer.VisitorContext;

import java.util.ArrayList;
import java.util.List;

/**
 * Translates a logical {@link CastFunction} into its SQL-DOM counterpart,
 * resolving the target type through {@link DuckDBDataTypeMapping}.
 */
public class CastFunctionVisitor implements LogicalPlanVisitor<CastFunction>
{
    /**
     * Creates the SQL-DOM cast node, runs it through the context's optimizers,
     * pushes it onto {@code prev}, and returns the cast's field as the only
     * child still to be visited.
     *
     * @param prev    physical node that receives the cast node
     * @param current logical cast being translated
     * @param context supplies the quote identifier and the optimizers
     * @return result holding the cast node and the single child node to visit next
     */
    @Override
    public VisitorResult visit(PhysicalPlanNode prev, CastFunction current, VisitorContext context)
    {
        DataType targetType = new DuckDBDataTypeMapping().getDataType(current.type());
        org.finos.legend.engine.persistence.components.relational.sqldom.schemaops.values.CastFunction sqlCast = applyOptimizers(
            new org.finos.legend.engine.persistence.components.relational.sqldom.schemaops.values.CastFunction(targetType, context.quoteIdentifier()),
            context);
        prev.push(sqlCast);

        List<LogicalPlanNode> children = new ArrayList<>();
        children.add(current.field());
        return new VisitorResult(sqlCast, children);
    }

    // Feeds the node through every optimizer in order, each receiving the previous one's output.
    private static org.finos.legend.engine.persistence.components.relational.sqldom.schemaops.values.CastFunction applyOptimizers(
        org.finos.legend.engine.persistence.components.relational.sqldom.schemaops.values.CastFunction node,
        VisitorContext context)
    {
        org.finos.legend.engine.persistence.components.relational.sqldom.schemaops.values.CastFunction optimized = node;
        for (Optimizer optimizer : context.optimizers())
        {
            optimized = (org.finos.legend.engine.persistence.components.relational.sqldom.schemaops.values.CastFunction) optimizer.optimize(optimized);
        }
        return optimized;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright 2024 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.relational.duckdb.sql.visitor;

import org.finos.legend.engine.persistence.components.logicalplan.values.ConcatFunction;
import org.finos.legend.engine.persistence.components.physicalplan.PhysicalPlanNode;
import org.finos.legend.engine.persistence.components.transformer.LogicalPlanVisitor;
import org.finos.legend.engine.persistence.components.transformer.VisitorContext;

/**
 * Translates a logical {@link ConcatFunction} into the DuckDB SQL-DOM
 * concatenation node.
 */
public class ConcatFunctionVisitor implements LogicalPlanVisitor<ConcatFunction>
{
    /**
     * Pushes a new, empty SQL-DOM concat node onto {@code prev} and hands the
     * logical values back as the children still to be visited.
     *
     * @param prev    physical node that receives the concat node
     * @param current logical concatenation being translated
     * @param context supplies the quote identifier
     * @return result holding the concat node and the logical values to visit next
     */
    @Override
    public VisitorResult visit(PhysicalPlanNode prev, ConcatFunction current, VisitorContext context)
    {
        org.finos.legend.engine.persistence.components.relational.duckdb.sqldom.schemaops.values.ConcatFunction sqlConcat;
        sqlConcat = new org.finos.legend.engine.persistence.components.relational.duckdb.sqldom.schemaops.values.ConcatFunction(context.quoteIdentifier());

        prev.push(sqlConcat);
        return new VisitorResult(sqlConcat, current.values());
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2024 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.relational.duckdb.sql.visitor;

import org.finos.legend.engine.persistence.components.logicalplan.datasets.DataType;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.FieldType;
import org.finos.legend.engine.persistence.components.logicalplan.values.CastFunction;
import org.finos.legend.engine.persistence.components.logicalplan.values.StagedFilesFieldValue;
import org.finos.legend.engine.persistence.components.logicalplan.values.Value;

import java.util.Map;

/**
 * DuckDB variant of the ANSI digest UDF visitor; it supplies the conversion
 * of a column value to a string form.
 */
public class DigestUdfVisitor extends org.finos.legend.engine.persistence.components.relational.ansi.sql.visitors.DigestUdfVisitor
{
    /**
     * Wraps {@code value} so that it is rendered as a string.
     *
     * <p>A {@link StagedFilesFieldValue} is first cast to {@code dataType}; any
     * other value is used as-is. The operand is then passed to the conversion
     * UDF registered for the field's data type if there is one, and otherwise
     * cast to {@code VARCHAR}.
     *
     * @param value                  the column value to convert
     * @param dataType               the declared type of the column
     * @param typeConversionUdfNames conversion UDF names keyed by data type
     * @return a UDF call or a cast to VARCHAR wrapping the (possibly pre-cast) value
     */
    protected Value getColumnValueAsStringType(Value value, FieldType dataType, Map<DataType, String> typeConversionUdfNames)
    {
        DataType columnType = dataType.dataType();
        boolean hasConversionUdf = typeConversionUdfNames.containsKey(columnType);

        // Staged file fields need to be cast to their original type first: CAST(field AS original_type)
        Value operand;
        if (value instanceof StagedFilesFieldValue)
        {
            operand = CastFunction.builder().field(value).type(dataType).build();
        }
        else
        {
            operand = value;
        }

        if (hasConversionUdf)
        {
            // TO_STRING(operand)
            return org.finos.legend.engine.persistence.components.logicalplan.values.Udf.builder()
                .udfName(typeConversionUdfNames.get(columnType))
                .addParameters(operand)
                .build();
        }

        // CAST(operand AS VARCHAR)
        FieldType varcharType = FieldType.builder().dataType(DataType.VARCHAR).build();
        return CastFunction.builder().field(operand).type(varcharType).build();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright 2024 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.relational.duckdb.sqldom.schemaops.values;

import org.finos.legend.engine.persistence.components.relational.sqldom.SqlDomException;
import org.finos.legend.engine.persistence.components.relational.sqldom.schemaops.values.Value;

import java.util.ArrayList;
import java.util.List;

/**
 * SQL-DOM value that renders its operands joined with the {@code ||} operator.
 * Operands are collected through {@link #push(Object)}.
 */
public class ConcatFunction extends Value
{
    private List<Value> values;

    /**
     * Creates a concatenation with no operands.
     *
     * @param quoteIdentifier passed through to the {@code Value} superclass
     */
    public ConcatFunction(String quoteIdentifier)
    {
        super(quoteIdentifier);
        this.values = new ArrayList<>();
    }

    /**
     * Renders the concatenation and then delegates to {@code super.genSql}.
     *
     * @param builder target the SQL text is appended to
     * @throws SqlDomException if an operand or the superclass fails to render
     */
    @Override
    public void genSql(StringBuilder builder) throws SqlDomException
    {
        genSqlWithoutAlias(builder);
        super.genSql(builder);
    }

    /**
     * Appends each operand's alias-free SQL, separated by {@code ||}.
     * Nothing is appended when there are no operands.
     *
     * @param builder target the SQL text is appended to
     * @throws SqlDomException if an operand fails to render
     */
    @Override
    public void genSqlWithoutAlias(StringBuilder builder) throws SqlDomException
    {
        // Empty before the first operand, "||" before every later one.
        String separator = "";
        for (Value operand : values)
        {
            builder.append(separator);
            operand.genSqlWithoutAlias(builder);
            separator = "||";
        }
    }

    /**
     * Adds {@code node} as the next operand when it is a {@code Value};
     * any other node is ignored.
     *
     * @param node candidate operand
     */
    @Override
    public void push(Object node)
    {
        if (!(node instanceof Value))
        {
            return;
        }
        values.add((Value) node);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -509,13 +509,13 @@ protected void loadStagingDataWithVersionInUpperCase(String path)
duckDBSink.executeStatement(loadSql);
}

protected void loadStagingDataWithVersionWithoutDigest(String path) throws Exception
protected void loadStagingDataWithVersionWithoutDigest(String path)
{
validateFileExists(path);
String loadSql = "TRUNCATE TABLE \"TEST\".\"staging\";" +
"INSERT INTO \"TEST\".\"staging\"(id, name, income, start_time ,expiry_date, version) " +
"SELECT CONVERT( \"id\",INT ), \"name\", CONVERT( \"income\", BIGINT), CONVERT( \"start_time\", DATETIME), CONVERT( \"expiry_date\", DATE), CONVERT( \"version\",INT)" +
" FROM CSVREAD( '" + path + "', 'id, name, income, start_time, expiry_date, version', NULL )";
"COPY \"TEST\".\"staging\"" +
"(\"id\", \"name\", \"income\", \"start_time\", \"expiry_date\", \"version\")" +
" FROM '" + path + "' CSV";
duckDBSink.executeStatement(loadSql);
}

Expand Down
Loading

0 comments on commit 539c610

Please sign in to comment.