Skip to content

Commit

Permalink
Persistence Component: Introduce Fail on Duplicate Primary Keys (fino…
Browse files Browse the repository at this point in the history
  • Loading branch information
kumuwu authored Apr 30, 2024
1 parent 67d4768 commit 3902a5e
Show file tree
Hide file tree
Showing 29 changed files with 505 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ public enum DedupAndVersionErrorSqlType
{
MAX_DUPLICATES,
DUPLICATE_ROWS,
MAX_PK_DUPLICATES,
PK_DUPLICATE_ROWS,
MAX_DATA_ERRORS,
DATA_ERROR_ROWS;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright 2024 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.ingestmode.versioning;

import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlan;
import org.finos.legend.engine.persistence.components.logicalplan.conditions.GreaterThan;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Selection;
import org.finos.legend.engine.persistence.components.logicalplan.values.All;
import org.finos.legend.engine.persistence.components.logicalplan.values.FieldValue;
import org.finos.legend.engine.persistence.components.logicalplan.values.FunctionImpl;
import org.finos.legend.engine.persistence.components.logicalplan.values.FunctionName;
import org.finos.legend.engine.persistence.components.logicalplan.values.ObjectValue;
import org.finos.legend.engine.persistence.components.logicalplan.values.Value;

import java.util.List;
import java.util.stream.Collectors;

/**
 * Versioning-strategy visitor that produces the logical plan for sampling primary-key
 * combinations which appear more than once in the supplied staging dataset.
 *
 * <p>A plan is only produced for the no-versioning strategy, and only when that strategy
 * has {@code failOnDuplicatePrimaryKeys} turned on. In every other case the visitor
 * returns {@code null}.
 */
public class DeriveDuplicatePkRowsLogicalPlan implements VersioningStrategyVisitor<LogicalPlan>
{
    private List<String> primaryKeys;
    private Dataset tempStagingDataset;
    private int sampleRowCount;

    /** Alias given to the per-key {@code COUNT(*)} column of the generated selection. */
    public static final String DUPLICATE_PK_COUNT = "legend_persistence_pk_count";

    /**
     * @param primaryKeys        names of the primary-key fields to group by
     * @param tempStagingDataset dataset the selection reads from
     * @param sampleRowCount     value passed as the selection's row limit
     */
    public DeriveDuplicatePkRowsLogicalPlan(List<String> primaryKeys, Dataset tempStagingDataset, int sampleRowCount)
    {
        this.primaryKeys = primaryKeys;
        this.tempStagingDataset = tempStagingDataset;
        this.sampleRowCount = sampleRowCount;
    }

    /**
     * Builds a single-operation plan that selects the primary-key fields together with a
     * {@code COUNT(*)} aliased as {@link #DUPLICATE_PK_COUNT}, grouped by the primary keys,
     * keeping only groups whose count is greater than 1 and limiting the result to
     * {@code sampleRowCount} rows.
     *
     * @return the plan, or {@code null} when {@code failOnDuplicatePrimaryKeys} is off
     */
    @Override
    public LogicalPlan visitNoVersioningStrategy(NoVersioningStrategyAbstract noVersioningStrategy)
    {
        if (!noVersioningStrategy.failOnDuplicatePrimaryKeys())
        {
            return null;
        }

        List<Value> pkFields = primaryKeyFields();
        FunctionImpl rowsPerKey = countAllAs(DUPLICATE_PK_COUNT);

        // HAVING legend_persistence_pk_count > 1
        FieldValue countColumn = FieldValue.builder().fieldName(DUPLICATE_PK_COUNT).build();
        GreaterThan moreThanOneRow = GreaterThan.of(countColumn, ObjectValue.of(1));

        Selection duplicatePkSample = Selection.builder()
            .source(tempStagingDataset)
            .groupByFields(pkFields)
            .addAllFields(pkFields)
            .addFields(rowsPerKey)
            .havingCondition(moreThanOneRow)
            .limit(sampleRowCount)
            .build();

        return LogicalPlan.builder().addOps(duplicatePkSample).build();
    }

    /** No plan is produced for the max-version strategy. */
    @Override
    public LogicalPlan visitMaxVersionStrategy(MaxVersionStrategyAbstract maxVersionStrategy)
    {
        return null;
    }

    /** No plan is produced for the all-versions strategy. */
    @Override
    public LogicalPlan visitAllVersionsStrategy(AllVersionsStrategyAbstract allVersionsStrategyAbstract)
    {
        return null;
    }

    // Wraps every primary-key name in a FieldValue, preserving the order of the input list.
    private List<Value> primaryKeyFields()
    {
        return primaryKeys.stream()
            .map(name -> FieldValue.builder().fieldName(name).build())
            .collect(Collectors.toList());
    }

    // COUNT(*) carrying the given alias.
    private static FunctionImpl countAllAs(String alias)
    {
        return FunctionImpl.builder()
            .functionName(FunctionName.COUNT)
            .addValue(All.INSTANCE)
            .alias(alias)
            .build();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright 2024 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.ingestmode.versioning;

import org.finos.legend.engine.persistence.components.common.DedupAndVersionErrorSqlType;
import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlan;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Selection;
import org.finos.legend.engine.persistence.components.logicalplan.values.All;
import org.finos.legend.engine.persistence.components.logicalplan.values.FieldValue;
import org.finos.legend.engine.persistence.components.logicalplan.values.FunctionImpl;
import org.finos.legend.engine.persistence.components.logicalplan.values.FunctionName;
import org.finos.legend.engine.persistence.components.logicalplan.values.Value;

import java.util.List;
import java.util.stream.Collectors;

/**
 * Versioning-strategy visitor that produces the logical plan computing the largest number
 * of rows sharing one primary-key combination in the supplied staging dataset.
 *
 * <p>A plan is only produced for the no-versioning strategy, and only when that strategy
 * has {@code failOnDuplicatePrimaryKeys} turned on. In every other case the visitor
 * returns {@code null}.
 */
public class DeriveMaxDuplicatePkCountLogicalPlan implements VersioningStrategyVisitor<LogicalPlan>
{
    // Alias of the inner per-key COUNT(*) column; the outer MAX refers to the column by this name.
    // Same literal as DeriveDuplicatePkRowsLogicalPlan.DUPLICATE_PK_COUNT.
    private static final String PK_COUNT_ALIAS = "legend_persistence_pk_count";

    // Set once in the constructor and never reassigned.
    private final List<String> primaryKeys;
    private final Dataset tempStagingDataset;

    /**
     * @param primaryKeys        names of the primary-key fields to group by
     * @param tempStagingDataset dataset the inner selection reads from
     */
    public DeriveMaxDuplicatePkCountLogicalPlan(List<String> primaryKeys, Dataset tempStagingDataset)
    {
        this.primaryKeys = primaryKeys;
        this.tempStagingDataset = tempStagingDataset;
    }

    /**
     * Builds a single-operation plan of the shape
     * {@code SELECT MAX(pk_count) AS MAX_PK_DUPLICATES FROM (SELECT COUNT(*) AS pk_count FROM staging GROUP BY pks)}.
     *
     * @return the plan, or {@code null} when {@code failOnDuplicatePrimaryKeys} is off
     */
    @Override
    public LogicalPlan visitNoVersioningStrategy(NoVersioningStrategyAbstract noVersioningStrategy)
    {
        if (!noVersioningStrategy.failOnDuplicatePrimaryKeys())
        {
            return null;
        }

        // The result column is named after the error-check type.
        String maxPkCountAlias = DedupAndVersionErrorSqlType.MAX_PK_DUPLICATES.name();

        List<Value> pks = primaryKeys.stream()
            .map(pkName -> FieldValue.builder().fieldName(pkName).build())
            .collect(Collectors.toList());

        FunctionImpl count = FunctionImpl.builder()
            .functionName(FunctionName.COUNT)
            .addValue(All.INSTANCE)
            .alias(PK_COUNT_ALIAS)
            .build();

        // Inner query: one row per primary-key combination holding its row count.
        // The derived table reuses the alias of the staging dataset reference.
        Selection selectPkCount = Selection.builder()
            .source(tempStagingDataset)
            .groupByFields(pks)
            .addFields(count)
            .alias(tempStagingDataset.datasetReference().alias())
            .build();

        FunctionImpl maxCount = FunctionImpl.builder()
            .functionName(FunctionName.MAX)
            .addValue(FieldValue.builder().fieldName(PK_COUNT_ALIAS).build())
            .alias(maxPkCountAlias)
            .build();

        // Outer query: the largest of the per-key counts.
        Selection selectMaxPkCount = Selection.builder()
            .source(selectPkCount)
            .addFields(maxCount)
            .build();

        return LogicalPlan.builder().addOps(selectMaxPkCount).build();
    }

    /** No plan is produced for the max-version strategy. */
    @Override
    public LogicalPlan visitMaxVersionStrategy(MaxVersionStrategyAbstract maxVersionStrategy)
    {
        return null;
    }

    /** No plan is produced for the all-versions strategy. */
    @Override
    public LogicalPlan visitAllVersionsStrategy(AllVersionsStrategyAbstract allVersionsStrategyAbstract)
    {
        return null;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ public DeriveTempStagingSchemaDefinition(SchemaDefinition stagingSchema, Dedupli
/**
 * Handles the no-versioning strategy: adds every entry of {@code schemaFields} to
 * {@code schemaDefBuilder} and returns the schema definition it builds.
 * The {@code noVersioningStrategy} argument is not consulted.
 */
@Override
public SchemaDefinition visitNoVersioningStrategy(NoVersioningStrategyAbstract noVersioningStrategy)
{

return schemaDefBuilder.addAllFields(schemaFields).build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@
)
public interface NoVersioningStrategyAbstract extends VersioningStrategy
{
/**
 * Opt-in flag for the duplicate-primary-key checks; defaults to {@code false}.
 * The versioning visitors that build those checks only produce a plan when this returns {@code true}.
 */
@Value.Default
default boolean failOnDuplicatePrimaryKeys()
{
// Off unless explicitly enabled on the strategy.
return false;
}

@Override
default <T> T accept(VersioningStrategyVisitor<T> visitor)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@

package org.finos.legend.engine.persistence.components.ingestmode.versioning;

import org.finos.legend.engine.persistence.components.ingestmode.deduplication.DeduplicationStrategy;
import org.finos.legend.engine.persistence.components.ingestmode.deduplication.FailOnDuplicates;

import java.util.Optional;

public class VersioningVisitors
Expand Down Expand Up @@ -62,6 +65,28 @@ public Boolean visitAllVersionsStrategy(AllVersionsStrategyAbstract allVersionsS
}
};

/**
 * Answers whether the duplicate-primary-key check applies to a versioning strategy:
 * the no-versioning strategy reports its {@code failOnDuplicatePrimaryKeys} flag,
 * the max-version and all-versions strategies always answer {@code false}.
 */
public static final VersioningStrategyVisitor<Boolean> IS_DUPLICATE_PK_CHECK_NEEDED = new VersioningStrategyVisitor<Boolean>()
{
    @Override
    public Boolean visitNoVersioningStrategy(NoVersioningStrategyAbstract strategy)
    {
        // The flag on the strategy is the sole deciding factor.
        return strategy.failOnDuplicatePrimaryKeys();
    }

    @Override
    public Boolean visitMaxVersionStrategy(MaxVersionStrategyAbstract strategy)
    {
        return Boolean.FALSE;
    }

    @Override
    public Boolean visitAllVersionsStrategy(AllVersionsStrategyAbstract strategy)
    {
        return Boolean.FALSE;
    }
};

public static final VersioningStrategyVisitor<Optional<String>> EXTRACT_VERSIONING_FIELD = new VersioningStrategyVisitor<Optional<String>>()
{
@Override
Expand All @@ -83,6 +108,35 @@ public Optional<String> visitAllVersionsStrategy(AllVersionsStrategyAbstract all
}
};

/**
 * Visitor that rejects an unsupported pairing of deduplication and versioning settings:
 * a no-versioning strategy with {@code failOnDuplicatePrimaryKeys} enabled is only accepted
 * when the deduplication strategy is a {@link FailOnDuplicates}. The max-version and
 * all-versions strategies are accepted unconditionally.
 */
public static class ValidateDedupAndVersioningCombination implements VersioningStrategyVisitor<Void>
{
    final DeduplicationStrategy deduplicationStrategy;

    /**
     * @param deduplicationStrategy the deduplication strategy configured alongside the visited versioning strategy
     */
    public ValidateDedupAndVersioningCombination(DeduplicationStrategy deduplicationStrategy)
    {
        this.deduplicationStrategy = deduplicationStrategy;
    }

    /**
     * @throws IllegalStateException when {@code failOnDuplicatePrimaryKeys} is enabled but the
     *                               deduplication strategy is not a {@link FailOnDuplicates}
     */
    @Override
    public Void visitNoVersioningStrategy(NoVersioningStrategyAbstract noVersioningStrategy)
    {
        if (!noVersioningStrategy.failOnDuplicatePrimaryKeys())
        {
            // Check not requested: any deduplication strategy is fine.
            return null;
        }
        if (this.deduplicationStrategy instanceof FailOnDuplicates)
        {
            return null;
        }
        throw new IllegalStateException("For failOnDuplicatePrimaryKeys, FailOnDuplicates must be selected as the DeduplicationStrategy");
    }

    /** Nothing to validate for the max-version strategy. */
    @Override
    public Void visitMaxVersionStrategy(MaxVersionStrategyAbstract maxVersionStrategy)
    {
        return null;
    }

    /** Nothing to validate for the all-versions strategy. */
    @Override
    public Void visitAllVersionsStrategy(AllVersionsStrategyAbstract allVersionsStrategyAbstract)
    {
        return null;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -178,11 +178,13 @@ default String ingestRunId()
this.batchEndTimestamp = BatchEndTimestamp.INSTANCE;

// Validation
// 1. MaxVersion & AllVersion strategies must have primary keys
// 1. Validate if the combination of deduplication and versioning is valid
ingestMode.versioningStrategy().accept(new VersioningVisitors.ValidateDedupAndVersioningCombination(ingestMode.deduplicationStrategy()));
// 2. MaxVersion & AllVersion strategies must have primary keys
ingestMode.versioningStrategy().accept(new ValidatePrimaryKeysForVersioningStrategy(primaryKeys, this::validatePrimaryKeysNotEmpty));
// 2. Validate if the versioningField is comparable if a versioningStrategy is present
// 3. Validate if the versioningField is comparable if a versioningStrategy is present
validateVersioningField(ingestMode().versioningStrategy(), stagingDataset());
// 3. cleanupStagingData must be turned off when using DerivedDataset or FilteredDataset
// 4. cleanupStagingData must be turned off when using DerivedDataset or FilteredDataset
validateCleanUpStagingData(plannerOptions, originalStagingDataset());
}

Expand Down Expand Up @@ -445,6 +447,7 @@ public Map<DedupAndVersionErrorSqlType, LogicalPlan> buildLogicalPlanForDeduplic
{
Map<DedupAndVersionErrorSqlType, LogicalPlan> dedupAndVersioningErrorChecks = new HashMap<>();
addMaxDuplicatesErrorCheck(dedupAndVersioningErrorChecks);
addMaxPkDuplicatesErrorCheck(dedupAndVersioningErrorChecks);
addDataErrorCheck(dedupAndVersioningErrorChecks);
return dedupAndVersioningErrorChecks;
}
Expand Down Expand Up @@ -485,6 +488,24 @@ protected void addMaxDuplicatesErrorCheck(Map<DedupAndVersionErrorSqlType, Logic
}
}

/**
 * Registers the duplicate-primary-key error checks in the supplied map when the versioning
 * strategy asks for them: the max-duplicate-count plan under {@code MAX_PK_DUPLICATES} and the
 * duplicate-rows sample plan under {@code PK_DUPLICATE_ROWS}. A plan that comes back as
 * {@code null} is not registered.
 *
 * @param dedupAndVersioningErrorChecks map that receives the generated plans
 */
protected void addMaxPkDuplicatesErrorCheck(Map<DedupAndVersionErrorSqlType, LogicalPlan> dedupAndVersioningErrorChecks)
{
    boolean pkCheckNeeded = ingestMode.versioningStrategy().accept(VersioningVisitors.IS_DUPLICATE_PK_CHECK_NEEDED);
    if (!pkCheckNeeded)
    {
        return;
    }

    // Highest number of rows sharing a single primary-key combination.
    LogicalPlan maxPkCountPlan = ingestMode.versioningStrategy()
        .accept(new DeriveMaxDuplicatePkCountLogicalPlan(primaryKeys, stagingDataset()));
    if (maxPkCountPlan != null)
    {
        dedupAndVersioningErrorChecks.put(DedupAndVersionErrorSqlType.MAX_PK_DUPLICATES, maxPkCountPlan);
    }

    // Sample of the offending primary-key combinations, limited by the configured sample row count.
    LogicalPlan duplicatePkRowsPlan = ingestMode.versioningStrategy()
        .accept(new DeriveDuplicatePkRowsLogicalPlan(primaryKeys, stagingDataset(), options().sampleRowCount()));
    if (duplicatePkRowsPlan != null)
    {
        dedupAndVersioningErrorChecks.put(DedupAndVersionErrorSqlType.PK_DUPLICATE_ROWS, duplicatePkRowsPlan);
    }
}

protected void addDataErrorCheck(Map<DedupAndVersionErrorSqlType, LogicalPlan> dedupAndVersioningErrorChecks)
{
List<String> remainingColumns = getDigestOrRemainingColumns();
Expand Down Expand Up @@ -667,6 +688,10 @@ static class ValidatePrimaryKeysForVersioningStrategy implements VersioningStrat
/**
 * For the no-versioning strategy, primary keys are only validated when
 * {@code failOnDuplicatePrimaryKeys} is enabled; otherwise nothing is checked.
 */
@Override
public Void visitNoVersioningStrategy(NoVersioningStrategyAbstract noVersioningStrategy)
{
    if (!noVersioningStrategy.failOnDuplicatePrimaryKeys())
    {
        return null;
    }

    // Delegate to the supplied validator with the configured primary keys.
    validatePrimaryKeysNotEmpty.accept(primaryKeys);
    return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,12 @@ public static String getDropTempTableQuery(String tableName)
// SQL text (presumably the expected output asserted by tests - confirm against callers):
// selects "id", "name" and "legend_persistence_count" from the temp staging table
// for rows whose count exceeds 1, limited to 20 rows.
public static String dupRowsSql = "SELECT \"id\",\"name\",\"legend_persistence_count\" FROM \"mydb\".\"staging_temp_staging_lp_yosulf\" as stage " +
"WHERE stage.\"legend_persistence_count\" > 1 LIMIT 20";

// SQL text: MAX over the per-("id","name") COUNT(*) of the temp staging table,
// returned under the alias "MAX_PK_DUPLICATES".
public static String maxPkDupsErrorCheckSql = "SELECT MAX(\"legend_persistence_pk_count\") as \"MAX_PK_DUPLICATES\" FROM " +
"(SELECT COUNT(*) as \"legend_persistence_pk_count\" FROM \"mydb\".\"staging_temp_staging_lp_yosulf\" as stage GROUP BY \"id\", \"name\") as stage";

// SQL text: ("id","name") groups of the temp staging table having more than one row,
// together with their row count, limited to 20 rows.
public static String dupPkRowsSql = "SELECT \"id\",\"name\",COUNT(*) as \"legend_persistence_pk_count\" FROM " +
"\"mydb\".\"staging_temp_staging_lp_yosulf\" as stage GROUP BY \"id\", \"name\" HAVING \"legend_persistence_pk_count\" > 1 LIMIT 20";

// SQL text: MAX over the per-("id","name","biz_date") count of distinct "digest" values
// in the temp staging table, returned under the alias "MAX_DATA_ERRORS".
public static String dataErrorCheckSqlWithBizDateVersion = "SELECT MAX(\"legend_persistence_distinct_rows\") as \"MAX_DATA_ERRORS\" FROM " +
"(SELECT COUNT(DISTINCT(\"digest\")) as \"legend_persistence_distinct_rows\" FROM " +
"\"mydb\".\"staging_temp_staging_lp_yosulf\" as stage GROUP BY \"id\", \"name\", \"biz_date\") as stage";
Expand Down
Loading

0 comments on commit 3902a5e

Please sign in to comment.