Skip to content

Commit

Permalink
Persistence Component: Tests for unitemporal snapshot with partition …
Browse files Browse the repository at this point in the history
…without digest
  • Loading branch information
rengam32 committed Dec 24, 2024
1 parent 50a89fc commit 551506b
Show file tree
Hide file tree
Showing 14 changed files with 730 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,7 @@ public PartitioningStrategy visitPartitioning(PartitioningAbstract partitionStra
{
return Partitioning.builder()
.addAllPartitionFields(applyCase(partitionStrategy.partitionFields()))
.deleteStrategy(partitionStrategy.deleteStrategy())
.putAllPartitionValuesByField(applyCase(partitionStrategy.partitionValuesByField()))
.addAllPartitionSpecList(applyCaseForListOfMap(partitionStrategy.partitionSpecList()))
.derivePartitionSpec(partitionStrategy.derivePartitionSpec())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@
)
public interface UnitemporalSnapshotAbstract extends IngestMode, TransactionMilestoned
{
Optional<String> digestField();

@Value.Default
default PartitioningStrategy partitioningStrategy()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,24 +210,23 @@ else if (!partition.partitionSpecList().isEmpty())
*/
protected Update getSqlToMilestoneRows(List<Pair<FieldValue, Value>> values)
{
Condition notExistsWhereClause = Not.of(Exists.of(
Selection.builder()
.source(stagingDataset())
.condition(And.builder().addConditions(primaryKeysMatchCondition, digestMatchCondition).build())
.addAllFields(LogicalPlanUtils.ALL_COLUMNS())
.build()));
List<Condition> whereClause = new ArrayList<>(Arrays.asList(openRecordCondition));

List<Condition> whereClause = new ArrayList<>((Arrays.asList(openRecordCondition)));
if (!(partitioning.isPresent() && partitioning.get().deleteStrategy() == DeleteStrategy.DELETE_ALL))
{
Condition notExistsWhereClause = Not.of(Exists.of(
Selection.builder()
.source(stagingDataset())
.condition(And.builder().addConditions(primaryKeysMatchCondition, digestMatchCondition).build())
.addAllFields(LogicalPlanUtils.ALL_COLUMNS())
.build()));
whereClause.add(notExistsWhereClause);
}

if (partitioning.isPresent())
{
Partitioning partition = partitioning.get();

if (partition.deleteStrategy() == DeleteStrategy.DELETE_UPDATED)
{
whereClause.add(notExistsWhereClause);
}

if (!partition.partitionValuesByField().isEmpty())
{
whereClause.add(LogicalPlanUtils.getPartitionColumnValueMatchInCondition(mainDataset(), partition.partitionValuesByField()));
Expand All @@ -247,10 +246,6 @@ else if (!partition.partitionSpecList().isEmpty())
whereClause.add(partitionColumnCondition);
}
}
else
{
whereClause.add(notExistsWhereClause);
}

return UpdateAbstract.of(mainDataset(), values, And.of(whereClause));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,18 @@ public class AnsiTestArtifacts
"\"id\" INTEGER NOT NULL,\"name\" VARCHAR NOT NULL,\"amount\" DOUBLE,\"biz_date\" DATE,\"digest\" VARCHAR," +
"\"batch_id_in\" INTEGER NOT NULL,\"batch_id_out\" INTEGER,PRIMARY KEY (\"id\", \"name\", \"batch_id_in\"))";

public static String expectedMainTableBatchIdBasedWithoutDigestCreateQuery = "CREATE TABLE IF NOT EXISTS \"mydb\".\"main\"(" +
"\"id\" INTEGER NOT NULL,\"name\" VARCHAR NOT NULL,\"amount\" DOUBLE,\"biz_date\" DATE," +
"\"batch_id_in\" INTEGER NOT NULL,\"batch_id_out\" INTEGER,PRIMARY KEY (\"id\", \"name\", \"batch_id_in\"))";

public static String expectedMainTableWithMultiPartitionsCreateQuery = "CREATE TABLE IF NOT EXISTS \"mydb\".\"main\"(" +
"\"id\" INTEGER NOT NULL,\"name\" VARCHAR NOT NULL,\"amount\" DOUBLE,\"account_type\" INTEGER,\"biz_date\" DATE,\"digest\" VARCHAR," +
"\"batch_id_in\" INTEGER NOT NULL,\"batch_id_out\" INTEGER,PRIMARY KEY (\"id\", \"name\", \"batch_id_in\"))";

public static String expectedMainTableWithMultiPartitionsWithoutDigestCreateQuery = "CREATE TABLE IF NOT EXISTS \"mydb\".\"main\"(" +
"\"id\" INTEGER NOT NULL,\"name\" VARCHAR NOT NULL,\"amount\" DOUBLE,\"account_type\" INTEGER,\"biz_date\" DATE," +
"\"batch_id_in\" INTEGER NOT NULL,\"batch_id_out\" INTEGER,PRIMARY KEY (\"id\", \"name\", \"batch_id_in\"))";

public static String expectedMainTableBatchIdAndVersionBasedCreateQuery = "CREATE TABLE IF NOT EXISTS \"mydb\".\"main\"(" +
"\"id\" INTEGER NOT NULL,\"name\" VARCHAR NOT NULL,\"amount\" DOUBLE,\"biz_date\" DATE,\"digest\" VARCHAR,\"version\" INTEGER," +
"\"batch_id_in\" INTEGER NOT NULL,\"batch_id_out\" INTEGER,PRIMARY KEY (\"id\", \"name\", \"batch_id_in\"))";
Expand All @@ -87,6 +95,10 @@ public class AnsiTestArtifacts
"(\"ID\" INTEGER NOT NULL,\"NAME\" VARCHAR NOT NULL,\"AMOUNT\" DOUBLE,\"ACCOUNT_TYPE\" INTEGER,\"BIZ_DATE\" DATE,\"DIGEST\" VARCHAR," +
"\"BATCH_ID_IN\" INTEGER NOT NULL,\"BATCH_ID_OUT\" INTEGER,PRIMARY KEY (\"ID\", \"NAME\", \"BATCH_ID_IN\"))";

public static String expectedMainTableMultiPartitionWithoutDigestCreateQueryWithUpperCase = "CREATE TABLE IF NOT EXISTS \"MYDB\".\"MAIN\"" +
"(\"ID\" INTEGER NOT NULL,\"NAME\" VARCHAR NOT NULL,\"AMOUNT\" DOUBLE,\"ACCOUNT_TYPE\" INTEGER,\"BIZ_DATE\" DATE," +
"\"BATCH_ID_IN\" INTEGER NOT NULL,\"BATCH_ID_OUT\" INTEGER,PRIMARY KEY (\"ID\", \"NAME\", \"BATCH_ID_IN\"))";

public static String expectedMainTableTimeBasedCreateQuery = "CREATE TABLE IF NOT EXISTS \"mydb\".\"main\"(" +
"\"id\" INTEGER NOT NULL,\"name\" VARCHAR NOT NULL,\"amount\" DOUBLE,\"biz_date\" DATE,\"digest\" VARCHAR," +
"\"batch_time_in\" DATETIME NOT NULL,\"batch_time_out\" DATETIME,PRIMARY KEY (\"id\", \"name\", \"batch_time_in\"))";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,134 @@ public void verifyUnitemporalSnapshotWithCleanStagingData(GeneratorResult operat
Assertions.assertEquals(AnsiTestArtifacts.expectedStagingCleanupQuery, postActionsSql.get(0));
}

@Override
public void verifyUnitemporalSnapshotWithPartitionNoDedupNoVersionNoDigest(GeneratorResult operations)
{
List<String> preActionsSql = operations.preActionsSql();
List<String> milestoningSql = operations.ingestSql();
List<String> metadataIngestSql = operations.metadataIngestSql();

String expectedMilestoneQuery = "UPDATE \"mydb\".\"main\" as sink SET sink.\"batch_id_out\" = " +
"(SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE " +
"UPPER(batch_metadata.\"table_name\") = 'MAIN')-1 WHERE (sink.\"batch_id_out\" = 999999999) AND " +
"(EXISTS (SELECT * FROM \"mydb\".\"staging\" as stage WHERE sink.\"biz_date\" = stage.\"biz_date\"))";

String expectedUpsertQuery = "INSERT INTO \"mydb\".\"main\" (\"id\", \"name\", \"amount\", \"biz_date\", \"batch_id_in\", \"batch_id_out\") " +
"(SELECT stage.\"id\",stage.\"name\",stage.\"amount\",stage.\"biz_date\"," +
"(SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE " +
"UPPER(batch_metadata.\"table_name\") = 'MAIN'),999999999 FROM \"mydb\".\"staging\" as stage)";

Assertions.assertEquals(AnsiTestArtifacts.expectedMainTableBatchIdBasedWithoutDigestCreateQuery, preActionsSql.get(0));
Assertions.assertEquals(getExpectedMetadataTableCreateQuery(), preActionsSql.get(1));

Assertions.assertEquals(expectedMilestoneQuery, milestoningSql.get(0));
Assertions.assertEquals(expectedUpsertQuery, milestoningSql.get(1));
Assertions.assertEquals(getExpectedMetadataTableIngestQuery(), metadataIngestSql.get(0));
verifyStats(operations, incomingRecordCount, rowsUpdated, rowsDeleted, rowsInserted, rowsTerminated);
}

@Override
public void verifyUnitemporalSnapshotWithPartitionFiltersNoDedupNoVersionNoDigest(GeneratorResult operations)
{
List<String> preActionsSql = operations.preActionsSql();
List<String> milestoningSql = operations.ingestSql();
List<String> metadataIngestSql = operations.metadataIngestSql();

String expectedMilestoneQuery = "UPDATE \"mydb\".\"main\" as sink " +
"SET sink.\"batch_id_out\" = (SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MAIN')-1 " +
"WHERE (sink.\"batch_id_out\" = 999999999) " +
"AND (sink.\"biz_date\" IN ('2000-01-01 00:00:00','2000-01-02 00:00:00'))";

String expectedUpsertQuery = "INSERT INTO \"mydb\".\"main\" " +
"(\"id\", \"name\", \"amount\", \"biz_date\", \"batch_id_in\", \"batch_id_out\") " +
"(SELECT stage.\"id\",stage.\"name\",stage.\"amount\",stage.\"biz_date\"," +
"(SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MAIN'),999999999 " +
"FROM \"mydb\".\"staging\" as stage)";

Assertions.assertEquals(AnsiTestArtifacts.expectedMainTableBatchIdBasedWithoutDigestCreateQuery, preActionsSql.get(0));
Assertions.assertEquals(getExpectedMetadataTableCreateQuery(), preActionsSql.get(1));

Assertions.assertEquals(expectedMilestoneQuery, milestoningSql.get(0));
Assertions.assertEquals(expectedUpsertQuery, milestoningSql.get(1));
Assertions.assertEquals(getExpectedMetadataTableIngestQuery(), metadataIngestSql.get(0));
}

@Override
public void verifyUnitemporalSnapshotWithPartitionSpecListNoDedupNoVersionNoDigest(GeneratorResult operations)
{
List<String> preActionsSql = operations.preActionsSql();
List<String> milestoningSql = operations.ingestSql();
List<String> metadataIngestSql = operations.metadataIngestSql();

String expectedMilestoneQuery = "UPDATE \"mydb\".\"main\" as sink SET " +
"sink.\"batch_id_out\" = (SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MAIN')-1 " +
"WHERE (sink.\"batch_id_out\" = 999999999) " +
"AND (((sink.\"biz_date\" = '2024-01-01') AND (sink.\"account_type\" = 1)) " +
"OR ((sink.\"biz_date\" = '2024-01-02') AND (sink.\"account_type\" = 1)) " +
"OR ((sink.\"biz_date\" = '2024-01-02') AND (sink.\"account_type\" = 2)))";

String expectedUpsertQuery = "INSERT INTO \"mydb\".\"main\" " +
"(\"id\", \"name\", \"amount\", \"account_type\", \"biz_date\", \"batch_id_in\", \"batch_id_out\") " +
"(SELECT stage.\"id\",stage.\"name\",stage.\"amount\",stage.\"account_type\",stage.\"biz_date\"," +
"(SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MAIN'),999999999 " +
"FROM \"mydb\".\"staging\" as stage)";

Assertions.assertEquals(AnsiTestArtifacts.expectedMainTableWithMultiPartitionsWithoutDigestCreateQuery, preActionsSql.get(0));
Assertions.assertEquals(getExpectedMetadataTableCreateQuery(), preActionsSql.get(1));

Assertions.assertEquals(expectedMilestoneQuery, milestoningSql.get(0));
Assertions.assertEquals(expectedUpsertQuery, milestoningSql.get(1));
Assertions.assertEquals(getExpectedMetadataTableIngestQuery(), metadataIngestSql.get(0));
}

@Override
public void verifyUnitemporalSnapshotWithPartitionSpecListNoDedupNoVersionInUpperCaseNoDigest(GeneratorResult operations)
{
List<String> preActionsSql = operations.preActionsSql();
List<String> milestoningSql = operations.ingestSql();
List<String> metadataIngestSql = operations.metadataIngestSql();

String expectedMilestoneQuery = "UPDATE \"MYDB\".\"MAIN\" as sink " +
"SET sink.\"BATCH_ID_OUT\" = (SELECT COALESCE(MAX(BATCH_METADATA.\"TABLE_BATCH_ID\"),0)+1 FROM BATCH_METADATA as BATCH_METADATA WHERE UPPER(BATCH_METADATA.\"TABLE_NAME\") = 'MAIN')-1 " +
"WHERE (sink.\"BATCH_ID_OUT\" = 999999999) " +
"AND (((sink.\"ACCOUNT_TYPE\" = 1) AND (sink.\"BIZ_DATE\" = '2024-01-01')) " +
"OR ((sink.\"ACCOUNT_TYPE\" = 1) AND (sink.\"BIZ_DATE\" = '2024-01-02')) " +
"OR ((sink.\"ACCOUNT_TYPE\" = 2) AND (sink.\"BIZ_DATE\" = '2024-01-02')))";

String expectedUpsertQuery = "INSERT INTO \"MYDB\".\"MAIN\" " +
"(\"ID\", \"NAME\", \"AMOUNT\", \"ACCOUNT_TYPE\", \"BIZ_DATE\", \"BATCH_ID_IN\", \"BATCH_ID_OUT\") " +
"(SELECT stage.\"ID\",stage.\"NAME\",stage.\"AMOUNT\",stage.\"ACCOUNT_TYPE\",stage.\"BIZ_DATE\"," +
"(SELECT COALESCE(MAX(BATCH_METADATA.\"TABLE_BATCH_ID\"),0)+1 FROM BATCH_METADATA as BATCH_METADATA WHERE UPPER(BATCH_METADATA.\"TABLE_NAME\") = 'MAIN')," +
"999999999 FROM \"MYDB\".\"STAGING\" as stage)";

Assertions.assertEquals(AnsiTestArtifacts.expectedMainTableMultiPartitionWithoutDigestCreateQueryWithUpperCase, preActionsSql.get(0));
Assertions.assertEquals(getExpectedMetadataTableCreateQueryWithUpperCase(), preActionsSql.get(1));
Assertions.assertEquals(expectedMilestoneQuery, milestoningSql.get(0));
Assertions.assertEquals(expectedUpsertQuery, milestoningSql.get(1));
Assertions.assertEquals(getExpectedMetadataTableIngestQueryWithUpperCase(), metadataIngestSql.get(0));
}

@Override
public void verifyUnitemporalSnapshotWithPartitionSpecListWithEmptyBatchHandlingNoDigest(GeneratorResult operations)
{
List<String> preActionsSql = operations.preActionsSql();
List<String> milestoningSql = operations.ingestSql();
List<String> metadataIngestSql = operations.metadataIngestSql();

String expectedMilestoneQuery = "UPDATE \"mydb\".\"main\" as sink SET " +
"sink.\"batch_id_out\" = (SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MAIN')-1 " +
"WHERE (sink.\"batch_id_out\" = 999999999) AND " +
"(((sink.\"biz_date\" = '2024-01-01') AND (sink.\"account_type\" = 1)) " +
"OR ((sink.\"biz_date\" = '2024-01-02') AND (sink.\"account_type\" = 1)) " +
"OR ((sink.\"biz_date\" = '2024-01-02') AND (sink.\"account_type\" = 2)))";

Assertions.assertEquals(AnsiTestArtifacts.expectedMainTableWithMultiPartitionsWithoutDigestCreateQuery, preActionsSql.get(0));
Assertions.assertEquals(getExpectedMetadataTableCreateQuery(), preActionsSql.get(1));

Assertions.assertEquals(expectedMilestoneQuery, milestoningSql.get(0));
Assertions.assertEquals(getExpectedMetadataTableIngestQuery(), metadataIngestSql.get(0));
}

@Override
public RelationalSink getRelationalSink()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,14 +201,26 @@ public class BigQueryTestArtifacts
"`id` INT64 NOT NULL,`name` STRING NOT NULL,`amount` FLOAT64,`biz_date` DATE,`digest` STRING," +
"`batch_id_in` INT64 NOT NULL,`batch_id_out` INT64,PRIMARY KEY (`id`, `name`, `batch_id_in`) NOT ENFORCED)";

public static String expectedMainTableBatchIdWithoutDigestBasedCreateQuery = "CREATE TABLE IF NOT EXISTS `mydb`.`main`(" +
"`id` INT64 NOT NULL,`name` STRING NOT NULL,`amount` FLOAT64,`biz_date` DATE," +
"`batch_id_in` INT64 NOT NULL,`batch_id_out` INT64,PRIMARY KEY (`id`, `name`, `batch_id_in`) NOT ENFORCED)";

public static String expectedMainTableWithMultiPartitionCreateQuery = "CREATE TABLE IF NOT EXISTS `mydb`.`main`(" +
"`id` INT64 NOT NULL,`name` STRING NOT NULL,`amount` FLOAT64,`account_type` INT64,`biz_date` DATE,`digest` STRING," +
"`batch_id_in` INT64 NOT NULL,`batch_id_out` INT64,PRIMARY KEY (`id`, `name`, `batch_id_in`) NOT ENFORCED)";

public static String expectedMainTableWithMultiPartitionWithoutDigestCreateQuery = "CREATE TABLE IF NOT EXISTS `mydb`.`main`(" +
"`id` INT64 NOT NULL,`name` STRING NOT NULL,`amount` FLOAT64,`account_type` INT64,`biz_date` DATE," +
"`batch_id_in` INT64 NOT NULL,`batch_id_out` INT64,PRIMARY KEY (`id`, `name`, `batch_id_in`) NOT ENFORCED)";

public static String expectedMainTableWithMultiPartitionCreateQueryUpperCase = "CREATE TABLE IF NOT EXISTS `MYDB`.`MAIN`" +
"(`ID` INT64 NOT NULL,`NAME` STRING NOT NULL,`AMOUNT` FLOAT64,`ACCOUNT_TYPE` INT64,`BIZ_DATE` DATE,`DIGEST` STRING," +
"`BATCH_ID_IN` INT64 NOT NULL,`BATCH_ID_OUT` INT64,PRIMARY KEY (`ID`, `NAME`, `BATCH_ID_IN`) NOT ENFORCED)";

public static String expectedMainTableWithMultiPartitionWithoutDigestCreateQueryUpperCase = "CREATE TABLE IF NOT EXISTS `MYDB`.`MAIN`" +
"(`ID` INT64 NOT NULL,`NAME` STRING NOT NULL,`AMOUNT` FLOAT64,`ACCOUNT_TYPE` INT64,`BIZ_DATE` DATE," +
"`BATCH_ID_IN` INT64 NOT NULL,`BATCH_ID_OUT` INT64,PRIMARY KEY (`ID`, `NAME`, `BATCH_ID_IN`) NOT ENFORCED)";

public static String expectedMetadataTableCreateQuery = "CREATE TABLE IF NOT EXISTS batch_metadata" +
"(`table_name` STRING(255)," +
"`batch_start_ts_utc` DATETIME," +
Expand Down
Loading

0 comments on commit 551506b

Please sign in to comment.