Persistence Component: Implement Lightweight Schema Evolution API + Performance Optimization for Schema Evolution (finos#2779)
kumuwu authored Apr 30, 2024
1 parent 3902a5e commit 2c833be
Showing 20 changed files with 821 additions and 99 deletions.
@@ -30,58 +30,7 @@ public Dataset applyCaseOnDataset(Dataset dataset, Function<String, String> strategy)
Optional<String> newSchemaName = dataset.datasetReference().group().map(strategy);
Optional<String> newDatabaseName = dataset.datasetReference().database().map(strategy);

List<Field> newDatasetFields = new ArrayList<>();
for (Field field : dataset.schema().fields())
{
Field newField = field.withName(strategy.apply(field.name()));
newDatasetFields.add(newField);
}

List<Index> newDatasetIndices = new ArrayList<>();
for (Index index : dataset.schema().indexes())
{
List<String> indexColumnNames = new ArrayList<>();
for (String columnName : index.columns())
{
String newColumnName = strategy.apply(columnName);
indexColumnNames.add(newColumnName);
}
Index newIndex = index.withIndexName(strategy.apply(index.indexName())).withColumns(indexColumnNames);
newDatasetIndices.add(newIndex);
}

ColumnStoreSpecification newColumnStoreSpecification = null;
if (dataset.schema().columnStoreSpecification().isPresent())
{
ColumnStoreSpecification columnStoreSpecification = dataset.schema().columnStoreSpecification().get();
List<Field> newColumnStoreKeys = new ArrayList<>();
for (Field field : columnStoreSpecification.columnStoreKeys())
{
Field newField = field.withName(strategy.apply(field.name()));
newColumnStoreKeys.add(newField);
}
newColumnStoreSpecification = columnStoreSpecification.withColumnStoreKeys(newColumnStoreKeys);
}

ShardSpecification newShardSpecification = null;
if (dataset.schema().shardSpecification().isPresent())
{
ShardSpecification shardSpecification = dataset.schema().shardSpecification().get();
List<Field> newShardKeys = new ArrayList<>();
for (Field field : shardSpecification.shardKeys())
{
Field newField = field.withName(strategy.apply(field.name()));
newShardKeys.add(newField);
}
newShardSpecification = shardSpecification.withShardKeys(newShardKeys);
}

SchemaDefinition schemaDefinition = SchemaDefinition.builder()
.addAllFields(newDatasetFields)
.addAllIndexes(newDatasetIndices)
.columnStoreSpecification(newColumnStoreSpecification)
.shardSpecification(newShardSpecification)
.build();
SchemaDefinition schemaDefinition = applyCaseOnSchemaDefinition(dataset.schema(), strategy);

if (dataset instanceof DatasetDefinition)
{
@@ -181,4 +130,60 @@ public LockInfoDataset applyCaseOnLockInfoDataset(LockInfoDataset lockInfoDataset)
.tableNameField(strategy.apply(lockInfoDataset.tableNameField()))
.build();
}

public SchemaDefinition applyCaseOnSchemaDefinition(SchemaDefinition schema, Function<String, String> strategy)
{
List<Field> newDatasetFields = new ArrayList<>();
for (Field field : schema.fields())
{
Field newField = field.withName(strategy.apply(field.name()));
newDatasetFields.add(newField);
}

List<Index> newDatasetIndices = new ArrayList<>();
for (Index index : schema.indexes())
{
List<String> indexColumnNames = new ArrayList<>();
for (String columnName : index.columns())
{
String newColumnName = strategy.apply(columnName);
indexColumnNames.add(newColumnName);
}
Index newIndex = index.withIndexName(strategy.apply(index.indexName())).withColumns(indexColumnNames);
newDatasetIndices.add(newIndex);
}

ColumnStoreSpecification newColumnStoreSpecification = null;
if (schema.columnStoreSpecification().isPresent())
{
ColumnStoreSpecification columnStoreSpecification = schema.columnStoreSpecification().get();
List<Field> newColumnStoreKeys = new ArrayList<>();
for (Field field : columnStoreSpecification.columnStoreKeys())
{
Field newField = field.withName(strategy.apply(field.name()));
newColumnStoreKeys.add(newField);
}
newColumnStoreSpecification = columnStoreSpecification.withColumnStoreKeys(newColumnStoreKeys);
}

ShardSpecification newShardSpecification = null;
if (schema.shardSpecification().isPresent())
{
ShardSpecification shardSpecification = schema.shardSpecification().get();
List<Field> newShardKeys = new ArrayList<>();
for (Field field : shardSpecification.shardKeys())
{
Field newField = field.withName(strategy.apply(field.name()));
newShardKeys.add(newField);
}
newShardSpecification = shardSpecification.withShardKeys(newShardKeys);
}

return SchemaDefinition.builder()
.addAllFields(newDatasetFields)
.addAllIndexes(newDatasetIndices)
.columnStoreSpecification(newColumnStoreSpecification)
.shardSpecification(newShardSpecification)
.build();
}
}
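
Taken together, the hunks above are a straight extract-method refactor: applyCaseOnDataset now delegates to the new public applyCaseOnSchemaDefinition, so the same case-conversion logic can be reused by callers that hold only a SchemaDefinition, as the new lightweight schema evolution API further down does. A minimal, self-contained usage sketch; the enclosing class name DatasetCaseConverter, the package paths, and the FieldType builder shape are assumptions inferred from context, not shown in this diff:

import java.util.Locale;
// Assumed package layout for the persistence component's logical-plan model:
import org.finos.legend.engine.persistence.components.logicalplan.datasets.DataType;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Field;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.FieldType;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.SchemaDefinition;

public class CaseConversionSketch
{
    public static void main(String[] args)
    {
        // Build a one-column schema using the immutable builders seen in the diff.
        SchemaDefinition schema = SchemaDefinition.builder()
            .addFields(Field.builder()
                .name("id")
                .type(FieldType.builder().dataType(DataType.INT).build()) // builder shape assumed
                .primaryKey(true)
                .build())
            .build();

        // One call renames fields, index columns, column-store keys and shard keys alike.
        SchemaDefinition upperCased = new DatasetCaseConverter() // class name assumed
            .applyCaseOnSchemaDefinition(schema, name -> name.toUpperCase(Locale.ROOT));
        System.out.println(upperCased.fields().get(0).name()); // prints: ID
    }
}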
@@ -39,4 +39,9 @@ default DatasetReference datasetReference()
{
return this;
}

default Optional<DatasetAdditionalProperties> datasetAdditionalProperties()
{
return Optional.empty();
}
}
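
The Optional-returning default keeps this interface change backward compatible: every existing Dataset implementation keeps compiling and simply reports that it carries no additional properties unless it overrides the accessor. A caller-side fragment (hedged; dataset stands for any Dataset instance):

// With the default above, any Dataset can be probed uniformly; implementations
// that never set additional properties just yield Optional.empty().
dataset.datasetAdditionalProperties()
    .ifPresent(props -> System.out.println("additional properties: " + props));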
@@ -36,7 +36,7 @@ public interface RelationalExecutionHelper

void validateDatasetSchema(Dataset dataset, TypeMapping datatypeMapping);

Dataset constructDatasetFromDatabase(Dataset dataset, TypeMapping mapping);
Dataset constructDatasetFromDatabase(Dataset dataset, TypeMapping mapping, boolean escape);

void executeStatement(String sql);

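Only the signature change is visible in this hunk, so the semantics of the new boolean are inferred rather than confirmed: presumably escape tells the helper whether identifiers were created escaped/quoted, so the Dataset reconstructed from database metadata preserves the original casing. A hedged call fragment (helper, mainDataset and typeMapping are placeholders):

// The third argument is the new flag; its meaning is inferred from the
// signature alone, since the implementation is not part of this hunk.
Dataset escaped = helper.constructDatasetFromDatabase(mainDataset, typeMapping, true);
Dataset unescaped = helper.constructDatasetFromDatabase(mainDataset, typeMapping, false);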
@@ -103,7 +103,7 @@ public SchemaEvolution(Sink sink, IngestMode ingestMode, Set<SchemaEvolutionCapability> schemaEvolutionCapabilitySet)
this.schemaEvolutionCapabilitySet = schemaEvolutionCapabilitySet;
}

public SchemaEvolutionResult buildLogicalPlanForSchemaEvolution(Dataset mainDataset, Dataset stagingDataset)
public SchemaEvolutionResult buildLogicalPlanForSchemaEvolution(Dataset mainDataset, SchemaDefinition stagingDataset)
{
List<Operation> operations = new ArrayList<>();
Set<Field> modifiedFields = new HashSet<>();
@@ -116,9 +116,9 @@ public SchemaEvolutionResult buildLogicalPlanForSchemaEvolution(Dataset mainDataset, SchemaDefinition stagingDataset)
return SchemaEvolutionResult.of(LogicalPlan.of(operations), mainDataset.withSchema(evolvedSchema));
}
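
This signature change is the lightweight API from the commit title: the staging side of schema evolution now needs only a SchemaDefinition rather than a fully materialized staging Dataset, so callers can compare schemas without building or reading back a staging table. A hedged calling fragment; sink, ingestMode, schemaEvolutionCapabilitySet, mainDataset and stagingSchema are placeholders, and the SchemaEvolutionResult accessor names are assumed from the of(...) factory above:

SchemaEvolution schemaEvolution = new SchemaEvolution(sink, ingestMode, schemaEvolutionCapabilitySet);
SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainDataset, stagingSchema);
LogicalPlan alterPlan = result.logicalPlan();   // ALTER operations to execute, possibly empty; accessor name assumed
Dataset evolvedMain = result.evolvedDataset();  // mainDataset with the evolved schema applied; accessor name assumed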

private void validatePrimaryKeys(Dataset mainDataset, Dataset stagingDataset)
private void validatePrimaryKeys(Dataset mainDataset, SchemaDefinition stagingDataset)
{
List<Field> stagingFilteredFields = stagingDataset.schema().fields().stream().filter(field -> !(ingestMode.accept(STAGING_TABLE_FIELDS_TO_IGNORE).contains(field.name()))).collect(Collectors.toList());
List<Field> stagingFilteredFields = stagingDataset.fields().stream().filter(field -> !(ingestMode.accept(STAGING_TABLE_FIELDS_TO_IGNORE).contains(field.name()))).collect(Collectors.toList());
Set<String> stagingPkNames = stagingFilteredFields.stream().filter(Field::primaryKey).map(Field::name).collect(Collectors.toSet());
List<Field> mainFilteredFields = mainDataset.schema().fields().stream().filter(field -> !(ingestMode.accept(MAIN_TABLE_FIELDS_TO_IGNORE).contains(field.name()))).collect(Collectors.toList());
Set<String> mainPkNames = mainFilteredFields.stream().filter(Field::primaryKey).map(Field::name).collect(Collectors.toSet());
@@ -129,14 +129,11 @@ private void validatePrimaryKeys(Dataset mainDataset, SchemaDefinition stagingDataset)
}

//Validate that all columns (allowing exceptions) in the staging dataset have a matching column in the main dataset
private List<Operation> stagingToMainTableColumnMatch(Dataset mainDataset,
Dataset stagingDataset,
Set<String> fieldsToIgnore,
Set<Field> modifiedFields)
private List<Operation> stagingToMainTableColumnMatch(Dataset mainDataset, SchemaDefinition stagingDataset, Set<String> fieldsToIgnore, Set<Field> modifiedFields)
{
List<Operation> operations = new ArrayList<>();
List<Field> mainFields = mainDataset.schema().fields();
List<Field> stagingFields = stagingDataset.schema().fields();
List<Field> stagingFields = stagingDataset.fields();
List<Field> filteredFields = stagingFields.stream().filter(field -> !fieldsToIgnore.contains(field.name())).collect(Collectors.toList());
for (Field stagingField : filteredFields)
{
@@ -145,12 +142,19 @@ private List<Operation> stagingToMainTableColumnMatch(Dataset mainDataset, SchemaDefinition stagingDataset, Set<String> fieldsToIgnore, Set<Field> modifiedFields)
if (matchedMainField == null)
{
// Add the new column in the main table if database supports ADD_COLUMN capability and
// if user capability supports ADD_COLUMN or is empty (since empty means no overridden preference)
// if user capability supports ADD_COLUMN
if (sink.capabilities().contains(Capability.ADD_COLUMN)
&& (schemaEvolutionCapabilitySet.contains(SchemaEvolutionCapability.ADD_COLUMN)))
{
operations.add(Alter.of(mainDataset, Alter.AlterOperation.ADD, stagingField, Optional.empty()));
modifiedFields.add(stagingField);
if (stagingField.nullable())
{
operations.add(Alter.of(mainDataset, Alter.AlterOperation.ADD, stagingField, Optional.empty()));
modifiedFields.add(stagingField);
}
else
{
throw new IncompatibleSchemaChangeException(String.format("Non-nullable field \"%s\" in staging dataset cannot be added, as it is a backward-incompatible change.", stagingFieldName));
}
}
else
{
@@ -270,11 +274,11 @@ private void alterColumnWithNullable(Field newField, Dataset mainDataset, List<O
}
}
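
Two behavioral tightenings land in stagingToMainTableColumnMatch above: the comment is brought in line with the enforced condition, so ADD_COLUMN must be explicitly present in the user's capability set (an empty set no longer reads as "no overridden preference, allow"), and only nullable staging columns may be added, since adding a non-nullable column to a table that may already hold rows is a backward-incompatible change. A hedged fragment showing the new rejection path (builder calls assumed from context):

Field newNonNullable = Field.builder()
    .name("amount")
    .type(FieldType.builder().dataType(DataType.DOUBLE).build()) // builder shape assumed
    .nullable(false)
    .build();
// Even with ADD_COLUMN granted on both the sink and the user capability set,
// evolving against a staging schema containing this field now throws
// IncompatibleSchemaChangeException rather than emitting an ALTER ... ADD.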

private List<Operation> mainToStagingTableColumnMatch(Dataset mainDataset, Dataset stagingDataset, Set<String> fieldsToIgnore, Set<Field> modifiedFields)
private List<Operation> mainToStagingTableColumnMatch(Dataset mainDataset, SchemaDefinition stagingDataset, Set<String> fieldsToIgnore, Set<Field> modifiedFields)
{
List<Operation> operations = new ArrayList<>();
List<Field> mainFields = mainDataset.schema().fields();
Set<String> stagingFieldNames = stagingDataset.schema().fields().stream().map(Field::name).collect(Collectors.toSet());
Set<String> stagingFieldNames = stagingDataset.fields().stream().map(Field::name).collect(Collectors.toSet());
for (Field mainField : mainFields.stream().filter(field -> !fieldsToIgnore.contains(field.name())).collect(Collectors.toList()))
{
String mainFieldName = mainField.name();