CXP-2779: simplified SchemaHistorySnapshot interface
mikekamornikov committed Dec 12, 2023
1 parent c7393e7 commit e7e4897
Showing 8 changed files with 125 additions and 42 deletions.
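
In short, the commit narrows SchemaHistorySnapshot from offset-map bookkeeping (read(offsets, schema) plus getOffsets(connectorOffsets)) to three methods (save, read, find), moves that bookkeeping into a new SnapshotProcessor inside SnapshotAwareSchemaHistory, and has each connector expose a SchemaPartitioner. Below is a minimal sketch of what an implementation of the simplified contract could look like; the in-memory class is illustrative only and is not part of this commit.

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import io.debezium.relational.Tables;
import io.debezium.relational.history.snapshot.SchemaHistorySnapshot;

// Hypothetical in-memory implementation of the simplified contract (sketch only).
public class InMemorySchemaHistorySnapshot implements SchemaHistorySnapshot {

    // one saved schema per (source partition, position) pair
    private final Map<Map<String, ?>, Map<Map<String, ?>, Tables>> store = new ConcurrentHashMap<>();

    @Override
    public void save(Map<String, ?> source, Map<String, ?> position, Tables schema) {
        store.computeIfAbsent(source, (s) -> new ConcurrentHashMap<>()).put(position, schema);
    }

    @Override
    public Tables read(Map<String, ?> source, Map<String, ?> position) {
        return store.getOrDefault(source, Map.of()).get(position);
    }

    @Override
    public List<Map<String, ?>> find(Map<String, ?> source) {
        // every position for which this source partition has a snapshot
        return List.copyOf(store.getOrDefault(source, Map.of()).keySet());
    }
}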
MySqlConnectorConfig.java
@@ -1164,6 +1164,11 @@ protected HistoryRecordComparator getHistoryRecordComparator() {
         return new MySqlHistoryRecordComparator(gtidSourceFilter());
     }
 
+    @Override
+    protected SchemaPartitioner getSchemaPartitioner() {
+        return SchemaPartitioner.NOOP;
+    }
+
     public static boolean isBuiltInDatabase(String databaseName) {
         if (databaseName == null) {
             return false;
OracleConnectorConfig.java
@@ -829,6 +829,11 @@ protected HistoryRecordComparator getHistoryRecordComparator() {
         return getAdapter().getHistoryRecordComparator();
     }
 
+    @Override
+    protected SchemaPartitioner getSchemaPartitioner() {
+        return SchemaPartitioner.NOOP;
+    }
+
     /**
      * Defines modes of representation of {@code interval} datatype
      */
SqlServerConnectorConfig.java
@@ -493,6 +493,11 @@ protected boolean isPositionAtOrBefore(Document recorded, Document desired) {
         };
     }
 
+    @Override
+    protected SchemaPartitioner getSchemaPartitioner() {
+        return (s) -> TableFilter.fromPredicate((t) -> t.catalog().equals(s.get("database")));
+    }
+
     @Override
     public String getContextName() {
         return Module.contextName();
HistorizedRelationalDatabaseConnectorConfig.java
@@ -5,6 +5,7 @@
  */
 package io.debezium.relational;
 
+import java.util.Map;
 import java.util.function.Predicate;
 import java.util.regex.Pattern;
 
@@ -137,7 +138,7 @@ public SchemaHistory getSchemaHistory(Tables initialSchema) {
                 .withDefault(SchemaHistory.INTERNAL_CONNECTOR_ID, logicalName)
                 .build();
 
-        schemaHistory = new SnapshotAwareSchemaHistory(schemaHistory, initialSchema);
+        schemaHistory = new SnapshotAwareSchemaHistory(schemaHistory, initialSchema, getSchemaPartitioner());
 
         HistoryRecordComparator historyComparator = getHistoryRecordComparator();
         SchemaHistoryListener historyListener = new SchemaHistoryMetrics(this, multiPartitionMode());
@@ -185,4 +186,11 @@ public boolean storeOnlyCapturedDatabases() {
      */
     protected abstract HistoryRecordComparator getHistoryRecordComparator();
 
+    protected abstract SchemaPartitioner getSchemaPartitioner();
+
+    public interface SchemaPartitioner {
+        TableFilter fromPartition(Map<String, ?> source);
+
+        SchemaPartitioner NOOP = (s) -> Tables.TableFilter.includeAll();
+    }
 }
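
The new SchemaPartitioner hook maps a source partition to a TableFilter, so only that partition's slice of the schema gets snapshotted (see getPartialSchema in SnapshotAwareSchemaHistory further down). A rough usage sketch follows; the class name and partition keys are assumptions for illustration.

import java.util.Map;

import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig.SchemaPartitioner;
import io.debezium.relational.Tables;

public class PartitionerDemo {
    public static void main(String[] args) {
        // keep only tables whose catalog matches the partition's "database" key,
        // mirroring the per-database override earlier in this commit
        SchemaPartitioner byDatabase = (source) -> Tables.TableFilter
                .fromPredicate((tableId) -> tableId.catalog().equals(source.get("database")));

        Map<String, String> partition = Map.of("server", "srv1", "database", "inventory"); // illustrative keys
        Tables schema = new Tables(); // in the connector this would be the full recovered schema
        Tables partialSchema = schema.subset(byDatabase.fromPartition(partition));
        System.out.println(partialSchema.size());
    }
}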
HistoryRecord.java
@@ -32,6 +32,10 @@ public HistoryRecord(Document document) {
         this.doc = document;
     }
 
+    public HistoryRecord(Map<String, ?> source, Map<String, ?> position) {
+        this(source, position, null, null, null, null, null);
+    }
+
     public HistoryRecord(Map<String, ?> source, Map<String, ?> position, String databaseName, String schemaName,
                          String ddl, TableChanges changes, Instant timestamp) {
         this.doc = Document.create();
(file name not shown in this view)
@@ -77,7 +77,7 @@ private Map<Document, HistoryRecord> getStopPoints(Map<Map<String, ?>, Map<String, ?>> offsets) {
             if (source != null) {
                 source.forEach(srcDocument::set);
             }
-            stopPoints.put(srcDocument, new HistoryRecord(source, position, null, null, null, null, null));
+            stopPoints.put(srcDocument, new HistoryRecord(source, position));
         });
         return stopPoints;
     }
SnapshotAwareSchemaHistory.java
@@ -12,30 +12,33 @@
 
 import io.debezium.config.Configuration;
 import io.debezium.document.Document;
+import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig.SchemaPartitioner;
 import io.debezium.relational.Tables;
 import io.debezium.relational.ddl.DdlParser;
 import io.debezium.relational.history.snapshot.SchemaHistorySnapshot;
 
 public class SnapshotAwareSchemaHistory implements SchemaHistory {
 
     private final SchemaHistory delegate;
-    private Tables schema;
-    private HistoryRecordComparator comparator;
-    private SchemaHistoryListener listener;
-    private SchemaHistorySnapshot snapshot = SchemaHistorySnapshot.NOOP;
+    private final Tables schema;
+    private final SchemaPartitioner schemaPartitioner;
+    private SnapshotProcessor snapshotProcessor = new SnapshotProcessor(SchemaHistorySnapshot.NOOP, HistoryRecordComparator.INSTANCE, SchemaPartitioner.NOOP);
 
-    public SnapshotAwareSchemaHistory(SchemaHistory delegate, Tables initialSchema) {
+    public SnapshotAwareSchemaHistory(SchemaHistory delegate, Tables initialSchema, SchemaPartitioner schemaPartitioner) {
         this.delegate = delegate;
         this.schema = initialSchema;
+        this.schemaPartitioner = schemaPartitioner;
     }
 
     @Override
     public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, HistoryRecordProcessorProvider processorProvider) {
-        this.comparator = comparator;
-        this.listener = listener;
-        delegate.configure(config, comparator, listener, (o, s, p) -> new HistoryRecordProcessor(o, s, p, processorProvider));
+        this.snapshotProcessor = new SnapshotProcessor(getSchemaHistorySnapshot(config), comparator, schemaPartitioner);
+        delegate.configure(config, comparator, listener, (o, s, p) -> new HistoryRecordProcessor(o, s, p, comparator, listener, snapshotProcessor, processorProvider));
     }
 
-    // TODO: init snapshot
+    private SchemaHistorySnapshot getSchemaHistorySnapshot(Configuration config) {
+        // TODO: use config to init SchemaHistorySnapshot instance
+        return SchemaHistorySnapshot.NOOP;
+    }
 
     @Override
@@ -46,19 +49,18 @@ public void start() {
     @Override
     public void record(Map<String, ?> source, Map<String, ?> position, String databaseName, String ddl) throws SchemaHistoryException {
         delegate.record(source, position, databaseName, ddl);
-        snapshot.save(source, position, schema);
+        snapshotProcessor.save(source, position, schema);
     }
 
     @Override
     public void record(Map<String, ?> source, Map<String, ?> position, String databaseName, String schemaName, String ddl, TableChanges changes, Instant timestamp)
             throws SchemaHistoryException {
         delegate.record(source, position, databaseName, schemaName, ddl, changes, timestamp);
-        snapshot.save(source, position, schema);
+        snapshotProcessor.save(source, position, schema);
    }
 
     @Override
     public void recover(Map<Map<String, ?>, Map<String, ?>> offsets, Tables schema, DdlParser ddlParser) {
-        snapshot.read(offsets, schema);
         delegate.recover(offsets, schema, ddlParser);
     }

@@ -82,13 +84,80 @@ public void initializeStorage() {
         delegate.initializeStorage();
     }
 
-    private class HistoryRecordProcessor implements Consumer<HistoryRecord> {
+    private static class SnapshotProcessor {
+        private final SchemaHistorySnapshot adapter;
+        private final HistoryRecordComparator comparator;
+        private final SchemaPartitioner schemaPartitioner;
+
+        public SnapshotProcessor(SchemaHistorySnapshot adapter, HistoryRecordComparator comparator, SchemaPartitioner schemaPartitioner) {
+            this.adapter = adapter;
+            this.comparator = comparator;
+            this.schemaPartitioner = schemaPartitioner;
+        }
+
+        public void save(Map<String, ?> source, Map<String, ?> position, Tables schema) {
+            adapter.save(source, position, getPartialSchema(source, schema));
+        }
+
+        public Map<Map<String, ?>, Map<String, ?>> read(Map<Map<String, ?>, Map<String, ?>> offsets, Tables schema) {
+            Map<Map<String, ?>, Map<String, ?>> snapshotOffsets = getSnapshotOffsets(offsets);
+            snapshotOffsets.forEach((Map<String, ?> source, Map<String, ?> position) -> {
+                if (position == null) {
+                    return;
+                }
+
+                Tables partialSchema = adapter.read(source, position);
+                // effectively schema.merge(partialSchema), which Tables does not offer directly
+                partialSchema.tableIds().forEach((tableId) -> {
+                    schema.overwriteTable(partialSchema.forTable(tableId));
+                });
+                snapshotOffsets.put(source, position);
+            });
+            return snapshotOffsets;
+        }
+
+        private Map<Map<String, ?>, Map<String, ?>> getSnapshotOffsets(Map<Map<String, ?>, Map<String, ?>> offsets) {
+            Map<Map<String, ?>, Map<String, ?>> snapshotOffsets = new HashMap<>();
+            offsets.forEach((Map<String, ?> source, Map<String, ?> position) -> {
+                HistoryRecord historyRecord = new HistoryRecord(source, position);
+                Map<String, ?> snapshotPosition = adapter.find(source).stream()
+                        // snapshot position keys match connector position keys
+                        .filter((p) -> p.keySet().equals(position.keySet()))
+                        // snapshot position is at or before the connector position
+                        .filter((p) -> comparator.isAtOrBefore(new HistoryRecord(source, p), historyRecord))
+                        // pick the greatest such position (last one in ascending order)
+                        .max((p1, p2) -> {
+                            HistoryRecord h1 = new HistoryRecord(source, p1);
+                            HistoryRecord h2 = new HistoryRecord(source, p2);
+                            return comparator.isAtOrBefore(h1, h2) ? (comparator.isAtOrBefore(h2, h1) ? 0 : -1) : 1;
+                        })
+                        // null means there is no valid snapshot for source
+                        .orElse(null);
+
+                snapshotOffsets.put(source, snapshotPosition);
+            });
+            return snapshotOffsets;
+        }
+
+        private Tables getPartialSchema(Map<String, ?> source, Tables schema) {
+            if (schemaPartitioner == null) {
+                return schema;
+            }
+            return schema.subset(schemaPartitioner.fromPartition(source));
+        }
+    }
+
+    private static class HistoryRecordProcessor implements Consumer<HistoryRecord> {
 
         private final Map<Document, HistoryRecord> stopPoints;
+        private final HistoryRecordComparator comparator;
+        private final SchemaHistoryListener listener;
         private final Consumer<HistoryRecord> originalProcessor;
 
-        public HistoryRecordProcessor(Map<Map<String, ?>, Map<String, ?>> offsets, Tables schema, DdlParser ddlParser, HistoryRecordProcessorProvider processorProvider) {
-            this.stopPoints = getStopPoints(snapshot.getOffsets(offsets));
+        public HistoryRecordProcessor(Map<Map<String, ?>, Map<String, ?>> offsets, Tables schema, DdlParser ddlParser, HistoryRecordComparator comparator, SchemaHistoryListener listener, SnapshotProcessor snapshotProcessor, HistoryRecordProcessorProvider processorProvider) {
            this.stopPoints = getStopPoints(snapshotProcessor.read(offsets, schema));
+            this.comparator = comparator;
+            this.listener = listener;
             this.originalProcessor = processorProvider.get(offsets, schema, ddlParser);
         }

@@ -99,7 +168,7 @@ private Map<Document, HistoryRecord> getStopPoints(Map<Map<String, ?>, Map<String, ?>> offsets) {
                 if (source != null) {
                     source.forEach(srcDocument::set);
                 }
-                stopPoints.put(srcDocument, new HistoryRecord(source, position, null, null, null, null, null));
+                stopPoints.put(srcDocument, new HistoryRecord(source, position));
             });
             return stopPoints;
         }
@@ -112,13 +181,13 @@ private Map<Document, HistoryRecord> getStopPoints(Map<Map<String, ?>, Map<String, ?>> offsets) {
          * <--------------> : snapshot processor
          * ---------------(SO)-----(CO)----> : schema history
          *                  <-----> : original processor
-         * @param recovered the input argument
+         * @param recovered the recovered schema history record
          */
         @Override
         public void accept(HistoryRecord recovered) {
             listener.onChangeFromHistory(recovered);
             Document srcDocument = recovered.document().getDocument(HistoryRecord.Fields.SOURCE);
-            if (stopPoints.containsKey(srcDocument) && comparator.isAtOrBefore(recovered, stopPoints.get(srcDocument))) {
+            if (stopPoints.containsKey(srcDocument) && stopPoints.get(srcDocument).isValid() && comparator.isAtOrBefore(recovered, stopPoints.get(srcDocument))) {
                 // schema snapshot contains this record, skip processing
                 listener.onChangeApplied(recovered);
             }
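
Recovery hinges on getSnapshotOffsets picking, per source partition, the latest snapshot position that is still at or before the connector offset; HistoryRecordProcessor then skips every recovered record up to that stop point and lets the original processor handle the rest. A toy sketch of the same selection, with integer positions standing in for offset maps and a plain predicate standing in for HistoryRecordComparator (all names are illustrative):

import java.util.List;
import java.util.Optional;
import java.util.function.BiPredicate;

public class StopPointSelection {
    public static void main(String[] args) {
        // stands in for HistoryRecordComparator.isAtOrBefore(recorded, desired)
        BiPredicate<Integer, Integer> isAtOrBefore = (recorded, desired) -> recorded <= desired;

        List<Integer> snapshotPositions = List.of(3, 7, 12, 20); // positions that have a saved snapshot
        int connectorOffset = 15;

        Optional<Integer> stopPoint = snapshotPositions.stream()
                // drop snapshots taken past the connector offset
                .filter((p) -> isAtOrBefore.test(p, connectorOffset))
                // greatest remaining position; mirrors the max() comparator in SnapshotProcessor
                .max((p1, p2) -> isAtOrBefore.test(p1, p2) ? (isAtOrBefore.test(p2, p1) ? 0 : -1) : 1);

        System.out.println(stopPoint.orElse(null)); // prints 12
    }
}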
SchemaHistorySnapshot.java
@@ -7,32 +7,19 @@
 
 import io.debezium.relational.Tables;
 
+import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 public interface SchemaHistorySnapshot {
 
     String CONFIGURATION_FIELD_PREFIX_STRING = "schema.history.internal.snapshot.";
 
     void save(Map<String, ?> source, Map<String, ?> position, Tables schema);
+    Tables read(Map<String, ?> source, Map<String, ?> position);
+    List<Map<String, ?>> find(Map<String, ?> source);
 
-    void read(Map<Map<String, ?>, Map<String, ?>> offsets, Tables schema);
-
-    /**
-     * TODO: use connectorOffsets to:
-     * 1. find all serialized snapshots
-     * 2. filter out the ones not matching source partition
-     * 3. filter out the ones with offset structure which differs from given
-     * 4. sort the list by offsets using HistoryRecordComparator
-     * 5. find the closest (smaller) offsets to the given ones
-     *
-     * TODO: implement offset -> snapshot name
-     * TODO: implement snapshot name -> offset
-     *
-     * @param connectorOffsets
-     * @return
-     */
-    Map<Map<String, ?>, Map<String, ?>> getOffsets(Map<Map<String, ?>, Map<String, ?>> connectorOffsets);
+    // TODO: implement offset -> snapshot name
+    // TODO: implement snapshot name -> offset
 
     SchemaHistorySnapshot NOOP = new SchemaHistorySnapshot() {
         @Override
@@ -41,13 +28,13 @@ public void save(Map<String, ?> source, Map<String, ?> position, Tables schema)
         }
 
         @Override
-        public void read(Map<Map<String, ?>, Map<String, ?>> offsets, Tables schema) {
-
+        public Tables read(Map<String, ?> source, Map<String, ?> position) {
+            return null;
         }
 
         @Override
-        public Map<Map<String, ?>, Map<String, ?>> getOffsets(Map<Map<String, ?>, Map<String, ?>> connectorOffsets) {
-            return connectorOffsets.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> null));
+        public List<Map<String, ?>> find(Map<String, ?> source) {
+            return List.of();
         }
     };
 }
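
The two remaining TODOs ask for a bidirectional mapping between an offset map and a serialized snapshot name. One possible encoding, purely illustrative and not what this commit implements (it assumes keys and values are filename-safe; a real scheme would need escaping and type preservation):

import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class SnapshotNames {
    // offset -> snapshot name: sort keys so the name is deterministic (illustrative scheme)
    static String toName(Map<String, ?> position) {
        return new TreeMap<String, Object>(position).entrySet().stream()
                .map((e) -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining(","));
    }

    // snapshot name -> offset: inverse of the scheme above; values come back as plain strings
    static Map<String, String> fromName(String name) {
        return Arrays.stream(name.split(","))
                .map((kv) -> kv.split("=", 2))
                .collect(Collectors.toMap((kv) -> kv[0], (kv) -> kv[1]));
    }

    public static void main(String[] args) {
        Map<String, Object> position = Map.of("file", "binlog.000003", "pos", 154);
        String name = toName(position);      // "file=binlog.000003,pos=154"
        System.out.println(name);
        System.out.println(fromName(name));  // {file=binlog.000003, pos=154} (order may vary)
    }
}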
