CXP-2779: new SchemaHistory interface, removed deprecated
mikekamornikov committed Dec 23, 2023
1 parent 1b3d22a commit cb8ee3b
Showing 24 changed files with 527 additions and 277 deletions.
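Across the files below, the previous SchemaHistory.configure(config, comparator, listener, <boolean flag>) call and the recover(...) overloads that took a DdlParser are replaced by a provider-based wiring: a HistoryRecordProcessorProvider is handed to configure(...), and recover(...) now takes only an Offsets value plus the target Tables. A minimal sketch of the new wiring, using the type and method names shown in the test changes of this commit; the parser choice, the way the Offsets instance is obtained, and the package chosen so the history types resolve without extra imports are all assumptions for illustration:

package io.debezium.relational.history;

import io.debezium.config.Configuration;
import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.relational.Tables;
import io.debezium.relational.ddl.DdlParser;

public class SchemaHistoryWiringSketch {

    // Configures a SchemaHistory with the new provider-based signature and
    // recovers the schema recorded up to the given offsets.
    public static Tables configureAndRecover(SchemaHistory history, Configuration config, Offsets<?, ?> offsets) {
        DdlParser parser = new MySqlAntlrDdlParser();
        HistoryRecordComparator comparator = null;                   // rely on the default record ordering
        SchemaHistoryListener listener = SchemaHistoryMetrics.NOOP;

        // The recovery logic formerly driven by recover(source, position, tables, parser)
        // is now encapsulated in a HistoryRecordProcessor created through this provider.
        HistoryRecordProcessorProvider processorProvider =
                (o, s) -> new HistoryRecordProcessor(o, s, parser, config, comparator, listener, true);

        history.configure(config, comparator, listener, processorProvider);   // was: configure(config, null, listener, true)
        history.start();

        Tables tables = new Tables();
        history.recover(offsets, tables);                                      // was: recover(source, position, tables, parser)
        return tables;
    }
}

The MemorySchemaHistoryTest change at the end of this commit is essentially this wiring applied to new MemorySchemaHistory().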
@@ -29,6 +29,7 @@
import io.debezium.relational.TableSchema;
import io.debezium.relational.TableSchemaBuilder;
import io.debezium.relational.Tables;
import io.debezium.relational.Tables.TableFilter;
import io.debezium.relational.ddl.DdlChanges;
import io.debezium.relational.ddl.DdlParser;
import io.debezium.relational.ddl.DdlParserListener.Event;
@@ -64,7 +65,6 @@ public class MySqlDatabaseSchema extends HistorizedRelationalDatabaseSchema {
private final static Logger LOGGER = LoggerFactory.getLogger(MySqlDatabaseSchema.class);

private final Set<String> ignoredQueryStatements = Collect.unmodifiableSet("BEGIN", "END", "FLUSH PRIVILEGES");
private final DdlParser ddlParser;
private final RelationalTableFilters filters;
private final DdlChanges ddlChanges;
private final Map<Long, TableId> tableIdsByTableNumber = new ConcurrentHashMap<>();
@@ -88,14 +88,14 @@ public MySqlDatabaseSchema(MySqlConnectorConfig connectorConfig, MySqlValueConve
connectorConfig.getSourceInfoStructMaker().schema(),
connectorConfig.getFieldNamer(),
false),
tableIdCaseInsensitive, connectorConfig.getKeyMapper());

this.ddlParser = new MySqlAntlrDdlParser(
true,
false,
connectorConfig.isSchemaCommentsHistoryEnabled(),
valueConverter,
getTableFilter());
tableIdCaseInsensitive, connectorConfig.getKeyMapper(),
new MySqlAntlrDdlParser(
true,
false,
connectorConfig.isSchemaCommentsHistoryEnabled(),
valueConverter,
connectorConfig.getTableFilters().dataCollectionFilter()));

this.ddlChanges = this.ddlParser.getDdlChanges();
this.connectorConfig = connectorConfig;
filters = connectorConfig.getTableFilters();
@@ -354,11 +354,6 @@ else if (event instanceof TableIndexEvent) {
return null;
}

@Override
protected DdlParser getDdlParser() {
return ddlParser;
}

/**
* Return true if the database schema history entity exists
*/
@@ -9,14 +9,25 @@
import static org.junit.Assert.fail;

import java.util.Map;
import java.time.Instant;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import io.debezium.connector.SnapshotRecord;
import io.debezium.connector.mysql.MySqlPartition;
import io.debezium.connector.mysql.SourceInfo;
import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.relational.Tables;
import io.debezium.relational.ddl.DdlParser;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Collect;
import io.debezium.util.Testing;

@@ -26,30 +37,22 @@
*/
public abstract class AbstractSchemaHistoryTest {

private static String SERVERNAME = "server1";
private static String BINLOG_FILE = "a.log";

protected SchemaHistory history;
protected Map<String, Object> source1;
protected Map<String, Object> source2;
protected Tables tables;
protected Tables t0;
protected Tables t1;
protected Tables t2;
protected Tables t3;
protected Tables t4;
protected Tables all;
protected Tables t0, t1, t2, t3, t4, all;
protected DdlParser parser;

@Before
public void beforeEach() {
parser = new MySqlAntlrDdlParser();
tables = new Tables();
t0 = new Tables();
t1 = new Tables();
t2 = new Tables();
t3 = new Tables();
t4 = new Tables();
all = new Tables();
source1 = server("abc");
source2 = server("xyz");
history = createHistory();
}

@@ -62,17 +65,24 @@ public void afterEach() {

protected abstract SchemaHistory createHistory();

protected Map<String, Object> server(String serverName) {
return Collect.linkMapOf("server", serverName);
protected MySqlPartition source(String server) {
return new MySqlPartition(server, "");
}

protected Map<String, Object> position(long position, int row) {
return Collect.linkMapOf(
SourceInfo.BINLOG_FILENAME_OFFSET_KEY, BINLOG_FILE,
SourceInfo.BINLOG_POSITION_OFFSET_KEY, position,
SourceInfo.BINLOG_ROW_IN_EVENT_OFFSET_KEY, row);
}

protected Map<String, Object> position(String filename, long position, int entry) {
return Collect.linkMapOf("file", filename, "position", position, "entry", entry);
protected Offsets<?, ?> offsets(String server, long pos, int row) {
return Offsets.of(source(server), new TestOffsetContext(position(pos, row)));
}

protected void record(long pos, int entry, String ddl, Tables... update) {
protected void record(long pos, int row, String ddl, Tables... update) {
try {
history.record(source1, position("a.log", pos, entry), "db", ddl);
history.record(source(SERVERNAME).getSourcePartition(), position(pos, row), "db", ddl);
}
catch (Throwable t) {
fail(t.getMessage());
@@ -85,15 +95,15 @@ protected void record(long pos, int entry, String ddl, Tables... update) {
}
}

protected Tables recover(long pos, int entry) {
protected Tables recover(long pos, int row) {
Tables result = new Tables();
history.recover(source1, position("a.log", pos, entry), result, parser);
history.recover(offsets(SERVERNAME, pos, row), result);
return result;
}

@Test
public void shouldRecordChangesAndRecoverToVariousPoints() {
record(01, 0, "CREATE TABLE foo ( first VARCHAR(22) NOT NULL );", all, t3, t2, t1, t0);
record(1, 0, "CREATE TABLE foo ( first VARCHAR(22) NOT NULL );", all, t3, t2, t1, t0);
record(23, 1, "CREATE TABLE\nperson ( name VARCHAR(22) NOT NULL );", all, t3, t2, t1);
record(30, 2, "CREATE TABLE address\n( street VARCHAR(22) NOT NULL );", all, t3, t2);
record(32, 3, "ALTER TABLE address ADD city VARCHAR(22) NOT NULL;", all, t3);
@@ -106,8 +116,8 @@ public void shouldRecordChangesAndRecoverToVariousPoints() {
Testing.print("t3 = " + t3);
}

assertThat(recover(01, 0)).isEqualTo(t0);
assertThat(recover(01, 3)).isEqualTo(t0);
assertThat(recover(1, 0)).isEqualTo(t0);
assertThat(recover(1, 3)).isEqualTo(t0);
assertThat(recover(10, 1)).isEqualTo(t0);
assertThat(recover(22, 999999)).isEqualTo(t0);
assertThat(recover(23, 0)).isEqualTo(t0);
@@ -130,4 +140,61 @@ public void shouldRecordChangesAndRecoverToVariousPoints() {
assertThat(recover(1033, 4)).isEqualTo(t3);
}

private static class TestOffsetContext implements OffsetContext {
private final Map<String, ?> offset;

TestOffsetContext(Map<String, ?> offset) {
this.offset = offset;
}

@Override
public Map<String, ?> getOffset() {
return offset;
}

@Override
public Schema getSourceInfoSchema() {
return null;
}

@Override
public Struct getSourceInfo() {
return null;
}

@Override
public boolean isSnapshotRunning() {
return false;
}

@Override
public void markSnapshotRecord(SnapshotRecord record) {

}

@Override
public void preSnapshotStart() {

}

@Override
public void preSnapshotCompletion() {

}

@Override
public void postSnapshotCompletion() {

}

@Override
public void event(DataCollectionId collectionId, Instant timestamp) {

}

@Override
public TransactionContext getTransactionContext() {
return null;
}
}
}
@@ -30,10 +30,17 @@ public void beforeEach() {
@Override
protected SchemaHistory createHistory() {
SchemaHistory history = new FileSchemaHistory();
history.configure(Configuration.create()

Configuration config = Configuration.create()
.with(FileSchemaHistory.FILE_PATH, TEST_FILE_PATH.toAbsolutePath().toString())
.build(), null, SchemaHistoryMetrics.NOOP, true);
.build();
HistoryRecordComparator comparator = null;
SchemaHistoryListener listener = SchemaHistoryMetrics.NOOP;
HistoryRecordProcessorProvider processorProvider = (o, s) -> new HistoryRecordProcessor(o, s, parser, config, comparator, listener, true);

history.configure(config, comparator, listener, processorProvider);
history.start();

return history;
}
}
@@ -141,7 +141,15 @@ private void testHistoryTopicContent(String topicName, boolean skipUnparseableDD
.with(KafkaSchemaHistory.INTERNAL_CONNECTOR_CLASS, "org.apache.kafka.connect.source.SourceConnector")
.with(KafkaSchemaHistory.INTERNAL_CONNECTOR_ID, "dbz-test")
.build();
history.configure(config, null, SchemaHistoryMetrics.NOOP, true);

DdlParser recoveryParser = new MySqlAntlrDdlParser();
DdlParser ddlParser = new MySqlAntlrDdlParser();

HistoryRecordComparator comparator = null;
SchemaHistoryListener listener = SchemaHistoryMetrics.NOOP;
HistoryRecordProcessorProvider processorProvider = (o, s) -> new HistoryRecordProcessor(o, s, recoveryParser, config, comparator, listener, true);

history.configure(config, comparator, listener, processorProvider);
history.start();

// Should be able to call start more than once ...
@@ -152,16 +160,14 @@ private void testHistoryTopicContent(String topicName, boolean skipUnparseableDD
// Calling it another time to ensure we can work with the DB history topic already existing
history.initializeStorage();

DdlParser recoveryParser = new MySqlAntlrDdlParser();
DdlParser ddlParser = new MySqlAntlrDdlParser();
ddlParser.setCurrentSchema("db1"); // recover does this, so we need to as well
Tables tables1 = new Tables();
Tables tables2 = new Tables();
Tables tables3 = new Tables();

// Recover from the very beginning ...
setLogPosition(0);
history.recover(offsets, tables1, recoveryParser);
history.recover(offsets, tables1);

// There should have been nothing to recover ...
assertThat(tables1.size()).isEqualTo(0);
@@ -200,31 +206,31 @@ private void testHistoryTopicContent(String topicName, boolean skipUnparseableDD
// Stop the history (which should stop the producer) ...
history.stop();
history = new KafkaSchemaHistory();
history.configure(config, null, SchemaHistoryListener.NOOP, true);
history.configure(config, comparator, listener, processorProvider);
// no need to start

// Recover from the very beginning to just past the first change ...
Tables recoveredTables = new Tables();
setLogPosition(15);
history.recover(offsets, recoveredTables, recoveryParser);
history.recover(offsets, recoveredTables);
assertThat(recoveredTables).isEqualTo(tables1);

// Recover from the very beginning to just past the second change ...
recoveredTables = new Tables();
setLogPosition(50);
history.recover(offsets, recoveredTables, recoveryParser);
history.recover(offsets, recoveredTables);
assertThat(recoveredTables).isEqualTo(tables2);

// Recover from the very beginning to just past the third change ...
recoveredTables = new Tables();
setLogPosition(10010);
history.recover(offsets, recoveredTables, recoveryParser);
history.recover(offsets, recoveredTables);
assertThat(recoveredTables).isEqualTo(tables3);

// Recover from the very beginning to way past the third change ...
recoveredTables = new Tables();
setLogPosition(100000010);
history.recover(offsets, recoveredTables, recoveryParser);
history.recover(offsets, recoveredTables);
assertThat(recoveredTables).isEqualTo(tables3);
}

@@ -355,7 +361,13 @@ public void testExists() {
.with(KafkaSchemaHistory.INTERNAL_CONNECTOR_ID, "dbz-test")
.build();

history.configure(config, null, SchemaHistoryMetrics.NOOP, true);
DdlParser recoveryParser = new MySqlAntlrDdlParser();

HistoryRecordComparator comparator = null;
SchemaHistoryListener listener = SchemaHistoryMetrics.NOOP;
HistoryRecordProcessorProvider processorProvider = (o, s) -> new HistoryRecordProcessor(o, s, recoveryParser, config, comparator, listener, true);

history.configure(config, comparator, listener, processorProvider);
history.start();

// dummytopic should not exist yet
@@ -380,7 +392,13 @@ public void differentiateStorageExistsFromHistoryExists() {
50000)
.build();

history.configure(config, null, SchemaHistoryMetrics.NOOP, true);
DdlParser recoveryParser = new MySqlAntlrDdlParser();

HistoryRecordComparator comparator = null;
SchemaHistoryListener listener = SchemaHistoryMetrics.NOOP;
HistoryRecordProcessorProvider processorProvider = (o, s) -> new HistoryRecordProcessor(o, s, recoveryParser, config, comparator, listener, true);

history.configure(config, comparator, listener, processorProvider);

assertFalse(history.storageExists());
history.initializeStorage();
@@ -425,7 +443,13 @@ public void shouldConnectionTimeoutIfValueIsTooLow() {
.with(KafkaSchemaHistory.KAFKA_QUERY_TIMEOUT_MS, 1)
.build();

history.configure(config, null, SchemaHistoryMetrics.NOOP, true);
DdlParser recoveryParser = new MySqlAntlrDdlParser();

HistoryRecordComparator comparator = null;
SchemaHistoryListener listener = SchemaHistoryMetrics.NOOP;
HistoryRecordProcessorProvider processorProvider = (o, s) -> new HistoryRecordProcessor(o, s, recoveryParser, config, comparator, listener, true);

history.configure(config, comparator, listener, processorProvider);
history.start();

try {
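The HistoryRecordProcessorProvider construction above is repeated verbatim in each of the Kafka history tests. Purely as a hypothetical refactoring (not part of this commit, and declared in the io.debezium.relational.history package only so the types resolve without imports), the repeated lambda could be captured once:

package io.debezium.relational.history;

import io.debezium.config.Configuration;
import io.debezium.relational.ddl.DdlParser;

// Hypothetical test helper, not part of this commit: centralizes the
// HistoryRecordProcessorProvider lambda that the Kafka tests above re-declare.
final class TestProcessorProviders {

    private TestProcessorProviders() {
    }

    static HistoryRecordProcessorProvider of(DdlParser parser, Configuration config,
                                             HistoryRecordComparator comparator,
                                             SchemaHistoryListener listener) {
        // Same lambda as in the diff; (o, s) are whatever arguments the
        // SchemaHistory implementation invokes the provider with.
        return (o, s) -> new HistoryRecordProcessor(o, s, parser, config, comparator, listener, true);
    }
}

Each test would then pass TestProcessorProviders.of(recoveryParser, config, comparator, listener) to history.configure(...) instead of re-declaring the lambda.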
@@ -5,13 +5,25 @@
*/
package io.debezium.relational.history;

import io.debezium.config.Configuration;

/**
* @author Randall Hauch
*/
public class MemorySchemaHistoryTest extends AbstractSchemaHistoryTest {

@Override
protected SchemaHistory createHistory() {
return new MemorySchemaHistory();
SchemaHistory history = new MemorySchemaHistory();

Configuration config = Configuration.empty();
HistoryRecordComparator comparator = null;
SchemaHistoryListener listener = SchemaHistoryMetrics.NOOP;
HistoryRecordProcessorProvider processorProvider = (o, s) -> new HistoryRecordProcessor(o, s, parser, config, comparator, listener, true);

history.configure(config, comparator, listener, processorProvider);
history.start();

return history;
}
}