Skip to content

Commit

Permalink
Merge pull request #84 from cybozu-go/transaction-queueing-wait
Browse files Browse the repository at this point in the history
add an option for waiting transaction queueing on replica
  • Loading branch information
umezawatakeshi authored Aug 1, 2023
2 parents 172edbc + 49853de commit 1143f0d
Show file tree
Hide file tree
Showing 8 changed files with 129 additions and 44 deletions.
24 changes: 13 additions & 11 deletions cmd/moco-agent/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,17 @@ const (
)

var config struct {
address string
probeAddress string
metricsAddress string
connIdleTime time.Duration
connectionTimeout time.Duration
logRotationSchedule string
readTimeout time.Duration
maxDelayThreshold time.Duration
socketPath string
grpcCertDir string
address string
probeAddress string
metricsAddress string
connIdleTime time.Duration
connectionTimeout time.Duration
logRotationSchedule string
readTimeout time.Duration
maxDelayThreshold time.Duration
socketPath string
grpcCertDir string
transactionQueueingWait time.Duration
}

type mysqlLogger struct{}
Expand Down Expand Up @@ -108,7 +109,7 @@ var rootCmd = &cobra.Command{
ReadTimeout: config.readTimeout,
}
agent, err := server.New(conf, clusterName, config.socketPath, mocoagent.VarLogPath,
config.maxDelayThreshold, rLogger.WithName("agent"))
config.maxDelayThreshold, config.transactionQueueingWait, rLogger.WithName("agent"))
if err != nil {
return err
}
Expand Down Expand Up @@ -236,6 +237,7 @@ func init() {
fs.DurationVar(&config.maxDelayThreshold, "max-delay", time.Minute, "Acceptable max commit delay considering as ready; the zero value accepts any delay")
fs.StringVar(&config.socketPath, "socket-path", socketPathDefault, "Path of mysqld socket file.")
fs.StringVar(&config.grpcCertDir, "grpc-cert-dir", "/grpc-cert", "gRPC certificate directory")
fs.DurationVar(&config.transactionQueueingWait, "transaction-queueing-wait", time.Minute, "The maximum amount of time for waiting transaction queueing on replica")
}

func initializeMySQLForMOCO(ctx context.Context, socketPath string, logger logr.Logger) error {
Expand Down
28 changes: 15 additions & 13 deletions docs/moco-agent.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,21 @@ This is the specification document of `moco-agent` command.

```
Flags:
--address string Listening address and port for gRPC API. (default ":9080")
--connection-timeout duration Dial timeout (default 5s)
-h, --help help for moco-agent
--log-rotation-schedule string Cron format schedule for MySQL log rotation (default "*/5 * * * *")
--logfile string Log filename
--logformat string Log format [plain,logfmt,json]
--loglevel string Log level [critical,error,warning,info,debug]
--max-delay duration Acceptable max commit delay considering as ready; the zero value accepts any delay (default 1m0s)
--max-idle-time duration The maximum amount of time a connection may be idle (default 30s)
--metrics-address string Listening address and port for metrics. (default ":8080")
--probe-address string Listening address and port for mysqld health probes. (default ":9081")
--read-timeout duration I/O read timeout (default 30s)
--socket-path string Path of mysqld socket file. (default "/run/mysqld.sock")
--address string Listening address and port for gRPC API. (default ":9080")
--connection-timeout duration Dial timeout (default 5s)
--grpc-cert-dir string gRPC certificate directory (default "/grpc-cert")
-h, --help help for moco-agent
--log-rotation-schedule string Cron format schedule for MySQL log rotation (default "*/5 * * * *")
--logfile string Log filename
--logformat string Log format [plain,logfmt,json]
--loglevel string Log level [critical,error,warning,info,debug]
--max-delay duration Acceptable max commit delay considering as ready; the zero value accepts any delay (default 1m0s)
--max-idle-time duration The maximum amount of time a connection may be idle (default 30s)
--metrics-address string Listening address and port for metrics. (default ":8080")
--probe-address string Listening address and port for mysqld health probes. (default ":9081")
--read-timeout duration I/O read timeout (default 30s)
--socket-path string Path of mysqld socket file. (default "/run/mysqld.sock")
--transaction-queueing-wait duration The maximum amount of time for waiting transaction queueing on replica (default 1m0s)
```

## Environment variables
Expand Down
2 changes: 1 addition & 1 deletion server/clone_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ var _ = Describe("clone", func() {
ConnectionTimeout: 3 * time.Second,
ReadTimeout: 30 * time.Second,
}
agent, err := New(conf, testClusterName, sockFile, "", 100*time.Millisecond, testLogger)
agent, err := New(conf, testClusterName, sockFile, "", 100*time.Millisecond, time.Second, testLogger)

Expect(err).ShouldNot(HaveOccurred())
defer agent.CloseDB()
Expand Down
19 changes: 18 additions & 1 deletion server/mysql_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"database/sql"
"errors"
"fmt"
"strconv"
"time"
)

Expand Down Expand Up @@ -135,7 +136,7 @@ func (a *Agent) GetMySQLReplicaStatus(ctx context.Context) (*MySQLReplicaStatus,
return status, nil
}

func (a *Agent) GetTransactionTimestamps(ctx context.Context) (queued, applied time.Time, err error) {
func (a *Agent) GetTransactionTimestamps(ctx context.Context) (queued, applied time.Time, uptime time.Duration, err error) {
err = a.db.GetContext(ctx, &queued, `
SELECT MAX(LAST_QUEUED_TRANSACTION_ORIGINAL_COMMIT_TIMESTAMP)
FROM performance_schema.replication_connection_status`)
Expand All @@ -145,5 +146,21 @@ FROM performance_schema.replication_connection_status`)
err = a.db.GetContext(ctx, &applied, `
SELECT MAX(LAST_APPLIED_TRANSACTION_ORIGINAL_COMMIT_TIMESTAMP)
FROM performance_schema.replication_applier_status_by_worker`)
if err != nil {
return
}
var uptime_seconds_string string
err = a.db.GetContext(ctx, &uptime_seconds_string, `
SELECT VARIABLE_VALUE
FROM performance_schema.global_status
WHERE VARIABLE_NAME='Uptime'`)
if err != nil {
return
}
uptime_seconds, err := strconv.Atoi(uptime_seconds_string)
if err != nil {
return
}
uptime = time.Second * time.Duration(uptime_seconds)
return
}
9 changes: 8 additions & 1 deletion server/mysqld_health.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (a *Agent) MySQLDReady(w http.ResponseWriter, r *http.Request) {
return
}

queued, applied, err := a.GetTransactionTimestamps(r.Context())
queued, applied, uptime, err := a.GetTransactionTimestamps(r.Context())
if err != nil {
a.logger.Error(err, "failed to get replication lag")
msg := fmt.Sprintf("failed to get replication lag: %+v", err)
Expand All @@ -94,6 +94,13 @@ func (a *Agent) MySQLDReady(w http.ResponseWriter, r *http.Request) {
// "0000-00-00 00:00:00.000000", the zero value of transaction timestamps (type TIMESTAMP(6) column),
// is converted to "0001-01-01 00:00:00 +0000", the zero value of time.Time.
// So, this IsZero() works as expected.
if queued.IsZero() && uptime < a.transactionQueueingWait {
a.logger.Info("the instance does not seem to receive transactions yet", "uptime", uptime)
msg := fmt.Sprintf("the instance does not seem to receive transactions yet: uptime=%v", uptime)
http.Error(w, msg, http.StatusServiceUnavailable)
return
}

if !queued.IsZero() {
lag = queued.Sub(applied)
}
Expand Down
59 changes: 57 additions & 2 deletions server/mysqld_health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ var _ = Describe("health", func() {
ConnectionTimeout: 3 * time.Second,
ReadTimeout: 30 * time.Second,
}
agent, err := New(conf, testClusterName, sockFile, "", maxDelayThreshold, testLogger)
agent, err := New(conf, testClusterName, sockFile, "", maxDelayThreshold, time.Second, testLogger)
Expect(err).NotTo(HaveOccurred())
defer agent.CloseDB()

Expand Down Expand Up @@ -80,7 +80,7 @@ var _ = Describe("health", func() {
ConnectionTimeout: 3 * time.Second,
ReadTimeout: 30 * time.Second,
}
agent, err := New(conf, testClusterName, sockFile, "", 100*time.Millisecond, testLogger)
agent, err := New(conf, testClusterName, sockFile, "", 100*time.Millisecond, time.Second, testLogger)
Expect(err).ShouldNot(HaveOccurred())
defer agent.CloseDB()

Expand Down Expand Up @@ -135,6 +135,61 @@ var _ = Describe("health", func() {
return getReady(agent)
}).Should(HaveHTTPStatus(http.StatusOK))
})

// Verifies the transactionQueueingWait gate of the readiness probe: while the
// replica has not queued any transaction and mysqld uptime is still below
// transactionQueueingWait, the probe must not report OK; the test then expects
// it to report OK shortly afterwards.
It("checks transactionQueueingWait works", func() {
By("starting primary/replica MySQLds")
StartMySQLD(donorHost, donorPort, donorServerID)
defer StopAndRemoveMySQLD(donorHost)

sockFile := filepath.Join(socketDir(donorHost), "mysqld.sock")

donorDB, err := GetMySQLConnLocalSocket(mocoagent.AdminUser, adminUserPassword, sockFile)
Expect(err).NotTo(HaveOccurred())
defer donorDB.Close()

StartMySQLD(replicaHost, replicaPort, replicaServerID)
defer StopAndRemoveMySQLD(replicaHost)

// From here on sockFile refers to the replica's socket; the agent under test
// and replicaDB both connect to the replica.
sockFile = filepath.Join(socketDir(replicaHost), "mysqld.sock")
conf := MySQLAccessorConfig{
Host: "localhost",
Port: replicaPort,
Password: agentUserPassword,
ConnMaxIdleTime: 30 * time.Minute,
ConnectionTimeout: 3 * time.Second,
ReadTimeout: 30 * time.Second,
}
// maxDelay is 100ms and transactionQueueingWait is 60s; the Consistently
// timeout in the readiness check below is derived from the 60s value.
agent, err := New(conf, testClusterName, sockFile, "", 100*time.Millisecond, time.Second*60, testLogger)
Expect(err).ShouldNot(HaveOccurred())
defer agent.CloseDB()

replicaDB, err := GetMySQLConnLocalSocket(mocoagent.AdminUser, adminUserPassword, sockFile)
Expect(err).NotTo(HaveOccurred())
defer replicaDB.Close()

By("setting up donor")
_, err = replicaDB.Exec("SET GLOBAL clone_valid_donor_list = ?", donorHost+":3306")
Expect(err).NotTo(HaveOccurred())
_, err = donorDB.Exec("SET GLOBAL read_only=0")
Expect(err).NotTo(HaveOccurred())

// Start replication from the donor. This test issues no further statements on
// the donor after this point — presumably the replica's last-queued transaction
// timestamp therefore stays at its zero value (TODO confirm against StartMySQLD).
_, err = replicaDB.Exec(`CHANGE MASTER TO MASTER_HOST=?, MASTER_PORT=3306, MASTER_USER=?, MASTER_PASSWORD=?, GET_MASTER_PUBLIC_KEY=1`,
donorHost, mocoagent.ReplicationUser, replicationUserPassword)
Expect(err).NotTo(HaveOccurred())
_, err = replicaDB.Exec(`START SLAVE`)
Expect(err).NotTo(HaveOccurred())

By("checking readiness")
// The uptime observed by the agent is about 15s smaller than the process uptime reported by the kernel,
// and the test flow takes about 35s from the process start to this point.
// The Consistently timeout is therefore 60 (transactionQueueingWait) - (35 - 15) - 5 (margin) = 35 seconds.
Consistently(func() interface{} {
return getReady(agent)
}).WithPolling(time.Second).WithTimeout(time.Second * 35).ShouldNot(HaveHTTPStatus(http.StatusOK))
// After the non-ready window above, the probe is expected to report OK within 10s.
Eventually(func() interface{} {
return getReady(agent)
}).WithPolling(time.Second).WithTimeout(time.Second * 10).Should(HaveHTTPStatus(http.StatusOK))
})
})

func getHealth(agent *Agent) *httptest.ResponseRecorder {
Expand Down
2 changes: 1 addition & 1 deletion server/rotate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ var _ = Describe("log rotation", func() {
ConnectionTimeout: 3 * time.Second,
ReadTimeout: 30 * time.Second,
}
agent, err := New(conf, testClusterName, sockFile, tmpDir, maxDelayThreshold, testLogger)
agent, err := New(conf, testClusterName, sockFile, tmpDir, maxDelayThreshold, time.Second, testLogger)
Expect(err).ShouldNot(HaveOccurred())
defer agent.CloseDB()

Expand Down
30 changes: 16 additions & 14 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,31 +22,33 @@ type agentService struct {
}

// New returns an Agent
func New(config MySQLAccessorConfig, clusterName, socket, logDir string, maxDelay time.Duration, logger logr.Logger) (*Agent, error) {
func New(config MySQLAccessorConfig, clusterName, socket, logDir string, maxDelay, transactionQueueingWait time.Duration, logger logr.Logger) (*Agent, error) {
db, err := getMySQLConn(config)
if err != nil {
return nil, err
}

return &Agent{
config: config,
db: db,
logger: logger,
mysqlSocketPath: socket,
logDir: logDir,
maxDelayThreshold: maxDelay,
cloneLock: make(chan struct{}, 1),
config: config,
db: db,
logger: logger,
mysqlSocketPath: socket,
logDir: logDir,
maxDelayThreshold: maxDelay,
transactionQueueingWait: transactionQueueingWait,
cloneLock: make(chan struct{}, 1),
}, nil
}

// Agent is the agent that executes some MySQL commands in its own Pod
type Agent struct {
config MySQLAccessorConfig
db *sqlx.DB
logger logr.Logger
mysqlSocketPath string
logDir string
maxDelayThreshold time.Duration
config MySQLAccessorConfig
db *sqlx.DB
logger logr.Logger
mysqlSocketPath string
logDir string
maxDelayThreshold time.Duration
transactionQueueingWait time.Duration

cloneLock chan struct{}
registryLock sync.Mutex
Expand Down

0 comments on commit 1143f0d

Please sign in to comment.