Skip to content

Commit

Permalink
DBZ-7698 Add ordered transaction metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
twthorn committed Mar 28, 2024
1 parent e49c9d5 commit b1bd015
Show file tree
Hide file tree
Showing 23 changed files with 608 additions and 183 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@

import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.postgresql.connection.LogicalDecodingMessage;
import io.debezium.connector.postgresql.pipeline.txmetadata.PostgresTransactionMonitor;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.EventDispatcher;
Expand Down Expand Up @@ -43,16 +42,8 @@ public PostgresEventDispatcher(PostgresConnectorConfig connectorConfig, TopicNam
ChangeEventCreator changeEventCreator, InconsistentSchemaHandler<PostgresPartition, T> inconsistentSchemaHandler,
EventMetadataProvider metadataProvider, Heartbeat heartbeat, SchemaNameAdjuster schemaNameAdjuster,
SignalProcessor<PostgresPartition, PostgresOffsetContext> signalProcessor) {
super(connectorConfig, topicNamingStrategy, schema, queue, filter, changeEventCreator, inconsistentSchemaHandler, heartbeat, schemaNameAdjuster,
new PostgresTransactionMonitor(
connectorConfig,
metadataProvider,
schemaNameAdjuster,
(record) -> {
queue.enqueue(new DataChangeEvent(record));
},
topicNamingStrategy.transactionTopic()),
signalProcessor);
super(connectorConfig, topicNamingStrategy, schema, queue, filter, changeEventCreator, inconsistentSchemaHandler,
metadataProvider, heartbeat, schemaNameAdjuster, signalProcessor);
this.queue = queue;
this.logicalDecodingMessageMonitor = new LogicalDecodingMessageMonitor(connectorConfig, this::enqueueLogicalDecodingMessage);
this.messageFilter = connectorConfig.getMessageFilter();
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql.pipeline.txmetadata;

import java.time.Instant;

import org.apache.kafka.connect.data.Struct;

import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.PostgresOffsetContext;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.txmetadata.BasicTransactionStructMaker;
import io.debezium.spi.schema.DataCollectionId;

public class PostgresTransactionStructMaker extends BasicTransactionStructMaker {
public PostgresTransactionStructMaker(Configuration configuration) {
super(configuration);
}

@Override
public Struct prepareTxKey(OffsetContext offsetContext) {
return adjustTxId(new Struct(transactionKeySchema), offsetContext);
}

@Override
public Struct prepareTxBeginValue(OffsetContext offsetContext, Instant timestamp) {
return adjustTxId(super.prepareTxBeginValue(offsetContext, timestamp), offsetContext);
}

@Override
public Struct prepareTxEndValue(OffsetContext offsetContext, Instant timestamp) {
return adjustTxId(super.prepareTxEndValue(offsetContext, timestamp), offsetContext);
}

@Override
public Struct prepareTxStruct(OffsetContext offsetContext, DataCollectionId source) {
return adjustTxId(super.prepareTxStruct(offsetContext, source), offsetContext);
}

private Struct adjustTxId(Struct txStruct, OffsetContext offsetContext) {
final String lsn = Long.toString(((PostgresOffsetContext) offsetContext).asOffsetState().lastSeenLsn().asLong());
final String txId = offsetContext.getTransactionContext().getTransactionId();
txStruct.put(DEBEZIUM_TRANSACTION_ID_KEY, String.format("%s:%s", txId, lsn));
return txStruct;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@
import io.debezium.heartbeat.HeartbeatImpl;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.notification.channels.SinkNotificationChannel;
import io.debezium.pipeline.txmetadata.BasicTransactionStructMaker;
import io.debezium.pipeline.txmetadata.TransactionOrderMetadata;
import io.debezium.pipeline.txmetadata.TransactionStructMaker;
import io.debezium.relational.CustomConverterRegistry;
import io.debezium.relational.TableId;
import io.debezium.schema.SchemaNameAdjuster;
Expand Down Expand Up @@ -710,6 +713,32 @@ public static SnapshotQueryMode parse(String value, String defaultValue) {
.withDescription("Enables transaction metadata extraction together with event counting")
.withDefault(Boolean.FALSE);

public static final Field PROVIDE_ORDERED_TRANSACTION_METADATA = Field.create("provide.ordered.transaction.metadata")
.withDisplayName("Provide ordered transaction meatadata")
.withType(Type.BOOLEAN)
.withDefault(false)
.withWidth(Width.SHORT)
.withImportance(ConfigDef.Importance.LOW)
.withDescription(
"Whether to provide order metadata on transactions");

public static final Field TRANSACTION_STRUCT_MAKER = Field.create("transaction.struct.maker")
.withDisplayName("Make transaction struct & schema")
.withType(Type.CLASS)
.withWidth(Width.MEDIUM)
.withImportance(Importance.LOW)
.withDefault(BasicTransactionStructMaker.class.getName())
.withDescription(
"Class to make transaction struct & schema");

public static final Field TRANSACTION_ORDER_METADATA_FIELD = Field.create("transaction.ordered.metadata")
.withDisplayName("Class to provide ordered transaction metadata")
.withType(Type.CLASS)
.withWidth(Width.MEDIUM)
.withImportance(ConfigDef.Importance.LOW)
.withDescription(
"Class to provide order metadata on transactions");

public static final Field EVENT_PROCESSING_FAILURE_HANDLING_MODE = Field.create("event.processing.failure.handling.mode")
.withDisplayName("Event deserialization failure handling")
.withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 12))
Expand Down Expand Up @@ -991,6 +1020,7 @@ public static SnapshotQueryMode parse(String value, String defaultValue) {
POLL_INTERVAL_MS,
MAX_QUEUE_SIZE_IN_BYTES,
PROVIDE_TRANSACTION_METADATA,
PROVIDE_ORDERED_TRANSACTION_METADATA,
SKIPPED_OPERATIONS,
SNAPSHOT_DELAY_MS,
SNAPSHOT_MODE_TABLES,
Expand All @@ -1013,6 +1043,8 @@ public static SnapshotQueryMode parse(String value, String defaultValue) {
TOPIC_NAMING_STRATEGY,
NOTIFICATION_ENABLED_CHANNELS,
SinkNotificationChannel.NOTIFICATION_TOPIC,
TRANSACTION_ORDER_METADATA_FIELD,
TRANSACTION_STRUCT_MAKER,
CUSTOM_METRIC_TAGS)
.create();

Expand All @@ -1035,7 +1067,10 @@ public static SnapshotQueryMode parse(String value, String defaultValue) {
private final String snapshotModeCustomName;
private final Integer queryFetchSize;
private final SourceInfoStructMaker<? extends AbstractSourceInfo> sourceInfoStructMaker;
private final TransactionOrderMetadata transactionOrderMetadata;
private final TransactionStructMaker transactionStructMaker;
private final boolean shouldProvideTransactionMetadata;
private final boolean shouldProvideOrderedTransactionMetadata;
private final EventProcessingFailureHandlingMode eventProcessingFailureHandlingMode;
private final CustomConverterRegistry customConverterRegistry;
private final BinaryHandlingMode binaryHandlingMode;
Expand Down Expand Up @@ -1085,7 +1120,10 @@ protected CommonConnectorConfig(Configuration config, int defaultSnapshotFetchSi
this.fieldNameAdjustmentMode = FieldNameAdjustmentMode.parse(config.getString(FIELD_NAME_ADJUSTMENT_MODE));
this.eventConvertingFailureHandlingMode = EventConvertingFailureHandlingMode.parse(config.getString(EVENT_CONVERTING_FAILURE_HANDLING_MODE));
this.sourceInfoStructMaker = getSourceInfoStructMaker(Version.V2);
this.transactionOrderMetadata = getTransactionOrderMetadata();
this.transactionStructMaker = getTransactionStructMaker();
this.shouldProvideTransactionMetadata = config.getBoolean(PROVIDE_TRANSACTION_METADATA);
this.shouldProvideOrderedTransactionMetadata = config.getBoolean(PROVIDE_ORDERED_TRANSACTION_METADATA);
this.eventProcessingFailureHandlingMode = EventProcessingFailureHandlingMode.parse(config.getString(EVENT_PROCESSING_FAILURE_HANDLING_MODE));
this.customConverterRegistry = new CustomConverterRegistry(getCustomConverters());
this.binaryHandlingMode = BinaryHandlingMode.parse(config.getString(BINARY_HANDLING_MODE));
Expand Down Expand Up @@ -1230,6 +1268,10 @@ public boolean shouldProvideTransactionMetadata() {
return shouldProvideTransactionMetadata;
}

public boolean shouldProvideOrderedTransactionMetadata() {
return shouldProvideOrderedTransactionMetadata;
}

public boolean skipMessagesWithoutChange() {
return skipMessagesWithoutChange;
}
Expand Down Expand Up @@ -1294,6 +1336,14 @@ public <T extends AbstractSourceInfo> SourceInfoStructMaker<T> getSourceInfoStru
return (SourceInfoStructMaker<T>) sourceInfoStructMaker;
}

public TransactionOrderMetadata getTransactionOrderMetadata() {
return getTransactionOrderMetadata(TRANSACTION_ORDER_METADATA_FIELD);
}

public TransactionStructMaker getTransactionStructMaker() {
return getTransactionStructMaker(TRANSACTION_STRUCT_MAKER);
}

public EnumSet<Envelope.Operation> getSkippedOperations() {
return skippedOperations;
}
Expand Down Expand Up @@ -1544,4 +1594,32 @@ public <T extends AbstractSourceInfo> SourceInfoStructMaker<T> getSourceInfoStru
sourceInfoStructMaker.init(connector, version, connectorConfig);
return sourceInfoStructMaker;
}

public TransactionStructMaker getTransactionStructMaker(Field transactionStructMakerField) {
final TransactionStructMaker transactionStructMaker;
if (!shouldProvideOrderedTransactionMetadata) {
// for backward compatibility, return the normal one
transactionStructMaker = config.getInstance(TRANSACTION_STRUCT_MAKER, BasicTransactionStructMaker.class);
}
else {
transactionStructMaker = config.getInstance(transactionStructMakerField, TransactionStructMaker.class);
}
transactionStructMaker.setSchemaNameAdjuster(schemaNameAdjuster());
if (transactionStructMaker == null) {
throw new DebeziumException("Unable to instantiate the transaction struct maker class " + TRANSACTION_STRUCT_MAKER);
}
return transactionStructMaker;
}

public TransactionOrderMetadata getTransactionOrderMetadata(Field transactionOrderMetadataField) {
if (!shouldProvideOrderedTransactionMetadata) {
// for backward compatibility, if the setting is disabled we won't use this anyway
return null;
}
final TransactionOrderMetadata transactionOrderMetadata = config.getInstance(transactionOrderMetadataField, TransactionOrderMetadata.class);
if (transactionOrderMetadata == null) {
throw new DebeziumException("Unable to instantiate the transaction ordered metadata class " + TRANSACTION_ORDER_METADATA_FIELD);
}
return transactionOrderMetadata;
}
}
10 changes: 10 additions & 0 deletions debezium-core/src/main/java/io/debezium/data/Envelope.java
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,16 @@ default Builder withSource(Schema sourceSchema) {
return withSchema(sourceSchema, FieldName.SOURCE);
}

/**
* Define the {@link Schema} used in the {@link FieldName#TRANSACTION} field.
*
* @param transactionSchema the schema of the {@link FieldName#TRANSACTION} field; may not be null
* @return this builder so methods can be chained; never null
*/
default Builder withTransaction(Schema transactionSchema) {
return withSchema(transactionSchema, FieldName.TRANSACTION);
}

/**
* Define the {@link Schema} used for an arbitrary field in the envelope.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.pipeline.spi.SchemaChangeEventEmitter;
import io.debezium.pipeline.txmetadata.BasicTransactionInfo;
import io.debezium.pipeline.txmetadata.TransactionInfo;
import io.debezium.pipeline.txmetadata.TransactionMonitor;
import io.debezium.processors.PostProcessorRegistry;
import io.debezium.processors.spi.PostProcessor;
Expand Down Expand Up @@ -349,7 +351,11 @@ public void dispatchTransactionCommittedEvent(P partition, OffsetContext offset,
}

public void dispatchTransactionStartedEvent(P partition, String transactionId, OffsetContext offset, Instant timestamp) throws InterruptedException {
transactionMonitor.transactionStartedEvent(partition, transactionId, offset, timestamp);
dispatchTransactionStartedEvent(partition, new BasicTransactionInfo(transactionId), offset, timestamp);
}

public void dispatchTransactionStartedEvent(P partition, TransactionInfo transactionInfo, OffsetContext offset, Instant timestamp) throws InterruptedException {
transactionMonitor.transactionStartedEvent(partition, transactionInfo, offset, timestamp);
if (incrementalSnapshotChangeEventSource != null) {
incrementalSnapshotChangeEventSource.processTransactionStartedEvent(partition, offset);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.apache.kafka.connect.data.Struct;

import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.txmetadata.BasicTransactionInfo;
import io.debezium.pipeline.txmetadata.TransactionInfo;
import io.debezium.spi.schema.DataCollectionId;

/**
Expand Down Expand Up @@ -46,4 +48,8 @@ default String toSummaryString(DataCollectionId source, OffsetContext offset, Ob
.key(key)
.toString();
}

default TransactionInfo getTransactionInfo(DataCollectionId source, OffsetContext offset, Object key, Struct value) {
return new BasicTransactionInfo(getTransactionId(source, offset, key, value));
}
}
Loading

0 comments on commit b1bd015

Please sign in to comment.