From 2ebac13e9f143c5bb3db845dd6306acf18b847b6 Mon Sep 17 00:00:00 2001 From: BohuTANG Date: Wed, 9 Jan 2019 12:12:11 +0800 Subject: [PATCH] feature: add session variables setting #13 usage: ./bin/mydumper ... --vars 'SET @@radon_streaming_fetch=true' --- .travis.yml | 1 + README.md | 2 ++ src/common/common.go | 1 + src/common/dumper.go | 2 +- src/common/dumper_test.go | 4 +++- src/common/loader.go | 2 +- src/common/pool.go | 8 ++++++-- src/common/pool_test.go | 4 ++-- src/common/streamer.go | 4 ++-- src/common/streamer_test.go | 2 +- src/mydumper/main.go | 6 ++++-- src/mystreamer/main.go | 8 +++++--- 12 files changed, 29 insertions(+), 15 deletions(-) diff --git a/.travis.yml b/.travis.yml index ea706b6..80d0cff 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,6 +2,7 @@ language: go sudo: required go: - 1.8 + - 1.10.x before_install: - go get github.com/pierrre/gotestcover diff --git a/README.md b/README.md index 0ae1941..6d0e337 100644 --- a/README.md +++ b/README.md @@ -49,6 +49,8 @@ Usage: ./bin/mydumper -h [HOST] -P [PORT] -u [USER] -p [PASSWORD] -db [DATABASE] Table to dump -u string Username with privileges to run the dump + -vars string + Session variables Examples: $./bin/mydumper -h 192.168.0.1 -P 3306 -u mock -p mock -db sbtest -o sbtest.sql diff --git a/src/common/common.go b/src/common/common.go index 1db9699..e7dabf5 100644 --- a/src/common/common.go +++ b/src/common/common.go @@ -30,6 +30,7 @@ type Args struct { Database string Table string Outdir string + SessionVars string Threads int ChunksizeInMB int StmtSize int diff --git a/src/common/dumper.go b/src/common/dumper.go index 2041e47..47b2f51 100644 --- a/src/common/dumper.go +++ b/src/common/dumper.go @@ -136,7 +136,7 @@ func allTables(log *xlog.Log, conn *Connection, args *Args) []string { // Dumper used to start the dumper worker. func Dumper(log *xlog.Log, args *Args) { - pool, err := NewPool(log, args.Threads, args.Address, args.User, args.Password) + pool, err := NewPool(log, args.Threads, args.Address, args.User, args.Password, args.SessionVars) AssertNil(err) defer pool.Close() diff --git a/src/common/dumper_test.go b/src/common/dumper_test.go index c5336af..fa39182 100644 --- a/src/common/dumper_test.go +++ b/src/common/dumper_test.go @@ -15,11 +15,11 @@ import ( "strings" "testing" + "github.com/stretchr/testify/assert" "github.com/xelabs/go-mysqlstack/driver" querypb "github.com/xelabs/go-mysqlstack/sqlparser/depends/query" "github.com/xelabs/go-mysqlstack/sqlparser/depends/sqltypes" "github.com/xelabs/go-mysqlstack/xlog" - "github.com/stretchr/testify/assert" ) func TestDumper(t *testing.T) { @@ -111,6 +111,7 @@ func TestDumper(t *testing.T) { fakedbs.AddQueryPattern("show create table .*", schemaResult) fakedbs.AddQueryPattern("show tables from .*", tablesResult) fakedbs.AddQueryPattern("select .*", selectResult) + fakedbs.AddQueryPattern("set .*", &sqltypes.Result{}) } args := &Args{ @@ -123,6 +124,7 @@ func TestDumper(t *testing.T) { Threads: 16, StmtSize: 10000, IntervalMs: 500, + SessionVars: "SET @@radon_streaming_fetch='ON'; SET @@xx=1;", } os.RemoveAll(args.Outdir) diff --git a/src/common/loader.go b/src/common/loader.go index a651fbc..93d738a 100644 --- a/src/common/loader.go +++ b/src/common/loader.go @@ -148,7 +148,7 @@ func restoreTable(log *xlog.Log, table string, conn *Connection) int { // Loader used to start the loader worker. func Loader(log *xlog.Log, args *Args) { - pool, err := NewPool(log, args.Threads, args.Address, args.User, args.Password) + pool, err := NewPool(log, args.Threads, args.Address, args.User, args.Password, args.SessionVars) AssertNil(err) defer pool.Close() diff --git a/src/common/pool.go b/src/common/pool.go index ef0a1bd..60a7bbb 100644 --- a/src/common/pool.go +++ b/src/common/pool.go @@ -47,14 +47,18 @@ func (conn *Connection) StreamFetch(query string) (driver.Rows, error) { } // NewPool creates the new pool. -func NewPool(log *xlog.Log, cap int, address string, user string, password string) (*Pool, error) { +func NewPool(log *xlog.Log, cap int, address string, user string, password string, vars string) (*Pool, error) { conns := make(chan *Connection, cap) for i := 0; i < cap; i++ { client, err := driver.NewConn(user, password, address, "", "utf8") if err != nil { return nil, err } - conns <- &Connection{ID: i, client: client} + conn := &Connection{ID: i, client: client} + if vars != "" { + conn.Execute(vars) + } + conns <- conn } return &Pool{ diff --git a/src/common/pool_test.go b/src/common/pool_test.go index 411ef73..24cabb3 100644 --- a/src/common/pool_test.go +++ b/src/common/pool_test.go @@ -14,10 +14,10 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/xelabs/go-mysqlstack/driver" "github.com/xelabs/go-mysqlstack/sqlparser/depends/sqltypes" "github.com/xelabs/go-mysqlstack/xlog" - "github.com/stretchr/testify/assert" ) func TestPool(t *testing.T) { @@ -33,7 +33,7 @@ func TestPool(t *testing.T) { fakedbs.AddQueryPattern("select .*", &sqltypes.Result{}) } - pool, err := NewPool(log, 8, address, "mock", "mock") + pool, err := NewPool(log, 8, address, "mock", "mock", "") assert.Nil(t, err) var wg sync.WaitGroup diff --git a/src/common/streamer.go b/src/common/streamer.go index 5d7e73a..2735b9c 100644 --- a/src/common/streamer.go +++ b/src/common/streamer.go @@ -131,11 +131,11 @@ func Streamer(log *xlog.Log, args *Args) { var tables []string var wg sync.WaitGroup - fromPool, err := NewPool(log, args.Threads, args.Address, args.User, args.Password) + fromPool, err := NewPool(log, args.Threads, args.Address, args.User, args.Password, args.SessionVars) AssertNil(err) defer fromPool.Close() - toPool, err := NewPool(log, args.Threads, args.ToAddress, args.ToUser, args.ToPassword) + toPool, err := NewPool(log, args.Threads, args.ToAddress, args.ToUser, args.ToPassword, "") AssertNil(err) defer toPool.Close() diff --git a/src/common/streamer_test.go b/src/common/streamer_test.go index 6c5491c..81abb0d 100644 --- a/src/common/streamer_test.go +++ b/src/common/streamer_test.go @@ -12,11 +12,11 @@ package common import ( "testing" + "github.com/stretchr/testify/assert" "github.com/xelabs/go-mysqlstack/driver" querypb "github.com/xelabs/go-mysqlstack/sqlparser/depends/query" "github.com/xelabs/go-mysqlstack/sqlparser/depends/sqltypes" "github.com/xelabs/go-mysqlstack/xlog" - "github.com/stretchr/testify/assert" ) func TestStreamer(t *testing.T) { diff --git a/src/mydumper/main.go b/src/mydumper/main.go index fed46c2..fd5f177 100644 --- a/src/mydumper/main.go +++ b/src/mydumper/main.go @@ -19,8 +19,8 @@ import ( ) var ( - flagChunksize, flagThreads, flagPort, flagStmtSize int - flagUser, flagPasswd, flagHost, flagDb, flagTable, flagDir string + flagChunksize, flagThreads, flagPort, flagStmtSize int + flagUser, flagPasswd, flagHost, flagDb, flagTable, flagDir, flagSessionVars string log = xlog.NewStdLog(xlog.Level(xlog.INFO)) ) @@ -36,6 +36,7 @@ func init() { flag.IntVar(&flagChunksize, "F", 128, "Split tables into chunks of this output file size. This value is in MB") flag.IntVar(&flagThreads, "t", 16, "Number of threads to use") flag.IntVar(&flagStmtSize, "s", 1000000, "Attempted size of INSERT statement in bytes") + flag.StringVar(&flagSessionVars, "vars", "", "Session variables") } func usage() { @@ -68,6 +69,7 @@ func main() { Threads: flagThreads, StmtSize: flagStmtSize, IntervalMs: 10 * 1000, + SessionVars: flagSessionVars, } common.Dumper(log, args) diff --git a/src/mystreamer/main.go b/src/mystreamer/main.go index 9b285a5..2bdcc8c 100644 --- a/src/mystreamer/main.go +++ b/src/mystreamer/main.go @@ -19,9 +19,9 @@ import ( ) var ( - flagOverwriteTables bool - flagThreads, flagPort, flag2port, flagStmtSize int - flagUser, flagPasswd, flagHost, flag2user, flag2passwd, flag2host, flagDb, flag2Db, flag2Engine, flagTable string + flagOverwriteTables bool + flagThreads, flagPort, flag2port, flagStmtSize int + flagUser, flagPasswd, flagHost, flag2user, flag2passwd, flag2host, flagDb, flag2Db, flag2Engine, flagTable, flagSessionVars string log = xlog.NewStdLog(xlog.Level(xlog.INFO)) ) @@ -42,6 +42,7 @@ func init() { flag.IntVar(&flagThreads, "t", 16, "Number of threads to use") flag.IntVar(&flagStmtSize, "s", 1000000, "Attempted size of INSERT statement in bytes") flag.BoolVar(&flagOverwriteTables, "o", false, "Drop tables if they already exist") + flag.StringVar(&flagSessionVars, "vars", "", "Session variables") } func usage() { @@ -73,6 +74,7 @@ func main() { StmtSize: flagStmtSize, IntervalMs: 10 * 1000, OverwriteTables: flagOverwriteTables, + SessionVars: flagSessionVars, } common.Streamer(log, args) }