Skip to content

Commit

Permalink
CXP-2779: introduced SchemaHistorySnapshot interface
Browse files Browse the repository at this point in the history
  • Loading branch information
mikekamornikov committed Dec 11, 2023
1 parent 493101f commit c7393e7
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ protected HistorizedRelationalDatabaseConnectorConfig(Class<? extends SourceConn
/**
* Returns a configured (but not yet started) instance of the database schema history.
*/
public SchemaHistory getSchemaHistory() {
public SchemaHistory getSchemaHistory(Tables initialSchema) {
Configuration config = getConfig();

SchemaHistory schemaHistory = config.getInstance(SCHEMA_HISTORY, SchemaHistory.class);
Expand All @@ -137,7 +137,7 @@ public SchemaHistory getSchemaHistory() {
.withDefault(SchemaHistory.INTERNAL_CONNECTOR_ID, logicalName)
.build();

schemaHistory = new SnapshotAwareSchemaHistory(schemaHistory);
schemaHistory = new SnapshotAwareSchemaHistory(schemaHistory, initialSchema);

HistoryRecordComparator historyComparator = getHistoryRecordComparator();
SchemaHistoryListener historyListener = new SchemaHistoryMetrics(this, multiPartitionMode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ protected HistorizedRelationalDatabaseSchema(HistorizedRelationalDatabaseConnect
boolean tableIdCaseInsensitive, KeyMapper customKeysMapper) {
super(config, topicNamingStrategy, tableFilter, columnFilter, schemaBuilder, tableIdCaseInsensitive, customKeysMapper);

this.schemaHistory = config.getSchemaHistory();
this.schemaHistory = config.getSchemaHistory(tables());
this.schemaHistory.start();
this.historizedConnectorConfig = config;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,28 @@
import io.debezium.document.Document;
import io.debezium.relational.Tables;
import io.debezium.relational.ddl.DdlParser;
import io.debezium.relational.history.snapshot.SchemaHistorySnapshot;

public class SnapshotAwareSchemaHistory implements SchemaHistory {

private final SchemaHistory delegate;
private Tables schema;
private HistoryRecordComparator comparator;
private SchemaHistoryListener listener;
private SchemaHistorySnapshot snapshot = SchemaHistorySnapshot.NOOP;

public SnapshotAwareSchemaHistory(SchemaHistory delegate) {
public SnapshotAwareSchemaHistory(SchemaHistory delegate, Tables initialSchema) {
this.delegate = delegate;
this.schema = initialSchema;
}

@Override
public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, HistoryRecordProcessorProvider processorProvider) {
this.comparator = comparator;
this.listener = listener;
delegate.configure(config, comparator, listener, (o, s, p) -> new HistoryRecordProcessor(o, s, p, processorProvider));

// TODO: init snapshot
}

@Override
Expand All @@ -40,22 +46,19 @@ public void start() {
@Override
public void record(Map<String, ?> source, Map<String, ?> position, String databaseName, String ddl) throws SchemaHistoryException {
delegate.record(source, position, databaseName, ddl);
storeSnapshot();
snapshot.save(source, position, schema);
}

@Override
public void record(Map<String, ?> source, Map<String, ?> position, String databaseName, String schemaName, String ddl, TableChanges changes, Instant timestamp)
throws SchemaHistoryException {
delegate.record(source, position, databaseName, schemaName, ddl, changes, timestamp);
storeSnapshot();
}

private void storeSnapshot() {
// TODO: use SchemaHistorySnapshot implementation
snapshot.save(source, position, schema);
}

@Override
public void recover(Map<Map<String, ?>, Map<String, ?>> offsets, Tables schema, DdlParser ddlParser) {
snapshot.read(offsets, schema);
delegate.recover(offsets, schema, ddlParser);
}

Expand All @@ -81,21 +84,24 @@ public void initializeStorage() {

private class HistoryRecordProcessor implements Consumer<HistoryRecord> {

private final Map<Document, HistoryRecord> stopPoints = new HashMap<>();
private final Map<Document, HistoryRecord> stopPoints;
private final Consumer<HistoryRecord> originalProcessor;

public HistoryRecordProcessor(Map<Map<String, ?>, Map<String, ?>> offsets, Tables schema, DdlParser ddlParser, HistoryRecordProcessorProvider processorProvider) {
// TODO: find correct snapshots
// TODO: get snapshot positions from offsets
this.stopPoints = getStopPoints(snapshot.getOffsets(offsets));
this.originalProcessor = processorProvider.get(offsets, schema, ddlParser);
}

private Map<Document, HistoryRecord> getStopPoints(Map<Map<String, ?>, Map<String, ?>> offsets) {
Map<Document, HistoryRecord> stopPoints = new HashMap<>();
offsets.forEach((Map<String, ?> source, Map<String, ?> position) -> {
Document srcDocument = Document.create();
if (source != null) {
source.forEach(srcDocument::set);
}
// stopPoints.put(srcDocument, new HistoryRecord(source, position, null, null, null, null, null));
stopPoints.put(srcDocument, new HistoryRecord(source, null, null, null, null, null, null));
stopPoints.put(srcDocument, new HistoryRecord(source, position, null, null, null, null, null));
});
this.originalProcessor = processorProvider.get(offsets, schema, ddlParser);
return stopPoints;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.relational.history.snapshot;

import io.debezium.relational.Tables;

import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

public interface SchemaHistorySnapshot {

    String CONFIGURATION_FIELD_PREFIX_STRING = "schema.history.internal.snapshot.";

    /**
     * Persists a snapshot of the captured relational schema as of the given source partition and offset.
     *
     * @param source the source partition the snapshot belongs to; may be {@code null}
     * @param position the offset within the source at which the snapshot was taken
     * @param schema the schema state to persist
     */
    void save(Map<String, ?> source, Map<String, ?> position, Tables schema);

    /**
     * Restores previously saved snapshot state matching the given offsets into {@code schema}.
     *
     * @param offsets map of source partition to the offset from which recovery should proceed
     * @param schema the mutable schema to populate from the snapshot
     */
    void read(Map<Map<String, ?>, Map<String, ?>> offsets, Tables schema);

    /**
     * Resolves, for each source partition in {@code connectorOffsets}, the offset of the stored
     * snapshot to resume from.
     *
     * TODO: use connectorOffsets to:
     * 1. find all serialized snapshots
     * 2. filter out the ones not matching source partition
     * 3. filter out the ones with offset structure which differs from given
     * 4. sort the list by offsets using HistoryRecordComparator
     * 5. find the closest (smaller) offsets to the given ones
     *
     * TODO: implement offset -&gt; snapshot name
     * TODO: implement snapshot name -&gt; offset
     *
     * @param connectorOffsets map of source partition to the connector's current offset
     * @return map of source partition to the snapshot offset to resume from; a {@code null}
     *         value means no usable snapshot exists for that partition
     */
    Map<Map<String, ?>, Map<String, ?>> getOffsets(Map<Map<String, ?>, Map<String, ?>> connectorOffsets);

    /**
     * No-op implementation: saves nothing, restores nothing, and reports "no snapshot available"
     * (a {@code null} offset) for every partition.
     */
    SchemaHistorySnapshot NOOP = new SchemaHistorySnapshot() {
        @Override
        public void save(Map<String, ?> source, Map<String, ?> position, Tables schema) {
            // intentionally empty: snapshotting is disabled
        }

        @Override
        public void read(Map<Map<String, ?>, Map<String, ?>> offsets, Tables schema) {
            // intentionally empty: nothing was saved, so there is nothing to restore
        }

        @Override
        public Map<Map<String, ?>, Map<String, ?>> getOffsets(Map<Map<String, ?>, Map<String, ?>> connectorOffsets) {
            // Collectors.toMap throws NullPointerException when the value mapper returns null,
            // so it cannot be used to map every partition to a null offset. Build the result
            // with a HashMap, which permits null values.
            Map<Map<String, ?>, Map<String, ?>> result = new HashMap<>();
            connectorOffsets.keySet().forEach(partition -> result.put(partition, null));
            return result;
        }
    };
}

0 comments on commit c7393e7

Please sign in to comment.