Skip to content

Commit

Permalink
Persistence Component: Fix Unitemporal snapshot partition to support …
Browse files Browse the repository at this point in the history
…multiple partition key values (#2823)
  • Loading branch information
prasar-ashutosh authored May 6, 2024
1 parent 7e73cf1 commit 26f48d1
Show file tree
Hide file tree
Showing 23 changed files with 905 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,7 @@
import org.finos.legend.engine.persistence.components.ingestmode.versioning.AllVersionsStrategy;
import org.finos.legend.engine.persistence.components.ingestmode.versioning.MaxVersionStrategy;

import java.util.Optional;
import java.util.List;
import java.util.Set;
import java.util.Map;
import java.util.HashMap;
import java.util.*;

import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -118,6 +114,7 @@ public IngestMode visitUnitemporalSnapshot(UnitemporalSnapshotAbstract unitempor
.transactionMilestoning(unitemporalSnapshot.transactionMilestoning().accept(new TransactionMilestoningCaseConverter()))
.addAllPartitionFields(applyCase(unitemporalSnapshot.partitionFields()))
.putAllPartitionValuesByField(applyCase(unitemporalSnapshot.partitionValuesByField()))
.addAllPartitionSpecList(applyCaseForListOfMap(unitemporalSnapshot.partitionSpecList()))
.emptyDatasetHandling(unitemporalSnapshot.emptyDatasetHandling())
.deduplicationStrategy(unitemporalSnapshot.deduplicationStrategy())
.versioningStrategy(unitemporalSnapshot.versioningStrategy().accept(new VersionStrategyCaseConverter()))
Expand Down Expand Up @@ -210,6 +207,21 @@ private Map<String, Set<String>> applyCase(Map<String, Set<String>> map)
return caseAppliedMap;
}

/**
 * Applies the configured case conversion to every key of every partition-spec map
 * in the given list, leaving the values untouched.
 * Returns freshly built maps in a freshly built list; the input is not mutated.
 * Note: result maps are HashMaps, so key iteration order is not guaranteed to
 * follow the input order.
 */
private List<Map<String, Object>> applyCaseForListOfMap(List<Map<String, Object>> listOfMap)
{
    List<Map<String, Object>> converted = new ArrayList<>();
    listOfMap.forEach(spec ->
    {
        Map<String, Object> specWithCaseApplied = new HashMap<>();
        spec.forEach((key, value) -> specWithCaseApplied.put(applyCase(key), value));
        converted.add(specWithCaseApplied);
    });
    return converted;
}

private class MergeStrategyCaseConverter implements MergeStrategyVisitor<MergeStrategy>
{
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,7 @@
import org.finos.legend.engine.persistence.components.ingestmode.versioning.DigestBasedResolverAbstract;
import org.immutables.value.Value;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.*;

import static org.immutables.value.Value.Derived;
import static org.immutables.value.Value.Immutable;
Expand All @@ -52,7 +49,9 @@ public interface UnitemporalSnapshotAbstract extends IngestMode, TransactionMile

List<String> partitionFields();

Map<String, Set<String>> partitionValuesByField();
List<Map<String, Object>> partitionSpecList(); // [ {date: D1, Id: ID1, Name: N1}, {date: D2, Id: ID2, Name: N2}, ....]

Map<String, Set<String>> partitionValuesByField(); // retained for backward compatibility; to be deprecated in favor of partitionSpecList()

@Derived
default boolean partitioned()
Expand All @@ -75,6 +74,12 @@ default <T> T accept(IngestModeVisitor<T> visitor)
@Value.Check
default void validate()
{

if (!partitionValuesByField().isEmpty() && !partitionSpecList().isEmpty())
{
throw new IllegalStateException("Can not build UnitemporalSnapshot, Provide either partitionValuesByField or partitionSpecList, both not supported together");
}

// All the keys in partitionValuesByField must exactly match the fields in partitionFields
if (!partitionValuesByField().isEmpty())
{
Expand All @@ -89,6 +94,39 @@ default void validate()
throw new IllegalStateException(String.format("Can not build UnitemporalSnapshot, partitionKey: [%s] not specified in partitionFields", partitionKey));
}
}
int partitionKeysWithMoreThanOneValues = 0;
for (Set<String> partitionValues: partitionValuesByField().values())
{
if (partitionValues.size() > 1)
{
partitionKeysWithMoreThanOneValues++;
}
}
if (partitionKeysWithMoreThanOneValues > 1)
{
throw new IllegalStateException(String.format("Can not build UnitemporalSnapshot, in partitionValuesByField at most one of the partition keys can have more than one value, all other partition keys must have exactly one value"));
}
}

if (!partitionSpecList().isEmpty())
{
for (Map<String, Object> partitionSpec : partitionSpecList())
{
if (partitionFields().size() != partitionSpec.size())
{
throw new IllegalStateException("Can not build UnitemporalSnapshot, size of each partitionSpec must be same as size of partitionFields");
}
}
for (Map<String, Object> partitionSpec : partitionSpecList())
{
for (String partitionKey: partitionSpec.keySet())
{
if (!partitionFields().contains(partitionKey))
{
throw new IllegalStateException(String.format("Can not build UnitemporalSnapshot, partitionKey: [%s] not specified in partitionSpec", partitionKey));
}
}
}
}

// Allowed Versioning Strategy - NoVersioning, MaxVersioning
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ protected Insert sqlToUpsertRows()
{
whereClauseForNotInSink.add(LogicalPlanUtils.getPartitionColumnValueMatchInCondition(mainDataset(), ingestMode().partitionValuesByField()));
}
else if (!ingestMode().partitionSpecList().isEmpty())
{
whereClauseForNotInSink.add(LogicalPlanUtils.getPartitionSpecMatchCondition(mainDataset(), ingestMode().partitionSpecList()));
}
else
{
whereClauseForNotInSink.add(LogicalPlanUtils.getPartitionColumnsMatchCondition(mainDataset(), stagingDataset(), ingestMode().partitionFields().toArray(new String[0])));
Expand Down Expand Up @@ -193,7 +197,15 @@ protected Update getSqlToMilestoneRows(List<Pair<FieldValue, Value>> values)

if (ingestMode().partitioned())
{
if (ingestMode().partitionValuesByField().isEmpty())
if (!ingestMode().partitionValuesByField().isEmpty())
{
whereClauseForPartition.add(LogicalPlanUtils.getPartitionColumnValueMatchInCondition(mainDataset(), ingestMode().partitionValuesByField()));
}
else if (!ingestMode().partitionSpecList().isEmpty())
{
whereClauseForPartition.add(LogicalPlanUtils.getPartitionSpecMatchCondition(mainDataset(), ingestMode().partitionSpecList()));
}
else
{
Condition partitionColumnCondition = Exists.of(
Selection.builder()
Expand All @@ -203,10 +215,6 @@ protected Update getSqlToMilestoneRows(List<Pair<FieldValue, Value>> values)
.build());
whereClauseForPartition.add(partitionColumnCondition);
}
else
{
whereClauseForPartition.add(LogicalPlanUtils.getPartitionColumnValueMatchInCondition(mainDataset(), ingestMode().partitionValuesByField()));
}
}

return UpdateAbstract.of(mainDataset(), values, And.of(whereClauseForPartition));
Expand All @@ -230,6 +238,10 @@ protected Update sqlToMilestoneAllRows(List<Pair<FieldValue, Value>> values)
{
conditions.add(LogicalPlanUtils.getPartitionColumnValueMatchInCondition(mainDataset(), ingestMode().partitionValuesByField()));
}
else if (ingestMode().partitioned() && !ingestMode().partitionSpecList().isEmpty())
{
conditions.add(LogicalPlanUtils.getPartitionSpecMatchCondition(mainDataset(), ingestMode().partitionSpecList()));
}
return UpdateAbstract.of(mainDataset(), values, And.of(conditions));
}

Expand All @@ -254,7 +266,7 @@ public LogicalPlan visitNoOp(NoOpAbstract noOpAbstract)
public LogicalPlan visitDeleteTargetData(DeleteTargetDataAbstract deleteTargetDataAbstract)
{
List<Operation> operations = new ArrayList<>();
if (ingestMode().partitioned() && ingestMode().partitionValuesByField().isEmpty())
if (ingestMode().partitioned() && ingestMode().partitionValuesByField().isEmpty() && ingestMode().partitionSpecList().isEmpty())
{
return LogicalPlan.of(operations);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,14 +130,24 @@ public static Condition getPartitionColumnsMatchCondition(Dataset mainDataSet, D
return getColumnsMatchCondition(mainDataSet, stagingDataSet, partitionColumns);
}

public static Condition getPartitionColumnsDoNotMatchCondition(Dataset mainDataSet, Dataset stagingDataSet, String[] partitionColumns)
public static Condition getPartitionColumnValueMatchInCondition(Dataset dataSet, Map<String, Set<String>> partitionFilter)
{
return getColumnsDoNotMatchCondition(mainDataSet, stagingDataSet, partitionColumns);
return getColumnValueMatchInCondition(dataSet, partitionFilter);
}

public static Condition getPartitionColumnValueMatchInCondition(Dataset dataSet, Map<String, Set<String>> partitionFilter)
// (key1 = val11 AND key2 = val21) OR (key1 = val12 AND key2 = val22) OR ...
public static Condition getPartitionSpecMatchCondition(Dataset dataSet, List<Map<String, Object>> partitionSpecList)
{
return getColumnValueMatchInCondition(dataSet, partitionFilter);
return Or.of(partitionSpecList.stream()
.map(partitionSpec -> And.of(
partitionSpec.entrySet().stream()
.map(columnValuePair ->
Equals.of(
FieldValue.builder().datasetRef(dataSet.datasetReference()).fieldName(columnValuePair.getKey()).build(),
columnValuePair.getValue() instanceof Number ? ObjectValue.of(columnValuePair.getValue()) : StringValue.of((String) columnValuePair.getValue()))
)
.collect(Collectors.toList()))
).collect(Collectors.toList()));
}

private static Condition getColumnValueMatchInCondition(Dataset dataSet, Map<String, Set<String>> keyValuePair)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ public class AnsiTestArtifacts
"\"id\" INTEGER NOT NULL,\"name\" VARCHAR NOT NULL,\"amount\" DOUBLE,\"biz_date\" DATE,\"digest\" VARCHAR," +
"\"batch_id_in\" INTEGER NOT NULL,\"batch_id_out\" INTEGER,PRIMARY KEY (\"id\", \"name\", \"batch_id_in\"))";

public static String expectedMainTableWithMultiPartitionsCreateQuery = "CREATE TABLE IF NOT EXISTS \"mydb\".\"main\"(" +
"\"id\" INTEGER NOT NULL,\"name\" VARCHAR NOT NULL,\"amount\" DOUBLE,\"account_type\" INTEGER,\"biz_date\" DATE,\"digest\" VARCHAR," +
"\"batch_id_in\" INTEGER NOT NULL,\"batch_id_out\" INTEGER,PRIMARY KEY (\"id\", \"name\", \"batch_id_in\"))";

public static String expectedMainTableBatchIdAndVersionBasedCreateQuery = "CREATE TABLE IF NOT EXISTS \"mydb\".\"main\"(" +
"\"id\" INTEGER NOT NULL,\"name\" VARCHAR NOT NULL,\"amount\" DOUBLE,\"biz_date\" DATE,\"digest\" VARCHAR,\"version\" INTEGER," +
"\"batch_id_in\" INTEGER NOT NULL,\"batch_id_out\" INTEGER,PRIMARY KEY (\"id\", \"name\", \"batch_id_in\"))";
Expand All @@ -75,6 +79,10 @@ public class AnsiTestArtifacts
"(\"ID\" INTEGER NOT NULL,\"NAME\" VARCHAR NOT NULL,\"AMOUNT\" DOUBLE,\"BIZ_DATE\" DATE,\"DIGEST\" VARCHAR," +
"\"BATCH_ID_IN\" INTEGER NOT NULL,\"BATCH_ID_OUT\" INTEGER,PRIMARY KEY (\"ID\", \"NAME\", \"BATCH_ID_IN\"))";

public static String expectedMainTableMultiPartitionCreateQueryWithUpperCase = "CREATE TABLE IF NOT EXISTS \"MYDB\".\"MAIN\"" +
"(\"ID\" INTEGER NOT NULL,\"NAME\" VARCHAR NOT NULL,\"AMOUNT\" DOUBLE,\"ACCOUNT_TYPE\" INTEGER,\"BIZ_DATE\" DATE,\"DIGEST\" VARCHAR," +
"\"BATCH_ID_IN\" INTEGER NOT NULL,\"BATCH_ID_OUT\" INTEGER,PRIMARY KEY (\"ID\", \"NAME\", \"BATCH_ID_IN\"))";

public static String expectedMainTableTimeBasedCreateQuery = "CREATE TABLE IF NOT EXISTS \"mydb\".\"main\"(" +
"\"id\" INTEGER NOT NULL,\"name\" VARCHAR NOT NULL,\"amount\" DOUBLE,\"biz_date\" DATE,\"digest\" VARCHAR," +
"\"batch_time_in\" DATETIME NOT NULL,\"batch_time_out\" DATETIME,PRIMARY KEY (\"id\", \"name\", \"batch_time_in\"))";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,95 @@ public void verifyUnitemporalSnapshotWithPartitionFiltersNoDedupNoVersion(Genera
Assertions.assertEquals(getExpectedMetadataTableIngestQuery(), metadataIngestSql.get(0));
}

/**
 * Verifies the SQL generated for a unitemporal snapshot where the partition scope is
 * given as a partitionSpecList (one column->value map per partition), with no
 * deduplication and no versioning.
 *
 * Expected shape of the partition predicate: an OR over the partition specs, each an
 * AND of (column = value) equalities — e.g.
 * ((biz_date = '2024-01-01' AND account_type = 1) OR ...).
 */
@Override
public void verifyUnitemporalSnapshotWithPartitionSpecListNoDedupNoVersion(GeneratorResult operations)
{
List<String> preActionsSql = operations.preActionsSql();
List<String> milestoningSql = operations.ingestSql();
List<String> metadataIngestSql = operations.metadataIngestSql();

// Milestone step: close out open rows (batch_id_out = 999999999) within the listed
// partitions whose (id, name, digest) no longer appears in staging.
String expectedMilestoneQuery = "UPDATE \"mydb\".\"main\" as sink SET " +
"sink.\"batch_id_out\" = (SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MAIN')-1 " +
"WHERE (sink.\"batch_id_out\" = 999999999) AND " +
"(NOT (EXISTS (SELECT * FROM \"mydb\".\"staging\" as stage " +
"WHERE ((sink.\"id\" = stage.\"id\") AND (sink.\"name\" = stage.\"name\")) AND (sink.\"digest\" = stage.\"digest\")))) " +
"AND (((sink.\"biz_date\" = '2024-01-01') AND (sink.\"account_type\" = 1)) " +
"OR ((sink.\"biz_date\" = '2024-01-02') AND (sink.\"account_type\" = 1)) " +
"OR ((sink.\"biz_date\" = '2024-01-02') AND (sink.\"account_type\" = 2)))";

// Upsert step: insert staged rows whose digest is not already open in the sink,
// restricted to the same partition-spec predicate.
String expectedUpsertQuery = "INSERT INTO \"mydb\".\"main\" " +
"(\"id\", \"name\", \"amount\", \"account_type\", \"biz_date\", \"digest\", \"batch_id_in\", \"batch_id_out\") " +
"(SELECT stage.\"id\",stage.\"name\",stage.\"amount\",stage.\"account_type\",stage.\"biz_date\",stage.\"digest\"," +
"(SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MAIN'),999999999 " +
"FROM \"mydb\".\"staging\" as stage WHERE " +
"NOT (stage.\"digest\" IN (SELECT sink.\"digest\" FROM \"mydb\".\"main\" as sink WHERE " +
"(sink.\"batch_id_out\" = 999999999) AND " +
"(((sink.\"biz_date\" = '2024-01-01') AND (sink.\"account_type\" = 1)) " +
"OR ((sink.\"biz_date\" = '2024-01-02') AND (sink.\"account_type\" = 1)) " +
"OR ((sink.\"biz_date\" = '2024-01-02') AND (sink.\"account_type\" = 2))))))";

// Pre-actions: main table (with the extra account_type partition column) then metadata table.
Assertions.assertEquals(AnsiTestArtifacts.expectedMainTableWithMultiPartitionsCreateQuery, preActionsSql.get(0));
Assertions.assertEquals(getExpectedMetadataTableCreateQuery(), preActionsSql.get(1));

Assertions.assertEquals(expectedMilestoneQuery, milestoningSql.get(0));
Assertions.assertEquals(expectedUpsertQuery, milestoningSql.get(1));
Assertions.assertEquals(getExpectedMetadataTableIngestQuery(), metadataIngestSql.get(0));
}

/**
 * Upper-case variant of the partitionSpecList test: identifiers are case-converted to
 * upper case before SQL generation.
 *
 * NOTE(review): the partition key order inside each spec differs from the lowercase
 * test (ACCOUNT_TYPE before BIZ_DATE here, biz_date before account_type there) —
 * presumably an artifact of map iteration order after case conversion; confirm this
 * ordering is intentional/stable before tightening these expectations.
 */
@Override
public void verifyUnitemporalSnapshotWithPartitionSpecListNoDedupNoVersionInUpperCase(GeneratorResult operations)
{
List<String> preActionsSql = operations.preActionsSql();
List<String> milestoningSql = operations.ingestSql();
List<String> metadataIngestSql = operations.metadataIngestSql();

// Milestone step: close open rows in the listed partitions that are absent from staging.
String expectedMilestoneQuery = "UPDATE \"MYDB\".\"MAIN\" as sink " +
"SET sink.\"BATCH_ID_OUT\" = (SELECT COALESCE(MAX(BATCH_METADATA.\"TABLE_BATCH_ID\"),0)+1 FROM BATCH_METADATA as BATCH_METADATA WHERE UPPER(BATCH_METADATA.\"TABLE_NAME\") = 'MAIN')-1 " +
"WHERE (sink.\"BATCH_ID_OUT\" = 999999999) AND " +
"(NOT (EXISTS (SELECT * FROM \"MYDB\".\"STAGING\" as stage " +
"WHERE ((sink.\"ID\" = stage.\"ID\") AND (sink.\"NAME\" = stage.\"NAME\")) AND (sink.\"DIGEST\" = stage.\"DIGEST\")))) " +
"AND (((sink.\"ACCOUNT_TYPE\" = 1) AND (sink.\"BIZ_DATE\" = '2024-01-01')) " +
"OR ((sink.\"ACCOUNT_TYPE\" = 1) AND (sink.\"BIZ_DATE\" = '2024-01-02')) " +
"OR ((sink.\"ACCOUNT_TYPE\" = 2) AND (sink.\"BIZ_DATE\" = '2024-01-02')))";

// Upsert step: insert staged rows not already open in the sink within those partitions.
String expectedUpsertQuery = "INSERT INTO \"MYDB\".\"MAIN\" " +
"(\"ID\", \"NAME\", \"AMOUNT\", \"ACCOUNT_TYPE\", \"BIZ_DATE\", \"DIGEST\", \"BATCH_ID_IN\", \"BATCH_ID_OUT\") " +
"(SELECT stage.\"ID\",stage.\"NAME\",stage.\"AMOUNT\",stage.\"ACCOUNT_TYPE\",stage.\"BIZ_DATE\",stage.\"DIGEST\"," +
"(SELECT COALESCE(MAX(BATCH_METADATA.\"TABLE_BATCH_ID\"),0)+1 FROM BATCH_METADATA as BATCH_METADATA WHERE UPPER(BATCH_METADATA.\"TABLE_NAME\") = 'MAIN')," +
"999999999 FROM \"MYDB\".\"STAGING\" as stage " +
"WHERE NOT (stage.\"DIGEST\" IN (SELECT sink.\"DIGEST\" FROM \"MYDB\".\"MAIN\" as sink WHERE (sink.\"BATCH_ID_OUT\" = 999999999) " +
"AND (((sink.\"ACCOUNT_TYPE\" = 1) AND (sink.\"BIZ_DATE\" = '2024-01-01')) " +
"OR ((sink.\"ACCOUNT_TYPE\" = 1) AND (sink.\"BIZ_DATE\" = '2024-01-02')) " +
"OR ((sink.\"ACCOUNT_TYPE\" = 2) AND (sink.\"BIZ_DATE\" = '2024-01-02'))))))";

// Pre-actions: upper-cased multi-partition main table DDL, then metadata table DDL.
Assertions.assertEquals(AnsiTestArtifacts.expectedMainTableMultiPartitionCreateQueryWithUpperCase, preActionsSql.get(0));
Assertions.assertEquals(getExpectedMetadataTableCreateQueryWithUpperCase(), preActionsSql.get(1));
Assertions.assertEquals(expectedMilestoneQuery, milestoningSql.get(0));
Assertions.assertEquals(expectedUpsertQuery, milestoningSql.get(1));
Assertions.assertEquals(getExpectedMetadataTableIngestQueryWithUpperCase(), metadataIngestSql.get(0));
}

/**
 * Verifies the SQL generated for an empty incoming batch with a partitionSpecList.
 * Only a milestoning statement is expected (no upsert is asserted): with nothing to
 * compare against in staging, the UPDATE closes every open row (batch_id_out =
 * 999999999) that falls inside the listed partitions — there is no NOT EXISTS
 * staging sub-query here, unlike the non-empty-batch test.
 */
@Override
public void verifyUnitemporalSnapshotWithPartitionSpecListWithEmptyBatchHandling(GeneratorResult operations)
{
List<String> preActionsSql = operations.preActionsSql();
List<String> milestoningSql = operations.ingestSql();
List<String> metadataIngestSql = operations.metadataIngestSql();

String expectedMilestoneQuery = "UPDATE \"mydb\".\"main\" as sink SET " +
"sink.\"batch_id_out\" = (SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MAIN')-1 " +
"WHERE (sink.\"batch_id_out\" = 999999999) AND " +
"(((sink.\"biz_date\" = '2024-01-01') AND (sink.\"account_type\" = 1)) " +
"OR ((sink.\"biz_date\" = '2024-01-02') AND (sink.\"account_type\" = 1)) " +
"OR ((sink.\"biz_date\" = '2024-01-02') AND (sink.\"account_type\" = 2)))";

Assertions.assertEquals(AnsiTestArtifacts.expectedMainTableWithMultiPartitionsCreateQuery, preActionsSql.get(0));
Assertions.assertEquals(getExpectedMetadataTableCreateQuery(), preActionsSql.get(1));

Assertions.assertEquals(expectedMilestoneQuery, milestoningSql.get(0));
Assertions.assertEquals(getExpectedMetadataTableIngestQuery(), metadataIngestSql.get(0));
}

@Override
public void verifyUnitemporalSnapshotWithCleanStagingData(GeneratorResult operations)
{
Expand Down
Loading

0 comments on commit 26f48d1

Please sign in to comment.