Skip to content

Commit

Permalink
Persistence Component: Refactoring DeleteStrategy as interfaces
Browse files Browse the repository at this point in the history
  • Loading branch information
rengam32 committed Dec 24, 2024
1 parent 7b05140 commit b0ed4ff
Show file tree
Hide file tree
Showing 9 changed files with 111 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package org.finos.legend.engine.persistence.components.ingestmode;

import org.finos.legend.engine.persistence.components.ingestmode.deletestrategy.DeleteUpdatedStrategy;
import org.finos.legend.engine.persistence.components.ingestmode.emptyhandling.DeleteTargetData;
import org.finos.legend.engine.persistence.components.ingestmode.emptyhandling.EmptyDatasetHandling;
import org.finos.legend.engine.persistence.components.ingestmode.partitioning.*;
Expand All @@ -24,7 +25,6 @@
import org.finos.legend.engine.persistence.components.ingestmode.versioning.MaxVersionStrategyAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.versioning.MergeDataVersionResolver;
import org.finos.legend.engine.persistence.components.ingestmode.versioning.DigestBasedResolverAbstract;
import org.finos.legend.engine.persistence.components.util.DeleteStrategy;
import org.immutables.value.Value;

import java.util.*;
Expand Down Expand Up @@ -71,7 +71,7 @@ default void validate()
@Override
public Void visitPartitioning(PartitioningAbstract partitionStrategy)
{
if (partitionStrategy.deleteStrategy() == DeleteStrategy.DELETE_UPDATED)
if (partitionStrategy.deleteStrategy() instanceof DeleteUpdatedStrategy)
{
throw new IllegalStateException("Cannot build UnitemporalSnapshot, digestField is mandatory for Partitioning when delete strategy = DELETE_UPDATED");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2024 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.ingestmode.deletestrategy;

import org.immutables.value.Value;

@Value.Immutable
@Value.Style(
typeAbstract = "*Abstract",
typeImmutable = "*",
jdkOnly = true,
optionalAcceptNullable = true,
strictBuilder = true
)
public interface DeleteAllStrategyAbstract extends DeleteStrategy
{

@Override
default <T> T accept(DeleteStrategyVisitor<T> visitor)
{
return visitor.visitDeleteAll(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.util;
package org.finos.legend.engine.persistence.components.ingestmode.deletestrategy;

public enum DeleteStrategy
public interface DeleteStrategy
{
DELETE_ALL,
DELETE_UPDATED
<T> T accept(DeleteStrategyVisitor<T> visitor);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright 2024 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.ingestmode.deletestrategy;

import org.finos.legend.engine.persistence.components.ingestmode.partitioning.NoPartitioningAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.partitioning.PartitioningAbstract;

public interface DeleteStrategyVisitor<T>
{
T visitDeleteAll(DeleteAllStrategyAbstract deleteStrategy);

T visitDeleteUpdated(DeleteUpdatedStrategyAbstract deleteStrategy);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright 2024 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.ingestmode.deletestrategy;

import org.immutables.value.Value;

@Value.Immutable
@Value.Style(
typeAbstract = "*Abstract",
typeImmutable = "*",
jdkOnly = true,
optionalAcceptNullable = true,
strictBuilder = true
)
public interface DeleteUpdatedStrategyAbstract extends DeleteStrategy
{
@Override
default <T> T accept(DeleteStrategyVisitor<T> visitor)
{
return visitor.visitDeleteUpdated(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@

package org.finos.legend.engine.persistence.components.ingestmode.partitioning;

import org.finos.legend.engine.persistence.components.util.DeleteStrategy;
import org.finos.legend.engine.persistence.components.ingestmode.deletestrategy.DeleteStrategy;
import org.finos.legend.engine.persistence.components.ingestmode.deletestrategy.*;
import org.immutables.value.Value;

import java.util.List;
Expand All @@ -34,7 +35,7 @@ public interface PartitioningAbstract extends PartitioningStrategy
@Value.Default
default DeleteStrategy deleteStrategy()
{
return DeleteStrategy.DELETE_UPDATED;
return DeleteUpdatedStrategy.builder().build();
}

List<String> partitionFields();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.finos.legend.engine.persistence.components.common.Resources;
import org.finos.legend.engine.persistence.components.exception.EmptyBatchException;
import org.finos.legend.engine.persistence.components.ingestmode.UnitemporalSnapshot;
import org.finos.legend.engine.persistence.components.ingestmode.deletestrategy.DeleteAllStrategy;
import org.finos.legend.engine.persistence.components.ingestmode.emptyhandling.DeleteTargetDataAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.emptyhandling.EmptyDatasetHandlingVisitor;
import org.finos.legend.engine.persistence.components.ingestmode.emptyhandling.FailEmptyBatchAbstract;
Expand All @@ -40,7 +41,6 @@
import org.finos.legend.engine.persistence.components.logicalplan.values.Pair;
import org.finos.legend.engine.persistence.components.logicalplan.values.Value;
import org.finos.legend.engine.persistence.components.util.Capability;
import org.finos.legend.engine.persistence.components.util.DeleteStrategy;
import org.finos.legend.engine.persistence.components.util.LogicalPlanUtils;

import java.util.*;
Expand Down Expand Up @@ -139,7 +139,7 @@ protected Insert sqlToUpsertRows()
List<Value> fieldsToInsert = new ArrayList<>(dataFields);
fieldsToInsert.addAll(transactionMilestoningFields());

if (partitioning.isPresent() && partitioning.get().deleteStrategy() == DeleteStrategy.DELETE_ALL)
if (partitioning.isPresent() && partitioning.get().deleteStrategy() instanceof DeleteAllStrategy)
{
Dataset selectStage = Selection.builder().source(stagingDataset()).addAllFields(fieldsToSelect).build();
return Insert.of(mainDataset(), selectStage, fieldsToInsert);
Expand Down Expand Up @@ -212,7 +212,7 @@ protected Update getSqlToMilestoneRows(List<Pair<FieldValue, Value>> values)
{
List<Condition> whereClause = new ArrayList<>(Arrays.asList(openRecordCondition));

if (!(partitioning.isPresent() && partitioning.get().deleteStrategy() == DeleteStrategy.DELETE_ALL))
if (!(partitioning.isPresent() && partitioning.get().deleteStrategy() instanceof DeleteAllStrategy))
{
Condition notExistsWhereClause = Not.of(Exists.of(
Selection.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.finos.legend.engine.persistence.components.ingestmode.UnitemporalSnapshot;
import org.finos.legend.engine.persistence.components.ingestmode.deduplication.FailOnDuplicates;
import org.finos.legend.engine.persistence.components.ingestmode.deduplication.FilterDuplicates;
import org.finos.legend.engine.persistence.components.ingestmode.deletestrategy.DeleteAllStrategy;
import org.finos.legend.engine.persistence.components.ingestmode.emptyhandling.FailEmptyBatch;
import org.finos.legend.engine.persistence.components.ingestmode.emptyhandling.NoOp;
import org.finos.legend.engine.persistence.components.ingestmode.partitioning.Partitioning;
Expand All @@ -31,7 +32,6 @@
import org.finos.legend.engine.persistence.components.logicalplan.datasets.DatasetDefinition;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.FilteredDataset;
import org.finos.legend.engine.persistence.components.planner.PlannerOptions;
import org.finos.legend.engine.persistence.components.util.DeleteStrategy;
import org.finos.legend.engine.persistence.components.util.MetadataDataset;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -683,7 +683,7 @@ void testUnitemporalSnapshotMilestoningLogicWithPartitionNoDigest() throws Excep
.dateTimeInName(batchTimeInName)
.dateTimeOutName(batchTimeOutName)
.build())
.partitioningStrategy(Partitioning.builder().addAllPartitionFields(Collections.singletonList(dateName)).deleteStrategy(DeleteStrategy.DELETE_ALL).build())
.partitioningStrategy(Partitioning.builder().addAllPartitionFields(Collections.singletonList(dateName)).deleteStrategy(DeleteAllStrategy.builder().build()).build())
.build();

PlannerOptions options = PlannerOptions.builder().collectStatistics(true).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
import org.finos.legend.engine.persistence.components.BaseTest;
import org.finos.legend.engine.persistence.components.ingestmode.UnitemporalSnapshot;
import org.finos.legend.engine.persistence.components.ingestmode.deduplication.FailOnDuplicates;
import org.finos.legend.engine.persistence.components.ingestmode.deletestrategy.DeleteAllStrategy;
import org.finos.legend.engine.persistence.components.ingestmode.emptyhandling.DeleteTargetData;
import org.finos.legend.engine.persistence.components.ingestmode.emptyhandling.NoOp;
import org.finos.legend.engine.persistence.components.ingestmode.partitioning.NoPartitioning;
import org.finos.legend.engine.persistence.components.ingestmode.partitioning.Partitioning;
import org.finos.legend.engine.persistence.components.ingestmode.transactionmilestoning.BatchId;
import org.finos.legend.engine.persistence.components.util.DeleteStrategy;

import java.util.Arrays;

Expand Down Expand Up @@ -122,15 +122,15 @@ public TestScenario BATCH_ID_BASED__WITH_PARTITIONS__NO_DEDUP__NO_VERSION__NO_DI
.batchIdInName(batchIdInField)
.batchIdOutName(batchIdOutField)
.build())
.partitioningStrategy(Partitioning.builder().addAllPartitionFields(Arrays.asList(partitionKeys)).deleteStrategy(DeleteStrategy.DELETE_ALL).build())
.partitioningStrategy(Partitioning.builder().addAllPartitionFields(Arrays.asList(partitionKeys)).deleteStrategy(DeleteAllStrategy.builder().build()).build())
.build();
return new TestScenario(mainTableWithBatchIdBasedSchemaWithoutDigest, stagingTableWithBaseSchema, ingestMode);
}

public TestScenario BATCH_ID_BASED__WITH_PARTITION_FILTER__NO_DEDUP__NO_VERSION__NO_DIGEST()
{
UnitemporalSnapshot ingestMode = UnitemporalSnapshot.builder()
.partitioningStrategy(Partitioning.builder().addAllPartitionFields(Arrays.asList(partitionKeys)).putAllPartitionValuesByField(partitionFilter).deleteStrategy(DeleteStrategy.DELETE_ALL).build())
.partitioningStrategy(Partitioning.builder().addAllPartitionFields(Arrays.asList(partitionKeys)).putAllPartitionValuesByField(partitionFilter).deleteStrategy(DeleteAllStrategy.builder().build()).build())
.transactionMilestoning(BatchId.builder()
.batchIdInName(batchIdInField)
.batchIdOutName(batchIdOutField)
Expand All @@ -146,7 +146,7 @@ public TestScenario BATCH_ID_BASED__WITH_PARTITION_SPEC_LIST__NO_DEDUP__NO_VERSI
.batchIdInName(batchIdInField)
.batchIdOutName(batchIdOutField)
.build())
.partitioningStrategy(Partitioning.builder().addAllPartitionFields(Arrays.asList(partitionKeysMulti)).addAllPartitionSpecList(partitionSpecList()).deleteStrategy(DeleteStrategy.DELETE_ALL).build())
.partitioningStrategy(Partitioning.builder().addAllPartitionFields(Arrays.asList(partitionKeysMulti)).addAllPartitionSpecList(partitionSpecList()).deleteStrategy(DeleteAllStrategy.builder().build()).build())
.emptyDatasetHandling(DeleteTargetData.builder().build())
.build();
return new TestScenario(mainTableMultiPartitionsBasedWithoutDigest, stagingTableWithMultiPartitionsWithoutDigest, ingestMode);
Expand Down

0 comments on commit b0ed4ff

Please sign in to comment.