From 539c61046e1ca90b46c533a39cc3ad86b4b9636e Mon Sep 17 00:00:00 2001
From: Zhang Lizhi
Date: Sun, 1 Dec 2024 19:42:48 +0800
Subject: [PATCH] Persistence Component: Add support for digest generation for
 DuckDB (#3275)

---
 .../ingestmode/IngestModeCaseConverter.java   |   1 +
 .../values/ConcatFunctionAbstract.java        |  33 ++
 .../ansi/sql/visitors/DigestUdfVisitor.java   |   3 +-
 .../relational/jdbc/JdbcHelper.java           |   6 +-
 .../relational/duckdb/DuckDBDigestUtil.java   |  39 +++
 .../relational/duckdb/DuckDBSink.java         |   9 +
 .../duckdb/jdbc/DuckDBJdbcHelper.java         |   6 +-
 .../sql/visitor/CastFunctionVisitor.java      |  49 +++
 .../sql/visitor/ConcatFunctionVisitor.java    |  32 ++
 .../duckdb/sql/visitor/DigestUdfVisitor.java  |  64 ++++
 .../schemaops/values/ConcatFunction.java      |  61 ++++
 .../persistence/components/e2e/BaseTest.java  |   8 +-
 .../components/e2e/bulkload/BulkLoadTest.java | 249 +++++++++++++++
 .../e2e/nontemporal/AppendOnlyTest.java       | 292 ++++++++++++++++++
 .../bulk-load/expected/expected_table3.csv    |   3 +
 .../expected_table_json_with_digest_gen.csv   |   7 +
 .../data/bulk-load/input/staged_file3.csv     |   3 +
 .../data/bulk-load/input/staged_file3.json    |  20 ++
 .../digest_generation/expected_pass1.csv      |   3 +
 .../digest_generation/expected_pass2.csv      |   6 +
 .../expected_pass1.csv                        |   4 +
 .../expected_pass2.csv                        |   7 +
 .../input/digest_generation/data_pass1.csv    |   3 +
 .../input/digest_generation/data_pass2.csv    |   3 +
 .../data_pass1.csv                            |   4 +
 .../data_pass2.csv                            |   4 +
 .../relational/snowflake/SnowflakeSink.java   |   7 +-
 .../sql/visitor/ConcatFunctionVisitor.java    |  34 ++
 28 files changed, 947 insertions(+), 13 deletions(-)
 create mode 100644 legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/values/ConcatFunctionAbstract.java
 create mode 100644 legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/main/java/org/finos/legend/engine/persistence/components/relational/duckdb/DuckDBDigestUtil.java
 create mode 100644 legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/main/java/org/finos/legend/engine/persistence/components/relational/duckdb/sql/visitor/CastFunctionVisitor.java
 create mode 100644 legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/main/java/org/finos/legend/engine/persistence/components/relational/duckdb/sql/visitor/ConcatFunctionVisitor.java
 create mode 100644 legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/main/java/org/finos/legend/engine/persistence/components/relational/duckdb/sql/visitor/DigestUdfVisitor.java
 create mode 100644 legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/main/java/org/finos/legend/engine/persistence/components/relational/duckdb/sqldom/schemaops/values/ConcatFunction.java
 create mode 100644 legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/resources/data/bulk-load/expected/expected_table3.csv
 create mode 100644 legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/resources/data/bulk-load/expected/expected_table_json_with_digest_gen.csv
 create mode 100644 legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/resources/data/bulk-load/input/staged_file3.csv
 create mode 100644 legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/resources/data/bulk-load/input/staged_file3.json
 create mode 100644 legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/resources/data/incremental-append-milestoning/expected/digest_generation/expected_pass1.csv
 create mode 100644 legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/resources/data/incremental-append-milestoning/expected/digest_generation/expected_pass2.csv
 create mode 100644 legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/resources/data/incremental-append-milestoning/expected/digest_generation_with_fields_to_exclude/expected_pass1.csv
 create mode 100644 legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/resources/data/incremental-append-milestoning/expected/digest_generation_with_fields_to_exclude/expected_pass2.csv
 create mode 100644 legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/resources/data/incremental-append-milestoning/input/digest_generation/data_pass1.csv
 create mode 100644 legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/resources/data/incremental-append-milestoning/input/digest_generation/data_pass2.csv
 create mode 100644 legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/resources/data/incremental-append-milestoning/input/digest_generation_with_fields_to_exclude/data_pass1.csv
 create mode 100644 legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/resources/data/incremental-append-milestoning/input/digest_generation_with_fields_to_exclude/data_pass2.csv
 create mode 100644 legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/main/java/org/finos/legend/engine/persistence/components/relational/snowflake/sql/visitor/ConcatFunctionVisitor.java

diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/IngestModeCaseConverter.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/IngestModeCaseConverter.java
index 33773778235..f084905eebc 100644
--- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/IngestModeCaseConverter.java
+++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/IngestModeCaseConverter.java
@@ -267,6 +267,7 @@ public DigestGenStrategy visitUDFBasedDigestGenStrategy(UDFBasedDigestGenStrateg
         return UDFBasedDigestGenStrategy.builder()
             .digestUdfName(udfBasedDigestGenStrategy.digestUdfName())
             .putAllTypeConversionUdfNames(udfBasedDigestGenStrategy.typeConversionUdfNames())
+            .columnNameValueConcatUdfName(udfBasedDigestGenStrategy.columnNameValueConcatUdfName())
             .digestField(applyCase(udfBasedDigestGenStrategy.digestField()))
             .addAllFieldsToExcludeFromDigest(udfBasedDigestGenStrategy.fieldsToExcludeFromDigest().stream().map(name -> applyCase(name)).collect(Collectors.toSet()))
             .build();
diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/values/ConcatFunctionAbstract.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/values/ConcatFunctionAbstract.java
new file mode 100644
index 00000000000..e8271230b4c
--- /dev/null
+++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/values/ConcatFunctionAbstract.java
@@ -0,0 +1,33 @@
+// Copyright 2024 Goldman Sachs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.finos.legend.engine.persistence.components.logicalplan.values;
+
+import org.immutables.value.Value.Immutable;
+import org.immutables.value.Value.Style;
+
+import java.util.List;
+
+@Immutable
+@Style(
+    typeAbstract = "*Abstract",
+    typeImmutable = "*",
+    jdkOnly = true,
+    optionalAcceptNullable = true,
+    strictBuilder = true
+)
+public interface ConcatFunctionAbstract extends Value
+{
+    List<Value> values();
+}
diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/main/java/org/finos/legend/engine/persistence/components/relational/ansi/sql/visitors/DigestUdfVisitor.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/main/java/org/finos/legend/engine/persistence/components/relational/ansi/sql/visitors/DigestUdfVisitor.java
index b6b6a2f731c..b2d365ea53d 100644
--- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/main/java/org/finos/legend/engine/persistence/components/relational/ansi/sql/visitors/DigestUdfVisitor.java
+++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/main/java/org/finos/legend/engine/persistence/components/relational/ansi/sql/visitors/DigestUdfVisitor.java
@@ -17,6 +17,7 @@
 import org.finos.legend.engine.persistence.components.logicalplan.datasets.DataType;
 import org.finos.legend.engine.persistence.components.logicalplan.datasets.FieldType;
 import org.finos.legend.engine.persistence.components.logicalplan.values.*;
+import org.finos.legend.engine.persistence.components.logicalplan.values.ConcatFunction;
 import org.finos.legend.engine.persistence.components.physicalplan.PhysicalPlanNode;
 import org.finos.legend.engine.persistence.components.relational.sqldom.schemaops.values.Udf;
 import org.finos.legend.engine.persistence.components.transformer.LogicalPlanVisitor;
@@ -57,7 +58,7 @@ public VisitorResult visit(PhysicalPlanNode prev, DigestUdf current, VisitorCont
 
     protected Value mergeColumnsFunction(List<Value> columns)
     {
-        FunctionImpl concatFunction = FunctionImpl.builder().functionName(FunctionName.CONCAT).addAllValue(columns).build();
+        ConcatFunction concatFunction = ConcatFunction.builder().addAllValues(columns).build();
         return concatFunction;
     }
diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/jdbc/JdbcHelper.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/jdbc/JdbcHelper.java
index a9a597fef34..f749cd8003b 100644
--- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/jdbc/JdbcHelper.java
+++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/jdbc/JdbcHelper.java
@@ -163,8 +163,10 @@ public boolean doesTableExist(Dataset dataset)
             String name = dataset.datasetReference().name().orElseThrow(IllegalStateException::new);
             String database = dataset.datasetReference().database().orElse(null);
             String schema = dataset.datasetReference().group().orElse(null);
-            ResultSet result = this.connection.getMetaData().getTables(database, schema, name, new String[] {Clause.TABLE.get()});
-            return result.isBeforeFirst(); // This method returns true if ResultSet is not empty
+            try (ResultSet result = this.connection.getMetaData().getTables(database, schema, name, new String[] {Clause.TABLE.get()}))
+            {
+                return result.isBeforeFirst(); // This method returns true if ResultSet is not empty
+            }
         }
         catch (SQLException e)
         {
diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/main/java/org/finos/legend/engine/persistence/components/relational/duckdb/DuckDBDigestUtil.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/main/java/org/finos/legend/engine/persistence/components/relational/duckdb/DuckDBDigestUtil.java
new file mode 100644
index 00000000000..0d60fc1d862
--- /dev/null
+++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/main/java/org/finos/legend/engine/persistence/components/relational/duckdb/DuckDBDigestUtil.java
@@ -0,0 +1,39 @@
+// Copyright 2024 Goldman Sachs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.finos.legend.engine.persistence.components.relational.duckdb;
+
+import org.finos.legend.engine.persistence.components.relational.jdbc.JdbcHelper;
+
+public class DuckDBDigestUtil
+{
+    private static final String CUSTOM_MD5_UDF = "MD5(SUBSTRING(HEX(IFNULL(NULLIF(A, FROM_HEX('')), FROM_HEX('DADB2C00'))), 9))";
+    private static final String CUSTOM_COLUMN_UDF = "IFNULL((FROM_HEX('DADB2C00') || ENCODE(COLUMN_NAME) || FROM_HEX('DADB2C00') || ENCODE(COLUMN_VALUE)), FROM_HEX(''))";
+    private static final String MD5_UDF = "MD5(A)";
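+    // The column UDF frames each column as sentinel || name || sentinel || value
+    // (sentinel = 0xDADB2C00), collapsing a NULL value to an empty blob so that it
+    // drops out of the concatenation; the MD5 UDF swaps an all-empty input for a
+    // lone sentinel, strips the leading 8 hex characters (one sentinel) and hashes
+    // the rest, so a row of all NULLs digests the empty string.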
+
+    public static void registerMD5Udf(JdbcHelper sink, String udfName)
+    {
+        sink.executeStatement("CREATE OR REPLACE FUNCTION " + udfName + "(A) AS " + CUSTOM_MD5_UDF);
+    }
+
+    public static void registerColumnUdf(JdbcHelper sink, String udfName)
+    {
+        sink.executeStatement("CREATE OR REPLACE FUNCTION " + udfName + "(COLUMN_NAME, COLUMN_VALUE) AS " + CUSTOM_COLUMN_UDF);
+    }
+
+    public static void registerBasicMD5Udf(JdbcHelper sink, String udfName)
+    {
+        sink.executeStatement("CREATE OR REPLACE FUNCTION " + udfName + "(A) AS " + MD5_UDF);
+    }
+}
diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/main/java/org/finos/legend/engine/persistence/components/relational/duckdb/DuckDBSink.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/main/java/org/finos/legend/engine/persistence/components/relational/duckdb/DuckDBSink.java
index 5a9d58c717b..32eb9767c79 100644
--- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/main/java/org/finos/legend/engine/persistence/components/relational/duckdb/DuckDBSink.java
+++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/main/java/org/finos/legend/engine/persistence/components/relational/duckdb/DuckDBSink.java
@@ -36,6 +36,9 @@
 import org.finos.legend.engine.persistence.components.logicalplan.datasets.StagedFilesSelection;
 import org.finos.legend.engine.persistence.components.logicalplan.operations.Copy;
 import org.finos.legend.engine.persistence.components.logicalplan.operations.Update;
+import org.finos.legend.engine.persistence.components.logicalplan.values.CastFunction;
+import org.finos.legend.engine.persistence.components.logicalplan.values.ConcatFunction;
+import org.finos.legend.engine.persistence.components.logicalplan.values.DigestUdf;
 import org.finos.legend.engine.persistence.components.logicalplan.values.ParseJsonFunction;
 import org.finos.legend.engine.persistence.components.logicalplan.values.StagedFilesFieldValue;
 import org.finos.legend.engine.persistence.components.relational.RelationalSink;
@@ -47,7 +50,10 @@
 import org.finos.legend.engine.persistence.components.relational.duckdb.jdbc.DuckDBJdbcHelper;
 import org.finos.legend.engine.persistence.components.relational.duckdb.sql.DuckDBDataTypeMapping;
 import org.finos.legend.engine.persistence.components.relational.duckdb.sql.DuckDBJdbcPropertiesToLogicalDataTypeMapping;
+import org.finos.legend.engine.persistence.components.relational.duckdb.sql.visitor.CastFunctionVisitor;
+import org.finos.legend.engine.persistence.components.relational.duckdb.sql.visitor.ConcatFunctionVisitor;
 import org.finos.legend.engine.persistence.components.relational.duckdb.sql.visitor.CopyVisitor;
+import org.finos.legend.engine.persistence.components.relational.duckdb.sql.visitor.DigestUdfVisitor;
 import org.finos.legend.engine.persistence.components.relational.duckdb.sql.visitor.ParseJsonFunctionVisitor;
 import org.finos.legend.engine.persistence.components.relational.duckdb.sql.visitor.SQLUpdateVisitor;
 import org.finos.legend.engine.persistence.components.relational.duckdb.sql.visitor.StagedFilesDatasetReferenceVisitor;
@@ -113,6 +119,9 @@ public class DuckDBSink extends AnsiSqlSink
         logicalPlanVisitorByClass.put(StagedFilesDataset.class, new StagedFilesDatasetVisitor());
         logicalPlanVisitorByClass.put(StagedFilesFieldValue.class, new StagedFilesFieldValueVisitor());
         logicalPlanVisitorByClass.put(StagedFilesSelection.class, new StagedFilesSelectionVisitor());
+        logicalPlanVisitorByClass.put(ConcatFunction.class, new ConcatFunctionVisitor());
+        logicalPlanVisitorByClass.put(CastFunction.class, new CastFunctionVisitor());
+        logicalPlanVisitorByClass.put(DigestUdf.class, new DigestUdfVisitor());
         LOGICAL_PLAN_VISITOR_BY_CLASS = Collections.unmodifiableMap(logicalPlanVisitorByClass);
 
         // TODO: These two mappings have not been confirmed, to do with schema evolution
diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/main/java/org/finos/legend/engine/persistence/components/relational/duckdb/jdbc/DuckDBJdbcHelper.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/main/java/org/finos/legend/engine/persistence/components/relational/duckdb/jdbc/DuckDBJdbcHelper.java
index 43541bedf3b..cdfd96782f4 100644
--- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/main/java/org/finos/legend/engine/persistence/components/relational/duckdb/jdbc/DuckDBJdbcHelper.java
+++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/main/java/org/finos/legend/engine/persistence/components/relational/duckdb/jdbc/DuckDBJdbcHelper.java
@@ -58,8 +58,10 @@ public boolean doesTableExist(Dataset dataset)
             String name = dataset.datasetReference().name().orElseThrow(IllegalStateException::new);
             String database = dataset.datasetReference().database().orElse(null);
             String schema = dataset.datasetReference().group().orElse(null);
-            ResultSet result = this.connection.getMetaData().getTables(database, schema, name, null);
-            return result.next(); // This method returns true if ResultSet is not empty
+            try (ResultSet result = this.connection.getMetaData().getTables(database, schema, name, null))
+            {
+                return result.next(); // This method returns true if ResultSet is not empty
+            }
         }
         catch (SQLException e)
         {
diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/main/java/org/finos/legend/engine/persistence/components/relational/duckdb/sql/visitor/CastFunctionVisitor.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/main/java/org/finos/legend/engine/persistence/components/relational/duckdb/sql/visitor/CastFunctionVisitor.java
new file mode 100644
index 00000000000..3f873c52fc7
--- /dev/null
+++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/main/java/org/finos/legend/engine/persistence/components/relational/duckdb/sql/visitor/CastFunctionVisitor.java
@@ -0,0 +1,49 @@
+// Copyright 2024 Goldman Sachs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.finos.legend.engine.persistence.components.relational.duckdb.sql.visitor;
+
+import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlanNode;
+import org.finos.legend.engine.persistence.components.logicalplan.values.CastFunction;
+import org.finos.legend.engine.persistence.components.optimizer.Optimizer;
+import org.finos.legend.engine.persistence.components.physicalplan.PhysicalPlanNode;
+import org.finos.legend.engine.persistence.components.relational.duckdb.sql.DuckDBDataTypeMapping;
+import org.finos.legend.engine.persistence.components.relational.sqldom.schema.DataType;
+import org.finos.legend.engine.persistence.components.transformer.LogicalPlanVisitor;
+import org.finos.legend.engine.persistence.components.transformer.VisitorContext;
+
+import java.util.ArrayList;
+import java.util.List;
+
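+// Maps the logical CastFunction onto the physical CAST(field AS <type>) node,
+// resolving the target type through the DuckDB data-type mapping; the child
+// field is handed back for further visitation.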
+public class CastFunctionVisitor implements LogicalPlanVisitor<CastFunction>
+{
+    @Override
+    public VisitorResult visit(PhysicalPlanNode prev, CastFunction current, VisitorContext context)
+    {
+        DataType dataType = new DuckDBDataTypeMapping().getDataType(current.type());
+
+        org.finos.legend.engine.persistence.components.relational.sqldom.schemaops.values.CastFunction castFunction =
+            new org.finos.legend.engine.persistence.components.relational.sqldom.schemaops.values.CastFunction(dataType, context.quoteIdentifier());
+        for (Optimizer optimizer : context.optimizers())
+        {
+            castFunction = (org.finos.legend.engine.persistence.components.relational.sqldom.schemaops.values.CastFunction) optimizer.optimize(castFunction);
+        }
+        prev.push(castFunction);
+
+        List<LogicalPlanNode> logicalPlanNodeList = new ArrayList<>();
+        logicalPlanNodeList.add(current.field());
+
+        return new VisitorResult(castFunction, logicalPlanNodeList);
+    }
+}
diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/main/java/org/finos/legend/engine/persistence/components/relational/duckdb/sql/visitor/ConcatFunctionVisitor.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/main/java/org/finos/legend/engine/persistence/components/relational/duckdb/sql/visitor/ConcatFunctionVisitor.java
new file mode 100644
index 00000000000..706bd9fe809
--- /dev/null
+++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/main/java/org/finos/legend/engine/persistence/components/relational/duckdb/sql/visitor/ConcatFunctionVisitor.java
@@ -0,0 +1,32 @@
+// Copyright 2024 Goldman Sachs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
"License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.finos.legend.engine.persistence.components.relational.duckdb.sql.visitor; + +import org.finos.legend.engine.persistence.components.logicalplan.values.ConcatFunction; +import org.finos.legend.engine.persistence.components.physicalplan.PhysicalPlanNode; +import org.finos.legend.engine.persistence.components.transformer.LogicalPlanVisitor; +import org.finos.legend.engine.persistence.components.transformer.VisitorContext; + +public class ConcatFunctionVisitor implements LogicalPlanVisitor +{ + @Override + public VisitorResult visit(PhysicalPlanNode prev, ConcatFunction current, VisitorContext context) + { + org.finos.legend.engine.persistence.components.relational.duckdb.sqldom.schemaops.values.ConcatFunction concatFunction = new org.finos.legend.engine.persistence.components.relational.duckdb.sqldom.schemaops.values.ConcatFunction(context.quoteIdentifier()); + prev.push(concatFunction); + + return new VisitorResult(concatFunction, current.values()); + } +} diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/main/java/org/finos/legend/engine/persistence/components/relational/duckdb/sql/visitor/DigestUdfVisitor.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/main/java/org/finos/legend/engine/persistence/components/relational/duckdb/sql/visitor/DigestUdfVisitor.java new file mode 100644 index 00000000000..a3aa82853ca --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/main/java/org/finos/legend/engine/persistence/components/relational/duckdb/sql/visitor/DigestUdfVisitor.java @@ -0,0 +1,64 @@ +// Copyright 2024 Goldman Sachs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+public class DigestUdfVisitor extends org.finos.legend.engine.persistence.components.relational.ansi.sql.visitors.DigestUdfVisitor
+{
+    protected Value getColumnValueAsStringType(Value value, FieldType dataType, Map<DataType, String> typeConversionUdfNames)
+    {
+        if (value instanceof StagedFilesFieldValue)
+        {
+            if (typeConversionUdfNames.containsKey(dataType.dataType()))
+            {
+                // TO_STRING(CAST(field AS original_type))
+                return org.finos.legend.engine.persistence.components.logicalplan.values.Udf.builder()
+                    .udfName(typeConversionUdfNames.get(dataType.dataType()))
+                    .addParameters(CastFunction.builder().field(value).type(dataType).build()).build();
+            }
+            else
+            {
+                // CAST(CAST(field AS original_type) AS VARCHAR)
+                return CastFunction.builder()
+                    .field(CastFunction.builder().field(value).type(dataType).build())
+                    .type(FieldType.builder().dataType(DataType.VARCHAR).build()).build();
+            }
+        }
+        else
+        {
+            if (typeConversionUdfNames.containsKey(dataType.dataType()))
+            {
+                // TO_STRING(field)
+                return org.finos.legend.engine.persistence.components.logicalplan.values.Udf.builder()
+                    .udfName(typeConversionUdfNames.get(dataType.dataType()))
+                    .addParameters(value).build();
+            }
+            else
+            {
+                // CAST(field AS VARCHAR)
+                return CastFunction.builder()
+                    .field(value)
+                    .type(FieldType.builder().dataType(DataType.VARCHAR).build()).build();
+            }
+        }
+    }
+}
diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/main/java/org/finos/legend/engine/persistence/components/relational/duckdb/sqldom/schemaops/values/ConcatFunction.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/main/java/org/finos/legend/engine/persistence/components/relational/duckdb/sqldom/schemaops/values/ConcatFunction.java
new file mode 100644
index 00000000000..02a1a31f1e8
--- /dev/null
+++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/main/java/org/finos/legend/engine/persistence/components/relational/duckdb/sqldom/schemaops/values/ConcatFunction.java
@@ -0,0 +1,61 @@
+// Copyright 2024 Goldman Sachs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.finos.legend.engine.persistence.components.relational.duckdb.sqldom.schemaops.values;
+
+import org.finos.legend.engine.persistence.components.relational.sqldom.SqlDomException;
+import org.finos.legend.engine.persistence.components.relational.sqldom.schemaops.values.Value;
+
+import java.util.ArrayList;
+import java.util.List;
+
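+// Renders the pushed child values joined with the || operator (v1||v2||...) rather
+// than a CONCAT(...) call, preserving operand types (e.g. BLOB) and NULL propagation.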
+public class ConcatFunction extends Value
+{
+    private List<Value> values;
+
+    public ConcatFunction(String quoteIdentifier)
+    {
+        super(quoteIdentifier);
+        this.values = new ArrayList<>();
+    }
+
+    @Override
+    public void genSql(StringBuilder builder) throws SqlDomException
+    {
+        genSqlWithoutAlias(builder);
+        super.genSql(builder);
+    }
+
+    @Override
+    public void genSqlWithoutAlias(StringBuilder builder) throws SqlDomException
+    {
+        for (int ctr = 0; ctr < values.size(); ctr++)
+        {
+            values.get(ctr).genSqlWithoutAlias(builder);
+            if (ctr < (values.size() - 1))
+            {
+                builder.append("||");
+            }
+        }
+    }
+
+    @Override
+    public void push(Object node)
+    {
+        if (node instanceof Value)
+        {
+            values.add((Value) node);
+        }
+    }
+}
diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/java/org/finos/legend/engine/persistence/components/e2e/BaseTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/java/org/finos/legend/engine/persistence/components/e2e/BaseTest.java
index 203f795576e..c3acd51cbed 100644
--- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/java/org/finos/legend/engine/persistence/components/e2e/BaseTest.java
+++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/java/org/finos/legend/engine/persistence/components/e2e/BaseTest.java
@@ -509,13 +509,13 @@ protected void loadStagingDataWithVersionInUpperCase(String path)
         duckDBSink.executeStatement(loadSql);
     }
 
-    protected void loadStagingDataWithVersionWithoutDigest(String path) throws Exception
+    protected void loadStagingDataWithVersionWithoutDigest(String path)
     {
         validateFileExists(path);
         String loadSql = "TRUNCATE TABLE \"TEST\".\"staging\";" +
-            "INSERT INTO \"TEST\".\"staging\"(id, name, income, start_time ,expiry_date, version) " +
-            "SELECT CONVERT( \"id\",INT ), \"name\", CONVERT( \"income\", BIGINT), CONVERT( \"start_time\", DATETIME), CONVERT( \"expiry_date\", DATE), CONVERT( \"version\",INT)" +
-            " FROM CSVREAD( '" + path + "', 'id, name, income, start_time, expiry_date, version', NULL )";
+            "COPY \"TEST\".\"staging\"" +
+            "(\"id\", \"name\", \"income\", \"start_time\", \"expiry_date\", \"version\")" +
+            " FROM '" + path + "' CSV";
         duckDBSink.executeStatement(loadSql);
     }
 
diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/java/org/finos/legend/engine/persistence/components/e2e/bulkload/BulkLoadTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/java/org/finos/legend/engine/persistence/components/e2e/bulkload/BulkLoadTest.java
index 7087b9b588a..6ebf56ba8cd 100644
--- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/java/org/finos/legend/engine/persistence/components/e2e/bulkload/BulkLoadTest.java
+++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/java/org/finos/legend/engine/persistence/components/e2e/bulkload/BulkLoadTest.java
@@ -25,6 +25,7 @@
 import org.finos.legend.engine.persistence.components.ingestmode.audit.DateTimeAuditing;
 import org.finos.legend.engine.persistence.components.ingestmode.audit.NoAuditing;
 import org.finos.legend.engine.persistence.components.ingestmode.digest.NoDigestGenStrategy;
+import org.finos.legend.engine.persistence.components.ingestmode.digest.UDFBasedDigestGenStrategy;
 import org.finos.legend.engine.persistence.components.logicalplan.datasets.DataType;
 import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset;
 import org.finos.legend.engine.persistence.components.logicalplan.datasets.DatasetDefinition;
@@ -37,6 +38,7 @@
 import org.finos.legend.engine.persistence.components.relational.api.GeneratorResult;
 import org.finos.legend.engine.persistence.components.relational.api.RelationalGenerator;
 import org.finos.legend.engine.persistence.components.relational.api.RelationalIngestor;
+import org.finos.legend.engine.persistence.components.relational.duckdb.DuckDBDigestUtil;
 import org.finos.legend.engine.persistence.components.relational.duckdb.DuckDBSink;
 import org.finos.legend.engine.persistence.components.relational.duckdb.logicalplan.datasets.DuckDBStagedFilesDatasetProperties;
 import org.junit.jupiter.api.Assertions;
@@ -74,6 +76,7 @@ public class BulkLoadTest extends BaseTest
     private static final String APPEND_TIME = "append_time";
     private static final String DIGEST = "digest";
     private static final String DIGEST_UDF = "LAKEHOUSE_MD5";
+    private static final String COLUMN_CONCAT_UDF = "COLUMN_CONCAT_UDF";
     private static final String BATCH_ID = "batch_id";
     private static final String EVENT_ID_1 = "xyz123";
     private static final String EVENT_ID_2 = "abc987";
@@ -256,6 +259,170 @@ public void testBulkLoadWithDigestNotGeneratedAuditDisabled() throws Exception
         verifyBulkLoadMetadata(appendMetadata, Collections.singletonList(filePath), 1, Optional.of(EVENT_ID_1), Optional.of(ADDITIONAL_METADATA));
     }
 
+    @Test
+    public void testBulkLoadWithDigestGenerated() throws Exception
+    {
+        // Register UDF
+        DuckDBDigestUtil.registerMD5Udf(duckDBSink, DIGEST_UDF);
+        DuckDBDigestUtil.registerColumnUdf(duckDBSink, COLUMN_CONCAT_UDF);
+
+        // Register type conversion functions
+        Map<DataType, String> typeConversionUdfs = new HashMap<>();
+        typeConversionUdfs.put(DataType.INT, "intToString");
+        typeConversionUdfs.put(DataType.DATETIME, "datetimeToString");
+        duckDBSink.executeStatement("CREATE FUNCTION intToString(A) AS CAST(A AS VARCHAR)");
+        duckDBSink.executeStatement("CREATE FUNCTION datetimeToString(A) AS CAST(A AS VARCHAR)");
+
+        String filePath = "src/test/resources/data/bulk-load/input/staged_file3.csv";
+
+        BulkLoad bulkLoad = BulkLoad.builder()
+            .digestGenStrategy(UDFBasedDigestGenStrategy.builder()
+                .digestUdfName(DIGEST_UDF)
+                .columnNameValueConcatUdfName(COLUMN_CONCAT_UDF)
+                .putAllTypeConversionUdfNames(typeConversionUdfs)
+                .digestField(DIGEST).build())
+            .auditing(DateTimeAuditing.builder().dateTimeField(APPEND_TIME).build())
+            .batchIdField(BATCH_ID)
+            .build();
+
+        Dataset stagedFilesDataset = StagedFilesDataset.builder()
+            .stagedFilesDatasetProperties(
+                DuckDBStagedFilesDatasetProperties.builder()
+                    .fileFormat(FileFormatType.CSV)
+                    .addAllFilePaths(Collections.singletonList(filePath)).build())
+            .schema(SchemaDefinition.builder().addAllFields(Arrays.asList(col1, col2, col3, col4)).build())
+            .build();
+
+        Dataset mainDataset = DatasetDefinition.builder()
+            .database(testDatabaseName).group(testSchemaName).name(mainTableName).alias("my_alias")
+            .schema(SchemaDefinition.builder().build())
+            .build();
+
+        Datasets datasets = Datasets.of(mainDataset, stagedFilesDataset);
+
+        // Verify SQLs using generator
+        RelationalGenerator generator = RelationalGenerator.builder()
+            .ingestMode(bulkLoad)
+            .relationalSink(DuckDBSink.get())
+            .collectStatistics(true)
+            .ingestRequestId(EVENT_ID_1)
+            .executionTimestampClock(fixedClock_2000_01_01)
+            .build();
+
+        GeneratorResult operations = generator.generateOperations(datasets);
+
+        List<String> preActionsSql = operations.preActionsSql();
+        List<String> ingestSql = operations.ingestSql();
+        Map<StatisticName, String> statsSql = operations.postIngestStatisticsSql();
+
+        String expectedCreateTableSql = "CREATE TABLE IF NOT EXISTS \"TEST_DB\".\"TEST\".\"main\"" +
+            "(\"col_int\" INTEGER,\"col_string\" VARCHAR,\"col_decimal\" DECIMAL(5,2),\"col_datetime\" TIMESTAMP,\"digest\" VARCHAR,\"batch_id\" INTEGER,\"append_time\" TIMESTAMP)";
+
+        String expectedIngestSql = "INSERT INTO \"TEST_DB\".\"TEST\".\"main\" " +
+            "(\"col_int\", \"col_string\", \"col_decimal\", \"col_datetime\", \"digest\", \"batch_id\", \"append_time\") " +
+            "SELECT \"col_int\",\"col_string\",\"col_decimal\",\"col_datetime\"," +
+            "LAKEHOUSE_MD5(COLUMN_CONCAT_UDF('col_datetime',datetimeToString(CAST(\"col_datetime\" AS TIMESTAMP)))||COLUMN_CONCAT_UDF('col_decimal',CAST(CAST(\"col_decimal\" AS DECIMAL(5,2)) AS VARCHAR))||COLUMN_CONCAT_UDF('col_int',intToString(CAST(\"col_int\" AS INTEGER)))||COLUMN_CONCAT_UDF('col_string',CAST(CAST(\"col_string\" AS VARCHAR) AS VARCHAR)))," +
+            "(SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MAIN'),'2000-01-01 00:00:00.000000' " +
+            "FROM READ_CSV(['src/test/resources/data/bulk-load/input/staged_file3.csv'], COLUMNS = {'col_int':'INTEGER', 'col_string':'VARCHAR', 'col_decimal':'DECIMAL(5,2)', 'col_datetime':'TIMESTAMP'}, AUTO_DETECT = FALSE)";
+
+        Assertions.assertEquals(expectedCreateTableSql, preActionsSql.get(0));
+        Assertions.assertEquals(expectedIngestSql, ingestSql.get(0));
+        Assertions.assertEquals("SELECT COUNT(*) as \"rowsInserted\" FROM \"TEST_DB\".\"TEST\".\"main\" as my_alias WHERE my_alias.\"batch_id\" = (SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MAIN')", statsSql.get(ROWS_INSERTED));
+
+
+        // Verify execution using ingestor
+        PlannerOptions options = PlannerOptions.builder().collectStatistics(true).build();
+        String[] schema = new String[]{COL_INT, COL_STRING, COL_DECIMAL, COL_DATETIME, DIGEST, BATCH_ID, APPEND_TIME};
+
+        Map<String, Object> expectedStats = new HashMap<>();
+        expectedStats.put(StatisticName.ROWS_INSERTED.name(), 3);
+
+        String expectedDataPath = "src/test/resources/data/bulk-load/expected/expected_table3.csv";
+
+        RelationalIngestor ingestor = getRelationalIngestor(bulkLoad, options, fixedClock_2000_01_01, CaseConversion.NONE, Optional.of(EVENT_ID_1), ADDITIONAL_METADATA);
+        executePlansAndVerifyResults(ingestor, datasets, schema, expectedDataPath, expectedStats, false, "");
+        Map<String, Object> appendMetadata = duckDBSink.executeQuery("select * from batch_metadata").get(0);
+        verifyBulkLoadMetadata(appendMetadata, Collections.singletonList(filePath), 1, Optional.of(EVENT_ID_1), Optional.of(ADDITIONAL_METADATA));
+    }
+
+    @Test
+    public void testBulkLoadJsonWithDigestGenerated() throws Exception
+    {
+        // Register UDF
+        DuckDBDigestUtil.registerMD5Udf(duckDBSink, DIGEST_UDF);
+        DuckDBDigestUtil.registerColumnUdf(duckDBSink, COLUMN_CONCAT_UDF);
+
+        String filePath = "src/test/resources/data/bulk-load/input/staged_file3.json";
+
+        BulkLoad bulkLoad = BulkLoad.builder()
+            .digestGenStrategy(UDFBasedDigestGenStrategy.builder()
+                .digestUdfName(DIGEST_UDF)
+                .columnNameValueConcatUdfName(COLUMN_CONCAT_UDF)
+                .digestField(DIGEST).build())
+            .auditing(DateTimeAuditing.builder().dateTimeField(APPEND_TIME).build())
+            .batchIdField(BATCH_ID)
+            .build();
+
+        Dataset stagedFilesDataset = StagedFilesDataset.builder()
+            .stagedFilesDatasetProperties(
+                DuckDBStagedFilesDatasetProperties.builder()
+                    .fileFormat(FileFormatType.JSON)
+                    .addAllFilePaths(Collections.singletonList(filePath)).build())
+            .schema(SchemaDefinition.builder().addAllFields(Arrays.asList(col1, col2, col3, col4)).build())
+            .build();
+
+        Dataset mainDataset = DatasetDefinition.builder()
+            .database(testDatabaseName).group(testSchemaName).name(mainTableName).alias("my_alias")
+            .schema(SchemaDefinition.builder().build())
+            .build();
+
+        Datasets datasets = Datasets.of(mainDataset, stagedFilesDataset);
+
+        // Verify SQLs using generator
+        RelationalGenerator generator = RelationalGenerator.builder()
+            .ingestMode(bulkLoad)
+            .relationalSink(DuckDBSink.get())
+            .collectStatistics(true)
+            .ingestRequestId(EVENT_ID_1)
+            .executionTimestampClock(fixedClock_2000_01_01)
+            .build();
+
+        GeneratorResult operations = generator.generateOperations(datasets);
+
+        List<String> preActionsSql = operations.preActionsSql();
+        List<String> ingestSql = operations.ingestSql();
+        Map<StatisticName, String> statsSql = operations.postIngestStatisticsSql();
+
+        String expectedCreateTableSql = "CREATE TABLE IF NOT EXISTS \"TEST_DB\".\"TEST\".\"main\"" +
+            "(\"col_int\" INTEGER,\"col_string\" VARCHAR,\"col_decimal\" DECIMAL(5,2),\"col_datetime\" TIMESTAMP,\"digest\" VARCHAR,\"batch_id\" INTEGER,\"append_time\" TIMESTAMP)";
+
+        String expectedIngestSql = "INSERT INTO \"TEST_DB\".\"TEST\".\"main\" " +
+            "(\"col_int\", \"col_string\", \"col_decimal\", \"col_datetime\", \"digest\", \"batch_id\", \"append_time\") " +
+            "SELECT \"col_int\",\"col_string\",\"col_decimal\",\"col_datetime\"," +
+            "LAKEHOUSE_MD5(COLUMN_CONCAT_UDF('col_datetime',CAST(CAST(\"col_datetime\" AS TIMESTAMP) AS VARCHAR))||COLUMN_CONCAT_UDF('col_decimal',CAST(CAST(\"col_decimal\" AS DECIMAL(5,2)) AS VARCHAR))||COLUMN_CONCAT_UDF('col_int',CAST(CAST(\"col_int\" AS INTEGER) AS VARCHAR))||COLUMN_CONCAT_UDF('col_string',CAST(CAST(\"col_string\" AS VARCHAR) AS VARCHAR)))," +
+            "(SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MAIN'),'2000-01-01 00:00:00.000000' " +
+            "FROM READ_JSON(['src/test/resources/data/bulk-load/input/staged_file3.json'])";
+
+        Assertions.assertEquals(expectedCreateTableSql, preActionsSql.get(0));
+        Assertions.assertEquals(expectedIngestSql, ingestSql.get(0));
+        Assertions.assertEquals("SELECT COUNT(*) as \"rowsInserted\" FROM \"TEST_DB\".\"TEST\".\"main\" as my_alias WHERE my_alias.\"batch_id\" = (SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MAIN')", statsSql.get(ROWS_INSERTED));
+
+
+        // Verify execution using ingestor
+        PlannerOptions options = PlannerOptions.builder().collectStatistics(true).build();
+        String[] schema = new String[]{COL_INT, COL_STRING, COL_DECIMAL, COL_DATETIME, DIGEST, BATCH_ID, APPEND_TIME};
+
+        Map<String, Object> expectedStats = new HashMap<>();
+        expectedStats.put(StatisticName.ROWS_INSERTED.name(), 3);
+
+        String expectedDataPath = "src/test/resources/data/bulk-load/expected/expected_table3.csv";
+
+        RelationalIngestor ingestor = getRelationalIngestor(bulkLoad, options, fixedClock_2000_01_01, CaseConversion.NONE, Optional.of(EVENT_ID_1), ADDITIONAL_METADATA);
+        executePlansAndVerifyResults(ingestor, datasets, schema, expectedDataPath, expectedStats, false, "");
+        Map<String, Object> appendMetadata = duckDBSink.executeQuery("select * from batch_metadata").get(0);
+        verifyBulkLoadMetadata(appendMetadata, Collections.singletonList(filePath), 1, Optional.of(EVENT_ID_1), Optional.of(ADDITIONAL_METADATA));
+    }
+
     @Test
     public void testBulkLoadJsonUpperCase() throws Exception
     {
@@ -330,6 +497,88 @@ public void testBulkLoadJsonUpperCase() throws Exception
         verifyBulkLoadMetadataForUpperCase(appendMetadata, Arrays.asList(filePath1, filePath2), 1, Optional.of(EVENT_ID_1), Optional.empty());
     }
 
+    @Test
+    public void testBulkLoadJsonUpperCaseWithDigestGenerated() throws Exception
+    {
+        // Register UDF
+        DuckDBDigestUtil.registerMD5Udf(duckDBSink, DIGEST_UDF);
+        DuckDBDigestUtil.registerColumnUdf(duckDBSink, COLUMN_CONCAT_UDF);
+
+        String filePath1 = "src/test/resources/data/bulk-load/input/staged_file1.json";
+        String filePath2 = "src/test/resources/data/bulk-load/input/staged_file2.json";
+
+        BulkLoad bulkLoad = BulkLoad.builder()
+            .batchIdField(BATCH_ID)
+            .digestGenStrategy(UDFBasedDigestGenStrategy.builder()
+                .digestUdfName(DIGEST_UDF)
+                .columnNameValueConcatUdfName(COLUMN_CONCAT_UDF)
+                .addAllFieldsToExcludeFromDigest(Collections.singletonList(startTimeName))
+                .digestField(DIGEST).build())
+            .auditing(DateTimeAuditing.builder().dateTimeField(APPEND_TIME).build())
+            .build();
+
+        Dataset stagedFilesDataset = StagedFilesDataset.builder()
+            .stagedFilesDatasetProperties(
+                DuckDBStagedFilesDatasetProperties.builder()
+                    .fileFormat(FileFormatType.JSON)
+                    .addAllFilePaths(Arrays.asList(filePath1, filePath2)).build())
+            .schema(SchemaDefinition.builder().addAllFields(Arrays.asList(idNonPk, name, income, startTimeNonPk, expiryDate)).build())
+            .build();
+
+        Dataset mainDataset = DatasetDefinition.builder()
+            .database(testDatabaseName).group(testSchemaName).name(mainTableName).alias("my_alias")
+            .schema(SchemaDefinition.builder().build())
+            .build();
+
+        Datasets datasets = Datasets.of(mainDataset, stagedFilesDataset);
+
+        // Verify SQLs using generator
+        RelationalGenerator generator = RelationalGenerator.builder()
+            .ingestMode(bulkLoad)
+            .relationalSink(DuckDBSink.get())
+            .collectStatistics(true)
+            .executionTimestampClock(fixedClock_2000_01_01)
+            .batchIdPattern("{NEXT_BATCH_ID_PATTERN}")
+            .caseConversion(CaseConversion.TO_UPPER)
+            .build();
+
+        GeneratorResult operations = generator.generateOperations(datasets);
+
+        List<String> preActionsSql = operations.preActionsSql();
+        List<String> ingestSql = operations.ingestSql();
+        Map<StatisticName, String> statsSql = operations.postIngestStatisticsSql();
+
INTEGER,\"APPEND_TIME\" TIMESTAMP)"; + + String expectedIngestSql = "INSERT INTO \"TEST_DB\".\"TEST\".\"MAIN\" " + + "(\"ID\", \"NAME\", \"INCOME\", \"START_TIME\", \"EXPIRY_DATE\", \"DIGEST\", \"BATCH_ID\", \"APPEND_TIME\") " + + "SELECT \"ID\",\"NAME\",\"INCOME\",\"START_TIME\",\"EXPIRY_DATE\"," + + "LAKEHOUSE_MD5(COLUMN_CONCAT_UDF('EXPIRY_DATE',CAST(CAST(\"EXPIRY_DATE\" AS DATE) AS VARCHAR))||COLUMN_CONCAT_UDF('ID',CAST(CAST(\"ID\" AS INTEGER) AS VARCHAR))||COLUMN_CONCAT_UDF('INCOME',CAST(CAST(\"INCOME\" AS BIGINT) AS VARCHAR))||COLUMN_CONCAT_UDF('NAME',CAST(CAST(\"NAME\" AS VARCHAR) AS VARCHAR)))," + + "{NEXT_BATCH_ID_PATTERN},'2000-01-01 00:00:00.000000' " + + "FROM READ_JSON(['src/test/resources/data/bulk-load/input/staged_file1.json','src/test/resources/data/bulk-load/input/staged_file2.json'])"; + + Assertions.assertEquals(expectedCreateTableSql, preActionsSql.get(0)); + Assertions.assertEquals(expectedIngestSql, ingestSql.get(0)); + Assertions.assertEquals("SELECT COUNT(*) as \"ROWSINSERTED\" FROM \"TEST_DB\".\"TEST\".\"MAIN\" as my_alias WHERE my_alias.\"BATCH_ID\" = {NEXT_BATCH_ID_PATTERN}", statsSql.get(ROWS_INSERTED)); + + + // Verify execution using ingestor + PlannerOptions options = PlannerOptions.builder().collectStatistics(true).build(); + String[] schema = new String[]{idName.toUpperCase(), nameName.toUpperCase(), incomeName.toUpperCase(), startTimeName.toUpperCase(), expiryDateName.toUpperCase(), digestName.toUpperCase(), BATCH_ID.toUpperCase(), APPEND_TIME.toUpperCase()}; + + Map expectedStats = new HashMap<>(); + expectedStats.put(StatisticName.ROWS_INSERTED.name(), 7); + + String expectedDataPath = "src/test/resources/data/bulk-load/expected/expected_table_json_with_digest_gen.csv"; + + RelationalIngestor ingestor = getRelationalIngestor(bulkLoad, options, fixedClock_2000_01_01, CaseConversion.TO_UPPER, Optional.of(EVENT_ID_1), new HashMap<>()); + executePlansAndVerifyResults(ingestor, datasets, schema, expectedDataPath, expectedStats, false, ""); + + Map appendMetadata = duckDBSink.executeQuery("select * from batch_metadata").get(0); + verifyBulkLoadMetadataForUpperCase(appendMetadata, Arrays.asList(filePath1, filePath2), 1, Optional.of(EVENT_ID_1), Optional.empty()); + } + RelationalIngestor getRelationalIngestor(IngestMode ingestMode, PlannerOptions options, Clock executionTimestampClock, CaseConversion caseConversion, Optional eventId, Map additionalMetadata) { return RelationalIngestor.builder() diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/java/org/finos/legend/engine/persistence/components/e2e/nontemporal/AppendOnlyTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/java/org/finos/legend/engine/persistence/components/e2e/nontemporal/AppendOnlyTest.java index 77acddd852d..6082df3c340 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/java/org/finos/legend/engine/persistence/components/e2e/nontemporal/AppendOnlyTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/java/org/finos/legend/engine/persistence/components/e2e/nontemporal/AppendOnlyTest.java @@ -23,15 +23,21 @@ import org.finos.legend.engine.persistence.components.ingestmode.audit.NoAuditing; import 
 import org.finos.legend.engine.persistence.components.ingestmode.deduplication.AllowDuplicates;
 import org.finos.legend.engine.persistence.components.ingestmode.deduplication.FilterDuplicates;
+import org.finos.legend.engine.persistence.components.ingestmode.digest.UDFBasedDigestGenStrategy;
 import org.finos.legend.engine.persistence.components.ingestmode.digest.UserProvidedDigestGenStrategy;
 import org.finos.legend.engine.persistence.components.ingestmode.versioning.AllVersionsStrategy;
 import org.finos.legend.engine.persistence.components.ingestmode.versioning.DigestBasedResolver;
 import org.finos.legend.engine.persistence.components.ingestmode.versioning.MaxVersionStrategy;
 import org.finos.legend.engine.persistence.components.ingestmode.versioning.NoVersioningStrategy;
+import org.finos.legend.engine.persistence.components.logicalplan.datasets.DataType;
 import org.finos.legend.engine.persistence.components.logicalplan.datasets.DatasetDefinition;
 import org.finos.legend.engine.persistence.components.logicalplan.datasets.FilteredDataset;
 import org.finos.legend.engine.persistence.components.planner.PlannerOptions;
+import org.finos.legend.engine.persistence.components.relational.CaseConversion;
+import org.finos.legend.engine.persistence.components.relational.api.GeneratorResult;
+import org.finos.legend.engine.persistence.components.relational.api.RelationalGenerator;
 import org.finos.legend.engine.persistence.components.relational.api.RelationalIngestor;
+import org.finos.legend.engine.persistence.components.relational.duckdb.DuckDBDigestUtil;
 import org.finos.legend.engine.persistence.components.relational.duckdb.DuckDBSink;
 import org.finos.legend.engine.persistence.components.relational.jdbc.JdbcConnection;
 import org.junit.jupiter.api.Assertions;
@@ -39,6 +45,8 @@
 import java.time.ZoneOffset;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -55,6 +63,8 @@
 class AppendOnlyTest extends BaseTest
 {
+    private static final String DIGEST_UDF = "LAKEHOUSE_MD5";
+    private static final String COLUMN_CONCAT_UDF = "COLUMN_CONCAT_UDF";
     private final String basePath = "src/test/resources/data/incremental-append-milestoning/";
 
     /*
@@ -411,6 +421,288 @@ void testAppendOnlyWithAuditingAllVersionFilterDuplicatesNoFilterExistingRecords
         executePlansAndVerifyResultsWithDerivedDataSplits(ingestMode, options, datasets, schema, expectedDataPass2, expectedStatsList, incrementalClock, " order by \"batch_id\", \"income\"");
     }
 
+    /*
+    Scenario: Test Append Only vanilla case + staging table is cleaned up in the end with upper case with UDF based digest generation
+    */
+    @Test
+    void testAppendOnlyWithUDFDigestGeneration() throws Exception
+    {
+        // Register UDF
+        DuckDBDigestUtil.registerBasicMD5Udf(duckDBSink, DIGEST_UDF);
+
+        DatasetDefinition mainTable = TestUtils.getDefaultMainTable();
+        DatasetDefinition stagingTable = TestUtils.getStagingTableWithNoPks();
+        Datasets datasets = Datasets.of(mainTable, stagingTable);
+
+        // Create staging table
+        duckDBSink.executeStatement("CREATE TABLE IF NOT EXISTS \"TEST\".\"STAGING\"(\"NAME\" VARCHAR(64) NOT NULL,\"INCOME\" BIGINT,\"EXPIRY_DATE\" DATE)");
+
+        // Generate the milestoning object
+        AppendOnly ingestMode = AppendOnly.builder()
+            .deduplicationStrategy(AllowDuplicates.builder().build())
+            .versioningStrategy(NoVersioningStrategy.builder().build())
+            .auditing(NoAuditing.builder().build())
+            .filterExistingRecords(false)
+            .digestGenStrategy(UDFBasedDigestGenStrategy.builder().digestUdfName(DIGEST_UDF).digestField(digestName).build())
+            .build();
+
+        // Verify SQLs using generator
+        RelationalGenerator generator = RelationalGenerator.builder()
+            .ingestMode(ingestMode)
+            .relationalSink(DuckDBSink.get())
+            .collectStatistics(true)
+            .executionTimestampClock(fixedClock_2000_01_01)
+            .caseConversion(CaseConversion.TO_UPPER)
+            .build();
+
+        GeneratorResult operations = generator.generateOperations(datasets);
+
+        List<String> preActionsSql = operations.preActionsSql();
+        List<String> ingestSql = operations.ingestSql();
+
+        String expectedCreateTableSql = "CREATE TABLE IF NOT EXISTS \"TEST\".\"MAIN\"" +
+            "(\"NAME\" VARCHAR NOT NULL,\"INCOME\" BIGINT,\"EXPIRY_DATE\" DATE,\"DIGEST\" VARCHAR,\"BATCH_ID\" INTEGER)";
+
+        String expectedIngestSql = "INSERT INTO \"TEST\".\"MAIN\" " +
+            "(\"NAME\", \"INCOME\", \"EXPIRY_DATE\", \"DIGEST\", \"BATCH_ID\") " +
+            "(SELECT staging.\"NAME\" as \"NAME\",staging.\"INCOME\" as \"INCOME\",staging.\"EXPIRY_DATE\" as \"EXPIRY_DATE\"," +
+            "LAKEHOUSE_MD5('EXPIRY_DATE'||CAST(staging.\"EXPIRY_DATE\" AS VARCHAR)||'INCOME'||CAST(staging.\"INCOME\" AS VARCHAR)||'NAME'||CAST(staging.\"NAME\" AS VARCHAR))," +
+            "(SELECT COALESCE(MAX(BATCH_METADATA.\"TABLE_BATCH_ID\"),0)+1 FROM BATCH_METADATA as BATCH_METADATA WHERE UPPER(BATCH_METADATA.\"TABLE_NAME\") = 'MAIN') " +
+            "FROM \"TEST\".\"STAGING\" as staging)";
+
+        Assertions.assertEquals(expectedCreateTableSql, preActionsSql.get(0));
+        Assertions.assertEquals(expectedIngestSql, ingestSql.get(0));
+
+
+        // Verify execution using ingestor
+        PlannerOptions options = PlannerOptions.builder().collectStatistics(true).build();
+        String[] schema = new String[]{nameName.toUpperCase(), incomeName.toUpperCase(), expiryDateName.toUpperCase(), digestName.toUpperCase(), batchIdName.toUpperCase()};
+
+        // ------------ Perform incremental (append) milestoning With Clean Staging Table ------------------------
+        String dataPass1 = basePath + "input/digest_generation/data_pass1.csv";
+        String expectedDataPass1 = basePath + "expected/digest_generation/expected_pass1.csv";
+        // 1. Load staging table
+        loadStagingDataWithNoPkInUpperCase(dataPass1);
+        // 2. Execute plans and verify results
+        Map<String, Object> expectedStats = createExpectedStatsMap(3, 0, 3, 0, 0);
+        executePlansAndVerifyForCaseConversion(ingestMode, options, datasets, schema, expectedDataPass1, expectedStats, " order by \"BATCH_ID\", \"INCOME\"");
+        // 3. Assert that the staging table is truncated
+        List<Map<String, Object>> stagingTableList = duckDBSink.executeQuery("select * from \"TEST\".\"STAGING\"");
+        Assertions.assertEquals(stagingTableList.size(), 0);
+
+        // ------------ Perform incremental (append) milestoning With Clean Staging Table ------------------------
+        String dataPass2 = basePath + "input/digest_generation/data_pass2.csv";
+        String expectedDataPass2 = basePath + "expected/digest_generation/expected_pass2.csv";
+        // 1. Load staging table
+        loadStagingDataWithNoPkInUpperCase(dataPass2);
+        // 2. Execute plans and verify results
+        expectedStats = createExpectedStatsMap(3, 0, 3, 0, 0);
+        executePlansAndVerifyForCaseConversion(ingestMode, options, datasets, schema, expectedDataPass2, expectedStats, " order by \"BATCH_ID\", \"INCOME\"");
Assert that the staging table is truncated + stagingTableList = duckDBSink.executeQuery("select * from \"TEST\".\"STAGING\""); + Assertions.assertEquals(stagingTableList.size(), 0); + } + + /* + Scenario: Test Append Only with auditing, all version, filter duplicates and no filter existing records with UDF based digest generation + */ + @Test + void testAppendOnlyWithUDFDigestGenerationWithFieldsToExclude() throws Exception + { + // Register UDF + DuckDBDigestUtil.registerMD5Udf(duckDBSink, DIGEST_UDF); + DuckDBDigestUtil.registerColumnUdf(duckDBSink, COLUMN_CONCAT_UDF); + + DatasetDefinition mainTable = TestUtils.getDefaultMainTable(); + DatasetDefinition stagingTable = TestUtils.getStagingTableWithNonPkVersionWithoutDigest(); + Datasets datasets = Datasets.of(mainTable, stagingTable); + IncrementalClock incrementalClock = new IncrementalClock(fixedExecutionZonedDateTime1.toInstant(), ZoneOffset.UTC, 1000); + + // Create staging table + createStagingTableWithoutPks(stagingTable); + + // Generate the milestoning object + AppendOnly ingestMode = AppendOnly.builder() + .deduplicationStrategy(FilterDuplicates.builder().build()) + .versioningStrategy(AllVersionsStrategy.builder() + .versioningField(versionName) + .dataSplitFieldName(dataSplitName) + .mergeDataVersionResolver(DigestBasedResolver.INSTANCE) + .performStageVersioning(true) + .build()) + .auditing(DateTimeAuditing.builder().dateTimeField(batchUpdateTimeName).build()) + .filterExistingRecords(false) + .digestGenStrategy(UDFBasedDigestGenStrategy.builder() + .digestUdfName(DIGEST_UDF) + .columnNameValueConcatUdfName(COLUMN_CONCAT_UDF) + .digestField(digestName) + .addAllFieldsToExcludeFromDigest(Collections.singletonList(versionName)) + .build()) + .build(); + + // Verify SQLs using generator + RelationalGenerator generator = RelationalGenerator.builder() + .ingestMode(ingestMode) + .relationalSink(DuckDBSink.get()) + .collectStatistics(true) + .executionTimestampClock(fixedClock_2000_01_01) + .ingestRunId("075605e3-bada-47d7-9ae9-7138f392fe22") + .build(); + + GeneratorResult operations = generator.generateOperations(datasets); + + List<String> preActionsSql = operations.preActionsSql(); + List<String> ingestSql = operations.ingestSql(); + + String expectedCreateTableSql = "CREATE TABLE IF NOT EXISTS \"TEST\".\"main\"" + + "(\"id\" INTEGER NOT NULL,\"name\" VARCHAR NOT NULL,\"income\" BIGINT,\"start_time\" TIMESTAMP NOT NULL,\"expiry_date\" DATE,\"version\" INTEGER,\"batch_update_time\" TIMESTAMP NOT NULL,\"digest\" VARCHAR,\"batch_id\" INTEGER,PRIMARY KEY (\"id\", \"start_time\", \"batch_update_time\"))"; + + String expectedIngestSql = "INSERT INTO \"TEST\".\"main\" " + + "(\"id\", \"name\", \"income\", \"start_time\", \"expiry_date\", \"version\", \"digest\", \"batch_update_time\", \"batch_id\") " + + "(SELECT staging.\"id\" as \"id\",staging.\"name\" as \"name\",staging.\"income\" as \"income\",staging.\"start_time\" as \"start_time\",staging.\"expiry_date\" as \"expiry_date\",staging.\"version\" as \"version\"," + + "LAKEHOUSE_MD5(COLUMN_CONCAT_UDF('expiry_date',CAST(staging.\"expiry_date\" AS VARCHAR))||COLUMN_CONCAT_UDF('id',CAST(staging.\"id\" AS VARCHAR))||COLUMN_CONCAT_UDF('income',CAST(staging.\"income\" AS VARCHAR))||COLUMN_CONCAT_UDF('name',CAST(staging.\"name\" AS VARCHAR))||COLUMN_CONCAT_UDF('start_time',CAST(staging.\"start_time\" AS VARCHAR)))," + + "'2000-01-01 00:00:00.000000'," + + "(SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") =
'MAIN') " + + "FROM \"TEST\".\"staging_temp_staging_lp_yosulf\" as staging WHERE (staging.\"data_split\" >= '{DATA_SPLIT_LOWER_BOUND_PLACEHOLDER}') AND (staging.\"data_split\" <= '{DATA_SPLIT_UPPER_BOUND_PLACEHOLDER}'))"; + + Assertions.assertEquals(expectedCreateTableSql, preActionsSql.get(0)); + Assertions.assertEquals(expectedIngestSql, ingestSql.get(0)); + + + // Verify execution using ingestor + PlannerOptions options = PlannerOptions.builder().collectStatistics(true).build(); + String[] schema = new String[]{idName, nameName, incomeName, startTimeName, expiryDateName, versionName, digestName, batchUpdateTimeName, batchIdName}; + + // ------------ Perform incremental (append) milestoning Pass1 ------------------------ + String dataPass1 = basePath + "input/digest_generation_with_fields_to_exclude/data_pass1.csv"; + String expectedDataPass1 = basePath + "expected/digest_generation_with_fields_to_exclude/expected_pass1.csv"; + // 1. Load staging table + loadStagingDataWithVersionWithoutDigest(dataPass1); + // 2. Execute plans and verify results + List<Map<String, Object>> expectedStatsList = new ArrayList<>(); + Map<String, Object> expectedStats1 = createExpectedStatsMap(3, 0, 3, 0, 0); + Map<String, Object> expectedStats2 = createExpectedStatsMap(1, 0, 1, 0, 0); + expectedStatsList.add(expectedStats1); + expectedStatsList.add(expectedStats2); + executePlansAndVerifyResultsWithDerivedDataSplits(ingestMode, options, datasets, schema, expectedDataPass1, expectedStatsList, incrementalClock, " order by \"batch_id\", \"income\""); + + // ------------ Perform incremental (append) milestoning Pass2 ------------------------ + String dataPass2 = basePath + "input/digest_generation_with_fields_to_exclude/data_pass2.csv"; + String expectedDataPass2 = basePath + "expected/digest_generation_with_fields_to_exclude/expected_pass2.csv"; + // 1. Load staging table + loadStagingDataWithVersionWithoutDigest(dataPass2); + // 2.
Execute plans and verify results + expectedStatsList = new ArrayList<>(); + expectedStats1 = createExpectedStatsMap(4, 0, 3, 0, 0); + expectedStatsList.add(expectedStats1); + expectedStatsList.add(expectedStats2); + executePlansAndVerifyResultsWithDerivedDataSplits(ingestMode, options, datasets, schema, expectedDataPass2, expectedStatsList, incrementalClock, " order by \"batch_id\", \"income\""); + } + + /* + Scenario: Test Append Only with auditing, all version, filter duplicates and no filter existing records with UDF based digest generation with type conversion UDFs + */ + @Test + void testAppendOnlyWithUDFDigestGenerationWithFieldsToExcludeAndTypeConversionUdfs() throws Exception + { + // Register UDF + DuckDBDigestUtil.registerMD5Udf(duckDBSink, DIGEST_UDF); + DuckDBDigestUtil.registerColumnUdf(duckDBSink, COLUMN_CONCAT_UDF); + + // Register type conversion functions + Map<DataType, String> typeConversionUdfs = new HashMap<>(); + typeConversionUdfs.put(DataType.INTEGER, "intToString"); + typeConversionUdfs.put(DataType.DATETIME, "datetimeToString"); + duckDBSink.executeStatement("CREATE FUNCTION intToString(A) AS CAST(A AS VARCHAR)"); + duckDBSink.executeStatement("CREATE FUNCTION datetimeToString(A) AS CAST(A AS VARCHAR)"); + + DatasetDefinition mainTable = TestUtils.getDefaultMainTable(); + DatasetDefinition stagingTable = TestUtils.getStagingTableWithNonPkVersionWithoutDigest(); + Datasets datasets = Datasets.of(mainTable, stagingTable); + IncrementalClock incrementalClock = new IncrementalClock(fixedExecutionZonedDateTime1.toInstant(), ZoneOffset.UTC, 1000); + + // Create staging table + createStagingTableWithoutPks(stagingTable); + + // Generate the milestoning object + AppendOnly ingestMode = AppendOnly.builder() + .deduplicationStrategy(FilterDuplicates.builder().build()) + .versioningStrategy(AllVersionsStrategy.builder() + .versioningField(versionName) + .dataSplitFieldName(dataSplitName) + .mergeDataVersionResolver(DigestBasedResolver.INSTANCE) + .performStageVersioning(true) + .build()) + .auditing(DateTimeAuditing.builder().dateTimeField(batchUpdateTimeName).build()) + .filterExistingRecords(false) + .digestGenStrategy(UDFBasedDigestGenStrategy.builder() + .digestUdfName(DIGEST_UDF) + .columnNameValueConcatUdfName(COLUMN_CONCAT_UDF) + .putAllTypeConversionUdfNames(typeConversionUdfs) + .digestField(digestName) + .addAllFieldsToExcludeFromDigest(Collections.singletonList(versionName)) + .build()) + .build(); + + // Verify SQLs using generator + RelationalGenerator generator = RelationalGenerator.builder() + .ingestMode(ingestMode) + .relationalSink(DuckDBSink.get()) + .collectStatistics(true) + .executionTimestampClock(fixedClock_2000_01_01) + .ingestRunId("075605e3-bada-47d7-9ae9-7138f392fe22") + .build(); + + GeneratorResult operations = generator.generateOperations(datasets); + + List<String> preActionsSql = operations.preActionsSql(); + List<String> ingestSql = operations.ingestSql(); + + String expectedCreateTableSql = "CREATE TABLE IF NOT EXISTS \"TEST\".\"main\"" + + "(\"id\" INTEGER NOT NULL,\"name\" VARCHAR NOT NULL,\"income\" BIGINT,\"start_time\" TIMESTAMP NOT NULL,\"expiry_date\" DATE,\"version\" INTEGER,\"batch_update_time\" TIMESTAMP NOT NULL,\"digest\" VARCHAR,\"batch_id\" INTEGER,PRIMARY KEY (\"id\", \"start_time\", \"batch_update_time\"))"; + + String expectedIngestSql = "INSERT INTO \"TEST\".\"main\" " + + "(\"id\", \"name\", \"income\", \"start_time\", \"expiry_date\", \"version\", \"digest\", \"batch_update_time\", \"batch_id\") " + + "(SELECT staging.\"id\" as
\"name\",staging.\"income\" as \"income\",staging.\"start_time\" as \"start_time\",staging.\"expiry_date\" as \"expiry_date\",staging.\"version\" as \"version\"," + + "LAKEHOUSE_MD5(COLUMN_CONCAT_UDF('expiry_date',CAST(staging.\"expiry_date\" AS VARCHAR))||COLUMN_CONCAT_UDF('id',intToString(staging.\"id\"))||COLUMN_CONCAT_UDF('income',CAST(staging.\"income\" AS VARCHAR))||COLUMN_CONCAT_UDF('name',CAST(staging.\"name\" AS VARCHAR))||COLUMN_CONCAT_UDF('start_time',datetimeToString(staging.\"start_time\")))," + + "'2000-01-01 00:00:00.000000'," + + "(SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MAIN') " + + "FROM \"TEST\".\"staging_temp_staging_lp_yosulf\" as staging WHERE (staging.\"data_split\" >= '{DATA_SPLIT_LOWER_BOUND_PLACEHOLDER}') AND (staging.\"data_split\" <= '{DATA_SPLIT_UPPER_BOUND_PLACEHOLDER}'))"; + + Assertions.assertEquals(expectedCreateTableSql, preActionsSql.get(0)); + Assertions.assertEquals(expectedIngestSql, ingestSql.get(0)); + + + // Verify execution using ingestor + PlannerOptions options = PlannerOptions.builder().collectStatistics(true).build(); + String[] schema = new String[]{idName, nameName, incomeName, startTimeName, expiryDateName, versionName, digestName, batchUpdateTimeName, batchIdName}; + + // ------------ Perform incremental (append) milestoning Pass1 ------------------------ + String dataPass1 = basePath + "input/digest_generation_with_fields_to_exclude/data_pass1.csv"; + String expectedDataPass1 = basePath + "expected/digest_generation_with_fields_to_exclude/expected_pass1.csv"; + // 1. Load staging table + loadStagingDataWithVersionWithoutDigest(dataPass1); + // 2. Execute plans and verify results + List> expectedStatsList = new ArrayList<>(); + Map expectedStats1 = createExpectedStatsMap(3, 0, 3, 0, 0); + Map expectedStats2 = createExpectedStatsMap(1, 0, 1, 0, 0); + expectedStatsList.add(expectedStats1); + expectedStatsList.add(expectedStats2); + executePlansAndVerifyResultsWithDerivedDataSplits(ingestMode, options, datasets, schema, expectedDataPass1, expectedStatsList, incrementalClock, " order by \"batch_id\", \"income\""); + + // ------------ Perform incremental (append) milestoning Pass2 ------------------------ + String dataPass2 = basePath + "input/digest_generation_with_fields_to_exclude/data_pass2.csv"; + String expectedDataPass2 = basePath + "expected/digest_generation_with_fields_to_exclude/expected_pass2.csv"; + // 1. Load staging table + loadStagingDataWithVersionWithoutDigest(dataPass2); + // 2. 
Execute plans and verify results + expectedStatsList = new ArrayList<>(); + expectedStats1 = createExpectedStatsMap(4, 0, 3, 0, 0); + expectedStatsList.add(expectedStats1); + expectedStatsList.add(expectedStats2); + executePlansAndVerifyResultsWithDerivedDataSplits(ingestMode, options, datasets, schema, expectedDataPass2, expectedStatsList, incrementalClock, " order by \"batch_id\", \"income\""); + } + /* Scenario: Test Append Only with auditing, no version, allow duplicates and no filter existing records when staging has lesser columns than main */ diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/resources/data/bulk-load/expected/expected_table3.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/resources/data/bulk-load/expected/expected_table3.csv new file mode 100644 index 00000000000..a07c396c079 --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/resources/data/bulk-load/expected/expected_table3.csv @@ -0,0 +1,3 @@ +1,Andy,5.20,2022-01-11 00:00:00.0,af3a6845618d95cbcb3633dcedc0716e,1,2000-01-01 00:00:00.0 +2,Bella,99.99,2022-01-12 00:00:00.0,9bb49fa0bddc95126be41cd672e85bf2,1,2000-01-01 00:00:00.0 +49,Sandy,123.45,2022-01-13 00:00:00.0,61b0887aa850450dd93e0d24e0cd16b1,1,2000-01-01 00:00:00.0 \ No newline at end of file diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/resources/data/bulk-load/expected/expected_table_json_with_digest_gen.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/resources/data/bulk-load/expected/expected_table_json_with_digest_gen.csv new file mode 100644 index 00000000000..557c32a2d0c --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/resources/data/bulk-load/expected/expected_table_json_with_digest_gen.csv @@ -0,0 +1,7 @@ +1,HARRY,1000,2020-01-01 00:00:00.0,2022-12-01,2cdf8eaa9d537881b78756e6d11928b1,1,2000-01-01 00:00:00.0 +2,ROBERT,2000,2020-01-02 00:00:00.0,2022-12-02,99bf9dae59e5f022dd11f9ed8fa97803,1,2000-01-01 00:00:00.0 +3,ANDY,3000,2020-01-03 00:00:00.0,2022-12-03,de3448ec2034f37261df4237e78afaf7,1,2000-01-01 00:00:00.0 +4,MICHEL,4000,2020-01-04 00:00:00.0,2022-12-04,66e93c372eabfd5cec81df5f4d7aaa15,1,2000-01-01 00:00:00.0 +5,LIZA,5000,2020-01-05 00:00:00.0,2022-12-05,aec35faf9c591e18420ceb03e917e439,1,2000-01-01 00:00:00.0 +5,LIZA,5100,2020-01-05 00:00:00.0,2022-12-05,9f28327a03a9462839966852b6bd8ea5,1,2000-01-01 00:00:00.0 +6,MATT,6100,2020-01-06 00:00:00.0,2022-12-06,0c0ec75696a757a25638e99abb3103c2,1,2000-01-01 00:00:00.0 diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/resources/data/bulk-load/input/staged_file3.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/resources/data/bulk-load/input/staged_file3.csv new file mode 100644 index 00000000000..dd2941bedb8 --- /dev/null +++ 
b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/resources/data/bulk-load/input/staged_file3.csv @@ -0,0 +1,3 @@ +1,Andy,5.20,2022-01-11 00:00:00.0 +2,Bella,99.99,2022-01-12 00:00:00.0 +49,Sandy,123.45,2022-01-13 00:00:00.0 \ No newline at end of file diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/resources/data/bulk-load/input/staged_file3.json b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/resources/data/bulk-load/input/staged_file3.json new file mode 100644 index 00000000000..ac2117cbf64 --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/resources/data/bulk-load/input/staged_file3.json @@ -0,0 +1,20 @@ +[ + { + "col_int": 1, + "col_string": "Andy", + "col_decimal": 5.20, + "col_datetime": "2022-01-11 00:00:00.0" + }, + { + "col_int": 2, + "col_string": "Bella", + "col_decimal": 99.99, + "col_datetime": "2022-01-12 00:00:00.0" + }, + { + "col_int": 49, + "col_string": "Sandy", + "col_decimal": 123.45, + "col_datetime": "2022-01-13 00:00:00.0" + } +] \ No newline at end of file diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/resources/data/incremental-append-milestoning/expected/digest_generation/expected_pass1.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/resources/data/incremental-append-milestoning/expected/digest_generation/expected_pass1.csv new file mode 100644 index 00000000000..e182a34c5ae --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/resources/data/incremental-append-milestoning/expected/digest_generation/expected_pass1.csv @@ -0,0 +1,3 @@ +HARRY,1000,2022-12-01,cc7cd58d30715f7492aff082817ecce2,1 +ROBERT,2000,2022-12-02,4edbdb30f325f257b175a48a3c13819b,1 +ANDY,3000,2022-12-03,2be8a637e184828fb8dd76eac47a5f3f,1 diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/resources/data/incremental-append-milestoning/expected/digest_generation/expected_pass2.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/resources/data/incremental-append-milestoning/expected/digest_generation/expected_pass2.csv new file mode 100644 index 00000000000..a7e46033b05 --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/resources/data/incremental-append-milestoning/expected/digest_generation/expected_pass2.csv @@ -0,0 +1,6 @@ +HARRY,1000,2022-12-01,cc7cd58d30715f7492aff082817ecce2,1 +ROBERT,2000,2022-12-02,4edbdb30f325f257b175a48a3c13819b,1 +ANDY,3000,2022-12-03,2be8a637e184828fb8dd76eac47a5f3f,1 +ROBERT,2000,2022-12-02,4edbdb30f325f257b175a48a3c13819b,2 +ANDY,3100,2022-12-03,916f727a44f3f9dee228aaa5c5d7d4a0,2 +MATT,4000,2022-12-06,3491bca22adbe8792ffddf307df4b446,2 \ No newline at end of file diff --git 
a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/resources/data/incremental-append-milestoning/expected/digest_generation_with_fields_to_exclude/expected_pass1.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/resources/data/incremental-append-milestoning/expected/digest_generation_with_fields_to_exclude/expected_pass1.csv new file mode 100644 index 00000000000..1ef8d6e4909 --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/resources/data/incremental-append-milestoning/expected/digest_generation_with_fields_to_exclude/expected_pass1.csv @@ -0,0 +1,4 @@ +1,HARRY,1000,2020-01-01 00:00:00.0,2022-12-01,1,c0a26fd6a78ed9344f8b7a9380d7b7a7,2000-01-01 00:00:02.0,1 +2,ROBERT,2000,2020-01-02 00:00:00.0,2022-12-02,1,bb41db26c50794b2daefd379cf7603e8,2000-01-01 00:00:02.0,1 +3,ANDY,3000,2020-01-03 00:00:00.0,2022-12-03,1,dab5f100b4c9fd3303e64d356ff4b349,2000-01-01 00:00:02.0,1 +3,ANDY,4000,2020-01-03 00:00:00.0,2022-12-03,2,5065a7e68c39d8c0789a7c5e8094f436,2000-01-01 00:00:04.0,2 diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/resources/data/incremental-append-milestoning/expected/digest_generation_with_fields_to_exclude/expected_pass2.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/resources/data/incremental-append-milestoning/expected/digest_generation_with_fields_to_exclude/expected_pass2.csv new file mode 100644 index 00000000000..0c32192d499 --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/resources/data/incremental-append-milestoning/expected/digest_generation_with_fields_to_exclude/expected_pass2.csv @@ -0,0 +1,7 @@ +1,HARRY,1000,2020-01-01 00:00:00.0,2022-12-01,1,c0a26fd6a78ed9344f8b7a9380d7b7a7,2000-01-01 00:00:02.0,1 +2,ROBERT,2000,2020-01-02 00:00:00.0,2022-12-02,1,bb41db26c50794b2daefd379cf7603e8,2000-01-01 00:00:02.0,1 +3,ANDY,3000,2020-01-03 00:00:00.0,2022-12-03,1,dab5f100b4c9fd3303e64d356ff4b349,2000-01-01 00:00:02.0,1 +3,ANDY,4000,2020-01-03 00:00:00.0,2022-12-03,2,5065a7e68c39d8c0789a7c5e8094f436,2000-01-01 00:00:04.0,2 +1,HARRY,1000,2020-01-01 00:00:00.0,2022-12-01,1,c0a26fd6a78ed9344f8b7a9380d7b7a7,2000-01-01 00:00:08.0,3 +2,ROBERT,2001,2020-01-02 00:00:00.0,2022-12-02,2,1de7fec46c71f10715f9c99db8c7376a,2000-01-01 00:00:08.0,3 +4,SANDY,4000,2020-01-04 00:00:00.0,2022-12-04,1,6166d91ab0f8ff0434e6c32ba248f7b5,2000-01-01 00:00:08.0,3 diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/resources/data/incremental-append-milestoning/input/digest_generation/data_pass1.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/resources/data/incremental-append-milestoning/input/digest_generation/data_pass1.csv new file mode 100644 index 00000000000..179b54e57d8 --- /dev/null +++ 
b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/resources/data/incremental-append-milestoning/input/digest_generation/data_pass1.csv @@ -0,0 +1,3 @@ +HARRY,1000,2022-12-01 +ROBERT,2000,2022-12-02 +ANDY,3000,2022-12-03 diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/resources/data/incremental-append-milestoning/input/digest_generation/data_pass2.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/resources/data/incremental-append-milestoning/input/digest_generation/data_pass2.csv new file mode 100644 index 00000000000..7bf2b920ce4 --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/resources/data/incremental-append-milestoning/input/digest_generation/data_pass2.csv @@ -0,0 +1,3 @@ +ROBERT,2000,2022-12-02 +ANDY,3100,2022-12-03 +MATT,4000,2022-12-06 \ No newline at end of file diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/resources/data/incremental-append-milestoning/input/digest_generation_with_fields_to_exclude/data_pass1.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/resources/data/incremental-append-milestoning/input/digest_generation_with_fields_to_exclude/data_pass1.csv new file mode 100644 index 00000000000..2694c70a227 --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/resources/data/incremental-append-milestoning/input/digest_generation_with_fields_to_exclude/data_pass1.csv @@ -0,0 +1,4 @@ +1,HARRY,1000,2020-01-01 00:00:00.0,2022-12-01,1 +2,ROBERT,2000,2020-01-02 00:00:00.0,2022-12-02,1 +3,ANDY,3000,2020-01-03 00:00:00.0,2022-12-03,1 +3,ANDY,4000,2020-01-03 00:00:00.0,2022-12-03,2 diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/resources/data/incremental-append-milestoning/input/digest_generation_with_fields_to_exclude/data_pass2.csv b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/resources/data/incremental-append-milestoning/input/digest_generation_with_fields_to_exclude/data_pass2.csv new file mode 100644 index 00000000000..38d09bb021a --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-duckdb/src/test/resources/data/incremental-append-milestoning/input/digest_generation_with_fields_to_exclude/data_pass2.csv @@ -0,0 +1,4 @@ +1,HARRY,1000,2020-01-01 00:00:00.0,2022-12-01,1 +2,ROBERT,2001,2020-01-02 00:00:00.0,2022-12-02,2 +4,SANDY,4000,2020-01-04 00:00:00.0,2022-12-04,1 +4,SANDY,4000,2020-01-04 00:00:00.0,2022-12-04,1 diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/main/java/org/finos/legend/engine/persistence/components/relational/snowflake/SnowflakeSink.java 
b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/main/java/org/finos/legend/engine/persistence/components/relational/snowflake/SnowflakeSink.java index 4c6ae90f8b0..f007e918908 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/main/java/org/finos/legend/engine/persistence/components/relational/snowflake/SnowflakeSink.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/main/java/org/finos/legend/engine/persistence/components/relational/snowflake/SnowflakeSink.java @@ -32,7 +32,6 @@ import org.finos.legend.engine.persistence.components.logicalplan.datasets.Field; import org.finos.legend.engine.persistence.components.logicalplan.datasets.FunctionalDataset; import org.finos.legend.engine.persistence.components.logicalplan.datasets.SchemaDefinition; -import org.finos.legend.engine.persistence.components.logicalplan.datasets.DatasetAdditionalProperties; import org.finos.legend.engine.persistence.components.logicalplan.datasets.StagedFilesDataset; import org.finos.legend.engine.persistence.components.logicalplan.datasets.StagedFilesDatasetReference; import org.finos.legend.engine.persistence.components.logicalplan.datasets.StagedFilesSelection; @@ -42,6 +41,7 @@ import org.finos.legend.engine.persistence.components.logicalplan.operations.Show; import org.finos.legend.engine.persistence.components.logicalplan.values.BatchEndTimestamp; import org.finos.legend.engine.persistence.components.logicalplan.values.CastFunction; +import org.finos.legend.engine.persistence.components.logicalplan.values.ConcatFunction; import org.finos.legend.engine.persistence.components.logicalplan.values.DigestUdf; import org.finos.legend.engine.persistence.components.logicalplan.values.FieldValue; import org.finos.legend.engine.persistence.components.logicalplan.values.MetadataFileNameField; @@ -73,6 +73,7 @@ import org.finos.legend.engine.persistence.components.relational.snowflake.sql.visitor.BatchEndTimestampVisitor; import org.finos.legend.engine.persistence.components.relational.snowflake.sql.visitor.CastFunctionVisitor; import org.finos.legend.engine.persistence.components.relational.snowflake.sql.visitor.ClusterKeyVisitor; +import org.finos.legend.engine.persistence.components.relational.snowflake.sql.visitor.ConcatFunctionVisitor; import org.finos.legend.engine.persistence.components.relational.snowflake.sql.visitor.CopyVisitor; import org.finos.legend.engine.persistence.components.relational.snowflake.sql.visitor.DatasetAdditionalPropertiesVisitor; import org.finos.legend.engine.persistence.components.relational.snowflake.sql.visitor.DigestUdfVisitor; @@ -83,12 +84,9 @@ import org.finos.legend.engine.persistence.components.relational.snowflake.sql.visitor.SchemaDefinitionVisitor; import org.finos.legend.engine.persistence.components.relational.snowflake.sql.visitor.FieldVisitor; import org.finos.legend.engine.persistence.components.relational.snowflake.sql.visitor.ShowVisitor; -import org.finos.legend.engine.persistence.components.relational.snowflake.sql.visitor.DatasetAdditionalPropertiesVisitor; -import org.finos.legend.engine.persistence.components.relational.snowflake.sql.visitor.CopyVisitor; import org.finos.legend.engine.persistence.components.relational.snowflake.sql.visitor.StagedFilesDatasetReferenceVisitor; import 
org.finos.legend.engine.persistence.components.relational.snowflake.sql.visitor.StagedFilesDatasetVisitor; import org.finos.legend.engine.persistence.components.relational.snowflake.sql.visitor.StagedFilesFieldValueVisitor; -import org.finos.legend.engine.persistence.components.relational.snowflake.sql.visitor.DigestUdfVisitor; import org.finos.legend.engine.persistence.components.relational.snowflake.sql.visitor.StagedFilesSelectionVisitor; import org.finos.legend.engine.persistence.components.relational.snowflake.sql.visitor.ToArrayFunctionVisitor; import org.finos.legend.engine.persistence.components.relational.snowflake.sql.visitor.TryCastFunctionVisitor; @@ -197,6 +195,7 @@ public class SnowflakeSink extends AnsiSqlSink logicalPlanVisitorByClass.put(MetadataRowNumberField.class, new MetadataRowNumberFieldVisitor()); logicalPlanVisitorByClass.put(ToArrayFunction.class, new ToArrayFunctionVisitor()); logicalPlanVisitorByClass.put(FunctionalDataset.class, new FunctionalDatasetVisitor()); + logicalPlanVisitorByClass.put(ConcatFunction.class, new ConcatFunctionVisitor()); LOGICAL_PLAN_VISITOR_BY_CLASS = Collections.unmodifiableMap(logicalPlanVisitorByClass); diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/main/java/org/finos/legend/engine/persistence/components/relational/snowflake/sql/visitor/ConcatFunctionVisitor.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/main/java/org/finos/legend/engine/persistence/components/relational/snowflake/sql/visitor/ConcatFunctionVisitor.java new file mode 100644 index 00000000000..7f9d4a18418 --- /dev/null +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/main/java/org/finos/legend/engine/persistence/components/relational/snowflake/sql/visitor/ConcatFunctionVisitor.java @@ -0,0 +1,34 @@ +// Copyright 2024 Goldman Sachs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package org.finos.legend.engine.persistence.components.relational.snowflake.sql.visitor; + +import org.finos.legend.engine.persistence.components.logicalplan.values.ConcatFunction; +import org.finos.legend.engine.persistence.components.logicalplan.values.FunctionImpl; +import org.finos.legend.engine.persistence.components.logicalplan.values.FunctionName; +import org.finos.legend.engine.persistence.components.physicalplan.PhysicalPlanNode; +import org.finos.legend.engine.persistence.components.relational.ansi.sql.visitors.FunctionVisitor; +import org.finos.legend.engine.persistence.components.transformer.LogicalPlanVisitor; +import org.finos.legend.engine.persistence.components.transformer.VisitorContext; + +public class ConcatFunctionVisitor implements LogicalPlanVisitor<ConcatFunction> +{ + + @Override + public VisitorResult visit(PhysicalPlanNode prev, ConcatFunction current, VisitorContext context) + { + FunctionImpl function = FunctionImpl.builder().functionName(FunctionName.CONCAT).addAllValue(current.values()).build(); + return new FunctionVisitor().visit(prev, function, context); + }
}
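
Usage sketch (not itself part of the diff): the three AppendOnly tests above all exercise the same new pathway, so the condensed example below may help when trying the change locally. DuckDBDigestUtil, UDFBasedDigestGenStrategy, RelationalGenerator, DuckDBSink and Datasets are all introduced or used in this patch; mainTable, stagingTable and the "digest" field name are illustrative placeholders (the tests build theirs via TestUtils helpers).

    // Register the MD5 digest UDF on the DuckDB connection, as the tests do.
    DuckDBDigestUtil.registerBasicMD5Udf(duckDBSink, "LAKEHOUSE_MD5");

    // Ask AppendOnly to populate the digest column via the registered UDF.
    AppendOnly ingestMode = AppendOnly.builder()
        .deduplicationStrategy(AllowDuplicates.builder().build())
        .versioningStrategy(NoVersioningStrategy.builder().build())
        .auditing(NoAuditing.builder().build())
        .filterExistingRecords(false)
        .digestGenStrategy(UDFBasedDigestGenStrategy.builder()
            .digestUdfName("LAKEHOUSE_MD5")   // UDF registered above
            .digestField("digest")            // placeholder digest column name
            .build())
        .build();

    // The generated INSERT computes the digest column as
    // LAKEHOUSE_MD5('COL_A'||CAST(staging."COL_A" AS VARCHAR)||'COL_B'||...),
    // concatenating columns in alphabetical order, as the assertions above show.
    GeneratorResult operations = RelationalGenerator.builder()
        .ingestMode(ingestMode)
        .relationalSink(DuckDBSink.get())
        .build()
        .generateOperations(Datasets.of(mainTable, stagingTable));
    operations.ingestSql().forEach(System.out::println);

The second and third tests exercise two optional knobs on the same builder: columnNameValueConcatUdfName swaps the plain 'name'||value pairs for a custom concat UDF, and putAllTypeConversionUdfNames substitutes per-type conversion UDFs (e.g. intToString) for the default CAST(... AS VARCHAR).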