diff --git a/cmd/moco-agent/cmd/root.go b/cmd/moco-agent/cmd/root.go index bd21faf..9c7ce86 100644 --- a/cmd/moco-agent/cmd/root.go +++ b/cmd/moco-agent/cmd/root.go @@ -100,14 +100,11 @@ var rootCmd = &cobra.Command{ } conf := server.MySQLAccessorConfig{ - Host: podName, - Port: mocoagent.MySQLAdminPort, - Password: agentPassword, ConnMaxIdleTime: config.connIdleTime, ConnectionTimeout: config.connectionTimeout, ReadTimeout: config.readTimeout, } - agent, err := server.New(conf, clusterName, config.socketPath, mocoagent.VarLogPath, + agent, err := server.New(conf, clusterName, agentPassword, config.socketPath, mocoagent.VarLogPath, config.maxDelayThreshold, rLogger.WithName("agent")) if err != nil { return err diff --git a/server/clone.go b/server/clone.go index 5b380c6..e86b4f4 100644 --- a/server/clone.go +++ b/server/clone.go @@ -62,7 +62,7 @@ func (a *Agent) Clone(ctx context.Context, req *proto.CloneRequest) error { } // To clone, the connection should not set timeout values. - cloneDB, err := GetMySQLConnLocalSocket(mocoagent.AgentUser, a.config.Password, a.mysqlSocketPath) + cloneDB, err := GetMySQLConnLocalSocket(mocoagent.AgentUser, a.agentUserPassword, a.mysqlSocketPath) if err != nil { return fmt.Errorf("failed to connect to mysqld through %s: %w", a.mysqlSocketPath, err) } diff --git a/server/clone_test.go b/server/clone_test.go index f6ebcbe..060e8b6 100644 --- a/server/clone_test.go +++ b/server/clone_test.go @@ -62,14 +62,11 @@ var _ = Describe("clone", func() { sockFile = filepath.Join(socketDir(replicaHost), "mysqld.sock") conf := MySQLAccessorConfig{ - Host: "localhost", - Port: replicaPort, - Password: agentUserPassword, ConnMaxIdleTime: 30 * time.Minute, ConnectionTimeout: 3 * time.Second, ReadTimeout: 30 * time.Second, } - agent, err := New(conf, testClusterName, sockFile, "", 100*time.Millisecond, testLogger) + agent, err := New(conf, testClusterName, agentUserPassword, sockFile, "", 100*time.Millisecond, testLogger) Expect(err).ShouldNot(HaveOccurred()) defer agent.CloseDB() diff --git a/server/connect.go b/server/connect.go index c35e705..8e42d35 100644 --- a/server/connect.go +++ b/server/connect.go @@ -2,38 +2,53 @@ package server import ( "errors" - "fmt" "time" - mocoagent "github.com/cybozu-go/moco-agent" "github.com/go-sql-driver/mysql" "github.com/jmoiron/sqlx" ) -func getMySQLConn(config MySQLAccessorConfig) (*sqlx.DB, error) { - conf := mysql.NewConfig() - conf.User = mocoagent.AgentUser - conf.Passwd = config.Password - conf.Net = "tcp" - conf.Addr = fmt.Sprintf("%s:%d", config.Host, config.Port) - conf.Timeout = config.ConnectionTimeout - conf.ReadTimeout = config.ReadTimeout - conf.InterpolateParams = true - conf.ParseTime = true +type options struct { + connMaxIdleTime time.Duration + connMaxLifeTime time.Duration + readTimeout time.Duration + connectionTimeout time.Duration +} - db, err := sqlx.Connect("mysql", conf.FormatDSN()) - if err != nil { - return nil, err - } +type Option interface { + apply(opts *options) +} - db.SetConnMaxIdleTime(config.ConnMaxIdleTime) - db.SetConnMaxLifetime(5 * time.Minute) - db.SetMaxIdleConns(1) +type accessorConfigOption MySQLAccessorConfig - return db, nil +func (ac accessorConfigOption) apply(opts *options) { + opts.connMaxIdleTime = ac.ConnMaxIdleTime + opts.readTimeout = ac.ReadTimeout + opts.connectionTimeout = ac.ConnectionTimeout +} + +func WithAccessorConfig(ac MySQLAccessorConfig) Option { + return accessorConfigOption(ac) } -func GetMySQLConnLocalSocket(user, password, socket string) (*sqlx.DB, error) { +type connMaxLifeTimeOption time.Duration + +func (t connMaxLifeTimeOption) apply(opts *options) { + opts.connMaxLifeTime = time.Duration(t) +} + +func WithConnMaxLifeTime(t time.Duration) Option { + return connMaxLifeTimeOption(t) +} + +func GetMySQLConnLocalSocket(user, password, socket string, opts ...Option) (*sqlx.DB, error) { + options := options{ + connMaxIdleTime: 30 * time.Second, + } + for _, o := range opts { + o.apply(&options) + } + conf := mysql.NewConfig() conf.User = user conf.Passwd = password @@ -41,14 +56,26 @@ func GetMySQLConnLocalSocket(user, password, socket string) (*sqlx.DB, error) { conf.Addr = socket conf.InterpolateParams = true conf.ParseTime = true + if options.connectionTimeout > 0 { + conf.Timeout = options.connectionTimeout + } + if options.readTimeout > 0 { + conf.ReadTimeout = options.readTimeout + } db, err := sqlx.Connect("mysql", conf.FormatDSN()) if err != nil { return nil, err } - db.SetConnMaxIdleTime(30 * time.Second) + if options.connMaxLifeTime > 0 { + db.SetConnMaxIdleTime(options.connMaxIdleTime) + } + if options.connMaxLifeTime > 0 { + db.SetConnMaxLifetime(options.connMaxLifeTime) + } db.SetMaxIdleConns(1) + return db, nil } diff --git a/server/mysqld_health_test.go b/server/mysqld_health_test.go index e837602..513a0e3 100644 --- a/server/mysqld_health_test.go +++ b/server/mysqld_health_test.go @@ -18,14 +18,11 @@ var _ = Describe("health", func() { sockFile := filepath.Join(socketDir(donorHost), "mysqld.sock") conf := MySQLAccessorConfig{ - Host: "localhost", - Port: donorPort, - Password: agentUserPassword, ConnMaxIdleTime: 30 * time.Minute, ConnectionTimeout: 3 * time.Second, ReadTimeout: 30 * time.Second, } - agent, err := New(conf, testClusterName, sockFile, "", maxDelayThreshold, testLogger) + agent, err := New(conf, testClusterName, agentUserPassword, sockFile, "", maxDelayThreshold, testLogger) Expect(err).NotTo(HaveOccurred()) defer agent.CloseDB() @@ -73,14 +70,11 @@ var _ = Describe("health", func() { sockFile = filepath.Join(socketDir(replicaHost), "mysqld.sock") conf := MySQLAccessorConfig{ - Host: "localhost", - Port: replicaPort, - Password: agentUserPassword, ConnMaxIdleTime: 30 * time.Minute, ConnectionTimeout: 3 * time.Second, ReadTimeout: 30 * time.Second, } - agent, err := New(conf, testClusterName, sockFile, "", 100*time.Millisecond, testLogger) + agent, err := New(conf, testClusterName, agentUserPassword, sockFile, "", 100*time.Millisecond, testLogger) Expect(err).ShouldNot(HaveOccurred()) defer agent.CloseDB() diff --git a/server/rotate_test.go b/server/rotate_test.go index 1618f3e..e96d4e6 100644 --- a/server/rotate_test.go +++ b/server/rotate_test.go @@ -24,14 +24,11 @@ var _ = Describe("log rotation", func() { defer os.RemoveAll(tmpDir) conf := MySQLAccessorConfig{ - Host: "localhost", - Port: replicaPort, - Password: agentUserPassword, ConnMaxIdleTime: 30 * time.Minute, ConnectionTimeout: 3 * time.Second, ReadTimeout: 30 * time.Second, } - agent, err := New(conf, testClusterName, sockFile, tmpDir, maxDelayThreshold, testLogger) + agent, err := New(conf, testClusterName, agentUserPassword, sockFile, tmpDir, maxDelayThreshold, testLogger) Expect(err).ShouldNot(HaveOccurred()) defer agent.CloseDB() diff --git a/server/server.go b/server/server.go index e8d3d3c..1126be8 100644 --- a/server/server.go +++ b/server/server.go @@ -4,6 +4,7 @@ import ( "sync" "time" + mocoagent "github.com/cybozu-go/moco-agent" "github.com/cybozu-go/moco-agent/metrics" "github.com/cybozu-go/moco-agent/proto" "github.com/go-logr/logr" @@ -22,14 +23,15 @@ type agentService struct { } // New returns an Agent -func New(config MySQLAccessorConfig, clusterName, socket, logDir string, maxDelay time.Duration, logger logr.Logger) (*Agent, error) { - db, err := getMySQLConn(config) +func New(config MySQLAccessorConfig, clusterName, agentPassword, socket, logDir string, maxDelay time.Duration, logger logr.Logger) (*Agent, error) { + db, err := GetMySQLConnLocalSocket(mocoagent.AgentUser, agentPassword, socket, + WithAccessorConfig(config), WithConnMaxLifeTime(5*time.Minute)) if err != nil { return nil, err } return &Agent{ - config: config, + agentUserPassword: agentPassword, db: db, logger: logger, mysqlSocketPath: socket, @@ -41,7 +43,7 @@ func New(config MySQLAccessorConfig, clusterName, socket, logDir string, maxDela // Agent is the agent to executes some MySQL commands of the own Pod type Agent struct { - config MySQLAccessorConfig + agentUserPassword string db *sqlx.DB logger logr.Logger mysqlSocketPath string @@ -74,9 +76,6 @@ func (a *Agent) configureReplicationMetrics(enable bool) { } type MySQLAccessorConfig struct { - Host string - Port int - Password string ConnMaxIdleTime time.Duration ConnectionTimeout time.Duration ReadTimeout time.Duration