From 01f982cc9f07adb02654c008b252d569d962c6c3 Mon Sep 17 00:00:00 2001 From: Mikhail Kamornikau Date: Tue, 2 Jan 2024 16:08:49 +0100 Subject: [PATCH] CXP-2779: added SchemaHistorySnapshot interface and snapshot aware SchemaHistory implementation --- .../connector/mysql/MySqlConnectorConfig.java | 14 +- .../AbstractSchemaHistorySnapshotTest.java | 121 +++++++++++++ .../MemorySchemaHistorySnapshotTest.java | 12 ++ .../oracle/OracleConnectorConfig.java | 6 + .../sqlserver/SqlServerConnectorConfig.java | 10 ++ ...izedRelationalDatabaseConnectorConfig.java | 33 +++- .../history/SnapshotAwareSchemaHistory.java | 156 ++++++++++++++++ .../AbstractSchemaHistorySnapshot.java | 89 +++++++++ .../snapshot/MemorySchemaHistorySnapshot.java | 40 +++++ .../snapshot/NoopSchemaHistorySnapshot.java | 35 ++++ .../snapshot/SchemaHistorySnapshot.java | 26 +++ .../history/snapshot/SchemaPartitioner.java | 16 ++ .../SnapshotAwareSchemaHistoryTest.java | 170 ++++++++++++++++++ 13 files changed, 722 insertions(+), 6 deletions(-) create mode 100644 debezium-connector-mysql/src/test/java/io/debezium/relational/history/snapshot/AbstractSchemaHistorySnapshotTest.java create mode 100644 debezium-connector-mysql/src/test/java/io/debezium/relational/history/snapshot/MemorySchemaHistorySnapshotTest.java create mode 100644 debezium-core/src/main/java/io/debezium/relational/history/SnapshotAwareSchemaHistory.java create mode 100644 debezium-core/src/main/java/io/debezium/relational/history/snapshot/AbstractSchemaHistorySnapshot.java create mode 100644 debezium-core/src/main/java/io/debezium/relational/history/snapshot/MemorySchemaHistorySnapshot.java create mode 100644 debezium-core/src/main/java/io/debezium/relational/history/snapshot/NoopSchemaHistorySnapshot.java create mode 100644 debezium-core/src/main/java/io/debezium/relational/history/snapshot/SchemaHistorySnapshot.java create mode 100644 debezium-core/src/main/java/io/debezium/relational/history/snapshot/SchemaPartitioner.java create mode 100644 debezium-core/src/test/java/io/debezium/relational/history/SnapshotAwareSchemaHistoryTest.java diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java index 81670c9cc54..19e50acd81d 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java @@ -34,6 +34,7 @@ import io.debezium.relational.TableId; import io.debezium.relational.Tables.TableFilter; import io.debezium.relational.history.HistoryRecordComparator; +import io.debezium.relational.history.snapshot.SchemaPartitioner; import io.debezium.schema.DefaultTopicNamingStrategy; import io.debezium.util.Collect; @@ -382,10 +383,10 @@ public boolean flushResetsIsolationLevel() { } /** - * Determine which flavour of MySQL locking to use. - * - * @return the correct SQL to obtain a global lock for the current mode - */ + * Determine which flavour of MySQL locking to use. 
+ * + * @return the correct SQL to obtain a global lock for the current mode + */ public String getLockStatement() { if (value.equals(MINIMAL_PERCONA.value)) { return "LOCK TABLES FOR BACKUP"; @@ -1164,6 +1165,11 @@ protected HistoryRecordComparator getHistoryRecordComparator() { return new MySqlHistoryRecordComparator(gtidSourceFilter()); } + @Override + protected SchemaPartitioner getSchemaPartitioner() { + return SchemaPartitioner.SINGLE_PARTITION; + } + public static boolean isBuiltInDatabase(String databaseName) { if (databaseName == null) { return false; diff --git a/debezium-connector-mysql/src/test/java/io/debezium/relational/history/snapshot/AbstractSchemaHistorySnapshotTest.java b/debezium-connector-mysql/src/test/java/io/debezium/relational/history/snapshot/AbstractSchemaHistorySnapshotTest.java new file mode 100644 index 00000000000..6e87d4ff35c --- /dev/null +++ b/debezium-connector-mysql/src/test/java/io/debezium/relational/history/snapshot/AbstractSchemaHistorySnapshotTest.java @@ -0,0 +1,121 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.relational.history.snapshot; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; + +import java.util.Map; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser; +import io.debezium.relational.Tables; +import io.debezium.relational.ddl.DdlParser; +import io.debezium.relational.history.MemorySchemaHistory; +import io.debezium.relational.history.SchemaHistory; + +public abstract class AbstractSchemaHistorySnapshotTest { + + protected SchemaHistorySnapshot snapshot; + protected SchemaHistory history; + protected Map source1; + protected Map source2; + protected Tables s1_t0, s1_t1, s1_t2, s1_t3; + protected Tables s2_t0, s2_t1, s2_t2, s2_t3; + protected DdlParser parser; + + @Before + public void beforeEach() { + history = new MemorySchemaHistory(); + snapshot = createHistorySnapshot(); + + source1 = server("abc"); + source2 = server("xyz"); + + parser = new MySqlAntlrDdlParser(); + s1_t0 = new Tables(); + s1_t1 = new Tables(); + s1_t2 = new Tables(); + s1_t3 = new Tables(); + + record(1, "CREATE TABLE foo ( first VARCHAR(22) NOT NULL );", s1_t3, s1_t2, s1_t1, s1_t0); + record(23, "CREATE TABLE\nperson ( name VARCHAR(22) NOT NULL );", s1_t3, s1_t2, s1_t1); + record(30, "CREATE TABLE address\n( street VARCHAR(22) NOT NULL );", s1_t3, s1_t2); + record(32, "ALTER TABLE address ADD city VARCHAR(22) NOT NULL;", s1_t3); + + snapshot.save(source1, position(32), s1_t3); + snapshot.save(source1, position(30), s1_t2); + snapshot.save(source1, position(23), s1_t1); + snapshot.save(source1, position(1), s1_t0); + + s2_t0 = new Tables(); + s2_t1 = new Tables(); + s2_t2 = new Tables(); + s2_t3 = new Tables(); + + record(2, "CREATE TABLE foo1 ( first VARCHAR(22) NOT NULL );", s2_t3, s2_t2, s2_t1, s2_t0); + record(12, "CREATE TABLE\nperson1 ( name VARCHAR(22) NOT NULL );", s2_t3, s2_t2, s2_t1); + record(16, "CREATE TABLE address1\n( street VARCHAR(22) NOT NULL );", s2_t3, s2_t2); + record(99, "ALTER TABLE address1 ADD city VARCHAR(22) NOT NULL;", s2_t3); + + snapshot.save(source2, position(55), s2_t3); + snapshot.save(source2, position(16), s2_t2); + snapshot.save(source2, position(12), s2_t1); + snapshot.save(source2, position(2), s2_t0); + } + + @After + public 
void afterEach() { + } + + protected abstract SchemaHistorySnapshot createHistorySnapshot(); + + protected Map server(String serverName) { + return Map.of("server", serverName); + } + + protected Map position(long pos) { + return Map.of("file", "mysql-bin-changelog.000011", "pos", pos); + } + + protected void record(long pos, String ddl, Tables... update) { + try { + history.record(source1, position(pos), "db", ddl); + } + catch (Throwable t) { + fail(t.getMessage()); + } + for (Tables tables : update) { + if (tables != null) { + parser.setCurrentSchema("db"); + parser.parse(ddl, tables); + } + } + } + + @Test + public void shouldRecordSnapshotsAndRecoverToClosest() { + assertNull(snapshot.read(source1, null)); + assertNull(snapshot.read(null, position(31))); + assertNull(snapshot.read(source1, position(31))); + assertEquals(s1_t2, snapshot.read(source1, position(30))); + + assertNull(snapshot.read(source2, position(30))); + assertEquals(s2_t1, snapshot.read(source2, position(12))); + + assertNull(snapshot.findClosest(null, position(31))); + assertNull(snapshot.findClosest(source1, null)); + assertEquals(position(30), snapshot.findClosest(source1, position(31))); + assertEquals(position(32), snapshot.findClosest(source1, position(99))); + + assertNull(snapshot.findClosest(source2, position(1))); + assertEquals(position(55), snapshot.findClosest(source2, position(99))); + } +} diff --git a/debezium-connector-mysql/src/test/java/io/debezium/relational/history/snapshot/MemorySchemaHistorySnapshotTest.java b/debezium-connector-mysql/src/test/java/io/debezium/relational/history/snapshot/MemorySchemaHistorySnapshotTest.java new file mode 100644 index 00000000000..5f17c4971c5 --- /dev/null +++ b/debezium-connector-mysql/src/test/java/io/debezium/relational/history/snapshot/MemorySchemaHistorySnapshotTest.java @@ -0,0 +1,12 @@ +/* + * Copyright Debezium Authors. 
+ * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.relational.history.snapshot; + +public class MemorySchemaHistorySnapshotTest extends AbstractSchemaHistorySnapshotTest { + protected SchemaHistorySnapshot createHistorySnapshot() { + return new MemorySchemaHistorySnapshot(); + } +} diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java index 3bb4705d703..a757735bc83 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java @@ -47,6 +47,7 @@ import io.debezium.relational.TableId; import io.debezium.relational.Tables.TableFilter; import io.debezium.relational.history.HistoryRecordComparator; +import io.debezium.relational.history.snapshot.SchemaPartitioner; import io.debezium.util.Strings; /** @@ -829,6 +830,11 @@ protected HistoryRecordComparator getHistoryRecordComparator() { return getAdapter().getHistoryRecordComparator(); } + @Override + protected SchemaPartitioner getSchemaPartitioner() { + return SchemaPartitioner.SINGLE_PARTITION; + } + /** * Defines modes of representation of {@code interval} datatype */ diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java index 6353c65336f..a1553b2ea5a 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java @@ -33,6 +33,7 @@ import io.debezium.relational.TableId; import io.debezium.relational.Tables.TableFilter; import io.debezium.relational.history.HistoryRecordComparator; +import io.debezium.relational.history.snapshot.SchemaPartitioner; import io.debezium.spi.schema.DataCollectionId; import io.debezium.util.Strings; @@ -493,6 +494,15 @@ protected boolean isPositionAtOrBefore(Document recorded, Document desired) { }; } + @Override + protected SchemaPartitioner getSchemaPartitioner() { + return (source) -> { + return TableFilter.fromPredicate((tableId) -> { + return tableId.catalog().equals(((SqlServerPartition)source).getDatabaseName()); + }); + }; + } + @Override public String getContextName() { return Module.contextName(); diff --git a/debezium-core/src/main/java/io/debezium/relational/HistorizedRelationalDatabaseConnectorConfig.java b/debezium-core/src/main/java/io/debezium/relational/HistorizedRelationalDatabaseConnectorConfig.java index f6f8cdf4c4a..851e05d0708 100644 --- a/debezium-core/src/main/java/io/debezium/relational/HistorizedRelationalDatabaseConnectorConfig.java +++ b/debezium-core/src/main/java/io/debezium/relational/HistorizedRelationalDatabaseConnectorConfig.java @@ -27,6 +27,9 @@ import io.debezium.relational.history.SchemaHistory; import io.debezium.relational.history.SchemaHistoryListener; import io.debezium.relational.history.SchemaHistoryMetrics; +import io.debezium.relational.history.SnapshotAwareSchemaHistory; +import io.debezium.relational.history.snapshot.SchemaHistorySnapshot; +import io.debezium.relational.history.snapshot.SchemaPartitioner; /** * Configuration options shared across the 
relational CDC connectors which use a persistent database schema history. @@ -38,6 +41,7 @@ public abstract class HistorizedRelationalDatabaseConnectorConfig extends Relati protected static final int DEFAULT_SNAPSHOT_FETCH_SIZE = 2_000; private static final String DEFAULT_SCHEMA_HISTORY = "io.debezium.storage.kafka.history.KafkaSchemaHistory"; + private static final String DEFAULT_SCHEMA_HISTORY_SNAPSHOT = "io.debezium.relational.history.snapshot.NoopSchemaHistorySnapshot"; private final boolean useCatalogBeforeSchema; private final Class connectorClass; @@ -62,6 +66,17 @@ public abstract class HistorizedRelationalDatabaseConnectorConfig extends Relati + SchemaHistory.CONFIGURATION_FIELD_PREFIX_STRING + "' string.") .withDefault(DEFAULT_SCHEMA_HISTORY); + public static final Field SCHEMA_HISTORY_SNAPSHOT = Field.create("schema.history.internal.snapshot") + .withDisplayName("Database schema history snapshot class") + .withType(Type.CLASS) + .withWidth(Width.LONG) + .withImportance(Importance.LOW) + .withInvisibleRecommender() + .withDescription("The name of the SchemaHistorySnapshot class that should be used to store and recover database schema. " + + "The configuration properties for the history snapshot are prefixed with the '" + + SchemaHistorySnapshot.CONFIGURATION_FIELD_PREFIX_STRING + "' string.") + .withDefault(DEFAULT_SCHEMA_HISTORY_SNAPSHOT); + public static final Field SKIP_UNPARSEABLE_DDL_STATEMENTS = SchemaHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS; public static final Field STORE_ONLY_CAPTURED_TABLES_DDL = SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL; @@ -73,7 +88,8 @@ public abstract class HistorizedRelationalDatabaseConnectorConfig extends Relati SCHEMA_HISTORY, SKIP_UNPARSEABLE_DDL_STATEMENTS, STORE_ONLY_CAPTURED_TABLES_DDL, - STORE_ONLY_CAPTURED_DATABASES_DDL) + STORE_ONLY_CAPTURED_DATABASES_DDL, + SCHEMA_HISTORY_SNAPSHOT) .create(); protected HistorizedRelationalDatabaseConnectorConfig(Class connectorClass, @@ -137,11 +153,23 @@ public SchemaHistory getSchemaHistory(DdlParser ddlParser) { .withDefault(SchemaHistory.INTERNAL_CONNECTOR_ID, logicalName) .build(); + SchemaHistorySnapshot schemaSnapshot = config.getInstance(SCHEMA_HISTORY_SNAPSHOT, SchemaHistorySnapshot.class); + if (schemaSnapshot == null) { + throw new ConnectException("Unable to instantiate the database schema history snapshot class " + + config.getString(SCHEMA_HISTORY_SNAPSHOT)); + } + + Configuration schemaHistorySnapshotConfig = config.subset(SchemaHistorySnapshot.CONFIGURATION_FIELD_PREFIX_STRING, false); + HistoryRecordComparator historyComparator = getHistoryRecordComparator(); SchemaHistoryListener historyListener = new SchemaHistoryMetrics(this, multiPartitionMode()); + schemaSnapshot.configure(schemaHistorySnapshotConfig, historyComparator, getSchemaPartitioner()); + + schemaHistory = new SnapshotAwareSchemaHistory(schemaHistory, schemaSnapshot); + HistoryRecordProcessorProvider processorProvider = (o, s) -> new HistoryRecordProcessor(o, s, ddlParser, schemaHistoryConfig, historyComparator, historyListener, useCatalogBeforeSchema()); - schemaHistory.configure(schemaHistoryConfig, historyComparator, historyListener, processorProvider); + schemaHistory.configure(schemaHistoryConfig, historyComparator, historyListener, processorProvider); // validates return schemaHistory; } @@ -183,4 +211,5 @@ public boolean storeOnlyCapturedDatabases() { */ protected abstract HistoryRecordComparator getHistoryRecordComparator(); + protected abstract SchemaPartitioner getSchemaPartitioner(); } diff --git 
a/debezium-core/src/main/java/io/debezium/relational/history/SnapshotAwareSchemaHistory.java b/debezium-core/src/main/java/io/debezium/relational/history/SnapshotAwareSchemaHistory.java new file mode 100644 index 00000000000..c843082670a --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/relational/history/SnapshotAwareSchemaHistory.java @@ -0,0 +1,156 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.relational.history; + +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Consumer; + +import io.debezium.config.Configuration; +import io.debezium.document.Document; +import io.debezium.pipeline.spi.OffsetContext; +import io.debezium.pipeline.spi.Offsets; +import io.debezium.pipeline.spi.Partition; +import io.debezium.relational.Tables; +import io.debezium.relational.history.snapshot.SchemaHistorySnapshot; +import io.debezium.util.Clock; + +public class SnapshotAwareSchemaHistory implements SchemaHistory { + + private final SchemaHistory delegate; + private final SchemaHistorySnapshot snapshot; + private Tables schemaToSave = null; + + public SnapshotAwareSchemaHistory(SchemaHistory delegate, SchemaHistorySnapshot snapshot) { + this.delegate = delegate; + this.snapshot = snapshot; + } + + @Override + public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, HistoryRecordProcessorProvider processorProvider) { + delegate.configure(config, comparator, listener, (o, s) -> new HistoryRecordProcessor(o, s, comparator, listener, snapshot, processorProvider)); + } + + @Override + public void start() { + delegate.start(); + } + + @Override + public void record(Map source, Map position, String databaseName, String ddl) throws SchemaHistoryException { + record(source, position, databaseName, null, ddl, null, Clock.SYSTEM.currentTimeAsInstant()); + } + + @Override + public void record(Map source, Map position, String databaseName, String schemaName, String ddl, TableChanges changes, Instant timestamp) + throws SchemaHistoryException { + delegate.record(source, position, databaseName, schemaName, ddl, changes, timestamp); + snapshot.save(source, position, schemaToSave); + } + + @Override + public void recover(Offsets offsets, Tables schema) { + delegate.recover(offsets, schema); + + // once recovered the schema can be used to create snapshots + schemaToSave = schema; + } + + @Override + public void stop() { + delegate.stop(); + } + + @Override + public boolean exists() { + return delegate.exists(); + } + + @Override + public boolean storageExists() { + return delegate.storageExists(); + } + + @Override + public void initializeStorage() { + delegate.initializeStorage(); + } + + private static class HistoryRecordProcessor implements Consumer { + + private final HistoryRecordComparator comparator; + private final SchemaHistoryListener listener; + private final SchemaHistorySnapshot snapshot; + private final Consumer originalProcessor; + private final Map stopPoints; + + public HistoryRecordProcessor(Offsets offsets, Tables schema, HistoryRecordComparator comparator, SchemaHistoryListener listener, SchemaHistorySnapshot snapshot, HistoryRecordProcessorProvider processorProvider) { + this.comparator = comparator; + this.listener = listener; + this.snapshot = snapshot; + this.stopPoints = getStopPoints(offsets, schema); + this.originalProcessor = 
processorProvider.get(offsets, schema); + } + + private Map getStopPoints(Offsets offsets, Tables schema) { + Map stopPoints = new HashMap<>(); + getSnapshotOffsets(offsets).forEach((Map source, Map position) -> { + Tables schemaPartition = snapshot.read(source, position); + if (schemaPartition != null) { + merge(schema, schemaPartition); + } + + Document srcDocument = Document.create(); + if (source != null) { + source.forEach(srcDocument::set); + } + stopPoints.put(srcDocument, new HistoryRecord(source, position)); + }); + return stopPoints; + } + + private void merge(Tables schema, Tables partialSchema) { + partialSchema.tableIds().forEach((tableId) -> schema.overwriteTable(partialSchema.forTable(tableId))); + } + + private Map, Map> getSnapshotOffsets(Offsets offsets) { + Map, Map> snapshotOffsets = new HashMap<>(); + offsets.forEach((Map.Entry entry) -> { + Map source = entry.getKey().getSourcePartition(); + Map position = null; + if (entry.getValue() != null) { + position = entry.getValue().getOffset(); + } + snapshotOffsets.put(source, snapshot.findClosest(source, position)); + }); + return snapshotOffsets; + } + + /** + * The code ignores records covered by schema snapshot - from the beginning to snapshot + * offsets (SO). Then it switches to the original processor which handles records from SO + * to connector offsets (CO). + * + * <--------------> : snapshot processor + * ---------------(SO)-----(CO)----> : schema history + * <-----> : original processor + * @param recovered the recovered schema history record + */ + @Override + public void accept(HistoryRecord recovered) { + listener.onChangeFromHistory(recovered); + Document srcDocument = recovered.document().getDocument(HistoryRecord.Fields.SOURCE); + if (stopPoints.containsKey(srcDocument) && !stopPoints.get(srcDocument).position().isEmpty() && comparator.isAtOrBefore(recovered, stopPoints.get(srcDocument))) { + // schema snapshot contains this record, skip processing + listener.onChangeApplied(recovered); + } + else { + originalProcessor.accept(recovered); + } + } + } +} diff --git a/debezium-core/src/main/java/io/debezium/relational/history/snapshot/AbstractSchemaHistorySnapshot.java b/debezium-core/src/main/java/io/debezium/relational/history/snapshot/AbstractSchemaHistorySnapshot.java new file mode 100644 index 00000000000..d404a264106 --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/relational/history/snapshot/AbstractSchemaHistorySnapshot.java @@ -0,0 +1,89 @@ +/* + * Copyright Debezium Authors. 
+ * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.relational.history.snapshot; + +import java.util.List; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.debezium.config.Configuration; +import io.debezium.relational.Tables; +import io.debezium.relational.history.HistoryRecord; +import io.debezium.relational.history.HistoryRecordComparator; +import io.debezium.util.FunctionalReadWriteLock; + +public abstract class AbstractSchemaHistorySnapshot implements SchemaHistorySnapshot { + + protected final Logger logger = LoggerFactory.getLogger(getClass()); + + protected Configuration config; + protected HistoryRecordComparator comparator = HistoryRecordComparator.INSTANCE; + protected SchemaPartitioner schemaPartitioner = SchemaPartitioner.SINGLE_PARTITION; + private final FunctionalReadWriteLock lock = FunctionalReadWriteLock.reentrant(); + + protected AbstractSchemaHistorySnapshot() { + } + + @Override + public void configure(Configuration config, HistoryRecordComparator comparator, SchemaPartitioner schemaPartitioner) { + this.config = config; + this.comparator = comparator; + this.schemaPartitioner = schemaPartitioner; + } + + @Override + public void save(Map source, Map position, Tables schema) { + if (source == null || position == null || schema == null) { + return; + } + + lock.write(() -> doSave(source, position, getSchemaPartition(source, schema))); + } + + private Tables getSchemaPartition(Map source, Tables schema) { + if (schemaPartitioner == null) { + return schema; + } + return schema.subset(schemaPartitioner.fromSource(source)); + } + + @Override + public Tables read(Map source, Map position) { + if (source == null || position == null) { + return null; + } + return lock.read(() -> doRead(source, position)); + } + + @Override + public Map findClosest(Map source, Map position) { + if (source == null || position == null) { + return null; + } + HistoryRecord historyRecord = new HistoryRecord(source, position); + return lock.read(() -> findAll(source)) + .stream() + // snapshot position keys match given position keys + .filter((p) -> p.keySet().equals(position.keySet())) + // snapshot position is smaller than given position + .filter((p) -> comparator.isAtOrBefore(new HistoryRecord(source, p), historyRecord)) + // sort positions ASC and get last + .max((p1, p2) -> { + HistoryRecord h1 = new HistoryRecord(source, p1); + HistoryRecord h2 = new HistoryRecord(source, p2); + return comparator.isAtOrBefore(h1, h2) ? -1 : comparator.isAtOrBefore(h2, h1) ? 1 : 0; + }) + .orElse(null); + } + + protected abstract void doSave(Map source, Map position, Tables schema); + + protected abstract Tables doRead(Map source, Map position); + + protected abstract List> findAll(Map source); +} diff --git a/debezium-core/src/main/java/io/debezium/relational/history/snapshot/MemorySchemaHistorySnapshot.java b/debezium-core/src/main/java/io/debezium/relational/history/snapshot/MemorySchemaHistorySnapshot.java new file mode 100644 index 00000000000..4fa7e19e863 --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/relational/history/snapshot/MemorySchemaHistorySnapshot.java @@ -0,0 +1,40 @@ +/* + * Copyright Debezium Authors. 
+ * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.relational.history.snapshot; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +import io.debezium.relational.Tables; + +public class MemorySchemaHistorySnapshot extends AbstractSchemaHistorySnapshot { + + private final Map, Map>, Tables> snapshots = new HashMap<>(); + + public MemorySchemaHistorySnapshot() { + } + + @Override + public void doSave(Map source, Map position, Tables schema) { + snapshots.put(Map.of(source, position), schema); + } + + @Override + public Tables doRead(Map source, Map position) { + return snapshots.get(Map.of(source, position)); + } + + @Override + protected List> findAll(Map source) { + return snapshots.keySet().stream() + .map((e) -> e.get(source)) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + } +} diff --git a/debezium-core/src/main/java/io/debezium/relational/history/snapshot/NoopSchemaHistorySnapshot.java b/debezium-core/src/main/java/io/debezium/relational/history/snapshot/NoopSchemaHistorySnapshot.java new file mode 100644 index 00000000000..e49c54e3c73 --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/relational/history/snapshot/NoopSchemaHistorySnapshot.java @@ -0,0 +1,35 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.relational.history.snapshot; + +import java.util.Map; + +import io.debezium.config.Configuration; +import io.debezium.relational.Tables; +import io.debezium.relational.history.HistoryRecordComparator; + +public class NoopSchemaHistorySnapshot implements SchemaHistorySnapshot { + + @Override + public void configure(Configuration config, HistoryRecordComparator comparator, SchemaPartitioner schemaPartitioner) { + + } + + @Override + public void save(Map source, Map position, Tables schema) { + + } + + @Override + public Tables read(Map source, Map position) { + return null; + } + + @Override + public Map findClosest(Map source, Map position) { + return null; + } +} diff --git a/debezium-core/src/main/java/io/debezium/relational/history/snapshot/SchemaHistorySnapshot.java b/debezium-core/src/main/java/io/debezium/relational/history/snapshot/SchemaHistorySnapshot.java new file mode 100644 index 00000000000..b76dc14e6fb --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/relational/history/snapshot/SchemaHistorySnapshot.java @@ -0,0 +1,26 @@ +/* + * Copyright Debezium Authors. 
+ * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.relational.history.snapshot; + +import java.util.Map; + +import io.debezium.config.Configuration; +import io.debezium.relational.Tables; +import io.debezium.relational.history.HistoryRecordComparator; +import io.debezium.relational.history.SchemaHistory; + +public interface SchemaHistorySnapshot { + + String CONFIGURATION_FIELD_PREFIX_STRING = SchemaHistory.CONFIGURATION_FIELD_PREFIX_STRING + "snapshot."; + + void configure(Configuration config, HistoryRecordComparator comparator, SchemaPartitioner schemaPartitioner); + + void save(Map source, Map position, Tables schema); + + Tables read(Map source, Map position); + + Map findClosest(Map source, Map position); +} diff --git a/debezium-core/src/main/java/io/debezium/relational/history/snapshot/SchemaPartitioner.java b/debezium-core/src/main/java/io/debezium/relational/history/snapshot/SchemaPartitioner.java new file mode 100644 index 00000000000..04cfa6a93da --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/relational/history/snapshot/SchemaPartitioner.java @@ -0,0 +1,16 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.relational.history.snapshot; + +import java.util.Map; + +import io.debezium.relational.Tables.TableFilter; + +public interface SchemaPartitioner { + TableFilter fromSource(Map source); + + SchemaPartitioner SINGLE_PARTITION = source -> TableFilter.includeAll(); +} diff --git a/debezium-core/src/test/java/io/debezium/relational/history/SnapshotAwareSchemaHistoryTest.java b/debezium-core/src/test/java/io/debezium/relational/history/SnapshotAwareSchemaHistoryTest.java new file mode 100644 index 00000000000..abed697556e --- /dev/null +++ b/debezium-core/src/test/java/io/debezium/relational/history/SnapshotAwareSchemaHistoryTest.java @@ -0,0 +1,170 @@ +package io.debezium.relational.history; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import io.debezium.connector.SnapshotRecord; +import io.debezium.config.Configuration; +import io.debezium.pipeline.spi.OffsetContext; +import io.debezium.pipeline.spi.Offsets; +import io.debezium.pipeline.spi.Partition; +import io.debezium.pipeline.txmetadata.TransactionContext; +import io.debezium.relational.Tables; +import io.debezium.relational.history.snapshot.MemorySchemaHistorySnapshot; +import io.debezium.relational.history.snapshot.SchemaHistorySnapshot; +import io.debezium.relational.history.snapshot.SchemaPartitioner; +import io.debezium.spi.schema.DataCollectionId; + +public class SnapshotAwareSchemaHistoryTest { + + private SchemaHistorySnapshot historySnapshot; + private SchemaHistory delegate; + private SnapshotAwareSchemaHistory schemaHistory; + private final List recovered = new ArrayList<>(); + private final static Map SOURCE = Map.of("server", "dbserver1"); + + @Before + public void beforeEach() { + HistoryRecordComparator comparator = HistoryRecordComparator.INSTANCE; + historySnapshot = new MemorySchemaHistorySnapshot(); + historySnapshot.configure(Configuration.empty(), 
comparator, SchemaPartitioner.SINGLE_PARTITION); + + delegate = new MemorySchemaHistory(); + + schemaHistory = new SnapshotAwareSchemaHistory(delegate, historySnapshot); + schemaHistory.configure(Configuration.empty(), comparator, SchemaHistoryListener.NOOP, (o, s) -> recovered::add); + + recovered.clear(); + } + + @After + public void afterEach() { + } + + protected Partition partition() { + return () -> SOURCE; + } + + protected Map contextOffset(long pos) { + return Map.of("pos", pos); + } + + protected OffsetContext position(long pos) { + return new TestOffsetContext(contextOffset(pos)); + } + + protected Offsets offsets(long pos) { + return Offsets.of(partition(), position(pos)); + } + + protected void record(long pos, String ddl) { + try { + delegate.record(SOURCE, contextOffset(pos), "db", ddl); + } + catch (Throwable t) { + fail(t.getMessage()); + } + } + + @Test + public void shouldSkipRecordsRecoveredFromSnapshot() { + delegate.record(SOURCE, contextOffset(1), "db", "CREATE TABLE foo ( first VARCHAR(22) NOT NULL );"); + delegate.record(SOURCE, contextOffset(3), "db", "CREATE TABLE\nperson ( name VARCHAR(22) NOT NULL );"); + delegate.record(SOURCE, contextOffset(5), "db", "CREATE TABLE address\n( street VARCHAR(22) NOT NULL );"); + delegate.record(SOURCE, contextOffset(10), "db", "ALTER TABLE address ADD city VARCHAR(22) NOT NULL;"); + delegate.record(SOURCE, contextOffset(15), "db", "ALTER TABLE address ADD zip VARCHAR(22) NOT NULL;"); + + historySnapshot.save(SOURCE, contextOffset(4), new Tables()); + historySnapshot.save(SOURCE, contextOffset(8), new Tables()); + + schemaHistory.recover(offsets(1), new Tables()); + assertEquals(5, recovered.size()); + recovered.clear(); + + schemaHistory.recover(offsets(4), new Tables()); + assertEquals(3, recovered.size()); + recovered.clear(); + + schemaHistory.recover(offsets(6), new Tables()); + assertEquals(3, recovered.size()); + recovered.clear(); + + schemaHistory.recover(offsets(9), new Tables()); + assertEquals(2, recovered.size()); + recovered.clear(); + + schemaHistory.recover(offsets(20), new Tables()); + assertEquals(2, recovered.size()); + recovered.clear(); + } + + private static class TestOffsetContext implements OffsetContext { + private final Map offset; + + TestOffsetContext(Map offset) { + this.offset = offset; + } + + @Override + public Map getOffset() { + return offset; + } + + @Override + public Schema getSourceInfoSchema() { + return null; + } + + @Override + public Struct getSourceInfo() { + return null; + } + + @Override + public boolean isSnapshotRunning() { + return false; + } + + @Override + public void markSnapshotRecord(SnapshotRecord record) { + + } + + @Override + public void preSnapshotStart() { + + } + + @Override + public void preSnapshotCompletion() { + + } + + @Override + public void postSnapshotCompletion() { + + } + + @Override + public void event(DataCollectionId collectionId, Instant timestamp) { + + } + + @Override + public TransactionContext getTransactionContext() { + return null; + } + } +}
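
Notes on the proposed API follow; the sketches below are illustrative and are not part of the diff above.

The SchemaHistorySnapshot contract stores a copy of the in-memory Tables schema keyed by (source, position): save() records it, read() is an exact-position lookup, and findClosest() returns the latest saved position at or before a given offset, so recovery can resume from a snapshot instead of replaying the full DDL history. A minimal sketch against the in-memory implementation, using binlog-style single-field positions like the tests in this patch; the server name and offsets are made up.

import java.util.Map;

import io.debezium.config.Configuration;
import io.debezium.relational.Tables;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.relational.history.snapshot.MemorySchemaHistorySnapshot;
import io.debezium.relational.history.snapshot.SchemaHistorySnapshot;
import io.debezium.relational.history.snapshot.SchemaPartitioner;

public class SchemaHistorySnapshotSketch {

    public static void main(String[] args) {
        // Illustrative source partition and positions; the key names are arbitrary.
        Map<String, Object> source = Map.of("server", "dbserver1");

        SchemaHistorySnapshot snapshot = new MemorySchemaHistorySnapshot();
        snapshot.configure(Configuration.empty(), HistoryRecordComparator.INSTANCE, SchemaPartitioner.SINGLE_PARTITION);

        // Store the in-memory schema as it looked at two different positions.
        snapshot.save(source, Map.of("pos", 10L), new Tables());
        snapshot.save(source, Map.of("pos", 20L), new Tables());

        // read() is an exact-match lookup: the stored copy, or null for an unknown position.
        Tables at10 = snapshot.read(source, Map.of("pos", 10L)); // non-null
        Tables at15 = snapshot.read(source, Map.of("pos", 15L)); // null

        // findClosest() returns the latest stored position at or before the given one,
        // so recovery can start from that snapshot instead of replaying every DDL record.
        var closest = snapshot.findClosest(source, Map.of("pos", 15L)); // {pos=10}
    }
}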
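
SchemaPartitioner maps a connector source partition to a TableFilter so that each partition's snapshot contains only its own tables; MySQL and Oracle return SINGLE_PARTITION (include everything), while the SQL Server override filters by the partition's database name. A sketch of a custom partitioner; the "database" key in the source map is hypothetical, the real SQL Server implementation reads the name from SqlServerPartition instead.

import java.util.Map;

import io.debezium.relational.Tables.TableFilter;
import io.debezium.relational.history.snapshot.SchemaPartitioner;

public class DatabaseNamePartitionerSketch {

    // Keeps only the tables whose catalog matches the partition's database name. The
    // "database" key is hypothetical; the SQL Server override in this patch derives the
    // name from SqlServerPartition#getDatabaseName() rather than from the source map.
    static final SchemaPartitioner BY_DATABASE_NAME = source -> {
        Object database = source.get("database");
        return TableFilter.fromPredicate(tableId -> tableId.catalog().equals(database));
    };

    public static void main(String[] args) {
        TableFilter filter = BY_DATABASE_NAME.fromSource(Map.of("database", "inventory"));
        // AbstractSchemaHistorySnapshot.save() applies such a filter via Tables.subset(filter)
        // before storing, so each partition's snapshot holds only its own tables.
    }
}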
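
The new schema.history.internal.snapshot field selects the snapshot implementation and defaults to NoopSchemaHistorySnapshot, so existing connectors keep their current behaviour unless a concrete class is configured. A sketch of how the class is resolved and configured, mirroring what getSchemaHistory() does in HistorizedRelationalDatabaseConnectorConfig; the standalone Field below is illustrative, connectors use the SCHEMA_HISTORY_SNAPSHOT constant defined in the patch.

import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.relational.history.snapshot.MemorySchemaHistorySnapshot;
import io.debezium.relational.history.snapshot.SchemaHistorySnapshot;
import io.debezium.relational.history.snapshot.SchemaPartitioner;

public class SnapshotConfigSketch {

    // Mirrors the field added to HistorizedRelationalDatabaseConnectorConfig; its default
    // there is NoopSchemaHistorySnapshot, so snapshotting stays off unless configured.
    private static final Field SCHEMA_HISTORY_SNAPSHOT = Field.create("schema.history.internal.snapshot");

    public static void main(String[] args) {
        Configuration config = Configuration.create()
                .with(SCHEMA_HISTORY_SNAPSHOT, MemorySchemaHistorySnapshot.class.getName())
                .build();

        // getSchemaHistory(...) resolves the class the same way, wraps the delegate history in a
        // SnapshotAwareSchemaHistory, and throws a ConnectException when instantiation fails.
        SchemaHistorySnapshot snapshot = config.getInstance(SCHEMA_HISTORY_SNAPSHOT, SchemaHistorySnapshot.class);
        snapshot.configure(config.subset(SchemaHistorySnapshot.CONFIGURATION_FIELD_PREFIX_STRING, false),
                HistoryRecordComparator.INSTANCE, SchemaPartitioner.SINGLE_PARTITION);
    }
}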
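
Custom storage backends extend AbstractSchemaHistorySnapshot and implement only doSave(), doRead() and findAll(); locking, schema partitioning and the comparator-based findClosest() selection come from the base class. A sketch of a deliberately simple variant that keeps only the most recent snapshot per source, assuming Map<String, ?> parameter types in line with the rest of the schema history SPI; this class is not part of the patch.

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import io.debezium.relational.Tables;
import io.debezium.relational.history.snapshot.AbstractSchemaHistorySnapshot;

// Keeps only the most recently saved snapshot per source partition; MemorySchemaHistorySnapshot
// in this patch keeps every saved position instead.
public class LatestOnlySchemaHistorySnapshot extends AbstractSchemaHistorySnapshot {

    private final Map<Map<String, ?>, Map<String, ?>> latestPosition = new HashMap<>();
    private final Map<Map<String, ?>, Tables> latestSchema = new HashMap<>();

    @Override
    protected void doSave(Map<String, ?> source, Map<String, ?> position, Tables schema) {
        latestPosition.put(source, position);
        latestSchema.put(source, schema);
    }

    @Override
    protected Tables doRead(Map<String, ?> source, Map<String, ?> position) {
        return position.equals(latestPosition.get(source)) ? latestSchema.get(source) : null;
    }

    @Override
    protected List<Map<String, ?>> findAll(Map<String, ?> source) {
        Map<String, ?> position = latestPosition.get(source);
        if (position == null) {
            return Collections.emptyList();
        }
        return Collections.singletonList(position);
    }
}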