Skip to content

Commit

Permalink
Upper case support for Ingest mode and address code review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
prasar-ashutosh committed May 3, 2024
1 parent a07e60b commit 1a70a6f
Show file tree
Hide file tree
Showing 14 changed files with 207 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,7 @@
import org.finos.legend.engine.persistence.components.ingestmode.versioning.AllVersionsStrategy;
import org.finos.legend.engine.persistence.components.ingestmode.versioning.MaxVersionStrategy;

import java.util.Optional;
import java.util.List;
import java.util.Set;
import java.util.Map;
import java.util.HashMap;
import java.util.*;

import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -118,6 +114,7 @@ public IngestMode visitUnitemporalSnapshot(UnitemporalSnapshotAbstract unitempor
.transactionMilestoning(unitemporalSnapshot.transactionMilestoning().accept(new TransactionMilestoningCaseConverter()))
.addAllPartitionFields(applyCase(unitemporalSnapshot.partitionFields()))
.putAllPartitionValuesByField(applyCase(unitemporalSnapshot.partitionValuesByField()))
.addAllPartitionSpecList(applyCaseForListOfMap(unitemporalSnapshot.partitionSpecList()))
.emptyDatasetHandling(unitemporalSnapshot.emptyDatasetHandling())
.deduplicationStrategy(unitemporalSnapshot.deduplicationStrategy())
.versioningStrategy(unitemporalSnapshot.versioningStrategy().accept(new VersionStrategyCaseConverter()))
Expand Down Expand Up @@ -210,6 +207,21 @@ private Map<String, Set<String>> applyCase(Map<String, Set<String>> map)
return caseAppliedMap;
}

/**
 * Builds a new list of maps in which every key of every input map has been passed through
 * {@code applyCase(String)}; the values are carried over unchanged.
 *
 * @param listOfMap the maps whose keys are to be converted
 * @return a new list, in the same order as the input, holding one freshly created map per input map
 */
private List<Map<String, Object>> applyCaseForListOfMap(List<Map<String, Object>> listOfMap)
{
    return listOfMap.stream()
        .map(sourceMap ->
        {
            Map<String, Object> convertedMap = new HashMap<>();
            // Only the key is converted; the value object is reused as-is.
            sourceMap.forEach((key, value) -> convertedMap.put(applyCase(key), value));
            return convertedMap;
        })
        .collect(Collectors.toCollection(ArrayList::new));
}

private class MergeStrategyCaseConverter implements MergeStrategyVisitor<MergeStrategy>
{
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,7 @@
import org.finos.legend.engine.persistence.components.ingestmode.versioning.DigestBasedResolverAbstract;
import org.immutables.value.Value;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.*;

import static org.immutables.value.Value.Derived;
import static org.immutables.value.Value.Immutable;
Expand All @@ -52,10 +49,9 @@ public interface UnitemporalSnapshotAbstract extends IngestMode, TransactionMile

List<String> partitionFields();

List<Map<String, String>> partitionSpecList(); // [ [date: D1, Id: ID1, Name: N1], [date: D2, Id: ID2, Name: N2], ....]
List<Map<String, Object>> partitionSpecList(); // [ {date: D1, Id: ID1, Name: N1}, {date: D2, Id: ID2, Name: N2}, ....]

// for Backward compatibility -- to be deprecated
Map<String, Set<String>> partitionValuesByField();
Map<String, Set<String>> partitionValuesByField(); // for Backward compatibility -- to be deprecated

@Derived
default boolean partitioned()
Expand Down Expand Up @@ -108,19 +104,29 @@ default void validate()
}
if (partitionKeysWithMoreThanOneValues > 1)
{
throw new IllegalStateException(String.format("Can not build UnitemporalSnapshot, partitionValuesByField does not support more than 1 value for more than one partition key"));
throw new IllegalStateException(String.format("Can not build UnitemporalSnapshot, in partitionValuesByField at most one of the partition keys can have more than one value, all other partition keys must have exactly one value"));
}
}

if (!partitionSpecList().isEmpty())
{
for (Map<String, String> partitionSpec : partitionSpecList())
for (Map<String, Object> partitionSpec : partitionSpecList())
{
if (partitionFields().size() != partitionSpec.size())
{
throw new IllegalStateException("Can not build UnitemporalSnapshot, size of each partitionSpec must be same as size of partitionFields");
}
}
for (Map<String, Object> partitionSpec : partitionSpecList())
{
for (String partitionKey: partitionSpec.keySet())
{
if (!partitionFields().contains(partitionKey))
{
throw new IllegalStateException(String.format("Can not build UnitemporalSnapshot, partitionKey: [%s] not specified in partitionSpec", partitionKey));
}
}
}
}

// Allowed Versioning Strategy - NoVersioning, MaxVersioning
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,15 +136,15 @@ public static Condition getPartitionColumnValueMatchInCondition(Dataset dataSet,
}

// (key1 = val11 AND key2 = val21) OR (key1 = val12 AND key2 = val22) OR ...
public static Condition getPartitionSpecMatchCondition(Dataset dataSet, List<Map<String, String>> partitionSpecList)
public static Condition getPartitionSpecMatchCondition(Dataset dataSet, List<Map<String, Object>> partitionSpecList)
{
return Or.of(partitionSpecList.stream()
.map(partitionSpec -> And.of(
partitionSpec.entrySet().stream()
.map(columnValuePair ->
Equals.of(
FieldValue.builder().datasetRef(dataSet.datasetReference()).fieldName(columnValuePair.getKey()).build(),
StringValue.of(columnValuePair.getValue()))
StringValue.of((String) columnValuePair.getValue()))
)
.collect(Collectors.toList()))
).collect(Collectors.toList()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ public class AnsiTestArtifacts
"(\"ID\" INTEGER NOT NULL,\"NAME\" VARCHAR NOT NULL,\"AMOUNT\" DOUBLE,\"BIZ_DATE\" DATE,\"DIGEST\" VARCHAR," +
"\"BATCH_ID_IN\" INTEGER NOT NULL,\"BATCH_ID_OUT\" INTEGER,PRIMARY KEY (\"ID\", \"NAME\", \"BATCH_ID_IN\"))";

// Expected CREATE TABLE statement for the upper-cased "MYDB"."MAIN" table with batch-id milestoning columns;
// unlike the constant declared just before it, the column list also contains "ACCOUNT_TYPE".
public static String expectedMainTableMultiPartitionCreateQueryWithUpperCase = "CREATE TABLE IF NOT EXISTS \"MYDB\".\"MAIN\"" +
"(\"ID\" INTEGER NOT NULL,\"NAME\" VARCHAR NOT NULL,\"AMOUNT\" DOUBLE,\"ACCOUNT_TYPE\" VARCHAR,\"BIZ_DATE\" DATE,\"DIGEST\" VARCHAR," +
"\"BATCH_ID_IN\" INTEGER NOT NULL,\"BATCH_ID_OUT\" INTEGER,PRIMARY KEY (\"ID\", \"NAME\", \"BATCH_ID_IN\"))";

// Expected CREATE TABLE statement for the lower-case "mydb"."main" table that uses
// "batch_time_in"/"batch_time_out" DATETIME columns, with "batch_time_in" in the primary key.
public static String expectedMainTableTimeBasedCreateQuery = "CREATE TABLE IF NOT EXISTS \"mydb\".\"main\"(" +
"\"id\" INTEGER NOT NULL,\"name\" VARCHAR NOT NULL,\"amount\" DOUBLE,\"biz_date\" DATE,\"digest\" VARCHAR," +
"\"batch_time_in\" DATETIME NOT NULL,\"batch_time_out\" DATETIME,PRIMARY KEY (\"id\", \"name\", \"batch_time_in\"))";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,39 @@ public void verifyUnitemporalSnapshotWithPartitionSpecListNoDedupNoVersion(Gener
Assertions.assertEquals(getExpectedMetadataTableIngestQuery(), metadataIngestSql.get(0));
}

/**
 * Compares the generated SQL with the expected upper-case, double-quoted statements:
 * two pre-action statements (main table and metadata table creation), two ingest
 * statements (milestone update followed by upsert) and one metadata ingest statement.
 *
 * @param operations generator result whose SQL lists are checked
 */
@Override
public void verifyUnitemporalSnapshotWithPartitionSpecListNoDedupNoVersionInUpperCase(GeneratorResult operations)
{
    List<String> preActions = operations.preActionsSql();
    List<String> ingestStatements = operations.ingestSql();
    List<String> metadataStatements = operations.metadataIngestSql();

    // Fragments that occur verbatim in both expected statements.
    String nextBatchIdSubQuery = "(SELECT COALESCE(MAX(BATCH_METADATA.\"TABLE_BATCH_ID\"),0)+1 FROM BATCH_METADATA as BATCH_METADATA WHERE UPPER(BATCH_METADATA.\"TABLE_NAME\") = 'MAIN')";
    String partitionFilter = "(((sink.\"ACCOUNT_TYPE\" = 'TYPE_1') AND (sink.\"BIZ_DATE\" = '2024-01-01')) " +
        "OR ((sink.\"ACCOUNT_TYPE\" = 'TYPE_1') AND (sink.\"BIZ_DATE\" = '2024-01-02')) " +
        "OR ((sink.\"ACCOUNT_TYPE\" = 'TYPE_2') AND (sink.\"BIZ_DATE\" = '2024-01-02')))";

    String milestoneSql = "UPDATE \"MYDB\".\"MAIN\" as sink " +
        "SET sink.\"BATCH_ID_OUT\" = " + nextBatchIdSubQuery + "-1 " +
        "WHERE (sink.\"BATCH_ID_OUT\" = 999999999) AND " +
        "(NOT (EXISTS (SELECT * FROM \"MYDB\".\"STAGING\" as stage " +
        "WHERE ((sink.\"ID\" = stage.\"ID\") AND (sink.\"NAME\" = stage.\"NAME\")) AND (sink.\"DIGEST\" = stage.\"DIGEST\")))) " +
        "AND " + partitionFilter;

    // The trailing ")))" closes the sink sub-select, the NOT (...) wrapper and the outer SELECT.
    String upsertSql = "INSERT INTO \"MYDB\".\"MAIN\" " +
        "(\"ID\", \"NAME\", \"AMOUNT\", \"ACCOUNT_TYPE\", \"BIZ_DATE\", \"DIGEST\", \"BATCH_ID_IN\", \"BATCH_ID_OUT\") " +
        "(SELECT stage.\"ID\",stage.\"NAME\",stage.\"AMOUNT\",stage.\"ACCOUNT_TYPE\",stage.\"BIZ_DATE\",stage.\"DIGEST\"," +
        nextBatchIdSubQuery + "," +
        "999999999 FROM \"MYDB\".\"STAGING\" as stage " +
        "WHERE NOT (stage.\"DIGEST\" IN (SELECT sink.\"DIGEST\" FROM \"MYDB\".\"MAIN\" as sink WHERE (sink.\"BATCH_ID_OUT\" = 999999999) " +
        "AND " + partitionFilter + ")))";

    Assertions.assertEquals(AnsiTestArtifacts.expectedMainTableMultiPartitionCreateQueryWithUpperCase, preActions.get(0));
    Assertions.assertEquals(getExpectedMetadataTableCreateQueryWithUpperCase(), preActions.get(1));
    Assertions.assertEquals(milestoneSql, ingestStatements.get(0));
    Assertions.assertEquals(upsertSql, ingestStatements.get(1));
    Assertions.assertEquals(getExpectedMetadataTableIngestQueryWithUpperCase(), metadataStatements.get(0));
}

@Override
public void verifyUnitemporalSnapshotWithPartitionSpecListWithEmptyBatchHandling(GeneratorResult operations)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,10 @@ public class BigQueryTestArtifacts
"`id` INT64 NOT NULL,`name` STRING NOT NULL,`amount` FLOAT64,`account_type` STRING,`biz_date` DATE,`digest` STRING," +
"`batch_id_in` INT64 NOT NULL,`batch_id_out` INT64,PRIMARY KEY (`id`, `name`, `batch_id_in`) NOT ENFORCED)";

// Expected CREATE TABLE statement for the upper-cased `MYDB`.`MAIN` table, including the
// `ACCOUNT_TYPE` and `BIZ_DATE` columns and a NOT ENFORCED primary key on (`ID`, `NAME`, `BATCH_ID_IN`).
public static String expectedMainTableWithMultiPartitionCreateQueryUpperCase = "CREATE TABLE IF NOT EXISTS `MYDB`.`MAIN`" +
"(`ID` INT64 NOT NULL,`NAME` STRING NOT NULL,`AMOUNT` FLOAT64,`ACCOUNT_TYPE` STRING,`BIZ_DATE` DATE,`DIGEST` STRING," +
"`BATCH_ID_IN` INT64 NOT NULL,`BATCH_ID_OUT` INT64,PRIMARY KEY (`ID`, `NAME`, `BATCH_ID_IN`) NOT ENFORCED)";

public static String expectedMetadataTableCreateQuery = "CREATE TABLE IF NOT EXISTS batch_metadata" +
"(`table_name` STRING(255)," +
"`batch_start_ts_utc` DATETIME," +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,38 @@ public void verifyUnitemporalSnapshotWithPartitionSpecListNoDedupNoVersion(Gener
Assertions.assertEquals(getExpectedMetadataTableIngestQuery(), metadataIngestSql.get(0));
}

/**
 * Compares the generated SQL with the expected upper-case, backtick-quoted statements:
 * two pre-action statements (main table and metadata table creation), two ingest
 * statements (milestone update followed by upsert) and one metadata ingest statement.
 *
 * @param operations generator result whose SQL lists are checked
 */
@Override
public void verifyUnitemporalSnapshotWithPartitionSpecListNoDedupNoVersionInUpperCase(GeneratorResult operations)
{
    List<String> preActions = operations.preActionsSql();
    List<String> ingestStatements = operations.ingestSql();
    List<String> metadataStatements = operations.metadataIngestSql();

    // Fragments that occur verbatim in both expected statements.
    String nextBatchIdSubQuery = "(SELECT COALESCE(MAX(BATCH_METADATA.`TABLE_BATCH_ID`),0)+1 FROM BATCH_METADATA as BATCH_METADATA WHERE UPPER(BATCH_METADATA.`TABLE_NAME`) = 'MAIN')";
    String partitionFilter = "(((sink.`ACCOUNT_TYPE` = 'TYPE_1') AND (sink.`BIZ_DATE` = '2024-01-01')) " +
        "OR ((sink.`ACCOUNT_TYPE` = 'TYPE_1') AND (sink.`BIZ_DATE` = '2024-01-02')) " +
        "OR ((sink.`ACCOUNT_TYPE` = 'TYPE_2') AND (sink.`BIZ_DATE` = '2024-01-02')))";

    String milestoneSql = "UPDATE `MYDB`.`MAIN` as sink SET " +
        "sink.`BATCH_ID_OUT` = " + nextBatchIdSubQuery + "-1 " +
        "WHERE (sink.`BATCH_ID_OUT` = 999999999) AND (NOT (EXISTS (SELECT * FROM `MYDB`.`STAGING` as stage " +
        "WHERE ((sink.`ID` = stage.`ID`) AND (sink.`NAME` = stage.`NAME`)) AND (sink.`DIGEST` = stage.`DIGEST`)))) " +
        "AND " + partitionFilter;

    // The trailing ")))" closes the sink sub-select, the NOT (...) wrapper and the outer SELECT.
    String upsertSql = "INSERT INTO `MYDB`.`MAIN` " +
        "(`ID`, `NAME`, `AMOUNT`, `ACCOUNT_TYPE`, `BIZ_DATE`, `DIGEST`, `BATCH_ID_IN`, `BATCH_ID_OUT`) " +
        "(SELECT stage.`ID`,stage.`NAME`,stage.`AMOUNT`,stage.`ACCOUNT_TYPE`,stage.`BIZ_DATE`,stage.`DIGEST`," +
        nextBatchIdSubQuery + "," +
        "999999999 FROM `MYDB`.`STAGING` as stage WHERE " +
        "NOT (stage.`DIGEST` IN (SELECT sink.`DIGEST` FROM `MYDB`.`MAIN` as sink WHERE (sink.`BATCH_ID_OUT` = 999999999) AND " +
        partitionFilter + ")))";

    Assertions.assertEquals(BigQueryTestArtifacts.expectedMainTableWithMultiPartitionCreateQueryUpperCase, preActions.get(0));
    Assertions.assertEquals(BigQueryTestArtifacts.expectedMetadataTableCreateQueryWithUpperCase, preActions.get(1));
    Assertions.assertEquals(milestoneSql, ingestStatements.get(0));
    Assertions.assertEquals(upsertSql, ingestStatements.get(1));
    Assertions.assertEquals(getExpectedMetadataTableIngestQueryWithUpperCase(), metadataStatements.get(0));
}

@Override
public void verifyUnitemporalSnapshotWithPartitionSpecListWithEmptyBatchHandling(GeneratorResult operations)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ void testUnitemporalSnapshotMilestoningLogicWithMultiplePartitionValues() throws
createStagingTable(stagingTable);


List<Map<String, String>> partitionSpecList = new ArrayList<>();
List<Map<String, Object>> partitionSpecList = new ArrayList<>();
addPartitionSpec(partitionSpecList, "2024-01-01", "ACCOUNT_1");
addPartitionSpec(partitionSpecList, "2024-01-01", "ACCOUNT_2");
addPartitionSpec(partitionSpecList, "2024-01-02", "ACCOUNT_1");
Expand Down Expand Up @@ -333,9 +333,9 @@ void testUnitemporalSnapshotMilestoningLogicWithMultiplePartitionValues() throws
executePlansAndVerifyResults(ingestModeWithDeleteTargetData, options, datasets, schema, expectedDataPass3, expectedStats);
}

private static void addPartitionSpec(List<Map<String, String>> partitionSpecList, String date, String accountNum)
private static void addPartitionSpec(List<Map<String, Object>> partitionSpecList, String date, String accountNum)
{
partitionSpecList.add(new HashMap<String,String>()
partitionSpecList.add(new HashMap<String,Object>()
{
{
put(dateName, date);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,10 @@ public class MemsqlTestArtifacts
"(`ID` INTEGER NOT NULL,`NAME` VARCHAR(256) NOT NULL,`AMOUNT` DOUBLE,`BIZ_DATE` DATE,`DIGEST` VARCHAR(256)," +
"`BATCH_ID_IN` INTEGER NOT NULL,`BATCH_ID_OUT` INTEGER,PRIMARY KEY (`ID`, `NAME`, `BATCH_ID_IN`))";

// Expected CREATE TABLE statement for the upper-cased `MYDB`.`MAIN` table with batch-id milestoning columns;
// unlike the constant declared just before it, the column list also contains `ACCOUNT_TYPE`.
public static String expectedMainTableMultiPartitionsCreateQueryWithUpperCase = "CREATE TABLE IF NOT EXISTS `MYDB`.`MAIN`" +
"(`ID` INTEGER NOT NULL,`NAME` VARCHAR(256) NOT NULL,`AMOUNT` DOUBLE,`ACCOUNT_TYPE` VARCHAR(256),`BIZ_DATE` DATE,`DIGEST` VARCHAR(256)," +
"`BATCH_ID_IN` INTEGER NOT NULL,`BATCH_ID_OUT` INTEGER,PRIMARY KEY (`ID`, `NAME`, `BATCH_ID_IN`))";

// Expected INSERT into batch_metadata for table 'main': the batch id is MAX(table_batch_id)+1 for that table
// (0-based when no row exists, via COALESCE), with a fixed start timestamp, CURRENT_TIMESTAMP() as end and status 'DONE'.
public static String expectedMetadataTableIngestQuery = "INSERT INTO batch_metadata (`table_name`, `table_batch_id`, `batch_start_ts_utc`, `batch_end_ts_utc`, `batch_status`)" +
" (SELECT 'main',(SELECT COALESCE(MAX(batch_metadata.`table_batch_id`),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.`table_name`) = 'MAIN'),'2000-01-01 00:00:00.000000',CURRENT_TIMESTAMP(),'DONE')";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package org.finos.legend.engine.persistence.components.ingestmode;

import org.finos.legend.engine.persistence.components.AnsiTestArtifacts;
import org.finos.legend.engine.persistence.components.common.DedupAndVersionErrorSqlType;
import org.finos.legend.engine.persistence.components.relational.RelationalSink;
import org.finos.legend.engine.persistence.components.relational.api.GeneratorResult;
Expand Down Expand Up @@ -216,6 +217,38 @@ public void verifyUnitemporalSnapshotWithPartitionSpecListNoDedupNoVersion(Gener
Assertions.assertEquals(getExpectedMetadataTableIngestQuery(), metadataIngestSql.get(0));
}

/**
 * Compares the generated SQL with the expected upper-case, backtick-quoted statements:
 * two pre-action statements (main table and metadata table creation), two ingest
 * statements (milestone update followed by upsert) and one metadata ingest statement.
 *
 * @param operations generator result whose SQL lists are checked
 */
@Override
public void verifyUnitemporalSnapshotWithPartitionSpecListNoDedupNoVersionInUpperCase(GeneratorResult operations)
{
    List<String> preActions = operations.preActionsSql();
    List<String> ingestStatements = operations.ingestSql();
    List<String> metadataStatements = operations.metadataIngestSql();

    // Fragments that occur verbatim in both expected statements.
    String nextBatchIdSubQuery = "(SELECT COALESCE(MAX(BATCH_METADATA.`TABLE_BATCH_ID`),0)+1 FROM BATCH_METADATA as BATCH_METADATA WHERE UPPER(BATCH_METADATA.`TABLE_NAME`) = 'MAIN')";
    String partitionFilter = "(((sink.`ACCOUNT_TYPE` = 'TYPE_1') AND (sink.`BIZ_DATE` = '2024-01-01')) " +
        "OR ((sink.`ACCOUNT_TYPE` = 'TYPE_1') AND (sink.`BIZ_DATE` = '2024-01-02')) " +
        "OR ((sink.`ACCOUNT_TYPE` = 'TYPE_2') AND (sink.`BIZ_DATE` = '2024-01-02')))";

    String milestoneSql = "UPDATE `MYDB`.`MAIN` as sink SET " +
        "sink.`BATCH_ID_OUT` = " + nextBatchIdSubQuery + "-1 " +
        "WHERE (sink.`BATCH_ID_OUT` = 999999999) AND (NOT (EXISTS (SELECT * FROM `MYDB`.`STAGING` as stage " +
        "WHERE ((sink.`ID` = stage.`ID`) AND (sink.`NAME` = stage.`NAME`)) AND (sink.`DIGEST` = stage.`DIGEST`)))) " +
        "AND " + partitionFilter;

    // The trailing ")))" closes the sink sub-select, the NOT (...) wrapper and the outer SELECT.
    String upsertSql = "INSERT INTO `MYDB`.`MAIN` " +
        "(`ID`, `NAME`, `AMOUNT`, `ACCOUNT_TYPE`, `BIZ_DATE`, `DIGEST`, `BATCH_ID_IN`, `BATCH_ID_OUT`) " +
        "(SELECT stage.`ID`,stage.`NAME`,stage.`AMOUNT`,stage.`ACCOUNT_TYPE`,stage.`BIZ_DATE`,stage.`DIGEST`," +
        nextBatchIdSubQuery + "," +
        "999999999 FROM `MYDB`.`STAGING` as stage WHERE " +
        "NOT (stage.`DIGEST` IN (SELECT sink.`DIGEST` FROM `MYDB`.`MAIN` as sink WHERE (sink.`BATCH_ID_OUT` = 999999999) AND " +
        partitionFilter + ")))";

    Assertions.assertEquals(MemsqlTestArtifacts.expectedMainTableMultiPartitionsCreateQueryWithUpperCase, preActions.get(0));
    Assertions.assertEquals(MemsqlTestArtifacts.expectedMetadataTableCreateQueryWithUpperCase, preActions.get(1));
    Assertions.assertEquals(milestoneSql, ingestStatements.get(0));
    Assertions.assertEquals(upsertSql, ingestStatements.get(1));
    Assertions.assertEquals(getExpectedMetadataTableIngestQueryWithUpperCase(), metadataStatements.get(0));
}

@Override
public void verifyUnitemporalSnapshotWithPartitionSpecListWithEmptyBatchHandling(GeneratorResult operations)
{
Expand Down
Loading

0 comments on commit 1a70a6f

Please sign in to comment.