diff --git a/node/etl/src/main/java/com/alibaba/otter/node/etl/select/selector/MessageParser.java b/node/etl/src/main/java/com/alibaba/otter/node/etl/select/selector/MessageParser.java index 18b59c25..c405d41d 100644 --- a/node/etl/src/main/java/com/alibaba/otter/node/etl/select/selector/MessageParser.java +++ b/node/etl/src/main/java/com/alibaba/otter/node/etl/select/selector/MessageParser.java @@ -107,7 +107,10 @@ public List parse(Long pipelineId, List datas) throws SelectEx if (isMarkTable) { RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); if (!rowChange.getIsDdl()) { - int loopback = checkLoopback(pipeline, rowChange.getRowDatas(0)); + int loopback = 0; + if (rowChange.getRowDatasCount() > 0) { + loopback = checkLoopback(pipeline, rowChange.getRowDatas(0)); + } if (loopback == 2) { needLoopback |= true; // 只处理正常同步产生的回环数据 } @@ -122,7 +125,10 @@ public List parse(Long pipelineId, List datas) throws SelectEx if (isCompatibleLoopback) { RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); if (!rowChange.getIsDdl()) { - int loopback = checkCompatibleLoopback(pipeline, rowChange.getRowDatas(0)); + int loopback = 0; + if (rowChange.getRowDatasCount() > 0) { + loopback = checkCompatibleLoopback(pipeline, rowChange.getRowDatas(0)); + } if (loopback == 2) { needLoopback |= true; // 只处理正常同步产生的回环数据 }