diff --git a/node/etl/src/main/java/com/alibaba/otter/node/etl/select/selector/canal/CanalEmbedSelector.java b/node/etl/src/main/java/com/alibaba/otter/node/etl/select/selector/canal/CanalEmbedSelector.java index 1a9f5c1b..d653923e 100644 --- a/node/etl/src/main/java/com/alibaba/otter/node/etl/select/selector/canal/CanalEmbedSelector.java +++ b/node/etl/src/main/java/com/alibaba/otter/node/etl/select/selector/canal/CanalEmbedSelector.java @@ -17,6 +17,7 @@ package com.alibaba.otter.node.etl.select.selector.canal; import java.text.SimpleDateFormat; +import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.concurrent.TimeUnit; @@ -40,6 +41,7 @@ import com.alibaba.otter.canal.parse.ha.CanalHAController; import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser; import com.alibaba.otter.canal.parse.support.AuthenticationInfo; +import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.CanalEntry.Entry; import com.alibaba.otter.canal.protocol.ClientIdentity; import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded; @@ -48,12 +50,15 @@ import com.alibaba.otter.node.common.config.ConfigClientService; import com.alibaba.otter.node.etl.OtterConstants; import com.alibaba.otter.node.etl.OtterContextLocator; +import com.alibaba.otter.node.etl.select.exceptions.SelectException; import com.alibaba.otter.node.etl.select.selector.Message; import com.alibaba.otter.node.etl.select.selector.MessageDumper; import com.alibaba.otter.node.etl.select.selector.MessageParser; import com.alibaba.otter.node.etl.select.selector.OtterSelector; import com.alibaba.otter.shared.common.model.config.pipeline.Pipeline; import com.alibaba.otter.shared.etl.model.EventData; +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; /** * 基于canal embed实现数据获取方式 @@ -255,11 +260,25 @@ public Message selector() throws InterruptedException { } } - List eventDatas = messageParser.parse(pipelineId, message.getEntries()); // 过滤事务头/尾和回环数据 + List entries = null; + if (message.isRaw()) { + entries = new ArrayList(message.getRawEntries().size()); + for (ByteString entry : message.getRawEntries()) { + try { + entries.add(CanalEntry.Entry.parseFrom(entry)); + } catch (InvalidProtocolBufferException e) { + throw new SelectException(e); + } + } + } else { + entries = message.getEntries(); + } + + List eventDatas = messageParser.parse(pipelineId, entries); // 过滤事务头/尾和回环数据 Message result = new Message(message.getId(), eventDatas); // 更新一下最后的entry时间,包括被过滤的数据 - if (!CollectionUtils.isEmpty(message.getEntries())) { - long lastEntryTime = message.getEntries().get(message.getEntries().size() - 1).getHeader().getExecuteTime(); + if (!CollectionUtils.isEmpty(entries)) { + long lastEntryTime = entries.get(entries.size() - 1).getHeader().getExecuteTime(); if (lastEntryTime > 0) {// oracle的时间可能为0 this.lastEntryTime = lastEntryTime; } @@ -268,12 +287,12 @@ public Message selector() throws InterruptedException { if (dump && logger.isInfoEnabled()) { String startPosition = null; String endPosition = null; - if (!CollectionUtils.isEmpty(message.getEntries())) { - startPosition = buildPositionForDump(message.getEntries().get(0)); - endPosition = buildPositionForDump(message.getEntries().get(message.getEntries().size() - 1)); + if (!CollectionUtils.isEmpty(entries)) { + startPosition = buildPositionForDump(entries.get(0)); + endPosition = buildPositionForDump(entries.get(entries.size() - 1)); } - dumpMessages(result, startPosition, endPosition, message.getEntries().size());// 记录一下,方便追查问题 + dumpMessages(result, startPosition, endPosition, entries.size());// 记录一下,方便追查问题 } return result; } diff --git a/pom.xml b/pom.xml index 313126a8..1a10b440 100644 --- a/pom.xml +++ b/pom.xml @@ -193,7 +193,7 @@ com.alibaba.fastsql fastsql - 2.0.0_preview_366 + 2.0.0_preview_520