From ddf50bdb12d5b921536bb7487728573b20c497c1 Mon Sep 17 00:00:00 2001 From: Mikhail Kamornikau Date: Sun, 10 Dec 2023 13:04:28 +0100 Subject: [PATCH] CXP-2779: extracted HistoryRecordProcessor logic, added SnapshotAware SchemaHistory implementation, added SchemaHistorySnapshot with NOOP (default) and inmemory implementation --- .../connector/mysql/MySqlConnectorConfig.java | 6 + .../history/KafkaSchemaHistoryTest.java | 2 +- .../AbstractSchemaHistorySnapshotTest.java | 116 +++++++++++++ .../MemorySchemaHistorySnapshotTest.java | 7 + .../oracle/OracleConnectorConfig.java | 6 + .../sqlserver/SqlServerConnectorConfig.java | 10 ++ .../sqlserver/SqlServerConnectorIT.java | 5 +- ...izedRelationalDatabaseConnectorConfig.java | 33 +++- .../io/debezium/relational/ddl/DdlParser.java | 27 +++ .../history/AbstractSchemaHistory.java | 95 +--------- .../relational/history/HistoryRecord.java | 4 + .../history/HistoryRecordProcessor.java | 143 +++++++++++++++ .../HistoryRecordProcessorProvider.java | 16 ++ .../relational/history/SchemaHistory.java | 16 +- .../history/SnapshotAwareSchemaHistory.java | 164 ++++++++++++++++++ .../AbstractSchemaHistorySnapshot.java | 79 +++++++++ .../snapshot/MemorySchemaHistorySnapshot.java | 40 +++++ .../snapshot/SchemaHistorySnapshot.java | 49 ++++++ .../history/snapshot/SchemaPartitioner.java | 16 ++ .../SnapshotAwareSchemaHistoryTest.java | 89 ++++++++++ .../blob/history/AzureBlobSchemaHistory.java | 5 +- .../file/history/FileSchemaHistory.java | 5 +- .../jdbc/history/JdbcSchemaHistory.java | 5 +- .../kafka/history/KafkaSchemaHistory.java | 5 +- .../redis/history/RedisSchemaHistory.java | 5 +- .../history/RocketMqSchemaHistory.java | 5 +- .../storage/s3/history/S3SchemaHistory.java | 5 +- 27 files changed, 847 insertions(+), 111 deletions(-) create mode 100644 debezium-connector-mysql/src/test/java/io/debezium/relational/history/snapshot/AbstractSchemaHistorySnapshotTest.java create mode 100644 
debezium-connector-mysql/src/test/java/io/debezium/relational/history/snapshot/MemorySchemaHistorySnapshotTest.java create mode 100644 debezium-core/src/main/java/io/debezium/relational/history/HistoryRecordProcessor.java create mode 100644 debezium-core/src/main/java/io/debezium/relational/history/HistoryRecordProcessorProvider.java create mode 100644 debezium-core/src/main/java/io/debezium/relational/history/SnapshotAwareSchemaHistory.java create mode 100644 debezium-core/src/main/java/io/debezium/relational/history/snapshot/AbstractSchemaHistorySnapshot.java create mode 100644 debezium-core/src/main/java/io/debezium/relational/history/snapshot/MemorySchemaHistorySnapshot.java create mode 100644 debezium-core/src/main/java/io/debezium/relational/history/snapshot/SchemaHistorySnapshot.java create mode 100644 debezium-core/src/main/java/io/debezium/relational/history/snapshot/SchemaPartitioner.java create mode 100644 debezium-core/src/test/java/io/debezium/relational/history/SnapshotAwareSchemaHistoryTest.java diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java index 81670c9cc54..22cb2b4e97a 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java @@ -34,6 +34,7 @@ import io.debezium.relational.TableId; import io.debezium.relational.Tables.TableFilter; import io.debezium.relational.history.HistoryRecordComparator; +import io.debezium.relational.history.snapshot.SchemaPartitioner; import io.debezium.schema.DefaultTopicNamingStrategy; import io.debezium.util.Collect; @@ -1164,6 +1165,11 @@ protected HistoryRecordComparator getHistoryRecordComparator() { return new MySqlHistoryRecordComparator(gtidSourceFilter()); } + @Override + protected SchemaPartitioner 
getSchemaPartitioner() { + return SchemaPartitioner.SINGLE_PARTITION; + } + public static boolean isBuiltInDatabase(String databaseName) { if (databaseName == null) { return false; diff --git a/debezium-connector-mysql/src/test/java/io/debezium/relational/history/KafkaSchemaHistoryTest.java b/debezium-connector-mysql/src/test/java/io/debezium/relational/history/KafkaSchemaHistoryTest.java index 47cdfadf34d..45a26fe3f43 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/relational/history/KafkaSchemaHistoryTest.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/relational/history/KafkaSchemaHistoryTest.java @@ -120,7 +120,7 @@ public void shouldStartWithEmptyTopicAndStoreDataAndRecoverAllState() throws Exc } private void testHistoryTopicContent(String topicName, boolean skipUnparseableDDL) { - interceptor = new LogInterceptor(KafkaSchemaHistory.class); + interceptor = new LogInterceptor(HistoryRecordProcessor.class); // Start up the history ... Configuration config = Configuration.create() .with(KafkaSchemaHistory.BOOTSTRAP_SERVERS, kafka.brokerList()) diff --git a/debezium-connector-mysql/src/test/java/io/debezium/relational/history/snapshot/AbstractSchemaHistorySnapshotTest.java b/debezium-connector-mysql/src/test/java/io/debezium/relational/history/snapshot/AbstractSchemaHistorySnapshotTest.java new file mode 100644 index 00000000000..d81dda5dd13 --- /dev/null +++ b/debezium-connector-mysql/src/test/java/io/debezium/relational/history/snapshot/AbstractSchemaHistorySnapshotTest.java @@ -0,0 +1,116 @@ +package io.debezium.relational.history.snapshot; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; + +import java.util.Map; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser; +import io.debezium.relational.Tables; +import io.debezium.relational.ddl.DdlParser; +import 
io.debezium.relational.history.MemorySchemaHistory; +import io.debezium.relational.history.SchemaHistory; + +public abstract class AbstractSchemaHistorySnapshotTest { + + protected SchemaHistorySnapshot snapshot; + protected SchemaHistory history; + protected Map source1; + protected Map source2; + protected Tables s1_t0, s1_t1, s1_t2, s1_t3; + protected Tables s2_t0, s2_t1, s2_t2, s2_t3; + protected DdlParser parser; + + @Before + public void beforeEach() { + history = new MemorySchemaHistory(); + snapshot = createHistorySnapshot(); + + source1 = server("abc"); + source2 = server("xyz"); + + parser = new MySqlAntlrDdlParser(); + s1_t0 = new Tables(); + s1_t1 = new Tables(); + s1_t2 = new Tables(); + s1_t3 = new Tables(); + + record(1, "CREATE TABLE foo ( first VARCHAR(22) NOT NULL );", s1_t3, s1_t2, s1_t1, s1_t0); + record(23, "CREATE TABLE\nperson ( name VARCHAR(22) NOT NULL );", s1_t3, s1_t2, s1_t1); + record(30, "CREATE TABLE address\n( street VARCHAR(22) NOT NULL );", s1_t3, s1_t2); + record(32, "ALTER TABLE address ADD city VARCHAR(22) NOT NULL;", s1_t3); + + snapshot.save(source1, position(32), s1_t3); + snapshot.save(source1, position(30), s1_t2); + snapshot.save(source1, position(23), s1_t1); + snapshot.save(source1, position(1), s1_t0); + + s2_t0 = new Tables(); + s2_t1 = new Tables(); + s2_t2 = new Tables(); + s2_t3 = new Tables(); + + record(2, "CREATE TABLE foo1 ( first VARCHAR(22) NOT NULL );", s2_t3, s2_t2, s2_t1, s2_t0); + record(12, "CREATE TABLE\nperson1 ( name VARCHAR(22) NOT NULL );", s2_t3, s2_t2, s2_t1); + record(16, "CREATE TABLE address1\n( street VARCHAR(22) NOT NULL );", s2_t3, s2_t2); + record(99, "ALTER TABLE address1 ADD city VARCHAR(22) NOT NULL;", s2_t3); + + snapshot.save(source2, position(55), s2_t3); + snapshot.save(source2, position(16), s2_t2); + snapshot.save(source2, position(12), s2_t1); + snapshot.save(source2, position(2), s2_t0); + } + + @After + public void afterEach() { + } + + protected abstract SchemaHistorySnapshot 
createHistorySnapshot(); + + protected Map server(String serverName) { + return Map.of("server", serverName); + } + + protected Map position(long pos) { + return Map.of("file", "mysql-bin-changelog.000011", "pos", pos); + } + + protected void record(long pos, String ddl, Tables... update) { + try { + history.record(source1, position(pos), "db", ddl); + } + catch (Throwable t) { + fail(t.getMessage()); + } + for (Tables tables : update) { + if (tables != null) { + parser.setCurrentSchema("db"); + parser.parse(ddl, tables); + } + } + } + + @Test + public void shouldRecordSnapshotsAndRecoverToClosest() { + assertNull(snapshot.read(source1, null)); + assertNull(snapshot.read(null, position(31))); + assertNull(snapshot.read(source1, position(31))); + assertEquals(s1_t2, snapshot.read(source1, position(30))); + + assertNull(snapshot.read(source2, position(30))); + assertEquals(s2_t1, snapshot.read(source2, position(12))); + + assertNull(snapshot.findClosest(null, position(31))); + assertNull(snapshot.findClosest(source1, null)); + assertEquals(position(30), snapshot.findClosest(source1, position(31))); + assertEquals(position(32), snapshot.findClosest(source1, position(99))); + + assertNull(snapshot.findClosest(source2, position(1))); + assertEquals(position(55), snapshot.findClosest(source2, position(99))); + } +} diff --git a/debezium-connector-mysql/src/test/java/io/debezium/relational/history/snapshot/MemorySchemaHistorySnapshotTest.java b/debezium-connector-mysql/src/test/java/io/debezium/relational/history/snapshot/MemorySchemaHistorySnapshotTest.java new file mode 100644 index 00000000000..7336cf0c961 --- /dev/null +++ b/debezium-connector-mysql/src/test/java/io/debezium/relational/history/snapshot/MemorySchemaHistorySnapshotTest.java @@ -0,0 +1,7 @@ +package io.debezium.relational.history.snapshot; + +public class MemorySchemaHistorySnapshotTest extends AbstractSchemaHistorySnapshotTest { + protected SchemaHistorySnapshot createHistorySnapshot() { + return new 
MemorySchemaHistorySnapshot(); + } +} diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java index 3bb4705d703..a757735bc83 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java @@ -47,6 +47,7 @@ import io.debezium.relational.TableId; import io.debezium.relational.Tables.TableFilter; import io.debezium.relational.history.HistoryRecordComparator; +import io.debezium.relational.history.snapshot.SchemaPartitioner; import io.debezium.util.Strings; /** @@ -829,6 +830,11 @@ protected HistoryRecordComparator getHistoryRecordComparator() { return getAdapter().getHistoryRecordComparator(); } + @Override + protected SchemaPartitioner getSchemaPartitioner() { + return SchemaPartitioner.SINGLE_PARTITION; + } + /** * Defines modes of representation of {@code interval} datatype */ diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java index 6353c65336f..a1553b2ea5a 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java @@ -33,6 +33,7 @@ import io.debezium.relational.TableId; import io.debezium.relational.Tables.TableFilter; import io.debezium.relational.history.HistoryRecordComparator; +import io.debezium.relational.history.snapshot.SchemaPartitioner; import io.debezium.spi.schema.DataCollectionId; import io.debezium.util.Strings; @@ -493,6 +494,15 @@ protected boolean isPositionAtOrBefore(Document recorded, Document desired) { }; } + 
@Override + protected SchemaPartitioner getSchemaPartitioner() { + return (source) -> { + return TableFilter.fromPredicate((tableId) -> { + return tableId.catalog().equals(((SqlServerPartition)source).getDatabaseName()); + }); + }; + } + @Override public String getContextName() { return Module.contextName(); diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java index 33469f99413..783aeb741a6 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java @@ -67,6 +67,7 @@ import io.debezium.relational.Tables; import io.debezium.relational.ddl.DdlParser; import io.debezium.relational.history.HistoryRecordComparator; +import io.debezium.relational.history.HistoryRecordProcessorProvider; import io.debezium.relational.history.SchemaHistory; import io.debezium.relational.history.SchemaHistoryException; import io.debezium.relational.history.SchemaHistoryListener; @@ -2907,8 +2908,8 @@ public boolean exists() { @Override public void configure(Configuration config, HistoryRecordComparator comparator, - SchemaHistoryListener listener, boolean useCatalogBeforeSchema) { - delegate.configure(config, comparator, listener, useCatalogBeforeSchema); + SchemaHistoryListener listener, HistoryRecordProcessorProvider processorProvider) { + delegate.configure(config, comparator, listener, processorProvider); } @Override diff --git a/debezium-core/src/main/java/io/debezium/relational/HistorizedRelationalDatabaseConnectorConfig.java b/debezium-core/src/main/java/io/debezium/relational/HistorizedRelationalDatabaseConnectorConfig.java index 4b0438096f6..b5cbc513c3c 100644 --- a/debezium-core/src/main/java/io/debezium/relational/HistorizedRelationalDatabaseConnectorConfig.java 
+++ b/debezium-core/src/main/java/io/debezium/relational/HistorizedRelationalDatabaseConnectorConfig.java @@ -21,8 +21,14 @@ import io.debezium.relational.Selectors.TableIdToStringMapper; import io.debezium.relational.Tables.TableFilter; import io.debezium.relational.history.HistoryRecordComparator; +import io.debezium.relational.history.HistoryRecordProcessor; +import io.debezium.relational.history.HistoryRecordProcessorProvider; import io.debezium.relational.history.SchemaHistory; +import io.debezium.relational.history.SchemaHistoryListener; import io.debezium.relational.history.SchemaHistoryMetrics; +import io.debezium.relational.history.SnapshotAwareSchemaHistory; +import io.debezium.relational.history.snapshot.SchemaHistorySnapshot; +import io.debezium.relational.history.snapshot.SchemaPartitioner; /** * Configuration options shared across the relational CDC connectors which use a persistent database schema history. @@ -34,6 +40,7 @@ public abstract class HistorizedRelationalDatabaseConnectorConfig extends Relati protected static final int DEFAULT_SNAPSHOT_FETCH_SIZE = 2_000; private static final String DEFAULT_SCHEMA_HISTORY = "io.debezium.storage.kafka.history.KafkaSchemaHistory"; + private static final String DEFAULT_SCHEMA_HISTORY_SNAPSHOT = "io.debezium.relational.history.snapshot.SchemaHistorySnapshot.NOOP"; private final boolean useCatalogBeforeSchema; private final Class connectorClass; @@ -58,6 +65,17 @@ public abstract class HistorizedRelationalDatabaseConnectorConfig extends Relati + SchemaHistory.CONFIGURATION_FIELD_PREFIX_STRING + "' string.") .withDefault(DEFAULT_SCHEMA_HISTORY); + public static final Field SCHEMA_HISTORY_SNAPSHOT = Field.create("schema.history.internal.snapshot") + .withDisplayName("Database schema history snapshot class") + .withType(Type.CLASS) + .withWidth(Width.LONG) + .withImportance(Importance.LOW) + .withInvisibleRecommender() + .withDescription("The name of the SchemaHistorySnapshot class that should be used to store and 
recover database schema. " + + "The configuration properties for the history snapshot are prefixed with the '" + + SchemaHistorySnapshot.CONFIGURATION_FIELD_PREFIX_STRING + "' string.") + .withDefault(DEFAULT_SCHEMA_HISTORY_SNAPSHOT); + public static final Field SKIP_UNPARSEABLE_DDL_STATEMENTS = SchemaHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS; public static final Field STORE_ONLY_CAPTURED_TABLES_DDL = SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL; @@ -133,9 +151,19 @@ public SchemaHistory getSchemaHistory() { .withDefault(SchemaHistory.INTERNAL_CONNECTOR_ID, logicalName) .build(); + SchemaHistorySnapshot schemaSnapshot = config.getInstance(SCHEMA_HISTORY_SNAPSHOT, SchemaHistorySnapshot.class); + if (schemaSnapshot == null) { + throw new ConnectException("Unable to instantiate the database schema history snapshot class " + + config.getString(SCHEMA_HISTORY_SNAPSHOT)); + } + + schemaHistory = new SnapshotAwareSchemaHistory(schemaHistory, schemaSnapshot, getSchemaPartitioner()); + HistoryRecordComparator historyComparator = getHistoryRecordComparator(); - schemaHistory.configure(schemaHistoryConfig, historyComparator, - new SchemaHistoryMetrics(this, multiPartitionMode()), useCatalogBeforeSchema()); // validates + SchemaHistoryListener historyListener = new SchemaHistoryMetrics(this, multiPartitionMode()); + HistoryRecordProcessorProvider processorProvider = (o, s, p) -> new HistoryRecordProcessor(o, s, p, schemaHistoryConfig, historyComparator, historyListener, + useCatalogBeforeSchema()); + schemaHistory.configure(schemaHistoryConfig, historyComparator, historyListener, processorProvider); // validates return schemaHistory; } @@ -177,4 +205,5 @@ public boolean storeOnlyCapturedDatabases() { */ protected abstract HistoryRecordComparator getHistoryRecordComparator(); + protected abstract SchemaPartitioner getSchemaPartitioner(); } diff --git a/debezium-core/src/main/java/io/debezium/relational/ddl/DdlParser.java 
b/debezium-core/src/main/java/io/debezium/relational/ddl/DdlParser.java index 1e87dd412d0..d528250fcd8 100644 --- a/debezium-core/src/main/java/io/debezium/relational/ddl/DdlParser.java +++ b/debezium-core/src/main/java/io/debezium/relational/ddl/DdlParser.java @@ -40,4 +40,31 @@ public interface DdlParser { DdlChanges getDdlChanges(); SystemVariables systemVariables(); + + DdlParser NOOP = new DdlParser() { + @Override + public void parse(String ddlContent, Tables databaseTables) { + + } + + @Override + public void setCurrentDatabase(String databaseName) { + + } + + @Override + public void setCurrentSchema(String schemaName) { + + } + + @Override + public DdlChanges getDdlChanges() { + return null; + } + + @Override + public SystemVariables systemVariables() { + return null; + } + }; } diff --git a/debezium-core/src/main/java/io/debezium/relational/history/AbstractSchemaHistory.java b/debezium-core/src/main/java/io/debezium/relational/history/AbstractSchemaHistory.java index bca0cc4af71..cc3218e64ea 100644 --- a/debezium-core/src/main/java/io/debezium/relational/history/AbstractSchemaHistory.java +++ b/debezium-core/src/main/java/io/debezium/relational/history/AbstractSchemaHistory.java @@ -6,27 +6,16 @@ package io.debezium.relational.history; import java.time.Instant; -import java.util.HashMap; import java.util.Map; import java.util.function.Consumer; -import java.util.function.Predicate; -import java.util.regex.Pattern; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.debezium.config.Configuration; import io.debezium.config.Field; -import io.debezium.document.Array; -import io.debezium.document.Document; -import io.debezium.function.Predicates; import io.debezium.relational.Tables; import io.debezium.relational.ddl.DdlParser; -import io.debezium.relational.history.TableChanges.TableChange; -import io.debezium.relational.history.TableChanges.TableChangeType; -import io.debezium.relational.history.TableChanges.TableChangesSerializer; -import 
io.debezium.text.MultipleParsingExceptions; -import io.debezium.text.ParsingException; import io.debezium.util.Clock; /** @@ -40,28 +29,17 @@ public abstract class AbstractSchemaHistory implements SchemaHistory { public static Field.Set ALL_FIELDS = Field.setOf(NAME, INTERNAL_CONNECTOR_CLASS, INTERNAL_CONNECTOR_ID); protected Configuration config; - private HistoryRecordComparator comparator = HistoryRecordComparator.INSTANCE; - private boolean skipUnparseableDDL; - private Predicate ddlFilter = x -> false; private SchemaHistoryListener listener = SchemaHistoryListener.NOOP; - private boolean useCatalogBeforeSchema; - private boolean preferDdl = false; - private final TableChangesSerializer tableChangesSerializer = new JsonTableChangeSerializer(); + private HistoryRecordProcessorProvider processorProvider = HistoryRecordProcessor::new; protected AbstractSchemaHistory() { } @Override - public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, boolean useCatalogBeforeSchema) { + public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, HistoryRecordProcessorProvider processorProvider) { this.config = config; - this.comparator = comparator != null ? comparator : HistoryRecordComparator.INSTANCE; - this.skipUnparseableDDL = config.getBoolean(SKIP_UNPARSEABLE_DDL_STATEMENTS); - - final String ddlFilter = config.getString(DDL_FILTER); - this.ddlFilter = (ddlFilter != null) ? 
Predicates.includes(ddlFilter, Pattern.CASE_INSENSITIVE | Pattern.DOTALL) : (x -> false); this.listener = listener; - this.useCatalogBeforeSchema = useCatalogBeforeSchema; - this.preferDdl = config.getBoolean(INTERNAL_PREFER_DDL); + this.processorProvider = processorProvider; } @Override @@ -88,72 +66,7 @@ public final void record(Map source, Map position, String @Override public void recover(Map, Map> offsets, Tables schema, DdlParser ddlParser) { listener.recoveryStarted(); - Map stopPoints = new HashMap<>(); - offsets.forEach((Map source, Map position) -> { - Document srcDocument = Document.create(); - if (source != null) { - source.forEach(srcDocument::set); - } - stopPoints.put(srcDocument, new HistoryRecord(source, position, null, null, null, null, null)); - }); - - recoverRecords(recovered -> { - listener.onChangeFromHistory(recovered); - Document srcDocument = recovered.document().getDocument(HistoryRecord.Fields.SOURCE); - if (stopPoints.containsKey(srcDocument) && comparator.isAtOrBefore(recovered, stopPoints.get(srcDocument))) { - Array tableChanges = recovered.tableChanges(); - String ddl = recovered.ddl(); - - if (!preferDdl && tableChanges != null && !tableChanges.isEmpty()) { - TableChanges changes = tableChangesSerializer.deserialize(tableChanges, useCatalogBeforeSchema); - for (TableChange entry : changes) { - if (entry.getType() == TableChangeType.CREATE) { - schema.overwriteTable(entry.getTable()); - } - else if (entry.getType() == TableChangeType.ALTER) { - if (entry.getPreviousId() != null) { - schema.removeTable(entry.getPreviousId()); - } - schema.overwriteTable(entry.getTable()); - } - // DROP - else { - schema.removeTable(entry.getId()); - } - } - listener.onChangeApplied(recovered); - } - else if (ddl != null && ddlParser != null) { - if (recovered.databaseName() != null) { - ddlParser.setCurrentDatabase(recovered.databaseName()); // may be null - } - if (recovered.schemaName() != null) { - 
ddlParser.setCurrentSchema(recovered.schemaName()); // may be null - } - if (ddlFilter.test(ddl)) { - logger.info("a DDL '{}' was filtered out of processing by regular expression '{}'", ddl, - config.getString(DDL_FILTER)); - return; - } - try { - logger.debug("Applying: {}", ddl); - ddlParser.parse(ddl, schema); - listener.onChangeApplied(recovered); - } - catch (final ParsingException | MultipleParsingExceptions e) { - if (skipUnparseableDDL) { - logger.warn("Ignoring unparseable statements '{}' stored in database schema history", ddl, e); - } - else { - throw e; - } - } - } - } - else { - logger.debug("Skipping: {}", recovered.ddl()); - } - }); + recoverRecords(processorProvider.get(offsets, schema, ddlParser)); listener.recoveryStopped(); } diff --git a/debezium-core/src/main/java/io/debezium/relational/history/HistoryRecord.java b/debezium-core/src/main/java/io/debezium/relational/history/HistoryRecord.java index 9be0baf7061..ae8d8a469e1 100644 --- a/debezium-core/src/main/java/io/debezium/relational/history/HistoryRecord.java +++ b/debezium-core/src/main/java/io/debezium/relational/history/HistoryRecord.java @@ -32,6 +32,10 @@ public HistoryRecord(Document document) { this.doc = document; } + public HistoryRecord(Map source, Map position) { + this(source, position, null, null, null, null, null); + } + public HistoryRecord(Map source, Map position, String databaseName, String schemaName, String ddl, TableChanges changes, Instant timestamp) { this.doc = Document.create(); diff --git a/debezium-core/src/main/java/io/debezium/relational/history/HistoryRecordProcessor.java b/debezium-core/src/main/java/io/debezium/relational/history/HistoryRecordProcessor.java new file mode 100644 index 00000000000..8c0c0187d44 --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/relational/history/HistoryRecordProcessor.java @@ -0,0 +1,143 @@ +/* + * Copyright Debezium Authors. 
+ * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.relational.history; + +import static io.debezium.relational.history.SchemaHistory.DDL_FILTER; +import static io.debezium.relational.history.SchemaHistory.INTERNAL_PREFER_DDL; +import static io.debezium.relational.history.SchemaHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS; + +import java.util.HashMap; +import java.util.Map; +import java.util.function.Consumer; +import java.util.function.Predicate; +import java.util.regex.Pattern; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.debezium.config.Configuration; +import io.debezium.document.Array; +import io.debezium.document.Document; +import io.debezium.function.Predicates; +import io.debezium.relational.Tables; +import io.debezium.relational.ddl.DdlParser; +import io.debezium.relational.history.TableChanges.TableChangesSerializer; +import io.debezium.text.MultipleParsingExceptions; +import io.debezium.text.ParsingException; + +public class HistoryRecordProcessor implements Consumer { + + protected final Logger logger = LoggerFactory.getLogger(HistoryRecordProcessor.class); + + private final Map stopPoints; + private final Tables schema; + private final DdlParser ddlParser; + private final TableChangesSerializer tableChangesSerializer = new JsonTableChangeSerializer(); + private final Configuration config; + private final HistoryRecordComparator comparator; + private final SchemaHistoryListener listener; + private final boolean useCatalogBeforeSchema; + private boolean skipUnparseableDDL; + private boolean preferDdl; + private Predicate ddlFilter = x -> false; + + public HistoryRecordProcessor(Map, Map> offsets, Tables schema, DdlParser ddlParser) { + this(offsets, schema, ddlParser, null, null, null, false); + } + + public HistoryRecordProcessor( + Map, Map> offsets, Tables schema, DdlParser ddlParser, + Configuration config, HistoryRecordComparator 
comparator, SchemaHistoryListener listener, + boolean useCatalogBeforeSchema) { + this.stopPoints = getStopPoints(offsets); + this.schema = schema; + this.ddlParser = ddlParser; + + this.config = config; + this.comparator = comparator != null ? comparator : HistoryRecordComparator.INSTANCE; + this.listener = listener != null ? listener : SchemaHistoryListener.NOOP; + + this.useCatalogBeforeSchema = useCatalogBeforeSchema; + if (config != null) { + this.skipUnparseableDDL = config.getBoolean(SKIP_UNPARSEABLE_DDL_STATEMENTS); + this.preferDdl = config.getBoolean(INTERNAL_PREFER_DDL); + + final String ddlFilter = config.getString(DDL_FILTER); + this.ddlFilter = (ddlFilter != null) ? Predicates.includes(ddlFilter, Pattern.CASE_INSENSITIVE | Pattern.DOTALL) : (x -> false); + } + } + + private Map getStopPoints(Map, Map> offsets) { + Map stopPoints = new HashMap<>(); + offsets.forEach((Map source, Map position) -> { + Document srcDocument = Document.create(); + if (source != null) { + source.forEach(srcDocument::set); + } + stopPoints.put(srcDocument, new HistoryRecord(source, position)); + }); + return stopPoints; + } + + @Override + public void accept(HistoryRecord recovered) { + listener.onChangeFromHistory(recovered); + Document srcDocument = recovered.document().getDocument(HistoryRecord.Fields.SOURCE); + if (stopPoints.containsKey(srcDocument) && comparator.isAtOrBefore(recovered, stopPoints.get(srcDocument))) { + Array tableChanges = recovered.tableChanges(); + String ddl = recovered.ddl(); + + if (!preferDdl && tableChanges != null && !tableChanges.isEmpty()) { + TableChanges changes = tableChangesSerializer.deserialize(tableChanges, useCatalogBeforeSchema); + for (TableChanges.TableChange entry : changes) { + if (entry.getType() == TableChanges.TableChangeType.CREATE) { + schema.overwriteTable(entry.getTable()); + } + else if (entry.getType() == TableChanges.TableChangeType.ALTER) { + if (entry.getPreviousId() != null) { + 
schema.removeTable(entry.getPreviousId()); + } + schema.overwriteTable(entry.getTable()); + } + // DROP + else { + schema.removeTable(entry.getId()); + } + } + listener.onChangeApplied(recovered); + } + else if (ddl != null && ddlParser != null) { + if (recovered.databaseName() != null) { + ddlParser.setCurrentDatabase(recovered.databaseName()); // may be null + } + if (recovered.schemaName() != null) { + ddlParser.setCurrentSchema(recovered.schemaName()); // may be null + } + if (ddlFilter.test(ddl)) { + logger.info("a DDL '{}' was filtered out of processing by regular expression '{}'", ddl, + config.getString(DDL_FILTER)); + return; + } + try { + logger.debug("Applying: {}", ddl); + ddlParser.parse(ddl, schema); + listener.onChangeApplied(recovered); + } + catch (final ParsingException | MultipleParsingExceptions e) { + if (skipUnparseableDDL) { + logger.warn("Ignoring unparseable statements '{}' stored in database schema history", ddl, e); + } + else { + throw e; + } + } + } + } + else { + logger.debug("Skipping: {}", recovered.ddl()); + } + } +} diff --git a/debezium-core/src/main/java/io/debezium/relational/history/HistoryRecordProcessorProvider.java b/debezium-core/src/main/java/io/debezium/relational/history/HistoryRecordProcessorProvider.java new file mode 100644 index 00000000000..3ff52936ed3 --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/relational/history/HistoryRecordProcessorProvider.java @@ -0,0 +1,16 @@ +/* + * Copyright Debezium Authors. 
+ * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.relational.history; + +import java.util.Map; +import java.util.function.Consumer; + +import io.debezium.relational.Tables; +import io.debezium.relational.ddl.DdlParser; + +public interface HistoryRecordProcessorProvider { + Consumer get(Map, Map> offsets, Tables schema, DdlParser ddlParser); +} diff --git a/debezium-core/src/main/java/io/debezium/relational/history/SchemaHistory.java b/debezium-core/src/main/java/io/debezium/relational/history/SchemaHistory.java index 5a650102980..bff8b83339e 100644 --- a/debezium-core/src/main/java/io/debezium/relational/history/SchemaHistory.java +++ b/debezium-core/src/main/java/io/debezium/relational/history/SchemaHistory.java @@ -138,7 +138,21 @@ public interface SchemaHistory { the catalog and the second as the table name, or false if the first should be used as the schema and the second as the table name */ - void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, boolean useCatalogBeforeSchema); + default void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, boolean useCatalogBeforeSchema) { + configure(config, comparator, listener, (o, s, p) -> new HistoryRecordProcessor(o, s, p, config, comparator, listener, useCatalogBeforeSchema)); + } + + /** + * Configure this instance. 
+ * + * @param config the configuration for this history store + * @param comparator the function that should be used to compare history records during + * {@link #recover(Map, Map, Tables, DdlParser) recovery}; may be null if the + * {@link HistoryRecordComparator#INSTANCE default comparator} is to be used + * @param listener the listener notified as history records are read and as their changes are applied + * @param processorProvider the factory that creates the consumer applied to each recovered history record, given the offsets, target schema and DDL parser + */ + void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, HistoryRecordProcessorProvider processorProvider); /** * Start the history. diff --git a/debezium-core/src/main/java/io/debezium/relational/history/SnapshotAwareSchemaHistory.java b/debezium-core/src/main/java/io/debezium/relational/history/SnapshotAwareSchemaHistory.java new file mode 100644 index 00000000000..567b5948b5f --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/relational/history/SnapshotAwareSchemaHistory.java @@ -0,0 +1,164 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.relational.history; + +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Consumer; + +import io.debezium.config.Configuration; +import io.debezium.document.Document; +import io.debezium.relational.Tables; +import io.debezium.relational.ddl.DdlParser; +import io.debezium.relational.history.snapshot.SchemaHistorySnapshot; +import io.debezium.relational.history.snapshot.SchemaPartitioner; +import io.debezium.util.Clock; + +public class SnapshotAwareSchemaHistory implements SchemaHistory { + + private final SchemaHistory delegate; + private final SchemaPartitioner schemaPartitioner; + private SchemaHistorySnapshot snapshot = SchemaHistorySnapshot.NOOP; + private Tables schemaToSave = null; + + public SnapshotAwareSchemaHistory(SchemaHistory delegate, SchemaHistorySnapshot snapshot, SchemaPartitioner schemaPartitioner) { +
this.delegate = delegate; + this.snapshot = snapshot; + this.schemaPartitioner = schemaPartitioner; + } + + @Override + public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, HistoryRecordProcessorProvider processorProvider) { + Configuration schemaHistoryConfig = config.subset(SchemaHistorySnapshot.CONFIGURATION_FIELD_PREFIX_STRING, false); + snapshot.configure(schemaHistoryConfig, comparator); + delegate.configure(config, comparator, listener, (o, s, p) -> new HistoryRecordProcessor(o, s, p, comparator, listener, snapshot, processorProvider)); + } + + @Override + public void start() { + delegate.start(); + } + + @Override + public void record(Map source, Map position, String databaseName, String ddl) throws SchemaHistoryException { + record(source, position, databaseName, null, ddl, null, Clock.SYSTEM.currentTimeAsInstant()); + } + + @Override + public void record(Map source, Map position, String databaseName, String schemaName, String ddl, TableChanges changes, Instant timestamp) + throws SchemaHistoryException { + delegate.record(source, position, databaseName, schemaName, ddl, changes, timestamp); + + if (schemaToSave != null) { + snapshot.save(source, position, getSchemaPartition(source, schemaToSave)); + } + } + + private Tables getSchemaPartition(Map source, Tables schema) { + if (schemaPartitioner == null) { + return schema; + } + return schema.subset(schemaPartitioner.fromSource(source)); + } + + @Override + public void recover(Map, Map> offsets, Tables schema, DdlParser ddlParser) { + delegate.recover(offsets, schema, ddlParser); + + // once recovered the schema can be used to create snapshots + schemaToSave = schema; + } + + @Override + public void stop() { + delegate.stop(); + } + + @Override + public boolean exists() { + return delegate.exists(); + } + + @Override + public boolean storageExists() { + return delegate.storageExists(); + } + + @Override + public void initializeStorage() { + 
delegate.initializeStorage(); + } + + private static class HistoryRecordProcessor implements Consumer { + + private final HistoryRecordComparator comparator; + private final SchemaHistoryListener listener; + private final SchemaHistorySnapshot snapshot; + private final Consumer originalProcessor; + private final Map stopPoints; + + public HistoryRecordProcessor(Map, Map> offsets, Tables schema, DdlParser ddlParser, HistoryRecordComparator comparator, SchemaHistoryListener listener, SchemaHistorySnapshot snapshot, HistoryRecordProcessorProvider processorProvider) { + this.comparator = comparator; + this.listener = listener; + this.snapshot = snapshot; + this.stopPoints = getStopPoints(offsets, schema); + this.originalProcessor = processorProvider.get(offsets, schema, ddlParser); + } + + private Map getStopPoints(Map, Map> offsets, Tables schema) { + Map stopPoints = new HashMap<>(); + getSnapshotOffsets(offsets).forEach((Map source, Map position) -> { + Tables schemaPartition = snapshot.read(source, position); + if (schemaPartition != null) { + merge(schema, schemaPartition); + } + + Document srcDocument = Document.create(); + if (source != null) { + source.forEach(srcDocument::set); + } + stopPoints.put(srcDocument, new HistoryRecord(source, position)); + }); + return stopPoints; + } + + private void merge(Tables schema, Tables partialSchema) { + partialSchema.tableIds().forEach((tableId) -> schema.overwriteTable(partialSchema.forTable(tableId))); + } + + private Map, Map> getSnapshotOffsets(Map, Map> offsets) { + Map, Map> snapshotOffsets = new HashMap<>(); + offsets.forEach((Map source, Map position) -> { + snapshotOffsets.put(source, snapshot.findClosest(source, position)); + }); + return snapshotOffsets; + } + + /** + * The code ignores records covered by schema snapshot - from the beginning to snapshot + * offsets (SO). Then it switches to the original processor which handles records from SO + * to connector offsets (CO). 
+ * + * <--------------> : snapshot processor + * ---------------(SO)-----(CO)----> : schema history + * <-----> : original processor + * @param recovered the recovered schema history record + */ + @Override + public void accept(HistoryRecord recovered) { + listener.onChangeFromHistory(recovered); + Document srcDocument = recovered.document().getDocument(HistoryRecord.Fields.SOURCE); + if (stopPoints.containsKey(srcDocument) && !stopPoints.get(srcDocument).position().isEmpty() && comparator.isAtOrBefore(recovered, stopPoints.get(srcDocument))) { + // schema snapshot contains this record, skip processing + listener.onChangeApplied(recovered); + } + else { + originalProcessor.accept(recovered); + } + } + } +} diff --git a/debezium-core/src/main/java/io/debezium/relational/history/snapshot/AbstractSchemaHistorySnapshot.java b/debezium-core/src/main/java/io/debezium/relational/history/snapshot/AbstractSchemaHistorySnapshot.java new file mode 100644 index 00000000000..5ce0e8de9f0 --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/relational/history/snapshot/AbstractSchemaHistorySnapshot.java @@ -0,0 +1,79 @@ +/* + * Copyright Debezium Authors. 
+ * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.relational.history.snapshot; + +import java.util.List; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.debezium.config.Configuration; +import io.debezium.relational.Tables; +import io.debezium.relational.history.HistoryRecord; +import io.debezium.relational.history.HistoryRecordComparator; +import io.debezium.util.FunctionalReadWriteLock; + +public abstract class AbstractSchemaHistorySnapshot implements SchemaHistorySnapshot { + + protected final Logger logger = LoggerFactory.getLogger(getClass()); + + protected Configuration config; + private HistoryRecordComparator comparator = HistoryRecordComparator.INSTANCE; + private final FunctionalReadWriteLock lock = FunctionalReadWriteLock.reentrant(); + + protected AbstractSchemaHistorySnapshot() { + } + + @Override + public void configure(Configuration config, HistoryRecordComparator comparator) { + this.config = config; + this.comparator = comparator; + } + + @Override + public void save(Map source, Map position, Tables schema) { + if (source == null || position == null || schema == null) { + return; + } + lock.write(() -> doSave(source, position, schema)); + } + + @Override + public Tables read(Map source, Map position) { + if (source == null || position == null) { + return null; + } + return lock.read(() -> doRead(source, position)); + } + + @Override + public Map findClosest(Map source, Map position) { + if (source == null || position == null) { + return null; + } + HistoryRecord historyRecord = new HistoryRecord(source, position); + return lock.read(() -> findAll(source)) + .stream() + // snapshot position keys match given position keys + .filter((p) -> p.keySet().equals(position.keySet())) + // snapshot position is at or before the given position + .filter((p) -> comparator.isAtOrBefore(new HistoryRecord(source, p), 
historyRecord)) + // sort positions ASC and get last + .max((p1, p2) -> { + HistoryRecord h1 = new HistoryRecord(source, p1); + HistoryRecord h2 = new HistoryRecord(source, p2); + return comparator.isAtOrBefore(h1, h2) ? -1 : comparator.isAtOrBefore(h2, h1) ? 1 : 0; + }) + .orElse(null); + } + + protected abstract void doSave(Map source, Map position, Tables schema); + + protected abstract Tables doRead(Map source, Map position); + + protected abstract List> findAll(Map source); +} diff --git a/debezium-core/src/main/java/io/debezium/relational/history/snapshot/MemorySchemaHistorySnapshot.java b/debezium-core/src/main/java/io/debezium/relational/history/snapshot/MemorySchemaHistorySnapshot.java new file mode 100644 index 00000000000..31fc8e7a7d4 --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/relational/history/snapshot/MemorySchemaHistorySnapshot.java @@ -0,0 +1,40 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.relational.history.snapshot; + +import io.debezium.relational.Tables; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +public class MemorySchemaHistorySnapshot extends AbstractSchemaHistorySnapshot { + + private final Map, Map>, Tables> snapshots = new HashMap<>(); + + public MemorySchemaHistorySnapshot() { + } + + @Override + public void doSave(Map source, Map position, Tables schema) { + snapshots.put(Map.of(source, position), schema); + } + + @Override + public Tables doRead(Map source, Map position) { + return snapshots.get(Map.of(source, position)); + } + + @Override + protected List> findAll(Map source) { + return snapshots.keySet().stream() + .map((e) -> e.get(source)) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + } +} diff --git 
a/debezium-core/src/main/java/io/debezium/relational/history/snapshot/SchemaHistorySnapshot.java b/debezium-core/src/main/java/io/debezium/relational/history/snapshot/SchemaHistorySnapshot.java new file mode 100644 index 00000000000..3bf6067b18b --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/relational/history/snapshot/SchemaHistorySnapshot.java @@ -0,0 +1,49 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.relational.history.snapshot; + +import io.debezium.config.Configuration; +import io.debezium.relational.Tables; +import io.debezium.relational.history.HistoryRecordComparator; +import io.debezium.relational.history.SchemaHistory; + +import java.util.List; +import java.util.Map; + +public interface SchemaHistorySnapshot { + + String CONFIGURATION_FIELD_PREFIX_STRING = SchemaHistory.CONFIGURATION_FIELD_PREFIX_STRING + "snapshot."; + + void configure(Configuration config, HistoryRecordComparator comparator); + + void save(Map source, Map position, Tables schema); + + Tables read(Map source, Map position); + + Map findClosest(Map source, Map position); + + SchemaHistorySnapshot NOOP = new SchemaHistorySnapshot() { + @Override + public void configure(Configuration config, HistoryRecordComparator comparator) { + + } + + @Override + public void save(Map source, Map position, Tables schema) { + + } + + @Override + public Tables read(Map source, Map position) { + return null; + } + + @Override + public Map findClosest(Map source, Map position) { + return null; + } + }; +} diff --git a/debezium-core/src/main/java/io/debezium/relational/history/snapshot/SchemaPartitioner.java b/debezium-core/src/main/java/io/debezium/relational/history/snapshot/SchemaPartitioner.java new file mode 100644 index 00000000000..04cfa6a93da --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/relational/history/snapshot/SchemaPartitioner.java 
@@ -0,0 +1,16 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.relational.history.snapshot; + +import java.util.Map; + +import io.debezium.relational.Tables.TableFilter; + +public interface SchemaPartitioner { + TableFilter fromSource(Map source); + + SchemaPartitioner SINGLE_PARTITION = source -> TableFilter.includeAll(); +} diff --git a/debezium-core/src/test/java/io/debezium/relational/history/SnapshotAwareSchemaHistoryTest.java b/debezium-core/src/test/java/io/debezium/relational/history/SnapshotAwareSchemaHistoryTest.java new file mode 100644 index 00000000000..7c0b6b37230 --- /dev/null +++ b/debezium-core/src/test/java/io/debezium/relational/history/SnapshotAwareSchemaHistoryTest.java @@ -0,0 +1,89 @@ +package io.debezium.relational.history; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import io.debezium.config.Configuration; +import io.debezium.relational.SystemVariables; +import io.debezium.relational.Tables; +import io.debezium.relational.ddl.DdlChanges; +import io.debezium.relational.ddl.DdlParser; +import io.debezium.relational.history.snapshot.MemorySchemaHistorySnapshot; +import io.debezium.relational.history.snapshot.SchemaHistorySnapshot; +import io.debezium.relational.history.snapshot.SchemaPartitioner; + +public class SnapshotAwareSchemaHistoryTest { + + private SchemaHistorySnapshot historySnapshot; + private SchemaHistory delegate; + private List recovered = new ArrayList<>(); + private Map source = Map.of("server", "xyz"); + + @Before + public void beforeEach() { + delegate = new MemorySchemaHistory(); + historySnapshot = new MemorySchemaHistorySnapshot(); + recovered.clear(); + } + + @After + public void afterEach() 
{ + } + + protected Map position(long pos) { + return Map.of("pos", pos); + } + + protected void record(long pos, String ddl) { + try { + delegate.record(source, position(pos), "db", ddl); + } + catch (Throwable t) { + fail(t.getMessage()); + } + } + + @Test + public void shouldSkipRecordsRecoveredFromSnapshot() { + SchemaHistory schemaHistory = new SnapshotAwareSchemaHistory(delegate, historySnapshot, SchemaPartitioner.SINGLE_PARTITION); + schemaHistory.configure(Configuration.empty(), HistoryRecordComparator.INSTANCE, SchemaHistoryListener.NOOP, (o, s, p) -> (r) -> recovered.add(r)); + + delegate.record(source, position(1), "db", "CREATE TABLE foo ( first VARCHAR(22) NOT NULL );"); + delegate.record(source, position(3), "db", "CREATE TABLE\nperson ( name VARCHAR(22) NOT NULL );"); + delegate.record(source, position(5), "db", "CREATE TABLE address\n( street VARCHAR(22) NOT NULL );"); + delegate.record(source, position(10), "db", "ALTER TABLE address ADD city VARCHAR(22) NOT NULL;"); + delegate.record(source, position(15), "db", "ALTER TABLE address ADD zip VARCHAR(22) NOT NULL;"); + + historySnapshot.save(source, position(4), new Tables()); + historySnapshot.save(source, position(8), new Tables()); + + DdlParser noopParser = DdlParser.NOOP; + schemaHistory.recover(source, position(1), new Tables(), noopParser); + assertEquals(5, recovered.stream().count()); + recovered.clear(); + + schemaHistory.recover(source, position(4), new Tables(), noopParser); + assertEquals(3, recovered.stream().count()); + recovered.clear(); + + schemaHistory.recover(source, position(6), new Tables(), noopParser); + assertEquals(3, recovered.stream().count()); + recovered.clear(); + + schemaHistory.recover(source, position(9), new Tables(), noopParser); + assertEquals(2, recovered.stream().count()); + recovered.clear(); + + schemaHistory.recover(source, position(20), new Tables(), noopParser); + assertEquals(2, recovered.stream().count()); + recovered.clear(); + } +} \ No newline at end 
of file diff --git a/debezium-storage/debezium-storage-azure-blob/src/main/java/io/debezium/storage/azure/blob/history/AzureBlobSchemaHistory.java b/debezium-storage/debezium-storage-azure-blob/src/main/java/io/debezium/storage/azure/blob/history/AzureBlobSchemaHistory.java index 78cb185d7a3..d888c91e523 100644 --- a/debezium-storage/debezium-storage-azure-blob/src/main/java/io/debezium/storage/azure/blob/history/AzureBlobSchemaHistory.java +++ b/debezium-storage/debezium-storage-azure-blob/src/main/java/io/debezium/storage/azure/blob/history/AzureBlobSchemaHistory.java @@ -23,6 +23,7 @@ import io.debezium.relational.history.AbstractFileBasedSchemaHistory; import io.debezium.relational.history.HistoryRecord; import io.debezium.relational.history.HistoryRecordComparator; +import io.debezium.relational.history.HistoryRecordProcessorProvider; import io.debezium.relational.history.SchemaHistory; import io.debezium.relational.history.SchemaHistoryException; import io.debezium.relational.history.SchemaHistoryListener; @@ -74,8 +75,8 @@ public class AzureBlobSchemaHistory extends AbstractFileBasedSchemaHistory { private String blobName; @Override - public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, boolean useCatalogBeforeSchema) { - super.configure(config, comparator, listener, useCatalogBeforeSchema); + public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, HistoryRecordProcessorProvider processorProvider) { + super.configure(config, comparator, listener, processorProvider); if (!config.validateAndRecord(ALL_FIELDS, LOGGER::error)) { throw new DebeziumException( "Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details"); diff --git a/debezium-storage/debezium-storage-file/src/main/java/io/debezium/storage/file/history/FileSchemaHistory.java 
b/debezium-storage/debezium-storage-file/src/main/java/io/debezium/storage/file/history/FileSchemaHistory.java index 8ac0f2ad41b..5bac157edbe 100644 --- a/debezium-storage/debezium-storage-file/src/main/java/io/debezium/storage/file/history/FileSchemaHistory.java +++ b/debezium-storage/debezium-storage-file/src/main/java/io/debezium/storage/file/history/FileSchemaHistory.java @@ -24,6 +24,7 @@ import io.debezium.relational.history.AbstractFileBasedSchemaHistory; import io.debezium.relational.history.HistoryRecord; import io.debezium.relational.history.HistoryRecordComparator; +import io.debezium.relational.history.HistoryRecordProcessorProvider; import io.debezium.relational.history.SchemaHistory; import io.debezium.relational.history.SchemaHistoryException; import io.debezium.relational.history.SchemaHistoryListener; @@ -47,7 +48,7 @@ public final class FileSchemaHistory extends AbstractFileBasedSchemaHistory { private Path path; @Override - public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, boolean useCatalogBeforeSchema) { + public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, HistoryRecordProcessorProvider processorProvider) { if (!config.validateAndRecord(ALL_FIELDS, logger::error)) { throw new DebeziumException( "Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details"); @@ -55,7 +56,7 @@ public void configure(Configuration config, HistoryRecordComparator comparator, if (running.get()) { throw new SchemaHistoryException("Database schema history file already initialized to " + path); } - super.configure(config, comparator, listener, useCatalogBeforeSchema); + super.configure(config, comparator, listener, processorProvider); path = Paths.get(config.getString(FILE_PATH)); } diff --git a/debezium-storage/debezium-storage-jdbc/src/main/java/io/debezium/storage/jdbc/history/JdbcSchemaHistory.java 
b/debezium-storage/debezium-storage-jdbc/src/main/java/io/debezium/storage/jdbc/history/JdbcSchemaHistory.java index 82165cc4b60..3ee0697e318 100644 --- a/debezium-storage/debezium-storage-jdbc/src/main/java/io/debezium/storage/jdbc/history/JdbcSchemaHistory.java +++ b/debezium-storage/debezium-storage-jdbc/src/main/java/io/debezium/storage/jdbc/history/JdbcSchemaHistory.java @@ -33,6 +33,7 @@ import io.debezium.relational.history.AbstractSchemaHistory; import io.debezium.relational.history.HistoryRecord; import io.debezium.relational.history.HistoryRecordComparator; +import io.debezium.relational.history.HistoryRecordProcessorProvider; import io.debezium.relational.history.SchemaHistory; import io.debezium.relational.history.SchemaHistoryException; import io.debezium.relational.history.SchemaHistoryListener; @@ -59,12 +60,12 @@ public final class JdbcSchemaHistory extends AbstractSchemaHistory { private JdbcSchemaHistoryConfig config; @Override - public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, boolean useCatalogBeforeSchema) { + public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, HistoryRecordProcessorProvider processorProvider) { this.config = new JdbcSchemaHistoryConfig(config); if (running.get()) { throw new IllegalStateException("Database history already initialized db: " + this.config.getJdbcUrl()); } - super.configure(config, comparator, listener, useCatalogBeforeSchema); + super.configure(config, comparator, listener, processorProvider); try { conn = DriverManager.getConnection(this.config.getJdbcUrl(), this.config.getUser(), this.config.getPassword()); diff --git a/debezium-storage/debezium-storage-kafka/src/main/java/io/debezium/storage/kafka/history/KafkaSchemaHistory.java b/debezium-storage/debezium-storage-kafka/src/main/java/io/debezium/storage/kafka/history/KafkaSchemaHistory.java index 75dc28eba4f..3d0a708a153 100644 --- 
a/debezium-storage/debezium-storage-kafka/src/main/java/io/debezium/storage/kafka/history/KafkaSchemaHistory.java +++ b/debezium-storage/debezium-storage-kafka/src/main/java/io/debezium/storage/kafka/history/KafkaSchemaHistory.java @@ -63,6 +63,7 @@ import io.debezium.relational.history.AbstractSchemaHistory; import io.debezium.relational.history.HistoryRecord; import io.debezium.relational.history.HistoryRecordComparator; +import io.debezium.relational.history.HistoryRecordProcessorProvider; import io.debezium.relational.history.SchemaHistory; import io.debezium.relational.history.SchemaHistoryException; import io.debezium.relational.history.SchemaHistoryListener; @@ -202,8 +203,8 @@ private static boolean hasNewTopicConstructorWithOptionals() { } @Override - public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, boolean useCatalogBeforeSchema) { - super.configure(config, comparator, listener, useCatalogBeforeSchema); + public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, HistoryRecordProcessorProvider processorProvider) { + super.configure(config, comparator, listener, processorProvider); if (!config.validateAndRecord(ALL_FIELDS, LOGGER::error)) { throw new ConnectException("Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details"); } diff --git a/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/history/RedisSchemaHistory.java b/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/history/RedisSchemaHistory.java index 5067dea0d44..baae47added 100644 --- a/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/history/RedisSchemaHistory.java +++ b/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/history/RedisSchemaHistory.java @@ -24,6 +24,7 @@ import 
io.debezium.relational.history.AbstractSchemaHistory; import io.debezium.relational.history.HistoryRecord; import io.debezium.relational.history.HistoryRecordComparator; +import io.debezium.relational.history.HistoryRecordProcessorProvider; import io.debezium.relational.history.SchemaHistory; import io.debezium.relational.history.SchemaHistoryException; import io.debezium.relational.history.SchemaHistoryListener; @@ -61,11 +62,11 @@ void connect() { } @Override - public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, boolean useCatalogBeforeSchema) { + public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, HistoryRecordProcessorProvider processorProvider) { this.config = new RedisSchemaHistoryConfig(config); this.initialRetryDelay = Duration.ofMillis(this.config.getInitialRetryDelay()); this.maxRetryDelay = Duration.ofMillis(this.config.getMaxRetryDelay()); - super.configure(config, comparator, listener, useCatalogBeforeSchema); + super.configure(config, comparator, listener, processorProvider); } @Override diff --git a/debezium-storage/debezium-storage-rocketmq/src/main/java/io/debezium/storage/rocketmq/history/RocketMqSchemaHistory.java b/debezium-storage/debezium-storage-rocketmq/src/main/java/io/debezium/storage/rocketmq/history/RocketMqSchemaHistory.java index 8a39ec84648..b68772ee6dc 100644 --- a/debezium-storage/debezium-storage-rocketmq/src/main/java/io/debezium/storage/rocketmq/history/RocketMqSchemaHistory.java +++ b/debezium-storage/debezium-storage-rocketmq/src/main/java/io/debezium/storage/rocketmq/history/RocketMqSchemaHistory.java @@ -38,6 +38,7 @@ import io.debezium.relational.history.AbstractSchemaHistory; import io.debezium.relational.history.HistoryRecord; import io.debezium.relational.history.HistoryRecordComparator; +import io.debezium.relational.history.HistoryRecordProcessorProvider; import 
io.debezium.relational.history.SchemaHistory; import io.debezium.relational.history.SchemaHistoryException; import io.debezium.relational.history.SchemaHistoryListener; @@ -133,8 +134,8 @@ private static Field.Validator forRocketMq(final Field.Validator validator) { } @Override - public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, boolean useCatalogBeforeSchema) { - super.configure(config, comparator, listener, useCatalogBeforeSchema); + public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, HistoryRecordProcessorProvider processorProvider) { + super.configure(config, comparator, listener, processorProvider); this.topicName = config.getString(TOPIC); this.dbHistoryName = config.getString(SchemaHistory.NAME, UUID.randomUUID().toString()); this.maxRecoveryAttempts = config.getInteger(RECOVERY_POLL_ATTEMPTS); diff --git a/debezium-storage/debezium-storage-s3/src/main/java/io/debezium/storage/s3/history/S3SchemaHistory.java b/debezium-storage/debezium-storage-s3/src/main/java/io/debezium/storage/s3/history/S3SchemaHistory.java index 1e856b1361c..df5f871132a 100644 --- a/debezium-storage/debezium-storage-s3/src/main/java/io/debezium/storage/s3/history/S3SchemaHistory.java +++ b/debezium-storage/debezium-storage-s3/src/main/java/io/debezium/storage/s3/history/S3SchemaHistory.java @@ -22,6 +22,7 @@ import io.debezium.relational.history.AbstractFileBasedSchemaHistory; import io.debezium.relational.history.HistoryRecord; import io.debezium.relational.history.HistoryRecordComparator; +import io.debezium.relational.history.HistoryRecordProcessorProvider; import io.debezium.relational.history.SchemaHistory; import io.debezium.relational.history.SchemaHistoryException; import io.debezium.relational.history.SchemaHistoryListener; @@ -112,8 +113,8 @@ public class S3SchemaHistory extends AbstractFileBasedSchemaHistory { private volatile S3Client client = null; 
@Override - public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, boolean useCatalogBeforeSchema) { - super.configure(config, comparator, listener, useCatalogBeforeSchema); + public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, HistoryRecordProcessorProvider processorProvider) { + super.configure(config, comparator, listener, processorProvider); if (!config.validateAndRecord(ALL_FIELDS, LOGGER::error)) { throw new DebeziumException( "Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details");