diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/schema/DebeziumSchemaChange.java b/src/main/java/org/apache/doris/kafka/connector/writer/schema/DebeziumSchemaChange.java
index fc0823b..1eeab0e 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/schema/DebeziumSchemaChange.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/schema/DebeziumSchemaChange.java
@@ -21,6 +21,7 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import io.debezium.data.Envelope;
+import io.debezium.util.Strings;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -45,6 +46,9 @@ public class DebeziumSchemaChange extends DorisWriter {
 
     private static final Logger LOG =
             LoggerFactory.getLogger(DebeziumSchemaChange.class);
+    public static final String SCHEMA_CHANGE_VALUE = "SchemaChangeValue";
+    public static final String TABLE_CHANGES = "tableChanges";
+    public static final String TABLE_CHANGES_TYPE = "type";
     private final Map topic2TableMap;
     private SchemaChangeManager schemaChangeManager;
     private DorisSystemService dorisSystemService;
@@ -80,36 +84,77 @@ private void init() {
 
     @Override
     public void insert(SinkRecord record) {
+        if (!validate(record)) {
+            processedOffset.set(record.kafkaOffset());
+            return;
+        }
         schemaChange(record);
     }
 
-    @Override
-    public void commit(int partition) {
-        // do nothing
-    }
+    /**
+     * Decides whether {@code record} carries a schema change that can be applied. As a side
+     * effect, the table name resolved from the record is stored in {@code tableName}.
+     *
+     * @param record the sink record to inspect
+     * @return {@code true} if the record should be handed to {@code schemaChange}; {@code false}
+     *     if it must be skipped (no resolvable table name, table not in {@code sinkTableSet},
+     *     empty {@code tableChanges}, or a CREATE/DROP change)
+     * @throws SchemaChangeException if the record is not a schema change message (see
+     *     {@code isSchemaChange})
+     */
+    private boolean validate(final SinkRecord record) {
+        if (!isSchemaChange(record)) {
+            LOG.warn(
+                    "Current topic={}, the message does not contain schema change information, please check schema.topic",
+                    dorisOptions.getSchemaTopic());
+            throw new SchemaChangeException(
+                    "The message does not contain schema change information, please check schema.topic");
+        }
 
-    private void schemaChange(final SinkRecord record) {
-        String tableName = resolveTableName(record);
+        tableName = resolveTableName(record);
         if (tableName == null) {
             LOG.warn(
                     "Ignored to write record from topic '{}' partition '{}' offset '{}'. No resolvable table name",
                     record.topic(),
                     record.kafkaPartition(),
                     record.kafkaOffset());
-            processedOffset.set(record.kafkaOffset());
-            return;
+            return false;
         }
+
+        if (!sinkTableSet.contains(tableName)) {
+            LOG.warn(
+                    "The {} is not defined and requires synchronized data. If you need to synchronize the table data, please configure it in 'doris.topic2table.map'",
+                    tableName);
+            return false;
+        }
+
         Struct recordStruct = (Struct) (record.value());
-        List tableChanges = recordStruct.getArray("tableChanges");
+        if (isTruncate(recordStruct)) {
+            LOG.warn("Truncate {} table is not supported", tableName);
+            return false;
+        }
+
+        List tableChanges = recordStruct.getArray(TABLE_CHANGES);
         Struct tableChange = (Struct) tableChanges.get(0);
-        if ("DROP".equalsIgnoreCase(tableChange.getString("type"))
-                || "CREATE".equalsIgnoreCase(tableChange.getString("type"))) {
+        if ("DROP".equalsIgnoreCase(tableChange.getString(TABLE_CHANGES_TYPE))
+                || "CREATE".equalsIgnoreCase(tableChange.getString(TABLE_CHANGES_TYPE))) {
             LOG.warn(
                     "CREATE and DROP {} tables are currently not supported. Please create or drop them manually.",
                     tableName);
-            processedOffset.set(record.kafkaOffset());
-            return;
+            return false;
         }
+        return true;
+    }
+
+    @Override
+    public void commit(int partition) {
+        // do nothing
+    }
+
+    private void schemaChange(final SinkRecord record) {
+        Struct recordStruct = (Struct) (record.value());
+        List tableChanges = recordStruct.getArray(TABLE_CHANGES);
+        Struct tableChange = (Struct) tableChanges.get(0);
         RecordDescriptor recordDescriptor =
                 RecordDescriptor.builder()
                         .withSinkRecord(record)
@@ -118,6 +163,22 @@ private void schemaChange(final SinkRecord record) {
         tableChange(tableName, recordDescriptor);
     }
 
+    /** Returns {@code true} when the {@code tableChanges} array of {@code record} is empty. */
+    private boolean isTruncate(final Struct record) {
+        // Generally, the tableChanges array of a truncate event is empty.
+        return record.getArray(TABLE_CHANGES).isEmpty();
+    }
+
+    /**
+     * Returns {@code true} when the record has a value schema whose name is neither null nor
+     * empty and contains {@code SCHEMA_CHANGE_VALUE}.
+     */
+    private static boolean isSchemaChange(SinkRecord record) {
+        return record.valueSchema() != null
+                && !Strings.isNullOrEmpty(record.valueSchema().name())
+                && record.valueSchema().name().contains(SCHEMA_CHANGE_VALUE);
+    }
+
     private String resolveTableName(SinkRecord record) {
         if (isTombstone(record)) {
             LOG.warn(
@@ -170,15 +231,6 @@ private boolean hasTable(String tableName) {
     }
 
     private void tableChange(String tableName, RecordDescriptor recordDescriptor) {
-        if (!sinkTableSet.contains(tableName)) {
-            processedOffset.set(recordDescriptor.getOffset());
-            LOG.warn(
-                    "The "
-                            + tableName
-                            + " is not defined and requires synchronized data. If you need to synchronize the table data, please configure it in 'doris.topic2table.map'");
-            return;
-        }
-
         if (!hasTable(tableName)) {
             // TODO Table does not exist, automatically created it.
             LOG.error("{} Table does not exist, please create manually.", tableName);
@@ -223,7 +275,7 @@ private boolean checkSchemaChange(
 
     public long getOffset() {
         committedOffset.set(processedOffset.get());
-        return committedOffset.get();
+        return committedOffset.get() + 1;
     }
 
     private boolean isTombstone(SinkRecord record) {