From 84d8b170beacf7fa7a1f8d96ad2ed19e86408bd6 Mon Sep 17 00:00:00 2001 From: Mikhail Kamornikau Date: Thu, 14 Dec 2023 14:48:38 +0100 Subject: [PATCH] CXP-2779: implemented MemorySchemaHistorySnapshot --- .../connector/mysql/MySqlConnectorConfig.java | 3 +- .../AbstractSchemaHistorySnapshotTest.java | 116 +++++++++++++++ .../MemorySchemaHistorySnapshotTest.java | 7 + .../oracle/OracleConnectorConfig.java | 3 +- .../sqlserver/SqlServerConnectorConfig.java | 7 +- ...izedRelationalDatabaseConnectorConfig.java | 31 ++-- .../HistorizedRelationalDatabaseSchema.java | 2 +- .../io/debezium/relational/ddl/DdlParser.java | 27 ++++ .../history/SnapshotAwareSchemaHistory.java | 133 +++++++----------- .../AbstractSchemaHistorySnapshot.java | 79 +++++++++++ .../snapshot/MemorySchemaHistorySnapshot.java | 39 +++++ .../snapshot/SchemaHistorySnapshot.java | 21 ++- .../history/snapshot/SchemaPartitioner.java | 16 +++ .../SnapshotAwareSchemaHistoryTest.java | 89 ++++++++++++ 14 files changed, 470 insertions(+), 103 deletions(-) create mode 100644 debezium-connector-mysql/src/test/java/io/debezium/relational/history/snapshot/AbstractSchemaHistorySnapshotTest.java create mode 100644 debezium-connector-mysql/src/test/java/io/debezium/relational/history/snapshot/MemorySchemaHistorySnapshotTest.java create mode 100644 debezium-core/src/main/java/io/debezium/relational/history/snapshot/AbstractSchemaHistorySnapshot.java create mode 100644 debezium-core/src/main/java/io/debezium/relational/history/snapshot/MemorySchemaHistorySnapshot.java create mode 100644 debezium-core/src/main/java/io/debezium/relational/history/snapshot/SchemaPartitioner.java create mode 100644 debezium-core/src/test/java/io/debezium/relational/history/SnapshotAwareSchemaHistoryTest.java diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java index f331404471a..22cb2b4e97a 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java @@ -34,6 +34,7 @@ import io.debezium.relational.TableId; import io.debezium.relational.Tables.TableFilter; import io.debezium.relational.history.HistoryRecordComparator; +import io.debezium.relational.history.snapshot.SchemaPartitioner; import io.debezium.schema.DefaultTopicNamingStrategy; import io.debezium.util.Collect; @@ -1166,7 +1167,7 @@ protected HistoryRecordComparator getHistoryRecordComparator() { @Override protected SchemaPartitioner getSchemaPartitioner() { - return SchemaPartitioner.NOOP; + return SchemaPartitioner.SINGLE_PARTITION; } public static boolean isBuiltInDatabase(String databaseName) { diff --git a/debezium-connector-mysql/src/test/java/io/debezium/relational/history/snapshot/AbstractSchemaHistorySnapshotTest.java b/debezium-connector-mysql/src/test/java/io/debezium/relational/history/snapshot/AbstractSchemaHistorySnapshotTest.java new file mode 100644 index 00000000000..d81dda5dd13 --- /dev/null +++ b/debezium-connector-mysql/src/test/java/io/debezium/relational/history/snapshot/AbstractSchemaHistorySnapshotTest.java @@ -0,0 +1,116 @@ +package io.debezium.relational.history.snapshot; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; + +import java.util.Map; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser; +import io.debezium.relational.Tables; +import io.debezium.relational.ddl.DdlParser; +import io.debezium.relational.history.MemorySchemaHistory; +import io.debezium.relational.history.SchemaHistory; + +public abstract class AbstractSchemaHistorySnapshotTest { + + protected SchemaHistorySnapshot snapshot; + protected SchemaHistory history; + protected Map source1; + protected Map source2; + protected Tables s1_t0, s1_t1, s1_t2, s1_t3; + protected Tables s2_t0, s2_t1, s2_t2, s2_t3; + protected DdlParser parser; + + @Before + public void beforeEach() { + history = new MemorySchemaHistory(); + snapshot = createHistorySnapshot(); + + source1 = server("abc"); + source2 = server("xyz"); + + parser = new MySqlAntlrDdlParser(); + s1_t0 = new Tables(); + s1_t1 = new Tables(); + s1_t2 = new Tables(); + s1_t3 = new Tables(); + + record(1, "CREATE TABLE foo ( first VARCHAR(22) NOT NULL );", s1_t3, s1_t2, s1_t1, s1_t0); + record(23, "CREATE TABLE\nperson ( name VARCHAR(22) NOT NULL );", s1_t3, s1_t2, s1_t1); + record(30, "CREATE TABLE address\n( street VARCHAR(22) NOT NULL );", s1_t3, s1_t2); + record(32, "ALTER TABLE address ADD city VARCHAR(22) NOT NULL;", s1_t3); + + snapshot.save(source1, position(32), s1_t3); + snapshot.save(source1, position(30), s1_t2); + snapshot.save(source1, position(23), s1_t1); + snapshot.save(source1, position(1), s1_t0); + + s2_t0 = new Tables(); + s2_t1 = new Tables(); + s2_t2 = new Tables(); + s2_t3 = new Tables(); + + record(2, "CREATE TABLE foo1 ( first VARCHAR(22) NOT NULL );", s2_t3, s2_t2, s2_t1, s2_t0); + record(12, "CREATE TABLE\nperson1 ( name VARCHAR(22) NOT NULL );", s2_t3, s2_t2, s2_t1); + record(16, "CREATE TABLE address1\n( street VARCHAR(22) NOT NULL );", s2_t3, s2_t2); + record(99, "ALTER TABLE address1 ADD city VARCHAR(22) NOT NULL;", s2_t3); + + snapshot.save(source2, position(55), s2_t3); + snapshot.save(source2, position(16), s2_t2); + snapshot.save(source2, position(12), s2_t1); + snapshot.save(source2, position(2), s2_t0); + } + + @After + public void afterEach() { + } + + protected abstract SchemaHistorySnapshot createHistorySnapshot(); + + protected Map server(String serverName) { + return Map.of("server", serverName); + } + + protected Map position(long pos) { + return Map.of("file", "mysql-bin-changelog.000011", "pos", pos); + } + + protected void record(long pos, String ddl, Tables... update) { + try { + history.record(source1, position(pos), "db", ddl); + } + catch (Throwable t) { + fail(t.getMessage()); + } + for (Tables tables : update) { + if (tables != null) { + parser.setCurrentSchema("db"); + parser.parse(ddl, tables); + } + } + } + + @Test + public void shouldRecordSnapshotsAndRecoverToClosest() { + assertNull(snapshot.read(source1, null)); + assertNull(snapshot.read(null, position(31))); + assertNull(snapshot.read(source1, position(31))); + assertEquals(s1_t2, snapshot.read(source1, position(30))); + + assertNull(snapshot.read(source2, position(30))); + assertEquals(s2_t1, snapshot.read(source2, position(12))); + + assertNull(snapshot.findClosest(null, position(31))); + assertNull(snapshot.findClosest(source1, null)); + assertEquals(position(30), snapshot.findClosest(source1, position(31))); + assertEquals(position(32), snapshot.findClosest(source1, position(99))); + + assertNull(snapshot.findClosest(source2, position(1))); + assertEquals(position(55), snapshot.findClosest(source2, position(99))); + } +} diff --git a/debezium-connector-mysql/src/test/java/io/debezium/relational/history/snapshot/MemorySchemaHistorySnapshotTest.java b/debezium-connector-mysql/src/test/java/io/debezium/relational/history/snapshot/MemorySchemaHistorySnapshotTest.java new file mode 100644 index 00000000000..7336cf0c961 --- /dev/null +++ b/debezium-connector-mysql/src/test/java/io/debezium/relational/history/snapshot/MemorySchemaHistorySnapshotTest.java @@ -0,0 +1,7 @@ +package io.debezium.relational.history.snapshot; + +public class MemorySchemaHistorySnapshotTest extends AbstractSchemaHistorySnapshotTest { + protected SchemaHistorySnapshot createHistorySnapshot() { + return new MemorySchemaHistorySnapshot(); + } +} diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java index 7e8d48060e4..a757735bc83 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java @@ -47,6 +47,7 @@ import io.debezium.relational.TableId; import io.debezium.relational.Tables.TableFilter; import io.debezium.relational.history.HistoryRecordComparator; +import io.debezium.relational.history.snapshot.SchemaPartitioner; import io.debezium.util.Strings; /** @@ -831,7 +832,7 @@ protected HistoryRecordComparator getHistoryRecordComparator() { @Override protected SchemaPartitioner getSchemaPartitioner() { - return SchemaPartitioner.NOOP; + return SchemaPartitioner.SINGLE_PARTITION; } /** diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java index b7b7dbd63bc..a1553b2ea5a 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java @@ -33,6 +33,7 @@ import io.debezium.relational.TableId; import io.debezium.relational.Tables.TableFilter; import io.debezium.relational.history.HistoryRecordComparator; +import io.debezium.relational.history.snapshot.SchemaPartitioner; import io.debezium.spi.schema.DataCollectionId; import io.debezium.util.Strings; @@ -495,7 +496,11 @@ protected boolean isPositionAtOrBefore(Document recorded, Document desired) { @Override protected SchemaPartitioner getSchemaPartitioner() { - return (s) -> TableFilter.fromPredicate((t) -> t.catalog().equals(s.get("database"))); + return (source) -> { + return TableFilter.fromPredicate((tableId) -> { + return tableId.catalog().equals(((SqlServerPartition)source).getDatabaseName()); + }); + }; } @Override diff --git a/debezium-core/src/main/java/io/debezium/relational/HistorizedRelationalDatabaseConnectorConfig.java b/debezium-core/src/main/java/io/debezium/relational/HistorizedRelationalDatabaseConnectorConfig.java index 1740a155d0a..b5cbc513c3c 100644 --- a/debezium-core/src/main/java/io/debezium/relational/HistorizedRelationalDatabaseConnectorConfig.java +++ b/debezium-core/src/main/java/io/debezium/relational/HistorizedRelationalDatabaseConnectorConfig.java @@ -5,7 +5,6 @@ */ package io.debezium.relational; -import java.util.Map; import java.util.function.Predicate; import java.util.regex.Pattern; @@ -28,6 +27,8 @@ import io.debezium.relational.history.SchemaHistoryListener; import io.debezium.relational.history.SchemaHistoryMetrics; import io.debezium.relational.history.SnapshotAwareSchemaHistory; +import io.debezium.relational.history.snapshot.SchemaHistorySnapshot; +import io.debezium.relational.history.snapshot.SchemaPartitioner; /** * Configuration options shared across the relational CDC connectors which use a persistent database schema history. @@ -39,6 +40,7 @@ public abstract class HistorizedRelationalDatabaseConnectorConfig extends Relati protected static final int DEFAULT_SNAPSHOT_FETCH_SIZE = 2_000; private static final String DEFAULT_SCHEMA_HISTORY = "io.debezium.storage.kafka.history.KafkaSchemaHistory"; + private static final String DEFAULT_SCHEMA_HISTORY_SNAPSHOT = "io.debezium.relational.history.snapshot.SchemaHistorySnapshot.NOOP"; private final boolean useCatalogBeforeSchema; private final Class connectorClass; @@ -63,6 +65,17 @@ public abstract class HistorizedRelationalDatabaseConnectorConfig extends Relati + SchemaHistory.CONFIGURATION_FIELD_PREFIX_STRING + "' string.") .withDefault(DEFAULT_SCHEMA_HISTORY); + public static final Field SCHEMA_HISTORY_SNAPSHOT = Field.create("schema.history.internal.snapshot") + .withDisplayName("Database schema history snapshot class") + .withType(Type.CLASS) + .withWidth(Width.LONG) + .withImportance(Importance.LOW) + .withInvisibleRecommender() + .withDescription("The name of the SchemaHistorySnapshot class that should be used to store and recover database schema. " + + "The configuration properties for the history snapshot are prefixed with the '" + + SchemaHistorySnapshot.CONFIGURATION_FIELD_PREFIX_STRING + "' string.") + .withDefault(DEFAULT_SCHEMA_HISTORY_SNAPSHOT); + public static final Field SKIP_UNPARSEABLE_DDL_STATEMENTS = SchemaHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS; public static final Field STORE_ONLY_CAPTURED_TABLES_DDL = SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL; @@ -120,7 +133,7 @@ protected HistorizedRelationalDatabaseConnectorConfig(Class source); - - public static SchemaPartitioner NOOP = (s) -> Tables.TableFilter.includeAll(); - } } diff --git a/debezium-core/src/main/java/io/debezium/relational/HistorizedRelationalDatabaseSchema.java b/debezium-core/src/main/java/io/debezium/relational/HistorizedRelationalDatabaseSchema.java index 312f70ee185..14eaf6e80fd 100644 --- a/debezium-core/src/main/java/io/debezium/relational/HistorizedRelationalDatabaseSchema.java +++ b/debezium-core/src/main/java/io/debezium/relational/HistorizedRelationalDatabaseSchema.java @@ -46,7 +46,7 @@ protected HistorizedRelationalDatabaseSchema(HistorizedRelationalDatabaseConnect boolean tableIdCaseInsensitive, KeyMapper customKeysMapper) { super(config, topicNamingStrategy, tableFilter, columnFilter, schemaBuilder, tableIdCaseInsensitive, customKeysMapper); - this.schemaHistory = config.getSchemaHistory(tables()); + this.schemaHistory = config.getSchemaHistory(); this.schemaHistory.start(); this.historizedConnectorConfig = config; } diff --git a/debezium-core/src/main/java/io/debezium/relational/ddl/DdlParser.java b/debezium-core/src/main/java/io/debezium/relational/ddl/DdlParser.java index 1e87dd412d0..d528250fcd8 100644 --- a/debezium-core/src/main/java/io/debezium/relational/ddl/DdlParser.java +++ b/debezium-core/src/main/java/io/debezium/relational/ddl/DdlParser.java @@ -40,4 +40,31 @@ public interface DdlParser { DdlChanges getDdlChanges(); SystemVariables systemVariables(); + + DdlParser NOOP = new DdlParser() { + @Override + public void parse(String ddlContent, Tables databaseTables) { + + } + + @Override + public void setCurrentDatabase(String databaseName) { + + } + + @Override + public void setCurrentSchema(String schemaName) { + + } + + @Override + public DdlChanges getDdlChanges() { + return null; + } + + @Override + public SystemVariables systemVariables() { + return null; + } + }; } diff --git a/debezium-core/src/main/java/io/debezium/relational/history/SnapshotAwareSchemaHistory.java b/debezium-core/src/main/java/io/debezium/relational/history/SnapshotAwareSchemaHistory.java index 27df8f4ca57..6067c8fbfee 100644 --- a/debezium-core/src/main/java/io/debezium/relational/history/SnapshotAwareSchemaHistory.java +++ b/debezium-core/src/main/java/io/debezium/relational/history/SnapshotAwareSchemaHistory.java @@ -12,33 +12,30 @@ import io.debezium.config.Configuration; import io.debezium.document.Document; -import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig.SchemaPartitioner; import io.debezium.relational.Tables; import io.debezium.relational.ddl.DdlParser; import io.debezium.relational.history.snapshot.SchemaHistorySnapshot; +import io.debezium.relational.history.snapshot.SchemaPartitioner; +import io.debezium.util.Clock; public class SnapshotAwareSchemaHistory implements SchemaHistory { private final SchemaHistory delegate; - private final Tables schema; private final SchemaPartitioner schemaPartitioner; - private SnapshotProcessor snapshotProcessor = new SnapshotProcessor(SchemaHistorySnapshot.NOOP, HistoryRecordComparator.INSTANCE, SchemaPartitioner.NOOP); + private SchemaHistorySnapshot snapshot = SchemaHistorySnapshot.NOOP; + private Tables schemaToSave = null; - public SnapshotAwareSchemaHistory(SchemaHistory delegate, Tables initialSchema, SchemaPartitioner schemaPartitioner) { + public SnapshotAwareSchemaHistory(SchemaHistory delegate, SchemaHistorySnapshot snapshot, SchemaPartitioner schemaPartitioner) { this.delegate = delegate; - this.schema = initialSchema; + this.snapshot = snapshot; this.schemaPartitioner = schemaPartitioner; } @Override public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, HistoryRecordProcessorProvider processorProvider) { - this.snapshotProcessor = new SnapshotProcessor(getSchemaHistorySnapshot(config), comparator, schemaPartitioner); - delegate.configure(config, comparator, listener, (o, s, p) -> new HistoryRecordProcessor(o, s, p, comparator, listener, snapshotProcessor, processorProvider)); - } - - private SchemaHistorySnapshot getSchemaHistorySnapshot(Configuration config) { - // TODO: use config to init SchemaHistorySnapshot instance - return SchemaHistorySnapshot.NOOP; + Configuration schemaHistoryConfig = config.subset(SchemaHistorySnapshot.CONFIGURATION_FIELD_PREFIX_STRING, false); + snapshot.configure(schemaHistoryConfig, comparator); + delegate.configure(config, comparator, listener, (o, s, p) -> new HistoryRecordProcessor(o, s, p, comparator, listener, snapshot, processorProvider)); } @Override @@ -48,20 +45,32 @@ public void start() { @Override public void record(Map source, Map position, String databaseName, String ddl) throws SchemaHistoryException { - delegate.record(source, position, databaseName, ddl); - snapshotProcessor.save(source, position, schema); + record(source, position, databaseName, null, ddl, null, Clock.SYSTEM.currentTimeAsInstant()); } @Override public void record(Map source, Map position, String databaseName, String schemaName, String ddl, TableChanges changes, Instant timestamp) throws SchemaHistoryException { delegate.record(source, position, databaseName, schemaName, ddl, changes, timestamp); - snapshotProcessor.save(source, position, schema); + + if (schemaToSave != null) { + snapshot.save(source, position, getSchemaPartition(source, schemaToSave)); + } + } + + private Tables getSchemaPartition(Map source, Tables schema) { + if (schemaPartitioner == null) { + return schema; + } + return schema.subset(schemaPartitioner.fromSource(source)); } @Override public void recover(Map, Map> offsets, Tables schema, DdlParser ddlParser) { delegate.recover(offsets, schema, ddlParser); + + // once recovered the schema can be used to create snapshots + schemaToSave = schema; } @Override @@ -84,86 +93,30 @@ public void initializeStorage() { delegate.initializeStorage(); } - private static class SnapshotProcessor { - private final SchemaHistorySnapshot adapter; - private final HistoryRecordComparator comparator; - private final SchemaPartitioner schemaPartitioner; - - public SnapshotProcessor(SchemaHistorySnapshot adapter, HistoryRecordComparator comparator, SchemaPartitioner schemaPartitioner) { - this.adapter = adapter; - this.comparator = comparator; - this.schemaPartitioner = schemaPartitioner; - } - - public void save(Map source, Map position, Tables schema) { - adapter.save(source, position, getPartialSchema(source, schema)); - } - - public Map, Map> read(Map, Map> offsets, Tables schema) { - Map, Map> snapshotOffsets = getSnapshotOffsets(offsets); - snapshotOffsets.forEach((Map source, Map position) -> { - if (position == null) { - return; - } - - Tables partialSchema = adapter.read(source, position); - // that is what could be schema.merge(partialSchema) - partialSchema.tableIds().forEach((tableId) -> { - schema.overwriteTable(partialSchema.forTable(tableId)); - }); - snapshotOffsets.put(source, position); - }); - return snapshotOffsets; - } - - private Map, Map> getSnapshotOffsets(Map, Map> offsets) { - Map, Map> snapshotOffsets = new HashMap<>(); - offsets.forEach((Map source, Map position) -> { - HistoryRecord historyRecord = new HistoryRecord(source, position); - Map snapshotPosition = adapter.find(source).stream() - // snapshot position keys match connector position keys - .filter((p) -> p.keySet().equals(position.keySet())) - // snapshot position is smaller than connector position - .filter((p) -> comparator.isAtOrBefore(new HistoryRecord(source, p), historyRecord)) - // sort positions ASC and get last - .max((p1, p2) -> { - HistoryRecord h1 = new HistoryRecord(source, p1); - HistoryRecord h2 = new HistoryRecord(source, p2); - return comparator.isAtOrBefore(h1, h2) ? 1 : comparator.isAtOrBefore(h2, h1) ? 0 : -1; - }) - // null means there is no valid snapshot for source - .orElse(null); - - snapshotOffsets.put(source, snapshotPosition); - }); - return snapshotOffsets; - } - - private Tables getPartialSchema(Map source, Tables schema) { - if (schemaPartitioner == null) { - return schema; - } - return schema.subset(schemaPartitioner.fromPartition(source)); - } - } - private static class HistoryRecordProcessor implements Consumer { - private final Map stopPoints; private final HistoryRecordComparator comparator; private final SchemaHistoryListener listener; + private final SchemaHistorySnapshot snapshot; private final Consumer originalProcessor; + private final Map stopPoints; - public HistoryRecordProcessor(Map, Map> offsets, Tables schema, DdlParser ddlParser, HistoryRecordComparator comparator, SchemaHistoryListener listener, SnapshotProcessor snapshotProcessor, HistoryRecordProcessorProvider processorProvider) { - this.stopPoints = getStopPoints(snapshotProcessor.read(offsets, schema)); + public HistoryRecordProcessor(Map, Map> offsets, Tables schema, DdlParser ddlParser, HistoryRecordComparator comparator, SchemaHistoryListener listener, SchemaHistorySnapshot snapshot, HistoryRecordProcessorProvider processorProvider) { this.comparator = comparator; this.listener = listener; + this.snapshot = snapshot; + this.stopPoints = getStopPoints(offsets, schema); this.originalProcessor = processorProvider.get(offsets, schema, ddlParser); } - private Map getStopPoints(Map, Map> offsets) { + private Map getStopPoints(Map, Map> offsets, Tables schema) { Map stopPoints = new HashMap<>(); - offsets.forEach((Map source, Map position) -> { + getSnapshotOffsets(offsets).forEach((Map source, Map position) -> { + Tables schemaPartition = snapshot.read(source, position); + if (schemaPartition != null) { + merge(schema, schemaPartition); + } + Document srcDocument = Document.create(); if (source != null) { source.forEach(srcDocument::set); @@ -173,6 +126,18 @@ private Map getStopPoints(Map, Map schema.overwriteTable(partialSchema.forTable(tableId))); + } + + private Map, Map> getSnapshotOffsets(Map, Map> offsets) { + Map, Map> snapshotOffsets = new HashMap<>(); + offsets.forEach((Map source, Map position) -> { + snapshotOffsets.put(source, snapshot.findClosest(source, position)); + }); + return snapshotOffsets; + } + /** * The code ignores records covered by schema snapshot - from the beginning to snapshot * offsets (SO). Then it switched to original processor which handles records from SO to @@ -187,7 +152,7 @@ private Map getStopPoints(Map, Map source, Map position, Tables schema) { + if (source == null || position == null || schema == null) { + return; + } + lock.write(() -> doSave(source, position, schema)); + } + + @Override + public Tables read(Map source, Map position) { + if (source == null || position == null) { + return null; + } + return lock.read(() -> doRead(source, position)); + } + + @Override + public Map findClosest(Map source, Map position) { + if (source == null || position == null) { + return null; + } + HistoryRecord historyRecord = new HistoryRecord(source, position); + return lock.read(() -> findAll(source)) + .stream() + // snapshot position keys match given position keys + .filter((p) -> p.keySet().equals(position.keySet())) + // snapshot position is smaller than given position + .filter((p) -> comparator.isAtOrBefore(new HistoryRecord(source, p), historyRecord)) + // sort positions ASC and get last + .max((p1, p2) -> { + HistoryRecord h1 = new HistoryRecord(source, p1); + HistoryRecord h2 = new HistoryRecord(source, p2); + return comparator.isAtOrBefore(h1, h2) ? -1 : comparator.isAtOrBefore(h2, h1) ? 1 : 0; + }) + .orElse(null); + } + + protected abstract void doSave(Map source, Map position, Tables schema); + + protected abstract Tables doRead(Map source, Map position); + + protected abstract List> findAll(Map source); +} diff --git a/debezium-core/src/main/java/io/debezium/relational/history/snapshot/MemorySchemaHistorySnapshot.java b/debezium-core/src/main/java/io/debezium/relational/history/snapshot/MemorySchemaHistorySnapshot.java new file mode 100644 index 00000000000..8b32221ddbb --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/relational/history/snapshot/MemorySchemaHistorySnapshot.java @@ -0,0 +1,39 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.relational.history.snapshot; + +import io.debezium.relational.Tables; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +public class MemorySchemaHistorySnapshot extends AbstractSchemaHistorySnapshot { + + private final Map, Map>, Tables> snapshots = new HashMap<>(); + + public MemorySchemaHistorySnapshot() { + } + + @Override + public void doSave(Map source, Map position, Tables schema) { + snapshots.put(Map.of(source, position), schema); + } + + @Override + public Tables doRead(Map source, Map position) { + return snapshots.get(Map.of(source, position)); + } + + protected List> findAll(Map source) { + return snapshots.keySet().stream() + .map((e) -> e.get(source)) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + } +} diff --git a/debezium-core/src/main/java/io/debezium/relational/history/snapshot/SchemaHistorySnapshot.java b/debezium-core/src/main/java/io/debezium/relational/history/snapshot/SchemaHistorySnapshot.java index ebe2831a632..3bf6067b18b 100644 --- a/debezium-core/src/main/java/io/debezium/relational/history/snapshot/SchemaHistorySnapshot.java +++ b/debezium-core/src/main/java/io/debezium/relational/history/snapshot/SchemaHistorySnapshot.java @@ -5,23 +5,32 @@ */ package io.debezium.relational.history.snapshot; +import io.debezium.config.Configuration; import io.debezium.relational.Tables; +import io.debezium.relational.history.HistoryRecordComparator; +import io.debezium.relational.history.SchemaHistory; import java.util.List; import java.util.Map; public interface SchemaHistorySnapshot { - String CONFIGURATION_FIELD_PREFIX_STRING = "schema.history.internal.snapshot."; + String CONFIGURATION_FIELD_PREFIX_STRING = SchemaHistory.CONFIGURATION_FIELD_PREFIX_STRING + "snapshot."; + + void configure(Configuration config, HistoryRecordComparator comparator); void save(Map source, Map position, Tables schema); + Tables read(Map source, Map position); - List> find(Map source); - // TODO: implement offset -> snapshot name - // TODO: implement snapshot name -> offset + Map findClosest(Map source, Map position); SchemaHistorySnapshot NOOP = new SchemaHistorySnapshot() { + @Override + public void configure(Configuration config, HistoryRecordComparator comparator) { + + } + @Override public void save(Map source, Map position, Tables schema) { @@ -33,8 +42,8 @@ public Tables read(Map source, Map position) { } @Override - public List> find(Map source) { - return List.of(); + public Map findClosest(Map source, Map position) { + return null; } }; } diff --git a/debezium-core/src/main/java/io/debezium/relational/history/snapshot/SchemaPartitioner.java b/debezium-core/src/main/java/io/debezium/relational/history/snapshot/SchemaPartitioner.java new file mode 100644 index 00000000000..04cfa6a93da --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/relational/history/snapshot/SchemaPartitioner.java @@ -0,0 +1,16 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.relational.history.snapshot; + +import java.util.Map; + +import io.debezium.relational.Tables.TableFilter; + +public interface SchemaPartitioner { + TableFilter fromSource(Map source); + + SchemaPartitioner SINGLE_PARTITION = source -> TableFilter.includeAll(); +} diff --git a/debezium-core/src/test/java/io/debezium/relational/history/SnapshotAwareSchemaHistoryTest.java b/debezium-core/src/test/java/io/debezium/relational/history/SnapshotAwareSchemaHistoryTest.java new file mode 100644 index 00000000000..7c0b6b37230 --- /dev/null +++ b/debezium-core/src/test/java/io/debezium/relational/history/SnapshotAwareSchemaHistoryTest.java @@ -0,0 +1,89 @@ +package io.debezium.relational.history; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import io.debezium.config.Configuration; +import io.debezium.relational.SystemVariables; +import io.debezium.relational.Tables; +import io.debezium.relational.ddl.DdlChanges; +import io.debezium.relational.ddl.DdlParser; +import io.debezium.relational.history.snapshot.MemorySchemaHistorySnapshot; +import io.debezium.relational.history.snapshot.SchemaHistorySnapshot; +import io.debezium.relational.history.snapshot.SchemaPartitioner; + +public class SnapshotAwareSchemaHistoryTest { + + private SchemaHistorySnapshot historySnapshot; + private SchemaHistory delegate; + private List recovered = new ArrayList<>(); + private Map source = Map.of("server", "xyz"); + + @Before + public void beforeEach() { + delegate = new MemorySchemaHistory(); + historySnapshot = new MemorySchemaHistorySnapshot(); + recovered.clear(); + } + + @After + public void afterEach() { + } + + protected Map position(long pos) { + return Map.of("pos", pos); + } + + protected void record(long pos, String ddl) { + try { + delegate.record(source, position(pos), "db", ddl); + } + catch (Throwable t) { + fail(t.getMessage()); + } + } + + @Test + public void shouldSkipRecordsRecoveredFromSnapshot() { + SchemaHistory schemaHistory = new SnapshotAwareSchemaHistory(delegate, historySnapshot, SchemaPartitioner.SINGLE_PARTITION); + schemaHistory.configure(Configuration.empty(), HistoryRecordComparator.INSTANCE, SchemaHistoryListener.NOOP, (o, s, p) -> (r) -> recovered.add(r)); + + delegate.record(source, position(1), "db", "CREATE TABLE foo ( first VARCHAR(22) NOT NULL );"); + delegate.record(source, position(3), "db", "CREATE TABLE\nperson ( name VARCHAR(22) NOT NULL );"); + delegate.record(source, position(5), "db", "CREATE TABLE address\n( street VARCHAR(22) NOT NULL );"); + delegate.record(source, position(10), "db", "ALTER TABLE address ADD city VARCHAR(22) NOT NULL;"); + delegate.record(source, position(15), "db", "ALTER TABLE address ADD zip VARCHAR(22) NOT NULL;"); + + historySnapshot.save(source, position(4), new Tables()); + historySnapshot.save(source, position(8), new Tables()); + + DdlParser noopParser = DdlParser.NOOP; + schemaHistory.recover(source, position(1), new Tables(), noopParser); + assertEquals(5, recovered.stream().count()); + recovered.clear(); + + schemaHistory.recover(source, position(4), new Tables(), noopParser); + assertEquals(3, recovered.stream().count()); + recovered.clear(); + + schemaHistory.recover(source, position(6), new Tables(), noopParser); + assertEquals(3, recovered.stream().count()); + recovered.clear(); + + schemaHistory.recover(source, position(9), new Tables(), noopParser); + assertEquals(2, recovered.stream().count()); + recovered.clear(); + + schemaHistory.recover(source, position(20), new Tables(), noopParser); + assertEquals(2, recovered.stream().count()); + recovered.clear(); + } +} \ No newline at end of file