Skip to content

Commit

Permalink
Skip Table Creation option for Main and metadata table
Browse files Browse the repository at this point in the history
  • Loading branch information
prasar-ashutosh committed Apr 23, 2024
1 parent f4dfb51 commit 17cd030
Show file tree
Hide file tree
Showing 37 changed files with 61 additions and 130 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,19 +112,19 @@ Mandatory Params:

Optional Params:

| parameters | Description | Default Value |
|--------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------|
| cleanupStagingData | Clean staging table after completion of ingestion                                                                                                                                       | true              |
| collectStatistics | Collect Statistics from ingestion | false |
| enableSchemaEvolution | Enable Schema Evolution to happen | false |
| caseConversion | Convert SQL objects like table, db, column names to upper or lower case.<br> Values supported - TO_UPPER, TO_LOWER, NONE | NONE |
| executionTimestampClock | Clock to use to derive the time | Clock.systemUTC() |
| batchStartTimestampPattern | Pattern for batchStartTimestamp. If this pattern is provided, it will replace the batchStartTimestamp values | None |
| batchEndTimestampPattern | Pattern for batchEndTimestamp. If this pattern is provided, it will replace the batchEndTimestamp values | None |
| batchIdPattern | Pattern for batch id. If this pattern is provided, it will replace the next batch id | None |
| createStagingDataset | Enables creation of staging Dataset | false |
| schemaEvolutionCapabilitySet | A set that enables fine grained schema evolution capabilities - ADD_COLUMN, DATA_TYPE_CONVERSION, DATA_TYPE_SIZE_CHANGE, COLUMN_NULLABILITY_CHANGE | Empty set |
| infiniteBatchIdValue | Value to be used for Infinite batch id | 999999999 |
| parameters | Description | Default Value |
|--------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------|
| cleanupStagingData | Clean staging table after completion of ingestion                                                                                                                       | true              |
| collectStatistics | Collect Statistics from ingestion | false |
| enableSchemaEvolution | Enable Schema Evolution to happen | false |
| caseConversion | Convert SQL objects like table, db, column names to upper or lower case.<br> Values supported - TO_UPPER, TO_LOWER, NONE | NONE |
| executionTimestampClock | Clock to use to derive the time | Clock.systemUTC() |
| batchStartTimestampPattern | Pattern for batchStartTimestamp. If this pattern is provided, it will replace the batchStartTimestamp values | None |
| batchEndTimestampPattern | Pattern for batchEndTimestamp. If this pattern is provided, it will replace the batchEndTimestamp values | None |
| batchIdPattern | Pattern for batch id. If this pattern is provided, it will replace the next batch id | None |
| skipMainAndMetadataDatasetCreation | Skip main and metadata dataset creation                                                                                                                 | false |
| schemaEvolutionCapabilitySet | A set that enables fine grained schema evolution capabilities - ADD_COLUMN, DATA_TYPE_CONVERSION, DATA_TYPE_SIZE_CHANGE, COLUMN_NULLABILITY_CHANGE | Empty set |
| infiniteBatchIdValue | Value to be used for Infinite batch id | 999999999 |
| enableConcurrentSafety | Enables safety for concurrent ingestion on the same table. If enabled, the library creates a special lock table to block other concurrent ingestion on the same table | false |

**Step 4.2:** Use the generator object to extract the queries
Expand Down Expand Up @@ -191,7 +191,7 @@ Optional Params:
| caseConversion | Convert SQL objects like table, db, column names to upper or lower case.<br> Values supported - TO_UPPER, TO_LOWER, NONE | NONE |
| executionTimestampClock | Clock to use to derive the time | Clock.systemUTC() |
| createDatasets | A flag to enable or disable dataset creation in Executor mode | true |
| createStagingDataset | Enables creation of staging Dataset | false |
| skipMainAndMetadataDatasetCreation | Skip main and metadata dataset creation                                                                                                                 | false |
| schemaEvolutionCapabilitySet | A set that enables fine grained schema evolution capabilities - ADD_COLUMN, DATA_TYPE_CONVERSION, DATA_TYPE_SIZE_CHANGE, COLUMN_NULLABILITY_CHANGE | Empty set |
| enableConcurrentSafety | Enables safety for concurrent ingestion on the same table. If enabled, the library creates a special lock table to block other concurrent ingestion on the same table | false |

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,13 +254,11 @@ public LogicalPlan buildLogicalPlanForPreActions(Resources resources)
{
List<Operation> operations = new ArrayList<>();

operations.add(Create.of(true, mainDataset()));
if (options().createStagingDataset())
if (!options().skipMainAndMetadataDatasetCreation())
{
operations.add(Create.of(true, stagingDataset()));
operations.add(Create.of(true, mainDataset()));
operations.add(Create.of(true, metadataDataset().orElseThrow(IllegalStateException::new).get()));
}
operations.add(Create.of(true, metadataDataset().orElseThrow(IllegalStateException::new).get()));

if (ingestMode().validityMilestoning().validityDerivation() instanceof SourceSpecifiesFromDateTime)
{
operations.add(Create.of(true, tempDataset));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,8 +332,11 @@ private void addAuditing(List<Value> fieldsToInsert, List<Value> fieldsToSelect)
public LogicalPlan buildLogicalPlanForPreActions(Resources resources)
{
List<Operation> operations = new ArrayList<>();
operations.add(Create.of(true, mainDataset()));
operations.add(Create.of(true, metadataDataset().orElseThrow(IllegalStateException::new).get()));
if (!options().skipMainAndMetadataDatasetCreation())
{
operations.add(Create.of(true, mainDataset()));
operations.add(Create.of(true, metadataDataset().orElseThrow(IllegalStateException::new).get()));
}
if (!transformWhileCopy)
{
operations.add(Create.of(false, externalDataset));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ default boolean enableSchemaEvolution()
}

@Default
default boolean createStagingDataset()
default boolean skipMainAndMetadataDatasetCreation()
{
return false;
}
Expand Down Expand Up @@ -361,12 +361,11 @@ public LogicalPlan buildLogicalPlanForAcquireLock(Resources resources)
public LogicalPlan buildLogicalPlanForPreActions(Resources resources)
{
List<Operation> operations = new ArrayList<>();
operations.add(Create.of(true, mainDataset()));
if (options().createStagingDataset())
if (!options().skipMainAndMetadataDatasetCreation())
{
operations.add(Create.of(true, originalStagingDataset()));
operations.add(Create.of(true, mainDataset()));
operations.add(Create.of(true, metadataDataset().orElseThrow(IllegalStateException::new).get()));
}
operations.add(Create.of(true, metadataDataset().orElseThrow(IllegalStateException::new).get()));
if (options().enableConcurrentSafety())
{
operations.add(Create.of(true, lockInfoDataset().orElseThrow(IllegalStateException::new).get()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,7 @@ public void verifyBitemporalDeltaBatchIdBasedNoDeleteIndNoDataSplits(GeneratorRe
"WHERE (sink.\"batch_id_out\" = 999999999) " +
"AND (sink.\"digest\" = stage.\"digest\") AND ((sink.\"id\" = stage.\"id\") AND (sink.\"name\" = stage.\"name\")) AND (sink.\"validity_from_target\" = stage.\"validity_from_reference\"))))";

Assertions.assertEquals(AnsiTestArtifacts.expectedBitemporalMainTableCreateQuery, preActionsSql.get(0));
Assertions.assertEquals(AnsiTestArtifacts.expectedBitemporalStagingTableCreateQuery, preActionsSql.get(1));
Assertions.assertEquals(getExpectedMetadataTableCreateQuery(), preActionsSql.get(2));
Assertions.assertEquals(AnsiTestArtifacts.expectedLockInfoTableCreateQuery, preActionsSql.get(3));
Assertions.assertEquals(AnsiTestArtifacts.expectedLockInfoTableCreateQuery, preActionsSql.get(0));

Assertions.assertEquals(expectedMilestoneQuery, milestoningSql.get(0));
Assertions.assertEquals(expectedUpsertQuery, milestoningSql.get(1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,9 @@ public void verifyBitemporalDeltaBatchIdBasedNoDeleteIndNoDataSplits(GeneratorRe
"(SELECT temp.\"id\",temp.\"name\",temp.\"amount\",temp.\"digest\",temp.\"batch_id_in\",temp.\"batch_id_out\",temp.\"validity_from_target\",temp.\"validity_through_target\" FROM \"mydb\".\"temp\" as temp)";

Assertions.assertEquals(AnsiTestArtifacts.expectedBitemporalFromOnlyMainTableCreateQuery, preActionsSql.get(0));
Assertions.assertEquals(AnsiTestArtifacts.expectedBitemporalFromOnlyStagingTableCreateQuery, preActionsSql.get(1));
Assertions.assertEquals(getExpectedMetadataTableCreateQuery(), preActionsSql.get(2));
Assertions.assertEquals(AnsiTestArtifacts.expectedBitemporalFromOnlyTempTableCreateQuery, preActionsSql.get(3));
Assertions.assertEquals(AnsiTestArtifacts.expectedLockInfoTableCreateQuery, preActionsSql.get(4));
Assertions.assertEquals(getExpectedMetadataTableCreateQuery(), preActionsSql.get(1));
Assertions.assertEquals(AnsiTestArtifacts.expectedBitemporalFromOnlyTempTableCreateQuery, preActionsSql.get(2));
Assertions.assertEquals(AnsiTestArtifacts.expectedLockInfoTableCreateQuery, preActionsSql.get(3));

Assertions.assertEquals(expectedStageToTemp, milestoningSql.get(0));
Assertions.assertEquals(expectedMainToTemp, milestoningSql.get(1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,8 @@ public void verifyAppendOnlyNoAuditingNoDedupNoVersioningNoFilterExistingRecords
"(SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MAIN') " +
"FROM \"mydb\".\"staging\" as stage)";
Assertions.assertEquals(AnsiTestArtifacts.expectedBaseTableCreateQueryWithNoPKs, preActionsSqlList.get(0));
Assertions.assertEquals(AnsiTestArtifacts.expectedBaseStagingTableCreateQueryWithNoPKs, preActionsSqlList.get(1));
Assertions.assertEquals(getExpectedMetadataTableCreateQuery(), preActionsSqlList.get(2));
Assertions.assertEquals(AnsiTestArtifacts.expectedLockInfoTableCreateQuery, preActionsSqlList.get(3));
Assertions.assertEquals(getExpectedMetadataTableCreateQuery(), preActionsSqlList.get(1));
Assertions.assertEquals(AnsiTestArtifacts.expectedLockInfoTableCreateQuery, preActionsSqlList.get(2));
Assertions.assertEquals(insertSql, milestoningSqlList.get(0));

Assertions.assertEquals(lockInitializedQuery, initializeLockSql.get(0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,8 @@ public void verifyNontemporalDeltaNoAuditingNoDedupNoVersioning(GeneratorResult
"WHERE (sink.\"id\" = stage.\"id\") AND (sink.\"name\" = stage.\"name\"))))";

Assertions.assertEquals(AnsiTestArtifacts.expectedBaseTablePlusDigestCreateQuery, preActionsSqlList.get(0));
Assertions.assertEquals(AnsiTestArtifacts.expectedStagingTableWithDigestCreateQuery, preActionsSqlList.get(1));
Assertions.assertEquals(getExpectedMetadataTableCreateQuery(), preActionsSqlList.get(2));
Assertions.assertEquals(AnsiTestArtifacts.expectedLockInfoTableCreateQuery, preActionsSqlList.get(3));
Assertions.assertEquals(getExpectedMetadataTableCreateQuery(), preActionsSqlList.get(1));
Assertions.assertEquals(AnsiTestArtifacts.expectedLockInfoTableCreateQuery, preActionsSqlList.get(2));

Assertions.assertTrue(deduplicationAndVersioningSql.isEmpty());
Assertions.assertTrue(deduplicationAndVersioningErrorChecksSql.isEmpty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,7 @@ public void verifyNontemporalSnapshotNoAuditingNoDedupNoVersioning(GeneratorResu
"(SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MAIN') " +
"FROM \"mydb\".\"staging\" as stage)";

Assertions.assertEquals(AnsiTestArtifacts.expectedBaseTableCreateQuery, preActionsSqlList.get(0));
Assertions.assertEquals(AnsiTestArtifacts.expectedBaseStagingTableCreateQuery, preActionsSqlList.get(1));
Assertions.assertEquals(getExpectedMetadataTableCreateQuery(), preActionsSqlList.get(2));
Assertions.assertEquals(AnsiTestArtifacts.expectedLockInfoTableCreateQuery, preActionsSqlList.get(3));
Assertions.assertEquals(AnsiTestArtifacts.expectedLockInfoTableCreateQuery, preActionsSqlList.get(0));

Assertions.assertEquals(cleanUpMainTableSql, milestoningSqlList.get(0));
Assertions.assertEquals(insertSql, milestoningSqlList.get(1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,7 @@ public void verifyUnitemporalDeltaNoDeleteIndNoDedupNoVersion(GeneratorResult op
"WHERE (sink.\"batch_id_out\" = 999999999) " +
"AND (sink.\"digest\" = stage.\"digest\") AND ((sink.\"id\" = stage.\"id\") AND (sink.\"name\" = stage.\"name\")))))";

Assertions.assertEquals(AnsiTestArtifacts.expectedMainTableBatchIdBasedCreateQuery, preActionsSql.get(0));
Assertions.assertEquals(AnsiTestArtifacts.expectedStagingTableWithDigestCreateQuery, preActionsSql.get(1));
Assertions.assertEquals(getExpectedMetadataTableCreateQuery(), preActionsSql.get(2));
Assertions.assertEquals(AnsiTestArtifacts.expectedLockInfoTableCreateQuery, preActionsSql.get(3));
Assertions.assertEquals(AnsiTestArtifacts.expectedLockInfoTableCreateQuery, preActionsSql.get(0));

Assertions.assertEquals(expectedMilestoneQuery, milestoningSql.get(0));
Assertions.assertEquals(expectedUpsertQuery, milestoningSql.get(1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,7 @@ public void verifyUnitemporalSnapshotWithoutPartitionNoDedupNoVersion(GeneratorR
"FROM \"mydb\".\"staging\" as stage " +
"WHERE NOT (stage.\"digest\" IN (SELECT sink.\"digest\" FROM \"mydb\".\"main\" as sink WHERE sink.\"batch_id_out\" = 999999999)))";

Assertions.assertEquals(AnsiTestArtifacts.expectedMainTableBatchIdBasedCreateQuery, preActionsSql.get(0));
Assertions.assertEquals(AnsiTestArtifacts.expectedStagingTableWithDigestCreateQuery, preActionsSql.get(1));
Assertions.assertEquals(getExpectedMetadataTableCreateQuery(), preActionsSql.get(2));
Assertions.assertEquals(AnsiTestArtifacts.expectedLockInfoTableCreateQuery, preActionsSql.get(3));
Assertions.assertEquals(AnsiTestArtifacts.expectedLockInfoTableCreateQuery, preActionsSql.get(0));

Assertions.assertEquals(expectedMilestoneQuery, milestoningSql.get(0));
Assertions.assertEquals(expectedUpsertQuery, milestoningSql.get(1));
Expand Down Expand Up @@ -103,11 +100,8 @@ public void verifyUnitemporalSnapshotWithoutPartitionFailOnDupsNoVersion(Generat
"FROM \"mydb\".\"staging_temp_staging_lp_yosulf\" as stage " +
"WHERE NOT (stage.\"digest\" IN (SELECT sink.\"digest\" FROM \"mydb\".\"main\" as sink WHERE sink.\"batch_id_out\" = 999999999)))";

Assertions.assertEquals(AnsiTestArtifacts.expectedMainTableBatchIdBasedCreateQuery, preActionsSql.get(0));
Assertions.assertEquals(AnsiTestArtifacts.expectedStagingTableWithDigestCreateQuery, preActionsSql.get(1));
Assertions.assertEquals(getExpectedMetadataTableCreateQuery(), preActionsSql.get(2));
Assertions.assertEquals(AnsiTestArtifacts.expectedLockInfoTableCreateQuery, preActionsSql.get(3));
Assertions.assertEquals(AnsiTestArtifacts.expectedBaseTempStagingTablePlusDigestWithCount, preActionsSql.get(4));
Assertions.assertEquals(AnsiTestArtifacts.expectedLockInfoTableCreateQuery, preActionsSql.get(0));
Assertions.assertEquals(AnsiTestArtifacts.expectedBaseTempStagingTablePlusDigestWithCount, preActionsSql.get(1));

Assertions.assertEquals(AnsiTestArtifacts.expectedTempStagingCleanupQuery, deduplicationAndVersioningSql.get(0));
Assertions.assertEquals(AnsiTestArtifacts.expectedInsertIntoBaseTempStagingPlusDigestWithFilterDuplicates, deduplicationAndVersioningSql.get(1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void verifyAppendOnlyNoAuditingNoDedupNoVersioningNoFilterExistingRecords
"FROM `mydb`.`staging` as stage)";

Assertions.assertEquals(BigQueryTestArtifacts.expectedBaseTableCreateQueryWithNoPKs, preActionsSqlList.get(0));
Assertions.assertEquals(BigQueryTestArtifacts.expectedStagingTableCreateQueryWithNoPKs, preActionsSqlList.get(1));
Assertions.assertEquals(BigQueryTestArtifacts.expectedMetadataTableCreateQuery, preActionsSqlList.get(1));
Assertions.assertEquals(insertSql, milestoningSqlList.get(0));

// Stats
Expand Down
Loading

0 comments on commit 17cd030

Please sign in to comment.