Skip to content

Commit

Permalink
Persistence Component: Deduplication and Versioning (#2424)
Browse files Browse the repository at this point in the history
  • Loading branch information
prasar-ashutosh authored Nov 13, 2023
1 parent dcee11f commit 872c668
Show file tree
Hide file tree
Showing 361 changed files with 8,607 additions and 4,919 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright 2023 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.common;

public enum DedupAndVersionErrorStatistics
{
MAX_DUPLICATES,
MAX_DATA_ERRORS;
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,10 @@
package org.finos.legend.engine.persistence.components.ingestmode;

import org.finos.legend.engine.persistence.components.ingestmode.audit.Auditing;
import org.finos.legend.engine.persistence.components.ingestmode.deduplication.AllowDuplicatesAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.deduplication.DeduplicationStrategy;
import org.finos.legend.engine.persistence.components.ingestmode.deduplication.DeduplicationStrategyVisitor;
import org.finos.legend.engine.persistence.components.ingestmode.deduplication.FailOnDuplicatesAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.deduplication.FilterDuplicatesAbstract;
import org.immutables.value.Value;

import java.util.Optional;

import static org.immutables.value.Value.Check;
import static org.immutables.value.Value.Immutable;
import static org.immutables.value.Value.Style;

Expand All @@ -39,43 +34,12 @@ public interface AppendOnlyAbstract extends IngestMode
{
Optional<String> digestField();

Optional<String> dataSplitField();

Auditing auditing();

DeduplicationStrategy deduplicationStrategy();

@Check
default void validate()
@Value.Default
default boolean filterExistingRecords()
{
deduplicationStrategy().accept(new DeduplicationStrategyVisitor<Void>()
{
@Override
public Void visitAllowDuplicates(AllowDuplicatesAbstract allowDuplicates)
{
return null;
}

@Override
public Void visitFilterDuplicates(FilterDuplicatesAbstract filterDuplicates)
{
if (!digestField().isPresent())
{
throw new IllegalStateException("Cannot build AppendOnly, [digestField] must be specified since [deduplicationStrategy] is set to filter duplicates");
}
return null;
}

@Override
public Void visitFailOnDuplicates(FailOnDuplicatesAbstract failOnDuplicates)
{
if (dataSplitField().isPresent())
{
throw new IllegalStateException("Cannot build AppendOnly, DataSplits not supported for failOnDuplicates mode");
}
return null;
}
});
return false;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,19 @@

package org.finos.legend.engine.persistence.components.ingestmode;

import org.finos.legend.engine.persistence.components.ingestmode.deduplication.AllowDuplicates;
import org.finos.legend.engine.persistence.components.ingestmode.deduplication.DeduplicationStrategy;
import org.finos.legend.engine.persistence.components.ingestmode.deduplication.AllowDuplicatesAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.deduplication.DeduplicationStrategyVisitor;
import org.finos.legend.engine.persistence.components.ingestmode.deduplication.FailOnDuplicatesAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.deduplication.FilterDuplicatesAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.merge.MergeStrategy;
import org.finos.legend.engine.persistence.components.ingestmode.merge.NoDeletesMergeStrategy;
import org.finos.legend.engine.persistence.components.ingestmode.transactionmilestoning.TransactionMilestoning;
import org.finos.legend.engine.persistence.components.ingestmode.validitymilestoning.ValidityMilestoning;

import java.util.Optional;
import org.finos.legend.engine.persistence.components.ingestmode.versioning.AllVersionsStrategyAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.versioning.MaxVersionStrategyAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.versioning.NoVersioningStrategyAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.versioning.VersioningStrategyVisitor;
import org.immutables.value.Value;

import static org.immutables.value.Value.Default;
import static org.immutables.value.Value.Immutable;
Expand All @@ -37,9 +42,13 @@
)
public interface BitemporalDeltaAbstract extends IngestMode, BitemporalMilestoned
{
String digestField();
@Value.Default
default boolean filterExistingRecords()
{
return false;
}

Optional<String> dataSplitField();
String digestField();

@Override
TransactionMilestoning transactionMilestoning();
Expand All @@ -53,15 +62,59 @@ default MergeStrategy mergeStrategy()
return NoDeletesMergeStrategy.builder().build();
}

@Default
default DeduplicationStrategy deduplicationStrategy()
{
return AllowDuplicates.builder().build();
}

@Override
default <T> T accept(IngestModeVisitor<T> visitor)
{
return visitor.visitBitemporalDelta(this);
}

@Value.Check
default void validate()
{
versioningStrategy().accept(new VersioningStrategyVisitor<Void>()
{
@Override
public Void visitNoVersioningStrategy(NoVersioningStrategyAbstract noVersioningStrategy)
{
return null;
}

@Override
public Void visitMaxVersionStrategy(MaxVersionStrategyAbstract maxVersionStrategy)
{
throw new IllegalStateException("Cannot build BitemporalDelta, max version is not supported");
}

@Override
public Void visitAllVersionsStrategy(AllVersionsStrategyAbstract allVersionsStrategyAbstract)
{
if (allVersionsStrategyAbstract.performStageVersioning())
{
throw new IllegalStateException("Cannot build BitemporalDelta, perform versioning not allowed");
}
return null;
}
});

deduplicationStrategy().accept(new DeduplicationStrategyVisitor<Void>()
{
@Override
public Void visitAllowDuplicates(AllowDuplicatesAbstract allowDuplicates)
{
return null;
}

@Override
public Void visitFilterDuplicates(FilterDuplicatesAbstract filterDuplicates)
{
throw new IllegalStateException("Cannot build BitemporalDelta, filter duplicates is not supported");
}

@Override
public Void visitFailOnDuplicates(FailOnDuplicatesAbstract failOnDuplicates)
{
throw new IllegalStateException("Cannot build BitemporalDelta, fail on duplicates is not supported");
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,15 @@
package org.finos.legend.engine.persistence.components.ingestmode;

import org.finos.legend.engine.persistence.components.ingestmode.audit.Auditing;
import org.finos.legend.engine.persistence.components.ingestmode.deduplication.AllowDuplicatesAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.deduplication.DeduplicationStrategyVisitor;
import org.finos.legend.engine.persistence.components.ingestmode.deduplication.FailOnDuplicatesAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.deduplication.FilterDuplicatesAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.digest.DigestGenStrategy;
import org.finos.legend.engine.persistence.components.ingestmode.versioning.AllVersionsStrategyAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.versioning.MaxVersionStrategyAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.versioning.NoVersioningStrategyAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.versioning.VersioningStrategyVisitor;
import org.immutables.value.Value;

@Value.Immutable
Expand All @@ -39,4 +47,50 @@ default <T> T accept(IngestModeVisitor<T> visitor)
{
return visitor.visitBulkLoad(this);
}

@Value.Check
default void validate()
{
deduplicationStrategy().accept(new DeduplicationStrategyVisitor<Void>()
{
@Override
public Void visitAllowDuplicates(AllowDuplicatesAbstract allowDuplicates)
{
return null;
}

@Override
public Void visitFilterDuplicates(FilterDuplicatesAbstract filterDuplicates)
{
throw new IllegalStateException("Cannot build BulkLoad, filter duplicates is not supported");
}

@Override
public Void visitFailOnDuplicates(FailOnDuplicatesAbstract failOnDuplicates)
{
throw new IllegalStateException("Cannot build BulkLoad, fail on duplicates is not supported");
}
});

versioningStrategy().accept(new VersioningStrategyVisitor<Void>()
{
@Override
public Void visitNoVersioningStrategy(NoVersioningStrategyAbstract noVersioningStrategy)
{
return null;
}

@Override
public Void visitMaxVersionStrategy(MaxVersionStrategyAbstract maxVersionStrategy)
{
throw new IllegalStateException("Cannot build BulkLoad, max version is not supported");
}

@Override
public Void visitAllVersionsStrategy(AllVersionsStrategyAbstract allVersionsStrategyAbstract)
{
throw new IllegalStateException("Cannot build BulkLoad, all version is not supported");
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@
import org.finos.legend.engine.persistence.components.ingestmode.audit.AuditingVisitor;
import org.finos.legend.engine.persistence.components.ingestmode.audit.DateTimeAuditingAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.audit.NoAuditingAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.deduplication.AllowDuplicatesAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.deduplication.DeduplicationStrategyVisitor;
import org.finos.legend.engine.persistence.components.ingestmode.deduplication.FailOnDuplicatesAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.deduplication.FilterDuplicatesAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.merge.DeleteIndicatorMergeStrategyAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.merge.MergeStrategyVisitor;
import org.finos.legend.engine.persistence.components.ingestmode.merge.NoDeletesMergeStrategyAbstract;
Expand Down Expand Up @@ -72,13 +68,13 @@ public DeriveMainDatasetSchemaFromStaging(Dataset mainDataset, Dataset stagingDa
@Override
public Dataset visitAppendOnly(AppendOnlyAbstract appendOnly)
{
boolean isAuditingFieldPK = doesDatasetContainsAnyPK(mainSchemaFields);
appendOnly.auditing().accept(new EnrichSchemaWithAuditing(mainSchemaFields, isAuditingFieldPK));
if (appendOnly.digestField().isPresent())
{
addDigestField(mainSchemaFields, appendOnly.digestField().get());
}
removeDataSplitField(appendOnly.dataSplitField());
boolean isAuditingFieldPK = appendOnly.deduplicationStrategy().accept(new DeriveAuditingFieldPKForAppendOnly(appendOnly.dataSplitField().isPresent()));
appendOnly.auditing().accept(new EnrichSchemaWithAuditing(mainSchemaFields, isAuditingFieldPK));
return mainDatasetDefinitionBuilder.schema(mainSchemaDefinitionBuilder.addAllFields(mainSchemaFields).build()).build();
}

Expand Down Expand Up @@ -180,37 +176,9 @@ public static void addDigestField(List<Field> schemaFields, String digestFieldNa

private boolean doesDatasetContainsAnyPK(List<Field> mainSchemaFields)
{
return mainSchemaFields.stream().anyMatch(field -> field.primaryKey());
return mainSchemaFields.stream().anyMatch(Field::primaryKey);
}

public static class DeriveAuditingFieldPKForAppendOnly implements DeduplicationStrategyVisitor<Boolean>
{

private boolean isDataSplitEnabled;

public DeriveAuditingFieldPKForAppendOnly(boolean isDataSplitEnabled)
{
this.isDataSplitEnabled = isDataSplitEnabled;
}

@Override
public Boolean visitAllowDuplicates(AllowDuplicatesAbstract allowDuplicates)
{
return isDataSplitEnabled;
}

@Override
public Boolean visitFilterDuplicates(FilterDuplicatesAbstract filterDuplicates)
{
return true;
}

@Override
public Boolean visitFailOnDuplicates(FailOnDuplicatesAbstract failOnDuplicates)
{
return false;
}
}

public static class EnrichSchemaWithMergeStrategy implements MergeStrategyVisitor<Void>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,34 @@

package org.finos.legend.engine.persistence.components.ingestmode;

import org.finos.legend.engine.persistence.components.ingestmode.deduplication.AllowDuplicates;
import org.finos.legend.engine.persistence.components.ingestmode.deduplication.DeduplicationStrategy;
import org.finos.legend.engine.persistence.components.ingestmode.versioning.NoVersioningStrategy;
import org.finos.legend.engine.persistence.components.ingestmode.versioning.VersioningStrategy;
import org.finos.legend.engine.persistence.components.ingestmode.versioning.VersioningVisitors;
import org.immutables.value.Value;

import java.util.Optional;

public interface IngestMode
{
@Value.Derived
default Optional<String> dataSplitField()
{
return this.versioningStrategy().accept(VersioningVisitors.EXTRACT_DATA_SPLIT_FIELD);
}

@Value.Default
default DeduplicationStrategy deduplicationStrategy()
{
return AllowDuplicates.builder().build();
}

@Value.Default
default VersioningStrategy versioningStrategy()
{
return NoVersioningStrategy.builder().build();
}

<T> T accept(IngestModeVisitor<T> visitor);
}
Loading

0 comments on commit 872c668

Please sign in to comment.