From 8d9903b96096fffba9c37cf26820fc1f23dbf15c Mon Sep 17 00:00:00 2001 From: shunki-fujita Date: Tue, 11 Jun 2024 07:19:36 +0000 Subject: [PATCH 1/2] issue-686 Replace 'master/slave' with 'source/replica' --- server/clone_test.go | 23 ++++++--- server/initialize.go | 15 +++++- server/mysql_status.go | 97 +++++++++++++++++++++--------------- server/mysqld_health.go | 2 +- server/mysqld_health_test.go | 30 +++++++---- 5 files changed, 110 insertions(+), 57 deletions(-) diff --git a/server/clone_test.go b/server/clone_test.go index b5d9180..b3a43b5 100644 --- a/server/clone_test.go +++ b/server/clone_test.go @@ -40,8 +40,13 @@ var _ = Describe("clone", func() { Expect(err).NotTo(HaveOccurred()) _, err = donorDB.Exec("INSERT INTO foo.bar (i) VALUES (100), (101), (102), (103)") Expect(err).NotTo(HaveOccurred()) - _, err = donorDB.Exec(`RESET MASTER`) - Expect(err).NotTo(HaveOccurred()) + if strings.HasPrefix(MySQLVersion, "8.4") { + _, err = donorDB.Exec(`RESET BINARY LOGS AND GTIDS`) + Expect(err).NotTo(HaveOccurred()) + } else { + _, err = donorDB.Exec(`RESET MASTER`) + Expect(err).NotTo(HaveOccurred()) + } _, err = donorDB.Exec("INSERT INTO foo.bar (i) VALUES (200), (800), (10000), (-3)") Expect(err).NotTo(HaveOccurred()) @@ -99,10 +104,16 @@ var _ = Describe("clone", func() { By("starting replication") _, err = donorDB.Exec(`INSERT INTO foo.bar (i) VALUES (9), (999)`) Expect(err).NotTo(HaveOccurred()) - _, err = replicaDB.Exec(`CHANGE MASTER TO MASTER_HOST=?, MASTER_PORT=3306, MASTER_USER=?, MASTER_PASSWORD=?, GET_MASTER_PUBLIC_KEY=1`, - donorHost, mocoagent.ReplicationUser, replicationUserPassword) - Expect(err).NotTo(HaveOccurred()) - _, err = replicaDB.Exec(`START SLAVE`) + if strings.hasPrefix(MySQLVersion, "8.4") { + _, err = replicaDB.Exec(`CHANGE REPLICATION SOURCE TO SOURCE_HOST=?, SOURCE_PORT=3306, SOURCE_USER=?, SOURCE_PASSWORD=?, GET_SOURCE_PUBLIC_KEY=1`, + donorHost, mocoagent.ReplicationUser, replicationUserPassword) + Expect(err).NotTo(HaveOccurred()) + } else { + _, err = replicaDB.Exec(`CHANGE MASTER TO MASTER_HOST=?, MASTER_PORT=3306, MASTER_USER=?, MASTER_PASSWORD=?, GET_MASTER_PUBLIC_KEY=1`, + donorHost, mocoagent.ReplicationUser, replicationUserPassword) + Expect(err).NotTo(HaveOccurred()) + } + _, err = replicaDB.Exec(`START REPLICA`) Expect(err).NotTo(HaveOccurred()) Eventually(func() int { diff --git a/server/initialize.go b/server/initialize.go index 2269bc6..44065ba 100644 --- a/server/initialize.go +++ b/server/initialize.go @@ -318,8 +318,19 @@ func Init(ctx context.Context, db *sqlx.DB, socket string) error { return err } - if _, err := db.ExecContext(ctx, "RESET MASTER"); err != nil { - return fmt.Errorf("failed to reset master: %w", err) + var version string + err := db.GetContext(ctx, &version, `SELECT SUBSTRING_INDEX(VERSION(), '.', 2)`) + if err != nil { + return false, fmt.Errorf("failed to get version: %w", err) + } + if version == "8.0" { + if _, err := db.ExecContext(ctx, "RESET MASTER"); err != nil { + return fmt.Errorf("failed to reset master: %w", err) + } + } else { + if _, err := db.ExecContext(ctx, "RESET BINARY LOGS AND GTIDS"); err != nil { + return fmt.Errorf("failed to reset binary logs and gtids: %w", err) + } } if _, err := db.ExecContext(ctx, "SET GLOBAL super_read_only=ON"); err != nil { return fmt.Errorf("failed to enable super_read_only: %w", err) diff --git a/server/mysql_status.go b/server/mysql_status.go index 73e83a7..5b387fb 100644 --- a/server/mysql_status.go +++ b/server/mysql_status.go @@ -35,26 +35,26 @@ type MySQLPrimaryStatus struct { // MySQLReplicaStatus defines the observed state of a replica type MySQLReplicaStatus struct { - LastIOErrno int `db:"Last_IO_Errno"` - LastIOError string `db:"Last_IO_Error"` - LastSQLErrno int `db:"Last_SQL_Errno"` - LastSQLError string `db:"Last_SQL_Error"` - MasterHost string `db:"Master_Host"` - RetrievedGtidSet string `db:"Retrieved_Gtid_Set"` - ExecutedGtidSet string `db:"Executed_Gtid_Set"` - SlaveIORunning string `db:"Slave_IO_Running"` - SlaveSQLRunning string `db:"Slave_SQL_Running"` + LastIOErrno int `db:"Last_IO_Errno"` + LastIOError string `db:"Last_IO_Error"` + LastSQLErrno int `db:"Last_SQL_Errno"` + LastSQLError string `db:"Last_SQL_Error"` + SourceHost string `db:"Source_Host"` + RetrievedGtidSet string `db:"Retrieved_Gtid_Set"` + ExecutedGtidSet string `db:"Executed_Gtid_Set"` + ReplicaIORunning string `db:"Replica_IO_Running"` + ReplicaSQLRunning string `db:"Replica_SQL_Running"` // All of variables from here are NOT used in MOCO's reconcile - SlaveIOState string `db:"Slave_IO_State"` - MasterUser string `db:"Master_User"` - MasterPort int `db:"Master_Port"` + ReplicaIOState string `db:"Replica_IO_State"` + SourceUser string `db:"Source_User"` + SourcePort int `db:"Source_Port"` ConnectRetry int `db:"Connect_Retry"` - MasterLogFile string `db:"Master_Log_File"` - ReadMasterLogPos int `db:"Read_Master_Log_Pos"` + SourceLogFile string `db:"Source_Log_File"` + ReadSourceLogPos int `db:"Read_Source_Log_Pos"` RelayLogFile string `db:"Relay_Log_File"` RelayLogPos int `db:"Relay_Log_Pos"` - RelayMasterLogFile string `db:"Relay_Master_Log_File"` + RelaySourceLogFile string `db:"Relay_Source_Log_File"` ReplicateDoDB string `db:"Replicate_Do_DB"` ReplicateIgnoreDB string `db:"Replicate_Ignore_DB"` ReplicateDoTable string `db:"Replicate_Do_Table"` @@ -64,38 +64,38 @@ type MySQLReplicaStatus struct { LastErrno int `db:"Last_Errno"` LastError string `db:"Last_Error"` SkipCounter int `db:"Skip_Counter"` - ExecMasterLogPos int `db:"Exec_Master_Log_Pos"` + ExecSourceLogPos int `db:"Exec_Source_Log_Pos"` RelayLogSpace int `db:"Relay_Log_Space"` UntilCondition string `db:"Until_Condition"` UntilLogFile string `db:"Until_Log_File"` UntilLogPos int `db:"Until_Log_Pos"` - MasterSSLAllowed string `db:"Master_SSL_Allowed"` - MasterSSLCAFile string `db:"Master_SSL_CA_File"` - MasterSSLCAPath string `db:"Master_SSL_CA_Path"` - MasterSSLCert string `db:"Master_SSL_Cert"` - MasterSSLCipher string `db:"Master_SSL_Cipher"` - MasterSSLKey string `db:"Master_SSL_Key"` - SecondsBehindMaster sql.NullInt64 `db:"Seconds_Behind_Master"` - MasterSSLVerifyServerCert string `db:"Master_SSL_Verify_Server_Cert"` + SourceSSLAllowed string `db:"Source_SSL_Allowed"` + SourceSSLCAFile string `db:"Source_SSL_CA_File"` + SourceSSLCAPath string `db:"Source_SSL_CA_Path"` + SourceSSLCert string `db:"Source_SSL_Cert"` + SourceSSLCipher string `db:"Source_SSL_Cipher"` + SourceSSLKey string `db:"Source_SSL_Key"` + SecondsBehindSource sql.NullInt64 `db:"Seconds_Behind_Source"` + SourceSSLVerifyServerCert string `db:"Source_SSL_Verify_Server_Cert"` ReplicateIgnoreServerIds string `db:"Replicate_Ignore_Server_Ids"` - MasterServerID int `db:"Master_Server_Id"` - MasterUUID string `db:"Master_UUID"` - MasterInfoFile string `db:"Master_Info_File"` + SourceServerID int `db:"Source_Server_Id"` + SourceUUID string `db:"Source_UUID"` + SourceInfoFile string `db:"Source_Info_File"` SQLDelay int `db:"SQL_Delay"` SQLRemainingDelay sql.NullInt64 `db:"SQL_Remaining_Delay"` - SlaveSQLRunningState string `db:"Slave_SQL_Running_State"` - MasterRetryCount int `db:"Master_Retry_Count"` - MasterBind string `db:"Master_Bind"` + ReplicaSQLRunningState string `db:"Replica_SQL_Running_State"` + SourceRetryCount int `db:"Source_Retry_Count"` + SourceBind string `db:"Source_Bind"` LastIOErrorTimestamp string `db:"Last_IO_Error_Timestamp"` LastSQLErrorTimestamp string `db:"Last_SQL_Error_Timestamp"` - MasterSSLCrl string `db:"Master_SSL_Crl"` - MasterSSLCrlpath string `db:"Master_SSL_Crlpath"` + SourceSSLCrl string `db:"Source_SSL_Crl"` + SourceSSLCrlpath string `db:"Source_SSL_Crlpath"` AutoPosition string `db:"Auto_Position"` ReplicateRewriteDB string `db:"Replicate_Rewrite_DB"` ChannelName string `db:"Channel_Name"` - MasterTLSVersion string `db:"Master_TLS_Version"` - Masterpublickeypath string `db:"Master_public_key_path"` - Getmasterpublickey string `db:"Get_master_public_key"` + SourceTLSVersion string `db:"Source_TLS_Version"` + Sourcepublickeypath string `db:"Source_public_key_path"` + GetSourcepublickey string `db:"Get_Source_public_key"` NetworkNamespace string `db:"Network_Namespace"` } @@ -120,18 +120,37 @@ func (a *Agent) GetMySQLCloneStateStatus(ctx context.Context) (*MySQLCloneStateS return status, nil } +func (a *Agent) IsMySQL84(ctx context.Context) (bool, error) { + var version string + err := a.db.GetContext(ctx, &version, `SELECT SUBSTRING_INDEX(VERSION(), '.', 2)`) + if err != nil { + return false, fmt.Errorf("failed to get version: %w", err) + } + return version == "8.4", nil +} + func (a *Agent) GetMySQLPrimaryStatus(ctx context.Context) (*MySQLPrimaryStatus, error) { status := &MySQLPrimaryStatus{} - if err := a.db.GetContext(ctx, status, `SHOW MASTER STATUS`); err != nil { - return nil, fmt.Errorf("failed to show master status: %w", err) + isMySQL84, err := a.IsMySQL84(ctx) + if err != nil { + return nil, err + } + if isMySQL84 { + if err := a.db.GetContext(ctx, status, `SHOW BINARY LOG STATUS`); err != nil { + return nil, fmt.Errorf("failed to show binary log status: %w", err) + } + } else { + if err := a.db.GetContext(ctx, status, `SHOW MASTER STATUS`); err != nil { + return nil, fmt.Errorf("failed to show master status: %w", err) + } } return status, nil } func (a *Agent) GetMySQLReplicaStatus(ctx context.Context) (*MySQLReplicaStatus, error) { status := &MySQLReplicaStatus{} - if err := a.db.GetContext(ctx, status, `SHOW SLAVE STATUS`); err != nil { - return nil, fmt.Errorf("failed to show slave status: %w", err) + if err := a.db.GetContext(ctx, status, `SHOW REPLICA STATUS`); err != nil { + return nil, fmt.Errorf("failed to show replica status: %w", err) } return status, nil } diff --git a/server/mysqld_health.go b/server/mysqld_health.go index 64e054e..41a7f5b 100644 --- a/server/mysqld_health.go +++ b/server/mysqld_health.go @@ -59,7 +59,7 @@ func (a *Agent) MySQLDReady(w http.ResponseWriter, r *http.Request) { return } - if replicaStatus.SlaveIORunning != "Yes" || replicaStatus.SlaveSQLRunning != "Yes" { + if replicaStatus.ReplicaIORunning != "Yes" || replicaStatus.ReplicaSQLRunning != "Yes" { a.logger.Info("replication threads are stopped") http.Error(w, "replication thread are stopped", http.StatusServiceUnavailable) return diff --git a/server/mysqld_health_test.go b/server/mysqld_health_test.go index 6b991c8..b4ab221 100644 --- a/server/mysqld_health_test.go +++ b/server/mysqld_health_test.go @@ -104,11 +104,17 @@ var _ = Describe("health", func() { items := []interface{}{100, 299, 993, 9292} _, err = donorDB.Exec("INSERT INTO foo.bar (i) VALUES (?), (?), (?), (?)", items...) Expect(err).NotTo(HaveOccurred()) - - _, err = replicaDB.Exec(`CHANGE MASTER TO MASTER_HOST=?, MASTER_PORT=3306, MASTER_USER=?, MASTER_PASSWORD=?, GET_MASTER_PUBLIC_KEY=1`, - donorHost, mocoagent.ReplicationUser, replicationUserPassword) - Expect(err).NotTo(HaveOccurred()) - _, err = replicaDB.Exec(`START SLAVE`) + + if strings.hasPrefix(MYSQL_VERSION, "8.4") { + _, err = replicaDB.Exec(`CHANGE REPLICATION SOURCE TO SOURCE_HOST=?, SOURCE_PORT=3306, SOURCE_USER=?, SOURCE_PASSWORD=?, GET_SOURCE_PUBLIC_KEY=1`, + donorHost, mocoagent.ReplicationUser, replicationUserPassword) + Expect(err).NotTo(HaveOccurred()) + } else { + _, err = replicaDB.Exec(`CHANGE MASTER TO MASTER_HOST=?, MASTER_PORT=3306, MASTER_USER=?, MASTER_PASSWORD=?, GET_MASTER_PUBLIC_KEY=1`, + donorHost, mocoagent.ReplicationUser, replicationUserPassword) + Expect(err).NotTo(HaveOccurred()) + } + _, err = replicaDB.Exec(`START REPLICA`) Expect(err).NotTo(HaveOccurred()) By("checking readiness") @@ -173,10 +179,16 @@ var _ = Describe("health", func() { _, err = donorDB.Exec("SET GLOBAL read_only=0") Expect(err).NotTo(HaveOccurred()) - _, err = replicaDB.Exec(`CHANGE MASTER TO MASTER_HOST=?, MASTER_PORT=3306, MASTER_USER=?, MASTER_PASSWORD=?, GET_MASTER_PUBLIC_KEY=1`, - donorHost, mocoagent.ReplicationUser, replicationUserPassword) - Expect(err).NotTo(HaveOccurred()) - _, err = replicaDB.Exec(`START SLAVE`) + if strings.hasPrefix(MYSQL_VERSION, "8.4") { + _, err = replicaDB.Exec(`CHANGE REPLICATION SOURCE TO SOURCE_HOST=?, SOURCE_PORT=3306, SOURCE_USER=?, SOURCE_PASSWORD=?, GET_SOURCE_PUBLIC_KEY=1`, + donorHost, mocoagent.ReplicationUser, replicationUserPassword) + Expect(err).NotTo(HaveOccurred()) + } else { + _, err = replicaDB.Exec(`CHANGE MASTER TO MASTER_HOST=?, MASTER_PORT=3306, MASTER_USER=?, MASTER_PASSWORD=?, GET_MASTER_PUBLIC_KEY=1`, + donorHost, mocoagent.ReplicationUser, replicationUserPassword) + Expect(err).NotTo(HaveOccurred()) + } + _, err = replicaDB.Exec(`START REPLICA`) Expect(err).NotTo(HaveOccurred()) By("checking readiness") From 230da459955916bde60d5e0d471841e711db9885 Mon Sep 17 00:00:00 2001 From: shunki-fujita Date: Wed, 12 Jun 2024 06:51:48 +0000 Subject: [PATCH 2/2] issue-686: support MySQL 8.4.0 --- .github/workflows/ci.yaml | 2 +- Makefile | 2 +- server/clone_test.go | 3 ++- server/initialize.go | 13 +++++++------ server/mysql_test.go | 2 +- server/mysqld_health_test.go | 7 ++++--- 6 files changed, 16 insertions(+), 13 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 7ce1800..fcca8f5 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -43,7 +43,7 @@ jobs: name: Small Tests strategy: matrix: - mysql-version: ["8.0.18", "8.0.25", "8.0.26", "8.0.27", "8.0.28", "8.0.30", "8.0.31", "8.0.32", "8.0.33", "8.0.34", "8.0.35", "8.0.36", "8.0.37"] + mysql-version: ["8.0.28", "8.0.36", "8.0.37", "8.4.0"] runs-on: ubuntu-22.04 steps: - uses: actions/checkout@v4 diff --git a/Makefile b/Makefile index b358778..877869a 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -MYSQL_VERSION = 8.0.37 +MYSQL_VERSION = 8.4.0 # For Go GOOS := $(shell go env GOOS) diff --git a/server/clone_test.go b/server/clone_test.go index b3a43b5..e305f3a 100644 --- a/server/clone_test.go +++ b/server/clone_test.go @@ -3,6 +3,7 @@ package server import ( "context" "path/filepath" + "strings" "time" mocoagent "github.com/cybozu-go/moco-agent" @@ -104,7 +105,7 @@ var _ = Describe("clone", func() { By("starting replication") _, err = donorDB.Exec(`INSERT INTO foo.bar (i) VALUES (9), (999)`) Expect(err).NotTo(HaveOccurred()) - if strings.hasPrefix(MySQLVersion, "8.4") { + if strings.HasPrefix(MySQLVersion, "8.4") { _, err = replicaDB.Exec(`CHANGE REPLICATION SOURCE TO SOURCE_HOST=?, SOURCE_PORT=3306, SOURCE_USER=?, SOURCE_PASSWORD=?, GET_SOURCE_PUBLIC_KEY=1`, donorHost, mocoagent.ReplicationUser, replicationUserPassword) Expect(err).NotTo(HaveOccurred()) diff --git a/server/initialize.go b/server/initialize.go index 44065ba..21f85e2 100644 --- a/server/initialize.go +++ b/server/initialize.go @@ -321,16 +321,17 @@ func Init(ctx context.Context, db *sqlx.DB, socket string) error { var version string err := db.GetContext(ctx, &version, `SELECT SUBSTRING_INDEX(VERSION(), '.', 2)`) if err != nil { - return false, fmt.Errorf("failed to get version: %w", err) + return fmt.Errorf("failed to get version: %w", err) } - if version == "8.0" { - if _, err := db.ExecContext(ctx, "RESET MASTER"); err != nil { - return fmt.Errorf("failed to reset master: %w", err) - } - } else { + if version == "8.4" { if _, err := db.ExecContext(ctx, "RESET BINARY LOGS AND GTIDS"); err != nil { return fmt.Errorf("failed to reset binary logs and gtids: %w", err) } + + } else { + if _, err := db.ExecContext(ctx, "RESET MASTER"); err != nil { + return fmt.Errorf("failed to reset master: %w", err) + } } if _, err := db.ExecContext(ctx, "SET GLOBAL super_read_only=ON"); err != nil { return fmt.Errorf("failed to enable super_read_only: %w", err) diff --git a/server/mysql_test.go b/server/mysql_test.go index 363e4fd..18f5956 100644 --- a/server/mysql_test.go +++ b/server/mysql_test.go @@ -35,7 +35,7 @@ var tmpBaseDir = path.Join(os.TempDir(), "moco-agent-test-server") var MySQLVersion = func() string { if ver := os.Getenv("MYSQL_VERSION"); ver == "" { - os.Setenv("MYSQL_VERSION", "8.0.28") + os.Setenv("MYSQL_VERSION", "8.4.0") } return os.Getenv("MYSQL_VERSION") }() diff --git a/server/mysqld_health_test.go b/server/mysqld_health_test.go index b4ab221..b592fb8 100644 --- a/server/mysqld_health_test.go +++ b/server/mysqld_health_test.go @@ -4,6 +4,7 @@ import ( "net/http" "net/http/httptest" "path/filepath" + "strings" "time" mocoagent "github.com/cybozu-go/moco-agent" @@ -104,8 +105,8 @@ var _ = Describe("health", func() { items := []interface{}{100, 299, 993, 9292} _, err = donorDB.Exec("INSERT INTO foo.bar (i) VALUES (?), (?), (?), (?)", items...) Expect(err).NotTo(HaveOccurred()) - - if strings.hasPrefix(MYSQL_VERSION, "8.4") { + + if strings.HasPrefix(MySQLVersion, "8.4") { _, err = replicaDB.Exec(`CHANGE REPLICATION SOURCE TO SOURCE_HOST=?, SOURCE_PORT=3306, SOURCE_USER=?, SOURCE_PASSWORD=?, GET_SOURCE_PUBLIC_KEY=1`, donorHost, mocoagent.ReplicationUser, replicationUserPassword) Expect(err).NotTo(HaveOccurred()) @@ -179,7 +180,7 @@ var _ = Describe("health", func() { _, err = donorDB.Exec("SET GLOBAL read_only=0") Expect(err).NotTo(HaveOccurred()) - if strings.hasPrefix(MYSQL_VERSION, "8.4") { + if strings.HasPrefix(MySQLVersion, "8.4") { _, err = replicaDB.Exec(`CHANGE REPLICATION SOURCE TO SOURCE_HOST=?, SOURCE_PORT=3306, SOURCE_USER=?, SOURCE_PASSWORD=?, GET_SOURCE_PUBLIC_KEY=1`, donorHost, mocoagent.ReplicationUser, replicationUserPassword) Expect(err).NotTo(HaveOccurred())