Skip to content

Commit

Permalink
issue-686 Replace 'master/slave' with 'source/replica'
Browse files Browse the repository at this point in the history
  • Loading branch information
shunki-fujita committed Jun 7, 2024
1 parent cadbeb5 commit ef35d22
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 59 deletions.
27 changes: 22 additions & 5 deletions pkg/bkop/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,33 @@ import (
"fmt"
)

func (o operator) GetServerStatus(ctx context.Context, st *ServerStatus) error {
ms := &showMasterStatus{}
if err := o.db.GetContext(ctx, ms, `SHOW MASTER STATUS`); err != nil {
return fmt.Errorf("failed to show master status: %w", err)
func (o operator) IsMySQL84(ctx context.Context) (bool, error) {
var version string
if err := o.db.GetContext(ctx, &version, `SELECT SUBSTRING_INDEX(VERSION(), '.', 2)`); err != nil {
return false, fmt.Errorf("failed to get version: %w", err)
}
return version == "8.4", nil
}

func (o operator) GetServerStatus(ctx context.Context, st *ServerStatus) error {
bls := &showBinaryLogStatus{}
isMySQL84, err := o.IsMySQL84(ctx)
if err != nil {
return fmt.Errorf("failed to check MySQL version: %w", err)
}
if isMySQL84 {
if err := o.db.GetContext(ctx, bls, `SHOW BINARY LOG STATUS`); err != nil {
return fmt.Errorf("failed to show binary log status: %w", err)
}
} else {
if err := o.db.GetContext(ctx, bls, `SHOW MASTER STATUS`); err != nil {
return fmt.Errorf("failed to show master status: %w", err)
}
}
if err := o.db.GetContext(ctx, st, `SELECT @@super_read_only, @@server_uuid`); err != nil {
return fmt.Errorf("failed to get global variables: %w", err)
}

st.CurrentBinlog = ms.File
st.CurrentBinlog = bls.File
return nil
}
2 changes: 1 addition & 1 deletion pkg/bkop/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ type ServerStatus struct {
CurrentBinlog string
}

type showMasterStatus struct {
type showBinaryLogStatus struct {
File string `db:"File"`
Position int64 `db:"Position"`
BinlogDoDB string `db:"Binlog_Do_DB"`
Expand Down
13 changes: 11 additions & 2 deletions pkg/dbop/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,19 @@ import (
const semiSyncMasterTimeout = 24 * 60 * 60 * 1000

func (o *operator) ConfigureReplica(ctx context.Context, primary AccessInfo, semisync bool) error {
if _, err := o.db.ExecContext(ctx, `STOP SLAVE`); err != nil {
if _, err := o.db.ExecContext(ctx, `STOP REPLICA`); err != nil {
return fmt.Errorf("failed to stop replica: %w", err)
}
if _, err := o.db.NamedExecContext(ctx, `CHANGE MASTER TO MASTER_HOST = :Host, MASTER_PORT = :Port, MASTER_USER = :User, MASTER_PASSWORD = :Password, MASTER_AUTO_POSITION = 1, GET_MASTER_PUBLIC_KEY = 1`, primary); err != nil {
var cmd string
if isMySQL84, err := o.IsMySQL84(ctx); err != nil {

Check failure on line 15 in pkg/dbop/replication.go

View workflow job for this annotation

GitHub Actions / Small tests

isMySQL84 declared and not used

Check failure on line 15 in pkg/dbop/replication.go

View workflow job for this annotation

GitHub Actions / Small tests

o.IsMySQL84 undefined (type *operator has no field or method IsMySQL84)

Check failure on line 15 in pkg/dbop/replication.go

View workflow job for this annotation

GitHub Actions / Small tests

isMySQL84 declared and not used

Check failure on line 15 in pkg/dbop/replication.go

View workflow job for this annotation

GitHub Actions / Small tests

o.IsMySQL84 undefined (type *operator has no field or method IsMySQL84)

Check failure on line 15 in pkg/dbop/replication.go

View workflow job for this annotation

GitHub Actions / Integration tests with MySQL (8.0.37)

isMySQL84 declared and not used

Check failure on line 15 in pkg/dbop/replication.go

View workflow job for this annotation

GitHub Actions / Integration tests with MySQL (8.0.37)

o.IsMySQL84 undefined (type *operator has no field or method IsMySQL84)
return fmt.Errorf("failed to check MySQL version: %w", err)
}
if isMySQL84 {

Check failure on line 18 in pkg/dbop/replication.go

View workflow job for this annotation

GitHub Actions / Small tests

undefined: isMySQL84

Check failure on line 18 in pkg/dbop/replication.go

View workflow job for this annotation

GitHub Actions / Small tests

undefined: isMySQL84

Check failure on line 18 in pkg/dbop/replication.go

View workflow job for this annotation

GitHub Actions / Integration tests with MySQL (8.0.37)

undefined: isMySQL84
cmd = `CHANGE REPLICATION SOURCE TO SOURCE_HOST = :Host, SOURCE_PORT = :Port, SOURCE_USER = :User, SOURCE_PASSWORD = :Password, SOURCE_AUTO_POSITION = 1, GET_SOURCE_PUBLIC_KEY = 1`
} else {
cmd = `CHANGE MASTER TO MASTER_HOST = :Host, MASTER_PORT = :Port, MASTER_USER = :User, MASTER_PASSWORD = :Password, MASTER_AUTO_POSITION = 1, GET_MASTER_PUBLIC_KEY = 1`
}
if _, err := o.db.NamedExecContext(ctx, cmd, primary); err != nil {
return fmt.Errorf("failed to change primary: %w", err)
}
if _, err := o.db.ExecContext(ctx, "SET GLOBAL rpl_semi_sync_slave_enabled=?", semisync); err != nil {
Expand Down
8 changes: 6 additions & 2 deletions pkg/dbop/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ var startupMycnf = map[string]string{
"collation_server": "utf8mb4_unicode_ci",
"default_time_zone": "+0:00",
"disabled_storage_engines": "MyISAM",
"skip_slave_start": "ON",
"skip_replica_start": "ON",
"enforce_gtid_consistency": "ON",
"gtid_mode": "ON",
}
Expand Down Expand Up @@ -98,7 +98,11 @@ func ConfigureMySQLOnDocker(pwd *password.MySQLPassword, port int) error {
}

// clear executed_gtid_set
db.MustExec(`RESET MASTER`)
if strings.HasPrefix(testMySQLImage, "mysql:8.4") {
db.MustExec(`RESET BINARY LOGS AND GTIDS`)
} else {
db.MustExec(`RESET MASTER`)
}
db.Close()
return nil
}
Expand Down
98 changes: 49 additions & 49 deletions pkg/dbop/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,29 +25,29 @@ var statusGlobalVars = []string{
"@@gtid_executed",
"@@read_only",
"@@super_read_only",
"@@rpl_semi_sync_master_wait_for_slave_count",
"@@rpl_semi_sync_master_enabled",
"@@rpl_semi_sync_slave_enabled",
"@@rpl_semi_sync_source_wait_for_replica_count",
"@@rpl_semi_sync_source_enabled",
"@@rpl_semi_sync_replica_enabled",
}

// GlobalVariables defines the observed global variable values of a MySQL instance
type GlobalVariables struct {
UUID string `db:"@@server_uuid"`
ExecutedGTID string `db:"@@gtid_executed"`
ReadOnly bool `db:"@@read_only"`
SuperReadOnly bool `db:"@@super_read_only"`
WaitForSlaveCount int `db:"@@rpl_semi_sync_master_wait_for_slave_count"`
SemiSyncMasterEnabled bool `db:"@@rpl_semi_sync_master_enabled"`
SemiSyncSlaveEnabled bool `db:"@@rpl_semi_sync_slave_enabled"`
UUID string `db:"@@server_uuid"`
ExecutedGTID string `db:"@@gtid_executed"`
ReadOnly bool `db:"@@read_only"`
SuperReadOnly bool `db:"@@super_read_only"`
WaitForReplicaCount int `db:"@@rpl_semi_sync_source_wait_for_replica_count"`
SemiSyncSourceEnabled bool `db:"@@rpl_semi_sync_source_enabled"`
SemiSyncReplicaEnabled bool `db:"@@rpl_semi_sync_replica_enabled"`
}

// ReplicaHost defines the columns from `SHOW SLAVE HOSTS`
// ReplicaHost defines the columns from `SHOW REPLICAS`
type ReplicaHost struct {
ServerID int32 `db:"Server_id"`
Host string `db:"Host"`
Port int `db:"Port"`
SourceID int32 `db:"Master_id"`
ReplicaUUID string `db:"Slave_UUID"`
SourceID int32 `db:"Source_id"`
ReplicaUUID string `db:"Replica_UUID"`

// the following fields don't appear normally
User string `db:"User"`
Expand All @@ -56,26 +56,26 @@ type ReplicaHost struct {

// ReplicaStatus defines the observed state of a replica
type ReplicaStatus struct {
LastIoErrno int `db:"Last_IO_Errno"`
LastIoError string `db:"Last_IO_Error"`
LastSQLErrno int `db:"Last_SQL_Errno"`
LastSQLError string `db:"Last_SQL_Error"`
MasterHost string `db:"Master_Host"`
RetrievedGtidSet string `db:"Retrieved_Gtid_Set"`
ExecutedGtidSet string `db:"Executed_Gtid_Set"`
SlaveIORunning string `db:"Slave_IO_Running"`
SlaveSQLRunning string `db:"Slave_SQL_Running"`
LastIoErrno int `db:"Last_IO_Errno"`
LastIoError string `db:"Last_IO_Error"`
LastSQLErrno int `db:"Last_SQL_Errno"`
LastSQLError string `db:"Last_SQL_Error"`
SourceHost string `db:"Source_Host"`
RetrievedGtidSet string `db:"Retrieved_Gtid_Set"`
ExecutedGtidSet string `db:"Executed_Gtid_Set"`
ReplicaIORunning string `db:"Replica_IO_Running"`
ReplicaSQLRunning string `db:"Replica_SQL_Running"`

// All of variables from here are NOT used in MOCO's reconcile
SlaveIOState string `db:"Slave_IO_State"`
MasterUser string `db:"Master_User"`
MasterPort int `db:"Master_Port"`
ReplicaIOState string `db:"Replica_IO_State"`
SourceUser string `db:"Source_User"`
SourcePort int `db:"Source_Port"`
ConnectRetry int `db:"Connect_Retry"`
MasterLogFile string `db:"Master_Log_File"`
ReadMasterLogPos int `db:"Read_Master_Log_Pos"`
SourceLogFile string `db:"Source_Log_File"`
ReadSourceLogPos int `db:"Read_Source_Log_Pos"`
RelayLogFile string `db:"Relay_Log_File"`
RelayLogPos int `db:"Relay_Log_Pos"`
RelayMasterLogFile string `db:"Relay_Master_Log_File"`
RelaySourceLogFile string `db:"Relay_Source_Log_File"`
ReplicateDoDB string `db:"Replicate_Do_DB"`
ReplicateIgnoreDB string `db:"Replicate_Ignore_DB"`
ReplicateDoTable string `db:"Replicate_Do_Table"`
Expand All @@ -85,46 +85,46 @@ type ReplicaStatus struct {
LastErrno int `db:"Last_Errno"`
LastError string `db:"Last_Error"`
SkipCounter int `db:"Skip_Counter"`
ExecMasterLogPos int `db:"Exec_Master_Log_Pos"`
ExecSourceLogPos int `db:"Exec_Source_Log_Pos"`
RelayLogSpace int `db:"Relay_Log_Space"`
UntilCondition string `db:"Until_Condition"`
UntilLogFile string `db:"Until_Log_File"`
UntilLogPos int `db:"Until_Log_Pos"`
MasterSSLAllowed string `db:"Master_SSL_Allowed"`
MasterSSLCAFile string `db:"Master_SSL_CA_File"`
MasterSSLCAPath string `db:"Master_SSL_CA_Path"`
MasterSSLCert string `db:"Master_SSL_Cert"`
MasterSSLCipher string `db:"Master_SSL_Cipher"`
MasterSSLKey string `db:"Master_SSL_Key"`
SecondsBehindMaster sql.NullInt64 `db:"Seconds_Behind_Master"`
MasterSSLVerifyServerCert string `db:"Master_SSL_Verify_Server_Cert"`
SourceSSLAllowed string `db:"Source_SSL_Allowed"`
SourceSSLCAFile string `db:"Source_SSL_CA_File"`
SourceSSLCAPath string `db:"Source_SSL_CA_Path"`
SourceSSLCert string `db:"Source_SSL_Cert"`
SourceSSLCipher string `db:"Source_SSL_Cipher"`
SourceSSLKey string `db:"Source_SSL_Key"`
SecondsBehindSource sql.NullInt64 `db:"Seconds_Behind_Source"`
SourceSSLVerifyServerCert string `db:"Source_SSL_Verify_Server_Cert"`
ReplicateIgnoreServerIds string `db:"Replicate_Ignore_Server_Ids"`
MasterServerID int `db:"Master_Server_Id"`
MasterUUID string `db:"Master_UUID"`
MasterInfoFile string `db:"Master_Info_File"`
SourceServerID int `db:"Source_Server_Id"`
SourceUUID string `db:"Source_UUID"`
SourceInfoFile string `db:"Source_Info_File"`
SQLDelay int `db:"SQL_Delay"`
SQLRemainingDelay sql.NullInt64 `db:"SQL_Remaining_Delay"`
SlaveSQLRunningState string `db:"Slave_SQL_Running_State"`
MasterRetryCount int `db:"Master_Retry_Count"`
MasterBind string `db:"Master_Bind"`
ReplicaSQLRunningState string `db:"Replica_SQL_Running_State"`
SourceRetryCount int `db:"Source_Retry_Count"`
SourceBind string `db:"Source_Bind"`
LastIOErrorTimestamp string `db:"Last_IO_Error_Timestamp"`
LastSQLErrorTimestamp string `db:"Last_SQL_Error_Timestamp"`
MasterSSLCrl string `db:"Master_SSL_Crl"`
MasterSSLCrlpath string `db:"Master_SSL_Crlpath"`
SourceSSLCrl string `db:"Source_SSL_Crl"`
SourceSSLCrlpath string `db:"Source_SSL_Crlpath"`
AutoPosition string `db:"Auto_Position"`
ReplicateRewriteDB string `db:"Replicate_Rewrite_DB"`
ChannelName string `db:"Channel_Name"`
MasterTLSVersion string `db:"Master_TLS_Version"`
Masterpublickeypath string `db:"Master_public_key_path"`
Getmasterpublickey string `db:"Get_master_public_key"`
SourceTLSVersion string `db:"Source_TLS_Version"`
Sourcepublickeypath string `db:"Source_public_key_path"`
Getsourcepublickey string `db:"Get_source_public_key"`
NetworkNamespace string `db:"Network_Namespace"`
}

func (rs *ReplicaStatus) IsRunning() bool {
if rs == nil {
return false
}
return rs.SlaveIORunning == "Yes" && rs.SlaveSQLRunning == "Yes"
return rs.ReplicaIORunning == "Yes" && rs.ReplicaSQLRunning == "Yes"
}

// CloneStatus defines the observed clone status of a MySQL instance
Expand Down

0 comments on commit ef35d22

Please sign in to comment.