diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 7ce1800..fcca8f5 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -43,7 +43,7 @@ jobs: name: Small Tests strategy: matrix: - mysql-version: ["8.0.18", "8.0.25", "8.0.26", "8.0.27", "8.0.28", "8.0.30", "8.0.31", "8.0.32", "8.0.33", "8.0.34", "8.0.35", "8.0.36", "8.0.37"] + mysql-version: ["8.0.28", "8.0.36", "8.0.37", "8.4.0"] runs-on: ubuntu-22.04 steps: - uses: actions/checkout@v4 diff --git a/Makefile b/Makefile index b358778..877869a 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -MYSQL_VERSION = 8.0.37 +MYSQL_VERSION = 8.4.0 # For Go GOOS := $(shell go env GOOS) diff --git a/server/clone_test.go b/server/clone_test.go index b5d9180..e305f3a 100644 --- a/server/clone_test.go +++ b/server/clone_test.go @@ -3,6 +3,7 @@ package server import ( "context" "path/filepath" + "strings" "time" mocoagent "github.com/cybozu-go/moco-agent" @@ -40,8 +41,13 @@ var _ = Describe("clone", func() { Expect(err).NotTo(HaveOccurred()) _, err = donorDB.Exec("INSERT INTO foo.bar (i) VALUES (100), (101), (102), (103)") Expect(err).NotTo(HaveOccurred()) - _, err = donorDB.Exec(`RESET MASTER`) - Expect(err).NotTo(HaveOccurred()) + if strings.HasPrefix(MySQLVersion, "8.4") { + _, err = donorDB.Exec(`RESET BINARY LOGS AND GTIDS`) + Expect(err).NotTo(HaveOccurred()) + } else { + _, err = donorDB.Exec(`RESET MASTER`) + Expect(err).NotTo(HaveOccurred()) + } _, err = donorDB.Exec("INSERT INTO foo.bar (i) VALUES (200), (800), (10000), (-3)") Expect(err).NotTo(HaveOccurred()) @@ -99,10 +105,16 @@ var _ = Describe("clone", func() { By("starting replication") _, err = donorDB.Exec(`INSERT INTO foo.bar (i) VALUES (9), (999)`) Expect(err).NotTo(HaveOccurred()) - _, err = replicaDB.Exec(`CHANGE MASTER TO MASTER_HOST=?, MASTER_PORT=3306, MASTER_USER=?, MASTER_PASSWORD=?, GET_MASTER_PUBLIC_KEY=1`, - donorHost, mocoagent.ReplicationUser, replicationUserPassword) - Expect(err).NotTo(HaveOccurred()) - _, err = replicaDB.Exec(`START SLAVE`) + if strings.HasPrefix(MySQLVersion, "8.4") { + _, err = replicaDB.Exec(`CHANGE REPLICATION SOURCE TO SOURCE_HOST=?, SOURCE_PORT=3306, SOURCE_USER=?, SOURCE_PASSWORD=?, GET_SOURCE_PUBLIC_KEY=1`, + donorHost, mocoagent.ReplicationUser, replicationUserPassword) + Expect(err).NotTo(HaveOccurred()) + } else { + _, err = replicaDB.Exec(`CHANGE MASTER TO MASTER_HOST=?, MASTER_PORT=3306, MASTER_USER=?, MASTER_PASSWORD=?, GET_MASTER_PUBLIC_KEY=1`, + donorHost, mocoagent.ReplicationUser, replicationUserPassword) + Expect(err).NotTo(HaveOccurred()) + } + _, err = replicaDB.Exec(`START REPLICA`) Expect(err).NotTo(HaveOccurred()) Eventually(func() int { diff --git a/server/initialize.go b/server/initialize.go index 2269bc6..21f85e2 100644 --- a/server/initialize.go +++ b/server/initialize.go @@ -318,8 +318,20 @@ func Init(ctx context.Context, db *sqlx.DB, socket string) error { return err } - if _, err := db.ExecContext(ctx, "RESET MASTER"); err != nil { - return fmt.Errorf("failed to reset master: %w", err) + var version string + err := db.GetContext(ctx, &version, `SELECT SUBSTRING_INDEX(VERSION(), '.', 2)`) + if err != nil { + return fmt.Errorf("failed to get version: %w", err) + } + if version == "8.4" { + if _, err := db.ExecContext(ctx, "RESET BINARY LOGS AND GTIDS"); err != nil { + return fmt.Errorf("failed to reset binary logs and gtids: %w", err) + } + + } else { + if _, err := db.ExecContext(ctx, "RESET MASTER"); err != nil { + return fmt.Errorf("failed to reset master: %w", err) + } } if _, err := db.ExecContext(ctx, "SET GLOBAL super_read_only=ON"); err != nil { return fmt.Errorf("failed to enable super_read_only: %w", err) diff --git a/server/mysql_status.go b/server/mysql_status.go index 73e83a7..5b387fb 100644 --- a/server/mysql_status.go +++ b/server/mysql_status.go @@ -35,26 +35,26 @@ type MySQLPrimaryStatus struct { // MySQLReplicaStatus defines the observed state of a replica type MySQLReplicaStatus struct { - LastIOErrno int `db:"Last_IO_Errno"` - LastIOError string `db:"Last_IO_Error"` - LastSQLErrno int `db:"Last_SQL_Errno"` - LastSQLError string `db:"Last_SQL_Error"` - MasterHost string `db:"Master_Host"` - RetrievedGtidSet string `db:"Retrieved_Gtid_Set"` - ExecutedGtidSet string `db:"Executed_Gtid_Set"` - SlaveIORunning string `db:"Slave_IO_Running"` - SlaveSQLRunning string `db:"Slave_SQL_Running"` + LastIOErrno int `db:"Last_IO_Errno"` + LastIOError string `db:"Last_IO_Error"` + LastSQLErrno int `db:"Last_SQL_Errno"` + LastSQLError string `db:"Last_SQL_Error"` + SourceHost string `db:"Source_Host"` + RetrievedGtidSet string `db:"Retrieved_Gtid_Set"` + ExecutedGtidSet string `db:"Executed_Gtid_Set"` + ReplicaIORunning string `db:"Replica_IO_Running"` + ReplicaSQLRunning string `db:"Replica_SQL_Running"` // All of variables from here are NOT used in MOCO's reconcile - SlaveIOState string `db:"Slave_IO_State"` - MasterUser string `db:"Master_User"` - MasterPort int `db:"Master_Port"` + ReplicaIOState string `db:"Replica_IO_State"` + SourceUser string `db:"Source_User"` + SourcePort int `db:"Source_Port"` ConnectRetry int `db:"Connect_Retry"` - MasterLogFile string `db:"Master_Log_File"` - ReadMasterLogPos int `db:"Read_Master_Log_Pos"` + SourceLogFile string `db:"Source_Log_File"` + ReadSourceLogPos int `db:"Read_Source_Log_Pos"` RelayLogFile string `db:"Relay_Log_File"` RelayLogPos int `db:"Relay_Log_Pos"` - RelayMasterLogFile string `db:"Relay_Master_Log_File"` + RelaySourceLogFile string `db:"Relay_Source_Log_File"` ReplicateDoDB string `db:"Replicate_Do_DB"` ReplicateIgnoreDB string `db:"Replicate_Ignore_DB"` ReplicateDoTable string `db:"Replicate_Do_Table"` @@ -64,38 +64,38 @@ type MySQLReplicaStatus struct { LastErrno int `db:"Last_Errno"` LastError string `db:"Last_Error"` SkipCounter int `db:"Skip_Counter"` - ExecMasterLogPos int `db:"Exec_Master_Log_Pos"` + ExecSourceLogPos int `db:"Exec_Source_Log_Pos"` RelayLogSpace int `db:"Relay_Log_Space"` UntilCondition string `db:"Until_Condition"` UntilLogFile string `db:"Until_Log_File"` UntilLogPos int `db:"Until_Log_Pos"` - MasterSSLAllowed string `db:"Master_SSL_Allowed"` - MasterSSLCAFile string `db:"Master_SSL_CA_File"` - MasterSSLCAPath string `db:"Master_SSL_CA_Path"` - MasterSSLCert string `db:"Master_SSL_Cert"` - MasterSSLCipher string `db:"Master_SSL_Cipher"` - MasterSSLKey string `db:"Master_SSL_Key"` - SecondsBehindMaster sql.NullInt64 `db:"Seconds_Behind_Master"` - MasterSSLVerifyServerCert string `db:"Master_SSL_Verify_Server_Cert"` + SourceSSLAllowed string `db:"Source_SSL_Allowed"` + SourceSSLCAFile string `db:"Source_SSL_CA_File"` + SourceSSLCAPath string `db:"Source_SSL_CA_Path"` + SourceSSLCert string `db:"Source_SSL_Cert"` + SourceSSLCipher string `db:"Source_SSL_Cipher"` + SourceSSLKey string `db:"Source_SSL_Key"` + SecondsBehindSource sql.NullInt64 `db:"Seconds_Behind_Source"` + SourceSSLVerifyServerCert string `db:"Source_SSL_Verify_Server_Cert"` ReplicateIgnoreServerIds string `db:"Replicate_Ignore_Server_Ids"` - MasterServerID int `db:"Master_Server_Id"` - MasterUUID string `db:"Master_UUID"` - MasterInfoFile string `db:"Master_Info_File"` + SourceServerID int `db:"Source_Server_Id"` + SourceUUID string `db:"Source_UUID"` + SourceInfoFile string `db:"Source_Info_File"` SQLDelay int `db:"SQL_Delay"` SQLRemainingDelay sql.NullInt64 `db:"SQL_Remaining_Delay"` - SlaveSQLRunningState string `db:"Slave_SQL_Running_State"` - MasterRetryCount int `db:"Master_Retry_Count"` - MasterBind string `db:"Master_Bind"` + ReplicaSQLRunningState string `db:"Replica_SQL_Running_State"` + SourceRetryCount int `db:"Source_Retry_Count"` + SourceBind string `db:"Source_Bind"` LastIOErrorTimestamp string `db:"Last_IO_Error_Timestamp"` LastSQLErrorTimestamp string `db:"Last_SQL_Error_Timestamp"` - MasterSSLCrl string `db:"Master_SSL_Crl"` - MasterSSLCrlpath string `db:"Master_SSL_Crlpath"` + SourceSSLCrl string `db:"Source_SSL_Crl"` + SourceSSLCrlpath string `db:"Source_SSL_Crlpath"` AutoPosition string `db:"Auto_Position"` ReplicateRewriteDB string `db:"Replicate_Rewrite_DB"` ChannelName string `db:"Channel_Name"` - MasterTLSVersion string `db:"Master_TLS_Version"` - Masterpublickeypath string `db:"Master_public_key_path"` - Getmasterpublickey string `db:"Get_master_public_key"` + SourceTLSVersion string `db:"Source_TLS_Version"` + Sourcepublickeypath string `db:"Source_public_key_path"` + GetSourcepublickey string `db:"Get_Source_public_key"` NetworkNamespace string `db:"Network_Namespace"` } @@ -120,18 +120,37 @@ func (a *Agent) GetMySQLCloneStateStatus(ctx context.Context) (*MySQLCloneStateS return status, nil } +func (a *Agent) IsMySQL84(ctx context.Context) (bool, error) { + var version string + err := a.db.GetContext(ctx, &version, `SELECT SUBSTRING_INDEX(VERSION(), '.', 2)`) + if err != nil { + return false, fmt.Errorf("failed to get version: %w", err) + } + return version == "8.4", nil +} + func (a *Agent) GetMySQLPrimaryStatus(ctx context.Context) (*MySQLPrimaryStatus, error) { status := &MySQLPrimaryStatus{} - if err := a.db.GetContext(ctx, status, `SHOW MASTER STATUS`); err != nil { - return nil, fmt.Errorf("failed to show master status: %w", err) + isMySQL84, err := a.IsMySQL84(ctx) + if err != nil { + return nil, err + } + if isMySQL84 { + if err := a.db.GetContext(ctx, status, `SHOW BINARY LOG STATUS`); err != nil { + return nil, fmt.Errorf("failed to show binary log status: %w", err) + } + } else { + if err := a.db.GetContext(ctx, status, `SHOW MASTER STATUS`); err != nil { + return nil, fmt.Errorf("failed to show master status: %w", err) + } } return status, nil } func (a *Agent) GetMySQLReplicaStatus(ctx context.Context) (*MySQLReplicaStatus, error) { status := &MySQLReplicaStatus{} - if err := a.db.GetContext(ctx, status, `SHOW SLAVE STATUS`); err != nil { - return nil, fmt.Errorf("failed to show slave status: %w", err) + if err := a.db.GetContext(ctx, status, `SHOW REPLICA STATUS`); err != nil { + return nil, fmt.Errorf("failed to show replica status: %w", err) } return status, nil } diff --git a/server/mysql_test.go b/server/mysql_test.go index 363e4fd..18f5956 100644 --- a/server/mysql_test.go +++ b/server/mysql_test.go @@ -35,7 +35,7 @@ var tmpBaseDir = path.Join(os.TempDir(), "moco-agent-test-server") var MySQLVersion = func() string { if ver := os.Getenv("MYSQL_VERSION"); ver == "" { - os.Setenv("MYSQL_VERSION", "8.0.28") + os.Setenv("MYSQL_VERSION", "8.4.0") } return os.Getenv("MYSQL_VERSION") }() diff --git a/server/mysqld_health.go b/server/mysqld_health.go index 64e054e..41a7f5b 100644 --- a/server/mysqld_health.go +++ b/server/mysqld_health.go @@ -59,7 +59,7 @@ func (a *Agent) MySQLDReady(w http.ResponseWriter, r *http.Request) { return } - if replicaStatus.SlaveIORunning != "Yes" || replicaStatus.SlaveSQLRunning != "Yes" { + if replicaStatus.ReplicaIORunning != "Yes" || replicaStatus.ReplicaSQLRunning != "Yes" { a.logger.Info("replication threads are stopped") http.Error(w, "replication thread are stopped", http.StatusServiceUnavailable) return diff --git a/server/mysqld_health_test.go b/server/mysqld_health_test.go index 6b991c8..b592fb8 100644 --- a/server/mysqld_health_test.go +++ b/server/mysqld_health_test.go @@ -4,6 +4,7 @@ import ( "net/http" "net/http/httptest" "path/filepath" + "strings" "time" mocoagent "github.com/cybozu-go/moco-agent" @@ -105,10 +106,16 @@ var _ = Describe("health", func() { _, err = donorDB.Exec("INSERT INTO foo.bar (i) VALUES (?), (?), (?), (?)", items...) Expect(err).NotTo(HaveOccurred()) - _, err = replicaDB.Exec(`CHANGE MASTER TO MASTER_HOST=?, MASTER_PORT=3306, MASTER_USER=?, MASTER_PASSWORD=?, GET_MASTER_PUBLIC_KEY=1`, - donorHost, mocoagent.ReplicationUser, replicationUserPassword) - Expect(err).NotTo(HaveOccurred()) - _, err = replicaDB.Exec(`START SLAVE`) + if strings.HasPrefix(MySQLVersion, "8.4") { + _, err = replicaDB.Exec(`CHANGE REPLICATION SOURCE TO SOURCE_HOST=?, SOURCE_PORT=3306, SOURCE_USER=?, SOURCE_PASSWORD=?, GET_SOURCE_PUBLIC_KEY=1`, + donorHost, mocoagent.ReplicationUser, replicationUserPassword) + Expect(err).NotTo(HaveOccurred()) + } else { + _, err = replicaDB.Exec(`CHANGE MASTER TO MASTER_HOST=?, MASTER_PORT=3306, MASTER_USER=?, MASTER_PASSWORD=?, GET_MASTER_PUBLIC_KEY=1`, + donorHost, mocoagent.ReplicationUser, replicationUserPassword) + Expect(err).NotTo(HaveOccurred()) + } + _, err = replicaDB.Exec(`START REPLICA`) Expect(err).NotTo(HaveOccurred()) By("checking readiness") @@ -173,10 +180,16 @@ var _ = Describe("health", func() { _, err = donorDB.Exec("SET GLOBAL read_only=0") Expect(err).NotTo(HaveOccurred()) - _, err = replicaDB.Exec(`CHANGE MASTER TO MASTER_HOST=?, MASTER_PORT=3306, MASTER_USER=?, MASTER_PASSWORD=?, GET_MASTER_PUBLIC_KEY=1`, - donorHost, mocoagent.ReplicationUser, replicationUserPassword) - Expect(err).NotTo(HaveOccurred()) - _, err = replicaDB.Exec(`START SLAVE`) + if strings.HasPrefix(MySQLVersion, "8.4") { + _, err = replicaDB.Exec(`CHANGE REPLICATION SOURCE TO SOURCE_HOST=?, SOURCE_PORT=3306, SOURCE_USER=?, SOURCE_PASSWORD=?, GET_SOURCE_PUBLIC_KEY=1`, + donorHost, mocoagent.ReplicationUser, replicationUserPassword) + Expect(err).NotTo(HaveOccurred()) + } else { + _, err = replicaDB.Exec(`CHANGE MASTER TO MASTER_HOST=?, MASTER_PORT=3306, MASTER_USER=?, MASTER_PASSWORD=?, GET_MASTER_PUBLIC_KEY=1`, + donorHost, mocoagent.ReplicationUser, replicationUserPassword) + Expect(err).NotTo(HaveOccurred()) + } + _, err = replicaDB.Exec(`START REPLICA`) Expect(err).NotTo(HaveOccurred()) By("checking readiness")