diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java index 33469f99413..783aeb741a6 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java @@ -67,6 +67,7 @@ import io.debezium.relational.Tables; import io.debezium.relational.ddl.DdlParser; import io.debezium.relational.history.HistoryRecordComparator; +import io.debezium.relational.history.HistoryRecordProcessorProvider; import io.debezium.relational.history.SchemaHistory; import io.debezium.relational.history.SchemaHistoryException; import io.debezium.relational.history.SchemaHistoryListener; @@ -2907,8 +2908,8 @@ public boolean exists() { @Override public void configure(Configuration config, HistoryRecordComparator comparator, - SchemaHistoryListener listener, boolean useCatalogBeforeSchema) { - delegate.configure(config, comparator, listener, useCatalogBeforeSchema); + SchemaHistoryListener listener, HistoryRecordProcessorProvider processorProvider) { + delegate.configure(config, comparator, listener, processorProvider); } @Override diff --git a/debezium-core/src/main/java/io/debezium/relational/HistorizedRelationalDatabaseConnectorConfig.java b/debezium-core/src/main/java/io/debezium/relational/HistorizedRelationalDatabaseConnectorConfig.java index 4b0438096f6..d07014e01c0 100644 --- a/debezium-core/src/main/java/io/debezium/relational/HistorizedRelationalDatabaseConnectorConfig.java +++ b/debezium-core/src/main/java/io/debezium/relational/HistorizedRelationalDatabaseConnectorConfig.java @@ -21,8 +21,12 @@ import io.debezium.relational.Selectors.TableIdToStringMapper; import io.debezium.relational.Tables.TableFilter; import io.debezium.relational.history.HistoryRecordComparator; +import io.debezium.relational.history.HistoryRecordProcessor; +import io.debezium.relational.history.HistoryRecordProcessorProvider; import io.debezium.relational.history.SchemaHistory; +import io.debezium.relational.history.SchemaHistoryListener; import io.debezium.relational.history.SchemaHistoryMetrics; +import io.debezium.relational.history.SnapshotAwareSchemaHistory; /** * Configuration options shared across the relational CDC connectors which use a persistent database schema history. @@ -133,9 +137,12 @@ public SchemaHistory getSchemaHistory() { .withDefault(SchemaHistory.INTERNAL_CONNECTOR_ID, logicalName) .build(); + schemaHistory = new SnapshotAwareSchemaHistory(schemaHistory); + HistoryRecordComparator historyComparator = getHistoryRecordComparator(); - schemaHistory.configure(schemaHistoryConfig, historyComparator, - new SchemaHistoryMetrics(this, multiPartitionMode()), useCatalogBeforeSchema()); // validates + SchemaHistoryListener historyListener = new SchemaHistoryMetrics(this, multiPartitionMode()); + HistoryRecordProcessorProvider processorProvider = (o, s, p) -> new HistoryRecordProcessor(o, s, p, schemaHistoryConfig, historyComparator, historyListener, useCatalogBeforeSchema()); + schemaHistory.configure(schemaHistoryConfig, historyComparator, historyListener, processorProvider); // validates return schemaHistory; } diff --git a/debezium-core/src/main/java/io/debezium/relational/history/AbstractSchemaHistory.java b/debezium-core/src/main/java/io/debezium/relational/history/AbstractSchemaHistory.java index bca0cc4af71..aa3c780d26b 100644 --- a/debezium-core/src/main/java/io/debezium/relational/history/AbstractSchemaHistory.java +++ b/debezium-core/src/main/java/io/debezium/relational/history/AbstractSchemaHistory.java @@ -6,27 +6,16 @@ package io.debezium.relational.history; import java.time.Instant; -import java.util.HashMap; import java.util.Map; import java.util.function.Consumer; -import java.util.function.Predicate; -import java.util.regex.Pattern; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.debezium.config.Configuration; import io.debezium.config.Field; -import io.debezium.document.Array; -import io.debezium.document.Document; -import io.debezium.function.Predicates; import io.debezium.relational.Tables; import io.debezium.relational.ddl.DdlParser; -import io.debezium.relational.history.TableChanges.TableChange; -import io.debezium.relational.history.TableChanges.TableChangeType; -import io.debezium.relational.history.TableChanges.TableChangesSerializer; -import io.debezium.text.MultipleParsingExceptions; -import io.debezium.text.ParsingException; import io.debezium.util.Clock; /** @@ -40,28 +29,17 @@ public abstract class AbstractSchemaHistory implements SchemaHistory { public static Field.Set ALL_FIELDS = Field.setOf(NAME, INTERNAL_CONNECTOR_CLASS, INTERNAL_CONNECTOR_ID); protected Configuration config; - private HistoryRecordComparator comparator = HistoryRecordComparator.INSTANCE; - private boolean skipUnparseableDDL; - private Predicate ddlFilter = x -> false; private SchemaHistoryListener listener = SchemaHistoryListener.NOOP; - private boolean useCatalogBeforeSchema; - private boolean preferDdl = false; - private final TableChangesSerializer tableChangesSerializer = new JsonTableChangeSerializer(); + private HistoryRecordProcessorProvider processorProvider = HistoryRecordProcessorProvider.NOOP; protected AbstractSchemaHistory() { } @Override - public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, boolean useCatalogBeforeSchema) { + public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, HistoryRecordProcessorProvider processorProvider) { this.config = config; - this.comparator = comparator != null ? comparator : HistoryRecordComparator.INSTANCE; - this.skipUnparseableDDL = config.getBoolean(SKIP_UNPARSEABLE_DDL_STATEMENTS); - - final String ddlFilter = config.getString(DDL_FILTER); - this.ddlFilter = (ddlFilter != null) ? Predicates.includes(ddlFilter, Pattern.CASE_INSENSITIVE | Pattern.DOTALL) : (x -> false); this.listener = listener; - this.useCatalogBeforeSchema = useCatalogBeforeSchema; - this.preferDdl = config.getBoolean(INTERNAL_PREFER_DDL); + this.processorProvider = processorProvider; } @Override @@ -88,72 +66,7 @@ public final void record(Map source, Map position, String @Override public void recover(Map, Map> offsets, Tables schema, DdlParser ddlParser) { listener.recoveryStarted(); - Map stopPoints = new HashMap<>(); - offsets.forEach((Map source, Map position) -> { - Document srcDocument = Document.create(); - if (source != null) { - source.forEach(srcDocument::set); - } - stopPoints.put(srcDocument, new HistoryRecord(source, position, null, null, null, null, null)); - }); - - recoverRecords(recovered -> { - listener.onChangeFromHistory(recovered); - Document srcDocument = recovered.document().getDocument(HistoryRecord.Fields.SOURCE); - if (stopPoints.containsKey(srcDocument) && comparator.isAtOrBefore(recovered, stopPoints.get(srcDocument))) { - Array tableChanges = recovered.tableChanges(); - String ddl = recovered.ddl(); - - if (!preferDdl && tableChanges != null && !tableChanges.isEmpty()) { - TableChanges changes = tableChangesSerializer.deserialize(tableChanges, useCatalogBeforeSchema); - for (TableChange entry : changes) { - if (entry.getType() == TableChangeType.CREATE) { - schema.overwriteTable(entry.getTable()); - } - else if (entry.getType() == TableChangeType.ALTER) { - if (entry.getPreviousId() != null) { - schema.removeTable(entry.getPreviousId()); - } - schema.overwriteTable(entry.getTable()); - } - // DROP - else { - schema.removeTable(entry.getId()); - } - } - listener.onChangeApplied(recovered); - } - else if (ddl != null && ddlParser != null) { - if (recovered.databaseName() != null) { - ddlParser.setCurrentDatabase(recovered.databaseName()); // may be null - } - if (recovered.schemaName() != null) { - ddlParser.setCurrentSchema(recovered.schemaName()); // may be null - } - if (ddlFilter.test(ddl)) { - logger.info("a DDL '{}' was filtered out of processing by regular expression '{}'", ddl, - config.getString(DDL_FILTER)); - return; - } - try { - logger.debug("Applying: {}", ddl); - ddlParser.parse(ddl, schema); - listener.onChangeApplied(recovered); - } - catch (final ParsingException | MultipleParsingExceptions e) { - if (skipUnparseableDDL) { - logger.warn("Ignoring unparseable statements '{}' stored in database schema history", ddl, e); - } - else { - throw e; - } - } - } - } - else { - logger.debug("Skipping: {}", recovered.ddl()); - } - }); + recoverRecords(processorProvider.get(offsets, schema, ddlParser)); listener.recoveryStopped(); } diff --git a/debezium-core/src/main/java/io/debezium/relational/history/HistoryRecordProcessor.java b/debezium-core/src/main/java/io/debezium/relational/history/HistoryRecordProcessor.java new file mode 100644 index 00000000000..fd5fa2548c3 --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/relational/history/HistoryRecordProcessor.java @@ -0,0 +1,138 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.relational.history; + +import java.util.HashMap; +import java.util.Map; +import java.util.function.Consumer; +import java.util.function.Predicate; +import java.util.regex.Pattern; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.debezium.config.Configuration; +import io.debezium.document.Array; +import io.debezium.document.Document; +import io.debezium.function.Predicates; +import io.debezium.relational.Tables; +import io.debezium.relational.ddl.DdlParser; +import io.debezium.relational.history.TableChanges.TableChangesSerializer; +import io.debezium.text.MultipleParsingExceptions; +import io.debezium.text.ParsingException; + +import static io.debezium.relational.history.SchemaHistory.DDL_FILTER; +import static io.debezium.relational.history.SchemaHistory.INTERNAL_PREFER_DDL; +import static io.debezium.relational.history.SchemaHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS; + +public class HistoryRecordProcessor implements Consumer { + + protected final Logger logger = LoggerFactory.getLogger(HistoryRecordProcessor.class); + + private final Map stopPoints; + private final Tables schema; + private final DdlParser ddlParser; + private final TableChangesSerializer tableChangesSerializer = new JsonTableChangeSerializer(); + private final Configuration config; + private final HistoryRecordComparator comparator; + private final SchemaHistoryListener listener; + private final boolean useCatalogBeforeSchema; + private final boolean skipUnparseableDDL; + private final boolean preferDdl; + private final Predicate ddlFilter; + + + public HistoryRecordProcessor( + Map, Map> offsets, Tables schema, DdlParser ddlParser, + Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, + boolean useCatalogBeforeSchema) { + this.stopPoints = getStopPoints(offsets); + this.schema = schema; + this.ddlParser = ddlParser; + + this.config = config; + this.comparator = comparator != null ? comparator : HistoryRecordComparator.INSTANCE; + this.listener = listener; + + this.useCatalogBeforeSchema = useCatalogBeforeSchema; + this.skipUnparseableDDL = config.getBoolean(SKIP_UNPARSEABLE_DDL_STATEMENTS); + this.preferDdl = config.getBoolean(INTERNAL_PREFER_DDL); + + final String ddlFilter = config.getString(DDL_FILTER); + this.ddlFilter = (ddlFilter != null) ? Predicates.includes(ddlFilter, Pattern.CASE_INSENSITIVE | Pattern.DOTALL) : (x -> false); + } + + private Map getStopPoints(Map, Map> offsets) { + Map stopPoints = new HashMap<>(); + offsets.forEach((Map source, Map position) -> { + Document srcDocument = Document.create(); + if (source != null) { + source.forEach(srcDocument::set); + } + stopPoints.put(srcDocument, new HistoryRecord(source, position, null, null, null, null, null)); + }); + return stopPoints; + } + + @Override + public void accept(HistoryRecord recovered) { + listener.onChangeFromHistory(recovered); + Document srcDocument = recovered.document().getDocument(HistoryRecord.Fields.SOURCE); + if (stopPoints.containsKey(srcDocument) && comparator.isAtOrBefore(recovered, stopPoints.get(srcDocument))) { + Array tableChanges = recovered.tableChanges(); + String ddl = recovered.ddl(); + + if (!preferDdl && tableChanges != null && !tableChanges.isEmpty()) { + TableChanges changes = tableChangesSerializer.deserialize(tableChanges, useCatalogBeforeSchema); + for (TableChanges.TableChange entry : changes) { + if (entry.getType() == TableChanges.TableChangeType.CREATE) { + schema.overwriteTable(entry.getTable()); + } + else if (entry.getType() == TableChanges.TableChangeType.ALTER) { + if (entry.getPreviousId() != null) { + schema.removeTable(entry.getPreviousId()); + } + schema.overwriteTable(entry.getTable()); + } + // DROP + else { + schema.removeTable(entry.getId()); + } + } + listener.onChangeApplied(recovered); + } + else if (ddl != null && ddlParser != null) { + if (recovered.databaseName() != null) { + ddlParser.setCurrentDatabase(recovered.databaseName()); // may be null + } + if (recovered.schemaName() != null) { + ddlParser.setCurrentSchema(recovered.schemaName()); // may be null + } + if (ddlFilter.test(ddl)) { + logger.info("a DDL '{}' was filtered out of processing by regular expression '{}'", ddl, + config.getString(DDL_FILTER)); + return; + } + try { + logger.debug("Applying: {}", ddl); + ddlParser.parse(ddl, schema); + listener.onChangeApplied(recovered); + } + catch (final ParsingException | MultipleParsingExceptions e) { + if (skipUnparseableDDL) { + logger.warn("Ignoring unparseable statements '{}' stored in database schema history", ddl, e); + } + else { + throw e; + } + } + } + } + else { + logger.debug("Skipping: {}", recovered.ddl()); + } + } +} diff --git a/debezium-core/src/main/java/io/debezium/relational/history/HistoryRecordProcessorProvider.java b/debezium-core/src/main/java/io/debezium/relational/history/HistoryRecordProcessorProvider.java new file mode 100644 index 00000000000..a5d20f3b5e9 --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/relational/history/HistoryRecordProcessorProvider.java @@ -0,0 +1,18 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.relational.history; + +import java.util.Map; +import java.util.function.Consumer; + +import io.debezium.relational.Tables; +import io.debezium.relational.ddl.DdlParser; + +public interface HistoryRecordProcessorProvider { + Consumer get(Map, Map> offsets, Tables schema, DdlParser ddlParser); + + HistoryRecordProcessorProvider NOOP = (o, s, p) -> (r) -> {}; +} diff --git a/debezium-core/src/main/java/io/debezium/relational/history/SchemaHistory.java b/debezium-core/src/main/java/io/debezium/relational/history/SchemaHistory.java index 5a650102980..49f8d1f83ed 100644 --- a/debezium-core/src/main/java/io/debezium/relational/history/SchemaHistory.java +++ b/debezium-core/src/main/java/io/debezium/relational/history/SchemaHistory.java @@ -138,7 +138,21 @@ public interface SchemaHistory { the catalog and the second as the table name, or false if the first should be used as the schema and the second as the table name */ - void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, boolean useCatalogBeforeSchema); + default void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, boolean useCatalogBeforeSchema) { + configure(config, comparator, listener, (o, s, p) -> new HistoryRecordProcessor(o, s, p, config, comparator, listener, useCatalogBeforeSchema)); + } + + /** + * Configure this instance. + * + * @param config the configuration for this history store + * @param comparator the function that should be used to compare history records during + * {@link #recover(Map, Map, Tables, DdlParser) recovery}; may be null if the + * {@link HistoryRecordComparator#INSTANCE default comparator} is to be used + * @param listener TODO + * @param processorProvider + */ + void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, HistoryRecordProcessorProvider processorProvider); /** * Start the history. diff --git a/debezium-core/src/main/java/io/debezium/relational/history/SnapshotAwareSchemaHistory.java b/debezium-core/src/main/java/io/debezium/relational/history/SnapshotAwareSchemaHistory.java new file mode 100644 index 00000000000..18552937651 --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/relational/history/SnapshotAwareSchemaHistory.java @@ -0,0 +1,122 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.relational.history; + +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Consumer; + +import io.debezium.config.Configuration; +import io.debezium.document.Document; +import io.debezium.relational.Tables; +import io.debezium.relational.ddl.DdlParser; + +public class SnapshotAwareSchemaHistory implements SchemaHistory { + + private final SchemaHistory delegate; + private HistoryRecordComparator comparator; + private SchemaHistoryListener listener; + + public SnapshotAwareSchemaHistory(SchemaHistory delegate) { + this.delegate = delegate; + } + + @Override + public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, HistoryRecordProcessorProvider processorProvider) { + this.comparator = comparator; + this.listener = listener; + delegate.configure(config, comparator, listener, (o, s, p) -> new HistoryRecordProcessor(o, s, p, processorProvider)); + } + + @Override + public void start() { + delegate.start(); + } + + @Override + public void record(Map source, Map position, String databaseName, String ddl) throws SchemaHistoryException { + delegate.record(source, position, databaseName, ddl); + storeSnapshot(); + } + + @Override + public void record(Map source, Map position, String databaseName, String schemaName, String ddl, TableChanges changes, Instant timestamp) throws SchemaHistoryException { + delegate.record(source, position, databaseName, schemaName, ddl, changes, timestamp); + storeSnapshot(); + } + + private void storeSnapshot() { + // TODO: use SchemaHistorySnapshot implementation + } + + @Override + public void recover(Map, Map> offsets, Tables schema, DdlParser ddlParser) { + delegate.recover(offsets, schema, ddlParser); + } + + @Override + public void stop() { + delegate.stop(); + } + + @Override + public boolean exists() { + return delegate.exists(); + } + + @Override + public boolean storageExists() { + return delegate.storageExists(); + } + + @Override + public void initializeStorage() { + delegate.initializeStorage(); + } + + private class HistoryRecordProcessor implements Consumer { + + private final Map stopPoints = new HashMap<>(); + private final Consumer originalProcessor; + + public HistoryRecordProcessor(Map, Map> offsets, Tables schema, DdlParser ddlParser, HistoryRecordProcessorProvider processorProvider) { + // TODO: find correct snapshots + // TODO: get snapshot positions from offsets + offsets.forEach((Map source, Map position) -> { + Document srcDocument = Document.create(); + if (source != null) { + source.forEach(srcDocument::set); + } + // stopPoints.put(srcDocument, new HistoryRecord(source, position, null, null, null, null, null)); + stopPoints.put(srcDocument, new HistoryRecord(source, null, null, null, null, null, null)); + }); + this.originalProcessor = processorProvider.get(offsets, schema, ddlParser); + } + + /** + * The code ignores records covered by schema snapshot - from the beginning to snapshot + * offsets (SO). Then it switched to original processor which handles records from SO to + * connector offsets (CO). + * + * <--------------> : snapshot processor + * ---------------(SO)-----(CO)----> : schema history + * <-----> : original processor + * @param recovered the input argument + */ + @Override + public void accept(HistoryRecord recovered) { + listener.onChangeFromHistory(recovered); + Document srcDocument = recovered.document().getDocument(HistoryRecord.Fields.SOURCE); + if (stopPoints.containsKey(srcDocument) && comparator.isAtOrBefore(recovered, stopPoints.get(srcDocument))) { + // schema snapshot contains this record, skip processing + listener.onChangeApplied(recovered); + } else { + originalProcessor.accept(recovered); + } + } + } +} diff --git a/debezium-storage/debezium-storage-azure-blob/src/main/java/io/debezium/storage/azure/blob/history/AzureBlobSchemaHistory.java b/debezium-storage/debezium-storage-azure-blob/src/main/java/io/debezium/storage/azure/blob/history/AzureBlobSchemaHistory.java index 78cb185d7a3..d888c91e523 100644 --- a/debezium-storage/debezium-storage-azure-blob/src/main/java/io/debezium/storage/azure/blob/history/AzureBlobSchemaHistory.java +++ b/debezium-storage/debezium-storage-azure-blob/src/main/java/io/debezium/storage/azure/blob/history/AzureBlobSchemaHistory.java @@ -23,6 +23,7 @@ import io.debezium.relational.history.AbstractFileBasedSchemaHistory; import io.debezium.relational.history.HistoryRecord; import io.debezium.relational.history.HistoryRecordComparator; +import io.debezium.relational.history.HistoryRecordProcessorProvider; import io.debezium.relational.history.SchemaHistory; import io.debezium.relational.history.SchemaHistoryException; import io.debezium.relational.history.SchemaHistoryListener; @@ -74,8 +75,8 @@ public class AzureBlobSchemaHistory extends AbstractFileBasedSchemaHistory { private String blobName; @Override - public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, boolean useCatalogBeforeSchema) { - super.configure(config, comparator, listener, useCatalogBeforeSchema); + public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, HistoryRecordProcessorProvider processorProvider) { + super.configure(config, comparator, listener, processorProvider); if (!config.validateAndRecord(ALL_FIELDS, LOGGER::error)) { throw new DebeziumException( "Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details"); diff --git a/debezium-storage/debezium-storage-file/src/main/java/io/debezium/storage/file/history/FileSchemaHistory.java b/debezium-storage/debezium-storage-file/src/main/java/io/debezium/storage/file/history/FileSchemaHistory.java index 8ac0f2ad41b..5bac157edbe 100644 --- a/debezium-storage/debezium-storage-file/src/main/java/io/debezium/storage/file/history/FileSchemaHistory.java +++ b/debezium-storage/debezium-storage-file/src/main/java/io/debezium/storage/file/history/FileSchemaHistory.java @@ -24,6 +24,7 @@ import io.debezium.relational.history.AbstractFileBasedSchemaHistory; import io.debezium.relational.history.HistoryRecord; import io.debezium.relational.history.HistoryRecordComparator; +import io.debezium.relational.history.HistoryRecordProcessorProvider; import io.debezium.relational.history.SchemaHistory; import io.debezium.relational.history.SchemaHistoryException; import io.debezium.relational.history.SchemaHistoryListener; @@ -47,7 +48,7 @@ public final class FileSchemaHistory extends AbstractFileBasedSchemaHistory { private Path path; @Override - public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, boolean useCatalogBeforeSchema) { + public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, HistoryRecordProcessorProvider processorProvider) { if (!config.validateAndRecord(ALL_FIELDS, logger::error)) { throw new DebeziumException( "Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details"); @@ -55,7 +56,7 @@ public void configure(Configuration config, HistoryRecordComparator comparator, if (running.get()) { throw new SchemaHistoryException("Database schema history file already initialized to " + path); } - super.configure(config, comparator, listener, useCatalogBeforeSchema); + super.configure(config, comparator, listener, processorProvider); path = Paths.get(config.getString(FILE_PATH)); } diff --git a/debezium-storage/debezium-storage-jdbc/src/main/java/io/debezium/storage/jdbc/history/JdbcSchemaHistory.java b/debezium-storage/debezium-storage-jdbc/src/main/java/io/debezium/storage/jdbc/history/JdbcSchemaHistory.java index 82165cc4b60..3ee0697e318 100644 --- a/debezium-storage/debezium-storage-jdbc/src/main/java/io/debezium/storage/jdbc/history/JdbcSchemaHistory.java +++ b/debezium-storage/debezium-storage-jdbc/src/main/java/io/debezium/storage/jdbc/history/JdbcSchemaHistory.java @@ -33,6 +33,7 @@ import io.debezium.relational.history.AbstractSchemaHistory; import io.debezium.relational.history.HistoryRecord; import io.debezium.relational.history.HistoryRecordComparator; +import io.debezium.relational.history.HistoryRecordProcessorProvider; import io.debezium.relational.history.SchemaHistory; import io.debezium.relational.history.SchemaHistoryException; import io.debezium.relational.history.SchemaHistoryListener; @@ -59,12 +60,12 @@ public final class JdbcSchemaHistory extends AbstractSchemaHistory { private JdbcSchemaHistoryConfig config; @Override - public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, boolean useCatalogBeforeSchema) { + public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, HistoryRecordProcessorProvider processorProvider) { this.config = new JdbcSchemaHistoryConfig(config); if (running.get()) { throw new IllegalStateException("Database history already initialized db: " + this.config.getJdbcUrl()); } - super.configure(config, comparator, listener, useCatalogBeforeSchema); + super.configure(config, comparator, listener, processorProvider); try { conn = DriverManager.getConnection(this.config.getJdbcUrl(), this.config.getUser(), this.config.getPassword()); diff --git a/debezium-storage/debezium-storage-kafka/src/main/java/io/debezium/storage/kafka/history/KafkaSchemaHistory.java b/debezium-storage/debezium-storage-kafka/src/main/java/io/debezium/storage/kafka/history/KafkaSchemaHistory.java index 75dc28eba4f..3d0a708a153 100644 --- a/debezium-storage/debezium-storage-kafka/src/main/java/io/debezium/storage/kafka/history/KafkaSchemaHistory.java +++ b/debezium-storage/debezium-storage-kafka/src/main/java/io/debezium/storage/kafka/history/KafkaSchemaHistory.java @@ -63,6 +63,7 @@ import io.debezium.relational.history.AbstractSchemaHistory; import io.debezium.relational.history.HistoryRecord; import io.debezium.relational.history.HistoryRecordComparator; +import io.debezium.relational.history.HistoryRecordProcessorProvider; import io.debezium.relational.history.SchemaHistory; import io.debezium.relational.history.SchemaHistoryException; import io.debezium.relational.history.SchemaHistoryListener; @@ -202,8 +203,8 @@ private static boolean hasNewTopicConstructorWithOptionals() { } @Override - public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, boolean useCatalogBeforeSchema) { - super.configure(config, comparator, listener, useCatalogBeforeSchema); + public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, HistoryRecordProcessorProvider processorProvider) { + super.configure(config, comparator, listener, processorProvider); if (!config.validateAndRecord(ALL_FIELDS, LOGGER::error)) { throw new ConnectException("Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details"); } diff --git a/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/history/RedisSchemaHistory.java b/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/history/RedisSchemaHistory.java index 5067dea0d44..baae47added 100644 --- a/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/history/RedisSchemaHistory.java +++ b/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/history/RedisSchemaHistory.java @@ -24,6 +24,7 @@ import io.debezium.relational.history.AbstractSchemaHistory; import io.debezium.relational.history.HistoryRecord; import io.debezium.relational.history.HistoryRecordComparator; +import io.debezium.relational.history.HistoryRecordProcessorProvider; import io.debezium.relational.history.SchemaHistory; import io.debezium.relational.history.SchemaHistoryException; import io.debezium.relational.history.SchemaHistoryListener; @@ -61,11 +62,11 @@ void connect() { } @Override - public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, boolean useCatalogBeforeSchema) { + public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, HistoryRecordProcessorProvider processorProvider) { this.config = new RedisSchemaHistoryConfig(config); this.initialRetryDelay = Duration.ofMillis(this.config.getInitialRetryDelay()); this.maxRetryDelay = Duration.ofMillis(this.config.getMaxRetryDelay()); - super.configure(config, comparator, listener, useCatalogBeforeSchema); + super.configure(config, comparator, listener, processorProvider); } @Override diff --git a/debezium-storage/debezium-storage-rocketmq/src/main/java/io/debezium/storage/rocketmq/history/RocketMqSchemaHistory.java b/debezium-storage/debezium-storage-rocketmq/src/main/java/io/debezium/storage/rocketmq/history/RocketMqSchemaHistory.java index 8a39ec84648..b68772ee6dc 100644 --- a/debezium-storage/debezium-storage-rocketmq/src/main/java/io/debezium/storage/rocketmq/history/RocketMqSchemaHistory.java +++ b/debezium-storage/debezium-storage-rocketmq/src/main/java/io/debezium/storage/rocketmq/history/RocketMqSchemaHistory.java @@ -38,6 +38,7 @@ import io.debezium.relational.history.AbstractSchemaHistory; import io.debezium.relational.history.HistoryRecord; import io.debezium.relational.history.HistoryRecordComparator; +import io.debezium.relational.history.HistoryRecordProcessorProvider; import io.debezium.relational.history.SchemaHistory; import io.debezium.relational.history.SchemaHistoryException; import io.debezium.relational.history.SchemaHistoryListener; @@ -133,8 +134,8 @@ private static Field.Validator forRocketMq(final Field.Validator validator) { } @Override - public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, boolean useCatalogBeforeSchema) { - super.configure(config, comparator, listener, useCatalogBeforeSchema); + public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, HistoryRecordProcessorProvider processorProvider) { + super.configure(config, comparator, listener, processorProvider); this.topicName = config.getString(TOPIC); this.dbHistoryName = config.getString(SchemaHistory.NAME, UUID.randomUUID().toString()); this.maxRecoveryAttempts = config.getInteger(RECOVERY_POLL_ATTEMPTS); diff --git a/debezium-storage/debezium-storage-s3/src/main/java/io/debezium/storage/s3/history/S3SchemaHistory.java b/debezium-storage/debezium-storage-s3/src/main/java/io/debezium/storage/s3/history/S3SchemaHistory.java index 1e856b1361c..df5f871132a 100644 --- a/debezium-storage/debezium-storage-s3/src/main/java/io/debezium/storage/s3/history/S3SchemaHistory.java +++ b/debezium-storage/debezium-storage-s3/src/main/java/io/debezium/storage/s3/history/S3SchemaHistory.java @@ -22,6 +22,7 @@ import io.debezium.relational.history.AbstractFileBasedSchemaHistory; import io.debezium.relational.history.HistoryRecord; import io.debezium.relational.history.HistoryRecordComparator; +import io.debezium.relational.history.HistoryRecordProcessorProvider; import io.debezium.relational.history.SchemaHistory; import io.debezium.relational.history.SchemaHistoryException; import io.debezium.relational.history.SchemaHistoryListener; @@ -112,8 +113,8 @@ public class S3SchemaHistory extends AbstractFileBasedSchemaHistory { private volatile S3Client client = null; @Override - public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, boolean useCatalogBeforeSchema) { - super.configure(config, comparator, listener, useCatalogBeforeSchema); + public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, HistoryRecordProcessorProvider processorProvider) { + super.configure(config, comparator, listener, processorProvider); if (!config.validateAndRecord(ALL_FIELDS, LOGGER::error)) { throw new DebeziumException( "Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details");