Skip to content

Commit

Permalink
CXP-2779: implemented MemorySchemaHistorySnapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
mikekamornikov committed Dec 14, 2023
1 parent e7e4897 commit 6dfd01f
Show file tree
Hide file tree
Showing 14 changed files with 485 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.debezium.relational.TableId;
import io.debezium.relational.Tables.TableFilter;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.relational.history.snapshot.SchemaPartitioner;
import io.debezium.schema.DefaultTopicNamingStrategy;
import io.debezium.util.Collect;

Expand Down Expand Up @@ -1166,7 +1167,7 @@ protected HistoryRecordComparator getHistoryRecordComparator() {

@Override
protected SchemaPartitioner getSchemaPartitioner() {
    // MySQL keeps the whole schema history in a single partition.
    // (Removed stale duplicate `return SchemaPartitioner.NOOP;` left over from the diff.)
    return SchemaPartitioner.SINGLE_PARTITION;
}

public static boolean isBuiltInDatabase(String databaseName) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package io.debezium.relational.history.snapshot;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;

import java.util.Map;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
import io.debezium.relational.Tables;
import io.debezium.relational.ddl.DdlParser;
import io.debezium.relational.history.MemorySchemaHistory;
import io.debezium.relational.history.SchemaHistory;

/**
 * Base test for {@code SchemaHistorySnapshot} implementations.
 *
 * <p>It records a small DDL history for two logical servers ("abc" and "xyz"),
 * saves schema snapshots at several binlog positions, and verifies that
 * {@code read} and {@code findClosest} resolve to the nearest snapshot saved at
 * or before the requested position.
 */
public abstract class AbstractSchemaHistorySnapshotTest {

    protected SchemaHistorySnapshot snapshot;
    protected SchemaHistory history;
    protected Map<String, Object> source1;
    protected Map<String, Object> source2;
    // Incremental schema states for server 1: t0 is the oldest, t3 the newest.
    protected Tables s1_t0, s1_t1, s1_t2, s1_t3;
    // Incremental schema states for server 2.
    protected Tables s2_t0, s2_t1, s2_t2, s2_t3;
    protected DdlParser parser;

    @Before
    public void beforeEach() {
        history = new MemorySchemaHistory();
        snapshot = createHistorySnapshot();

        source1 = server("abc");
        source2 = server("xyz");

        parser = new MySqlAntlrDdlParser();
        s1_t0 = new Tables();
        s1_t1 = new Tables();
        s1_t2 = new Tables();
        s1_t3 = new Tables();

        // Each DDL statement is applied to every Tables state that should include it,
        // so later states are strict supersets of earlier ones.
        record(1, "CREATE TABLE foo ( first VARCHAR(22) NOT NULL );", s1_t3, s1_t2, s1_t1, s1_t0);
        record(23, "CREATE TABLE\nperson ( name VARCHAR(22) NOT NULL );", s1_t3, s1_t2, s1_t1);
        record(30, "CREATE TABLE address\n( street VARCHAR(22) NOT NULL );", s1_t3, s1_t2);
        record(32, "ALTER TABLE address ADD city VARCHAR(22) NOT NULL;", s1_t3);

        snapshot.save(source1, position(32), s1_t3);
        snapshot.save(source1, position(30), s1_t2);
        snapshot.save(source1, position(23), s1_t1);
        snapshot.save(source1, position(1), s1_t0);

        s2_t0 = new Tables();
        s2_t1 = new Tables();
        s2_t2 = new Tables();
        s2_t3 = new Tables();

        record(2, "CREATE TABLE foo1 ( first VARCHAR(22) NOT NULL );", s2_t3, s2_t2, s2_t1, s2_t0);
        record(12, "CREATE TABLE\nperson1 ( name VARCHAR(22) NOT NULL );", s2_t3, s2_t2, s2_t1);
        record(16, "CREATE TABLE address1\n( street VARCHAR(22) NOT NULL );", s2_t3, s2_t2);
        record(99, "ALTER TABLE address1 ADD city VARCHAR(22) NOT NULL;", s2_t3);

        // Note: the newest source2 snapshot is deliberately saved at position 55,
        // not at the last recorded DDL position (99), to exercise findClosest().
        snapshot.save(source2, position(55), s2_t3);
        snapshot.save(source2, position(16), s2_t2);
        snapshot.save(source2, position(12), s2_t1);
        snapshot.save(source2, position(2), s2_t0);
    }

    @After
    public void afterEach() {
        // Nothing to release: both history and snapshot are in-memory.
    }

    /** Creates the {@code SchemaHistorySnapshot} implementation under test. */
    protected abstract SchemaHistorySnapshot createHistorySnapshot();

    /** Builds a source-partition map identifying a logical server. */
    protected Map<String, Object> server(String serverName) {
        return Map.of("server", serverName);
    }

    /** Builds a source-offset map for the given binlog position. */
    protected Map<String, Object> position(long pos) {
        return Map.of("file", "mysql-bin-changelog.000011", "pos", pos);
    }

    /**
     * Records {@code ddl} in the schema history at the given position and applies it
     * to each non-null {@link Tables} state in {@code update}.
     *
     * <p>NOTE(review): the history record is always attributed to {@code source1},
     * even while building {@code source2} state — confirm this is intentional.
     */
    protected void record(long pos, String ddl, Tables... update) {
        try {
            history.record(source1, position(pos), "db", ddl);
        }
        catch (Throwable t) {
            // Fail with the original exception as cause so the stack trace is preserved,
            // instead of fail(t.getMessage()) which discards it.
            throw new AssertionError("Failed to record DDL at position " + pos, t);
        }
        for (Tables tables : update) {
            if (tables != null) {
                parser.setCurrentSchema("db");
                parser.parse(ddl, tables);
            }
        }
    }

    @Test
    public void shouldRecordSnapshotsAndRecoverToClosest() {
        // Null source or null position yields no snapshot.
        assertNull(snapshot.read(source1, null));
        assertNull(snapshot.read(null, position(31)));
        // read() requires an exact saved position; 31 was never saved.
        assertNull(snapshot.read(source1, position(31)));
        assertEquals(s1_t2, snapshot.read(source1, position(30)));

        assertNull(snapshot.read(source2, position(30)));
        assertEquals(s2_t1, snapshot.read(source2, position(12)));

        // findClosest() returns the nearest saved position at or before the requested one.
        assertNull(snapshot.findClosest(null, position(31)));
        assertNull(snapshot.findClosest(source1, null));
        assertEquals(position(30), snapshot.findClosest(source1, position(31)));
        assertEquals(position(32), snapshot.findClosest(source1, position(99)));

        // No source2 snapshot exists at or before position 1 (earliest is 2).
        assertNull(snapshot.findClosest(source2, position(1)));
        assertEquals(position(55), snapshot.findClosest(source2, position(99)));
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.debezium.relational.history.snapshot;

public class MemorySchemaHistorySnapshotTest extends AbstractSchemaHistorySnapshotTest {
protected SchemaHistorySnapshot createHistorySnapshot() {
return new MemorySchemaHistorySnapshot();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import io.debezium.relational.TableId;
import io.debezium.relational.Tables.TableFilter;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.relational.history.snapshot.SchemaPartitioner;
import io.debezium.util.Strings;

/**
Expand Down Expand Up @@ -831,7 +832,7 @@ protected HistoryRecordComparator getHistoryRecordComparator() {

@Override
protected SchemaPartitioner getSchemaPartitioner() {
    // This connector keeps the whole schema history in a single partition.
    // (Removed stale duplicate `return SchemaPartitioner.NOOP;` left over from the diff.)
    return SchemaPartitioner.SINGLE_PARTITION;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.debezium.relational.TableId;
import io.debezium.relational.Tables.TableFilter;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.relational.history.snapshot.SchemaPartitioner;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Strings;

Expand Down Expand Up @@ -495,7 +496,11 @@ protected boolean isPositionAtOrBefore(Document recorded, Document desired) {

@Override
protected SchemaPartitioner getSchemaPartitioner() {
    // Each SQL Server task partition captures exactly one database, so the schema
    // snapshot is restricted to tables whose catalog matches that database.
    // NOTE(review): this casts the partition map to SqlServerPartition — confirm the
    // SchemaPartitioner contract guarantees that type here, otherwise the cast fails.
    return source -> TableFilter.fromPredicate(
            tableId -> tableId.catalog().equals(((SqlServerPartition) source).getDatabaseName()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
*/
package io.debezium.relational;

import java.util.Map;
import java.util.function.Predicate;
import java.util.regex.Pattern;

Expand All @@ -28,6 +27,8 @@
import io.debezium.relational.history.SchemaHistoryListener;
import io.debezium.relational.history.SchemaHistoryMetrics;
import io.debezium.relational.history.SnapshotAwareSchemaHistory;
import io.debezium.relational.history.snapshot.SchemaHistorySnapshot;
import io.debezium.relational.history.snapshot.SchemaPartitioner;

/**
* Configuration options shared across the relational CDC connectors which use a persistent database schema history.
Expand All @@ -39,6 +40,7 @@ public abstract class HistorizedRelationalDatabaseConnectorConfig extends Relati
protected static final int DEFAULT_SNAPSHOT_FETCH_SIZE = 2_000;

private static final String DEFAULT_SCHEMA_HISTORY = "io.debezium.storage.kafka.history.KafkaSchemaHistory";
private static final String DEFAULT_SCHEMA_HISTORY_SNAPSHOT = "io.debezium.relational.history.snapshot.SchemaHistorySnapshot.NOOP";

private final boolean useCatalogBeforeSchema;
private final Class<? extends SourceConnector> connectorClass;
Expand All @@ -63,6 +65,17 @@ public abstract class HistorizedRelationalDatabaseConnectorConfig extends Relati
+ SchemaHistory.CONFIGURATION_FIELD_PREFIX_STRING + "' string.")
.withDefault(DEFAULT_SCHEMA_HISTORY);

// Connector option selecting the SchemaHistorySnapshot implementation used to
// store and recover database schema snapshots.
// NOTE(review): the default "io.debezium.relational.history.snapshot.SchemaHistorySnapshot.NOOP"
// names a constant, not a class; a Type.CLASS default normally needs a loadable class
// name (nested classes use '$'). Verify Configuration.getInstance() resolves this.
public static final Field SCHEMA_HISTORY_SNAPSHOT = Field.create("schema.history.internal.snapshot")
        .withDisplayName("Database schema history snapshot class")
        .withType(Type.CLASS)
        .withWidth(Width.LONG)
        .withImportance(Importance.LOW)
        .withInvisibleRecommender()
        .withDescription("The name of the SchemaHistorySnapshot class that should be used to store and recover database schema. "
                + "The configuration properties for the history snapshot are prefixed with the '"
                + SchemaHistorySnapshot.CONFIGURATION_FIELD_PREFIX_STRING + "' string.")
        .withDefault(DEFAULT_SCHEMA_HISTORY_SNAPSHOT);

public static final Field SKIP_UNPARSEABLE_DDL_STATEMENTS = SchemaHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS;

public static final Field STORE_ONLY_CAPTURED_TABLES_DDL = SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL;
Expand Down Expand Up @@ -120,7 +133,7 @@ protected HistorizedRelationalDatabaseConnectorConfig(Class<? extends SourceConn
/**
* Returns a configured (but not yet started) instance of the database schema history.
*/
public SchemaHistory getSchemaHistory(Tables initialSchema) {
public SchemaHistory getSchemaHistory() {
Configuration config = getConfig();

SchemaHistory schemaHistory = config.getInstance(SCHEMA_HISTORY, SchemaHistory.class);
Expand All @@ -138,7 +151,13 @@ public SchemaHistory getSchemaHistory(Tables initialSchema) {
.withDefault(SchemaHistory.INTERNAL_CONNECTOR_ID, logicalName)
.build();

schemaHistory = new SnapshotAwareSchemaHistory(schemaHistory, initialSchema, getSchemaPartitioner());
SchemaHistorySnapshot schemaSnapshot = config.getInstance(SCHEMA_HISTORY_SNAPSHOT, SchemaHistorySnapshot.class);
if (schemaSnapshot == null) {
throw new ConnectException("Unable to instantiate the database schema history snapshot class " +
config.getString(SCHEMA_HISTORY_SNAPSHOT));
}

schemaHistory = new SnapshotAwareSchemaHistory(schemaHistory, schemaSnapshot, getSchemaPartitioner());

HistoryRecordComparator historyComparator = getHistoryRecordComparator();
SchemaHistoryListener historyListener = new SchemaHistoryMetrics(this, multiPartitionMode());
Expand Down Expand Up @@ -187,10 +206,4 @@ public boolean storeOnlyCapturedDatabases() {
protected abstract HistoryRecordComparator getHistoryRecordComparator();

protected abstract SchemaPartitioner getSchemaPartitioner();

/**
 * Maps a source partition (a connector source-info map) to a filter selecting the
 * tables that belong to that partition's schema.
 */
@FunctionalInterface
public interface SchemaPartitioner {

    /** Returns the table filter for the given source partition. */
    TableFilter fromPartition(Map<String, ?> source);

    // Interface fields are implicitly public static final; redundant modifiers removed.
    /** Partitioner that places every table in every partition (no partitioning). */
    SchemaPartitioner NOOP = (s) -> Tables.TableFilter.includeAll();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ protected HistorizedRelationalDatabaseSchema(HistorizedRelationalDatabaseConnect
boolean tableIdCaseInsensitive, KeyMapper customKeysMapper) {
super(config, topicNamingStrategy, tableFilter, columnFilter, schemaBuilder, tableIdCaseInsensitive, customKeysMapper);

this.schemaHistory = config.getSchemaHistory(tables());
this.schemaHistory = config.getSchemaHistory();
this.schemaHistory.start();
this.historizedConnectorConfig = config;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,31 @@ public interface DdlParser {
DdlChanges getDdlChanges();

SystemVariables systemVariables();

/**
 * A {@link DdlParser} that discards all DDL and mutates nothing — usable wherever a
 * parser is required but no parsing should take place.
 */
DdlParser NOOP = new DdlParser() {
    @Override
    public void parse(String ddlContent, Tables databaseTables) {
        // Intentionally empty: DDL content is ignored and databaseTables is untouched.
    }

    @Override
    public void setCurrentDatabase(String databaseName) {
        // Intentionally empty: no database context is tracked.
    }

    @Override
    public void setCurrentSchema(String schemaName) {
        // Intentionally empty: no schema context is tracked.
    }

    @Override
    public DdlChanges getDdlChanges() {
        // NOTE(review): returns null rather than an empty DdlChanges — callers must
        // null-check; confirm the DdlParser contract permits null here.
        return null;
    }

    @Override
    public SystemVariables systemVariables() {
        // NOTE(review): returns null rather than an empty SystemVariables — verify
        // callers tolerate null before relying on this no-op parser.
        return null;
    }
};
}
Loading

0 comments on commit 6dfd01f

Please sign in to comment.