CXP-2779: extracted HistoryRecordProcessor logic, added SnapshotAwareSchemaHistory implementation
mikekamornikov committed Dec 10, 2023
1 parent 7c8892c commit 493101f
Showing 15 changed files with 337 additions and 111 deletions.
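SnapshotAwareSchemaHistory's own source is not among the hunks shown below; from its usage in the connector-config hunk (wrapping an existing SchemaHistory before configure() is called), it is presumably a forwarding decorator. A minimal, self-contained sketch of that shape — note it uses a simplified stand-in interface rather than Debezium's real SchemaHistory, and is an illustration, not the committed code:

    import java.util.function.Consumer;

    // Stand-in for illustration only: Debezium's real SchemaHistory interface
    // has more methods (configure, exists, initializeStorage, ...).
    interface MiniSchemaHistory {
        void start();
        void record(String ddl);
        void recover(Consumer<String> recordProcessor);
        void stop();
    }

    // Decorator in the style of the new SnapshotAwareSchemaHistory: it owns no
    // storage of its own and forwards each call to the wrapped history, so
    // snapshot-aware behavior can be layered over any backend (Kafka, file, ...).
    final class SnapshotAwareMiniHistory implements MiniSchemaHistory {
        private final MiniSchemaHistory delegate;

        SnapshotAwareMiniHistory(MiniSchemaHistory delegate) {
            this.delegate = delegate;
        }

        @Override
        public void start() {
            delegate.start();
        }

        @Override
        public void record(String ddl) {
            // hook point: inspect or adjust records before they reach storage
            delegate.record(ddl);
        }

        @Override
        public void recover(Consumer<String> recordProcessor) {
            delegate.recover(recordProcessor);
        }

        @Override
        public void stop() {
            delegate.stop();
        }
    }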
@@ -120,7 +120,7 @@ public void shouldStartWithEmptyTopicAndStoreDataAndRecoverAllState() throws Exc
}

private void testHistoryTopicContent(String topicName, boolean skipUnparseableDDL) {
- interceptor = new LogInterceptor(KafkaSchemaHistory.class);
+ interceptor = new LogInterceptor(HistoryRecordProcessor.class);
// Start up the history ...
Configuration config = Configuration.create()
.with(KafkaSchemaHistory.BOOTSTRAP_SERVERS, kafka.brokerList())
@@ -67,6 +67,7 @@
import io.debezium.relational.Tables;
import io.debezium.relational.ddl.DdlParser;
import io.debezium.relational.history.HistoryRecordComparator;
+ import io.debezium.relational.history.HistoryRecordProcessorProvider;
import io.debezium.relational.history.SchemaHistory;
import io.debezium.relational.history.SchemaHistoryException;
import io.debezium.relational.history.SchemaHistoryListener;
@@ -2907,8 +2908,8 @@ public boolean exists() {

@Override
public void configure(Configuration config, HistoryRecordComparator comparator,
- SchemaHistoryListener listener, boolean useCatalogBeforeSchema) {
- delegate.configure(config, comparator, listener, useCatalogBeforeSchema);
+ SchemaHistoryListener listener, HistoryRecordProcessorProvider processorProvider) {
+ delegate.configure(config, comparator, listener, processorProvider);
}

@Override
@@ -21,8 +21,12 @@
import io.debezium.relational.Selectors.TableIdToStringMapper;
import io.debezium.relational.Tables.TableFilter;
import io.debezium.relational.history.HistoryRecordComparator;
+ import io.debezium.relational.history.HistoryRecordProcessor;
+ import io.debezium.relational.history.HistoryRecordProcessorProvider;
import io.debezium.relational.history.SchemaHistory;
+ import io.debezium.relational.history.SchemaHistoryListener;
import io.debezium.relational.history.SchemaHistoryMetrics;
+ import io.debezium.relational.history.SnapshotAwareSchemaHistory;

/**
* Configuration options shared across the relational CDC connectors which use a persistent database schema history.
@@ -133,9 +137,13 @@ public SchemaHistory getSchemaHistory() {
.withDefault(SchemaHistory.INTERNAL_CONNECTOR_ID, logicalName)
.build();

+ schemaHistory = new SnapshotAwareSchemaHistory(schemaHistory);
+
HistoryRecordComparator historyComparator = getHistoryRecordComparator();
- schemaHistory.configure(schemaHistoryConfig, historyComparator,
- new SchemaHistoryMetrics(this, multiPartitionMode()), useCatalogBeforeSchema()); // validates
+ SchemaHistoryListener historyListener = new SchemaHistoryMetrics(this, multiPartitionMode());
+ HistoryRecordProcessorProvider processorProvider = (o, s, p) -> new HistoryRecordProcessor(o, s, p, schemaHistoryConfig, historyComparator, historyListener,
+ useCatalogBeforeSchema());
+ schemaHistory.configure(schemaHistoryConfig, historyComparator, historyListener, processorProvider); // validates

return schemaHistory;
}
@@ -6,27 +6,16 @@
package io.debezium.relational.history;

import java.time.Instant;
- import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
- import java.util.function.Predicate;
- import java.util.regex.Pattern;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.debezium.config.Configuration;
import io.debezium.config.Field;
- import io.debezium.document.Array;
- import io.debezium.document.Document;
- import io.debezium.function.Predicates;
import io.debezium.relational.Tables;
import io.debezium.relational.ddl.DdlParser;
- import io.debezium.relational.history.TableChanges.TableChange;
- import io.debezium.relational.history.TableChanges.TableChangeType;
- import io.debezium.relational.history.TableChanges.TableChangesSerializer;
- import io.debezium.text.MultipleParsingExceptions;
- import io.debezium.text.ParsingException;
import io.debezium.util.Clock;

/**
@@ -40,28 +29,17 @@ public abstract class AbstractSchemaHistory implements SchemaHistory {
public static Field.Set ALL_FIELDS = Field.setOf(NAME, INTERNAL_CONNECTOR_CLASS, INTERNAL_CONNECTOR_ID);

protected Configuration config;
- private HistoryRecordComparator comparator = HistoryRecordComparator.INSTANCE;
- private boolean skipUnparseableDDL;
- private Predicate<String> ddlFilter = x -> false;
private SchemaHistoryListener listener = SchemaHistoryListener.NOOP;
- private boolean useCatalogBeforeSchema;
- private boolean preferDdl = false;
- private final TableChangesSerializer<Array> tableChangesSerializer = new JsonTableChangeSerializer();
+ private HistoryRecordProcessorProvider processorProvider = HistoryRecordProcessor::new;

protected AbstractSchemaHistory() {
}

@Override
- public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, boolean useCatalogBeforeSchema) {
+ public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, HistoryRecordProcessorProvider processorProvider) {
this.config = config;
- this.comparator = comparator != null ? comparator : HistoryRecordComparator.INSTANCE;
- this.skipUnparseableDDL = config.getBoolean(SKIP_UNPARSEABLE_DDL_STATEMENTS);
-
- final String ddlFilter = config.getString(DDL_FILTER);
- this.ddlFilter = (ddlFilter != null) ? Predicates.includes(ddlFilter, Pattern.CASE_INSENSITIVE | Pattern.DOTALL) : (x -> false);
this.listener = listener;
- this.useCatalogBeforeSchema = useCatalogBeforeSchema;
- this.preferDdl = config.getBoolean(INTERNAL_PREFER_DDL);
+ this.processorProvider = processorProvider;
}

@Override
@@ -88,72 +66,7 @@ public final void record(Map<String, ?> source, Map<String, ?> position, String
@Override
public void recover(Map<Map<String, ?>, Map<String, ?>> offsets, Tables schema, DdlParser ddlParser) {
listener.recoveryStarted();
- Map<Document, HistoryRecord> stopPoints = new HashMap<>();
- offsets.forEach((Map<String, ?> source, Map<String, ?> position) -> {
- Document srcDocument = Document.create();
- if (source != null) {
- source.forEach(srcDocument::set);
- }
- stopPoints.put(srcDocument, new HistoryRecord(source, position, null, null, null, null, null));
- });
-
- recoverRecords(recovered -> {
- listener.onChangeFromHistory(recovered);
- Document srcDocument = recovered.document().getDocument(HistoryRecord.Fields.SOURCE);
- if (stopPoints.containsKey(srcDocument) && comparator.isAtOrBefore(recovered, stopPoints.get(srcDocument))) {
- Array tableChanges = recovered.tableChanges();
- String ddl = recovered.ddl();
-
- if (!preferDdl && tableChanges != null && !tableChanges.isEmpty()) {
- TableChanges changes = tableChangesSerializer.deserialize(tableChanges, useCatalogBeforeSchema);
- for (TableChange entry : changes) {
- if (entry.getType() == TableChangeType.CREATE) {
- schema.overwriteTable(entry.getTable());
- }
- else if (entry.getType() == TableChangeType.ALTER) {
- if (entry.getPreviousId() != null) {
- schema.removeTable(entry.getPreviousId());
- }
- schema.overwriteTable(entry.getTable());
- }
- // DROP
- else {
- schema.removeTable(entry.getId());
- }
- }
- listener.onChangeApplied(recovered);
- }
- else if (ddl != null && ddlParser != null) {
- if (recovered.databaseName() != null) {
- ddlParser.setCurrentDatabase(recovered.databaseName()); // may be null
- }
- if (recovered.schemaName() != null) {
- ddlParser.setCurrentSchema(recovered.schemaName()); // may be null
- }
- if (ddlFilter.test(ddl)) {
- logger.info("a DDL '{}' was filtered out of processing by regular expression '{}'", ddl,
- config.getString(DDL_FILTER));
- return;
- }
- try {
- logger.debug("Applying: {}", ddl);
- ddlParser.parse(ddl, schema);
- listener.onChangeApplied(recovered);
- }
- catch (final ParsingException | MultipleParsingExceptions e) {
- if (skipUnparseableDDL) {
- logger.warn("Ignoring unparseable statements '{}' stored in database schema history", ddl, e);
- }
- else {
- throw e;
- }
- }
- }
- }
- else {
- logger.debug("Skipping: {}", recovered.ddl());
- }
- });
+ recoverRecords(processorProvider.get(offsets, schema, ddlParser));
listener.recoveryStopped();
}
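With the inline logic extracted, recover() now just asks the provider for a consumer and feeds it to the storage-specific replay hook. That hook is AbstractSchemaHistory's pre-existing abstract method, unchanged by this commit and not shown in this hunk; its shape (inferred from the call above) for reference:

    // Storage backends (Kafka, file, ...) implement this to replay every stored
    // record into the given consumer — now the processor built by the provider.
    protected abstract void recoverRecords(Consumer<HistoryRecord> records);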

@@ -0,0 +1,143 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.relational.history;

import static io.debezium.relational.history.SchemaHistory.DDL_FILTER;
import static io.debezium.relational.history.SchemaHistory.INTERNAL_PREFER_DDL;
import static io.debezium.relational.history.SchemaHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS;

import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.regex.Pattern;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.debezium.config.Configuration;
import io.debezium.document.Array;
import io.debezium.document.Document;
import io.debezium.function.Predicates;
import io.debezium.relational.Tables;
import io.debezium.relational.ddl.DdlParser;
import io.debezium.relational.history.TableChanges.TableChangesSerializer;
import io.debezium.text.MultipleParsingExceptions;
import io.debezium.text.ParsingException;

public class HistoryRecordProcessor implements Consumer<HistoryRecord> {

protected final Logger logger = LoggerFactory.getLogger(HistoryRecordProcessor.class);

private final Map<Document, HistoryRecord> stopPoints;
private final Tables schema;
private final DdlParser ddlParser;
private final TableChangesSerializer<Array> tableChangesSerializer = new JsonTableChangeSerializer();
private final Configuration config;
private final HistoryRecordComparator comparator;
private final SchemaHistoryListener listener;
private final boolean useCatalogBeforeSchema;
private boolean skipUnparseableDDL;
private boolean preferDdl;
private Predicate<String> ddlFilter = x -> false;

public HistoryRecordProcessor(Map<Map<String, ?>, Map<String, ?>> offsets, Tables schema, DdlParser ddlParser) {
this(offsets, schema, ddlParser, null, null, null, false);
}

public HistoryRecordProcessor(
Map<Map<String, ?>, Map<String, ?>> offsets, Tables schema, DdlParser ddlParser,
Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener,
boolean useCatalogBeforeSchema) {
this.stopPoints = getStopPoints(offsets);
this.schema = schema;
this.ddlParser = ddlParser;

this.config = config;
this.comparator = comparator != null ? comparator : HistoryRecordComparator.INSTANCE;
this.listener = listener != null ? listener : SchemaHistoryListener.NOOP;

this.useCatalogBeforeSchema = useCatalogBeforeSchema;
if (config != null) {
this.skipUnparseableDDL = config.getBoolean(SKIP_UNPARSEABLE_DDL_STATEMENTS);
this.preferDdl = config.getBoolean(INTERNAL_PREFER_DDL);

final String ddlFilter = config.getString(DDL_FILTER);
this.ddlFilter = (ddlFilter != null) ? Predicates.includes(ddlFilter, Pattern.CASE_INSENSITIVE | Pattern.DOTALL) : (x -> false);
}
}

private Map<Document, HistoryRecord> getStopPoints(Map<Map<String, ?>, Map<String, ?>> offsets) {
Map<Document, HistoryRecord> stopPoints = new HashMap<>();
offsets.forEach((Map<String, ?> source, Map<String, ?> position) -> {
Document srcDocument = Document.create();
if (source != null) {
source.forEach(srcDocument::set);
}
stopPoints.put(srcDocument, new HistoryRecord(source, position, null, null, null, null, null));
});
return stopPoints;
}

@Override
public void accept(HistoryRecord recovered) {
listener.onChangeFromHistory(recovered);
Document srcDocument = recovered.document().getDocument(HistoryRecord.Fields.SOURCE);
if (stopPoints.containsKey(srcDocument) && comparator.isAtOrBefore(recovered, stopPoints.get(srcDocument))) {
Array tableChanges = recovered.tableChanges();
String ddl = recovered.ddl();

if (!preferDdl && tableChanges != null && !tableChanges.isEmpty()) {
TableChanges changes = tableChangesSerializer.deserialize(tableChanges, useCatalogBeforeSchema);
for (TableChanges.TableChange entry : changes) {
if (entry.getType() == TableChanges.TableChangeType.CREATE) {
schema.overwriteTable(entry.getTable());
}
else if (entry.getType() == TableChanges.TableChangeType.ALTER) {
if (entry.getPreviousId() != null) {
schema.removeTable(entry.getPreviousId());
}
schema.overwriteTable(entry.getTable());
}
// DROP
else {
schema.removeTable(entry.getId());
}
}
listener.onChangeApplied(recovered);
}
else if (ddl != null && ddlParser != null) {
if (recovered.databaseName() != null) {
ddlParser.setCurrentDatabase(recovered.databaseName()); // may be null
}
if (recovered.schemaName() != null) {
ddlParser.setCurrentSchema(recovered.schemaName()); // may be null
}
if (ddlFilter.test(ddl)) {
logger.info("a DDL '{}' was filtered out of processing by regular expression '{}'", ddl,
config.getString(DDL_FILTER));
return;
}
try {
logger.debug("Applying: {}", ddl);
ddlParser.parse(ddl, schema);
listener.onChangeApplied(recovered);
}
catch (final ParsingException | MultipleParsingExceptions e) {
if (skipUnparseableDDL) {
logger.warn("Ignoring unparseable statements '{}' stored in database schema history", ddl, e);
}
else {
throw e;
}
}
}
}
else {
logger.debug("Skipping: {}", recovered.ddl());
}
}
}
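The extracted class is a plain Consumer<HistoryRecord>, which is presumably why the Kafka test at the top of this diff now intercepts log output from HistoryRecordProcessor rather than KafkaSchemaHistory. As a hypothetical illustration (the helper name and arguments below are not part of the commit), replaying stored records is just iteration plus accept(); the three-argument constructor falls back to the NOOP listener and default comparator, as shown above:

    // Hypothetical helper, illustration only. Records at or before the stop points
    // derived from `offsets` are applied to `schema` (serialized table changes
    // preferred, raw DDL parsed otherwise); anything newer is skipped.
    static void replay(List<HistoryRecord> storedRecords,
                       Map<Map<String, ?>, Map<String, ?>> offsets,
                       Tables schema, DdlParser ddlParser) {
        Consumer<HistoryRecord> processor = new HistoryRecordProcessor(offsets, schema, ddlParser);
        storedRecords.forEach(processor);
    }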
@@ -0,0 +1,16 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.relational.history;

import java.util.Map;
import java.util.function.Consumer;

import io.debezium.relational.Tables;
import io.debezium.relational.ddl.DdlParser;

public interface HistoryRecordProcessorProvider {
Consumer<HistoryRecord> get(Map<Map<String, ?>, Map<String, ?>> offsets, Tables schema, DdlParser ddlParser);
}
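The provider has a single abstract method, so both call sites in this commit satisfy it without a named class. The two shapes, as they appear in the diff (the second assumes config, comparator, listener and useCatalogBeforeSchema are in scope):

    // Default in AbstractSchemaHistory: constructor reference to the
    // three-argument HistoryRecordProcessor (NOOP listener, default comparator).
    HistoryRecordProcessorProvider provider = HistoryRecordProcessor::new;

    // Fully configured lambda, as in the connector config and in SchemaHistory's
    // new default configure(...) bridge.
    HistoryRecordProcessorProvider configured =
            (offsets, schema, parser) -> new HistoryRecordProcessor(
                    offsets, schema, parser, config, comparator, listener, useCatalogBeforeSchema);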
@@ -138,7 +138,21 @@ public interface SchemaHistory {
the catalog and the second as the table name, or false if the first should be used as the schema and the
second as the table name
*/
- void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, boolean useCatalogBeforeSchema);
+ default void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, boolean useCatalogBeforeSchema) {
+ configure(config, comparator, listener, (o, s, p) -> new HistoryRecordProcessor(o, s, p, config, comparator, listener, useCatalogBeforeSchema));
+ }
+
+ /**
+ * Configure this instance.
+ *
+ * @param config the configuration for this history store
+ * @param comparator the function that should be used to compare history records during
+ * {@link #recover(Map, Map, Tables, DdlParser) recovery}; may be null if the
+ * {@link HistoryRecordComparator#INSTANCE default comparator} is to be used
+ * @param listener the listener that is notified of schema history events; may not be null
+ * @param processorProvider provides the consumer used to process history records recovered during recovery
+ */
+ void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, HistoryRecordProcessorProvider processorProvider);

/**
* Start the history.
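Because the boolean overload is now a default method, existing call sites keep compiling unchanged, while SchemaHistory implementations themselves must move to the provider-based overload (as the delegating class earlier in this diff does). A hypothetical legacy caller:

    // Still compiles: the default method wraps these arguments into a
    // HistoryRecordProcessorProvider and delegates to the new configure(...).
    history.configure(config, comparator, listener, true); // useCatalogBeforeSchema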