Skip to content

Commit

Permalink
Fix AppendOnly and prepare skeleton for testing cases
Browse files Browse the repository at this point in the history
  • Loading branch information
kumuwu committed Oct 6, 2023
1 parent 05e8791 commit ed6d37c
Show file tree
Hide file tree
Showing 13 changed files with 340 additions and 651 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@
package org.finos.legend.engine.persistence.components.ingestmode;

import org.finos.legend.engine.persistence.components.ingestmode.audit.Auditing;
import org.finos.legend.engine.persistence.components.ingestmode.deduplication.DeduplicationStrategyVisitor;
import org.finos.legend.engine.persistence.components.ingestmode.deduplication.AllowDuplicatesAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.deduplication.FailOnDuplicatesAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.deduplication.FilterDuplicatesAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.versioning.AllVersionsStrategyAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.versioning.MaxVersionStrategyAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.versioning.NoVersioningStrategyAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.versioning.VersioningComparator;
import org.finos.legend.engine.persistence.components.ingestmode.versioning.VersioningStrategyVisitor;
import org.immutables.value.Value;

import java.util.Optional;

import static org.immutables.value.Value.Check;
import static org.immutables.value.Value.Immutable;
import static org.immutables.value.Value.Style;

Expand All @@ -47,6 +47,39 @@ default boolean filterExistingRecords()
return false;
}

@Value.Check
default void validate()
{
versioningStrategy().accept(new VersioningStrategyVisitor<Void>()
{
@Override
public Void visitNoVersioningStrategy(NoVersioningStrategyAbstract noVersioningStrategy)
{
return null;
}

@Override
public Void visitMaxVersionStrategy(MaxVersionStrategyAbstract maxVersionStrategy)
{
if (maxVersionStrategy.versioningComparator() != VersioningComparator.ALWAYS)
{
throw new IllegalStateException("Cannot build AppendOnly, versioning comparator can only be Always");
}
return null;
}

@Override
public Void visitAllVersionsStrategy(AllVersionsStrategyAbstract allVersionsStrategyAbstract)
{
if (allVersionsStrategyAbstract.versioningComparator() != VersioningComparator.ALWAYS)
{
throw new IllegalStateException("Cannot build AppendOnly, versioning comparator can only be Always");
}
return null;
}
});
}

@Override
default <T> T accept(IngestModeVisitor<T> visitor)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@
import org.finos.legend.engine.persistence.components.ingestmode.audit.AuditingVisitor;
import org.finos.legend.engine.persistence.components.ingestmode.audit.DateTimeAuditingAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.audit.NoAuditingAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.deduplication.AllowDuplicatesAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.deduplication.DeduplicationStrategyVisitor;
import org.finos.legend.engine.persistence.components.ingestmode.deduplication.FailOnDuplicatesAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.deduplication.FilterDuplicatesAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.merge.DeleteIndicatorMergeStrategyAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.merge.MergeStrategyVisitor;
import org.finos.legend.engine.persistence.components.ingestmode.merge.NoDeletesMergeStrategyAbstract;
Expand Down Expand Up @@ -72,13 +68,20 @@ public DeriveMainDatasetSchemaFromStaging(Dataset mainDataset, Dataset stagingDa
@Override
public Dataset visitAppendOnly(AppendOnlyAbstract appendOnly)
{
if (mainSchemaFields.stream().anyMatch(Field::primaryKey))
{
// if primary keys are present, auditing column should also be a primary key
appendOnly.auditing().accept(new EnrichSchemaWithAuditing(mainSchemaFields, true));
}
else
{
appendOnly.auditing().accept(new EnrichSchemaWithAuditing(mainSchemaFields, false));
}
if (appendOnly.digestField().isPresent())
{
addDigestField(mainSchemaFields, appendOnly.digestField().get());
}
removeDataSplitField(appendOnly.dataSplitField());
boolean isAuditingFieldPK = appendOnly.deduplicationStrategy().accept(new DeriveAuditingFieldPKForAppendOnly(appendOnly.dataSplitField().isPresent()));
appendOnly.auditing().accept(new EnrichSchemaWithAuditing(mainSchemaFields, isAuditingFieldPK));
return mainDatasetDefinitionBuilder.schema(mainSchemaDefinitionBuilder.addAllFields(mainSchemaFields).build()).build();
}

Expand Down Expand Up @@ -183,34 +186,6 @@ private boolean doesDatasetContainsAnyPK(List<Field> mainSchemaFields)
return mainSchemaFields.stream().anyMatch(field -> field.primaryKey());
}

public static class DeriveAuditingFieldPKForAppendOnly implements DeduplicationStrategyVisitor<Boolean>
{

private boolean isDataSplitEnabled;

public DeriveAuditingFieldPKForAppendOnly(boolean isDataSplitEnabled)
{
this.isDataSplitEnabled = isDataSplitEnabled;
}

@Override
public Boolean visitAllowDuplicates(AllowDuplicatesAbstract allowDuplicates)
{
return isDataSplitEnabled;
}

@Override
public Boolean visitFilterDuplicates(FilterDuplicatesAbstract filterDuplicates)
{
return true;
}

@Override
public Boolean visitFailOnDuplicates(FailOnDuplicatesAbstract failOnDuplicates)
{
return false;
}
}

public static class EnrichSchemaWithMergeStrategy implements MergeStrategyVisitor<Void>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.finos.legend.engine.persistence.components.ingestmode.versioning.AllVersionsStrategyAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.versioning.MaxVersionStrategyAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.versioning.NoVersioningStrategyAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.versioning.VersioningComparator;
import org.finos.legend.engine.persistence.components.ingestmode.versioning.VersioningStrategyVisitor;
import org.immutables.value.Value;

Expand Down Expand Up @@ -57,6 +58,10 @@ public Void visitNoVersioningStrategy(NoVersioningStrategyAbstract noVersioningS
@Override
public Void visitMaxVersionStrategy(MaxVersionStrategyAbstract maxVersionStrategy)
{
if (maxVersionStrategy.versioningComparator() != VersioningComparator.ALWAYS)
{
throw new IllegalStateException("Cannot build NontemporalSnapshot, versioning comparator can only be Always");
}
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
public enum VersioningComparator
{
GREATER_THAN,
GREATER_THAN_EQUAL_TO

// TODO support always

GREATER_THAN_EQUAL_TO,
ALWAYS
}
Loading

0 comments on commit ed6d37c

Please sign in to comment.