Skip to content

Commit

Permalink
use unix domain socket for all connections to MySQL
Browse files Browse the repository at this point in the history
moco-agent holds several MySQL connection pools. Most of these
connections use unix domain sockets, but only one uses a TCP connection.
This connection does not use TLS, so if MySQL is set to
require_secure_transport=ON, moco-agent will not start.

This patch changes moco-agent to use unix domain sockets for all
connections to MySQL. This change allows moco-agent to run even if
require_secure_transport=ON is set for MySQL.

Signed-off-by: Daichi Mukai <[email protected]>
  • Loading branch information
daichimukai committed Jul 19, 2023
1 parent 172edbc commit 68a0864
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 50 deletions.
5 changes: 1 addition & 4 deletions cmd/moco-agent/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,11 @@ var rootCmd = &cobra.Command{
}

conf := server.MySQLAccessorConfig{
Host: podName,
Port: mocoagent.MySQLAdminPort,
Password: agentPassword,
ConnMaxIdleTime: config.connIdleTime,
ConnectionTimeout: config.connectionTimeout,
ReadTimeout: config.readTimeout,
}
agent, err := server.New(conf, clusterName, config.socketPath, mocoagent.VarLogPath,
agent, err := server.New(conf, clusterName, agentPassword, config.socketPath, mocoagent.VarLogPath,
config.maxDelayThreshold, rLogger.WithName("agent"))
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion server/clone.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (a *Agent) Clone(ctx context.Context, req *proto.CloneRequest) error {
}

// To clone, the connection should not set timeout values.
cloneDB, err := GetMySQLConnLocalSocket(mocoagent.AgentUser, a.config.Password, a.mysqlSocketPath)
cloneDB, err := GetMySQLConnLocalSocket(mocoagent.AgentUser, a.agentUserPassword, a.mysqlSocketPath)
if err != nil {
return fmt.Errorf("failed to connect to mysqld through %s: %w", a.mysqlSocketPath, err)
}
Expand Down
5 changes: 1 addition & 4 deletions server/clone_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,11 @@ var _ = Describe("clone", func() {

sockFile = filepath.Join(socketDir(replicaHost), "mysqld.sock")
conf := MySQLAccessorConfig{
Host: "localhost",
Port: replicaPort,
Password: agentUserPassword,
ConnMaxIdleTime: 30 * time.Minute,
ConnectionTimeout: 3 * time.Second,
ReadTimeout: 30 * time.Second,
}
agent, err := New(conf, testClusterName, sockFile, "", 100*time.Millisecond, testLogger)
agent, err := New(conf, testClusterName, agentUserPassword, sockFile, "", 100*time.Millisecond, testLogger)

Expect(err).ShouldNot(HaveOccurred())
defer agent.CloseDB()
Expand Down
71 changes: 49 additions & 22 deletions server/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,53 +2,80 @@ package server

import (
"errors"
"fmt"
"time"

mocoagent "github.com/cybozu-go/moco-agent"
"github.com/go-sql-driver/mysql"
"github.com/jmoiron/sqlx"
)

func getMySQLConn(config MySQLAccessorConfig) (*sqlx.DB, error) {
conf := mysql.NewConfig()
conf.User = mocoagent.AgentUser
conf.Passwd = config.Password
conf.Net = "tcp"
conf.Addr = fmt.Sprintf("%s:%d", config.Host, config.Port)
conf.Timeout = config.ConnectionTimeout
conf.ReadTimeout = config.ReadTimeout
conf.InterpolateParams = true
conf.ParseTime = true
type options struct {
connMaxIdleTime time.Duration
connMaxLifeTime time.Duration
readTimeout time.Duration
connectionTimeout time.Duration
}

db, err := sqlx.Connect("mysql", conf.FormatDSN())
if err != nil {
return nil, err
}
type Option interface {
apply(opts *options)
}

db.SetConnMaxIdleTime(config.ConnMaxIdleTime)
db.SetConnMaxLifetime(5 * time.Minute)
db.SetMaxIdleConns(1)
type accessorConfigOption MySQLAccessorConfig

return db, nil
func (ac accessorConfigOption) apply(opts *options) {
opts.connMaxIdleTime = ac.ConnMaxIdleTime
opts.readTimeout = ac.ReadTimeout
opts.connectionTimeout = ac.ConnectionTimeout
}

func WithAccessorConfig(ac MySQLAccessorConfig) Option {
return accessorConfigOption(ac)
}

func GetMySQLConnLocalSocket(user, password, socket string) (*sqlx.DB, error) {
type connMaxLifeTimeOption time.Duration

func (t connMaxLifeTimeOption) apply(opts *options) {
opts.connMaxLifeTime = time.Duration(t)
}

func WithConnMaxLifeTime(t time.Duration) Option {
return connMaxLifeTimeOption(t)
}

func GetMySQLConnLocalSocket(user, password, socket string, opts ...Option) (*sqlx.DB, error) {
options := options{
connMaxIdleTime: 30 * time.Second,
}
for _, o := range opts {
o.apply(&options)
}

conf := mysql.NewConfig()
conf.User = user
conf.Passwd = password
conf.Net = "unix"
conf.Addr = socket
conf.InterpolateParams = true
conf.ParseTime = true
if options.connectionTimeout > 0 {
conf.Timeout = options.connectionTimeout
}
if options.readTimeout > 0 {
conf.ReadTimeout = options.readTimeout
}

db, err := sqlx.Connect("mysql", conf.FormatDSN())
if err != nil {
return nil, err
}

db.SetConnMaxIdleTime(30 * time.Second)
if options.connMaxLifeTime > 0 {
db.SetConnMaxIdleTime(options.connMaxIdleTime)
}
if options.connMaxLifeTime > 0 {
db.SetConnMaxLifetime(options.connMaxLifeTime)
}
db.SetMaxIdleConns(1)

return db, nil
}

Expand Down
10 changes: 2 additions & 8 deletions server/mysqld_health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,11 @@ var _ = Describe("health", func() {

sockFile := filepath.Join(socketDir(donorHost), "mysqld.sock")
conf := MySQLAccessorConfig{
Host: "localhost",
Port: donorPort,
Password: agentUserPassword,
ConnMaxIdleTime: 30 * time.Minute,
ConnectionTimeout: 3 * time.Second,
ReadTimeout: 30 * time.Second,
}
agent, err := New(conf, testClusterName, sockFile, "", maxDelayThreshold, testLogger)
agent, err := New(conf, testClusterName, agentUserPassword, sockFile, "", maxDelayThreshold, testLogger)
Expect(err).NotTo(HaveOccurred())
defer agent.CloseDB()

Expand Down Expand Up @@ -73,14 +70,11 @@ var _ = Describe("health", func() {

sockFile = filepath.Join(socketDir(replicaHost), "mysqld.sock")
conf := MySQLAccessorConfig{
Host: "localhost",
Port: replicaPort,
Password: agentUserPassword,
ConnMaxIdleTime: 30 * time.Minute,
ConnectionTimeout: 3 * time.Second,
ReadTimeout: 30 * time.Second,
}
agent, err := New(conf, testClusterName, sockFile, "", 100*time.Millisecond, testLogger)
agent, err := New(conf, testClusterName, agentUserPassword, sockFile, "", 100*time.Millisecond, testLogger)
Expect(err).ShouldNot(HaveOccurred())
defer agent.CloseDB()

Expand Down
5 changes: 1 addition & 4 deletions server/rotate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,11 @@ var _ = Describe("log rotation", func() {
defer os.RemoveAll(tmpDir)

conf := MySQLAccessorConfig{
Host: "localhost",
Port: replicaPort,
Password: agentUserPassword,
ConnMaxIdleTime: 30 * time.Minute,
ConnectionTimeout: 3 * time.Second,
ReadTimeout: 30 * time.Second,
}
agent, err := New(conf, testClusterName, sockFile, tmpDir, maxDelayThreshold, testLogger)
agent, err := New(conf, testClusterName, agentUserPassword, sockFile, tmpDir, maxDelayThreshold, testLogger)
Expect(err).ShouldNot(HaveOccurred())
defer agent.CloseDB()

Expand Down
13 changes: 6 additions & 7 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"sync"
"time"

mocoagent "github.com/cybozu-go/moco-agent"
"github.com/cybozu-go/moco-agent/metrics"
"github.com/cybozu-go/moco-agent/proto"
"github.com/go-logr/logr"
Expand All @@ -22,14 +23,15 @@ type agentService struct {
}

// New returns an Agent
func New(config MySQLAccessorConfig, clusterName, socket, logDir string, maxDelay time.Duration, logger logr.Logger) (*Agent, error) {
db, err := getMySQLConn(config)
func New(config MySQLAccessorConfig, clusterName, agentPassword, socket, logDir string, maxDelay time.Duration, logger logr.Logger) (*Agent, error) {
db, err := GetMySQLConnLocalSocket(mocoagent.AgentUser, agentPassword, socket,
WithAccessorConfig(config), WithConnMaxLifeTime(5*time.Minute))
if err != nil {
return nil, err
}

return &Agent{
config: config,
agentUserPassword: agentPassword,
db: db,
logger: logger,
mysqlSocketPath: socket,
Expand All @@ -41,7 +43,7 @@ func New(config MySQLAccessorConfig, clusterName, socket, logDir string, maxDela

// Agent is the agent to executes some MySQL commands of the own Pod
type Agent struct {
config MySQLAccessorConfig
agentUserPassword string
db *sqlx.DB
logger logr.Logger
mysqlSocketPath string
Expand Down Expand Up @@ -74,9 +76,6 @@ func (a *Agent) configureReplicationMetrics(enable bool) {
}

type MySQLAccessorConfig struct {
Host string
Port int
Password string
ConnMaxIdleTime time.Duration
ConnectionTimeout time.Duration
ReadTimeout time.Duration
Expand Down

0 comments on commit 68a0864

Please sign in to comment.