Skip to content

Commit

Permalink
add support for dump/import and BR
Browse files Browse the repository at this point in the history
  • Loading branch information
bb7133 committed Sep 19, 2024
1 parent 314107b commit 9ddf008
Show file tree
Hide file tree
Showing 8 changed files with 184 additions and 6 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
golang.org/x/sys v0.5.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/defined2014/mysql v0.0.0-20231121061906-fcfacaa39f49 h1:Q3Ri7Ycix4T+Ig7I896I6w0WuCajid2SgyierI16NSo=
github.com/defined2014/mysql v0.0.0-20231121061906-fcfacaa39f49/go.mod h1:5GYlY+PrT+c8FHAJTMIsyOuHUNf62KAQuRPMGssbixo=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32 h1:m5ZsBa5o/0CkzZXfXLaThzKuR85SnHHetqBCpzQ30h8=
github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand Down
36 changes: 30 additions & 6 deletions r/example.result
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ a b
SELECT 1 FROM NON_EXISTING_TABLE;
Error 1146 (42S02): Table 'example.NON_EXISTING_TABLE' doesn't exist
SELECT 2 FROM NON_EXISTING_TABLE;
Error 1146 (42S02): Table 'example.NON_EXISTING_TABLE' doesn't exist
SELECT 3 FROM NON_EXISTING_TABLE;
Got one of the listed errors
Error 1146 (42S02): Table 'example.NON_EXISTING_TABLE' doesn't exist
SELECT 4;
4
4
Expand All @@ -20,19 +21,42 @@ SELECT 6;
1 SELECT;
Error 1064 (42000): You have an error in your SQL syntax; check the manual that corresponds to your TiDB version for the right syntax to use line 1 column 1 near "1 SELECT;"
2 SELECT;
Error 1064 (42000): You have an error in your SQL syntax; check the manual that corresponds to your TiDB version for the right syntax to use line 1 column 1 near "2 SELECT;"
3 SELECT;
Got one of the listed errors
Error 1064 (42000): You have an error in your SQL syntax; check the manual that corresponds to your TiDB version for the right syntax to use line 1 column 1 near "3 SELECT;"
explain analyze format='brief' select * from t;
id estRows actRows task access object execution info operator info memory disk
TableReader 10000.00 5 root NULL time:<num>, loops:<num>, RU:<num>, cop_task: {num:<num>, max:<num>, proc_keys:<num>, rpc_num:<num>, rpc_time:<num>, copr_cache_hit_ratio:<num>, build_task_duration:<num>, max_distsql_concurrency:<num>} data:TableFullScan <num> Bytes N/A
└─TableFullScan 10000.00 5 cop[tikv] table:t tikv_task:{time:<num>, loops:<num>} keep order:false, stats:pseudo N/A N/A
TableReader 10000.00 5 root NULL time:<num>, loops:<num>, RU:<num>, cop_task: {num:<num>, max:<num>, proc_keys:<num>, tot_proc:<num>, tot_wait:<num>, copr_cache_hit_ratio:<num>, build_task_duration:<num>, max_distsql_concurrency:<num>, rpc_info:{Cop:{num_rpc:<num>, total_time:<num>}} data:TableFullScan <num> Bytes N/A
└─TableFullScan 10000.00 5 cop[tikv] table:t tikv_task:{time:<num>, loops:<num>, scan_detail: {total_process_keys:<num>, total_process_keys_size:<num>, total_keys:<num>, get_snapshot_time:<num>, rocksdb: {key_skipped_count:<num>, block: {}}}, time_detail: {total_process_time:<num>, total_wait_time:<num>, tikv_wall_time:<num>} keep order:false, stats:pseudo N/A N/A
explain analyze select * from t;
id estRows actRows task access object execution info operator info memory disk
TableReader_5 10000.00 5 root NULL time:<num>, loops:<num>, RU:<num>, cop_task: {num:<num>, max:<num>, proc_keys:<num>, rpc_num:<num>, rpc_time:<num>, copr_cache_hit_ratio:<num>, build_task_duration:<num>, max_distsql_concurrency:<num>} data:TableFullScan_4 <num> Bytes N/A
└─TableFullScan_4 10000.00 5 cop[tikv] table:t tikv_task:{time:<num>, loops:<num>} keep order:false, stats:pseudo N/A N/A
TableReader_5 10000.00 5 root NULL time:<num>, loops:<num>, RU:<num>, cop_task: {num:<num>, max:<num>, proc_keys:<num>, tot_proc:<num>, tot_wait:<num>, copr_cache_hit_ratio:<num>, build_task_duration:<num>, max_distsql_concurrency:<num>, rpc_info:{Cop:{num_rpc:<num>, total_time:<num>}} data:TableFullScan_4 <num> Bytes N/A
└─TableFullScan_4 10000.00 5 cop[tikv] table:t tikv_task:{time:<num>, loops:<num>, scan_detail: {total_process_keys:<num>, total_process_keys_size:<num>, total_keys:<num>, get_snapshot_time:<num>, rocksdb: {key_skipped_count:<num>, block: {}}}, time_detail: {total_process_time:<num>, total_wait_time:<num>, tikv_wall_time:<num>} keep order:false, stats:pseudo N/A N/A
insert into t values (6, 6);
affected rows: 1
info:
Destination Size BackupTS Queue Time Execution Time
/tmp/t_b7987183-d39f-4572-868d-75e25c8cd215 1876 451473879442653228 2024-07-29 14:56:13 2024-07-29 14:56:13
affected rows: 0
info:
affected rows: 0
info:
affected rows: 0
info:
Destination Size BackupTS Cluster TS Queue Time Execution Time
/tmp/t_b7987183-d39f-4572-868d-75e25c8cd215 1876 451473879442653228 451473880386371620 2024-07-29 14:56:17 2024-07-29 14:56:17
affected rows: 0
info:
affected rows: 0
info:
affected rows: 0
info:
affected rows: 0
info:
Job_ID Data_Source Target_Table Table_ID Phase Status Source_File_Size Imported_Rows Result_Message Create_Time Start_Time End_Time Created_By
3 /tmp/t_6cac1a43-c66c-4af9-962f-95287fa12432/example.t.000000000.csv `example`.`td` 453 finished 30B 6 2024-07-29 14:56:17.619215 2024-07-29 14:56:18.125792 2024-07-29 14:56:19.640005 root@%
affected rows: 0
info:
DROP TABLE IF EXISTS t1;
affected rows: 0
info:
Expand Down
124 changes: 124 additions & 0 deletions src/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"flag"
"fmt"
"os"
"os/exec"
"path/filepath"
"regexp"
"sort"
Expand All @@ -29,6 +30,7 @@ import (
"time"

"github.com/defined2014/mysql"
"github.com/google/uuid"
"github.com/pingcap/errors"
log "github.com/sirupsen/logrus"
)
Expand All @@ -47,6 +49,8 @@ var (
retryConnCount int
collationDisable bool
checkErr bool
pathBR string
pathDumpling string
)

func init() {
Expand All @@ -63,6 +67,8 @@ func init() {
flag.IntVar(&retryConnCount, "retry-connection-count", 120, "The max number to retry to connect to the database.")
flag.BoolVar(&checkErr, "check-error", false, "if --error ERR does not match, return error instead of just warn")
flag.BoolVar(&collationDisable, "collation-disable", false, "run collation related-test with new-collation disabled")
flag.StringVar(&pathBR, "path-br", "", "Path of BR")
flag.StringVar(&pathDumpling, "path-dumpling", "", "Path of Dumpling")
}

const (
Expand Down Expand Up @@ -98,6 +104,11 @@ type ReplaceRegex struct {
replace string
}

type SourceAndTarget struct {
sourceTable string
targetTable string
}

type tester struct {
mdb *sql.DB
name string
Expand Down Expand Up @@ -148,6 +159,12 @@ type tester struct {

// replace output result through --replace_regex /\.dll/.so/
replaceRegex []*ReplaceRegex

// backup and restore context through --backup_and_restore $BACKUP_TABLE as $RESTORE_TABLE'
backupAndRestore *SourceAndTarget

// dump and import context through --dump_and_import $SOURCE_TABLE as $TARGET_TABLE'
dumpAndImport *SourceAndTarget
}

func newTester(name string) *tester {
Expand Down Expand Up @@ -352,6 +369,58 @@ func (t *tester) addSuccess(testSuite *XUnitTestSuite, startTime *time.Time, cnt
})
}

func generateBRStatements(source, target string) (string, string) {
// Generate a random UUID
uuid := uuid.NewString()

// Create the TMP_DIR path
tmpDir := fmt.Sprintf("/tmp/%s_%s", source, uuid)

// Generate the SQL statements
backupSQL := fmt.Sprintf("BACKUP TABLE `%s` TO '%s'", source, tmpDir)
restoreSQL := fmt.Sprintf("RESTORE TABLE `%s` FROM '%s'", source, tmpDir)

return backupSQL, restoreSQL
}

func (t *tester) dumpTable(source string) (string, error) {
log.Warnf("Start dumping table: %s", source)
path := "/tmp/" + source + "_" + uuid.NewString()
cmdArgs := []string{
fmt.Sprintf("-h%s", host),
fmt.Sprintf("-P%s", port),
fmt.Sprintf("-u%s", user),
fmt.Sprintf("-T%s.%s", t.name, source),
fmt.Sprintf("-o%s", path),
"--no-header",
"--filetype",
"csv",
}

if passwd != "" {
cmdArgs = append(cmdArgs, fmt.Sprintf("-p%s", passwd))
}

cmd := exec.Command(pathDumpling, cmdArgs...)

output, err := cmd.CombinedOutput()
if err != nil {
log.Warnf("Failed executing commands: %s, output: %s)",
cmd.String(), string(output))
return "", err
}
log.Warnf("Done executing commands: %s, output: %s)",
cmd.String(), string(output))
return path, nil
}

func (t *tester) importTableStmt(path, target string) string {
return fmt.Sprintf(`
IMPORT INTO %s
FROM '%s/example.t.000000000.csv'
`, target, path)
}

func (t *tester) Run() error {
t.preProcess()
defer t.postProcess()
Expand Down Expand Up @@ -523,6 +592,61 @@ func (t *tester) Run() error {
return errors.Annotate(err, fmt.Sprintf("Could not parse regex in --replace_regex: line: %d sql:%v", q.Line, q.Query))
}
t.replaceRegex = regex
case Q_BACKUP_AND_RESTORE:
t.backupAndRestore, err = parseSourceAndTarget(q.Query)
if err != nil {
return errors.Annotate(err, fmt.Sprintf("Could not parse backup table and restore table name in --backup_and_restore, line: %d sql:%v", q.Line, q.Query))
}
backupStmt, restoreStmt := generateBRStatements(t.backupAndRestore.sourceTable, t.backupAndRestore.targetTable)
log.WithFields(log.Fields{"stmt": backupStmt, "line": q.Line}).Warn("Backup started")
if err := t.executeStmt(backupStmt); err != nil {
return err
}
log.WithFields(log.Fields{"stmt": backupStmt, "line": q.Line}).Warn("Backup end")
tempTable := t.backupAndRestore.sourceTable + uuid.NewString()
renameStmt := fmt.Sprintf("RENAME TABLE `%s` TO `%s`", t.backupAndRestore.sourceTable, tempTable)
if err := t.executeStmt(renameStmt); err != nil {
return err
}
dupTableStmt := fmt.Sprintf("CREATE TABLE `%s` LIKE `%s`", t.backupAndRestore.sourceTable, tempTable)
if err := t.executeStmt(dupTableStmt); err != nil {
return err
}
log.WithFields(log.Fields{"stmt": restoreStmt, "line": q.Line}).Warn("Restore start")
if err := t.executeStmt(restoreStmt); err != nil {
return err
}
log.WithFields(log.Fields{"stmt": restoreStmt, "line": q.Line}).Warn("Restore end")
renameStmt = fmt.Sprintf("RENAME TABLE `%s` TO `%s`", t.backupAndRestore.sourceTable, t.backupAndRestore.targetTable)
if err := t.executeStmt(renameStmt); err != nil {
return err
}
renameStmt = fmt.Sprintf("RENAME TABLE `%s` TO `%s`", tempTable, t.backupAndRestore.sourceTable)
if err := t.executeStmt(renameStmt); err != nil {
return err
}
case Q_DUMP_AND_IMPORT:
t.dumpAndImport, err = parseSourceAndTarget(q.Query)
if err != nil {
return err
}
path, err := t.dumpTable(t.dumpAndImport.sourceTable)
if err != nil {
return err
}

dupTableStmt := fmt.Sprintf("CREATE TABLE `%s` LIKE `%s`", t.dumpAndImport.targetTable, t.backupAndRestore.sourceTable)
if err := t.executeStmt(dupTableStmt); err != nil {
return err
}

importStmt := t.importTableStmt(path, t.dumpAndImport.targetTable)
log.WithFields(log.Fields{"stmt": importStmt, "line": q.Line}).Warn("Import start")
if err = t.executeStmt(importStmt); err != nil {
return err
}
log.WithFields(log.Fields{"stmt": importStmt, "line": q.Line}).Warn("Restore end")

default:
log.WithFields(log.Fields{"command": q.firstWord, "arguments": q.Query, "line": q.Line}).Warn("command not implemented")
}
Expand Down
2 changes: 2 additions & 0 deletions src/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ const (
Q_COMMENT /* Comments, ignored. */
Q_COMMENT_WITH_COMMAND
Q_EMPTY_LINE
Q_BACKUP_AND_RESTORE
Q_DUMP_AND_IMPORT
)

// ParseQueries parses an array of string into an array of query object.
Expand Down
2 changes: 2 additions & 0 deletions src/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ var commandMap = map[string]int{
"single_query": Q_SINGLE_QUERY,
"begin_concurrent": Q_BEGIN_CONCURRENT,
"end_concurrent": Q_END_CONCURRENT,
"backup_and_restore": Q_BACKUP_AND_RESTORE,
"dump_and_import": Q_DUMP_AND_IMPORT,
}

func findType(cmdName string) int {
Expand Down
19 changes: 19 additions & 0 deletions src/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package main

import (
"database/sql"
"fmt"
"regexp"
"strings"
"time"
Expand Down Expand Up @@ -104,3 +105,21 @@ func ParseReplaceRegex(originalString string) ([]*ReplaceRegex, error) {
}
return ret, nil
}

func parseSourceAndTarget(s string) (*SourceAndTarget, error) {
s = strings.ToLower(strings.TrimSpace(s))

parts := strings.Split(s, "as")
if len(parts) != 2 {
return nil, errors.Errorf("Could not parse source table and target table name: %v", s)
}

st := &SourceAndTarget{
sourceTable: strings.TrimSpace(parts[0]),
targetTable: strings.TrimSpace(parts[1]),
}

fmt.Printf("Parse source: %s and target: %s\n", st.sourceTable, st.targetTable)

return st, nil
}
4 changes: 4 additions & 0 deletions t/example.test
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ explain analyze select * from t;
--enable_info
insert into t values (6, 6);

--backup_and_restore t AS tt

--dump_and_import t AS td

DROP TABLE IF EXISTS t1;
CREATE TABLE t1 (f1 INT PRIMARY KEY, f2 INT NOT NULL UNIQUE);
INSERT t1 VALUES (1, 1);
Expand Down

0 comments on commit 9ddf008

Please sign in to comment.