Skip to content

Commit

Permalink
Persistence Library: Dry run mode support for Bulk Load and Capture D…
Browse files Browse the repository at this point in the history
…ata Errors (#2642)

Co-authored-by: kumuwu <[email protected]>
  • Loading branch information
prasar-ashutosh and kumuwu authored Mar 13, 2024
1 parent 8e08a15 commit 20de9ff
Show file tree
Hide file tree
Showing 116 changed files with 4,132 additions and 653 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,15 @@
<artifactId>jackson-databind</artifactId>
</dependency>
<!-- JACKSON -->
<!-- ECLIPSE COLLECTIONS -->
<dependency>
<groupId>org.eclipse.collections</groupId>
<artifactId>eclipse-collections-api</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.collections</groupId>
<artifactId>eclipse-collections</artifactId>
</dependency>
<!-- ECLIPSE COLLECTIONS -->
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@

package org.finos.legend.engine.persistence.components.common;

public enum DedupAndVersionErrorStatistics
public enum DedupAndVersionErrorSqlType
{
MAX_DUPLICATES,
MAX_DATA_ERRORS;
DUPLICATE_ROWS,
MAX_DATA_ERRORS,
DATA_ERROR_ROWS;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright 2024 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.ingestmode.versioning;

import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlan;
import org.finos.legend.engine.persistence.components.logicalplan.conditions.GreaterThan;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Selection;
import org.finos.legend.engine.persistence.components.logicalplan.values.*;

import java.util.ArrayList;
import java.util.List;

/**
 * Versioning-strategy visitor that derives the logical plan used to fetch a sample of
 * rows with data errors from the temp staging dataset.
 *
 * <p>The plan groups the staging rows by the primary keys plus the versioning field and
 * keeps only the groups whose count of distinct "remaining column" values is greater
 * than one, limited to {@code sampleRowCount} rows.
 *
 * <p>Every visit method returns {@code null} when no plan applies: always for the
 * no-versioning strategy, and for the other strategies when
 * {@code performStageVersioning()} is {@code false}.
 */
public class DeriveDataErrorRowsLogicalPlan implements VersioningStrategyVisitor<LogicalPlan>
{
    /** Alias of the COUNT(DISTINCT(...)) column; the HAVING condition of the plan refers to it. */
    public static final String DATA_VERSION_ERROR_COUNT = "legend_persistence_error_count";

    // All state is assigned once in the constructor, so the fields are final.
    private final List<String> primaryKeys;
    private final List<String> remainingColumns;
    private final Dataset tempStagingDataset;
    private final int sampleRowCount;

    /**
     * @param primaryKeys        names of the primary key fields; used as group-by and select fields
     * @param remainingColumns   names of the fields passed to the DISTINCT function
     * @param tempStagingDataset dataset the selection reads from
     * @param sampleRowCount     limit applied to the selection
     */
    public DeriveDataErrorRowsLogicalPlan(List<String> primaryKeys, List<String> remainingColumns, Dataset tempStagingDataset, int sampleRowCount)
    {
        this.primaryKeys = primaryKeys;
        this.remainingColumns = remainingColumns;
        this.tempStagingDataset = tempStagingDataset;
        this.sampleRowCount = sampleRowCount;
    }

    /**
     * No versioning field exists for this strategy, so no plan is produced.
     *
     * @return always {@code null}
     */
    @Override
    public LogicalPlan visitNoVersioningStrategy(NoVersioningStrategyAbstract noVersioningStrategy)
    {
        return null;
    }

    /**
     * @return the data-error plan keyed on the strategy's versioning field, or {@code null}
     *         when stage versioning is not performed
     */
    @Override
    public LogicalPlan visitMaxVersionStrategy(MaxVersionStrategyAbstract maxVersionStrategy)
    {
        return maxVersionStrategy.performStageVersioning()
            ? getLogicalPlanForDataErrors(maxVersionStrategy.versioningField())
            : null;
    }

    /**
     * @return the data-error plan keyed on the strategy's versioning field, or {@code null}
     *         when stage versioning is not performed
     */
    @Override
    public LogicalPlan visitAllVersionsStrategy(AllVersionsStrategyAbstract allVersionsStrategyAbstract)
    {
        return allVersionsStrategyAbstract.performStageVersioning()
            ? getLogicalPlanForDataErrors(allVersionsStrategyAbstract.versioningField())
            : null;
    }

    /**
     * Builds the selection: primary keys + version field + COUNT(DISTINCT(remaining columns)),
     * grouped by primary keys + version field, keeping groups whose count is greater than 1.
     */
    private LogicalPlan getLogicalPlanForDataErrors(String versionField)
    {
        // The same list serves as both the group-by fields and the leading select fields.
        List<Value> pKsAndVersion = toFieldValues(primaryKeys);
        pKsAndVersion.add(FieldValue.builder().fieldName(versionField).build());

        List<Value> distinctValueFields = toFieldValues(remainingColumns);

        FunctionImpl countDistinct = FunctionImpl.builder()
            .functionName(FunctionName.COUNT)
            .addValue(FunctionImpl.builder().functionName(FunctionName.DISTINCT).addAllValue(distinctValueFields).build())
            .alias(DATA_VERSION_ERROR_COUNT)
            .build();

        Selection selectDataError = Selection.builder()
            .source(tempStagingDataset)
            .groupByFields(pKsAndVersion)
            .addAllFields(pKsAndVersion)
            .addFields(countDistinct)
            .havingCondition(GreaterThan.of(FieldValue.builder().fieldName(DATA_VERSION_ERROR_COUNT).build(), ObjectValue.of(1)))
            .limit(sampleRowCount)
            .build();

        return LogicalPlan.builder().addOps(selectDataError).build();
    }

    /** Wraps each field name in a {@link FieldValue}, preserving order, in a new mutable list. */
    private static List<Value> toFieldValues(List<String> fieldNames)
    {
        List<Value> fieldValues = new ArrayList<>();
        for (String fieldName : fieldNames)
        {
            fieldValues.add(FieldValue.builder().fieldName(fieldName).build());
        }
        return fieldValues;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

package org.finos.legend.engine.persistence.components.ingestmode.versioning;

import org.finos.legend.engine.persistence.components.common.DedupAndVersionErrorStatistics;
import org.finos.legend.engine.persistence.components.common.DedupAndVersionErrorSqlType;
import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlan;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.*;
import org.finos.legend.engine.persistence.components.logicalplan.values.FieldValue;
Expand All @@ -25,14 +25,14 @@
import java.util.ArrayList;
import java.util.List;

public class DeriveDataErrorCheckLogicalPlan implements VersioningStrategyVisitor<LogicalPlan>
public class DeriveMaxDataErrorLogicalPlan implements VersioningStrategyVisitor<LogicalPlan>
{

List<String> primaryKeys;
List<String> remainingColumns;
Dataset tempStagingDataset;

public DeriveDataErrorCheckLogicalPlan(List<String> primaryKeys, List<String> remainingColumns, Dataset tempStagingDataset)
public DeriveMaxDataErrorLogicalPlan(List<String> primaryKeys, List<String> remainingColumns, Dataset tempStagingDataset)
{
this.primaryKeys = primaryKeys;
this.remainingColumns = remainingColumns;
Expand Down Expand Up @@ -73,7 +73,7 @@ public LogicalPlan visitAllVersionsStrategy(AllVersionsStrategyAbstract allVersi

private LogicalPlan getLogicalPlanForDataErrorCheck(String versionField)
{
String maxDataErrorAlias = DedupAndVersionErrorStatistics.MAX_DATA_ERRORS.name();
String maxDataErrorAlias = DedupAndVersionErrorSqlType.MAX_DATA_ERRORS.name();
String distinctRowCount = "legend_persistence_distinct_rows";
List<Value> pKsAndVersion = new ArrayList<>();
for (String pk: primaryKeys)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package org.finos.legend.engine.persistence.components.logicalplan;

import org.finos.legend.engine.persistence.components.common.Datasets;
import org.finos.legend.engine.persistence.components.logicalplan.conditions.Equals;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.CsvExternalDatasetReference;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Selection;
Expand Down Expand Up @@ -116,4 +117,14 @@ public static LogicalPlan getLogicalPlanForMaxOfField(Dataset dataset, String fi
.source(dataset).build();
return LogicalPlan.builder().addOps(selection).build();
}

/**
 * Builds a logical plan with a single selection of all fields, read from the dataset
 * referenced by {@code field}, restricted to rows where {@code field} equals the given
 * string value.
 *
 * @param field      field to filter on; its {@code datasetRef()} is used as the selection source
 * @param fieldValue string the field is compared against
 * @return a logical plan containing only that selection
 */
public static LogicalPlan getLogicalPlanForSelectAllFieldsWithStringFieldEquals(FieldValue field, String fieldValue)
{
    Equals fieldMatchesValue = Equals.of(field, StringValue.of(fieldValue));
    return LogicalPlan.builder()
        .addOps(Selection.builder()
            .addFields(All.INSTANCE)
            .source(field.datasetRef())
            .condition(fieldMatchesValue)
            .build())
        .build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public interface SelectionAbstract extends Dataset, Operation

/**
 * Values the selection is grouped by; empty when no grouping was set.
 */
Optional<List<Value>> groupByFields();

/**
 * Condition attached to the selection in addition to the grouping fields; empty when not set.
 * NOTE(review): presumably rendered as a HAVING clause by the SQL visitors - confirm.
 */
Optional<Condition> havingCondition();

/**
 * Alias of the selection; empty when not set.
 */
Optional<String> alias();

/**
 * Row limit of the selection; empty when not set.
 */
Optional<Integer> limit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ public interface StagedFilesDatasetProperties

List<String> filePatterns();

/**
 * Tells whether these staged-files properties support validation mode.
 * This definition always answers {@code false}. It is declared as an Immutables derived
 * attribute, so it is computed rather than supplied through a builder.
 * NOTE(review): presumably sink-specific properties override this to return true - confirm.
 *
 * @return {@code false}
 */
@Value.Derived
default boolean validationModeSupported()
{
return false;
}

@Value.Check
default void validate()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.List;

import static org.immutables.value.Value.Immutable;
import static org.immutables.value.Value.Parameter;
import static org.immutables.value.Value.Style;

@Immutable
Expand All @@ -34,15 +33,17 @@
)
// Abstract value definition of the Copy operation: a target dataset, a source dataset,
// a list of values, the staged-files properties, and a validation-mode flag.
// NOTE(review): this hunk appears to show both removed (@Parameter) and added lines of the
// commit - confirm against the checked-in file before relying on the annotations below.
public interface CopyAbstract extends Operation
{
/** Target dataset of the copy. */
@Parameter(order = 0)
Dataset targetDataset();

/** Source dataset of the copy. */
@Parameter(order = 1)
Dataset sourceDataset();

/** Values associated with the copy; presumably the fields being loaded - confirm. */
@Parameter(order = 2)
List<Value> fields();

/** Properties of the staged files this copy operates on. */
@Parameter(order = 3)
StagedFilesDatasetProperties stagedFilesDatasetProperties();

/**
 * Whether the copy runs in validation mode. Defaults to {@code false} when not set.
 */
@org.immutables.value.Value.Default
default boolean validationMode()
{
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2024 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.logicalplan.values;

import org.finos.legend.engine.persistence.components.logicalplan.datasets.StagedFilesDatasetProperties;

import static org.immutables.value.Value.Immutable;
import static org.immutables.value.Value.Style;

/**
 * Abstract definition of a {@link Value} that carries a set of staged-files properties.
 * Per the style below, the generated immutable type is named {@code MetadataFileNameField}.
 * NOTE(review): presumably stands for the name of the staged file a row was read from -
 * confirm against the sink-specific visitors.
 */
@Immutable
@Style(
typeAbstract = "*Abstract",
typeImmutable = "*",
jdkOnly = true,
optionalAcceptNullable = true,
strictBuilder = true
)
public interface MetadataFileNameFieldAbstract extends Value
{
/** Staged-files properties this value is bound to. */
StagedFilesDatasetProperties stagedFilesDatasetProperties();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2024 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.logicalplan.values;

import org.finos.legend.engine.persistence.components.logicalplan.datasets.StagedFilesDatasetProperties;

import static org.immutables.value.Value.Immutable;
import static org.immutables.value.Value.Style;

/**
 * Abstract definition of a {@link Value} that carries a set of staged-files properties.
 * Per the style below, the generated immutable type is named {@code MetadataRowNumberField}.
 * NOTE(review): presumably stands for the row number of a record within its staged file -
 * confirm against the sink-specific visitors.
 */
@Immutable
@Style(
typeAbstract = "*Abstract",
typeImmutable = "*",
jdkOnly = true,
optionalAcceptNullable = true,
strictBuilder = true
)
public interface MetadataRowNumberFieldAbstract extends Value
{
/** Staged-files properties this value is bound to. */
StagedFilesDatasetProperties stagedFilesDatasetProperties();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2024 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.logicalplan.values;

import org.finos.legend.engine.persistence.components.logicalplan.datasets.FieldType;

import static org.immutables.value.Value.Immutable;
import static org.immutables.value.Value.Parameter;
import static org.immutables.value.Value.Style;

/**
 * Abstract definition of a {@link Value} that pairs a value with a field type.
 * Per the style below, the generated immutable type is named {@code TryCastFunction};
 * the {@code @Parameter} order makes {@code field} the first and {@code type} the second
 * construction parameter.
 * NOTE(review): presumably rendered as a "try cast" of the field to the type - confirm
 * against the sink-specific visitors.
 */
@Immutable
@Style(
typeAbstract = "*Abstract",
typeImmutable = "*",
jdkOnly = true,
optionalAcceptNullable = true,
strictBuilder = true
)
public interface TryCastFunctionAbstract extends Value
{
/** Value to be cast. */
@Parameter(order = 0)
Value field();

/** Type the value is cast to. */
@Parameter(order = 1)
FieldType type();
}
Loading

0 comments on commit 20de9ff

Please sign in to comment.