diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/pom.xml b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/pom.xml
index 65f950b235a..e79f36c5d4e 100644
--- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/pom.xml
+++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/pom.xml
@@ -42,5 +42,15 @@
jackson-databind
+
+
+ org.eclipse.collections
+ eclipse-collections-api
+
+
+ org.eclipse.collections
+ eclipse-collections
+
+
diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/common/DedupAndVersionErrorStatistics.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/common/DedupAndVersionErrorSqlType.java
similarity index 86%
rename from legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/common/DedupAndVersionErrorStatistics.java
rename to legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/common/DedupAndVersionErrorSqlType.java
index 60c193d7938..adda9e48e60 100644
--- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/common/DedupAndVersionErrorStatistics.java
+++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/common/DedupAndVersionErrorSqlType.java
@@ -14,8 +14,10 @@
package org.finos.legend.engine.persistence.components.common;
-public enum DedupAndVersionErrorStatistics
+public enum DedupAndVersionErrorSqlType
{
MAX_DUPLICATES,
- MAX_DATA_ERRORS;
+ DUPLICATE_ROWS,
+ MAX_DATA_ERRORS,
+ DATA_ERROR_ROWS;
}
diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/versioning/DeriveDataErrorRowsLogicalPlan.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/versioning/DeriveDataErrorRowsLogicalPlan.java
new file mode 100644
index 00000000000..44b156d092a
--- /dev/null
+++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/versioning/DeriveDataErrorRowsLogicalPlan.java
@@ -0,0 +1,108 @@
+// Copyright 2024 Goldman Sachs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.finos.legend.engine.persistence.components.ingestmode.versioning;
+
+import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlan;
+import org.finos.legend.engine.persistence.components.logicalplan.conditions.GreaterThan;
+import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset;
+import org.finos.legend.engine.persistence.components.logicalplan.datasets.Selection;
+import org.finos.legend.engine.persistence.components.logicalplan.values.*;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class DeriveDataErrorRowsLogicalPlan implements VersioningStrategyVisitor
+{
+ private List primaryKeys;
+ private List remainingColumns;
+ private Dataset tempStagingDataset;
+ private int sampleRowCount;
+
+ public static final String DATA_VERSION_ERROR_COUNT = "legend_persistence_error_count";
+
+ public DeriveDataErrorRowsLogicalPlan(List primaryKeys, List remainingColumns, Dataset tempStagingDataset, int sampleRowCount)
+ {
+ this.primaryKeys = primaryKeys;
+ this.remainingColumns = remainingColumns;
+ this.tempStagingDataset = tempStagingDataset;
+ this.sampleRowCount = sampleRowCount;
+ }
+
+ @Override
+ public LogicalPlan visitNoVersioningStrategy(NoVersioningStrategyAbstract noVersioningStrategy)
+ {
+ return null;
+ }
+
+ @Override
+ public LogicalPlan visitMaxVersionStrategy(MaxVersionStrategyAbstract maxVersionStrategy)
+ {
+ if (maxVersionStrategy.performStageVersioning())
+ {
+ return getLogicalPlanForDataErrors(maxVersionStrategy.versioningField());
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ @Override
+ public LogicalPlan visitAllVersionsStrategy(AllVersionsStrategyAbstract allVersionsStrategyAbstract)
+ {
+ if (allVersionsStrategyAbstract.performStageVersioning())
+ {
+ return getLogicalPlanForDataErrors(allVersionsStrategyAbstract.versioningField());
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ private LogicalPlan getLogicalPlanForDataErrors(String versionField)
+ {
+ List pKsAndVersion = new ArrayList<>();
+ for (String pk: primaryKeys)
+ {
+ pKsAndVersion.add(FieldValue.builder().fieldName(pk).build());
+ }
+ pKsAndVersion.add(FieldValue.builder().fieldName(versionField).build());
+
+ List distinctValueFields = new ArrayList<>();
+ for (String field: remainingColumns)
+ {
+ distinctValueFields.add(FieldValue.builder().fieldName(field).build());
+ }
+
+ FunctionImpl countDistinct = FunctionImpl.builder()
+ .functionName(FunctionName.COUNT)
+ .addValue(FunctionImpl.builder().functionName(FunctionName.DISTINCT).addAllValue(distinctValueFields).build())
+ .alias(DATA_VERSION_ERROR_COUNT)
+ .build();
+
+ Selection selectDataError = Selection.builder()
+ .source(tempStagingDataset)
+ .groupByFields(pKsAndVersion)
+ .addAllFields(pKsAndVersion)
+ .addFields(countDistinct)
+ .havingCondition(GreaterThan.of(FieldValue.builder().fieldName(DATA_VERSION_ERROR_COUNT).build(), ObjectValue.of(1)))
+ .limit(sampleRowCount)
+ .build();
+
+ return LogicalPlan.builder().addOps(selectDataError).build();
+ }
+
+}
\ No newline at end of file
diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/versioning/DeriveDataErrorCheckLogicalPlan.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/versioning/DeriveMaxDataErrorLogicalPlan.java
similarity index 92%
rename from legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/versioning/DeriveDataErrorCheckLogicalPlan.java
rename to legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/versioning/DeriveMaxDataErrorLogicalPlan.java
index 86cd4c59e4e..e5c1e2faf78 100644
--- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/versioning/DeriveDataErrorCheckLogicalPlan.java
+++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/versioning/DeriveMaxDataErrorLogicalPlan.java
@@ -14,7 +14,7 @@
package org.finos.legend.engine.persistence.components.ingestmode.versioning;
-import org.finos.legend.engine.persistence.components.common.DedupAndVersionErrorStatistics;
+import org.finos.legend.engine.persistence.components.common.DedupAndVersionErrorSqlType;
import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlan;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.*;
import org.finos.legend.engine.persistence.components.logicalplan.values.FieldValue;
@@ -25,14 +25,14 @@
import java.util.ArrayList;
import java.util.List;
-public class DeriveDataErrorCheckLogicalPlan implements VersioningStrategyVisitor
+public class DeriveMaxDataErrorLogicalPlan implements VersioningStrategyVisitor
{
List primaryKeys;
List remainingColumns;
Dataset tempStagingDataset;
- public DeriveDataErrorCheckLogicalPlan(List primaryKeys, List remainingColumns, Dataset tempStagingDataset)
+ public DeriveMaxDataErrorLogicalPlan(List primaryKeys, List remainingColumns, Dataset tempStagingDataset)
{
this.primaryKeys = primaryKeys;
this.remainingColumns = remainingColumns;
@@ -73,7 +73,7 @@ public LogicalPlan visitAllVersionsStrategy(AllVersionsStrategyAbstract allVersi
private LogicalPlan getLogicalPlanForDataErrorCheck(String versionField)
{
- String maxDataErrorAlias = DedupAndVersionErrorStatistics.MAX_DATA_ERRORS.name();
+ String maxDataErrorAlias = DedupAndVersionErrorSqlType.MAX_DATA_ERRORS.name();
String distinctRowCount = "legend_persistence_distinct_rows";
List pKsAndVersion = new ArrayList<>();
for (String pk: primaryKeys)
diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/LogicalPlanFactory.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/LogicalPlanFactory.java
index 9199836cb2d..a20ddfd49be 100644
--- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/LogicalPlanFactory.java
+++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/LogicalPlanFactory.java
@@ -15,6 +15,7 @@
package org.finos.legend.engine.persistence.components.logicalplan;
import org.finos.legend.engine.persistence.components.common.Datasets;
+import org.finos.legend.engine.persistence.components.logicalplan.conditions.Equals;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.CsvExternalDatasetReference;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Selection;
@@ -116,4 +117,14 @@ public static LogicalPlan getLogicalPlanForMaxOfField(Dataset dataset, String fi
.source(dataset).build();
return LogicalPlan.builder().addOps(selection).build();
}
+
+ public static LogicalPlan getLogicalPlanForSelectAllFieldsWithStringFieldEquals(FieldValue field, String fieldValue)
+ {
+ Selection selection = Selection.builder()
+ .addFields(All.INSTANCE)
+ .source(field.datasetRef())
+ .condition(Equals.of(field, StringValue.of(fieldValue)))
+ .build();
+ return LogicalPlan.builder().addOps(selection).build();
+ }
}
diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/datasets/SelectionAbstract.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/datasets/SelectionAbstract.java
index 917525b2fc8..60530bc2288 100644
--- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/datasets/SelectionAbstract.java
+++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/datasets/SelectionAbstract.java
@@ -47,6 +47,8 @@ public interface SelectionAbstract extends Dataset, Operation
Optional> groupByFields();
+ Optional havingCondition();
+
Optional alias();
Optional limit();
diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/datasets/StagedFilesDatasetProperties.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/datasets/StagedFilesDatasetProperties.java
index 8dae01e0dc5..b6270d52b60 100644
--- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/datasets/StagedFilesDatasetProperties.java
+++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/datasets/StagedFilesDatasetProperties.java
@@ -24,6 +24,12 @@ public interface StagedFilesDatasetProperties
List filePatterns();
+ @Value.Derived
+ default boolean validationModeSupported()
+ {
+ return false;
+ }
+
@Value.Check
default void validate()
{
diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/operations/CopyAbstract.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/operations/CopyAbstract.java
index 82e5876ed42..cadd43c5839 100644
--- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/operations/CopyAbstract.java
+++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/operations/CopyAbstract.java
@@ -21,7 +21,6 @@
import java.util.List;
import static org.immutables.value.Value.Immutable;
-import static org.immutables.value.Value.Parameter;
import static org.immutables.value.Value.Style;
@Immutable
@@ -34,15 +33,17 @@
)
public interface CopyAbstract extends Operation
{
- @Parameter(order = 0)
Dataset targetDataset();
- @Parameter(order = 1)
Dataset sourceDataset();
- @Parameter(order = 2)
List fields();
- @Parameter(order = 3)
StagedFilesDatasetProperties stagedFilesDatasetProperties();
+
+ @org.immutables.value.Value.Default
+ default boolean validationMode()
+ {
+ return false;
+ }
}
diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/values/MetadataFileNameFieldAbstract.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/values/MetadataFileNameFieldAbstract.java
new file mode 100644
index 00000000000..95e1a9dff6d
--- /dev/null
+++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/values/MetadataFileNameFieldAbstract.java
@@ -0,0 +1,33 @@
+// Copyright 2024 Goldman Sachs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.finos.legend.engine.persistence.components.logicalplan.values;
+
+import org.finos.legend.engine.persistence.components.logicalplan.datasets.StagedFilesDatasetProperties;
+
+import static org.immutables.value.Value.Immutable;
+import static org.immutables.value.Value.Style;
+
+@Immutable
+@Style(
+ typeAbstract = "*Abstract",
+ typeImmutable = "*",
+ jdkOnly = true,
+ optionalAcceptNullable = true,
+ strictBuilder = true
+)
+public interface MetadataFileNameFieldAbstract extends Value
+{
+ StagedFilesDatasetProperties stagedFilesDatasetProperties();
+}
diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/values/MetadataRowNumberFieldAbstract.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/values/MetadataRowNumberFieldAbstract.java
new file mode 100644
index 00000000000..23f0e7fdfa6
--- /dev/null
+++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/values/MetadataRowNumberFieldAbstract.java
@@ -0,0 +1,33 @@
+// Copyright 2024 Goldman Sachs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.finos.legend.engine.persistence.components.logicalplan.values;
+
+import org.finos.legend.engine.persistence.components.logicalplan.datasets.StagedFilesDatasetProperties;
+
+import static org.immutables.value.Value.Immutable;
+import static org.immutables.value.Value.Style;
+
+@Immutable
+@Style(
+ typeAbstract = "*Abstract",
+ typeImmutable = "*",
+ jdkOnly = true,
+ optionalAcceptNullable = true,
+ strictBuilder = true
+)
+public interface MetadataRowNumberFieldAbstract extends Value
+{
+ StagedFilesDatasetProperties stagedFilesDatasetProperties();
+}
diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/values/TryCastFunctionAbstract.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/values/TryCastFunctionAbstract.java
new file mode 100644
index 00000000000..11222de8584
--- /dev/null
+++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/values/TryCastFunctionAbstract.java
@@ -0,0 +1,38 @@
+// Copyright 2024 Goldman Sachs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.finos.legend.engine.persistence.components.logicalplan.values;
+
+import org.finos.legend.engine.persistence.components.logicalplan.datasets.FieldType;
+
+import static org.immutables.value.Value.Immutable;
+import static org.immutables.value.Value.Parameter;
+import static org.immutables.value.Value.Style;
+
+@Immutable
+@Style(
+ typeAbstract = "*Abstract",
+ typeImmutable = "*",
+ jdkOnly = true,
+ optionalAcceptNullable = true,
+ strictBuilder = true
+)
+public interface TryCastFunctionAbstract extends Value
+{
+ @Parameter(order = 0)
+ Value field();
+
+ @Parameter(order = 1)
+ FieldType type();
+}
diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/BulkLoadPlanner.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/BulkLoadPlanner.java
index ad882e365b8..9a9b9b57a06 100644
--- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/BulkLoadPlanner.java
+++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/BulkLoadPlanner.java
@@ -14,6 +14,8 @@
package org.finos.legend.engine.persistence.components.planner;
+import org.eclipse.collections.api.tuple.Pair;
+import org.eclipse.collections.impl.tuple.Tuples;
import org.finos.legend.engine.persistence.components.common.Datasets;
import org.finos.legend.engine.persistence.components.common.Resources;
import org.finos.legend.engine.persistence.components.common.StatisticName;
@@ -21,16 +23,26 @@
import org.finos.legend.engine.persistence.components.ingestmode.audit.AuditingVisitors;
import org.finos.legend.engine.persistence.components.ingestmode.digest.DigestGenerationHandler;
import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlan;
+import org.finos.legend.engine.persistence.components.logicalplan.conditions.And;
import org.finos.legend.engine.persistence.components.logicalplan.conditions.Condition;
+import org.finos.legend.engine.persistence.components.logicalplan.conditions.IsNull;
+import org.finos.legend.engine.persistence.components.logicalplan.conditions.Not;
+import org.finos.legend.engine.persistence.components.logicalplan.conditions.Or;
+import org.finos.legend.engine.persistence.components.logicalplan.datasets.DataType;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.ExternalDataset;
+import org.finos.legend.engine.persistence.components.logicalplan.datasets.DatasetDefinition;
+import org.finos.legend.engine.persistence.components.logicalplan.datasets.FieldType;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Field;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.StagedFilesSelection;
+import org.finos.legend.engine.persistence.components.logicalplan.operations.Delete;
import org.finos.legend.engine.persistence.components.logicalplan.operations.Drop;
import org.finos.legend.engine.persistence.components.logicalplan.operations.Insert;
import org.finos.legend.engine.persistence.components.logicalplan.values.BulkLoadBatchStatusValue;
import org.finos.legend.engine.persistence.components.logicalplan.values.FunctionImpl;
import org.finos.legend.engine.persistence.components.logicalplan.values.FunctionName;
import org.finos.legend.engine.persistence.components.logicalplan.values.All;
+import org.finos.legend.engine.persistence.components.logicalplan.values.MetadataFileNameField;
+import org.finos.legend.engine.persistence.components.logicalplan.values.MetadataRowNumberField;
import org.finos.legend.engine.persistence.components.logicalplan.values.StringValue;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Selection;
@@ -38,26 +50,34 @@
import org.finos.legend.engine.persistence.components.logicalplan.operations.Create;
import org.finos.legend.engine.persistence.components.logicalplan.operations.Copy;
import org.finos.legend.engine.persistence.components.logicalplan.operations.Operation;
+import org.finos.legend.engine.persistence.components.logicalplan.values.TryCastFunction;
import org.finos.legend.engine.persistence.components.logicalplan.values.Value;
import org.finos.legend.engine.persistence.components.logicalplan.values.BatchStartTimestamp;
import org.finos.legend.engine.persistence.components.logicalplan.values.FieldValue;
+import org.finos.legend.engine.persistence.components.util.ValidationCategory;
import org.finos.legend.engine.persistence.components.util.Capability;
import org.finos.legend.engine.persistence.components.util.LogicalPlanUtils;
+import org.finos.legend.engine.persistence.components.util.TableNameGenUtils;
import java.util.*;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static org.finos.legend.engine.persistence.components.common.StatisticName.ROWS_INSERTED;
-import static org.finos.legend.engine.persistence.components.util.LogicalPlanUtils.TEMP_DATASET_BASE_NAME;
-import static org.finos.legend.engine.persistence.components.util.LogicalPlanUtils.UNDERSCORE;
+import static org.finos.legend.engine.persistence.components.util.TableNameGenUtils.TEMP_DATASET_ALIAS;
+import static org.finos.legend.engine.persistence.components.util.TableNameGenUtils.TEMP_DATASET_QUALIFIER;
class BulkLoadPlanner extends Planner
{
private boolean transformWhileCopy;
private Dataset externalDataset;
+ private Dataset validationDataset;
private StagedFilesDataset stagedFilesDataset;
+ private static final String FILE = "legend_persistence_file";
+ private static final String ROW_NUMBER = "legend_persistence_row_number";
+
BulkLoadPlanner(Datasets datasets, BulkLoad ingestMode, PlannerOptions plannerOptions, Set capabilities)
{
super(datasets, ingestMode, plannerOptions, capabilities);
@@ -74,14 +94,20 @@ class BulkLoadPlanner extends Planner
transformWhileCopy = capabilities.contains(Capability.TRANSFORM_WHILE_COPY);
if (!transformWhileCopy)
{
+ String externalDatasetName = TableNameGenUtils.generateTableName(datasets.mainDataset().datasetReference().name().orElseThrow((IllegalStateException::new)), TEMP_DATASET_QUALIFIER, options().ingestRunId());
externalDataset = ExternalDataset.builder()
.stagedFilesDataset(stagedFilesDataset)
.database(datasets.mainDataset().datasetReference().database())
.group(datasets.mainDataset().datasetReference().group())
- .name(datasets.mainDataset().datasetReference().name().orElseThrow((IllegalStateException::new)) + UNDERSCORE + TEMP_DATASET_BASE_NAME)
- .alias(TEMP_DATASET_BASE_NAME)
+ .name(externalDatasetName)
+ .alias(TEMP_DATASET_ALIAS)
.build();
}
+
+ if (capabilities.contains(Capability.DRY_RUN))
+ {
+ validationDataset = stagedFilesDataset.stagedFilesDatasetProperties().validationModeSupported() ? getValidationModeDataset() : getGenericValidationDataset();
+ }
}
private void validateNoPrimaryKeysInStageAndMain()
@@ -112,6 +138,137 @@ public LogicalPlan buildLogicalPlanForIngest(Resources resources)
}
}
+ /*
+ ------------------
+ Validation Mode Logic:
+ ------------------
+ COPY INTO temp_table (data_columns)
+ SELECT data_columns from staging
+ WITH VALIDATION_MODE = true
+
+ ------------------
+ Generic Approach Logic:
+ ------------------
+ modified_data_columns: nullable data_columns with String datatype
+ meta_columns: file_name, row_number
+
+ COPY INTO temp_table (modified_data_columns, meta_columns)
+ SELECT modified_data_columns, meta_columns from staging
+ */
+ @Override
+ public LogicalPlan buildLogicalPlanForDryRun(Resources resources)
+ {
+ if (!capabilities.contains(Capability.DRY_RUN))
+ {
+ return LogicalPlan.of(Collections.emptyList());
+ }
+
+ List operations = new ArrayList<>();
+
+ if (stagedFilesDataset.stagedFilesDatasetProperties().validationModeSupported())
+ {
+ Copy copy = Copy.builder()
+ .targetDataset(validationDataset)
+ .sourceDataset(stagedFilesDataset.datasetReference().withAlias(""))
+ .stagedFilesDatasetProperties(stagedFilesDataset.stagedFilesDatasetProperties())
+ .validationMode(true)
+ .build();
+ operations.add(copy);
+ }
+ else
+ {
+ operations.add(Delete.builder().dataset(validationDataset).build());
+
+ List fieldsToSelect = LogicalPlanUtils.extractStagedFilesFieldValuesWithVarCharType(stagingDataset());
+ fieldsToSelect.add(MetadataFileNameField.builder().stagedFilesDatasetProperties(stagedFilesDataset.stagedFilesDatasetProperties()).build());
+ fieldsToSelect.add(MetadataRowNumberField.builder().stagedFilesDatasetProperties(stagedFilesDataset.stagedFilesDatasetProperties()).build());
+
+ List fieldsToInsert = new ArrayList<>(stagingDataset().schemaReference().fieldValues());
+ fieldsToInsert.add(FieldValue.builder().fieldName(FILE).datasetRef(stagingDataset().datasetReference()).build());
+ fieldsToInsert.add(FieldValue.builder().fieldName(ROW_NUMBER).datasetRef(stagingDataset().datasetReference()).build());
+
+ Dataset selectStage = StagedFilesSelection.builder().source(stagedFilesDataset).addAllFields(fieldsToSelect).build();
+
+ Copy copy = Copy.builder()
+ .targetDataset(validationDataset)
+ .sourceDataset(selectStage)
+ .addAllFields(fieldsToInsert)
+ .stagedFilesDatasetProperties(stagedFilesDataset.stagedFilesDatasetProperties())
+ .validationMode(false)
+ .build();
+ operations.add(copy);
+ }
+ return LogicalPlan.of(operations);
+ }
+
+ /*
+ ------------------
+ Validation Mode Logic:
+ ------------------
+ NOT APPLICABLE
+
+ ------------------
+ Generic Approach Logic:
+ ------------------
+ For null values:
+ SELECT * FROM temp_table WHERE
+ (non_nullable_data_column_1 = NULL
+ OR non_nullable_data_column_2 = NULL
+ OR ...)
+
+ For datatype conversion:
+ SELECT * FROM temp_table WHERE (non_string_data_column_1 != NULL AND TRY_CAST(non_string_data_column_1 AS datatype) = NULL)
+ SELECT * FROM temp_table WHERE (non_string_data_column_2 != NULL AND TRY_CAST(non_string_data_column_2 AS datatype) = NULL)
+ ...
+ */
+ public Map, LogicalPlan>>> buildLogicalPlanForDryRunValidation(Resources resources)
+ {
+ if (!capabilities.contains(Capability.DRY_RUN) || stagedFilesDataset.stagedFilesDatasetProperties().validationModeSupported())
+ {
+ return Collections.emptyMap();
+ }
+ Map, LogicalPlan>>> validationMap = new HashMap<>();
+ List fieldsToCheckForNull = stagingDataset().schema().fields().stream().filter(field -> !field.nullable()).collect(Collectors.toList());
+ List fieldsToCheckForDatatype = stagingDataset().schema().fields().stream().filter(field -> !DataType.isStringDatatype(field.type().dataType())).collect(Collectors.toList());
+
+ if (!fieldsToCheckForNull.isEmpty())
+ {
+ Selection queryForNull = Selection.builder()
+ .source(validationDataset)
+ .condition(Or.of(fieldsToCheckForNull.stream().map(field ->
+ IsNull.of(FieldValue.builder().fieldName(field.name()).datasetRef(validationDataset.datasetReference()).build()))
+ .collect(Collectors.toList())))
+ .limit(options().sampleRowCount())
+ .build();
+
+ validationMap.put(ValidationCategory.NULL_VALUE,
+ Collections.singletonList(Tuples.pair(fieldsToCheckForNull.stream().map(field -> FieldValue.builder().fieldName(field.name()).datasetRef(validationDataset.datasetReference()).build()).collect(Collectors.toSet()),
+ LogicalPlan.of(Collections.singletonList(queryForNull)))));
+ }
+
+ if (!fieldsToCheckForDatatype.isEmpty())
+ {
+ validationMap.put(ValidationCategory.TYPE_CONVERSION, new ArrayList<>());
+
+ for (Field fieldToCheckForDatatype : fieldsToCheckForDatatype)
+ {
+ Selection queryForDatatype = Selection.builder()
+ .source(validationDataset)
+ .condition(And.builder()
+ .addConditions(Not.of(IsNull.of(FieldValue.builder().fieldName(fieldToCheckForDatatype.name()).datasetRef(validationDataset.datasetReference()).build())))
+ .addConditions(IsNull.of(TryCastFunction.of(FieldValue.builder().fieldName(fieldToCheckForDatatype.name()).datasetRef(validationDataset.datasetReference()).build(), fieldToCheckForDatatype.type())))
+ .build())
+ .limit(options().sampleRowCount())
+ .build();
+
+ validationMap.get(ValidationCategory.TYPE_CONVERSION).add(Tuples.pair(Stream.of(fieldToCheckForDatatype).map(field -> FieldValue.builder().fieldName(field.name()).datasetRef(validationDataset.datasetReference()).build()).collect(Collectors.toSet()),
+ LogicalPlan.of(Collections.singletonList(queryForDatatype))));
+ }
+ }
+
+ return validationMap;
+ }
+
private LogicalPlan buildLogicalPlanForTransformWhileCopy(Resources resources)
{
List fieldsToSelect = LogicalPlanUtils.extractStagedFilesFieldValues(stagingDataset());
@@ -131,7 +288,13 @@ private LogicalPlan buildLogicalPlanForTransformWhileCopy(Resources resources)
}
Dataset selectStage = StagedFilesSelection.builder().source(stagedFilesDataset).addAllFields(fieldsToSelect).build();
- return LogicalPlan.of(Collections.singletonList(Copy.of(mainDataset(), selectStage, fieldsToInsert, stagedFilesDataset.stagedFilesDatasetProperties())));
+ return LogicalPlan.of(Collections.singletonList(
+ Copy.builder()
+ .targetDataset(mainDataset())
+ .sourceDataset(selectStage)
+ .addAllFields(fieldsToInsert)
+ .stagedFilesDatasetProperties(stagedFilesDataset.stagedFilesDatasetProperties())
+ .build()));
}
private LogicalPlan buildLogicalPlanForCopyAndTransform(Resources resources)
@@ -176,6 +339,17 @@ public LogicalPlan buildLogicalPlanForPreActions(Resources resources)
return LogicalPlan.of(operations);
}
+ @Override
+ public LogicalPlan buildLogicalPlanForDryRunPreActions(Resources resources)
+ {
+ List operations = new ArrayList<>();
+ if (capabilities.contains(Capability.DRY_RUN))
+ {
+ operations.add(Create.of(true, validationDataset));
+ }
+ return LogicalPlan.of(operations);
+ }
+
@Override
public LogicalPlan buildLogicalPlanForPostActions(Resources resources)
{
@@ -195,6 +369,17 @@ public LogicalPlan buildLogicalPlanForPostCleanup(Resources resources)
return LogicalPlan.of(operations);
}
+ @Override
+ public LogicalPlan buildLogicalPlanForDryRunPostCleanup(Resources resources)
+ {
+ List operations = new ArrayList<>();
+ if (capabilities.contains(Capability.DRY_RUN))
+ {
+ operations.add(Drop.of(true, validationDataset, false));
+ }
+ return LogicalPlan.of(operations);
+ }
+
@Override
List getDigestOrRemainingColumns()
{
@@ -248,4 +433,33 @@ protected void addPostRunStatsForRowsDeleted(Map pos
{
// Not supported at the moment
}
+
+ private Dataset getValidationModeDataset()
+ {
+ String tableName = mainDataset().datasetReference().name().orElseThrow((IllegalStateException::new));
+ String validationDatasetName = TableNameGenUtils.generateTableName(tableName, "validation", options().ingestRunId());
+ return DatasetDefinition.builder()
+ .schema(stagedFilesDataset.schema())
+ .database(mainDataset().datasetReference().database())
+ .group(mainDataset().datasetReference().group())
+ .name(validationDatasetName)
+ .build();
+ }
+
+ private Dataset getGenericValidationDataset()
+ {
+ String tableName = mainDataset().datasetReference().name().orElseThrow((IllegalStateException::new));
+ String validationDatasetName = TableNameGenUtils.generateTableName(tableName, "validation", options().ingestRunId());
+
+ List fields = stagedFilesDataset.schema().fields().stream().map(field -> field.withType(FieldType.builder().dataType(DataType.VARCHAR).build()).withNullable(true)).collect(Collectors.toList());
+ fields.add(Field.builder().name(FILE).type(FieldType.builder().dataType(DataType.VARCHAR).build()).build());
+ fields.add(Field.builder().name(ROW_NUMBER).type(FieldType.builder().dataType(DataType.BIGINT).build()).build());
+
+ return DatasetDefinition.builder()
+ .schema(stagedFilesDataset.schema().withFields(fields))
+ .database(mainDataset().datasetReference().database())
+ .group(mainDataset().datasetReference().group())
+ .name(validationDatasetName)
+ .build();
+ }
}
diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/Planner.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/Planner.java
index 65c3f9fe32e..fd6878c393f 100644
--- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/Planner.java
+++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/planner/Planner.java
@@ -18,9 +18,10 @@
import java.util.function.Consumer;
import java.util.stream.Collectors;
+import org.eclipse.collections.api.tuple.Pair;
import org.finos.legend.engine.persistence.components.common.Datasets;
+import org.finos.legend.engine.persistence.components.common.DedupAndVersionErrorSqlType;
import org.finos.legend.engine.persistence.components.common.Resources;
-import org.finos.legend.engine.persistence.components.common.DedupAndVersionErrorStatistics;
import org.finos.legend.engine.persistence.components.common.StatisticName;
import org.finos.legend.engine.persistence.components.ingestmode.IngestMode;
import org.finos.legend.engine.persistence.components.ingestmode.audit.AuditingVisitor;
@@ -32,6 +33,7 @@
import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlan;
import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlanFactory;
import org.finos.legend.engine.persistence.components.logicalplan.conditions.Condition;
+import org.finos.legend.engine.persistence.components.logicalplan.conditions.GreaterThan;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.DerivedDataset;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Field;
@@ -43,6 +45,7 @@
import org.finos.legend.engine.persistence.components.logicalplan.values.All;
import org.finos.legend.engine.persistence.components.logicalplan.values.BatchEndTimestamp;
import org.finos.legend.engine.persistence.components.logicalplan.values.BatchStartTimestamp;
+import org.finos.legend.engine.persistence.components.logicalplan.values.FieldValue;
import org.finos.legend.engine.persistence.components.logicalplan.values.FunctionImpl;
import org.finos.legend.engine.persistence.components.logicalplan.values.ObjectValue;
import org.finos.legend.engine.persistence.components.logicalplan.values.StringValue;
@@ -52,6 +55,7 @@
import org.finos.legend.engine.persistence.components.util.LogicalPlanUtils;
import org.finos.legend.engine.persistence.components.util.MetadataDataset;
import org.finos.legend.engine.persistence.components.util.MetadataUtils;
+import org.finos.legend.engine.persistence.components.util.ValidationCategory;
import java.util.ArrayList;
import java.util.List;
@@ -60,6 +64,7 @@
import java.util.Set;
import java.util.Collections;
import java.util.HashMap;
+import java.util.UUID;
import static org.finos.legend.engine.persistence.components.common.StatisticName.INCOMING_RECORD_COUNT;
import static org.finos.legend.engine.persistence.components.common.StatisticName.ROWS_DELETED;
@@ -124,6 +129,18 @@ default String batchSuccessStatusValue()
{
return MetadataUtils.MetaTableStatus.DONE.toString();
}
+
+ @Default
+ default int sampleRowCount()
+ {
+ return 20;
+ }
+
+ @Default
+ default String ingestRunId()
+ {
+ return UUID.randomUUID().toString();
+ }
}
private final Datasets datasets;
@@ -172,7 +189,7 @@ private Optional getTempStagingDataset()
Optional tempStagingDataset = Optional.empty();
if (isTempTableNeededForStaging)
{
- tempStagingDataset = Optional.of(LogicalPlanUtils.getTempStagingDatasetDefinition(originalStagingDataset(), ingestMode));
+ tempStagingDataset = Optional.of(LogicalPlanUtils.getTempStagingDatasetDefinition(originalStagingDataset(), ingestMode, options().ingestRunId()));
}
return tempStagingDataset;
}
@@ -256,6 +273,26 @@ protected PlannerOptions options()
public abstract LogicalPlan buildLogicalPlanForIngest(Resources resources);
+ public LogicalPlan buildLogicalPlanForDryRun(Resources resources)
+ {
+ return LogicalPlan.of(Collections.emptyList());
+ }
+
+ public Map, LogicalPlan>>> buildLogicalPlanForDryRunValidation(Resources resources)
+ {
+ return Collections.emptyMap();
+ }
+
+ public LogicalPlan buildLogicalPlanForDryRunPreActions(Resources resources)
+ {
+ return LogicalPlan.of(Collections.emptyList());
+ }
+
+ public LogicalPlan buildLogicalPlanForDryRunPostCleanup(Resources resources)
+ {
+ return LogicalPlan.of(Collections.emptyList());
+ }
+
public LogicalPlan buildLogicalPlanForMetadataIngest(Resources resources)
{
// Save staging filters into batch_source_info column
@@ -380,41 +417,65 @@ public Map buildLogicalPlanForPostRunStatistics(Reso
return postRunStatisticsResult;
}
- public Map buildLogicalPlanForDeduplicationAndVersioningErrorChecks(Resources resources)
+ public Map buildLogicalPlanForDeduplicationAndVersioningErrorChecks(Resources resources)
{
- Map dedupAndVersioningErrorChecks = new HashMap<>();
+ Map dedupAndVersioningErrorChecks = new HashMap<>();
addMaxDuplicatesErrorCheck(dedupAndVersioningErrorChecks);
addDataErrorCheck(dedupAndVersioningErrorChecks);
return dedupAndVersioningErrorChecks;
}
- protected void addMaxDuplicatesErrorCheck(Map dedupAndVersioningErrorChecks)
+ protected void addMaxDuplicatesErrorCheck(Map dedupAndVersioningErrorChecks)
{
if (ingestMode.deduplicationStrategy() instanceof FailOnDuplicates)
{
+ FieldValue count = FieldValue.builder().datasetRef(tempStagingDataset().datasetReference()).fieldName(COUNT).build();
FunctionImpl maxCount = FunctionImpl.builder()
.functionName(FunctionName.MAX)
- .addValue(FieldValue.builder().datasetRef(tempStagingDataset().datasetReference()).fieldName(COUNT).build())
- .alias(DedupAndVersionErrorStatistics.MAX_DUPLICATES.name())
+ .addValue(count)
+ .alias(DedupAndVersionErrorSqlType.MAX_DUPLICATES.name())
.build();
Selection selectMaxDupsCount = Selection.builder()
.source(tempStagingDataset())
.addFields(maxCount)
.build();
LogicalPlan maxDuplicatesCountPlan = LogicalPlan.builder().addOps(selectMaxDupsCount).build();
- dedupAndVersioningErrorChecks.put(DedupAndVersionErrorStatistics.MAX_DUPLICATES, maxDuplicatesCountPlan);
+ dedupAndVersioningErrorChecks.put(DedupAndVersionErrorSqlType.MAX_DUPLICATES, maxDuplicatesCountPlan);
+
+ /*
+ select pks from tempStagingDataset where COUNT > 1
+ */
+ List rowsToSelect = this.primaryKeys.stream().map(field -> FieldValue.builder().fieldName(field).build()).collect(Collectors.toList());
+ if (rowsToSelect.size() > 0)
+ {
+ rowsToSelect.add(FieldValue.builder().fieldName(COUNT).build());
+ Selection selectDuplicatesRows = Selection.builder()
+ .source(tempStagingDataset())
+ .addAllFields(rowsToSelect)
+ .condition(GreaterThan.of(count, ObjectValue.of(1)))
+ .limit(options().sampleRowCount())
+ .build();
+ LogicalPlan selectDuplicatesRowsPlan = LogicalPlan.builder().addOps(selectDuplicatesRows).build();
+ dedupAndVersioningErrorChecks.put(DedupAndVersionErrorSqlType.DUPLICATE_ROWS, selectDuplicatesRowsPlan);
+ }
}
}
- protected void addDataErrorCheck(Map dedupAndVersioningErrorChecks)
+ protected void addDataErrorCheck(Map dedupAndVersioningErrorChecks)
{
List remainingColumns = getDigestOrRemainingColumns();
if (ingestMode.versioningStrategy().accept(VersioningVisitors.IS_TEMP_TABLE_NEEDED))
{
- LogicalPlan logicalPlan = ingestMode.versioningStrategy().accept(new DeriveDataErrorCheckLogicalPlan(primaryKeys, remainingColumns, tempStagingDataset()));
- if (logicalPlan != null)
+ LogicalPlan logicalPlanForDataErrorCheck = ingestMode.versioningStrategy().accept(new DeriveMaxDataErrorLogicalPlan(primaryKeys, remainingColumns, tempStagingDataset()));
+ if (logicalPlanForDataErrorCheck != null)
+ {
+ dedupAndVersioningErrorChecks.put(DedupAndVersionErrorSqlType.MAX_DATA_ERRORS, logicalPlanForDataErrorCheck);
+ }
+
+ LogicalPlan logicalPlanForDataErrors = ingestMode.versioningStrategy().accept(new DeriveDataErrorRowsLogicalPlan(primaryKeys, remainingColumns, tempStagingDataset(), options().sampleRowCount()));
+ if (logicalPlanForDataErrors != null)
{
- dedupAndVersioningErrorChecks.put(DedupAndVersionErrorStatistics.MAX_DATA_ERRORS, logicalPlan);
+ dedupAndVersioningErrorChecks.put(DedupAndVersionErrorSqlType.DATA_ERROR_ROWS, logicalPlanForDataErrors);
}
}
}
diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/util/Capability.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/util/Capability.java
index f99f9f94a23..438dd9219ac 100644
--- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/util/Capability.java
+++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/util/Capability.java
@@ -22,5 +22,6 @@ public enum Capability
EXPLICIT_DATA_TYPE_CONVERSION,
DATA_TYPE_LENGTH_CHANGE,
DATA_TYPE_SCALE_CHANGE,
- TRANSFORM_WHILE_COPY;
+ TRANSFORM_WHILE_COPY,
+ DRY_RUN
}
diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/util/LogicalPlanUtils.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/util/LogicalPlanUtils.java
index ba83a307734..da880eee09c 100644
--- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/util/LogicalPlanUtils.java
+++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/util/LogicalPlanUtils.java
@@ -62,7 +62,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import java.util.UUID;
import java.util.stream.Collectors;
import static org.finos.legend.engine.persistence.components.logicalplan.datasets.DataType.BIGINT;
@@ -72,10 +71,13 @@
import static org.finos.legend.engine.persistence.components.logicalplan.datasets.DataType.FLOAT;
import static org.finos.legend.engine.persistence.components.logicalplan.datasets.DataType.INT;
import static org.finos.legend.engine.persistence.components.logicalplan.datasets.DataType.INTEGER;
+import static org.finos.legend.engine.persistence.components.logicalplan.datasets.DataType.VARCHAR;
import static org.finos.legend.engine.persistence.components.util.MetadataUtils.BATCH_SOURCE_INFO_BULK_LOAD_EVENT_ID;
import static org.finos.legend.engine.persistence.components.util.MetadataUtils.BATCH_SOURCE_INFO_FILE_PATHS;
import static org.finos.legend.engine.persistence.components.util.MetadataUtils.BATCH_SOURCE_INFO_FILE_PATTERNS;
import static org.finos.legend.engine.persistence.components.util.MetadataUtils.BATCH_SOURCE_INFO_STAGING_FILTERS;
+import static org.finos.legend.engine.persistence.components.util.TableNameGenUtils.TEMP_STAGING_DATASET_ALIAS;
+import static org.finos.legend.engine.persistence.components.util.TableNameGenUtils.TEMP_STAGING_DATASET_QUALIFIER;
public class LogicalPlanUtils
@@ -86,19 +88,12 @@ public class LogicalPlanUtils
public static final String DATA_SPLIT_UPPER_BOUND_PLACEHOLDER = "{DATA_SPLIT_UPPER_BOUND_PLACEHOLDER}";
public static final String UNDERSCORE = "_";
public static final String TEMP_DATASET_BASE_NAME = "legend_persistence_temp";
- public static final String TEMP_STAGING_DATASET_BASE_NAME = "legend_persistence_temp_staging";
public static final String TEMP_DATASET_WITH_DELETE_INDICATOR_BASE_NAME = "legend_persistence_tempWithDeleteIndicator";
private LogicalPlanUtils()
{
}
- public static String generateTableNameWithSuffix(String tableName, String suffix)
- {
- UUID uuid = UUID.randomUUID();
- return tableName + UNDERSCORE + suffix + UNDERSCORE + uuid;
- }
-
public static Value INFINITE_BATCH_ID()
{
return InfiniteBatchIdValue.builder().build();
@@ -408,19 +403,35 @@ public static List extractStagedFilesFieldValues(Dataset dataset)
int iter = 1;
for (Field field : dataset.schema().fields())
{
- StagedFilesFieldValue fieldValue = StagedFilesFieldValue.builder()
- .columnNumber(columnNumbersPresent ? field.columnNumber().get() : iter++)
- .datasetRefAlias(dataset.datasetReference().alias())
- .alias(field.fieldAlias().isPresent() ? field.fieldAlias().get() : field.name())
- .elementPath(field.elementPath())
- .fieldType(field.type())
- .fieldName(field.name())
- .build();
- stagedFilesFields.add(fieldValue);
+ stagedFilesFields.add(getStagedFilesFieldValueWithType(dataset, field, field.type(), columnNumbersPresent, iter++));
+ }
+ return stagedFilesFields;
+ }
+
+ public static List extractStagedFilesFieldValuesWithVarCharType(Dataset dataset)
+ {
+ List stagedFilesFields = new ArrayList<>();
+ boolean columnNumbersPresent = dataset.schema().fields().stream().allMatch(field -> field.columnNumber().isPresent());
+ int iter = 1;
+ for (Field field : dataset.schema().fields())
+ {
+ stagedFilesFields.add(getStagedFilesFieldValueWithType(dataset, field, FieldType.builder().dataType(VARCHAR).build(), columnNumbersPresent, iter++));
}
return stagedFilesFields;
}
+ public static StagedFilesFieldValue getStagedFilesFieldValueWithType(Dataset dataset, Field field, FieldType fieldType, boolean columnNumbersPresent, int counter)
+ {
+ return StagedFilesFieldValue.builder()
+ .columnNumber(columnNumbersPresent ? field.columnNumber().get() : counter)
+ .datasetRefAlias(dataset.datasetReference().alias())
+ .alias(field.fieldAlias().isPresent() ? field.fieldAlias().get() : field.name())
+ .elementPath(field.elementPath())
+ .fieldType(fieldType)
+ .fieldName(field.name())
+ .build();
+ }
+
public static Dataset getTempDataset(Datasets datasets)
{
String mainDatasetName = datasets.mainDataset().datasetReference().name().orElseThrow((IllegalStateException::new));
@@ -455,10 +466,10 @@ public static Dataset getTempDatasetWithDeleteIndicator(Datasets datasets, Strin
}
}
- public static Dataset getTempStagingDatasetDefinition(Dataset stagingDataset, IngestMode ingestMode)
+ public static Dataset getTempStagingDatasetDefinition(Dataset stagingDataset, IngestMode ingestMode, String ingestRunId)
{
- String alias = stagingDataset.datasetReference().alias().orElse(TEMP_STAGING_DATASET_BASE_NAME);
- String datasetName = stagingDataset.datasetReference().name().orElseThrow(IllegalStateException::new) + UNDERSCORE + TEMP_STAGING_DATASET_BASE_NAME;
+ String alias = stagingDataset.datasetReference().alias().orElse(TEMP_STAGING_DATASET_ALIAS);
+ String datasetName = TableNameGenUtils.generateTableName(stagingDataset.datasetReference().name().orElseThrow(IllegalStateException::new), TEMP_STAGING_DATASET_QUALIFIER, ingestRunId);
SchemaDefinition tempStagingSchema = ingestMode.versioningStrategy().accept(new DeriveTempStagingSchemaDefinition(stagingDataset.schema(), ingestMode.deduplicationStrategy()));
return DatasetDefinition.builder()
.schema(tempStagingSchema)
diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/util/TableNameGenUtils.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/util/TableNameGenUtils.java
new file mode 100644
index 00000000000..27911dab26e
--- /dev/null
+++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/util/TableNameGenUtils.java
@@ -0,0 +1,40 @@
+// Copyright 2024 Goldman Sachs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.finos.legend.engine.persistence.components.util;
+
+import static org.finos.legend.engine.persistence.components.util.LogicalPlanUtils.UNDERSCORE;
+
+public class TableNameGenUtils
+{
+ public static String LEGEND_PERSISTENCE_MARKER = "lp";
+ public static final String TEMP_DATASET_QUALIFIER = "temp";
+ public static final String TEMP_DATASET_ALIAS = "legend_persistence_temp";
+ public static final String TEMP_STAGING_DATASET_QUALIFIER = "temp_staging";
+ public static final String TEMP_STAGING_DATASET_ALIAS = "legend_persistence_temp_staging";
+
+ private static String generateTableSuffix(String ingestRunId)
+ {
+ int hashCode = Math.abs(ingestRunId.hashCode());
+ return LEGEND_PERSISTENCE_MARKER + UNDERSCORE + Integer.toString(hashCode, 36);
+ }
+
+ /*
+ Table name = __lp_
+ */
+ public static String generateTableName(String baseTableName, String qualifier, String ingestRunId)
+ {
+ return baseTableName + UNDERSCORE + qualifier + UNDERSCORE + generateTableSuffix(ingestRunId);
+ }
+}
diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/util/ValidationCategory.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/util/ValidationCategory.java
new file mode 100644
index 00000000000..c2fbb123b81
--- /dev/null
+++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/util/ValidationCategory.java
@@ -0,0 +1,21 @@
+// Copyright 2024 Goldman Sachs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.finos.legend.engine.persistence.components.util;
+
+public enum ValidationCategory
+{
+ NULL_VALUE,
+ TYPE_CONVERSION
+}
diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-physical-plan/src/main/java/org/finos/legend/engine/persistence/components/executor/Executor.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-physical-plan/src/main/java/org/finos/legend/engine/persistence/components/executor/Executor.java
index 543a2076f4f..3ce2231fdc7 100644
--- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-physical-plan/src/main/java/org/finos/legend/engine/persistence/components/executor/Executor.java
+++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-physical-plan/src/main/java/org/finos/legend/engine/persistence/components/executor/Executor.java
@@ -31,6 +31,8 @@ public interface Executor executePhysicalPlanAndGetResults(P physicalPlan);
+ List executePhysicalPlanAndGetResults(P physicalPlan, int rows);
+
List executePhysicalPlanAndGetResults(P physicalPlan, Map placeholderKeyValues);
boolean datasetExists(Dataset dataset);
diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-physical-plan/src/main/java/org/finos/legend/engine/persistence/components/executor/RelationalExecutionHelper.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-physical-plan/src/main/java/org/finos/legend/engine/persistence/components/executor/RelationalExecutionHelper.java
index c347996f81f..3a6ae90fe35 100644
--- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-physical-plan/src/main/java/org/finos/legend/engine/persistence/components/executor/RelationalExecutionHelper.java
+++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-physical-plan/src/main/java/org/finos/legend/engine/persistence/components/executor/RelationalExecutionHelper.java
@@ -42,6 +42,8 @@ public interface RelationalExecutionHelper
void executeStatements(List sqls);
+ List