Skip to content

Commit

Permalink
CXP-2779: added SchemaHistorySnapshot interface and snapshot aware Sc…
Browse files Browse the repository at this point in the history
…hemaHistory implementation
  • Loading branch information
mikekamornikov committed Jan 2, 2024
1 parent f943b4c commit 01f982c
Show file tree
Hide file tree
Showing 13 changed files with 722 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.debezium.relational.TableId;
import io.debezium.relational.Tables.TableFilter;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.relational.history.snapshot.SchemaPartitioner;
import io.debezium.schema.DefaultTopicNamingStrategy;
import io.debezium.util.Collect;

Expand Down Expand Up @@ -382,10 +383,10 @@ public boolean flushResetsIsolationLevel() {
}

/**
* Determine which flavour of MySQL locking to use.
*
* @return the correct SQL to obtain a global lock for the current mode
*/
* Determine which flavour of MySQL locking to use.
*
* @return the correct SQL to obtain a global lock for the current mode
*/
public String getLockStatement() {
if (value.equals(MINIMAL_PERCONA.value)) {
return "LOCK TABLES FOR BACKUP";
Expand Down Expand Up @@ -1164,6 +1165,11 @@ protected HistoryRecordComparator getHistoryRecordComparator() {
return new MySqlHistoryRecordComparator(gtidSourceFilter());
}

@Override
protected SchemaPartitioner getSchemaPartitioner() {
return SchemaPartitioner.SINGLE_PARTITION;
}

public static boolean isBuiltInDatabase(String databaseName) {
if (databaseName == null) {
return false;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.relational.history.snapshot;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;

import java.util.Map;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
import io.debezium.relational.Tables;
import io.debezium.relational.ddl.DdlParser;
import io.debezium.relational.history.MemorySchemaHistory;
import io.debezium.relational.history.SchemaHistory;

public abstract class AbstractSchemaHistorySnapshotTest {

protected SchemaHistorySnapshot snapshot;
protected SchemaHistory history;
protected Map<String, Object> source1;
protected Map<String, Object> source2;
protected Tables s1_t0, s1_t1, s1_t2, s1_t3;
protected Tables s2_t0, s2_t1, s2_t2, s2_t3;
protected DdlParser parser;

@Before
public void beforeEach() {
history = new MemorySchemaHistory();
snapshot = createHistorySnapshot();

source1 = server("abc");
source2 = server("xyz");

parser = new MySqlAntlrDdlParser();
s1_t0 = new Tables();
s1_t1 = new Tables();
s1_t2 = new Tables();
s1_t3 = new Tables();

record(1, "CREATE TABLE foo ( first VARCHAR(22) NOT NULL );", s1_t3, s1_t2, s1_t1, s1_t0);
record(23, "CREATE TABLE\nperson ( name VARCHAR(22) NOT NULL );", s1_t3, s1_t2, s1_t1);
record(30, "CREATE TABLE address\n( street VARCHAR(22) NOT NULL );", s1_t3, s1_t2);
record(32, "ALTER TABLE address ADD city VARCHAR(22) NOT NULL;", s1_t3);

snapshot.save(source1, position(32), s1_t3);
snapshot.save(source1, position(30), s1_t2);
snapshot.save(source1, position(23), s1_t1);
snapshot.save(source1, position(1), s1_t0);

s2_t0 = new Tables();
s2_t1 = new Tables();
s2_t2 = new Tables();
s2_t3 = new Tables();

record(2, "CREATE TABLE foo1 ( first VARCHAR(22) NOT NULL );", s2_t3, s2_t2, s2_t1, s2_t0);
record(12, "CREATE TABLE\nperson1 ( name VARCHAR(22) NOT NULL );", s2_t3, s2_t2, s2_t1);
record(16, "CREATE TABLE address1\n( street VARCHAR(22) NOT NULL );", s2_t3, s2_t2);
record(99, "ALTER TABLE address1 ADD city VARCHAR(22) NOT NULL;", s2_t3);

snapshot.save(source2, position(55), s2_t3);
snapshot.save(source2, position(16), s2_t2);
snapshot.save(source2, position(12), s2_t1);
snapshot.save(source2, position(2), s2_t0);
}

@After
public void afterEach() {
}

protected abstract SchemaHistorySnapshot createHistorySnapshot();

protected Map<String, Object> server(String serverName) {
return Map.of("server", serverName);
}

protected Map<String, Object> position(long pos) {
return Map.of("file", "mysql-bin-changelog.000011", "pos", pos);
}

protected void record(long pos, String ddl, Tables... update) {
try {
history.record(source1, position(pos), "db", ddl);
}
catch (Throwable t) {
fail(t.getMessage());
}
for (Tables tables : update) {
if (tables != null) {
parser.setCurrentSchema("db");
parser.parse(ddl, tables);
}
}
}

@Test
public void shouldRecordSnapshotsAndRecoverToClosest() {
assertNull(snapshot.read(source1, null));
assertNull(snapshot.read(null, position(31)));
assertNull(snapshot.read(source1, position(31)));
assertEquals(s1_t2, snapshot.read(source1, position(30)));

assertNull(snapshot.read(source2, position(30)));
assertEquals(s2_t1, snapshot.read(source2, position(12)));

assertNull(snapshot.findClosest(null, position(31)));
assertNull(snapshot.findClosest(source1, null));
assertEquals(position(30), snapshot.findClosest(source1, position(31)));
assertEquals(position(32), snapshot.findClosest(source1, position(99)));

assertNull(snapshot.findClosest(source2, position(1)));
assertEquals(position(55), snapshot.findClosest(source2, position(99)));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.relational.history.snapshot;

public class MemorySchemaHistorySnapshotTest extends AbstractSchemaHistorySnapshotTest {
protected SchemaHistorySnapshot createHistorySnapshot() {
return new MemorySchemaHistorySnapshot();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import io.debezium.relational.TableId;
import io.debezium.relational.Tables.TableFilter;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.relational.history.snapshot.SchemaPartitioner;
import io.debezium.util.Strings;

/**
Expand Down Expand Up @@ -829,6 +830,11 @@ protected HistoryRecordComparator getHistoryRecordComparator() {
return getAdapter().getHistoryRecordComparator();
}

@Override
protected SchemaPartitioner getSchemaPartitioner() {
return SchemaPartitioner.SINGLE_PARTITION;
}

/**
* Defines modes of representation of {@code interval} datatype
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.debezium.relational.TableId;
import io.debezium.relational.Tables.TableFilter;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.relational.history.snapshot.SchemaPartitioner;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Strings;

Expand Down Expand Up @@ -493,6 +494,15 @@ protected boolean isPositionAtOrBefore(Document recorded, Document desired) {
};
}

@Override
protected SchemaPartitioner getSchemaPartitioner() {
return (source) -> {
return TableFilter.fromPredicate((tableId) -> {
return tableId.catalog().equals(((SqlServerPartition)source).getDatabaseName());
});
};
}

@Override
public String getContextName() {
return Module.contextName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
import io.debezium.relational.history.SchemaHistory;
import io.debezium.relational.history.SchemaHistoryListener;
import io.debezium.relational.history.SchemaHistoryMetrics;
import io.debezium.relational.history.SnapshotAwareSchemaHistory;
import io.debezium.relational.history.snapshot.SchemaHistorySnapshot;
import io.debezium.relational.history.snapshot.SchemaPartitioner;

/**
* Configuration options shared across the relational CDC connectors which use a persistent database schema history.
Expand All @@ -38,6 +41,7 @@ public abstract class HistorizedRelationalDatabaseConnectorConfig extends Relati
protected static final int DEFAULT_SNAPSHOT_FETCH_SIZE = 2_000;

private static final String DEFAULT_SCHEMA_HISTORY = "io.debezium.storage.kafka.history.KafkaSchemaHistory";
private static final String DEFAULT_SCHEMA_HISTORY_SNAPSHOT = "io.debezium.relational.history.snapshot.NoopSchemaHistorySnapshot";

private final boolean useCatalogBeforeSchema;
private final Class<? extends SourceConnector> connectorClass;
Expand All @@ -62,6 +66,17 @@ public abstract class HistorizedRelationalDatabaseConnectorConfig extends Relati
+ SchemaHistory.CONFIGURATION_FIELD_PREFIX_STRING + "' string.")
.withDefault(DEFAULT_SCHEMA_HISTORY);

public static final Field SCHEMA_HISTORY_SNAPSHOT = Field.create("schema.history.internal.snapshot")
.withDisplayName("Database schema history snapshot class")
.withType(Type.CLASS)
.withWidth(Width.LONG)
.withImportance(Importance.LOW)
.withInvisibleRecommender()
.withDescription("The name of the SchemaHistorySnapshot class that should be used to store and recover database schema. "
+ "The configuration properties for the history snapshot are prefixed with the '"
+ SchemaHistorySnapshot.CONFIGURATION_FIELD_PREFIX_STRING + "' string.")
.withDefault(DEFAULT_SCHEMA_HISTORY_SNAPSHOT);

public static final Field SKIP_UNPARSEABLE_DDL_STATEMENTS = SchemaHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS;

public static final Field STORE_ONLY_CAPTURED_TABLES_DDL = SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL;
Expand All @@ -73,7 +88,8 @@ public abstract class HistorizedRelationalDatabaseConnectorConfig extends Relati
SCHEMA_HISTORY,
SKIP_UNPARSEABLE_DDL_STATEMENTS,
STORE_ONLY_CAPTURED_TABLES_DDL,
STORE_ONLY_CAPTURED_DATABASES_DDL)
STORE_ONLY_CAPTURED_DATABASES_DDL,
SCHEMA_HISTORY_SNAPSHOT)
.create();

protected HistorizedRelationalDatabaseConnectorConfig(Class<? extends SourceConnector> connectorClass,
Expand Down Expand Up @@ -137,11 +153,23 @@ public SchemaHistory getSchemaHistory(DdlParser ddlParser) {
.withDefault(SchemaHistory.INTERNAL_CONNECTOR_ID, logicalName)
.build();

SchemaHistorySnapshot schemaSnapshot = config.getInstance(SCHEMA_HISTORY_SNAPSHOT, SchemaHistorySnapshot.class);
if (schemaSnapshot == null) {
throw new ConnectException("Unable to instantiate the database schema history snapshot class " +
config.getString(SCHEMA_HISTORY_SNAPSHOT));
}

Configuration schemaHistorySnapshotConfig = config.subset(SchemaHistorySnapshot.CONFIGURATION_FIELD_PREFIX_STRING, false);

HistoryRecordComparator historyComparator = getHistoryRecordComparator();
SchemaHistoryListener historyListener = new SchemaHistoryMetrics(this, multiPartitionMode());
schemaSnapshot.configure(schemaHistorySnapshotConfig, historyComparator, getSchemaPartitioner());

schemaHistory = new SnapshotAwareSchemaHistory(schemaHistory, schemaSnapshot);

HistoryRecordProcessorProvider processorProvider = (o, s) -> new HistoryRecordProcessor(o, s, ddlParser, schemaHistoryConfig, historyComparator, historyListener,
useCatalogBeforeSchema());
schemaHistory.configure(schemaHistoryConfig, historyComparator, historyListener, processorProvider);
schemaHistory.configure(schemaHistoryConfig, historyComparator, historyListener, processorProvider); // validates

return schemaHistory;
}
Expand Down Expand Up @@ -183,4 +211,5 @@ public boolean storeOnlyCapturedDatabases() {
*/
protected abstract HistoryRecordComparator getHistoryRecordComparator();

protected abstract SchemaPartitioner getSchemaPartitioner();
}
Loading

0 comments on commit 01f982c

Please sign in to comment.