From 49853de81372335306efb109c75dece6f85b191b Mon Sep 17 00:00:00 2001 From: UMEZAWA Takeshi Date: Mon, 24 Jul 2023 13:22:26 +0900 Subject: [PATCH] add an option for waiting transaction queueing on replica Signed-off-by: UMEZAWA Takeshi --- cmd/moco-agent/cmd/root.go | 24 ++++++++------- docs/moco-agent.md | 28 +++++++++-------- server/clone_test.go | 2 +- server/mysql_status.go | 19 +++++++++++- server/mysqld_health.go | 9 +++++- server/mysqld_health_test.go | 59 ++++++++++++++++++++++++++++++++++-- server/rotate_test.go | 2 +- server/server.go | 30 +++++++++--------- 8 files changed, 129 insertions(+), 44 deletions(-) diff --git a/cmd/moco-agent/cmd/root.go b/cmd/moco-agent/cmd/root.go index bd21faf..d3c4458 100644 --- a/cmd/moco-agent/cmd/root.go +++ b/cmd/moco-agent/cmd/root.go @@ -46,16 +46,17 @@ const ( ) var config struct { - address string - probeAddress string - metricsAddress string - connIdleTime time.Duration - connectionTimeout time.Duration - logRotationSchedule string - readTimeout time.Duration - maxDelayThreshold time.Duration - socketPath string - grpcCertDir string + address string + probeAddress string + metricsAddress string + connIdleTime time.Duration + connectionTimeout time.Duration + logRotationSchedule string + readTimeout time.Duration + maxDelayThreshold time.Duration + socketPath string + grpcCertDir string + transactionQueueingWait time.Duration } type mysqlLogger struct{} @@ -108,7 +109,7 @@ var rootCmd = &cobra.Command{ ReadTimeout: config.readTimeout, } agent, err := server.New(conf, clusterName, config.socketPath, mocoagent.VarLogPath, - config.maxDelayThreshold, rLogger.WithName("agent")) + config.maxDelayThreshold, config.transactionQueueingWait, rLogger.WithName("agent")) if err != nil { return err } @@ -236,6 +237,7 @@ func init() { fs.DurationVar(&config.maxDelayThreshold, "max-delay", time.Minute, "Acceptable max commit delay considering as ready; the zero value accepts any delay") fs.StringVar(&config.socketPath, "socket-path", socketPathDefault, "Path of mysqld socket file.") fs.StringVar(&config.grpcCertDir, "grpc-cert-dir", "/grpc-cert", "gRPC certificate directory") + fs.DurationVar(&config.transactionQueueingWait, "transaction-queueing-wait", time.Minute, "The maximum amount of time for waiting transaction queueing on replica") } func initializeMySQLForMOCO(ctx context.Context, socketPath string, logger logr.Logger) error { diff --git a/docs/moco-agent.md b/docs/moco-agent.md index 9b53058..d0c17d5 100644 --- a/docs/moco-agent.md +++ b/docs/moco-agent.md @@ -6,19 +6,21 @@ This is the specification document of `moco-agent` command. ``` Flags: - --address string Listening address and port for gRPC API. (default ":9080") - --connection-timeout duration Dial timeout (default 5s) - -h, --help help for moco-agent - --log-rotation-schedule string Cron format schedule for MySQL log rotation (default "*/5 * * * *") - --logfile string Log filename - --logformat string Log format [plain,logfmt,json] - --loglevel string Log level [critical,error,warning,info,debug] - --max-delay duration Acceptable max commit delay considering as ready; the zero value accepts any delay (default 1m0s) - --max-idle-time duration The maximum amount of time a connection may be idle (default 30s) - --metrics-address string Listening address and port for metrics. (default ":8080") - --probe-address string Listening address and port for mysqld health probes. (default ":9081") - --read-timeout duration I/O read timeout (default 30s) - --socket-path string Path of mysqld socket file. (default "/run/mysqld.sock") + --address string Listening address and port for gRPC API. (default ":9080") + --connection-timeout duration Dial timeout (default 5s) + --grpc-cert-dir string gRPC certificate directory (default "/grpc-cert") + -h, --help help for moco-agent + --log-rotation-schedule string Cron format schedule for MySQL log rotation (default "*/5 * * * *") + --logfile string Log filename + --logformat string Log format [plain,logfmt,json] + --loglevel string Log level [critical,error,warning,info,debug] + --max-delay duration Acceptable max commit delay considering as ready; the zero value accepts any delay (default 1m0s) + --max-idle-time duration The maximum amount of time a connection may be idle (default 30s) + --metrics-address string Listening address and port for metrics. (default ":8080") + --probe-address string Listening address and port for mysqld health probes. (default ":9081") + --read-timeout duration I/O read timeout (default 30s) + --socket-path string Path of mysqld socket file. (default "/run/mysqld.sock") + --transaction-queueing-wait duration The maximum amount of time for waiting transaction queueing on replica (default 1m0s) ``` ## Environment variables diff --git a/server/clone_test.go b/server/clone_test.go index f6ebcbe..b5d9180 100644 --- a/server/clone_test.go +++ b/server/clone_test.go @@ -69,7 +69,7 @@ var _ = Describe("clone", func() { ConnectionTimeout: 3 * time.Second, ReadTimeout: 30 * time.Second, } - agent, err := New(conf, testClusterName, sockFile, "", 100*time.Millisecond, testLogger) + agent, err := New(conf, testClusterName, sockFile, "", 100*time.Millisecond, time.Second, testLogger) Expect(err).ShouldNot(HaveOccurred()) defer agent.CloseDB() diff --git a/server/mysql_status.go b/server/mysql_status.go index 62cb2e9..73e83a7 100644 --- a/server/mysql_status.go +++ b/server/mysql_status.go @@ -5,6 +5,7 @@ import ( "database/sql" "errors" "fmt" + "strconv" "time" ) @@ -135,7 +136,7 @@ func (a *Agent) GetMySQLReplicaStatus(ctx context.Context) (*MySQLReplicaStatus, return status, nil } -func (a *Agent) GetTransactionTimestamps(ctx context.Context) (queued, applied time.Time, err error) { +func (a *Agent) GetTransactionTimestamps(ctx context.Context) (queued, applied time.Time, uptime time.Duration, err error) { err = a.db.GetContext(ctx, &queued, ` SELECT MAX(LAST_QUEUED_TRANSACTION_ORIGINAL_COMMIT_TIMESTAMP) FROM performance_schema.replication_connection_status`) @@ -145,5 +146,21 @@ FROM performance_schema.replication_connection_status`) err = a.db.GetContext(ctx, &applied, ` SELECT MAX(LAST_APPLIED_TRANSACTION_ORIGINAL_COMMIT_TIMESTAMP) FROM performance_schema.replication_applier_status_by_worker`) + if err != nil { + return + } + var uptime_seconds_string string + err = a.db.GetContext(ctx, &uptime_seconds_string, ` +SELECT VARIABLE_VALUE +FROM performance_schema.global_status +WHERE VARIABLE_NAME='Uptime'`) + if err != nil { + return + } + uptime_seconds, err := strconv.Atoi(uptime_seconds_string) + if err != nil { + return + } + uptime = time.Second * time.Duration(uptime_seconds) return } diff --git a/server/mysqld_health.go b/server/mysqld_health.go index 4cf3150..29caaaf 100644 --- a/server/mysqld_health.go +++ b/server/mysqld_health.go @@ -82,7 +82,7 @@ func (a *Agent) MySQLDReady(w http.ResponseWriter, r *http.Request) { return } - queued, applied, err := a.GetTransactionTimestamps(r.Context()) + queued, applied, uptime, err := a.GetTransactionTimestamps(r.Context()) if err != nil { a.logger.Error(err, "failed to get replication lag") msg := fmt.Sprintf("failed to get replication lag: %+v", err) @@ -94,6 +94,13 @@ func (a *Agent) MySQLDReady(w http.ResponseWriter, r *http.Request) { // "0000-00-00 00:00:00.000000", the zero value of transaction timestamps (type TIMESTAMP(6) column), // is converted to "0001-01-01 00:00:00 +0000", the zero value of time.Time. // So, this IsZero() works as expected. + if queued.IsZero() && uptime < a.transactionQueueingWait { + a.logger.Info("the instance does not seem to receive transactions yet", "uptime", uptime) + msg := fmt.Sprintf("the instance does not seem to receive transactions yet: uptime=%v", uptime) + http.Error(w, msg, http.StatusServiceUnavailable) + return + } + if !queued.IsZero() { lag = queued.Sub(applied) } diff --git a/server/mysqld_health_test.go b/server/mysqld_health_test.go index e837602..6b991c8 100644 --- a/server/mysqld_health_test.go +++ b/server/mysqld_health_test.go @@ -25,7 +25,7 @@ var _ = Describe("health", func() { ConnectionTimeout: 3 * time.Second, ReadTimeout: 30 * time.Second, } - agent, err := New(conf, testClusterName, sockFile, "", maxDelayThreshold, testLogger) + agent, err := New(conf, testClusterName, sockFile, "", maxDelayThreshold, time.Second, testLogger) Expect(err).NotTo(HaveOccurred()) defer agent.CloseDB() @@ -80,7 +80,7 @@ var _ = Describe("health", func() { ConnectionTimeout: 3 * time.Second, ReadTimeout: 30 * time.Second, } - agent, err := New(conf, testClusterName, sockFile, "", 100*time.Millisecond, testLogger) + agent, err := New(conf, testClusterName, sockFile, "", 100*time.Millisecond, time.Second, testLogger) Expect(err).ShouldNot(HaveOccurred()) defer agent.CloseDB() @@ -135,6 +135,61 @@ var _ = Describe("health", func() { return getReady(agent) }).Should(HaveHTTPStatus(http.StatusOK)) }) + + It("checks transactionQueueingWait works", func() { + By("starting primary/replica MySQLds") + StartMySQLD(donorHost, donorPort, donorServerID) + defer StopAndRemoveMySQLD(donorHost) + + sockFile := filepath.Join(socketDir(donorHost), "mysqld.sock") + + donorDB, err := GetMySQLConnLocalSocket(mocoagent.AdminUser, adminUserPassword, sockFile) + Expect(err).NotTo(HaveOccurred()) + defer donorDB.Close() + + StartMySQLD(replicaHost, replicaPort, replicaServerID) + defer StopAndRemoveMySQLD(replicaHost) + + sockFile = filepath.Join(socketDir(replicaHost), "mysqld.sock") + conf := MySQLAccessorConfig{ + Host: "localhost", + Port: replicaPort, + Password: agentUserPassword, + ConnMaxIdleTime: 30 * time.Minute, + ConnectionTimeout: 3 * time.Second, + ReadTimeout: 30 * time.Second, + } + agent, err := New(conf, testClusterName, sockFile, "", 100*time.Millisecond, time.Second*60, testLogger) + Expect(err).ShouldNot(HaveOccurred()) + defer agent.CloseDB() + + replicaDB, err := GetMySQLConnLocalSocket(mocoagent.AdminUser, adminUserPassword, sockFile) + Expect(err).NotTo(HaveOccurred()) + defer replicaDB.Close() + + By("setting up donor") + _, err = replicaDB.Exec("SET GLOBAL clone_valid_donor_list = ?", donorHost+":3306") + Expect(err).NotTo(HaveOccurred()) + _, err = donorDB.Exec("SET GLOBAL read_only=0") + Expect(err).NotTo(HaveOccurred()) + + _, err = replicaDB.Exec(`CHANGE MASTER TO MASTER_HOST=?, MASTER_PORT=3306, MASTER_USER=?, MASTER_PASSWORD=?, GET_MASTER_PUBLIC_KEY=1`, + donorHost, mocoagent.ReplicationUser, replicationUserPassword) + Expect(err).NotTo(HaveOccurred()) + _, err = replicaDB.Exec(`START SLAVE`) + Expect(err).NotTo(HaveOccurred()) + + By("checking readiness") + // The uptime observed by the agent is about 15s smaller than the process uptime reported by kernel + // The test flow takes about 35s from the process start to this point. + // We set Consistently timeout to 60(transactionQueueingWait) - (35 - 15) - 5(margin) = 35 + Consistently(func() interface{} { + return getReady(agent) + }).WithPolling(time.Second).WithTimeout(time.Second * 35).ShouldNot(HaveHTTPStatus(http.StatusOK)) + Eventually(func() interface{} { + return getReady(agent) + }).WithPolling(time.Second).WithTimeout(time.Second * 10).Should(HaveHTTPStatus(http.StatusOK)) + }) }) func getHealth(agent *Agent) *httptest.ResponseRecorder { diff --git a/server/rotate_test.go b/server/rotate_test.go index 1618f3e..a1a9695 100644 --- a/server/rotate_test.go +++ b/server/rotate_test.go @@ -31,7 +31,7 @@ var _ = Describe("log rotation", func() { ConnectionTimeout: 3 * time.Second, ReadTimeout: 30 * time.Second, } - agent, err := New(conf, testClusterName, sockFile, tmpDir, maxDelayThreshold, testLogger) + agent, err := New(conf, testClusterName, sockFile, tmpDir, maxDelayThreshold, time.Second, testLogger) Expect(err).ShouldNot(HaveOccurred()) defer agent.CloseDB() diff --git a/server/server.go b/server/server.go index e8d3d3c..e68c38e 100644 --- a/server/server.go +++ b/server/server.go @@ -22,31 +22,33 @@ type agentService struct { } // New returns an Agent -func New(config MySQLAccessorConfig, clusterName, socket, logDir string, maxDelay time.Duration, logger logr.Logger) (*Agent, error) { +func New(config MySQLAccessorConfig, clusterName, socket, logDir string, maxDelay, transactionQueueingWait time.Duration, logger logr.Logger) (*Agent, error) { db, err := getMySQLConn(config) if err != nil { return nil, err } return &Agent{ - config: config, - db: db, - logger: logger, - mysqlSocketPath: socket, - logDir: logDir, - maxDelayThreshold: maxDelay, - cloneLock: make(chan struct{}, 1), + config: config, + db: db, + logger: logger, + mysqlSocketPath: socket, + logDir: logDir, + maxDelayThreshold: maxDelay, + transactionQueueingWait: transactionQueueingWait, + cloneLock: make(chan struct{}, 1), }, nil } // Agent is the agent to executes some MySQL commands of the own Pod type Agent struct { - config MySQLAccessorConfig - db *sqlx.DB - logger logr.Logger - mysqlSocketPath string - logDir string - maxDelayThreshold time.Duration + config MySQLAccessorConfig + db *sqlx.DB + logger logr.Logger + mysqlSocketPath string + logDir string + maxDelayThreshold time.Duration + transactionQueueingWait time.Duration cloneLock chan struct{} registryLock sync.Mutex