diff --git a/go/mysql/binlog_event_filepos.go b/go/mysql/binlog_event_filepos.go index b7e6ed9e0f2..cccf1557e51 100644 --- a/go/mysql/binlog_event_filepos.go +++ b/go/mysql/binlog_event_filepos.go @@ -75,12 +75,12 @@ func (ev *filePosBinlogEvent) StripChecksum(f BinlogFormat) (BinlogEvent, []byte // nextPosition returns the next file position of the binlog. // If no information is available, it returns 0. -func (ev *filePosBinlogEvent) nextPosition(f BinlogFormat) uint32 { +func (ev *filePosBinlogEvent) nextPosition(f BinlogFormat) uint64 { if f.HeaderLength <= 13 { // Dead code. This is just a failsafe. return 0 } - return binary.LittleEndian.Uint32(ev.Bytes()[13:17]) + return binary.LittleEndian.Uint64(ev.Bytes()[13:21]) } // rotate implements BinlogEvent.Rotate(). @@ -283,7 +283,7 @@ type filePosGTIDEvent struct { gtid replication.FilePosGTID } -func newFilePosGTIDEvent(file string, pos uint32, timestamp uint32) filePosGTIDEvent { +func newFilePosGTIDEvent(file string, pos uint64, timestamp uint32) filePosGTIDEvent { return filePosGTIDEvent{ filePosFakeEvent: filePosFakeEvent{ timestamp: timestamp, diff --git a/go/mysql/endtoend/replication_test.go b/go/mysql/endtoend/replication_test.go index a04f75c6b43..8d18a72555e 100644 --- a/go/mysql/endtoend/replication_test.go +++ b/go/mysql/endtoend/replication_test.go @@ -29,7 +29,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/mysql" @@ -58,9 +57,8 @@ func connectForReplication(t *testing.T, rbr bool) (*mysql.Conn, mysql.BinlogFor status, err := conn.ShowPrimaryStatus() require.NoError(t, err, "retrieving primary status failed: %v", err) - filePos := status.FilePosition.GTIDSet.(replication.FilePosGTID) - file := filePos.File - position := filePos.Pos + file := status.FilePosition.File + position := status.FilePosition.Pos // Tell the server that we understand the format of events // that will be used if binlog_checksum is enabled on the server. diff --git a/go/mysql/replication.go b/go/mysql/replication.go index 84c65842c7e..baf355020bf 100644 --- a/go/mysql/replication.go +++ b/go/mysql/replication.go @@ -18,6 +18,7 @@ package mysql import ( "fmt" + "math" "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/vt/proto/vtrpc" @@ -34,7 +35,12 @@ const ( // WriteComBinlogDump writes a ComBinlogDump command. // See http://dev.mysql.com/doc/internals/en/com-binlog-dump.html for syntax. // Returns a SQLError. -func (c *Conn) WriteComBinlogDump(serverID uint32, binlogFilename string, binlogPos uint32, flags uint16) error { +func (c *Conn) WriteComBinlogDump(serverID uint32, binlogFilename string, binlogPos uint64, flags uint16) error { + // The binary log file position is a uint64, but the protocol command + // only uses 4 bytes for the file position. + if binlogPos > math.MaxUint32 { + return vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "binlog position %d is too large, it must fit into 32 bits", binlogPos) + } c.sequence = 0 length := 1 + // ComBinlogDump 4 + // binlog-pos @@ -43,7 +49,7 @@ func (c *Conn) WriteComBinlogDump(serverID uint32, binlogFilename string, binlog len(binlogFilename) // binlog-filename data, pos := c.startEphemeralPacketWithHeader(length) pos = writeByte(data, pos, ComBinlogDump) - pos = writeUint32(data, pos, binlogPos) + pos = writeUint32(data, pos, uint32(binlogPos)) pos = writeUint16(data, pos, flags) pos = writeUint32(data, pos, serverID) _ = writeEOFString(data, pos, binlogFilename) diff --git a/go/mysql/replication/binlog_file_position.go b/go/mysql/replication/binlog_file_position.go new file mode 100644 index 00000000000..1159043af54 --- /dev/null +++ b/go/mysql/replication/binlog_file_position.go @@ -0,0 +1,66 @@ +/* +Copyright 2025 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package replication + +import ( + "fmt" + "strconv" + "strings" +) + +type BinlogFilePos struct { + File string + Pos uint64 +} + +// ParseBinlogFilePos parses a binlog file and position in the input +// format used by commands such as SHOW REPLICA STATUS and SHOW BINARY +// LOG STATUS. +func ParseBinlogFilePos(s string) (BinlogFilePos, error) { + bfp := BinlogFilePos{} + + // Split into parts. + file, posStr, ok := strings.Cut(s, ":") + if !ok { + return bfp, fmt.Errorf("invalid binlog file position (%v): expecting file:pos", s) + } + + pos, err := strconv.ParseUint(posStr, 0, 64) + if err != nil { + return bfp, fmt.Errorf("invalid binlog file position (%v): expecting position to be an unsigned 64 bit integer", posStr) + } + + bfp.File = file + bfp.Pos = pos + + return bfp, nil +} + +// String returns the string representation of the BinlogFilePos +// using a colon as the seperator. +func (bfp BinlogFilePos) String() string { + return fmt.Sprintf("%s:%d", bfp.File, bfp.Pos) +} + +func (bfp BinlogFilePos) IsZero() bool { + return bfp.File == "" && bfp.Pos == 0 +} + +func (bfp BinlogFilePos) ConvertToFlavorPosition() (pos Position, err error) { + pos.GTIDSet, err = ParseFilePosGTIDSet(bfp.String()) + return pos, err +} diff --git a/go/mysql/replication/filepos_gtid.go b/go/mysql/replication/filepos_gtid.go index 850fb421915..95c7efcd3b1 100644 --- a/go/mysql/replication/filepos_gtid.go +++ b/go/mysql/replication/filepos_gtid.go @@ -33,14 +33,14 @@ func parseFilePosGTID(s string) (GTID, error) { return nil, fmt.Errorf("invalid FilePos GTID (%v): expecting file:pos", s) } - pos, err := strconv.ParseUint(parts[1], 0, 32) + pos, err := strconv.ParseUint(parts[1], 0, 64) if err != nil { return nil, fmt.Errorf("invalid FilePos GTID (%v): expecting pos to be an integer", s) } return FilePosGTID{ File: parts[0], - Pos: uint32(pos), + Pos: pos, }, nil } @@ -56,7 +56,7 @@ func ParseFilePosGTIDSet(s string) (GTIDSet, error) { // FilePosGTID implements GTID. type FilePosGTID struct { File string - Pos uint32 + Pos uint64 } // String implements GTID.String(). diff --git a/go/mysql/replication/filepos_gtid_test.go b/go/mysql/replication/filepos_gtid_test.go index 174aed6ccf9..6cef4756af2 100644 --- a/go/mysql/replication/filepos_gtid_test.go +++ b/go/mysql/replication/filepos_gtid_test.go @@ -23,7 +23,7 @@ import ( func Test_filePosGTID_String(t *testing.T) { type fields struct { file string - pos uint32 + pos uint64 } tests := []struct { name string @@ -35,6 +35,11 @@ func Test_filePosGTID_String(t *testing.T) { fields{file: "mysql-bin.166031", pos: 192394}, "mysql-bin.166031:192394", }, + { + "handles large position correctly", + fields{file: "vt-1448040107-bin.003222", pos: 4663881395}, + "vt-1448040107-bin.003222:4663881395", + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -52,7 +57,7 @@ func Test_filePosGTID_String(t *testing.T) { func Test_filePosGTID_ContainsGTID(t *testing.T) { type fields struct { file string - pos uint32 + pos uint64 } type args struct { other GTID diff --git a/go/mysql/replication/primary_status.go b/go/mysql/replication/primary_status.go index 220fce3cfde..7a572fbf7be 100644 --- a/go/mysql/replication/primary_status.go +++ b/go/mysql/replication/primary_status.go @@ -28,8 +28,9 @@ import ( type PrimaryStatus struct { // Position represents the server's GTID based position. Position Position - // FilePosition represents the server's file based position. - FilePosition Position + // FilePosition represents the server's current binary log + // file and position. + FilePosition BinlogFilePos // ServerUUID is the UUID of the server. ServerUUID string } @@ -38,7 +39,7 @@ type PrimaryStatus struct { func PrimaryStatusToProto(s PrimaryStatus) *replicationdatapb.PrimaryStatus { return &replicationdatapb.PrimaryStatus{ Position: EncodePosition(s.Position), - FilePosition: EncodePosition(s.FilePosition), + FilePosition: s.FilePosition.String(), ServerUuid: s.ServerUUID, } } @@ -63,7 +64,7 @@ func ParsePrimaryStatus(fields map[string]string) PrimaryStatus { file := fields["File"] if file != "" && fileExecPosStr != "" { var err error - status.FilePosition.GTIDSet, err = ParseFilePosGTIDSet(fmt.Sprintf("%s:%s", file, fileExecPosStr)) + status.FilePosition, err = ParseBinlogFilePos(fmt.Sprintf("%s:%s", file, fileExecPosStr)) if err != nil { log.Warningf("Error parsing GTID set %s:%s: %v", file, fileExecPosStr, err) } diff --git a/go/mysql/replication/replication_status.go b/go/mysql/replication/replication_status.go index b79ae3dc262..a72a460b1c9 100644 --- a/go/mysql/replication/replication_status.go +++ b/go/mysql/replication/replication_status.go @@ -31,24 +31,17 @@ type ReplicationStatus struct { // it is the executed GTID set. For file replication implementation, it is same as // FilePosition Position Position - // RelayLogPosition is the Position that the replica would be at if it - // were to finish executing everything that's currently in its relay log. - // However, some MySQL flavors don't expose this information, - // in which case RelayLogPosition.IsZero() will be true. - // If ReplicationLagUnknown is true then we should not rely on the seconds - // behind value and we can instead try to calculate the lag ourselves when - // appropriate. For MySQL GTID replication implementation it is the union of - // executed GTID set and retrieved GTID set. For file replication implementation, - // it is same as RelayLogSourceBinlogEquivalentPosition + // RelayLogPosition is the relay log file and position that the replica would be + // at if it were to finish executing everything that's currently in its relay log. RelayLogPosition Position // FilePosition stores the position of the source tablets binary log // upto which the SQL thread of the replica has run. - FilePosition Position + FilePosition BinlogFilePos // RelayLogSourceBinlogEquivalentPosition stores the position of the source tablets binary log // upto which the IO thread has read and added to the relay log - RelayLogSourceBinlogEquivalentPosition Position + RelayLogSourceBinlogEquivalentPosition BinlogFilePos // RelayLogFilePosition stores the position in the relay log file - RelayLogFilePosition Position + RelayLogFilePosition BinlogFilePos SourceServerID uint32 IOState ReplicationState LastIOError string @@ -96,14 +89,14 @@ func (s *ReplicationStatus) SQLHealthy() bool { func ReplicationStatusToProto(s ReplicationStatus) *replicationdatapb.Status { replstatuspb := &replicationdatapb.Status{ Position: EncodePosition(s.Position), - RelayLogPosition: EncodePosition(s.RelayLogPosition), - FilePosition: EncodePosition(s.FilePosition), - RelayLogSourceBinlogEquivalentPosition: EncodePosition(s.RelayLogSourceBinlogEquivalentPosition), + RelayLogPosition: s.RelayLogPosition.String(), + FilePosition: s.FilePosition.String(), + RelayLogSourceBinlogEquivalentPosition: s.RelayLogSourceBinlogEquivalentPosition.String(), SourceServerId: s.SourceServerID, ReplicationLagSeconds: s.ReplicationLagSeconds, ReplicationLagUnknown: s.ReplicationLagUnknown, SqlDelay: s.SQLDelay, - RelayLogFilePosition: EncodePosition(s.RelayLogFilePosition), + RelayLogFilePosition: s.RelayLogFilePosition.String(), SourceHost: s.SourceHost, SourceUser: s.SourceUser, SourcePort: s.SourcePort, @@ -131,15 +124,15 @@ func ProtoToReplicationStatus(s *replicationdatapb.Status) ReplicationStatus { if err != nil { panic(vterrors.Wrapf(err, "cannot decode RelayLogPosition")) } - filePos, err := DecodePosition(s.FilePosition) + filePos, err := ParseBinlogFilePos(s.FilePosition) if err != nil { panic(vterrors.Wrapf(err, "cannot decode FilePosition")) } - fileRelayPos, err := DecodePosition(s.RelayLogSourceBinlogEquivalentPosition) + fileRelayPos, err := ParseBinlogFilePos(s.RelayLogSourceBinlogEquivalentPosition) if err != nil { panic(vterrors.Wrapf(err, "cannot decode RelayLogSourceBinlogEquivalentPosition")) } - relayFilePos, err := DecodePosition(s.RelayLogFilePosition) + relayFilePos, err := ParseBinlogFilePos(s.RelayLogFilePosition) if err != nil { panic(vterrors.Wrapf(err, "cannot decode RelayLogFilePosition")) } @@ -270,18 +263,23 @@ func ParseMariadbReplicationStatus(resultMap map[string]string) (ReplicationStat func ParseFilePosReplicationStatus(resultMap map[string]string) (ReplicationStatus, error) { status := ParseReplicationStatus(resultMap, false) - status.Position = status.FilePosition - status.RelayLogPosition = status.RelayLogSourceBinlogEquivalentPosition + var err error + status.Position, err = status.FilePosition.ConvertToFlavorPosition() + if err != nil { + return status, err + } + status.RelayLogPosition, err = status.RelayLogSourceBinlogEquivalentPosition.ConvertToFlavorPosition() - return status, nil + return status, err } func ParseFilePosPrimaryStatus(resultMap map[string]string) (PrimaryStatus, error) { status := ParsePrimaryStatus(resultMap) - status.Position = status.FilePosition + var err error + status.Position, err = status.FilePosition.ConvertToFlavorPosition() - return status, nil + return status, err } // ParseReplicationStatus parses the common (non-flavor-specific) fields of ReplicationStatus @@ -348,27 +346,27 @@ func ParseReplicationStatus(fields map[string]string, replica bool) ReplicationS executedPosStr := fields[execSourceLogPosField] file := fields[relaySourceLogFileField] if file != "" && executedPosStr != "" { - status.FilePosition.GTIDSet, err = ParseFilePosGTIDSet(fmt.Sprintf("%s:%s", file, executedPosStr)) + status.FilePosition, err = ParseBinlogFilePos(fmt.Sprintf("%s:%s", file, executedPosStr)) if err != nil { - log.Warningf("Error parsing GTID set %s:%s: %v", file, executedPosStr, err) + log.Warningf("Error parsing binlog file and position %s:%s: %v", file, executedPosStr, err) } } readPosStr := fields[readSourceLogPosField] file = fields[sourceLogFileField] if file != "" && readPosStr != "" { - status.RelayLogSourceBinlogEquivalentPosition.GTIDSet, err = ParseFilePosGTIDSet(fmt.Sprintf("%s:%s", file, readPosStr)) + status.RelayLogSourceBinlogEquivalentPosition, err = ParseBinlogFilePos(fmt.Sprintf("%s:%s", file, readPosStr)) if err != nil { - log.Warningf("Error parsing GTID set %s:%s: %v", file, readPosStr, err) + log.Warningf("Error parsing relay log file and position %s:%s: %v", file, readPosStr, err) } } relayPosStr := fields["Relay_Log_Pos"] file = fields["Relay_Log_File"] if file != "" && relayPosStr != "" { - status.RelayLogFilePosition.GTIDSet, err = ParseFilePosGTIDSet(fmt.Sprintf("%s:%s", file, relayPosStr)) + status.RelayLogFilePosition, err = ParseBinlogFilePos(fmt.Sprintf("%s:%s", file, relayPosStr)) if err != nil { - log.Warningf("Error parsing GTID set %s:%s: %v", file, relayPosStr, err) + log.Warningf("Error parsing relay log file and position %s:%s: %v", file, relayPosStr, err) } } return status diff --git a/go/mysql/replication/replication_status_test.go b/go/mysql/replication/replication_status_test.go index 8b458f76803..20e35713ec2 100644 --- a/go/mysql/replication/replication_status_test.go +++ b/go/mysql/replication/replication_status_test.go @@ -138,12 +138,12 @@ func TestMysqlShouldGetPosition(t *testing.T) { sid, _ := ParseSID("3e11fa47-71ca-11e1-9e33-c80aa9429562") want := PrimaryStatus{ Position: Position{GTIDSet: Mysql56GTIDSet{sid: []interval{{start: 1, end: 5}}}}, - FilePosition: Position{GTIDSet: FilePosGTID{File: "source-bin.000003", Pos: 1307}}, + FilePosition: BinlogFilePos{File: "source-bin.000003", Pos: 1307}, } got, err := ParseMysqlPrimaryStatus(resultMap) require.NoError(t, err) assert.Equalf(t, got.Position.GTIDSet.String(), want.Position.GTIDSet.String(), "got Position: %v; want Position: %v", got.Position.GTIDSet, want.Position.GTIDSet) - assert.Equalf(t, got.FilePosition.GTIDSet.String(), want.FilePosition.GTIDSet.String(), "got FilePosition: %v; want FilePosition: %v", got.FilePosition.GTIDSet, want.FilePosition.GTIDSet) + assert.Equalf(t, got.FilePosition.String(), want.FilePosition.String(), "got FilePosition: %v; want FilePosition: %v", got.FilePosition, want.FilePosition) } func TestMysqlRetrieveMasterServerId(t *testing.T) { @@ -179,15 +179,16 @@ func TestMysqlRetrieveFileBasedPositions(t *testing.T) { } want := ReplicationStatus{ - FilePosition: Position{GTIDSet: FilePosGTID{File: "master-bin.000002", Pos: 1307}}, - RelayLogSourceBinlogEquivalentPosition: Position{GTIDSet: FilePosGTID{File: "master-bin.000003", Pos: 1308}}, - RelayLogFilePosition: Position{GTIDSet: FilePosGTID{File: "relay-bin.000004", Pos: 1309}}, + FilePosition: BinlogFilePos{File: "master-bin.000002", Pos: 1307}, + RelayLogSourceBinlogEquivalentPosition: BinlogFilePos{File: "master-bin.000003", Pos: 1308}, + RelayLogFilePosition: BinlogFilePos{File: "relay-bin.000004", Pos: 1309}, } got, err := ParseMysqlReplicationStatus(resultMap, false) require.NoError(t, err) - assert.Equalf(t, got.FilePosition.GTIDSet, want.FilePosition.GTIDSet, "got FilePosition: %v; want FilePosition: %v", got.FilePosition.GTIDSet, want.FilePosition.GTIDSet) - assert.Equalf(t, got.RelayLogFilePosition.GTIDSet, want.RelayLogFilePosition.GTIDSet, "got RelayLogFilePosition: %v; want RelayLogFilePosition: %v", got.RelayLogFilePosition.GTIDSet, want.RelayLogFilePosition.GTIDSet) - assert.Equalf(t, got.RelayLogSourceBinlogEquivalentPosition.GTIDSet, want.RelayLogSourceBinlogEquivalentPosition.GTIDSet, "got RelayLogSourceBinlogEquivalentPosition: %v; want RelayLogSourceBinlogEquivalentPosition: %v", got.RelayLogSourceBinlogEquivalentPosition.GTIDSet, want.RelayLogSourceBinlogEquivalentPosition.GTIDSet) + assert.Equalf(t, got.FilePosition, want.FilePosition, "got FilePosition: %v; want FilePosition: %v", got.FilePosition, want.FilePosition) + assert.Equalf(t, got.RelayLogFilePosition, want.RelayLogFilePosition, "got RelayLogFilePosition: %v; want RelayLogFilePosition: %v", got.RelayLogFilePosition, want.RelayLogFilePosition) + assert.Equalf(t, got.RelayLogSourceBinlogEquivalentPosition, want.RelayLogSourceBinlogEquivalentPosition, "got RelayLogSourceBinlogEquivalentPosition: %v; want RelayLogSourceBinlogEquivalentPosition: %v", + got.RelayLogSourceBinlogEquivalentPosition, want.RelayLogSourceBinlogEquivalentPosition) } func TestMysqlShouldGetLegacyRelayLogPosition(t *testing.T) { @@ -254,15 +255,15 @@ func TestMariadbRetrieveFileBasedPositions(t *testing.T) { } want := ReplicationStatus{ - FilePosition: Position{GTIDSet: FilePosGTID{File: "master-bin.000002", Pos: 1307}}, - RelayLogSourceBinlogEquivalentPosition: Position{GTIDSet: FilePosGTID{File: "master-bin.000003", Pos: 1308}}, - RelayLogFilePosition: Position{GTIDSet: FilePosGTID{File: "relay-bin.000004", Pos: 1309}}, + FilePosition: BinlogFilePos{File: "master-bin.000002", Pos: 1307}, + RelayLogSourceBinlogEquivalentPosition: BinlogFilePos{File: "master-bin.000003", Pos: 1308}, + RelayLogFilePosition: BinlogFilePos{File: "relay-bin.000004", Pos: 1309}, } got, err := ParseMariadbReplicationStatus(resultMap) require.NoError(t, err) - assert.Equalf(t, got.RelayLogFilePosition.GTIDSet, want.RelayLogFilePosition.GTIDSet, "got RelayLogFilePosition: %v; want RelayLogFilePosition: %v", got.RelayLogFilePosition.GTIDSet, want.RelayLogFilePosition.GTIDSet) - assert.Equal(t, got.FilePosition.GTIDSet, want.FilePosition.GTIDSet, fmt.Sprintf("got FilePosition: %v; want FilePosition: %v", got.FilePosition.GTIDSet, want.FilePosition.GTIDSet)) - assert.Equal(t, got.RelayLogSourceBinlogEquivalentPosition.GTIDSet, want.RelayLogSourceBinlogEquivalentPosition.GTIDSet, fmt.Sprintf("got RelayLogSourceBinlogEquivalentPosition: %v; want RelayLogSourceBinlogEquivalentPosition: %v", got.RelayLogSourceBinlogEquivalentPosition.GTIDSet, want.RelayLogSourceBinlogEquivalentPosition.GTIDSet)) + assert.Equalf(t, got.RelayLogFilePosition, want.RelayLogFilePosition, "got RelayLogFilePosition: %v; want RelayLogFilePosition: %v", got.RelayLogFilePosition, want.RelayLogFilePosition) + assert.Equal(t, got.FilePosition, want.FilePosition, fmt.Sprintf("got FilePosition: %v; want FilePosition: %v", got.FilePosition, want.FilePosition)) + assert.Equal(t, got.RelayLogSourceBinlogEquivalentPosition, want.RelayLogSourceBinlogEquivalentPosition, fmt.Sprintf("got RelayLogSourceBinlogEquivalentPosition: %v; want RelayLogSourceBinlogEquivalentPosition: %v", got.RelayLogSourceBinlogEquivalentPosition, want.RelayLogSourceBinlogEquivalentPosition)) } func TestMariadbShouldGetNilRelayLogPosition(t *testing.T) { @@ -302,19 +303,19 @@ func TestFilePosRetrieveExecutedPosition(t *testing.T) { want := ReplicationStatus{ Position: Position{GTIDSet: FilePosGTID{File: "master-bin.000002", Pos: 1307}}, RelayLogPosition: Position{GTIDSet: FilePosGTID{File: "master-bin.000003", Pos: 1308}}, - FilePosition: Position{GTIDSet: FilePosGTID{File: "master-bin.000002", Pos: 1307}}, - RelayLogSourceBinlogEquivalentPosition: Position{GTIDSet: FilePosGTID{File: "master-bin.000003", Pos: 1308}}, - RelayLogFilePosition: Position{GTIDSet: FilePosGTID{File: "relay-bin.000004", Pos: 1309}}, + FilePosition: BinlogFilePos{File: "master-bin.000002", Pos: 1307}, + RelayLogSourceBinlogEquivalentPosition: BinlogFilePos{File: "master-bin.000003", Pos: 1308}, + RelayLogFilePosition: BinlogFilePos{File: "relay-bin.000004", Pos: 1309}, } got, err := ParseFilePosReplicationStatus(resultMap) require.NoError(t, err) assert.Equalf(t, got.Position.GTIDSet, want.Position.GTIDSet, "got Position: %v; want Position: %v", got.Position.GTIDSet, want.Position.GTIDSet) - assert.Equalf(t, got.RelayLogPosition.GTIDSet, want.RelayLogPosition.GTIDSet, "got RelayLogPosition: %v; want RelayLogPosition: %v", got.RelayLogPosition.GTIDSet, want.RelayLogPosition.GTIDSet) - assert.Equalf(t, got.RelayLogFilePosition.GTIDSet, want.RelayLogFilePosition.GTIDSet, "got RelayLogFilePosition: %v; want RelayLogFilePosition: %v", got.RelayLogFilePosition.GTIDSet, want.RelayLogFilePosition.GTIDSet) - assert.Equalf(t, got.FilePosition.GTIDSet, want.FilePosition.GTIDSet, "got FilePosition: %v; want FilePosition: %v", got.FilePosition.GTIDSet, want.FilePosition.GTIDSet) - assert.Equalf(t, got.RelayLogSourceBinlogEquivalentPosition.GTIDSet, want.RelayLogSourceBinlogEquivalentPosition.GTIDSet, "got RelayLogSourceBinlogEquivalentPosition: %v; want RelayLogSourceBinlogEquivalentPosition: %v", got.RelayLogSourceBinlogEquivalentPosition.GTIDSet, want.RelayLogSourceBinlogEquivalentPosition.GTIDSet) - assert.Equalf(t, got.Position.GTIDSet, got.FilePosition.GTIDSet, "FilePosition and Position don't match when they should for the FilePos flavor") - assert.Equalf(t, got.RelayLogPosition.GTIDSet, got.RelayLogSourceBinlogEquivalentPosition.GTIDSet, "RelayLogPosition and RelayLogSourceBinlogEquivalentPosition don't match when they should for the FilePos flavor") + assert.Equalf(t, got.RelayLogPosition.GTIDSet, want.RelayLogPosition, "got RelayLogPosition: %v; want RelayLogPosition: %v", got.RelayLogPosition.GTIDSet, want.RelayLogPosition.GTIDSet) + assert.Equalf(t, got.RelayLogFilePosition, want.RelayLogFilePosition, "got RelayLogFilePosition: %v; want RelayLogFilePosition: %v", got.RelayLogFilePosition, want.RelayLogFilePosition) + assert.Equalf(t, got.FilePosition, want.FilePosition, "got FilePosition: %v; want FilePosition: %v", got.FilePosition, want.FilePosition) + assert.Equalf(t, got.RelayLogSourceBinlogEquivalentPosition, want.RelayLogSourceBinlogEquivalentPosition, "got RelayLogSourceBinlogEquivalentPosition: %v; want RelayLogSourceBinlogEquivalentPosition: %v", got.RelayLogSourceBinlogEquivalentPosition, want.RelayLogSourceBinlogEquivalentPosition) + assert.Equalf(t, got.Position.GTIDSet, got.FilePosition, "FilePosition and Position don't match when they should for the FilePos flavor") + assert.Equalf(t, got.RelayLogPosition.GTIDSet, got.RelayLogSourceBinlogEquivalentPosition, "RelayLogPosition and RelayLogSourceBinlogEquivalentPosition don't match when they should for the FilePos flavor") } func TestFilePosShouldGetPosition(t *testing.T) { @@ -325,11 +326,11 @@ func TestFilePosShouldGetPosition(t *testing.T) { want := PrimaryStatus{ Position: Position{GTIDSet: FilePosGTID{File: "source-bin.000003", Pos: 1307}}, - FilePosition: Position{GTIDSet: FilePosGTID{File: "source-bin.000003", Pos: 1307}}, + FilePosition: BinlogFilePos{File: "source-bin.000003", Pos: 1307}, } got, err := ParseFilePosPrimaryStatus(resultMap) require.NoError(t, err) assert.Equalf(t, got.Position.GTIDSet, want.Position.GTIDSet, "got Position: %v; want Position: %v", got.Position.GTIDSet, want.Position.GTIDSet) - assert.Equalf(t, got.FilePosition.GTIDSet, want.FilePosition.GTIDSet, "got FilePosition: %v; want FilePosition: %v", got.FilePosition.GTIDSet, want.FilePosition.GTIDSet) - assert.Equalf(t, got.Position.GTIDSet, got.FilePosition.GTIDSet, "FilePosition and Position don't match when they should for the FilePos flavor") + assert.Equalf(t, got.FilePosition, want.FilePosition, "got FilePosition: %v; want FilePosition: %v", got.FilePosition, want.FilePosition) + assert.Equalf(t, got.Position.GTIDSet, got.FilePosition, "FilePosition and Position don't match when they should for the FilePos flavor") } diff --git a/go/vt/mysqlctl/fakemysqldaemon.go b/go/vt/mysqlctl/fakemysqldaemon.go index e6afe7917f1..e9926270d48 100644 --- a/go/vt/mysqlctl/fakemysqldaemon.go +++ b/go/vt/mysqlctl/fakemysqldaemon.go @@ -89,7 +89,7 @@ type FakeMysqlDaemon struct { // CurrentSourceFilePosition is used to determine the executed // file based positioning of the replication source. - CurrentSourceFilePosition replication.Position + CurrentSourceFilePosition replication.BinlogFilePos // ReplicationStatusError is used by ReplicationStatus. ReplicationStatusError error diff --git a/go/vt/wrangler/testlib/emergency_reparent_shard_test.go b/go/vt/wrangler/testlib/emergency_reparent_shard_test.go index 984ff93095e..6aee7de8777 100644 --- a/go/vt/wrangler/testlib/emergency_reparent_shard_test.go +++ b/go/vt/wrangler/testlib/emergency_reparent_shard_test.go @@ -73,10 +73,9 @@ func TestEmergencyReparentShard(t *testing.T) { }, }, }) - currentPrimaryFilePosition, _ := replication.ParseFilePosGTIDSet("mariadb-bin.000010:456") - oldPrimary.FakeMysqlDaemon.CurrentSourceFilePosition = replication.Position{ - GTIDSet: currentPrimaryFilePosition, - } + var err error + oldPrimary.FakeMysqlDaemon.CurrentSourceFilePosition, err = replication.ParseBinlogFilePos("mariadb-bin.000010:456") + require.NoError(t, err) // new primary newPrimary.FakeMysqlDaemon.ReadOnly = true @@ -90,11 +89,10 @@ func TestEmergencyReparentShard(t *testing.T) { }, }, }) - newPrimaryRelayLogPos, _ := replication.ParseFilePosGTIDSet("relay-bin.000004:456") - newPrimary.FakeMysqlDaemon.CurrentSourceFilePosition = replication.Position{ - GTIDSet: newPrimaryRelayLogPos, - } - newPrimary.FakeMysqlDaemon.WaitPrimaryPositions = append(newPrimary.FakeMysqlDaemon.WaitPrimaryPositions, newPrimary.FakeMysqlDaemon.CurrentSourceFilePosition) + newPrimary.FakeMysqlDaemon.CurrentSourceFilePosition, _ = replication.ParseBinlogFilePos("relay-bin.000004:456") + filePos, err := newPrimary.FakeMysqlDaemon.CurrentSourceFilePosition.ConvertToFlavorPosition() + require.NoError(t, err) + newPrimary.FakeMysqlDaemon.WaitPrimaryPositions = append(newPrimary.FakeMysqlDaemon.WaitPrimaryPositions, filePos) newPrimary.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ "STOP REPLICA IO_THREAD", "SUBINSERT INTO _vt.reparent_journal (time_created_ns, action_name, primary_alias, replication_position) VALUES", @@ -133,11 +131,9 @@ func TestEmergencyReparentShard(t *testing.T) { }, }, }) - goodReplica1RelayLogPos, _ := replication.ParseFilePosGTIDSet("relay-bin.000004:455") - goodReplica1.FakeMysqlDaemon.CurrentSourceFilePosition = replication.Position{ - GTIDSet: goodReplica1RelayLogPos, - } - goodReplica1.FakeMysqlDaemon.WaitPrimaryPositions = append(goodReplica1.FakeMysqlDaemon.WaitPrimaryPositions, goodReplica1.FakeMysqlDaemon.CurrentSourceFilePosition) + goodReplica1.FakeMysqlDaemon.CurrentSourceFilePosition, _ = replication.ParseBinlogFilePos("relay-bin.000004:455") + filePos, _ = goodReplica1.FakeMysqlDaemon.CurrentSourceFilePosition.ConvertToFlavorPosition() + goodReplica1.FakeMysqlDaemon.WaitPrimaryPositions = append(goodReplica1.FakeMysqlDaemon.WaitPrimaryPositions, filePos) goodReplica1.FakeMysqlDaemon.SetReplicationSourceInputs = append(goodReplica1.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(newPrimary.Tablet), topoproto.MysqlAddr(oldPrimary.Tablet)) goodReplica1.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ // These 3 statements come from tablet startup @@ -164,11 +160,9 @@ func TestEmergencyReparentShard(t *testing.T) { }, }, }) - goodReplica2RelayLogPos, _ := replication.ParseFilePosGTIDSet("relay-bin.000004:454") - goodReplica2.FakeMysqlDaemon.CurrentSourceFilePosition = replication.Position{ - GTIDSet: goodReplica2RelayLogPos, - } - goodReplica2.FakeMysqlDaemon.WaitPrimaryPositions = append(goodReplica2.FakeMysqlDaemon.WaitPrimaryPositions, goodReplica2.FakeMysqlDaemon.CurrentSourceFilePosition) + goodReplica2.FakeMysqlDaemon.CurrentSourceFilePosition, _ = replication.ParseBinlogFilePos("relay-bin.000004:454") + filePos, _ = goodReplica2.FakeMysqlDaemon.CurrentSourceFilePosition.ConvertToFlavorPosition() + goodReplica2.FakeMysqlDaemon.WaitPrimaryPositions = append(goodReplica2.FakeMysqlDaemon.WaitPrimaryPositions, filePos) goodReplica2.FakeMysqlDaemon.SetReplicationSourceInputs = append(goodReplica2.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(newPrimary.Tablet), topoproto.MysqlAddr(oldPrimary.Tablet)) goodReplica2.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ // These 3 statements come from tablet startup @@ -182,7 +176,7 @@ func TestEmergencyReparentShard(t *testing.T) { // run EmergencyReparentShard waitReplicaTimeout := time.Second * 2 - err := vp.Run([]string{"EmergencyReparentShard", "--wait_replicas_timeout", waitReplicaTimeout.String(), newPrimary.Tablet.Keyspace + "/" + newPrimary.Tablet.Shard, + err = vp.Run([]string{"EmergencyReparentShard", "--wait_replicas_timeout", waitReplicaTimeout.String(), newPrimary.Tablet.Keyspace + "/" + newPrimary.Tablet.Shard, topoproto.TabletAliasString(newPrimary.Tablet.Alias)}) require.NoError(t, err) // check what was run @@ -227,11 +221,9 @@ func TestEmergencyReparentShardPrimaryElectNotBest(t *testing.T) { }, }, }) - newPrimaryRelayLogPos, _ := replication.ParseFilePosGTIDSet("relay-bin.000004:456") - newPrimary.FakeMysqlDaemon.CurrentSourceFilePosition = replication.Position{ - GTIDSet: newPrimaryRelayLogPos, - } - newPrimary.FakeMysqlDaemon.WaitPrimaryPositions = append(newPrimary.FakeMysqlDaemon.WaitPrimaryPositions, newPrimary.FakeMysqlDaemon.CurrentSourceFilePosition) + newPrimary.FakeMysqlDaemon.CurrentSourceFilePosition, _ = replication.ParseBinlogFilePos("relay-bin.000004:456") + filePos, _ := newPrimary.FakeMysqlDaemon.CurrentSourceFilePosition.ConvertToFlavorPosition() + newPrimary.FakeMysqlDaemon.WaitPrimaryPositions = append(newPrimary.FakeMysqlDaemon.WaitPrimaryPositions, filePos) newPrimary.FakeMysqlDaemon.SetReplicationSourceInputs = append(newPrimary.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(moreAdvancedReplica.Tablet)) newPrimary.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ "STOP REPLICA IO_THREAD", @@ -260,12 +252,10 @@ func TestEmergencyReparentShardPrimaryElectNotBest(t *testing.T) { }, }, }) - moreAdvancedReplicaLogPos, _ := replication.ParseFilePosGTIDSet("relay-bin.000004:457") - moreAdvancedReplica.FakeMysqlDaemon.CurrentSourceFilePosition = replication.Position{ - GTIDSet: moreAdvancedReplicaLogPos, - } + moreAdvancedReplica.FakeMysqlDaemon.CurrentSourceFilePosition, _ = replication.ParseBinlogFilePos("relay-bin.000004:457") + filePos, _ = moreAdvancedReplica.FakeMysqlDaemon.CurrentSourceFilePosition.ConvertToFlavorPosition() + moreAdvancedReplica.FakeMysqlDaemon.WaitPrimaryPositions = append(moreAdvancedReplica.FakeMysqlDaemon.WaitPrimaryPositions, filePos) moreAdvancedReplica.FakeMysqlDaemon.SetReplicationSourceInputs = append(moreAdvancedReplica.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(newPrimary.Tablet), topoproto.MysqlAddr(oldPrimary.Tablet)) - moreAdvancedReplica.FakeMysqlDaemon.WaitPrimaryPositions = append(moreAdvancedReplica.FakeMysqlDaemon.WaitPrimaryPositions, moreAdvancedReplica.FakeMysqlDaemon.CurrentSourceFilePosition) newPrimary.FakeMysqlDaemon.WaitPrimaryPositions = append(newPrimary.FakeMysqlDaemon.WaitPrimaryPositions, moreAdvancedReplica.FakeMysqlDaemon.GetPrimaryPositionLocked()) moreAdvancedReplica.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ // These 3 statements come from tablet startup