Skip to content

Commit

Permalink
support TiCDC
Browse files Browse the repository at this point in the history
  • Loading branch information
bb7133 committed Oct 11, 2024
1 parent 9ddf008 commit 3e10b3b
Show file tree
Hide file tree
Showing 9 changed files with 226 additions and 50 deletions.
31 changes: 7 additions & 24 deletions r/example.result
Original file line number Diff line number Diff line change
Expand Up @@ -27,36 +27,14 @@ Error 1064 (42000): You have an error in your SQL syntax; check the manual that
explain analyze format='brief' select * from t;
id estRows actRows task access object execution info operator info memory disk
TableReader 10000.00 5 root NULL time:<num>, loops:<num>, RU:<num>, cop_task: {num:<num>, max:<num>, proc_keys:<num>, tot_proc:<num>, tot_wait:<num>, copr_cache_hit_ratio:<num>, build_task_duration:<num>, max_distsql_concurrency:<num>, rpc_info:{Cop:{num_rpc:<num>, total_time:<num>}} data:TableFullScan <num> Bytes N/A
└─TableFullScan 10000.00 5 cop[tikv] table:t tikv_task:{time:<num>, loops:<num>, scan_detail: {total_process_keys:<num>, total_process_keys_size:<num>, total_keys:<num>, get_snapshot_time:<num>, rocksdb: {key_skipped_count:<num>, block: {}}}, time_detail: {total_process_time:<num>, total_wait_time:<num>, tikv_wall_time:<num>} keep order:false, stats:pseudo N/A N/A
└─TableFullScan 10000.00 5 cop[tikv] table:t tikv_task:{time:<num>, loops:<num>, scan_detail: {total_process_keys:<num>, total_process_keys_size:<num>, total_keys:<num>, get_snapshot_time:<num>, rocksdb: {delete_skipped_count:<num>, key_skipped_count:<num>, block: {}}}, time_detail: {total_process_time:<num>, total_wait_time:<num>, tikv_wall_time:<num>} keep order:false, stats:pseudo N/A N/A
explain analyze select * from t;
id estRows actRows task access object execution info operator info memory disk
TableReader_5 10000.00 5 root NULL time:<num>, loops:<num>, RU:<num>, cop_task: {num:<num>, max:<num>, proc_keys:<num>, tot_proc:<num>, tot_wait:<num>, copr_cache_hit_ratio:<num>, build_task_duration:<num>, max_distsql_concurrency:<num>, rpc_info:{Cop:{num_rpc:<num>, total_time:<num>}} data:TableFullScan_4 <num> Bytes N/A
└─TableFullScan_4 10000.00 5 cop[tikv] table:t tikv_task:{time:<num>, loops:<num>, scan_detail: {total_process_keys:<num>, total_process_keys_size:<num>, total_keys:<num>, get_snapshot_time:<num>, rocksdb: {key_skipped_count:<num>, block: {}}}, time_detail: {total_process_time:<num>, total_wait_time:<num>, tikv_wall_time:<num>} keep order:false, stats:pseudo N/A N/A
└─TableFullScan_4 10000.00 5 cop[tikv] table:t tikv_task:{time:<num>, loops:<num>, scan_detail: {total_process_keys:<num>, total_process_keys_size:<num>, total_keys:<num>, get_snapshot_time:<num>, rocksdb: {delete_skipped_count:<num>, key_skipped_count:<num>, block: {}}}, time_detail: {total_process_time:<num>, total_wait_time:<num>, tikv_wall_time:<num>} keep order:false, stats:pseudo N/A N/A
insert into t values (6, 6);
affected rows: 1
info:
Destination Size BackupTS Queue Time Execution Time
/tmp/t_b7987183-d39f-4572-868d-75e25c8cd215 1876 451473879442653228 2024-07-29 14:56:13 2024-07-29 14:56:13
affected rows: 0
info:
affected rows: 0
info:
affected rows: 0
info:
Destination Size BackupTS Cluster TS Queue Time Execution Time
/tmp/t_b7987183-d39f-4572-868d-75e25c8cd215 1876 451473879442653228 451473880386371620 2024-07-29 14:56:17 2024-07-29 14:56:17
affected rows: 0
info:
affected rows: 0
info:
affected rows: 0
info:
affected rows: 0
info:
Job_ID Data_Source Target_Table Table_ID Phase Status Source_File_Size Imported_Rows Result_Message Create_Time Start_Time End_Time Created_By
3 /tmp/t_6cac1a43-c66c-4af9-962f-95287fa12432/example.t.000000000.csv `example`.`td` 453 finished 30B 6 2024-07-29 14:56:17.619215 2024-07-29 14:56:18.125792 2024-07-29 14:56:19.640005 root@%
affected rows: 0
info:
DROP TABLE IF EXISTS t1;
affected rows: 0
info:
Expand All @@ -71,3 +49,8 @@ affected rows: 3
info: Records: 2 Duplicates: 1 Warnings: 0
1
use `test`;;
use example;
select * from t1;
f1 f2
1 1
2 2
191 changes: 172 additions & 19 deletions src/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,15 @@ var (
checkErr bool
pathBR string
pathDumpling string
pathCDC string
addressCDC string
downstream string

downStreamHost string
downStreamPort string
downStreamUser string
downStreamPassword string
downStreamDB string
)

func init() {
Expand All @@ -67,8 +76,11 @@ func init() {
flag.IntVar(&retryConnCount, "retry-connection-count", 120, "The max number to retry to connect to the database.")
flag.BoolVar(&checkErr, "check-error", false, "if --error ERR does not match, return error instead of just warn")
flag.BoolVar(&collationDisable, "collation-disable", false, "run collation related-test with new-collation disabled")
flag.StringVar(&pathBR, "path-br", "", "Path of BR")
flag.StringVar(&pathDumpling, "path-dumpling", "", "Path of Dumpling")
flag.StringVar(&pathBR, "path-br", "", "Path of BR binary")
flag.StringVar(&pathDumpling, "path-dumpling", "", "Path of Dumpling binary")
flag.StringVar(&pathCDC, "path-cdc", "", "Path of TiCDC binary")
flag.StringVar(&addressCDC, "address-cdc", "127.0.0.1:8300", "Address of Server")
flag.StringVar(&downstream, "downstream", "", "Connection string of downstream TiDB cluster")
}

const (
Expand Down Expand Up @@ -165,6 +177,12 @@ type tester struct {

// dump and import context through --dump_and_import $SOURCE_TABLE as $TARGET_TABLE'
dumpAndImport *SourceAndTarget

// replication checkpoint database name
replicationCheckpointDB string

// replication checkpoint ID
replicationCheckpointID int
}

func newTester(name string) *tester {
Expand All @@ -179,6 +197,8 @@ func newTester(name string) *tester {
t.enableConcurrent = false
t.enableInfo = false

t.replicationCheckpointDB = "checkpoint-" + uuid.NewString()
t.replicationCheckpointID = 0
return t
}

Expand Down Expand Up @@ -219,7 +239,7 @@ func isTiDB(db *sql.DB) bool {
return true
}

func (t *tester) addConnection(connName, hostName, userName, password, db string) {
func (t *tester) addConnection(connName, hostName, port, userName, password, db string) {
var (
mdb *sql.DB
err error
Expand Down Expand Up @@ -285,6 +305,64 @@ func (t *tester) disconnect(connName string) {
t.currConnName = default_connection
}

// parseUserInfo splits a "user:password" credential string at the first
// colon and returns the two halves. A colon is mandatory: a userinfo
// without one yields an error, since the password cannot be located.
func parseUserInfo(userInfo string) (string, string, error) {
	user, password, found := strings.Cut(userInfo, ":")
	if !found {
		return "", "", fmt.Errorf("missing password in userinfo")
	}
	return user, password, nil
}

// parseHostPort splits a "host:port" address at the first colon and
// returns the host and port parts. An address without a colon is
// rejected, because no port can be extracted from it.
func parseHostPort(hostPort string) (string, string, error) {
	host, port, found := strings.Cut(hostPort, ":")
	if !found {
		return "", "", fmt.Errorf("missing port in host:port")
	}
	return host, port, nil
}

// parseDownstream decomposes a DSN-style connection string of the form
// "user:password@tcp(host:port)/dbname" into its parts. On any malformed
// input it prints a diagnostic to stdout and returns with the remaining
// results left at their zero values (it deliberately has no error result).
func parseDownstream(connStr string) (dbname string, host string, port string, user string, password string) {
	// Cut "<userinfo>@<rest>" at the first '@'.
	userInfo, rest, found := strings.Cut(connStr, "@")
	if !found {
		fmt.Println("Invalid connection string format")
		return
	}

	var err error
	if user, password, err = parseUserInfo(userInfo); err != nil {
		fmt.Println("Error parsing userinfo:", err)
		return
	}

	// The network type (e.g. "tcp") ends at the opening parenthesis.
	open := strings.Index(rest, "(")
	if open == -1 {
		fmt.Println("Invalid connection string format: missing network type")
		return
	}

	// What follows '(' is "host:port)/dbname"; cut it at ")/".
	addr, db, found := strings.Cut(rest[open+1:], ")/")
	if !found {
		fmt.Println("Invalid connection string format")
		return
	}

	if host, port, err = parseHostPort(addr); err != nil {
		fmt.Println("Error parsing host and port:", err)
		return
	}

	dbname = db
	return
}

func (t *tester) preProcess() {
dbName := "test"
mdb, err := OpenDBWithRetry("mysql", user+":"+passwd+"@tcp("+host+":"+port+")/"+dbName+"?time_zone=%27Asia%2FShanghai%27&allowAllFiles=true"+params, retryConnCount)
Expand Down Expand Up @@ -313,13 +391,25 @@ func (t *tester) preProcess() {
log.Fatalf("Executing create db %s err[%v]", dbName, err)
}
t.mdb = mdb

conn, err := initConn(mdb, user, passwd, host, dbName)
if err != nil {
log.Fatalf("Open db err %v", err)
}
t.conn[default_connection] = conn
t.curr = conn
t.currConnName = default_connection

if downstream != "" {
// create replication checkpoint database
if _, err := t.mdb.Exec(fmt.Sprintf("create database if not exists `%s`", t.replicationCheckpointDB)); err != nil {
log.Fatalf("Executing create db %s err[%v]", t.replicationCheckpointDB, err)
}

downStreamDB, downStreamHost, downStreamPort, downStreamUser, downStreamPassword = parseDownstream(downstream)
t.addConnection("downstream", downStreamHost, downStreamPort, downStreamUser, downStreamPassword, downStreamDB)
}
t.switchConnection(default_connection)
}

func (t *tester) postProcess() {
Expand All @@ -329,6 +419,7 @@ func (t *tester) postProcess() {
}
t.mdb.Close()
}()
t.switchConnection(default_connection)
if !reserveSchema {
rows, err := t.mdb.Query("show databases")
if err != nil {
Expand Down Expand Up @@ -384,6 +475,11 @@ func generateBRStatements(source, target string) (string, string) {
}

func (t *tester) dumpTable(source string) (string, error) {
// Check if the file exists
if _, err := os.Stat(pathDumpling); os.IsNotExist(err) {
return "", errors.New(fmt.Sprintf("path-dumpling [%s] does not exist.", pathDumpling))
}

log.Warnf("Start dumping table: %s", source)
path := "/tmp/" + source + "_" + uuid.NewString()
cmdArgs := []string{
Expand All @@ -392,6 +488,8 @@ func (t *tester) dumpTable(source string) (string, error) {
fmt.Sprintf("-u%s", user),
fmt.Sprintf("-T%s.%s", t.name, source),
fmt.Sprintf("-o%s", path),
"--output-filename-template",
"tempDump",
"--no-header",
"--filetype",
"csv",
Expand All @@ -405,9 +503,7 @@ func (t *tester) dumpTable(source string) (string, error) {

output, err := cmd.CombinedOutput()
if err != nil {
log.Warnf("Failed executing commands: %s, output: %s)",
cmd.String(), string(output))
return "", err
return "", errors.Annotate(err, fmt.Sprintf("Dumpling failed: %s, output: %s.", cmd.String(), string(output)))
}
log.Warnf("Done executing commands: %s, output: %s)",
cmd.String(), string(output))
Expand All @@ -417,10 +513,57 @@ func (t *tester) dumpTable(source string) (string, error) {
// importTableStmt builds the IMPORT INTO statement that loads the CSV file
// produced by dumpTable (which writes a single "tempDump.csv" under path,
// via Dumpling's --output-filename-template) into the target table.
// Note: the raw-string literal intentionally starts with a newline, so the
// generated SQL begins with a blank line.
func (t *tester) importTableStmt(path, target string) string {
	return fmt.Sprintf(`
IMPORT INTO %s
FROM '%s/tempDump.csv'
`, target, path)
}

// startReplication is expected to start TiCDC replication of the given
// tables to the downstream cluster (invoked for the replication test
// command).
// NOTE(review): currently an unimplemented stub — the tables argument is
// ignored and it always reports success; confirm the TiCDC changefeed
// setup is intended to land here.
func (t *tester) startReplication(tables string) error {
	return nil
}

// waitForReplicationCheckpoint creates a uniquely named marker table in the
// replication-checkpoint database on the current (upstream) connection, then
// polls until that table becomes visible through the downstream side,
// i.e. replication has caught up to the point of the marker's creation.
// The previously active connection is restored before returning.
func (t *tester) waitForReplicationCheckpoint() error {
	curr := t.currConnName
	defer t.switchConnection(curr)

	if err := t.executeStmt(fmt.Sprintf("use `%s`", t.replicationCheckpointDB)); err != nil {
		return err
	}

	// Use a fresh marker table per checkpoint and advance the ID so the
	// next checkpoint does not fail with a "table already exists" error.
	markerTable := fmt.Sprintf("marker_%d", t.replicationCheckpointID)
	t.replicationCheckpointID++
	if err := t.executeStmt(fmt.Sprintf("create table `%s`.`%s` (id int primary key)", t.replicationCheckpointDB, markerTable)); err != nil {
		return err
	}

	t.switchConnection("downstream")

	const (
		checkInterval = 1 * time.Second
		queryTimeout  = 10 * time.Second
	)

	// Poll until the marker table shows up. Each probe gets its own
	// bounded context and closes its rows promptly (the previous version
	// deferred cancel() inside the loop and never closed rows, leaking a
	// context and a DB connection per iteration).
	for {
		found, err := t.markerTableVisible(markerTable, queryTimeout)
		if err != nil {
			log.Printf("Error checking for table: %v", err)
			return err
		}
		if found {
			fmt.Printf("Table '%s' found!\n", markerTable)
			return nil
		}
		fmt.Printf("Table '%s' not found. Retrying in %v...\n", markerTable, checkInterval)
		time.Sleep(checkInterval)
	}
}

// markerTableVisible reports whether the marker table can be seen in
// information_schema within the given timeout.
// NOTE(review): this queries t.mdb even though the current connection was
// just switched to "downstream" — confirm t.mdb actually reaches the
// downstream cluster here, otherwise the marker is being looked up upstream
// where it trivially exists.
func (t *tester) markerTableVisible(markerTable string, timeout time.Duration) (bool, error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	query := fmt.Sprintf("select * from information_schema.tables where table_schema = '%s' and table_name = '%s';", t.replicationCheckpointDB, markerTable)
	rows, err := t.mdb.QueryContext(ctx, query)
	if err != nil {
		return false, err
	}
	// Always release the rows (and their underlying connection).
	defer rows.Close()

	found := rows.Next()
	if err := rows.Err(); err != nil {
		return false, err
	}
	return found, nil
}

func (t *tester) Run() error {
t.preProcess()
defer t.postProcess()
Expand Down Expand Up @@ -543,7 +686,7 @@ func (t *tester) Run() error {
for i := 0; i < 4; i++ {
args = append(args, "")
}
t.addConnection(args[0], args[1], args[2], args[3], args[4])
t.addConnection(args[0], args[1], port, args[2], args[3], args[4])
case Q_CONNECTION:
q.Query = strings.TrimSpace(q.Query)
if q.Query[len(q.Query)-1] == ';' {
Expand Down Expand Up @@ -593,16 +736,17 @@ func (t *tester) Run() error {
}
t.replaceRegex = regex
case Q_BACKUP_AND_RESTORE:
if !isTiDB(t.mdb) {
return errors.New(fmt.Sprintf("backup_and_restore is only supported on TiDB, line: %d sql:%v", q.Line, q.Query))
}
t.backupAndRestore, err = parseSourceAndTarget(q.Query)
if err != nil {
return errors.Annotate(err, fmt.Sprintf("Could not parse backup table and restore table name in --backup_and_restore, line: %d sql:%v", q.Line, q.Query))
}
backupStmt, restoreStmt := generateBRStatements(t.backupAndRestore.sourceTable, t.backupAndRestore.targetTable)
log.WithFields(log.Fields{"stmt": backupStmt, "line": q.Line}).Warn("Backup started")
if err := t.executeStmt(backupStmt); err != nil {
return err
}
log.WithFields(log.Fields{"stmt": backupStmt, "line": q.Line}).Warn("Backup end")
tempTable := t.backupAndRestore.sourceTable + uuid.NewString()
renameStmt := fmt.Sprintf("RENAME TABLE `%s` TO `%s`", t.backupAndRestore.sourceTable, tempTable)
if err := t.executeStmt(renameStmt); err != nil {
Expand All @@ -612,11 +756,9 @@ func (t *tester) Run() error {
if err := t.executeStmt(dupTableStmt); err != nil {
return err
}
log.WithFields(log.Fields{"stmt": restoreStmt, "line": q.Line}).Warn("Restore start")
if err := t.executeStmt(restoreStmt); err != nil {
return err
}
log.WithFields(log.Fields{"stmt": restoreStmt, "line": q.Line}).Warn("Restore end")
renameStmt = fmt.Sprintf("RENAME TABLE `%s` TO `%s`", t.backupAndRestore.sourceTable, t.backupAndRestore.targetTable)
if err := t.executeStmt(renameStmt); err != nil {
return err
Expand All @@ -626,6 +768,9 @@ func (t *tester) Run() error {
return err
}
case Q_DUMP_AND_IMPORT:
if !isTiDB(t.mdb) {
return errors.New(fmt.Sprintf("dump_and_import is only supported on TiDB, line: %d sql:%v", q.Line, q.Query))
}
t.dumpAndImport, err = parseSourceAndTarget(q.Query)
if err != nil {
return err
Expand All @@ -634,19 +779,28 @@ func (t *tester) Run() error {
if err != nil {
return err
}

dupTableStmt := fmt.Sprintf("CREATE TABLE `%s` LIKE `%s`", t.dumpAndImport.targetTable, t.backupAndRestore.sourceTable)
dupTableStmt := fmt.Sprintf("CREATE TABLE `%s` LIKE `%s`", t.dumpAndImport.targetTable, t.dumpAndImport.sourceTable)
if err := t.executeStmt(dupTableStmt); err != nil {
return err
}

importStmt := t.importTableStmt(path, t.dumpAndImport.targetTable)
log.WithFields(log.Fields{"stmt": importStmt, "line": q.Line}).Warn("Import start")
if err = t.executeStmt(importStmt); err != nil {
return err
}
log.WithFields(log.Fields{"stmt": importStmt, "line": q.Line}).Warn("Restore end")

case Q_REPLICATION:
if !isTiDB(t.mdb) {
return errors.New(fmt.Sprintf("replication is only supported on TiDB, line: %d sql:%v", q.Line, q.Query))
}
if err := t.startReplication(q.Query); err != nil {
return err
}
case Q_REPLICATION_CHECKPOINT:
if !isTiDB(t.mdb) {
return errors.New(fmt.Sprintf("replication_checkpoint is only supported on TiDB, line: %d sql:%v", q.Line, q.Query))
}
if err := t.waitForReplicationCheckpoint(); err != nil {
return err
}
default:
log.WithFields(log.Fields{"command": q.firstWord, "arguments": q.Query, "line": q.Line}).Warn("command not implemented")
}
Expand All @@ -663,7 +817,6 @@ func (t *tester) Run() error {
if xmlPath != "" {
t.addSuccess(&testSuite, &startTime, testCnt)
}

return t.flushResult()
}

Expand Down
Loading

0 comments on commit 3e10b3b

Please sign in to comment.