
Commit

update
JNSimba committed Aug 26, 2024
1 parent 41d2aa1 commit 1056cd9
Showing 6 changed files with 72 additions and 34 deletions.
@@ -91,6 +91,8 @@ public class CdcDatabaseJob extends AbstractJob<CdcDatabaseTask, Map<Object, Obj
public static final String FINISH_SPLITS = "finishSplits";
public static final String ASSIGNED_SPLITS = "assignedSplits";
public static final String SNAPSHOT_TABLE = "snapshotTable";
private static final String PURE_BINLOG_PHASE = "pureBinlogPhase";

public static final ImmutableList<Column> SCHEMA = ImmutableList.of(
new Column("Id", ScalarType.createStringType()),
new Column("Name", ScalarType.createStringType()),
@@ -104,7 +106,6 @@ public class CdcDatabaseJob extends AbstractJob<CdcDatabaseTask, Map<Object, Obj
new Column("Progress", ScalarType.createStringType()));
public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
private static final Logger LOG = LogManager.getLogger(CdcDatabaseJob.class);
private static final int port = 9096;
private static ObjectMapper objectMapper = new ObjectMapper();
@SerializedName("rs")
List<SnapshotSplit> remainingSplits = new CopyOnWriteArrayList<>();
@@ -127,6 +128,8 @@ public class CdcDatabaseJob extends AbstractJob<CdcDatabaseTask, Map<Object, Obj
private long dbId;
@SerializedName("cf")
private Map<String, String> config;
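// Whether the job has fully entered the binlog phase; reported back by the source reader and used to skip re-sending finished/assigned splits.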
@SerializedName("pbg")
private boolean pureBinlogPhase;

static {
ImmutableMap.Builder<String, Integer> builder = new ImmutableMap.Builder();
@@ -332,7 +335,6 @@ public List<CdcDatabaseTask> createTasks(TaskType taskType, Map<Object, Object>
// Call the BE interface and pass host, port, jobId
// select backends
Backend backend = selectBackend(getJobId());

if (!isBinlogSplitAssigned) {
if (!remainingSplits.isEmpty()) {
SnapshotSplit snapshotSplit = remainingSplits.get(0);
@@ -356,7 +358,7 @@ public List<CdcDatabaseTask> createTasks(TaskType taskType, Map<Object, Object>
} else {
readOffset.put(SPLIT_ID, BINLOG_SPLIT_ID);
// todo: When fully entering the binlog phase, there is no need to pass splits
if (!splitFinishedOffsets.isEmpty() && !assignedSplits.isEmpty()) {
if (!pureBinlogPhase && !splitFinishedOffsets.isEmpty() && !assignedSplits.isEmpty()) {
readOffset.put(FINISH_SPLITS, objectMapper.writeValueAsString(splitFinishedOffsets));
readOffset.put(ASSIGNED_SPLITS, objectMapper.writeValueAsString(assignedSplits));
}
@@ -426,11 +428,17 @@ public void onStatusChanged(JobStatus oldStatus, JobStatus newStatus) throws Job
}

if (JobStatus.RUNNING.equals(oldStatus) && JobStatus.PAUSED.equals(newStatus)) {
executor.shutdown();
if (executor != null){
executor.shutdown();
}
}

if (JobStatus.STOPPED.equals(newStatus)) {
closeCdcResource();
}
}

private void startCdcScanner(Backend backend) throws JobException {
public static void startCdcScanner(Backend backend) throws JobException {
TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
//Reserved parameters, currently empty
String params = "";
@@ -460,6 +468,9 @@ public void updateOffset(Map<String, String> meta) {
if (splitId == null) {
return;
}
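// The source reader reports whether it is fully in the binlog phase; read the flag and strip it from the offset meta.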
if(meta.containsKey(PURE_BINLOG_PHASE)){
pureBinlogPhase = Boolean.parseBoolean(meta.remove(PURE_BINLOG_PHASE));
}
if (!BINLOG_SPLIT_ID.equals(splitId)) {
remainingSplits.remove(0);
splitFinishedOffsets.put(splitId, meta);
@@ -36,6 +36,7 @@
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TCell;
import org.apache.doris.thrift.TRow;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.TransactionException;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionStatus;
@@ -47,11 +48,12 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.gson.annotations.SerializedName;
import static org.apache.doris.job.extensions.cdc.CdcDatabaseJob.startCdcScanner;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Map;
import java.util.UUID;

public class CdcDatabaseTask extends AbstractTask implements TxnStateChangeCallback {

@@ -80,8 +82,8 @@ public class CdcDatabaseTask extends AbstractTask implements TxnStateChangeCallb
COLUMN_TO_INDEX = builder.build();
}

private UUID id;
private long dbId;
@SerializedName(value = "bd")
private Backend backend;
private Long jobId;
private Map<String, String> meta;
@@ -96,7 +98,6 @@ public CdcDatabaseTask(long dbId, Backend backend, Long jobId, Map<String, Strin
this.jobId = jobId;
this.meta = meta;
this.config = config;
this.id = UUID.randomUUID();
}

@Override
@@ -114,8 +115,7 @@ public void run() throws JobException {
// //call be rpc
// //waitlock
// }


startCdcScanner(backend);
// Call the BE interface and pass host, port, jobId
// mock pull data
System.out.println("====run task...");
@@ -132,6 +132,7 @@ }
}

public boolean beginTxn() throws JobException {
TUniqueId id = new TUniqueId(getJobId(), getTaskId());
// begin a txn for task
try {
txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(dbId,
@@ -141,8 +142,8 @@
TransactionState.LoadJobSourceType.BACKEND_STREAMING, getJobId(),
60000);
} catch (Exception ex) {
LOG.warn("failed to begin txn for cdc load task: {}, job id: {}",
DebugUtil.printId(id), jobId, ex);
LOG.warn("failed to begin txn for cdc load task: {}, task id {}, job id: {}",
DebugUtil.printId(id), getTaskId(), jobId, ex);
return false;
}
return true;
@@ -161,7 +162,7 @@ public TRow getTvfInfo(String jobName) {
trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(super.getCreateTimeMs())));
trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(super.getStartTimeMs())));
trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(super.getFinishTimeMs())));
trow.addToColumnValue(new TCell().setStringVal(backend.getHost()));
trow.addToColumnValue(new TCell().setStringVal(backend == null ? FeConstants.null_string : backend.getHost()));
trow.addToColumnValue(new TCell().setStringVal(new Gson().toJson(meta)));
trow.addToColumnValue(new TCell()
.setStringVal(finishedMeta == null ? FeConstants.null_string : new Gson().toJson(finishedMeta)));
@@ -213,8 +214,8 @@ public void beforeAborted(TransactionState txnState) throws TransactionException
public void afterCommitted(TransactionState txnState, boolean txnOperated) throws UserException {
if(txnOperated){
if(txnState.getTransactionId() != txnId){
LOG.info("Can not find task with transaction {} after committed, task: {}, job: {}",
txnState.getTransactionId(), id, getJobId());
LOG.info("Can not find task with transaction {} after committed, task id: {}, job: {}",
txnState.getTransactionId(), getTaskId(), getJobId());
return;
}

@@ -25,6 +25,8 @@ public class CdcLoadConstants {
public static final String PASSWORD = "password";
public static final String DATABASE_NAME = "database_name";
public static final String TABLE_NAME = "table_name";
public static final String DRIVER_CLASS = "driver_class";
public static final String DRIVER_URL = "driver_url";

public static final String INCLUDE_TABLES_LIST = "include_tables_list";
public static final String EXCLUDE_TABLES_LIST = "exclude_tables_list";
@@ -34,6 +34,8 @@

import static org.apache.doris.job.extensions.cdc.utils.CdcLoadConstants.DATABASE_NAME;
import static org.apache.doris.job.extensions.cdc.utils.CdcLoadConstants.DB_SOURCE_TYPE;
import static org.apache.doris.job.extensions.cdc.utils.CdcLoadConstants.DRIVER_CLASS;
import static org.apache.doris.job.extensions.cdc.utils.CdcLoadConstants.DRIVER_URL;
import static org.apache.doris.job.extensions.cdc.utils.CdcLoadConstants.HOST;
import static org.apache.doris.job.extensions.cdc.utils.CdcLoadConstants.INCLUDE_TABLES_LIST;
import static org.apache.doris.job.extensions.cdc.utils.CdcLoadConstants.PASSWORD;
@@ -57,8 +59,8 @@ public static JdbcClient getJdbcClient(Map<String, String> properties) throws Jo
config.setCatalog(properties.get(DB_SOURCE_TYPE));
config.setUser(properties.get(USERNAME));
config.setPassword(properties.get(PASSWORD));
config.setDriverClass(sourceType.getDriverClass());
config.setDriverUrl(sourceType.getDriverUrl());
config.setDriverClass(properties.get(DRIVER_CLASS));
config.setDriverUrl(properties.get(DRIVER_URL));
config.setJdbcUrl(
"jdbc:" + properties.get(DB_SOURCE_TYPE) + "://" + properties.get(HOST) + ":" + properties.get(PORT)
+ "/");
@@ -50,6 +50,7 @@
import org.apache.doris.job.extensions.cdc.state.AbstractSourceSplit;
import org.apache.doris.job.extensions.cdc.state.BinlogSplit;
import org.apache.doris.job.extensions.cdc.state.SnapshotSplit;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils;
import org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader;
import org.apache.flink.cdc.connectors.mysql.debezium.reader.DebeziumReader;
@@ -83,6 +84,7 @@ public class MySqlSourceReader implements SourceReader<RecordWithMeta, FetchReco
private static final String FINISH_SPLITS = "finishSplits";
private static final String ASSIGNED_SPLITS = "assignedSplits";
private static final String SNAPSHOT_TABLE = "snapshotTable";
private static final String PURE_BINLOG_PHASE = "pureBinlogPhase";
private static final FlinkJsonTableChangeSerializer TABLE_CHANGE_SERIALIZER =
new FlinkJsonTableChangeSerializer();
private Map<Long, MySqlSourceConfig> sourceConfigMap;
@@ -148,18 +150,19 @@ public List<AbstractSourceSplit> getSourceSplits(JobConfig config) throws JsonPr
public RecordWithMeta read(FetchRecordReq fetchRecord) throws Exception {
JobConfig jobConfig = new JobConfig(fetchRecord.getJobId(), fetchRecord.getConfig());
int count=0;
Map<String, String> lastMeta = new HashMap<>();
RecordWithMeta recordResponse = new RecordWithMeta(lastMeta);
RecordWithMeta recordResponse = new RecordWithMeta();
int fetchSize = fetchRecord.getFetchSize();
boolean schedule = fetchRecord.isSchedule();
MySqlSplit split = null;

boolean pureBinlogPhase = false;
if(schedule){
Map<String, String> offset = fetchRecord.getMeta();
if(offset.isEmpty()){
throw new RuntimeException("miss meta offset");
}
split = createMySqlSplit(offset, jobConfig);
Tuple2<MySqlSplit, Boolean> splitFlag = createMySqlSplit(offset, jobConfig);
split = splitFlag.f0;
pureBinlogPhase = splitFlag.f1;
//reset current reader
closeBinlogReader(jobConfig.getJobId());
SplitRecords currentSplitRecords = pollSplitRecords(split, jobConfig);
@@ -195,9 +198,10 @@ public RecordWithMeta read(FetchRecordReq fetchRecord) throws Exception {
continue;
}
count += serialize.size();
lastMeta = RecordUtils.getBinlogPosition(element).getOffset();
Map<String, String> lastMeta = RecordUtils.getBinlogPosition(element).getOffset();
if(split.isBinlogSplit()){
lastMeta.put(SPLIT_ID, BINLOG_SPLIT_ID);
lastMeta.put(PURE_BINLOG_PHASE, String.valueOf(pureBinlogPhase));
recordResponse.setMeta(lastMeta);
}
recordResponse.getRecords().addAll(serialize);
@@ -211,7 +215,7 @@ public RecordWithMeta read(FetchRecordReq fetchRecord) throws Exception {
List<String> sqls = serializer.serialize(jobConfig.getConfig(), element);
if(!sqls.isEmpty()){
recordResponse.setSqls(sqls);
lastMeta = RecordUtils.getBinlogPosition(element).getOffset();
Map<String, String> lastMeta = RecordUtils.getBinlogPosition(element).getOffset();
lastMeta.put(SPLIT_ID, BINLOG_SPLIT_ID);
recordResponse.setMeta(lastMeta);
return recordResponse;
@@ -232,6 +236,7 @@ public RecordWithMeta read(FetchRecordReq fetchRecord) throws Exception {
if(split.isBinlogSplit()){
Map<String, String> offset = split.asBinlogSplit().getStartingOffset().getOffset();
offset.put(SPLIT_ID, BINLOG_SPLIT_ID);
offset.put(PURE_BINLOG_PHASE, String.valueOf(pureBinlogPhase));
recordResponse.setMeta(offset);
}else{
recordResponse.setMeta(fetchRecord.getMeta());
@@ -257,15 +262,16 @@ private void refreshTableChanges(SourceRecord element, Long jobId) throws IOExce
}
}

private MySqlSplit createMySqlSplit(Map<String, String> offset, JobConfig jobConfig) throws JsonProcessingException {
MySqlSplit split = null;
private Tuple2<MySqlSplit, Boolean> createMySqlSplit(Map<String, String> offset, JobConfig jobConfig) throws JsonProcessingException {
Tuple2<MySqlSplit, Boolean> splitRes = null;
String splitId = offset.get(SPLIT_ID);
if(!BINLOG_SPLIT_ID.equals(splitId)){
split = createSnapshotSplit(offset, jobConfig);
MySqlSnapshotSplit split = createSnapshotSplit(offset, jobConfig);
splitRes = Tuple2.of(split, false);
}else{
split = createBinlogSplit(offset,jobConfig);
splitRes = createBinlogSplit(offset, jobConfig);
}
return split;
return splitRes;
}

private MySqlSnapshotSplit createSnapshotSplit(Map<String, String> offset, JobConfig jobConfig) throws JsonProcessingException {
@@ -283,7 +289,7 @@ private MySqlSnapshotSplit createSnapshotSplit(Map<String, String> offset, JobCo
return split;
}

private MySqlBinlogSplit createBinlogSplit(Map<String, String> meta, JobConfig config) throws JsonProcessingException {
private Tuple2<MySqlSplit, Boolean> createBinlogSplit(Map<String, String> meta, JobConfig config) throws JsonProcessingException {
MySqlSourceConfig sourceConfig = getSourceConfig(config);
BinlogOffset offsetConfig = null;
if(sourceConfig.getStartupOptions() != null){
@@ -292,7 +298,8 @@ private MySqlBinlogSplit createBinlogSplit(Map<String, String> meta, JobConfig c

List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos = new ArrayList<>();
BinlogOffset minOffsetFinishSplits = null;
if(meta.containsKey(FINISH_SPLITS) && meta.containsKey(ASSIGNED_SPLITS) ){
BinlogOffset maxOffsetFinishSplits = null;
if(meta.containsKey(FINISH_SPLITS) && meta.containsKey(ASSIGNED_SPLITS)) {
//Construct binlogsplit based on the finished split and assigned split.
String finishSplitsOffset = meta.remove(FINISH_SPLITS);
String assignedSplits = meta.remove(ASSIGNED_SPLITS);
@@ -309,6 +316,9 @@ private MySqlBinlogSplit createBinlogSplit(Map<String, String> meta, JobConfig c
if (minOffsetFinishSplits == null || binlogOffset.isBefore(minOffsetFinishSplits)) {
minOffsetFinishSplits = binlogOffset;
}
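// Also track the largest finished-split offset; it is compared against the binlog start offset below to decide whether the reader is purely in the binlog phase.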
if (maxOffsetFinishSplits == null || binlogOffset.isAfter(maxOffsetFinishSplits)) {
maxOffsetFinishSplits = binlogOffset;
}
Object[] splitStart = split.getSplitStart() == null ? null : objectMapper.readValue(split.getSplitStart(), Object[].class);
Object[] splitEnd = split.getSplitEnd() == null ? null : objectMapper.readValue(split.getSplitEnd(), Object[].class);

@@ -334,10 +344,20 @@ private MySqlBinlogSplit createBinlogSplit(Map<String, String> meta, JobConfig c
startOffset = BinlogOffset.ofEarliest();
}

boolean pureBinlogPhase = false;
if(maxOffsetFinishSplits == null){
pureBinlogPhase = true;
}else if(startOffset.isAtOrAfter(maxOffsetFinishSplits)){
// The binlog start offset is at or after the largest finished-split offset,
// so all snapshot splits are already covered and the binlog phase has been fully entered.
pureBinlogPhase = true;
LOG.info("The binlog phase has been fully entered, the current split is: {}", startOffset);
}

MySqlBinlogSplit split = new MySqlBinlogSplit(BINLOG_SPLIT_ID, startOffset, BinlogOffset.ofNonStopping(), finishedSnapshotSplitInfos, new HashMap<>(), 0);
//filterTableSchema
MySqlBinlogSplit binlogSplit = MySqlBinlogSplit.fillTableSchemas(split.asBinlogSplit(), getTableSchemas(config));
return binlogSplit;
return Tuple2.of(binlogSplit, pureBinlogPhase);
}

private List<MySqlSnapshotSplit> startSplitChunks(MySqlSourceConfig sourceConfig, String snapshotTable, Map<String, String> config){
@@ -457,8 +477,9 @@ public void close(Long jobId) {
closeSnapshotReader(jobId);
closeBinlogReader(jobId);
sourceConfigMap.remove(jobId);
tableSchemaMaps.remove(jobId).clear();
tableSchemaMaps.remove(jobId);
currentSplitRecordsMap.remove(jobId);
LOG.info("Close source reader for job {}", jobId);
}

private Map<TableId, TableChanges.TableChange> getTableSchemas(JobConfig config){
@@ -18,6 +18,7 @@
package org.apache.doris.cdcloader.mysql.reader;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@@ -27,8 +28,8 @@ public class RecordWithMeta {
private List<String> records;
private List<String> sqls;

public RecordWithMeta(Map<String, String> meta) {
this.meta = meta;
public RecordWithMeta() {
this.meta = new HashMap<>();
this.records = new ArrayList<>();
this.sqls = new ArrayList<>();
}