Skip to content

Commit

Permalink
CXP-2779: extracted HistoryRecordProcessor logic, added SnapshotAware…
Browse files Browse the repository at this point in the history
… SchemaHistory implementation, added SchemaHistorySnapshot with a NOOP (default) and an in-memory implementation
  • Loading branch information
mikekamornikov committed Dec 14, 2023
1 parent 7c8892c commit ddf50bd
Show file tree
Hide file tree
Showing 27 changed files with 847 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.debezium.relational.TableId;
import io.debezium.relational.Tables.TableFilter;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.relational.history.snapshot.SchemaPartitioner;
import io.debezium.schema.DefaultTopicNamingStrategy;
import io.debezium.util.Collect;

Expand Down Expand Up @@ -1164,6 +1165,11 @@ protected HistoryRecordComparator getHistoryRecordComparator() {
return new MySqlHistoryRecordComparator(gtidSourceFilter());
}

@Override
protected SchemaPartitioner getSchemaPartitioner() {
return SchemaPartitioner.SINGLE_PARTITION;
}

public static boolean isBuiltInDatabase(String databaseName) {
if (databaseName == null) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public void shouldStartWithEmptyTopicAndStoreDataAndRecoverAllState() throws Exc
}

private void testHistoryTopicContent(String topicName, boolean skipUnparseableDDL) {
interceptor = new LogInterceptor(KafkaSchemaHistory.class);
interceptor = new LogInterceptor(HistoryRecordProcessor.class);
// Start up the history ...
Configuration config = Configuration.create()
.with(KafkaSchemaHistory.BOOTSTRAP_SERVERS, kafka.brokerList())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package io.debezium.relational.history.snapshot;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;

import java.util.Map;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
import io.debezium.relational.Tables;
import io.debezium.relational.ddl.DdlParser;
import io.debezium.relational.history.MemorySchemaHistory;
import io.debezium.relational.history.SchemaHistory;

public abstract class AbstractSchemaHistorySnapshotTest {

protected SchemaHistorySnapshot snapshot;
protected SchemaHistory history;
protected Map<String, Object> source1;
protected Map<String, Object> source2;
protected Tables s1_t0, s1_t1, s1_t2, s1_t3;
protected Tables s2_t0, s2_t1, s2_t2, s2_t3;
protected DdlParser parser;

@Before
public void beforeEach() {
history = new MemorySchemaHistory();
snapshot = createHistorySnapshot();

source1 = server("abc");
source2 = server("xyz");

parser = new MySqlAntlrDdlParser();
s1_t0 = new Tables();
s1_t1 = new Tables();
s1_t2 = new Tables();
s1_t3 = new Tables();

record(1, "CREATE TABLE foo ( first VARCHAR(22) NOT NULL );", s1_t3, s1_t2, s1_t1, s1_t0);
record(23, "CREATE TABLE\nperson ( name VARCHAR(22) NOT NULL );", s1_t3, s1_t2, s1_t1);
record(30, "CREATE TABLE address\n( street VARCHAR(22) NOT NULL );", s1_t3, s1_t2);
record(32, "ALTER TABLE address ADD city VARCHAR(22) NOT NULL;", s1_t3);

snapshot.save(source1, position(32), s1_t3);
snapshot.save(source1, position(30), s1_t2);
snapshot.save(source1, position(23), s1_t1);
snapshot.save(source1, position(1), s1_t0);

s2_t0 = new Tables();
s2_t1 = new Tables();
s2_t2 = new Tables();
s2_t3 = new Tables();

record(2, "CREATE TABLE foo1 ( first VARCHAR(22) NOT NULL );", s2_t3, s2_t2, s2_t1, s2_t0);
record(12, "CREATE TABLE\nperson1 ( name VARCHAR(22) NOT NULL );", s2_t3, s2_t2, s2_t1);
record(16, "CREATE TABLE address1\n( street VARCHAR(22) NOT NULL );", s2_t3, s2_t2);
record(99, "ALTER TABLE address1 ADD city VARCHAR(22) NOT NULL;", s2_t3);

snapshot.save(source2, position(55), s2_t3);
snapshot.save(source2, position(16), s2_t2);
snapshot.save(source2, position(12), s2_t1);
snapshot.save(source2, position(2), s2_t0);
}

@After
public void afterEach() {
}

protected abstract SchemaHistorySnapshot createHistorySnapshot();

protected Map<String, Object> server(String serverName) {
return Map.of("server", serverName);
}

protected Map<String, Object> position(long pos) {
return Map.of("file", "mysql-bin-changelog.000011", "pos", pos);
}

protected void record(long pos, String ddl, Tables... update) {
try {
history.record(source1, position(pos), "db", ddl);
}
catch (Throwable t) {
fail(t.getMessage());
}
for (Tables tables : update) {
if (tables != null) {
parser.setCurrentSchema("db");
parser.parse(ddl, tables);
}
}
}

@Test
public void shouldRecordSnapshotsAndRecoverToClosest() {
assertNull(snapshot.read(source1, null));
assertNull(snapshot.read(null, position(31)));
assertNull(snapshot.read(source1, position(31)));
assertEquals(s1_t2, snapshot.read(source1, position(30)));

assertNull(snapshot.read(source2, position(30)));
assertEquals(s2_t1, snapshot.read(source2, position(12)));

assertNull(snapshot.findClosest(null, position(31)));
assertNull(snapshot.findClosest(source1, null));
assertEquals(position(30), snapshot.findClosest(source1, position(31)));
assertEquals(position(32), snapshot.findClosest(source1, position(99)));

assertNull(snapshot.findClosest(source2, position(1)));
assertEquals(position(55), snapshot.findClosest(source2, position(99)));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.debezium.relational.history.snapshot;

public class MemorySchemaHistorySnapshotTest extends AbstractSchemaHistorySnapshotTest {
protected SchemaHistorySnapshot createHistorySnapshot() {
return new MemorySchemaHistorySnapshot();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import io.debezium.relational.TableId;
import io.debezium.relational.Tables.TableFilter;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.relational.history.snapshot.SchemaPartitioner;
import io.debezium.util.Strings;

/**
Expand Down Expand Up @@ -829,6 +830,11 @@ protected HistoryRecordComparator getHistoryRecordComparator() {
return getAdapter().getHistoryRecordComparator();
}

@Override
protected SchemaPartitioner getSchemaPartitioner() {
return SchemaPartitioner.SINGLE_PARTITION;
}

/**
* Defines modes of representation of {@code interval} datatype
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.debezium.relational.TableId;
import io.debezium.relational.Tables.TableFilter;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.relational.history.snapshot.SchemaPartitioner;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Strings;

Expand Down Expand Up @@ -493,6 +494,15 @@ protected boolean isPositionAtOrBefore(Document recorded, Document desired) {
};
}

@Override
protected SchemaPartitioner getSchemaPartitioner() {
return (source) -> {
return TableFilter.fromPredicate((tableId) -> {
return tableId.catalog().equals(((SqlServerPartition)source).getDatabaseName());
});
};
}

@Override
public String getContextName() {
return Module.contextName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import io.debezium.relational.Tables;
import io.debezium.relational.ddl.DdlParser;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.relational.history.HistoryRecordProcessorProvider;
import io.debezium.relational.history.SchemaHistory;
import io.debezium.relational.history.SchemaHistoryException;
import io.debezium.relational.history.SchemaHistoryListener;
Expand Down Expand Up @@ -2907,8 +2908,8 @@ public boolean exists() {

@Override
public void configure(Configuration config, HistoryRecordComparator comparator,
SchemaHistoryListener listener, boolean useCatalogBeforeSchema) {
delegate.configure(config, comparator, listener, useCatalogBeforeSchema);
SchemaHistoryListener listener, HistoryRecordProcessorProvider processorProvider) {
delegate.configure(config, comparator, listener, processorProvider);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,14 @@
import io.debezium.relational.Selectors.TableIdToStringMapper;
import io.debezium.relational.Tables.TableFilter;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.relational.history.HistoryRecordProcessor;
import io.debezium.relational.history.HistoryRecordProcessorProvider;
import io.debezium.relational.history.SchemaHistory;
import io.debezium.relational.history.SchemaHistoryListener;
import io.debezium.relational.history.SchemaHistoryMetrics;
import io.debezium.relational.history.SnapshotAwareSchemaHistory;
import io.debezium.relational.history.snapshot.SchemaHistorySnapshot;
import io.debezium.relational.history.snapshot.SchemaPartitioner;

/**
* Configuration options shared across the relational CDC connectors which use a persistent database schema history.
Expand All @@ -34,6 +40,7 @@ public abstract class HistorizedRelationalDatabaseConnectorConfig extends Relati
protected static final int DEFAULT_SNAPSHOT_FETCH_SIZE = 2_000;

private static final String DEFAULT_SCHEMA_HISTORY = "io.debezium.storage.kafka.history.KafkaSchemaHistory";
private static final String DEFAULT_SCHEMA_HISTORY_SNAPSHOT = "io.debezium.relational.history.snapshot.SchemaHistorySnapshot.NOOP";

private final boolean useCatalogBeforeSchema;
private final Class<? extends SourceConnector> connectorClass;
Expand All @@ -58,6 +65,17 @@ public abstract class HistorizedRelationalDatabaseConnectorConfig extends Relati
+ SchemaHistory.CONFIGURATION_FIELD_PREFIX_STRING + "' string.")
.withDefault(DEFAULT_SCHEMA_HISTORY);

public static final Field SCHEMA_HISTORY_SNAPSHOT = Field.create("schema.history.internal.snapshot")
.withDisplayName("Database schema history snapshot class")
.withType(Type.CLASS)
.withWidth(Width.LONG)
.withImportance(Importance.LOW)
.withInvisibleRecommender()
.withDescription("The name of the SchemaHistorySnapshot class that should be used to store and recover database schema. "
+ "The configuration properties for the history snapshot are prefixed with the '"
+ SchemaHistorySnapshot.CONFIGURATION_FIELD_PREFIX_STRING + "' string.")
.withDefault(DEFAULT_SCHEMA_HISTORY_SNAPSHOT);

public static final Field SKIP_UNPARSEABLE_DDL_STATEMENTS = SchemaHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS;

public static final Field STORE_ONLY_CAPTURED_TABLES_DDL = SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL;
Expand Down Expand Up @@ -133,9 +151,19 @@ public SchemaHistory getSchemaHistory() {
.withDefault(SchemaHistory.INTERNAL_CONNECTOR_ID, logicalName)
.build();

SchemaHistorySnapshot schemaSnapshot = config.getInstance(SCHEMA_HISTORY_SNAPSHOT, SchemaHistorySnapshot.class);
if (schemaSnapshot == null) {
throw new ConnectException("Unable to instantiate the database schema history snapshot class " +
config.getString(SCHEMA_HISTORY_SNAPSHOT));
}

schemaHistory = new SnapshotAwareSchemaHistory(schemaHistory, schemaSnapshot, getSchemaPartitioner());

HistoryRecordComparator historyComparator = getHistoryRecordComparator();
schemaHistory.configure(schemaHistoryConfig, historyComparator,
new SchemaHistoryMetrics(this, multiPartitionMode()), useCatalogBeforeSchema()); // validates
SchemaHistoryListener historyListener = new SchemaHistoryMetrics(this, multiPartitionMode());
HistoryRecordProcessorProvider processorProvider = (o, s, p) -> new HistoryRecordProcessor(o, s, p, schemaHistoryConfig, historyComparator, historyListener,
useCatalogBeforeSchema());
schemaHistory.configure(schemaHistoryConfig, historyComparator, historyListener, processorProvider); // validates

return schemaHistory;
}
Expand Down Expand Up @@ -177,4 +205,5 @@ public boolean storeOnlyCapturedDatabases() {
*/
protected abstract HistoryRecordComparator getHistoryRecordComparator();

protected abstract SchemaPartitioner getSchemaPartitioner();
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,31 @@ public interface DdlParser {
DdlChanges getDdlChanges();

SystemVariables systemVariables();

DdlParser NOOP = new DdlParser() {
@Override
public void parse(String ddlContent, Tables databaseTables) {

}

@Override
public void setCurrentDatabase(String databaseName) {

}

@Override
public void setCurrentSchema(String schemaName) {

}

@Override
public DdlChanges getDdlChanges() {
return null;
}

@Override
public SystemVariables systemVariables() {
return null;
}
};
}
Loading

0 comments on commit ddf50bd

Please sign in to comment.