Skip to content

Commit

Permalink
Split size change user capability into length change and scale change (
Browse files Browse the repository at this point in the history
  • Loading branch information
kumuwu authored Aug 6, 2024
1 parent fff2713 commit 620c28e
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public enum SchemaEvolutionCapability
{
ADD_COLUMN,
DATA_TYPE_CONVERSION,
DATA_TYPE_SIZE_CHANGE,
DATA_TYPE_LENGTH_CHANGE,
DATA_TYPE_SCALE_CHANGE,
COLUMN_NULLABILITY_CHANGE
}
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ private void evolveDataType(Field newField, Field mainDataField, Dataset mainDat
if (!Objects.equals(mainDataField.type().length(), newField.type().length()))
{
if (!sink.capabilities().contains(Capability.DATA_TYPE_LENGTH_CHANGE)
|| (!schemaEvolutionCapabilitySet.contains(SchemaEvolutionCapability.DATA_TYPE_SIZE_CHANGE)))
|| (!schemaEvolutionCapabilitySet.contains(SchemaEvolutionCapability.DATA_TYPE_LENGTH_CHANGE)))
{
throw new IncompatibleSchemaChangeException(String.format("Data type length changes couldn't be performed on column \"%s\" since sink/user capability does not allow it", newField.name()));
}
Expand All @@ -240,7 +240,7 @@ private void evolveDataType(Field newField, Field mainDataField, Dataset mainDat
if (!Objects.equals(mainDataField.type().scale(), newField.type().scale()))
{
if (!sink.capabilities().contains(Capability.DATA_TYPE_SCALE_CHANGE)
|| (!schemaEvolutionCapabilitySet.contains(SchemaEvolutionCapability.DATA_TYPE_SIZE_CHANGE)))
|| (!schemaEvolutionCapabilitySet.contains(SchemaEvolutionCapability.DATA_TYPE_SCALE_CHANGE)))
{
throw new IncompatibleSchemaChangeException(String.format("Data type scale changes couldn't be performed on column \"%s\" since sink/user capability does not allow it", newField.name()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,11 +190,7 @@ void testSnapshotMilestoningWithAddColumnWithoutUserProvidedSchemaEvolutionCapab
try
{
SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable.schema());
SqlPlan physicalPlanForSchemaEvolution = transformer.generatePhysicalPlan(result.logicalPlan());

// Use the planner utils to return the sql
List<String> sqlsForSchemaEvolution = physicalPlanForSchemaEvolution.getSqlList();
Assertions.assertEquals(expectedSchemaEvolutionAddColumnWithUpperCase, sqlsForSchemaEvolution.get(0));
Assertions.fail("Exception was not thrown");
}
catch (IncompatibleSchemaChangeException e)
{
Expand All @@ -218,7 +214,7 @@ void testSnapshotMilestoningWithColumnLengthChangeEvolution()

NontemporalSnapshot ingestMode = NontemporalSnapshot.builder().auditing(NoAuditing.builder().build()).build();
Set<SchemaEvolutionCapability> schemaEvolutionCapabilitySet = new HashSet<>();
schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_SIZE_CHANGE);
schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_LENGTH_CHANGE);
SchemaEvolution schemaEvolution = new SchemaEvolution(relationalSink, ingestMode, schemaEvolutionCapabilitySet, true);

SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable.schema());
Expand Down Expand Up @@ -248,7 +244,7 @@ void testSnapshotMilestoningWithColumnLengthChangeEvolutionWithUpperCase()

NontemporalSnapshot ingestMode = NontemporalSnapshot.builder().auditing(NoAuditing.builder().build()).build();
Set<SchemaEvolutionCapability> schemaEvolutionCapabilitySet = new HashSet<>();
schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_SIZE_CHANGE);
schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_LENGTH_CHANGE);
SchemaEvolution schemaEvolution = new SchemaEvolution(relationalSink, ingestMode, schemaEvolutionCapabilitySet, true);

SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable.schema());
Expand Down Expand Up @@ -281,12 +277,7 @@ void testSnapshotMilestoningWithColumnLengthChangeEvolutionAndUserProvidedSchema
try
{
SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable.schema());
RelationalTransformer transformer = new RelationalTransformer(relationalSink);
SqlPlan physicalPlanForSchemaEvolution = transformer.generatePhysicalPlan(result.logicalPlan());

List<String> sqlsForSchemaEvolution = physicalPlanForSchemaEvolution.getSqlList();
Assertions.assertEquals(expectedSchemaEvolutionModifySize, sqlsForSchemaEvolution.get(0));

Assertions.fail("Exception was not thrown");
}
catch (IncompatibleSchemaChangeException e)
{
Expand All @@ -310,7 +301,7 @@ void testSnapshotMilestoningWithColumnScaleChangeEvolution()

NontemporalSnapshot ingestMode = NontemporalSnapshot.builder().auditing(NoAuditing.builder().build()).build();
Set<SchemaEvolutionCapability> schemaEvolutionCapabilitySet = new HashSet<>();
schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_SIZE_CHANGE);
schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_SCALE_CHANGE);
SchemaEvolution schemaEvolution = new SchemaEvolution(relationalSink, ingestMode, schemaEvolutionCapabilitySet, false);

SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable.schema());
Expand Down Expand Up @@ -344,8 +335,6 @@ void testSnapshotMilestoningWithColumnScaleChangeEvolutionAndUserProvidedSchemaE
try
{
SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable.schema());
RelationalTransformer transformer = new RelationalTransformer(relationalSink);
SqlPlan physicalPlanForSchemaEvolution = transformer.generatePhysicalPlan(result.logicalPlan());
Assertions.fail("Exception was not thrown");
}
catch (IncompatibleSchemaChangeException e)
Expand Down Expand Up @@ -430,10 +419,7 @@ void testSnapshotMilestoningWithAlterNullabilityWithoutUserCapability()
try
{
SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable.schema());
RelationalTransformer transformer = new RelationalTransformer(relationalSink);
SqlPlan physicalPlanForSchemaEvolution = transformer.generatePhysicalPlan(result.logicalPlan());
List<String> sqlsForSchemaEvolution = physicalPlanForSchemaEvolution.getSqlList();
Assertions.assertEquals(2, sqlsForSchemaEvolution.size());
Assertions.fail("Exception was not thrown");
}
catch (IncompatibleSchemaChangeException e)
{
Expand Down Expand Up @@ -521,11 +507,7 @@ void testSnapshotMilestoningWithNonBreakingDataTypeEvolutionAndUserProvidedSchem
try
{
SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable.schema());
RelationalTransformer transformer = new RelationalTransformer(relationalSink);
SqlPlan physicalPlanForSchemaEvolution = transformer.generatePhysicalPlan(result.logicalPlan());

List<String> sqlsForSchemaEvolution = physicalPlanForSchemaEvolution.getSqlList();
Assertions.assertEquals(expectedSchemaNonBreakingChange, sqlsForSchemaEvolution.get(0));
Assertions.fail("Exception was not thrown");
}
catch (IncompatibleSchemaChangeException e)
{
Expand Down Expand Up @@ -556,12 +538,7 @@ void testSnapshotMilestoningWithNonBreakingDataTypeEvolutionAndSizingChange()
try
{
SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable.schema());
RelationalTransformer transformer = new RelationalTransformer(relationalSink);
SqlPlan physicalPlanForSchemaEvolution = transformer.generatePhysicalPlan(result.logicalPlan());

List<String> sqlsForSchemaEvolution = physicalPlanForSchemaEvolution.getSqlList();

Assertions.assertEquals(expectedSchemaNonBreakingChangeWithSizing, sqlsForSchemaEvolution.get(0));
Assertions.fail("Exception was not thrown");
}
catch (IncompatibleSchemaChangeException e)
{
Expand All @@ -587,23 +564,16 @@ void testSnapshotMilestoningWithNonBreakingDataTypeEvolutionAndSizingChangeAllow
NontemporalSnapshot ingestMode = NontemporalSnapshot.builder().auditing(NoAuditing.builder().build()).build();
Set<SchemaEvolutionCapability> schemaEvolutionCapabilitySet = new HashSet<>();
schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_CONVERSION);
schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_SIZE_CHANGE);
schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_LENGTH_CHANGE);
SchemaEvolution schemaEvolution = new SchemaEvolution(relationalSink, ingestMode, schemaEvolutionCapabilitySet, true);

try
{
SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable.schema());
RelationalTransformer transformer = new RelationalTransformer(relationalSink);
SqlPlan physicalPlanForSchemaEvolution = transformer.generatePhysicalPlan(result.logicalPlan());
SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable.schema());
RelationalTransformer transformer = new RelationalTransformer(relationalSink);
SqlPlan physicalPlanForSchemaEvolution = transformer.generatePhysicalPlan(result.logicalPlan());

List<String> sqlsForSchemaEvolution = physicalPlanForSchemaEvolution.getSqlList();
List<String> sqlsForSchemaEvolution = physicalPlanForSchemaEvolution.getSqlList();

Assertions.assertEquals(expectedSchemaNonBreakingChangeWithSizing, sqlsForSchemaEvolution.get(0));
}
catch (IncompatibleSchemaChangeException e)
{
Assertions.assertEquals("Data sizing changes couldn't be performed on column \"amount\" since user capability does not allow it", e.getMessage());
}
Assertions.assertEquals(expectedSchemaNonBreakingChangeWithSizing, sqlsForSchemaEvolution.get(0));
}

// Breaking data type change from DOUBLE --> VARCHAR. Throws exception
Expand Down Expand Up @@ -685,9 +655,7 @@ void testSnapshotMilestoningWithColumnMissingInStagingTableWithoutUserCapability
try
{
SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable.schema());
RelationalTransformer transformer = new RelationalTransformer(relationalSink);
SqlPlan physicalPlanForSchemaEvolution = transformer.generatePhysicalPlan(result.logicalPlan());
List<String> sqlsForSchemaEvolution = physicalPlanForSchemaEvolution.getSqlList();
Assertions.fail("Exception was not thrown");
}
catch (IncompatibleSchemaChangeException e)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ void testDataTypeSizeChange() throws Exception
.build();

Set<SchemaEvolutionCapability> schemaEvolutionCapabilitySet = new HashSet<>();
schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_SIZE_CHANGE);
schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_LENGTH_CHANGE);

String[] schema = new String[]{idName, nameName, incomeName, startTimeName, expiryDateName, digestName, batchUpdateTimeName, batchIdName};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ void testDataTypeSizeChange() throws Exception

PlannerOptions options = PlannerOptions.builder().cleanupStagingData(false).collectStatistics(true).enableSchemaEvolution(true).build();
Set<SchemaEvolutionCapability> schemaEvolutionCapabilitySet = new HashSet<>();
schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_SIZE_CHANGE);
schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_LENGTH_CHANGE);
Datasets datasets = Datasets.of(mainTable, stagingTable);

String[] schema = new String[]{idName, nameName, incomeName, startTimeName, expiryDateName, digestName, batchUpdateTimeName, batchIdName};
Expand Down Expand Up @@ -331,7 +331,8 @@ void testDataTypeConversionAndDataTypeSizeChange() throws Exception
PlannerOptions options = PlannerOptions.builder().cleanupStagingData(false).collectStatistics(true).enableSchemaEvolution(true).build();
Set<SchemaEvolutionCapability> schemaEvolutionCapabilitySet = new HashSet<>();
schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_CONVERSION);
schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_SIZE_CHANGE);
schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_LENGTH_CHANGE);
schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_SCALE_CHANGE);
Datasets datasets = Datasets.of(mainTable, stagingTable);

String[] schema = new String[]{idName, nameName, incomeName, startTimeName, expiryDateName, digestName, batchUpdateTimeName, batchIdName};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ void testSnapshotMilestoningWithColumnLengthChangeEvolution()

NontemporalSnapshot ingestMode = NontemporalSnapshot.builder().auditing(NoAuditing.builder().build()).build();
Set<SchemaEvolutionCapability> schemaEvolutionCapabilitySet = new HashSet<>();
schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_SIZE_CHANGE);
schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_LENGTH_CHANGE);
SchemaEvolution schemaEvolution = new SchemaEvolution(relationalSink, ingestMode, schemaEvolutionCapabilitySet, true);

SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable.schema());
Expand Down Expand Up @@ -124,7 +124,7 @@ void testSnapshotMilestoningWithColumnLengthChangeEvolutionWithUpperCase()

NontemporalSnapshot ingestMode = NontemporalSnapshot.builder().auditing(NoAuditing.builder().build()).build();
Set<SchemaEvolutionCapability> schemaEvolutionCapabilitySet = new HashSet<>();
schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_SIZE_CHANGE);
schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_LENGTH_CHANGE);
SchemaEvolution schemaEvolution = new SchemaEvolution(relationalSink, ingestMode, schemaEvolutionCapabilitySet, true);

SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable.schema());
Expand Down Expand Up @@ -184,7 +184,7 @@ void testSnapshotMilestoningWithColumnScaleChangeEvolution()

NontemporalSnapshot ingestMode = NontemporalSnapshot.builder().auditing(NoAuditing.builder().build()).build();
Set<SchemaEvolutionCapability> schemaEvolutionCapabilitySet = new HashSet<>();
schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_SIZE_CHANGE);
schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_LENGTH_CHANGE);
SchemaEvolution schemaEvolution = new SchemaEvolution(relationalSink, ingestMode, schemaEvolutionCapabilitySet, false);

try
Expand Down Expand Up @@ -244,7 +244,7 @@ void testSnapshotMilestoningWithImplicitDataTypeEvolutionAndLengthEvolution()

NontemporalSnapshot ingestMode = NontemporalSnapshot.builder().auditing(NoAuditing.builder().build()).build();
Set<SchemaEvolutionCapability> schemaEvolutionCapabilitySet = new HashSet<>();
schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_SIZE_CHANGE);
schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_LENGTH_CHANGE);
SchemaEvolution schemaEvolution = new SchemaEvolution(relationalSink, ingestMode, schemaEvolutionCapabilitySet, true);

SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable.schema());
Expand Down

0 comments on commit 620c28e

Please sign in to comment.