diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/cdc/CdcDatabaseJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/cdc/CdcDatabaseJob.java index f3169f436fe835..af2d986e92c91a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/cdc/CdcDatabaseJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/cdc/CdcDatabaseJob.java @@ -91,6 +91,8 @@ public class CdcDatabaseJob extends AbstractJob SCHEMA = ImmutableList.of( new Column("Id", ScalarType.createStringType()), new Column("Name", ScalarType.createStringType()), @@ -104,7 +106,6 @@ public class CdcDatabaseJob extends AbstractJob COLUMN_TO_INDEX; private static final Logger LOG = LogManager.getLogger(CdcDatabaseJob.class); - private static final int port = 9096; private static ObjectMapper objectMapper = new ObjectMapper(); @SerializedName("rs") List remainingSplits = new CopyOnWriteArrayList<>(); @@ -127,6 +128,8 @@ public class CdcDatabaseJob extends AbstractJob config; + @SerializedName("pbg") + private boolean pureBinlogPhase; static { ImmutableMap.Builder builder = new ImmutableMap.Builder(); @@ -332,7 +335,6 @@ public List createTasks(TaskType taskType, Map // Call the BE interface and pass host, port, jobId // select backends Backend backend = selectBackend(getJobId()); - if (!isBinlogSplitAssigned) { if (!remainingSplits.isEmpty()) { SnapshotSplit snapshotSplit = remainingSplits.get(0); @@ -356,7 +358,7 @@ public List createTasks(TaskType taskType, Map } else { readOffset.put(SPLIT_ID, BINLOG_SPLIT_ID); // todo: When fully entering the binlog phase, there is no need to pass splits - if (!splitFinishedOffsets.isEmpty() && !assignedSplits.isEmpty()) { + if (!pureBinlogPhase && !splitFinishedOffsets.isEmpty() && !assignedSplits.isEmpty()) { readOffset.put(FINISH_SPLITS, objectMapper.writeValueAsString(splitFinishedOffsets)); readOffset.put(ASSIGNED_SPLITS, objectMapper.writeValueAsString(assignedSplits)); } @@ -426,11 +428,17 @@ public void onStatusChanged(JobStatus oldStatus, JobStatus newStatus) throws Job } if (JobStatus.RUNNING.equals(oldStatus) && JobStatus.PAUSED.equals(newStatus)) { - executor.shutdown(); + if (executor != null){ + executor.shutdown(); + } + } + + if (JobStatus.STOPPED.equals(newStatus)) { + closeCdcResource(); } } - private void startCdcScanner(Backend backend) throws JobException { + public static void startCdcScanner(Backend backend) throws JobException { TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort()); //Reserved parameters, currently empty String params = ""; @@ -460,6 +468,9 @@ public void updateOffset(Map meta) { if (splitId == null) { return; } + if(meta.containsKey(PURE_BINLOG_PHASE)){ + pureBinlogPhase = Boolean.parseBoolean(meta.remove(PURE_BINLOG_PHASE)); + } if (!BINLOG_SPLIT_ID.equals(splitId)) { remainingSplits.remove(0); splitFinishedOffsets.put(splitId, meta); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/cdc/CdcDatabaseTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/cdc/CdcDatabaseTask.java index cc7a864f1c3246..ba029a8f162784 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/cdc/CdcDatabaseTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/cdc/CdcDatabaseTask.java @@ -36,6 +36,7 @@ import org.apache.doris.system.Backend; import org.apache.doris.thrift.TCell; import org.apache.doris.thrift.TRow; +import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.TransactionException; import org.apache.doris.transaction.TransactionState; import org.apache.doris.transaction.TransactionStatus; @@ -47,11 +48,12 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.gson.Gson; +import com.google.gson.annotations.SerializedName; +import static org.apache.doris.job.extensions.cdc.CdcDatabaseJob.startCdcScanner; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.Map; -import java.util.UUID; public class CdcDatabaseTask extends AbstractTask implements TxnStateChangeCallback { @@ -80,8 +82,8 @@ public class CdcDatabaseTask extends AbstractTask implements TxnStateChangeCallb COLUMN_TO_INDEX = builder.build(); } - private UUID id; private long dbId; + @SerializedName(value = "bd") private Backend backend; private Long jobId; private Map meta; @@ -96,7 +98,6 @@ public CdcDatabaseTask(long dbId, Backend backend, Long jobId, Map properties) throws Jo config.setCatalog(properties.get(DB_SOURCE_TYPE)); config.setUser(properties.get(USERNAME)); config.setPassword(properties.get(PASSWORD)); - config.setDriverClass(sourceType.getDriverClass()); - config.setDriverUrl(sourceType.getDriverUrl()); + config.setDriverClass(properties.get(DRIVER_CLASS)); + config.setDriverUrl(properties.get(DRIVER_URL)); config.setJdbcUrl( "jdbc:" + properties.get(DB_SOURCE_TYPE) + "://" + properties.get(HOST) + ":" + properties.get(PORT) + "/"); diff --git a/fe_plugins/cdcloader/src/main/java/org/apache/doris/cdcloader/mysql/reader/MySqlSourceReader.java b/fe_plugins/cdcloader/src/main/java/org/apache/doris/cdcloader/mysql/reader/MySqlSourceReader.java index 4ad177b5c79705..590457ad559af5 100644 --- a/fe_plugins/cdcloader/src/main/java/org/apache/doris/cdcloader/mysql/reader/MySqlSourceReader.java +++ b/fe_plugins/cdcloader/src/main/java/org/apache/doris/cdcloader/mysql/reader/MySqlSourceReader.java @@ -50,6 +50,7 @@ import org.apache.doris.job.extensions.cdc.state.AbstractSourceSplit; import org.apache.doris.job.extensions.cdc.state.BinlogSplit; import org.apache.doris.job.extensions.cdc.state.SnapshotSplit; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils; import org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader; import org.apache.flink.cdc.connectors.mysql.debezium.reader.DebeziumReader; @@ -83,6 +84,7 @@ public class MySqlSourceReader implements SourceReader sourceConfigMap; @@ -148,18 +150,19 @@ public List getSourceSplits(JobConfig config) throws JsonPr public RecordWithMeta read(FetchRecordReq fetchRecord) throws Exception { JobConfig jobConfig = new JobConfig(fetchRecord.getJobId(), fetchRecord.getConfig()); int count=0; - Map lastMeta = new HashMap<>(); - RecordWithMeta recordResponse = new RecordWithMeta(lastMeta); + RecordWithMeta recordResponse = new RecordWithMeta(); int fetchSize = fetchRecord.getFetchSize(); boolean schedule = fetchRecord.isSchedule(); MySqlSplit split = null; - + boolean pureBinlogPhase = false; if(schedule){ Map offset = fetchRecord.getMeta(); if(offset.isEmpty()){ throw new RuntimeException("miss meta offset"); } - split = createMySqlSplit(offset, jobConfig); + Tuple2 splitFlag = createMySqlSplit(offset, jobConfig); + split = splitFlag.f0; + pureBinlogPhase = splitFlag.f1; //reset current reader closeBinlogReader(jobConfig.getJobId()); SplitRecords currentSplitRecords = pollSplitRecords(split, jobConfig); @@ -195,9 +198,10 @@ public RecordWithMeta read(FetchRecordReq fetchRecord) throws Exception { continue; } count += serialize.size(); - lastMeta = RecordUtils.getBinlogPosition(element).getOffset(); + Map lastMeta = RecordUtils.getBinlogPosition(element).getOffset(); if(split.isBinlogSplit()){ lastMeta.put(SPLIT_ID, BINLOG_SPLIT_ID); + lastMeta.put(PURE_BINLOG_PHASE, String.valueOf(pureBinlogPhase)); recordResponse.setMeta(lastMeta); } recordResponse.getRecords().addAll(serialize); @@ -211,7 +215,7 @@ public RecordWithMeta read(FetchRecordReq fetchRecord) throws Exception { List sqls = serializer.serialize(jobConfig.getConfig(), element); if(!sqls.isEmpty()){ recordResponse.setSqls(sqls); - lastMeta = RecordUtils.getBinlogPosition(element).getOffset(); + Map lastMeta = RecordUtils.getBinlogPosition(element).getOffset(); lastMeta.put(SPLIT_ID, BINLOG_SPLIT_ID); recordResponse.setMeta(lastMeta); return recordResponse; @@ -232,6 +236,7 @@ public RecordWithMeta read(FetchRecordReq fetchRecord) throws Exception { if(split.isBinlogSplit()){ Map offset = split.asBinlogSplit().getStartingOffset().getOffset(); offset.put(SPLIT_ID, BINLOG_SPLIT_ID); + offset.put(PURE_BINLOG_PHASE, String.valueOf(pureBinlogPhase)); recordResponse.setMeta(offset); }else{ recordResponse.setMeta(fetchRecord.getMeta()); @@ -257,15 +262,16 @@ private void refreshTableChanges(SourceRecord element, Long jobId) throws IOExce } } - private MySqlSplit createMySqlSplit(Map offset, JobConfig jobConfig) throws JsonProcessingException { - MySqlSplit split = null; + private Tuple2 createMySqlSplit(Map offset, JobConfig jobConfig) throws JsonProcessingException { + Tuple2 splitRes = null; String splitId = offset.get(SPLIT_ID); if(!BINLOG_SPLIT_ID.equals(splitId)){ - split = createSnapshotSplit(offset, jobConfig); + MySqlSnapshotSplit split = createSnapshotSplit(offset, jobConfig); + splitRes = Tuple2.of(split, false); }else{ - split = createBinlogSplit(offset,jobConfig); + splitRes = createBinlogSplit(offset, jobConfig); } - return split; + return splitRes; } private MySqlSnapshotSplit createSnapshotSplit(Map offset, JobConfig jobConfig) throws JsonProcessingException { @@ -283,7 +289,7 @@ private MySqlSnapshotSplit createSnapshotSplit(Map offset, JobCo return split; } - private MySqlBinlogSplit createBinlogSplit(Map meta, JobConfig config) throws JsonProcessingException { + private Tuple2 createBinlogSplit(Map meta, JobConfig config) throws JsonProcessingException { MySqlSourceConfig sourceConfig = getSourceConfig(config); BinlogOffset offsetConfig = null; if(sourceConfig.getStartupOptions() != null){ @@ -292,7 +298,8 @@ private MySqlBinlogSplit createBinlogSplit(Map meta, JobConfig c List finishedSnapshotSplitInfos = new ArrayList<>(); BinlogOffset minOffsetFinishSplits = null; - if(meta.containsKey(FINISH_SPLITS) && meta.containsKey(ASSIGNED_SPLITS) ){ + BinlogOffset maxOffsetFinishSplits = null; + if(meta.containsKey(FINISH_SPLITS) && meta.containsKey(ASSIGNED_SPLITS)) { //Construct binlogsplit based on the finished split and assigned split. String finishSplitsOffset = meta.remove(FINISH_SPLITS); String assignedSplits = meta.remove(ASSIGNED_SPLITS); @@ -309,6 +316,9 @@ private MySqlBinlogSplit createBinlogSplit(Map meta, JobConfig c if (minOffsetFinishSplits == null || binlogOffset.isBefore(minOffsetFinishSplits)) { minOffsetFinishSplits = binlogOffset; } + if (maxOffsetFinishSplits == null || binlogOffset.isAfter(maxOffsetFinishSplits)) { + maxOffsetFinishSplits = binlogOffset; + } Object[] splitStart = split.getSplitStart() == null ? null : objectMapper.readValue(split.getSplitStart(), Object[].class); Object[] splitEnd = split.getSplitEnd() == null ? null : objectMapper.readValue(split.getSplitEnd(), Object[].class); @@ -334,10 +344,20 @@ private MySqlBinlogSplit createBinlogSplit(Map meta, JobConfig c startOffset = BinlogOffset.ofEarliest(); } + boolean pureBinlogPhase = false; + if(maxOffsetFinishSplits == null){ + pureBinlogPhase = true; + }else if(startOffset.isAtOrAfter(maxOffsetFinishSplits)){ + // All the offsets of the current split are smaller than the offset of the binlog, + // indicating that the binlog phase has been fully entered. + pureBinlogPhase = true; + LOG.info("The binlog phase has been fully entered, the current split is: {}", startOffset); + } + MySqlBinlogSplit split = new MySqlBinlogSplit(BINLOG_SPLIT_ID, startOffset, BinlogOffset.ofNonStopping(), finishedSnapshotSplitInfos, new HashMap<>(), 0); //filterTableSchema MySqlBinlogSplit binlogSplit = MySqlBinlogSplit.fillTableSchemas(split.asBinlogSplit(), getTableSchemas(config)); - return binlogSplit; + return Tuple2.of(binlogSplit, pureBinlogPhase); } private List startSplitChunks(MySqlSourceConfig sourceConfig, String snapshotTable, Map config){ @@ -457,8 +477,9 @@ public void close(Long jobId) { closeSnapshotReader(jobId); closeBinlogReader(jobId); sourceConfigMap.remove(jobId); - tableSchemaMaps.remove(jobId).clear(); + tableSchemaMaps.remove(jobId); currentSplitRecordsMap.remove(jobId); + LOG.info("Close source reader for job {}", jobId); } private Map getTableSchemas(JobConfig config){ diff --git a/fe_plugins/cdcloader/src/main/java/org/apache/doris/cdcloader/mysql/reader/RecordWithMeta.java b/fe_plugins/cdcloader/src/main/java/org/apache/doris/cdcloader/mysql/reader/RecordWithMeta.java index e4c6ffe816c84e..2b583d63c57351 100644 --- a/fe_plugins/cdcloader/src/main/java/org/apache/doris/cdcloader/mysql/reader/RecordWithMeta.java +++ b/fe_plugins/cdcloader/src/main/java/org/apache/doris/cdcloader/mysql/reader/RecordWithMeta.java @@ -18,6 +18,7 @@ package org.apache.doris.cdcloader.mysql.reader; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -27,8 +28,8 @@ public class RecordWithMeta { private List records; private List sqls; - public RecordWithMeta(Map meta) { - this.meta = meta; + public RecordWithMeta() { + this.meta = new HashMap<>(); this.records = new ArrayList<>(); this.sqls = new ArrayList<>(); }