Skip to content

Commit

Permalink
support canal 1.0.26
Browse files Browse the repository at this point in the history
  • Loading branch information
agapple committed Aug 1, 2018
1 parent 345aada commit 199c190
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.alibaba.otter.node.etl.select.selector.canal;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
Expand All @@ -40,6 +41,7 @@
import com.alibaba.otter.canal.parse.ha.CanalHAController;
import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser;
import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.ClientIdentity;
import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
Expand All @@ -48,12 +50,15 @@
import com.alibaba.otter.node.common.config.ConfigClientService;
import com.alibaba.otter.node.etl.OtterConstants;
import com.alibaba.otter.node.etl.OtterContextLocator;
import com.alibaba.otter.node.etl.select.exceptions.SelectException;
import com.alibaba.otter.node.etl.select.selector.Message;
import com.alibaba.otter.node.etl.select.selector.MessageDumper;
import com.alibaba.otter.node.etl.select.selector.MessageParser;
import com.alibaba.otter.node.etl.select.selector.OtterSelector;
import com.alibaba.otter.shared.common.model.config.pipeline.Pipeline;
import com.alibaba.otter.shared.etl.model.EventData;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;

/**
* 基于canal embed实现数据获取方式
Expand Down Expand Up @@ -255,11 +260,25 @@ public Message<EventData> selector() throws InterruptedException {
}
}

List<EventData> eventDatas = messageParser.parse(pipelineId, message.getEntries()); // 过滤事务头/尾和回环数据
List<Entry> entries = null;
if (message.isRaw()) {
entries = new ArrayList<CanalEntry.Entry>(message.getRawEntries().size());
for (ByteString entry : message.getRawEntries()) {
try {
entries.add(CanalEntry.Entry.parseFrom(entry));
} catch (InvalidProtocolBufferException e) {
throw new SelectException(e);
}
}
} else {
entries = message.getEntries();
}

List<EventData> eventDatas = messageParser.parse(pipelineId, entries); // 过滤事务头/尾和回环数据
Message<EventData> result = new Message<EventData>(message.getId(), eventDatas);
// 更新一下最后的entry时间,包括被过滤的数据
if (!CollectionUtils.isEmpty(message.getEntries())) {
long lastEntryTime = message.getEntries().get(message.getEntries().size() - 1).getHeader().getExecuteTime();
if (!CollectionUtils.isEmpty(entries)) {
long lastEntryTime = entries.get(entries.size() - 1).getHeader().getExecuteTime();
if (lastEntryTime > 0) {// oracle的时间可能为0
this.lastEntryTime = lastEntryTime;
}
Expand All @@ -268,12 +287,12 @@ public Message<EventData> selector() throws InterruptedException {
if (dump && logger.isInfoEnabled()) {
String startPosition = null;
String endPosition = null;
if (!CollectionUtils.isEmpty(message.getEntries())) {
startPosition = buildPositionForDump(message.getEntries().get(0));
endPosition = buildPositionForDump(message.getEntries().get(message.getEntries().size() - 1));
if (!CollectionUtils.isEmpty(entries)) {
startPosition = buildPositionForDump(entries.get(0));
endPosition = buildPositionForDump(entries.get(entries.size() - 1));
}

dumpMessages(result, startPosition, endPosition, message.getEntries().size());// 记录一下,方便追查问题
dumpMessages(result, startPosition, endPosition, entries.size());// 记录一下,方便追查问题
}
return result;
}
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@
<dependency>
<groupId>com.alibaba.fastsql</groupId>
<artifactId>fastsql</artifactId>
<version>2.0.0_preview_366</version>
<version>2.0.0_preview_520</version>
</dependency>
<!-- zookeeper -->
<dependency>
Expand Down

0 comments on commit 199c190

Please sign in to comment.