From 9ddf008eafe09eeecb6c969efe0c6c18c14d5c5d Mon Sep 17 00:00:00 2001 From: bb7133 Date: Sun, 4 Aug 2024 22:30:48 -0700 Subject: [PATCH] add support for dump/import and BR --- go.mod | 1 + go.sum | 2 + r/example.result | 36 +++++++++++--- src/main.go | 124 +++++++++++++++++++++++++++++++++++++++++++++++ src/query.go | 2 + src/type.go | 2 + src/util.go | 19 ++++++++ t/example.test | 4 ++ 8 files changed, 184 insertions(+), 6 deletions(-) diff --git a/go.mod b/go.mod index dd3c41c..a45e217 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect + github.com/google/uuid v1.6.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect go.uber.org/atomic v1.11.0 // indirect golang.org/x/sys v0.5.0 // indirect diff --git a/go.sum b/go.sum index 77d729c..de86ac2 100644 --- a/go.sum +++ b/go.sum @@ -3,6 +3,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/defined2014/mysql v0.0.0-20231121061906-fcfacaa39f49 h1:Q3Ri7Ycix4T+Ig7I896I6w0WuCajid2SgyierI16NSo= github.com/defined2014/mysql v0.0.0-20231121061906-fcfacaa39f49/go.mod h1:5GYlY+PrT+c8FHAJTMIsyOuHUNf62KAQuRPMGssbixo= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32 h1:m5ZsBa5o/0CkzZXfXLaThzKuR85SnHHetqBCpzQ30h8= github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= diff --git a/r/example.result b/r/example.result index 6e5cb8d..09a8d7d 100644 --- a/r/example.result +++ b/r/example.result @@ -6,8 +6,9 @@ a b SELECT 1 FROM NON_EXISTING_TABLE; Error 1146 (42S02): Table 'example.NON_EXISTING_TABLE' doesn't exist SELECT 2 FROM NON_EXISTING_TABLE; +Error 1146 (42S02): Table 'example.NON_EXISTING_TABLE' doesn't exist SELECT 3 FROM NON_EXISTING_TABLE; -Got one of the listed errors +Error 1146 (42S02): Table 'example.NON_EXISTING_TABLE' doesn't exist SELECT 4; 4 4 @@ -20,19 +21,42 @@ SELECT 6; 1 SELECT; Error 1064 (42000): You have an error in your SQL syntax; check the manual that corresponds to your TiDB version for the right syntax to use line 1 column 1 near "1 SELECT;" 2 SELECT; +Error 1064 (42000): You have an error in your SQL syntax; check the manual that corresponds to your TiDB version for the right syntax to use line 1 column 1 near "2 SELECT;" 3 SELECT; -Got one of the listed errors +Error 1064 (42000): You have an error in your SQL syntax; check the manual that corresponds to your TiDB version for the right syntax to use line 1 column 1 near "3 SELECT;" explain analyze format='brief' select * from t; id estRows actRows task access object execution info operator info memory disk -TableReader 10000.00 5 root NULL time:, loops:, RU:, cop_task: {num:, max:, proc_keys:, rpc_num:, rpc_time:, copr_cache_hit_ratio:, build_task_duration:, max_distsql_concurrency:} data:TableFullScan Bytes N/A -└─TableFullScan 10000.00 5 cop[tikv] table:t tikv_task:{time:, loops:} keep order:false, stats:pseudo N/A N/A +TableReader 10000.00 5 root NULL time:, loops:, RU:, cop_task: {num:, max:, proc_keys:, tot_proc:, tot_wait:, copr_cache_hit_ratio:, build_task_duration:, max_distsql_concurrency:, rpc_info:{Cop:{num_rpc:, total_time:}} data:TableFullScan Bytes N/A +└─TableFullScan 10000.00 5 cop[tikv] table:t tikv_task:{time:, loops:, scan_detail: {total_process_keys:, total_process_keys_size:, total_keys:, get_snapshot_time:, rocksdb: {key_skipped_count:, block: {}}}, time_detail: {total_process_time:, total_wait_time:, tikv_wall_time:} keep order:false, stats:pseudo N/A N/A explain analyze select * from t; id estRows actRows task access object execution info operator info memory disk -TableReader_5 10000.00 5 root NULL time:, loops:, RU:, cop_task: {num:, max:, proc_keys:, rpc_num:, rpc_time:, copr_cache_hit_ratio:, build_task_duration:, max_distsql_concurrency:} data:TableFullScan_4 Bytes N/A -└─TableFullScan_4 10000.00 5 cop[tikv] table:t tikv_task:{time:, loops:} keep order:false, stats:pseudo N/A N/A +TableReader_5 10000.00 5 root NULL time:, loops:, RU:, cop_task: {num:, max:, proc_keys:, tot_proc:, tot_wait:, copr_cache_hit_ratio:, build_task_duration:, max_distsql_concurrency:, rpc_info:{Cop:{num_rpc:, total_time:}} data:TableFullScan_4 Bytes N/A +└─TableFullScan_4 10000.00 5 cop[tikv] table:t tikv_task:{time:, loops:, scan_detail: {total_process_keys:, total_process_keys_size:, total_keys:, get_snapshot_time:, rocksdb: {key_skipped_count:, block: {}}}, time_detail: {total_process_time:, total_wait_time:, tikv_wall_time:} keep order:false, stats:pseudo N/A N/A insert into t values (6, 6); affected rows: 1 info: +Destination Size BackupTS Queue Time Execution Time +/tmp/t_b7987183-d39f-4572-868d-75e25c8cd215 1876 451473879442653228 2024-07-29 14:56:13 2024-07-29 14:56:13 +affected rows: 0 +info: +affected rows: 0 +info: +affected rows: 0 +info: +Destination Size BackupTS Cluster TS Queue Time Execution Time +/tmp/t_b7987183-d39f-4572-868d-75e25c8cd215 1876 451473879442653228 451473880386371620 2024-07-29 14:56:17 2024-07-29 14:56:17 +affected rows: 0 +info: +affected rows: 0 +info: +affected rows: 0 +info: +affected rows: 0 +info: +Job_ID Data_Source Target_Table Table_ID Phase Status Source_File_Size Imported_Rows Result_Message Create_Time Start_Time End_Time Created_By +3 /tmp/t_6cac1a43-c66c-4af9-962f-95287fa12432/example.t.000000000.csv `example`.`td` 453 finished 30B 6 2024-07-29 14:56:17.619215 2024-07-29 14:56:18.125792 2024-07-29 14:56:19.640005 root@% +affected rows: 0 +info: DROP TABLE IF EXISTS t1; affected rows: 0 info: diff --git a/src/main.go b/src/main.go index 3da256a..17d922c 100644 --- a/src/main.go +++ b/src/main.go @@ -20,6 +20,7 @@ import ( "flag" "fmt" "os" + "os/exec" "path/filepath" "regexp" "sort" @@ -29,6 +30,7 @@ import ( "time" "github.com/defined2014/mysql" + "github.com/google/uuid" "github.com/pingcap/errors" log "github.com/sirupsen/logrus" ) @@ -47,6 +49,8 @@ var ( retryConnCount int collationDisable bool checkErr bool + pathBR string + pathDumpling string ) func init() { @@ -63,6 +67,8 @@ func init() { flag.IntVar(&retryConnCount, "retry-connection-count", 120, "The max number to retry to connect to the database.") flag.BoolVar(&checkErr, "check-error", false, "if --error ERR does not match, return error instead of just warn") flag.BoolVar(&collationDisable, "collation-disable", false, "run collation related-test with new-collation disabled") + flag.StringVar(&pathBR, "path-br", "", "Path of BR") + flag.StringVar(&pathDumpling, "path-dumpling", "", "Path of Dumpling") } const ( @@ -98,6 +104,11 @@ type ReplaceRegex struct { replace string } +type SourceAndTarget struct { + sourceTable string + targetTable string +} + type tester struct { mdb *sql.DB name string @@ -148,6 +159,12 @@ type tester struct { // replace output result through --replace_regex /\.dll/.so/ replaceRegex []*ReplaceRegex + + // backup and restore context through --backup_and_restore $BACKUP_TABLE as $RESTORE_TABLE' + backupAndRestore *SourceAndTarget + + // dump and import context through --dump_and_import $SOURCE_TABLE as $TARGET_TABLE' + dumpAndImport *SourceAndTarget } func newTester(name string) *tester { @@ -352,6 +369,58 @@ func (t *tester) addSuccess(testSuite *XUnitTestSuite, startTime *time.Time, cnt }) } +func generateBRStatements(source, target string) (string, string) { + // Generate a random UUID + uuid := uuid.NewString() + + // Create the TMP_DIR path + tmpDir := fmt.Sprintf("/tmp/%s_%s", source, uuid) + + // Generate the SQL statements + backupSQL := fmt.Sprintf("BACKUP TABLE `%s` TO '%s'", source, tmpDir) + restoreSQL := fmt.Sprintf("RESTORE TABLE `%s` FROM '%s'", source, tmpDir) + + return backupSQL, restoreSQL +} + +func (t *tester) dumpTable(source string) (string, error) { + log.Warnf("Start dumping table: %s", source) + path := "/tmp/" + source + "_" + uuid.NewString() + cmdArgs := []string{ + fmt.Sprintf("-h%s", host), + fmt.Sprintf("-P%s", port), + fmt.Sprintf("-u%s", user), + fmt.Sprintf("-T%s.%s", t.name, source), + fmt.Sprintf("-o%s", path), + "--no-header", + "--filetype", + "csv", + } + + if passwd != "" { + cmdArgs = append(cmdArgs, fmt.Sprintf("-p%s", passwd)) + } + + cmd := exec.Command(pathDumpling, cmdArgs...) + + output, err := cmd.CombinedOutput() + if err != nil { + log.Warnf("Failed executing commands: %s, output: %s)", + cmd.String(), string(output)) + return "", err + } + log.Warnf("Done executing commands: %s, output: %s)", + cmd.String(), string(output)) + return path, nil +} + +func (t *tester) importTableStmt(path, target string) string { + return fmt.Sprintf(` + IMPORT INTO %s + FROM '%s/example.t.000000000.csv' + `, target, path) +} + func (t *tester) Run() error { t.preProcess() defer t.postProcess() @@ -523,6 +592,61 @@ func (t *tester) Run() error { return errors.Annotate(err, fmt.Sprintf("Could not parse regex in --replace_regex: line: %d sql:%v", q.Line, q.Query)) } t.replaceRegex = regex + case Q_BACKUP_AND_RESTORE: + t.backupAndRestore, err = parseSourceAndTarget(q.Query) + if err != nil { + return errors.Annotate(err, fmt.Sprintf("Could not parse backup table and restore table name in --backup_and_restore, line: %d sql:%v", q.Line, q.Query)) + } + backupStmt, restoreStmt := generateBRStatements(t.backupAndRestore.sourceTable, t.backupAndRestore.targetTable) + log.WithFields(log.Fields{"stmt": backupStmt, "line": q.Line}).Warn("Backup started") + if err := t.executeStmt(backupStmt); err != nil { + return err + } + log.WithFields(log.Fields{"stmt": backupStmt, "line": q.Line}).Warn("Backup end") + tempTable := t.backupAndRestore.sourceTable + uuid.NewString() + renameStmt := fmt.Sprintf("RENAME TABLE `%s` TO `%s`", t.backupAndRestore.sourceTable, tempTable) + if err := t.executeStmt(renameStmt); err != nil { + return err + } + dupTableStmt := fmt.Sprintf("CREATE TABLE `%s` LIKE `%s`", t.backupAndRestore.sourceTable, tempTable) + if err := t.executeStmt(dupTableStmt); err != nil { + return err + } + log.WithFields(log.Fields{"stmt": restoreStmt, "line": q.Line}).Warn("Restore start") + if err := t.executeStmt(restoreStmt); err != nil { + return err + } + log.WithFields(log.Fields{"stmt": restoreStmt, "line": q.Line}).Warn("Restore end") + renameStmt = fmt.Sprintf("RENAME TABLE `%s` TO `%s`", t.backupAndRestore.sourceTable, t.backupAndRestore.targetTable) + if err := t.executeStmt(renameStmt); err != nil { + return err + } + renameStmt = fmt.Sprintf("RENAME TABLE `%s` TO `%s`", tempTable, t.backupAndRestore.sourceTable) + if err := t.executeStmt(renameStmt); err != nil { + return err + } + case Q_DUMP_AND_IMPORT: + t.dumpAndImport, err = parseSourceAndTarget(q.Query) + if err != nil { + return err + } + path, err := t.dumpTable(t.dumpAndImport.sourceTable) + if err != nil { + return err + } + + dupTableStmt := fmt.Sprintf("CREATE TABLE `%s` LIKE `%s`", t.dumpAndImport.targetTable, t.backupAndRestore.sourceTable) + if err := t.executeStmt(dupTableStmt); err != nil { + return err + } + + importStmt := t.importTableStmt(path, t.dumpAndImport.targetTable) + log.WithFields(log.Fields{"stmt": importStmt, "line": q.Line}).Warn("Import start") + if err = t.executeStmt(importStmt); err != nil { + return err + } + log.WithFields(log.Fields{"stmt": importStmt, "line": q.Line}).Warn("Restore end") + default: log.WithFields(log.Fields{"command": q.firstWord, "arguments": q.Query, "line": q.Line}).Warn("command not implemented") } diff --git a/src/query.go b/src/query.go index 6a128d8..de14658 100644 --- a/src/query.go +++ b/src/query.go @@ -124,6 +124,8 @@ const ( Q_COMMENT /* Comments, ignored. */ Q_COMMENT_WITH_COMMAND Q_EMPTY_LINE + Q_BACKUP_AND_RESTORE + Q_DUMP_AND_IMPORT ) // ParseQueries parses an array of string into an array of query object. diff --git a/src/type.go b/src/type.go index 50ea5a6..2267d65 100644 --- a/src/type.go +++ b/src/type.go @@ -114,6 +114,8 @@ var commandMap = map[string]int{ "single_query": Q_SINGLE_QUERY, "begin_concurrent": Q_BEGIN_CONCURRENT, "end_concurrent": Q_END_CONCURRENT, + "backup_and_restore": Q_BACKUP_AND_RESTORE, + "dump_and_import": Q_DUMP_AND_IMPORT, } func findType(cmdName string) int { diff --git a/src/util.go b/src/util.go index b62c64b..33b3604 100644 --- a/src/util.go +++ b/src/util.go @@ -15,6 +15,7 @@ package main import ( "database/sql" + "fmt" "regexp" "strings" "time" @@ -104,3 +105,21 @@ func ParseReplaceRegex(originalString string) ([]*ReplaceRegex, error) { } return ret, nil } + +func parseSourceAndTarget(s string) (*SourceAndTarget, error) { + s = strings.ToLower(strings.TrimSpace(s)) + + parts := strings.Split(s, "as") + if len(parts) != 2 { + return nil, errors.Errorf("Could not parse source table and target table name: %v", s) + } + + st := &SourceAndTarget{ + sourceTable: strings.TrimSpace(parts[0]), + targetTable: strings.TrimSpace(parts[1]), + } + + fmt.Printf("Parse source: %s and target: %s\n", st.sourceTable, st.targetTable) + + return st, nil +} diff --git a/t/example.test b/t/example.test index 4b6f18d..2cb4a00 100644 --- a/t/example.test +++ b/t/example.test @@ -38,6 +38,10 @@ explain analyze select * from t; --enable_info insert into t values (6, 6); +--backup_and_restore t AS tt + +--dump_and_import t AS td + DROP TABLE IF EXISTS t1; CREATE TABLE t1 (f1 INT PRIMARY KEY, f2 INT NOT NULL UNIQUE); INSERT t1 VALUES (1, 1);