From 3e10b3bc279be114e8046a09f87825cf71efbfaa Mon Sep 17 00:00:00 2001 From: bb7133 Date: Tue, 8 Oct 2024 10:53:19 -0700 Subject: [PATCH] support TiCDC --- r/example.result | 31 ++--- src/main.go | 191 ++++++++++++++++++++++++++--- src/query.go | 2 + src/type.go | 4 +- src/util.go | 4 - t/br_integration.test | 11 ++ t/cdc_integration.test | 12 ++ t/dumpling_import_integration.test | 10 ++ t/example.test | 11 +- 9 files changed, 226 insertions(+), 50 deletions(-) create mode 100644 t/br_integration.test create mode 100644 t/cdc_integration.test create mode 100644 t/dumpling_import_integration.test diff --git a/r/example.result b/r/example.result index 09a8d7d..e009bb0 100644 --- a/r/example.result +++ b/r/example.result @@ -27,36 +27,14 @@ Error 1064 (42000): You have an error in your SQL syntax; check the manual that explain analyze format='brief' select * from t; id estRows actRows task access object execution info operator info memory disk TableReader 10000.00 5 root NULL time:, loops:, RU:, cop_task: {num:, max:, proc_keys:, tot_proc:, tot_wait:, copr_cache_hit_ratio:, build_task_duration:, max_distsql_concurrency:, rpc_info:{Cop:{num_rpc:, total_time:}} data:TableFullScan Bytes N/A -└─TableFullScan 10000.00 5 cop[tikv] table:t tikv_task:{time:, loops:, scan_detail: {total_process_keys:, total_process_keys_size:, total_keys:, get_snapshot_time:, rocksdb: {key_skipped_count:, block: {}}}, time_detail: {total_process_time:, total_wait_time:, tikv_wall_time:} keep order:false, stats:pseudo N/A N/A +└─TableFullScan 10000.00 5 cop[tikv] table:t tikv_task:{time:, loops:, scan_detail: {total_process_keys:, total_process_keys_size:, total_keys:, get_snapshot_time:, rocksdb: {delete_skipped_count:, key_skipped_count:, block: {}}}, time_detail: {total_process_time:, total_wait_time:, tikv_wall_time:} keep order:false, stats:pseudo N/A N/A explain analyze select * from t; id estRows actRows task access object execution info operator info memory disk TableReader_5 10000.00 5 root NULL time:, loops:, RU:, cop_task: {num:, max:, proc_keys:, tot_proc:, tot_wait:, copr_cache_hit_ratio:, build_task_duration:, max_distsql_concurrency:, rpc_info:{Cop:{num_rpc:, total_time:}} data:TableFullScan_4 Bytes N/A -└─TableFullScan_4 10000.00 5 cop[tikv] table:t tikv_task:{time:, loops:, scan_detail: {total_process_keys:, total_process_keys_size:, total_keys:, get_snapshot_time:, rocksdb: {key_skipped_count:, block: {}}}, time_detail: {total_process_time:, total_wait_time:, tikv_wall_time:} keep order:false, stats:pseudo N/A N/A +└─TableFullScan_4 10000.00 5 cop[tikv] table:t tikv_task:{time:, loops:, scan_detail: {total_process_keys:, total_process_keys_size:, total_keys:, get_snapshot_time:, rocksdb: {delete_skipped_count:, key_skipped_count:, block: {}}}, time_detail: {total_process_time:, total_wait_time:, tikv_wall_time:} keep order:false, stats:pseudo N/A N/A insert into t values (6, 6); affected rows: 1 info: -Destination Size BackupTS Queue Time Execution Time -/tmp/t_b7987183-d39f-4572-868d-75e25c8cd215 1876 451473879442653228 2024-07-29 14:56:13 2024-07-29 14:56:13 -affected rows: 0 -info: -affected rows: 0 -info: -affected rows: 0 -info: -Destination Size BackupTS Cluster TS Queue Time Execution Time -/tmp/t_b7987183-d39f-4572-868d-75e25c8cd215 1876 451473879442653228 451473880386371620 2024-07-29 14:56:17 2024-07-29 14:56:17 -affected rows: 0 -info: -affected rows: 0 -info: -affected rows: 0 -info: -affected rows: 0 -info: -Job_ID Data_Source Target_Table Table_ID Phase Status Source_File_Size Imported_Rows Result_Message Create_Time Start_Time End_Time Created_By -3 /tmp/t_6cac1a43-c66c-4af9-962f-95287fa12432/example.t.000000000.csv `example`.`td` 453 finished 30B 6 2024-07-29 14:56:17.619215 2024-07-29 14:56:18.125792 2024-07-29 14:56:19.640005 root@% -affected rows: 0 -info: DROP TABLE IF EXISTS t1; affected rows: 0 info: @@ -71,3 +49,8 @@ affected rows: 3 info: Records: 2 Duplicates: 1 Warnings: 0 1 use `test`;; +use example; +select * from t1; +f1 f2 +1 1 +2 2 diff --git a/src/main.go b/src/main.go index 17d922c..d59eeae 100644 --- a/src/main.go +++ b/src/main.go @@ -51,6 +51,15 @@ var ( checkErr bool pathBR string pathDumpling string + pathCDC string + addressCDC string + downstream string + + downStreamHost string + downStreamPort string + downStreamUser string + downStreamPassword string + downStreamDB string ) func init() { @@ -67,8 +76,11 @@ func init() { flag.IntVar(&retryConnCount, "retry-connection-count", 120, "The max number to retry to connect to the database.") flag.BoolVar(&checkErr, "check-error", false, "if --error ERR does not match, return error instead of just warn") flag.BoolVar(&collationDisable, "collation-disable", false, "run collation related-test with new-collation disabled") - flag.StringVar(&pathBR, "path-br", "", "Path of BR") - flag.StringVar(&pathDumpling, "path-dumpling", "", "Path of Dumpling") + flag.StringVar(&pathBR, "path-br", "", "Path of BR binary") + flag.StringVar(&pathDumpling, "path-dumpling", "", "Path of Dumpling binary") + flag.StringVar(&pathCDC, "path-cdc", "", "Path of TiCDC binary") + flag.StringVar(&addressCDC, "address-cdc", "127.0.0.1:8300", "Address of Server") + flag.StringVar(&downstream, "downstream", "", "Connection string of downstream TiDB cluster") } const ( @@ -165,6 +177,12 @@ type tester struct { // dump and import context through --dump_and_import $SOURCE_TABLE as $TARGET_TABLE' dumpAndImport *SourceAndTarget + + // replication checkpoint database name + replicationCheckpointDB string + + // replication checkpoint ID + replicationCheckpointID int } func newTester(name string) *tester { @@ -179,6 +197,8 @@ func newTester(name string) *tester { t.enableConcurrent = false t.enableInfo = false + t.replicationCheckpointDB = "checkpoint-" + uuid.NewString() + t.replicationCheckpointID = 0 return t } @@ -219,7 +239,7 @@ func isTiDB(db *sql.DB) bool { return true } -func (t *tester) addConnection(connName, hostName, userName, password, db string) { +func (t *tester) addConnection(connName, hostName, port, userName, password, db string) { var ( mdb *sql.DB err error @@ -285,6 +305,64 @@ func (t *tester) disconnect(connName string) { t.currConnName = default_connection } +func parseUserInfo(userInfo string) (string, string, error) { + colonIndex := strings.Index(userInfo, ":") + if colonIndex == -1 { + return "", "", fmt.Errorf("missing password in userinfo") + } + return userInfo[:colonIndex], userInfo[colonIndex+1:], nil +} + +func parseHostPort(hostPort string) (string, string, error) { + colonIndex := strings.Index(hostPort, ":") + if colonIndex == -1 { + return "", "", fmt.Errorf("missing port in host:port") + } + return hostPort[:colonIndex], hostPort[colonIndex+1:], nil +} + +func parseDownstream(connStr string) (dbname string, host string, port string, user string, password string) { + // Splitting into userinfo and network/database parts + parts := strings.SplitN(connStr, "@", 2) + if len(parts) != 2 { + fmt.Println("Invalid connection string format") + return + } + + // Parsing userinfo + userInfo := parts[0] + user, password, err := parseUserInfo(userInfo) + if err != nil { + fmt.Println("Error parsing userinfo:", err) + return + } + + // Splitting network type and database part + networkAndDB := parts[1] + networkTypeIndex := strings.Index(networkAndDB, "(") + if networkTypeIndex == -1 { + fmt.Println("Invalid connection string format: missing network type") + return + } + + // Extracting host, port, and database name + hostPortDB := networkAndDB[networkTypeIndex+1:] + hostPortDBParts := strings.SplitN(hostPortDB, ")/", 2) + if len(hostPortDBParts) != 2 { + fmt.Println("Invalid connection string format") + return + } + + host, port, err = parseHostPort(hostPortDBParts[0]) + if err != nil { + fmt.Println("Error parsing host and port:", err) + return + } + + dbname = hostPortDBParts[1] + return +} + func (t *tester) preProcess() { dbName := "test" mdb, err := OpenDBWithRetry("mysql", user+":"+passwd+"@tcp("+host+":"+port+")/"+dbName+"?time_zone=%27Asia%2FShanghai%27&allowAllFiles=true"+params, retryConnCount) @@ -313,6 +391,7 @@ func (t *tester) preProcess() { log.Fatalf("Executing create db %s err[%v]", dbName, err) } t.mdb = mdb + conn, err := initConn(mdb, user, passwd, host, dbName) if err != nil { log.Fatalf("Open db err %v", err) @@ -320,6 +399,17 @@ func (t *tester) preProcess() { t.conn[default_connection] = conn t.curr = conn t.currConnName = default_connection + + if downstream != "" { + // create replication checkpoint database + if _, err := t.mdb.Exec(fmt.Sprintf("create database if not exists `%s`", t.replicationCheckpointDB)); err != nil { + log.Fatalf("Executing create db %s err[%v]", t.replicationCheckpointDB, err) + } + + downStreamDB, downStreamHost, downStreamPort, downStreamUser, downStreamPassword = parseDownstream(downstream) + t.addConnection("downstream", downStreamHost, downStreamPort, downStreamUser, downStreamPassword, downStreamDB) + } + t.switchConnection(default_connection) } func (t *tester) postProcess() { @@ -329,6 +419,7 @@ func (t *tester) postProcess() { } t.mdb.Close() }() + t.switchConnection(default_connection) if !reserveSchema { rows, err := t.mdb.Query("show databases") if err != nil { @@ -384,6 +475,11 @@ func generateBRStatements(source, target string) (string, string) { } func (t *tester) dumpTable(source string) (string, error) { + // Check if the file exists + if _, err := os.Stat(pathDumpling); os.IsNotExist(err) { + return "", errors.New(fmt.Sprintf("path-dumpling [%s] does not exist.", pathDumpling)) + } + log.Warnf("Start dumping table: %s", source) path := "/tmp/" + source + "_" + uuid.NewString() cmdArgs := []string{ @@ -392,6 +488,8 @@ func (t *tester) dumpTable(source string) (string, error) { fmt.Sprintf("-u%s", user), fmt.Sprintf("-T%s.%s", t.name, source), fmt.Sprintf("-o%s", path), + "--output-filename-template", + "tempDump", "--no-header", "--filetype", "csv", @@ -405,9 +503,7 @@ func (t *tester) dumpTable(source string) (string, error) { output, err := cmd.CombinedOutput() if err != nil { - log.Warnf("Failed executing commands: %s, output: %s)", - cmd.String(), string(output)) - return "", err + return "", errors.Annotate(err, fmt.Sprintf("Dumpling failed: %s, output: %s.", cmd.String(), string(output))) } log.Warnf("Done executing commands: %s, output: %s)", cmd.String(), string(output)) @@ -417,10 +513,57 @@ func (t *tester) dumpTable(source string) (string, error) { func (t *tester) importTableStmt(path, target string) string { return fmt.Sprintf(` IMPORT INTO %s - FROM '%s/example.t.000000000.csv' + FROM '%s/tempDump.csv' `, target, path) } +func (t *tester) startReplication(tables string) error { + return nil +} + +func (t *tester) waitForReplicationCheckpoint() error { + curr := t.currConnName + defer t.switchConnection(curr) + + if err := t.executeStmt(fmt.Sprintf("use `%s`", t.replicationCheckpointDB)); err != nil { + return err + } + + markerTable := fmt.Sprintf("marker_%d", t.replicationCheckpointID) + if err := t.executeStmt(fmt.Sprintf("create table `%s`.`%s` (id int primary key)", t.replicationCheckpointDB, markerTable)); err != nil { + return err + } + + t.switchConnection("downstream") + + checkInterval := 1 * time.Second + queryTimeout := 10 * time.Second + + // Keep querying until the table is found + for { + ctx, cancel := context.WithTimeout(context.Background(), queryTimeout) + defer cancel() + + query := fmt.Sprintf("select * from information_schema.tables where table_schema = '%s' and table_name = '%s';", t.replicationCheckpointDB, markerTable) + rows, err := t.mdb.QueryContext(ctx, query) + if err != nil { + log.Printf("Error checking for table: %v", err) + return err + } + + if rows.Next() { + fmt.Printf("Table '%s' found!\n", markerTable) + break + } else { + fmt.Printf("Table '%s' not found. Retrying in %v...\n", markerTable, checkInterval) + } + + time.Sleep(checkInterval) + } + + return nil +} + func (t *tester) Run() error { t.preProcess() defer t.postProcess() @@ -543,7 +686,7 @@ func (t *tester) Run() error { for i := 0; i < 4; i++ { args = append(args, "") } - t.addConnection(args[0], args[1], args[2], args[3], args[4]) + t.addConnection(args[0], args[1], port, args[2], args[3], args[4]) case Q_CONNECTION: q.Query = strings.TrimSpace(q.Query) if q.Query[len(q.Query)-1] == ';' { @@ -593,16 +736,17 @@ func (t *tester) Run() error { } t.replaceRegex = regex case Q_BACKUP_AND_RESTORE: + if !isTiDB(t.mdb) { + return errors.New(fmt.Sprintf("backup_and_restore is only supported on TiDB, line: %d sql:%v", q.Line, q.Query)) + } t.backupAndRestore, err = parseSourceAndTarget(q.Query) if err != nil { return errors.Annotate(err, fmt.Sprintf("Could not parse backup table and restore table name in --backup_and_restore, line: %d sql:%v", q.Line, q.Query)) } backupStmt, restoreStmt := generateBRStatements(t.backupAndRestore.sourceTable, t.backupAndRestore.targetTable) - log.WithFields(log.Fields{"stmt": backupStmt, "line": q.Line}).Warn("Backup started") if err := t.executeStmt(backupStmt); err != nil { return err } - log.WithFields(log.Fields{"stmt": backupStmt, "line": q.Line}).Warn("Backup end") tempTable := t.backupAndRestore.sourceTable + uuid.NewString() renameStmt := fmt.Sprintf("RENAME TABLE `%s` TO `%s`", t.backupAndRestore.sourceTable, tempTable) if err := t.executeStmt(renameStmt); err != nil { @@ -612,11 +756,9 @@ func (t *tester) Run() error { if err := t.executeStmt(dupTableStmt); err != nil { return err } - log.WithFields(log.Fields{"stmt": restoreStmt, "line": q.Line}).Warn("Restore start") if err := t.executeStmt(restoreStmt); err != nil { return err } - log.WithFields(log.Fields{"stmt": restoreStmt, "line": q.Line}).Warn("Restore end") renameStmt = fmt.Sprintf("RENAME TABLE `%s` TO `%s`", t.backupAndRestore.sourceTable, t.backupAndRestore.targetTable) if err := t.executeStmt(renameStmt); err != nil { return err @@ -626,6 +768,9 @@ func (t *tester) Run() error { return err } case Q_DUMP_AND_IMPORT: + if !isTiDB(t.mdb) { + return errors.New(fmt.Sprintf("dump_and_import is only supported on TiDB, line: %d sql:%v", q.Line, q.Query)) + } t.dumpAndImport, err = parseSourceAndTarget(q.Query) if err != nil { return err @@ -634,19 +779,28 @@ func (t *tester) Run() error { if err != nil { return err } - - dupTableStmt := fmt.Sprintf("CREATE TABLE `%s` LIKE `%s`", t.dumpAndImport.targetTable, t.backupAndRestore.sourceTable) + dupTableStmt := fmt.Sprintf("CREATE TABLE `%s` LIKE `%s`", t.dumpAndImport.targetTable, t.dumpAndImport.sourceTable) if err := t.executeStmt(dupTableStmt); err != nil { return err } - importStmt := t.importTableStmt(path, t.dumpAndImport.targetTable) - log.WithFields(log.Fields{"stmt": importStmt, "line": q.Line}).Warn("Import start") if err = t.executeStmt(importStmt); err != nil { return err } - log.WithFields(log.Fields{"stmt": importStmt, "line": q.Line}).Warn("Restore end") - + case Q_REPLICATION: + if !isTiDB(t.mdb) { + return errors.New(fmt.Sprintf("replication is only supported on TiDB, line: %d sql:%v", q.Line, q.Query)) + } + if err := t.startReplication(q.Query); err != nil { + return err + } + case Q_REPLICATION_CHECKPOINT: + if !isTiDB(t.mdb) { + return errors.New(fmt.Sprintf("replication_checkpoint is only supported on TiDB, line: %d sql:%v", q.Line, q.Query)) + } + if err := t.waitForReplicationCheckpoint(); err != nil { + return err + } default: log.WithFields(log.Fields{"command": q.firstWord, "arguments": q.Query, "line": q.Line}).Warn("command not implemented") } @@ -663,7 +817,6 @@ func (t *tester) Run() error { if xmlPath != "" { t.addSuccess(&testSuite, &startTime, testCnt) } - return t.flushResult() } diff --git a/src/query.go b/src/query.go index de14658..461ab10 100644 --- a/src/query.go +++ b/src/query.go @@ -126,6 +126,8 @@ const ( Q_EMPTY_LINE Q_BACKUP_AND_RESTORE Q_DUMP_AND_IMPORT + Q_REPLICATION + Q_REPLICATION_CHECKPOINT ) // ParseQueries parses an array of string into an array of query object. diff --git a/src/type.go b/src/type.go index 2267d65..a4343c9 100644 --- a/src/type.go +++ b/src/type.go @@ -115,7 +115,9 @@ var commandMap = map[string]int{ "begin_concurrent": Q_BEGIN_CONCURRENT, "end_concurrent": Q_END_CONCURRENT, "backup_and_restore": Q_BACKUP_AND_RESTORE, - "dump_and_import": Q_DUMP_AND_IMPORT, + "dump_and_import": Q_DUMP_AND_IMPORT, + "replication_checkpoint": Q_REPLICATION_CHECKPOINT, + "replication": Q_REPLICATION, } func findType(cmdName string) int { diff --git a/src/util.go b/src/util.go index 33b3604..a677d9b 100644 --- a/src/util.go +++ b/src/util.go @@ -15,7 +15,6 @@ package main import ( "database/sql" - "fmt" "regexp" "strings" "time" @@ -118,8 +117,5 @@ func parseSourceAndTarget(s string) (*SourceAndTarget, error) { sourceTable: strings.TrimSpace(parts[0]), targetTable: strings.TrimSpace(parts[1]), } - - fmt.Printf("Parse source: %s and target: %s\n", st.sourceTable, st.targetTable) - return st, nil } diff --git a/t/br_integration.test b/t/br_integration.test new file mode 100644 index 0000000..2188841 --- /dev/null +++ b/t/br_integration.test @@ -0,0 +1,11 @@ +# Test BR and AutoIncrement + +CREATE TABLE t1 (a INT PRIMARY KEY NONCLUSTERED AUTO_INCREMENT, b INT) AUTO_ID_CACHE = 1; +INSERT INTO t1 (b) VALUES (1), (2), (3); +SHOW TABLE t1 NEXT_ROW_ID; + +--backup_and_restore t1 AS tt1 + +SHOW TABLE tt1 NEXT_ROW_ID; +INSERT INTO tt1 (b) VALUES (4), (5), (6); +SHOW TABLE tt1 NEXT_ROW_ID; \ No newline at end of file diff --git a/t/cdc_integration.test b/t/cdc_integration.test new file mode 100644 index 0000000..e0cd210 --- /dev/null +++ b/t/cdc_integration.test @@ -0,0 +1,12 @@ +# Test TiCDC replication + +CREATE TABLE t3 (a INT PRIMARY KEY, b INT, UNIQUE KEY (b)); + +INSERT INTO t3 VALUES (1, 23); +--error ER_DUP_ENTRY: Duplicate entry '23' for key 'b' +INSERT INTO t3 VALUES (11, 23); + +--replication_checkpoint +--connection downstream +--error ER_DUP_ENTRY: Duplicate entry '23' for key 'b' +INSERT INTO t3 VALUES (11, 23); diff --git a/t/dumpling_import_integration.test b/t/dumpling_import_integration.test new file mode 100644 index 0000000..27c56a7 --- /dev/null +++ b/t/dumpling_import_integration.test @@ -0,0 +1,10 @@ +# Test Lightning and AutoRandom + +CREATE TABLE t2(c BIGINT AUTO_RANDOM PRIMARY KEY, a INT, b INT); +INSERT INTO t2(a, b) VALUES (1, 1), (2, 2), (3, 3); +SELECT * FROM t2; + +--dump_and_import t2 AS tt2 + +INSERT INTO tt2(a, b) VALUES (1, 1), (2, 2), (3, 3); +SELECT * FROM tt2; \ No newline at end of file diff --git a/t/example.test b/t/example.test index 2cb4a00..8a1f389 100644 --- a/t/example.test +++ b/t/example.test @@ -38,9 +38,9 @@ explain analyze select * from t; --enable_info insert into t values (6, 6); ---backup_and_restore t AS tt +# --backup_and_restore t AS tt ---dump_and_import t AS td +# --dump_and_import t AS td DROP TABLE IF EXISTS t1; CREATE TABLE t1 (f1 INT PRIMARY KEY, f2 INT NOT NULL UNIQUE); @@ -52,3 +52,10 @@ INSERT t1 VALUES (1, 1), (1, 1) ON DUPLICATE KEY UPDATE f1 = 2, f2 = 2; --echo $a use `test`;; + +sleep 10; + +--replication_checkpoint +connection default; +use example; +select * from t1;