From c7393e7bf577149312e994b1e378908fe35227f0 Mon Sep 17 00:00:00 2001 From: Mikhail Kamornikau Date: Mon, 11 Dec 2023 10:19:15 +0100 Subject: [PATCH] CXP-2779: introduced SchemaHistorySnapshot interface --- ...izedRelationalDatabaseConnectorConfig.java | 4 +- .../HistorizedRelationalDatabaseSchema.java | 2 +- .../history/SnapshotAwareSchemaHistory.java | 32 ++++++----- .../snapshot/SchemaHistorySnapshot.java | 53 +++++++++++++++++++ 4 files changed, 75 insertions(+), 16 deletions(-) create mode 100644 debezium-core/src/main/java/io/debezium/relational/history/snapshot/SchemaHistorySnapshot.java diff --git a/debezium-core/src/main/java/io/debezium/relational/HistorizedRelationalDatabaseConnectorConfig.java b/debezium-core/src/main/java/io/debezium/relational/HistorizedRelationalDatabaseConnectorConfig.java index f1afddbc546..7dc9d51be51 100644 --- a/debezium-core/src/main/java/io/debezium/relational/HistorizedRelationalDatabaseConnectorConfig.java +++ b/debezium-core/src/main/java/io/debezium/relational/HistorizedRelationalDatabaseConnectorConfig.java @@ -119,7 +119,7 @@ protected HistorizedRelationalDatabaseConnectorConfig(Class new HistoryRecordProcessor(o, s, p, processorProvider)); + + // TODO: init snapshot } @Override @@ -40,22 +46,19 @@ public void start() { @Override public void record(Map source, Map position, String databaseName, String ddl) throws SchemaHistoryException { delegate.record(source, position, databaseName, ddl); - storeSnapshot(); + snapshot.save(source, position, schema); } @Override public void record(Map source, Map position, String databaseName, String schemaName, String ddl, TableChanges changes, Instant timestamp) throws SchemaHistoryException { delegate.record(source, position, databaseName, schemaName, ddl, changes, timestamp); - storeSnapshot(); - } - - private void storeSnapshot() { - // TODO: use SchemaHistorySnapshot implementation + snapshot.save(source, position, schema); } @Override public void recover(Map, Map> offsets, Tables schema, DdlParser ddlParser) { + snapshot.read(offsets, schema); delegate.recover(offsets, schema, ddlParser); } @@ -81,21 +84,24 @@ public void initializeStorage() { private class HistoryRecordProcessor implements Consumer { - private final Map stopPoints = new HashMap<>(); + private final Map stopPoints; private final Consumer originalProcessor; public HistoryRecordProcessor(Map, Map> offsets, Tables schema, DdlParser ddlParser, HistoryRecordProcessorProvider processorProvider) { - // TODO: find correct snapshots - // TODO: get snapshot positions from offsets + this.stopPoints = getStopPoints(snapshot.getOffsets(offsets)); + this.originalProcessor = processorProvider.get(offsets, schema, ddlParser); + } + + private Map getStopPoints(Map, Map> offsets) { + Map stopPoints = new HashMap<>(); offsets.forEach((Map source, Map position) -> { Document srcDocument = Document.create(); if (source != null) { source.forEach(srcDocument::set); } - // stopPoints.put(srcDocument, new HistoryRecord(source, position, null, null, null, null, null)); - stopPoints.put(srcDocument, new HistoryRecord(source, null, null, null, null, null, null)); + stopPoints.put(srcDocument, new HistoryRecord(source, position, null, null, null, null, null)); }); - this.originalProcessor = processorProvider.get(offsets, schema, ddlParser); + return stopPoints; } /** diff --git a/debezium-core/src/main/java/io/debezium/relational/history/snapshot/SchemaHistorySnapshot.java b/debezium-core/src/main/java/io/debezium/relational/history/snapshot/SchemaHistorySnapshot.java new file mode 100644 index 00000000000..265afa22f1d --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/relational/history/snapshot/SchemaHistorySnapshot.java @@ -0,0 +1,53 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.relational.history.snapshot; + +import io.debezium.relational.Tables; + +import java.util.Map; +import java.util.stream.Collectors; + +public interface SchemaHistorySnapshot { + + String CONFIGURATION_FIELD_PREFIX_STRING = "schema.history.internal.snapshot."; + + void save(Map source, Map position, Tables schema); + + void read(Map, Map> offsets, Tables schema); + + /** + * TODO: use connectorOffsets to: + * 1. find all serialized snapshots + * 2. filter out the ones not matching source partition + * 3. filter out the ones with offset structure which differs from given + * 4. sort the list by offsets using HistoryRecordComparator + * 5. find the closest (smaller) offsets to the given ones + * + * TODO: implement offset -> snapshot name + * TODO: implement snapshot name -> offset + * + * @param connectorOffsets + * @return + */ + Map, Map> getOffsets(Map, Map> connectorOffsets); + + SchemaHistorySnapshot NOOP = new SchemaHistorySnapshot() { + @Override + public void save(Map source, Map position, Tables schema) { + + } + + @Override + public void read(Map, Map> offsets, Tables schema) { + + } + + @Override + public Map, Map> getOffsets(Map, Map> connectorOffsets) { + return connectorOffsets.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> null)); + } + }; +}