Skip to content

Commit

Permalink
[Fix] Fix some bugs in commit offset to kafka, intercepting non-schem…
Browse files Browse the repository at this point in the history
…a changes, and intercepting truncate table (#16)
  • Loading branch information
DongLiang-0 authored May 11, 2024
1 parent c4a1f6f commit e65e583
Showing 1 changed file with 60 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.common.annotations.VisibleForTesting;
import io.debezium.data.Envelope;
import io.debezium.util.Strings;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
Expand All @@ -45,6 +46,9 @@

public class DebeziumSchemaChange extends DorisWriter {
private static final Logger LOG = LoggerFactory.getLogger(DebeziumSchemaChange.class);
public static final String SCHEMA_CHANGE_VALUE = "SchemaChangeValue";
public static final String TABLE_CHANGES = "tableChanges";
public static final String TABLE_CHANGES_TYPE = "type";
private final Map<String, String> topic2TableMap;
private SchemaChangeManager schemaChangeManager;
private DorisSystemService dorisSystemService;
Expand Down Expand Up @@ -80,36 +84,67 @@ private void init() {

/**
 * Handles one record from the schema-change topic.
 *
 * <p>A record accepted by {@code validate} is handed to {@code schemaChange}. A rejected record
 * is not applied; only its Kafka offset is stored in {@code processedOffset}.
 *
 * @param record the sink record to process
 */
@Override
public void insert(SinkRecord record) {
    if (validate(record)) {
        schemaChange(record);
    } else {
        // Skipped record: still remember its offset as processed.
        processedOffset.set(record.kafkaOffset());
    }
}

/**
 * Intentionally empty: this writer performs no work when asked to commit a partition.
 *
 * @param partition the partition being committed (unused)
 */
@Override
public void commit(int partition) {
    // no-op by design
}
private boolean validate(final SinkRecord record) {
if (!isSchemaChange(record)) {
LOG.warn(
"Current topic={}, the message does not contain schema change change information, please check schema.topic",
dorisOptions.getSchemaTopic());
throw new SchemaChangeException(
"The message does not contain schema change change information, please check schema.topic");
}

private void schemaChange(final SinkRecord record) {
String tableName = resolveTableName(record);
tableName = resolveTableName(record);
if (tableName == null) {
LOG.warn(
"Ignored to write record from topic '{}' partition '{}' offset '{}'. No resolvable table name",
record.topic(),
record.kafkaPartition(),
record.kafkaOffset());
processedOffset.set(record.kafkaOffset());
return;
return false;
}

if (!sinkTableSet.contains(tableName)) {
LOG.warn(
"The "
+ tableName
+ " is not defined and requires synchronized data. If you need to synchronize the table data, please configure it in 'doris.topic2table.map'");
return false;
}

Struct recordStruct = (Struct) (record.value());
List<Object> tableChanges = recordStruct.getArray("tableChanges");
if (isTruncate(recordStruct)) {
LOG.warn("Truncate {} table is not supported", tableName);
return false;
}

List<Object> tableChanges = recordStruct.getArray(TABLE_CHANGES);
Struct tableChange = (Struct) tableChanges.get(0);
if ("DROP".equalsIgnoreCase(tableChange.getString("type"))
|| "CREATE".equalsIgnoreCase(tableChange.getString("type"))) {
if ("DROP".equalsIgnoreCase(tableChange.getString(TABLE_CHANGES_TYPE))
|| "CREATE".equalsIgnoreCase(tableChange.getString(TABLE_CHANGES_TYPE))) {
LOG.warn(
"CREATE and DROP {} tables are currently not supported. Please create or drop them manually.",
tableName);
processedOffset.set(record.kafkaOffset());
return;
return false;
}
return true;
}

/**
 * Commit hook for a partition; deliberately left without any action in this writer.
 *
 * @param partition the partition being committed (ignored)
 */
@Override
public void commit(int partition) {
    // nothing to do
}

private void schemaChange(final SinkRecord record) {
Struct recordStruct = (Struct) (record.value());
List<Object> tableChanges = recordStruct.getArray(TABLE_CHANGES);
Struct tableChange = (Struct) tableChanges.get(0);
RecordDescriptor recordDescriptor =
RecordDescriptor.builder()
.withSinkRecord(record)
Expand All @@ -118,6 +153,17 @@ private void schemaChange(final SinkRecord record) {
tableChange(tableName, recordDescriptor);
}

/**
 * Tells whether a schema-change struct looks like a TRUNCATE statement.
 *
 * <p>Heuristic: a truncate generally arrives with an empty {@code tableChanges} array.
 *
 * @param record the value struct of the schema-change record
 * @return {@code true} when the {@code tableChanges} array has no elements
 */
private boolean isTruncate(final Struct record) {
    final List<Object> changes = record.getArray(TABLE_CHANGES);
    return changes.isEmpty();
}

/**
 * Checks whether a record's value schema identifies it as a schema-change message.
 *
 * @param record the sink record to inspect
 * @return {@code true} when the value schema exists, has a non-empty name, and that name
 *     contains {@code SCHEMA_CHANGE_VALUE}
 */
private static boolean isSchemaChange(SinkRecord record) {
    if (record.valueSchema() == null) {
        return false;
    }
    final String schemaName = record.valueSchema().name();
    if (Strings.isNullOrEmpty(schemaName)) {
        return false;
    }
    return schemaName.contains(SCHEMA_CHANGE_VALUE);
}

private String resolveTableName(SinkRecord record) {
if (isTombstone(record)) {
LOG.warn(
Expand Down Expand Up @@ -170,15 +216,6 @@ private boolean hasTable(String tableName) {
}

private void tableChange(String tableName, RecordDescriptor recordDescriptor) {
if (!sinkTableSet.contains(tableName)) {
processedOffset.set(recordDescriptor.getOffset());
LOG.warn(
"The "
+ tableName
+ " is not defined and requires synchronized data. If you need to synchronize the table data, please configure it in 'doris.topic2table.map'");
return;
}

if (!hasTable(tableName)) {
// TODO Table does not exist, automatically created it.
LOG.error("{} Table does not exist, please create manually.", tableName);
Expand Down Expand Up @@ -223,7 +260,7 @@ private boolean checkSchemaChange(

/**
 * Copies the latest processed offset into {@code committedOffset} and returns the position to
 * commit to Kafka.
 *
 * <p>Kafka's convention is that a committed offset names the next record to consume, so the
 * value returned is the last processed offset plus one. Returning the processed offset itself
 * would cause the last record to be delivered again after a restart or rebalance.
 *
 * @return the last processed offset plus one
 */
public long getOffset() {
    committedOffset.set(processedOffset.get());
    return committedOffset.get() + 1;
}

private boolean isTombstone(SinkRecord record) {
Expand Down

0 comments on commit e65e583

Please sign in to comment.