Skip to content
This repository has been archived by the owner on Feb 10, 2023. It is now read-only.

Commit

Permalink
feature: add session variables setting #13
Browse files Browse the repository at this point in the history
usage:
./bin/mydumper ... --vars 'SET @@radon_streaming_fetch=true'
  • Loading branch information
BohuTANG committed Jan 9, 2019
1 parent b983fd2 commit 2ebac13
Show file tree
Hide file tree
Showing 12 changed files with 29 additions and 15 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ language: go
sudo: required
go:
- 1.8
- 1.10.x

before_install:
- go get github.com/pierrre/gotestcover
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ Usage: ./bin/mydumper -h [HOST] -P [PORT] -u [USER] -p [PASSWORD] -db [DATABASE]
Table to dump
-u string
Username with privileges to run the dump
-vars string
Session variables
Examples:
$./bin/mydumper -h 192.168.0.1 -P 3306 -u mock -p mock -db sbtest -o sbtest.sql
Expand Down
1 change: 1 addition & 0 deletions src/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Args struct {
Database string
Table string
Outdir string
SessionVars string
Threads int
ChunksizeInMB int
StmtSize int
Expand Down
2 changes: 1 addition & 1 deletion src/common/dumper.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func allTables(log *xlog.Log, conn *Connection, args *Args) []string {

// Dumper used to start the dumper worker.
func Dumper(log *xlog.Log, args *Args) {
pool, err := NewPool(log, args.Threads, args.Address, args.User, args.Password)
pool, err := NewPool(log, args.Threads, args.Address, args.User, args.Password, args.SessionVars)
AssertNil(err)
defer pool.Close()

Expand Down
4 changes: 3 additions & 1 deletion src/common/dumper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ import (
"strings"
"testing"

"github.com/stretchr/testify/assert"
"github.com/xelabs/go-mysqlstack/driver"
querypb "github.com/xelabs/go-mysqlstack/sqlparser/depends/query"
"github.com/xelabs/go-mysqlstack/sqlparser/depends/sqltypes"
"github.com/xelabs/go-mysqlstack/xlog"
"github.com/stretchr/testify/assert"
)

func TestDumper(t *testing.T) {
Expand Down Expand Up @@ -111,6 +111,7 @@ func TestDumper(t *testing.T) {
fakedbs.AddQueryPattern("show create table .*", schemaResult)
fakedbs.AddQueryPattern("show tables from .*", tablesResult)
fakedbs.AddQueryPattern("select .*", selectResult)
fakedbs.AddQueryPattern("set .*", &sqltypes.Result{})
}

args := &Args{
Expand All @@ -123,6 +124,7 @@ func TestDumper(t *testing.T) {
Threads: 16,
StmtSize: 10000,
IntervalMs: 500,
SessionVars: "SET @@radon_streaming_fetch='ON'; SET @@xx=1;",
}

os.RemoveAll(args.Outdir)
Expand Down
2 changes: 1 addition & 1 deletion src/common/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func restoreTable(log *xlog.Log, table string, conn *Connection) int {

// Loader used to start the loader worker.
func Loader(log *xlog.Log, args *Args) {
pool, err := NewPool(log, args.Threads, args.Address, args.User, args.Password)
pool, err := NewPool(log, args.Threads, args.Address, args.User, args.Password, args.SessionVars)
AssertNil(err)
defer pool.Close()

Expand Down
8 changes: 6 additions & 2 deletions src/common/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,18 @@ func (conn *Connection) StreamFetch(query string) (driver.Rows, error) {
}

// NewPool creates the new pool.
func NewPool(log *xlog.Log, cap int, address string, user string, password string) (*Pool, error) {
func NewPool(log *xlog.Log, cap int, address string, user string, password string, vars string) (*Pool, error) {
conns := make(chan *Connection, cap)
for i := 0; i < cap; i++ {
client, err := driver.NewConn(user, password, address, "", "utf8")
if err != nil {
return nil, err
}
conns <- &Connection{ID: i, client: client}
conn := &Connection{ID: i, client: client}
if vars != "" {
conn.Execute(vars)
}
conns <- conn
}

return &Pool{
Expand Down
4 changes: 2 additions & 2 deletions src/common/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/xelabs/go-mysqlstack/driver"
"github.com/xelabs/go-mysqlstack/sqlparser/depends/sqltypes"
"github.com/xelabs/go-mysqlstack/xlog"
"github.com/stretchr/testify/assert"
)

func TestPool(t *testing.T) {
Expand All @@ -33,7 +33,7 @@ func TestPool(t *testing.T) {
fakedbs.AddQueryPattern("select .*", &sqltypes.Result{})
}

pool, err := NewPool(log, 8, address, "mock", "mock")
pool, err := NewPool(log, 8, address, "mock", "mock", "")
assert.Nil(t, err)

var wg sync.WaitGroup
Expand Down
4 changes: 2 additions & 2 deletions src/common/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,11 @@ func Streamer(log *xlog.Log, args *Args) {
var tables []string
var wg sync.WaitGroup

fromPool, err := NewPool(log, args.Threads, args.Address, args.User, args.Password)
fromPool, err := NewPool(log, args.Threads, args.Address, args.User, args.Password, args.SessionVars)
AssertNil(err)
defer fromPool.Close()

toPool, err := NewPool(log, args.Threads, args.ToAddress, args.ToUser, args.ToPassword)
toPool, err := NewPool(log, args.Threads, args.ToAddress, args.ToUser, args.ToPassword, "")
AssertNil(err)
defer toPool.Close()

Expand Down
2 changes: 1 addition & 1 deletion src/common/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ package common
import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/xelabs/go-mysqlstack/driver"
querypb "github.com/xelabs/go-mysqlstack/sqlparser/depends/query"
"github.com/xelabs/go-mysqlstack/sqlparser/depends/sqltypes"
"github.com/xelabs/go-mysqlstack/xlog"
"github.com/stretchr/testify/assert"
)

func TestStreamer(t *testing.T) {
Expand Down
6 changes: 4 additions & 2 deletions src/mydumper/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import (
)

var (
flagChunksize, flagThreads, flagPort, flagStmtSize int
flagUser, flagPasswd, flagHost, flagDb, flagTable, flagDir string
flagChunksize, flagThreads, flagPort, flagStmtSize int
flagUser, flagPasswd, flagHost, flagDb, flagTable, flagDir, flagSessionVars string

log = xlog.NewStdLog(xlog.Level(xlog.INFO))
)
Expand All @@ -36,6 +36,7 @@ func init() {
flag.IntVar(&flagChunksize, "F", 128, "Split tables into chunks of this output file size. This value is in MB")
flag.IntVar(&flagThreads, "t", 16, "Number of threads to use")
flag.IntVar(&flagStmtSize, "s", 1000000, "Attempted size of INSERT statement in bytes")
flag.StringVar(&flagSessionVars, "vars", "", "Session variables")
}

func usage() {
Expand Down Expand Up @@ -68,6 +69,7 @@ func main() {
Threads: flagThreads,
StmtSize: flagStmtSize,
IntervalMs: 10 * 1000,
SessionVars: flagSessionVars,
}

common.Dumper(log, args)
Expand Down
8 changes: 5 additions & 3 deletions src/mystreamer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ import (
)

var (
flagOverwriteTables bool
flagThreads, flagPort, flag2port, flagStmtSize int
flagUser, flagPasswd, flagHost, flag2user, flag2passwd, flag2host, flagDb, flag2Db, flag2Engine, flagTable string
flagOverwriteTables bool
flagThreads, flagPort, flag2port, flagStmtSize int
flagUser, flagPasswd, flagHost, flag2user, flag2passwd, flag2host, flagDb, flag2Db, flag2Engine, flagTable, flagSessionVars string

log = xlog.NewStdLog(xlog.Level(xlog.INFO))
)
Expand All @@ -42,6 +42,7 @@ func init() {
flag.IntVar(&flagThreads, "t", 16, "Number of threads to use")
flag.IntVar(&flagStmtSize, "s", 1000000, "Attempted size of INSERT statement in bytes")
flag.BoolVar(&flagOverwriteTables, "o", false, "Drop tables if they already exist")
flag.StringVar(&flagSessionVars, "vars", "", "Session variables")
}

func usage() {
Expand Down Expand Up @@ -73,6 +74,7 @@ func main() {
StmtSize: flagStmtSize,
IntervalMs: 10 * 1000,
OverwriteTables: flagOverwriteTables,
SessionVars: flagSessionVars,
}
common.Streamer(log, args)
}

0 comments on commit 2ebac13

Please sign in to comment.