Skip to content

Commit

Permalink
Adjustments and fixes
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Jan 7, 2025
1 parent 85389bb commit 740bc3f
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 66 deletions.
6 changes: 4 additions & 2 deletions go/mysql/endtoend/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/mysql/sqlerror"

"vitess.io/vitess/go/mysql"
Expand Down Expand Up @@ -57,8 +58,9 @@ func connectForReplication(t *testing.T, rbr bool) (*mysql.Conn, mysql.BinlogFor
status, err := conn.ShowPrimaryStatus()
require.NoError(t, err, "retrieving primary status failed: %v", err)

file := status.FilePosition.File
position := status.FilePosition.Pos
filePos := status.FilePosition.GTIDSet.(replication.FilePosGTID)
file := filePos.File
position := filePos.Pos

// Tell the server that we understand the format of events
// that will be used if binlog_checksum is enabled on the server.
Expand Down
10 changes: 5 additions & 5 deletions go/mysql/replication/primary_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ import (
type PrimaryStatus struct {
// Position represents the server's GTID based position.
Position Position
// FilePosition represents the server's current binary log
// file and position.
FilePosition BinlogFilePos
// FilePosition represents the server's file/pos based replication
// psuedo GTID position.
FilePosition Position
// ServerUUID is the UUID of the server.
ServerUUID string
}
Expand All @@ -39,7 +39,7 @@ type PrimaryStatus struct {
func PrimaryStatusToProto(s PrimaryStatus) *replicationdatapb.PrimaryStatus {
return &replicationdatapb.PrimaryStatus{
Position: EncodePosition(s.Position),
FilePosition: s.FilePosition.String(),
FilePosition: EncodePosition(s.FilePosition),
ServerUuid: s.ServerUUID,
}
}
Expand All @@ -64,7 +64,7 @@ func ParsePrimaryStatus(fields map[string]string) PrimaryStatus {
file := fields["File"]
if file != "" && fileExecPosStr != "" {
var err error
status.FilePosition, err = ParseBinlogFilePos(fmt.Sprintf("%s:%s", file, fileExecPosStr))
status.FilePosition.GTIDSet, err = ParseFilePosGTIDSet(fmt.Sprintf("%s:%s", file, fileExecPosStr))
if err != nil {
log.Warningf("Error parsing GTID set %s:%s: %v", file, fileExecPosStr, err)
}
Expand Down
53 changes: 27 additions & 26 deletions go/mysql/replication/replication_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,22 @@ type ReplicationStatus struct {
// it is the executed GTID set. For file replication implementation, it is same as
// FilePosition
Position Position
// RelayLogPosition is the relay log file and position that the replica would be
// at if it were to finish executing everything that's currently in its relay log.
// RelayLogPosition is the GTID Position that the replica would be at if it
// were to finish executing everything that's currently in its relay log.
// However, some MySQL flavors don't expose this information,
// in which case RelayLogPosition.IsZero() will be true.
// If ReplicationLagUnknown is true then we should not rely on the seconds
// behind value and we can instead try to calculate the lag ourselves when
// appropriate. For MySQL GTID replication implementation it is the union of
// executed GTID set and retrieved GTID set. For file replication implementation,
// it is same as RelayLogSourceBinlogEquivalentPosition
RelayLogPosition Position
// FilePosition stores the position of the source tablets binary log
// upto which the SQL thread of the replica has run.
FilePosition BinlogFilePos
// FilePosition represents the server's file/pos based replication psuedo GTID position.
FilePosition Position
// RelayLogSourceBinlogEquivalentPosition stores the position of the source tablets binary log
// upto which the IO thread has read and added to the relay log
RelayLogSourceBinlogEquivalentPosition BinlogFilePos
// RelayLogFilePosition stores the position in the relay log file
RelayLogSourceBinlogEquivalentPosition Position
// RelayLogFilePosition stores the binlog file position in the relay log file.
RelayLogFilePosition BinlogFilePos
SourceServerID uint32
IOState ReplicationState
Expand Down Expand Up @@ -90,8 +96,8 @@ func ReplicationStatusToProto(s ReplicationStatus) *replicationdatapb.Status {
replstatuspb := &replicationdatapb.Status{
Position: EncodePosition(s.Position),
RelayLogPosition: EncodePosition(s.RelayLogPosition),
FilePosition: s.FilePosition.String(),
RelayLogSourceBinlogEquivalentPosition: s.RelayLogSourceBinlogEquivalentPosition.String(),
FilePosition: EncodePosition(s.FilePosition),
RelayLogSourceBinlogEquivalentPosition: EncodePosition(s.RelayLogSourceBinlogEquivalentPosition),
SourceServerId: s.SourceServerID,
ReplicationLagSeconds: s.ReplicationLagSeconds,
ReplicationLagUnknown: s.ReplicationLagUnknown,
Expand Down Expand Up @@ -124,11 +130,11 @@ func ProtoToReplicationStatus(s *replicationdatapb.Status) ReplicationStatus {
if err != nil {
panic(vterrors.Wrapf(err, "cannot decode RelayLogPosition"))
}
filePos, err := ParseBinlogFilePos(s.FilePosition)
filePos, err := DecodePosition(s.FilePosition)
if err != nil {
panic(vterrors.Wrapf(err, "cannot decode FilePosition"))
}
fileRelayPos, err := ParseBinlogFilePos(s.RelayLogSourceBinlogEquivalentPosition)
fileRelayPos, err := DecodePosition(s.RelayLogSourceBinlogEquivalentPosition)
if err != nil {
panic(vterrors.Wrapf(err, "cannot decode RelayLogSourceBinlogEquivalentPosition"))
}
Expand Down Expand Up @@ -263,23 +269,18 @@ func ParseMariadbReplicationStatus(resultMap map[string]string) (ReplicationStat
func ParseFilePosReplicationStatus(resultMap map[string]string) (ReplicationStatus, error) {
status := ParseReplicationStatus(resultMap, false)

var err error
status.Position, err = status.FilePosition.ConvertToFlavorPosition()
if err != nil {
return status, err
}
status.RelayLogPosition, err = status.RelayLogSourceBinlogEquivalentPosition.ConvertToFlavorPosition()
status.Position = status.FilePosition
status.RelayLogPosition = status.RelayLogSourceBinlogEquivalentPosition

return status, err
return status, nil
}

func ParseFilePosPrimaryStatus(resultMap map[string]string) (PrimaryStatus, error) {
status := ParsePrimaryStatus(resultMap)

var err error
status.Position, err = status.FilePosition.ConvertToFlavorPosition()
status.Position = status.FilePosition

return status, err
return status, nil
}

// ParseReplicationStatus parses the common (non-flavor-specific) fields of ReplicationStatus
Expand Down Expand Up @@ -346,18 +347,18 @@ func ParseReplicationStatus(fields map[string]string, replica bool) ReplicationS
executedPosStr := fields[execSourceLogPosField]
file := fields[relaySourceLogFileField]
if file != "" && executedPosStr != "" {
status.FilePosition, err = ParseBinlogFilePos(fmt.Sprintf("%s:%s", file, executedPosStr))
status.FilePosition.GTIDSet, err = ParseFilePosGTIDSet(fmt.Sprintf("%s:%s", file, executedPosStr))
if err != nil {
log.Warningf("Error parsing binlog file and position %s:%s: %v", file, executedPosStr, err)
log.Warningf("Error parsing GTID set %s:%s: %v", file, executedPosStr, err)
}
}

readPosStr := fields[readSourceLogPosField]
file = fields[sourceLogFileField]
if file != "" && readPosStr != "" {
status.RelayLogSourceBinlogEquivalentPosition, err = ParseBinlogFilePos(fmt.Sprintf("%s:%s", file, readPosStr))
status.RelayLogSourceBinlogEquivalentPosition.GTIDSet, err = ParseFilePosGTIDSet(fmt.Sprintf("%s:%s", file, readPosStr))
if err != nil {
log.Warningf("Error parsing relay log file and position %s:%s: %v", file, readPosStr, err)
log.Warningf("Error parsing GTID set %s:%s: %v", file, readPosStr, err)
}
}

Expand All @@ -366,7 +367,7 @@ func ParseReplicationStatus(fields map[string]string, replica bool) ReplicationS
if file != "" && relayPosStr != "" {
status.RelayLogFilePosition, err = ParseBinlogFilePos(fmt.Sprintf("%s:%s", file, relayPosStr))
if err != nil {
log.Warningf("Error parsing relay log file and position %s:%s: %v", file, relayPosStr, err)
log.Warningf("Error parsing GTID set %s:%s: %v", file, relayPosStr, err)
}
}
return status
Expand Down
32 changes: 13 additions & 19 deletions go/mysql/replication/replication_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,12 @@ func TestMysqlShouldGetPosition(t *testing.T) {
sid, _ := ParseSID("3e11fa47-71ca-11e1-9e33-c80aa9429562")
want := PrimaryStatus{
Position: Position{GTIDSet: Mysql56GTIDSet{sid: []interval{{start: 1, end: 5}}}},
FilePosition: BinlogFilePos{File: "source-bin.000003", Pos: 1307},
FilePosition: Position{GTIDSet: FilePosGTID{File: "source-bin.000003", Pos: 1307}},
}
got, err := ParseMysqlPrimaryStatus(resultMap)
require.NoError(t, err)
assert.Equalf(t, got.Position.GTIDSet.String(), want.Position.GTIDSet.String(), "got Position: %v; want Position: %v", got.Position.GTIDSet, want.Position.GTIDSet)
assert.Equalf(t, got.FilePosition.String(), want.FilePosition.String(), "got FilePosition: %v; want FilePosition: %v", got.FilePosition, want.FilePosition)
assert.Equalf(t, got.Position.GTIDSet, want.Position.GTIDSet, "got Position: %v; want Position: %v", got.Position.GTIDSet, want.Position.GTIDSet)
assert.Equalf(t, got.FilePosition, want.FilePosition, "got FilePosition: %v; want FilePosition: %v", got.FilePosition, want.FilePosition)
}

func TestMysqlRetrieveMasterServerId(t *testing.T) {
Expand Down Expand Up @@ -179,8 +179,8 @@ func TestMysqlRetrieveFileBasedPositions(t *testing.T) {
}

want := ReplicationStatus{
FilePosition: BinlogFilePos{File: "master-bin.000002", Pos: 1307},
RelayLogSourceBinlogEquivalentPosition: BinlogFilePos{File: "master-bin.000003", Pos: 1308},
FilePosition: Position{GTIDSet: FilePosGTID{File: "master-bin.000002", Pos: 1307}},
RelayLogSourceBinlogEquivalentPosition: Position{GTIDSet: FilePosGTID{File: "master-bin.000003", Pos: 1308}},
RelayLogFilePosition: BinlogFilePos{File: "relay-bin.000004", Pos: 1309},
}
got, err := ParseMysqlReplicationStatus(resultMap, false)
Expand Down Expand Up @@ -255,8 +255,8 @@ func TestMariadbRetrieveFileBasedPositions(t *testing.T) {
}

want := ReplicationStatus{
FilePosition: BinlogFilePos{File: "master-bin.000002", Pos: 1307},
RelayLogSourceBinlogEquivalentPosition: BinlogFilePos{File: "master-bin.000003", Pos: 1308},
FilePosition: Position{GTIDSet: FilePosGTID{File: "master-bin.000002", Pos: 1307}},
RelayLogSourceBinlogEquivalentPosition: Position{GTIDSet: FilePosGTID{File: "master-bin.000003", Pos: 1308}},
RelayLogFilePosition: BinlogFilePos{File: "relay-bin.000004", Pos: 1309},
}
got, err := ParseMariadbReplicationStatus(resultMap)
Expand Down Expand Up @@ -303,8 +303,8 @@ func TestFilePosRetrieveExecutedPosition(t *testing.T) {
want := ReplicationStatus{
Position: Position{GTIDSet: FilePosGTID{File: "master-bin.000002", Pos: 1307}},
RelayLogPosition: Position{GTIDSet: FilePosGTID{File: "master-bin.000003", Pos: 1308}},
FilePosition: BinlogFilePos{File: "master-bin.000002", Pos: 1307},
RelayLogSourceBinlogEquivalentPosition: BinlogFilePos{File: "master-bin.000003", Pos: 1308},
FilePosition: Position{GTIDSet: FilePosGTID{File: "master-bin.000002", Pos: 1307}},
RelayLogSourceBinlogEquivalentPosition: Position{GTIDSet: FilePosGTID{File: "master-bin.000003", Pos: 1308}},
RelayLogFilePosition: BinlogFilePos{File: "relay-bin.000004", Pos: 1309},
}
got, err := ParseFilePosReplicationStatus(resultMap)
Expand All @@ -314,12 +314,8 @@ func TestFilePosRetrieveExecutedPosition(t *testing.T) {
assert.Equalf(t, got.RelayLogFilePosition, want.RelayLogFilePosition, "got RelayLogFilePosition: %v; want RelayLogFilePosition: %v", got.RelayLogFilePosition, want.RelayLogFilePosition)
assert.Equalf(t, got.FilePosition, want.FilePosition, "got FilePosition: %v; want FilePosition: %v", got.FilePosition, want.FilePosition)
assert.Equalf(t, got.RelayLogSourceBinlogEquivalentPosition, want.RelayLogSourceBinlogEquivalentPosition, "got RelayLogSourceBinlogEquivalentPosition: %v; want RelayLogSourceBinlogEquivalentPosition: %v", got.RelayLogSourceBinlogEquivalentPosition, want.RelayLogSourceBinlogEquivalentPosition)
filePos, err := got.FilePosition.ConvertToFlavorPosition()
require.NoError(t, err)
assert.Equalf(t, got.Position.GTIDSet, filePos.GTIDSet, "FilePosition and Position don't match when they should for the FilePos flavor")
filePos, err = got.RelayLogSourceBinlogEquivalentPosition.ConvertToFlavorPosition()
require.NoError(t, err)
assert.Equalf(t, got.RelayLogPosition.GTIDSet, filePos.GTIDSet, "RelayLogPosition and RelayLogSourceBinlogEquivalentPosition don't match when they should for the FilePos flavor")
assert.Equalf(t, got.Position.GTIDSet, got.FilePosition.GTIDSet, "FilePosition and Position don't match when they should for the FilePos flavor")
assert.Equalf(t, got.RelayLogPosition.GTIDSet, got.RelayLogSourceBinlogEquivalentPosition.GTIDSet, "RelayLogPosition and RelayLogSourceBinlogEquivalentPosition don't match when they should for the FilePos flavor")
}

func TestFilePosShouldGetPosition(t *testing.T) {
Expand All @@ -330,13 +326,11 @@ func TestFilePosShouldGetPosition(t *testing.T) {

want := PrimaryStatus{
Position: Position{GTIDSet: FilePosGTID{File: "source-bin.000003", Pos: 1307}},
FilePosition: BinlogFilePos{File: "source-bin.000003", Pos: 1307},
FilePosition: Position{GTIDSet: FilePosGTID{File: "source-bin.000003", Pos: 1307}},
}
got, err := ParseFilePosPrimaryStatus(resultMap)
require.NoError(t, err)
assert.Equalf(t, got.Position.GTIDSet, want.Position.GTIDSet, "got Position: %v; want Position: %v", got.Position.GTIDSet, want.Position.GTIDSet)
assert.Equalf(t, got.FilePosition, want.FilePosition, "got FilePosition: %v; want FilePosition: %v", got.FilePosition, want.FilePosition)
filePos, err := got.FilePosition.ConvertToFlavorPosition()
require.NoError(t, err)
assert.Equalf(t, got.Position.GTIDSet, filePos.GTIDSet, "FilePosition and Position don't match when they should for the FilePos flavor")
assert.Equalf(t, got.Position.GTIDSet, got.FilePosition.GTIDSet, "FilePosition and Position don't match when they should for the FilePos flavor")
}
10 changes: 6 additions & 4 deletions go/vt/mysqlctl/fakemysqldaemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,19 +315,20 @@ func (fmd *FakeMysqlDaemon) ReplicationStatus(ctx context.Context) (replication.
}
fmd.mu.Lock()
defer fmd.mu.Unlock()
filePos, err := fmd.CurrentSourceFilePosition.ConvertToFlavorPosition()
return replication.ReplicationStatus{
Position: fmd.CurrentPrimaryPosition,
FilePosition: fmd.CurrentSourceFilePosition,
FilePosition: filePos,
RelayLogPosition: fmd.CurrentRelayLogPosition,
RelayLogSourceBinlogEquivalentPosition: fmd.CurrentSourceFilePosition,
RelayLogSourceBinlogEquivalentPosition: filePos,
ReplicationLagSeconds: fmd.ReplicationLagSeconds,
// Implemented as AND to avoid changing all tests that were
// previously using Replicating = false.
IOState: replication.ReplicationStatusToState(fmt.Sprintf("%v", fmd.Replicating && fmd.IOThreadRunning)),
SQLState: replication.ReplicationStatusToState(fmt.Sprintf("%v", fmd.Replicating)),
SourceHost: fmd.CurrentSourceHost,
SourcePort: fmd.CurrentSourcePort,
}, nil
}, err
}

// PrimaryStatus is part of the MysqlDaemon interface.
Expand All @@ -338,9 +339,10 @@ func (fmd *FakeMysqlDaemon) PrimaryStatus(ctx context.Context) (replication.Prim
return replication.PrimaryStatus{}, fmd.PrimaryStatusError
}
serverUUID, _ := fmd.GetServerUUID(ctx)
filePos, _ := fmd.CurrentSourceFilePosition.ConvertToFlavorPosition()
return replication.PrimaryStatus{
Position: fmd.CurrentPrimaryPosition,
FilePosition: fmd.CurrentSourceFilePosition,
FilePosition: filePos,
ServerUUID: serverUUID,
}, nil
}
Expand Down
Loading

0 comments on commit 740bc3f

Please sign in to comment.