diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java index 81670c9cc54..f331404471a 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java @@ -1164,6 +1164,11 @@ protected HistoryRecordComparator getHistoryRecordComparator() { return new MySqlHistoryRecordComparator(gtidSourceFilter()); } + @Override + protected SchemaPartitioner getSchemaPartitioner() { + return SchemaPartitioner.NOOP; + } + public static boolean isBuiltInDatabase(String databaseName) { if (databaseName == null) { return false; diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java index 3bb4705d703..7e8d48060e4 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java @@ -829,6 +829,11 @@ protected HistoryRecordComparator getHistoryRecordComparator() { return getAdapter().getHistoryRecordComparator(); } + @Override + protected SchemaPartitioner getSchemaPartitioner() { + return SchemaPartitioner.NOOP; + } + /** * Defines modes of representation of {@code interval} datatype */ diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java index 6353c65336f..b7b7dbd63bc 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java @@ -493,6 +493,11 @@ protected boolean isPositionAtOrBefore(Document recorded, Document desired) { }; } + @Override + protected SchemaPartitioner getSchemaPartitioner() { + return (s) -> TableFilter.fromPredicate((t) -> t.catalog().equals(s.get("database"))); + } + @Override public String getContextName() { return Module.contextName(); diff --git a/debezium-core/src/main/java/io/debezium/relational/HistorizedRelationalDatabaseConnectorConfig.java b/debezium-core/src/main/java/io/debezium/relational/HistorizedRelationalDatabaseConnectorConfig.java index 7dc9d51be51..1740a155d0a 100644 --- a/debezium-core/src/main/java/io/debezium/relational/HistorizedRelationalDatabaseConnectorConfig.java +++ b/debezium-core/src/main/java/io/debezium/relational/HistorizedRelationalDatabaseConnectorConfig.java @@ -5,6 +5,7 @@ */ package io.debezium.relational; +import java.util.Map; import java.util.function.Predicate; import java.util.regex.Pattern; @@ -137,7 +138,7 @@ public SchemaHistory getSchemaHistory(Tables initialSchema) { .withDefault(SchemaHistory.INTERNAL_CONNECTOR_ID, logicalName) .build(); - schemaHistory = new SnapshotAwareSchemaHistory(schemaHistory, initialSchema); + schemaHistory = new SnapshotAwareSchemaHistory(schemaHistory, initialSchema, getSchemaPartitioner()); HistoryRecordComparator historyComparator = getHistoryRecordComparator(); SchemaHistoryListener historyListener = new SchemaHistoryMetrics(this, multiPartitionMode()); @@ -185,4 +186,11 @@ public boolean storeOnlyCapturedDatabases() { */ protected abstract HistoryRecordComparator getHistoryRecordComparator(); + protected abstract SchemaPartitioner getSchemaPartitioner(); + + public interface SchemaPartitioner { + TableFilter fromPartition(Map source); + + public static SchemaPartitioner NOOP = (s) -> Tables.TableFilter.includeAll(); + } } diff --git a/debezium-core/src/main/java/io/debezium/relational/history/HistoryRecord.java b/debezium-core/src/main/java/io/debezium/relational/history/HistoryRecord.java index 9be0baf7061..ae8d8a469e1 100644 --- a/debezium-core/src/main/java/io/debezium/relational/history/HistoryRecord.java +++ b/debezium-core/src/main/java/io/debezium/relational/history/HistoryRecord.java @@ -32,6 +32,10 @@ public HistoryRecord(Document document) { this.doc = document; } + public HistoryRecord(Map source, Map position) { + this(source, position, null, null, null, null, null); + } + public HistoryRecord(Map source, Map position, String databaseName, String schemaName, String ddl, TableChanges changes, Instant timestamp) { this.doc = Document.create(); diff --git a/debezium-core/src/main/java/io/debezium/relational/history/HistoryRecordProcessor.java b/debezium-core/src/main/java/io/debezium/relational/history/HistoryRecordProcessor.java index 1ff0666d839..8c0c0187d44 100644 --- a/debezium-core/src/main/java/io/debezium/relational/history/HistoryRecordProcessor.java +++ b/debezium-core/src/main/java/io/debezium/relational/history/HistoryRecordProcessor.java @@ -77,7 +77,7 @@ private Map getStopPoints(Map, Map new HistoryRecordProcessor(o, s, p, processorProvider)); + this.snapshotProcessor = new SnapshotProcessor(getSchemaHistorySnapshot(config), comparator, schemaPartitioner); + delegate.configure(config, comparator, listener, (o, s, p) -> new HistoryRecordProcessor(o, s, p, comparator, listener, snapshotProcessor, processorProvider)); + } - // TODO: init snapshot + private SchemaHistorySnapshot getSchemaHistorySnapshot(Configuration config) { + // TODO: use config to init SchemaHistorySnapshot instance + return SchemaHistorySnapshot.NOOP; } @Override @@ -46,19 +49,18 @@ public void start() { @Override public void record(Map source, Map position, String databaseName, String ddl) throws SchemaHistoryException { delegate.record(source, position, databaseName, ddl); - snapshot.save(source, position, schema); + snapshotProcessor.save(source, position, schema); } @Override public void record(Map source, Map position, String databaseName, String schemaName, String ddl, TableChanges changes, Instant timestamp) throws SchemaHistoryException { delegate.record(source, position, databaseName, schemaName, ddl, changes, timestamp); - snapshot.save(source, position, schema); + snapshotProcessor.save(source, position, schema); } @Override public void recover(Map, Map> offsets, Tables schema, DdlParser ddlParser) { - snapshot.read(offsets, schema); delegate.recover(offsets, schema, ddlParser); } @@ -82,13 +84,80 @@ public void initializeStorage() { delegate.initializeStorage(); } - private class HistoryRecordProcessor implements Consumer { + private static class SnapshotProcessor { + private final SchemaHistorySnapshot adapter; + private final HistoryRecordComparator comparator; + private final SchemaPartitioner schemaPartitioner; + + public SnapshotProcessor(SchemaHistorySnapshot adapter, HistoryRecordComparator comparator, SchemaPartitioner schemaPartitioner) { + this.adapter = adapter; + this.comparator = comparator; + this.schemaPartitioner = schemaPartitioner; + } + + public void save(Map source, Map position, Tables schema) { + adapter.save(source, position, getPartialSchema(source, schema)); + } + + public Map, Map> read(Map, Map> offsets, Tables schema) { + Map, Map> snapshotOffsets = getSnapshotOffsets(offsets); + snapshotOffsets.forEach((Map source, Map position) -> { + if (position == null) { + return; + } + + Tables partialSchema = adapter.read(source, position); + // that is what could be schema.merge(partialSchema) + partialSchema.tableIds().forEach((tableId) -> { + schema.overwriteTable(partialSchema.forTable(tableId)); + }); + snapshotOffsets.put(source, position); + }); + return snapshotOffsets; + } + + private Map, Map> getSnapshotOffsets(Map, Map> offsets) { + Map, Map> snapshotOffsets = new HashMap<>(); + offsets.forEach((Map source, Map position) -> { + HistoryRecord historyRecord = new HistoryRecord(source, position); + Map snapshotPosition = adapter.find(source).stream() + // snapshot position keys match connector position keys + .filter((p) -> p.keySet().equals(position.keySet())) + // snapshot position is smaller than connector position + .filter((p) -> comparator.isAtOrBefore(new HistoryRecord(source, p), historyRecord)) + // sort positions ASC and get last + .max((p1, p2) -> { + HistoryRecord h1 = new HistoryRecord(source, p1); + HistoryRecord h2 = new HistoryRecord(source, p2); + return comparator.isAtOrBefore(h1, h2) ? 1 : comparator.isAtOrBefore(h2, h1) ? 0 : -1; + }) + // null means there is no valid snapshot for source + .orElse(null); + + snapshotOffsets.put(source, snapshotPosition); + }); + return snapshotOffsets; + } + + private Tables getPartialSchema(Map source, Tables schema) { + if (schemaPartitioner == null) { + return schema; + } + return schema.subset(schemaPartitioner.fromPartition(source)); + } + } + + private static class HistoryRecordProcessor implements Consumer { private final Map stopPoints; + private final HistoryRecordComparator comparator; + private final SchemaHistoryListener listener; private final Consumer originalProcessor; - public HistoryRecordProcessor(Map, Map> offsets, Tables schema, DdlParser ddlParser, HistoryRecordProcessorProvider processorProvider) { - this.stopPoints = getStopPoints(snapshot.getOffsets(offsets)); + public HistoryRecordProcessor(Map, Map> offsets, Tables schema, DdlParser ddlParser, HistoryRecordComparator comparator, SchemaHistoryListener listener, SnapshotProcessor snapshotProcessor, HistoryRecordProcessorProvider processorProvider) { + this.stopPoints = getStopPoints(snapshotProcessor.read(offsets, schema)); + this.comparator = comparator; + this.listener = listener; this.originalProcessor = processorProvider.get(offsets, schema, ddlParser); } @@ -99,7 +168,7 @@ private Map getStopPoints(Map, Map getStopPoints(Map, Map : snapshot processor * ---------------(SO)-----(CO)----> : schema history * <-----> : original processor - * @param recovered the input argument + * @param recovered the recovered schema history record */ @Override public void accept(HistoryRecord recovered) { listener.onChangeFromHistory(recovered); Document srcDocument = recovered.document().getDocument(HistoryRecord.Fields.SOURCE); - if (stopPoints.containsKey(srcDocument) && comparator.isAtOrBefore(recovered, stopPoints.get(srcDocument))) { + if (stopPoints.containsKey(srcDocument) && stopPoints.get(srcDocument).isValid() && comparator.isAtOrBefore(recovered, stopPoints.get(srcDocument))) { // schema snapshot contains this record, skip processing listener.onChangeApplied(recovered); } diff --git a/debezium-core/src/main/java/io/debezium/relational/history/snapshot/SchemaHistorySnapshot.java b/debezium-core/src/main/java/io/debezium/relational/history/snapshot/SchemaHistorySnapshot.java index 265afa22f1d..ebe2831a632 100644 --- a/debezium-core/src/main/java/io/debezium/relational/history/snapshot/SchemaHistorySnapshot.java +++ b/debezium-core/src/main/java/io/debezium/relational/history/snapshot/SchemaHistorySnapshot.java @@ -7,32 +7,19 @@ import io.debezium.relational.Tables; +import java.util.List; import java.util.Map; -import java.util.stream.Collectors; public interface SchemaHistorySnapshot { String CONFIGURATION_FIELD_PREFIX_STRING = "schema.history.internal.snapshot."; void save(Map source, Map position, Tables schema); + Tables read(Map source, Map position); + List> find(Map source); - void read(Map, Map> offsets, Tables schema); - - /** - * TODO: use connectorOffsets to: - * 1. find all serialized snapshots - * 2. filter out the ones not matching source partition - * 3. filter out the ones with offset structure which differs from given - * 4. sort the list by offsets using HistoryRecordComparator - * 5. find the closest (smaller) offsets to the given ones - * - * TODO: implement offset -> snapshot name - * TODO: implement snapshot name -> offset - * - * @param connectorOffsets - * @return - */ - Map, Map> getOffsets(Map, Map> connectorOffsets); + // TODO: implement offset -> snapshot name + // TODO: implement snapshot name -> offset SchemaHistorySnapshot NOOP = new SchemaHistorySnapshot() { @Override @@ -41,13 +28,13 @@ public void save(Map source, Map position, Tables schema) } @Override - public void read(Map, Map> offsets, Tables schema) { - + public Tables read(Map source, Map position) { + return null; } @Override - public Map, Map> getOffsets(Map, Map> connectorOffsets) { - return connectorOffsets.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> null)); + public List> find(Map source) { + return List.of(); } }; }