Skip to content

Commit

Permalink
simplified synchronization logic; temporary logging
Browse files Browse the repository at this point in the history
Signed-off-by: Shlomi Noach <[email protected]>
  • Loading branch information
shlomi-noach committed Oct 19, 2023
1 parent 9d64432 commit 802a94c
Showing 1 changed file with 24 additions and 7 deletions.
31 changes: 24 additions & 7 deletions go/vt/mysqlctl/mysqld.go
Original file line number Diff line number Diff line change
Expand Up @@ -1323,7 +1323,17 @@ func parseBinlogEntryTimestamp(logEntry string) (found bool, t time.Time, err er

// scanBinlogTimestamp invokes a `mysqlbinlog` binary to look for a timestamp in the given binary. The function
// either looks for the first such timestamp or the last.
func (mysqld *Mysqld) scanBinlogTimestamp(mysqlbinlogDir string, mysqlbinlogEnv []string, mysqlbinlogName string, binlogFile string, stopAtFirst bool) (matchedTime time.Time, matchFound bool, err error) {
func (mysqld *Mysqld) scanBinlogTimestamp(
mysqlbinlogDir string,
mysqlbinlogEnv []string,
mysqlbinlogName string,
binlogFile string,
stopAtFirst bool,
) (
matchedTime time.Time,
matchFound bool,
err error,
) {
args := []string{binlogFile}
mysqlbinlogCmd := exec.Command(mysqlbinlogName, args...)
mysqlbinlogCmd.Dir = mysqlbinlogDir
Expand All @@ -1333,10 +1343,12 @@ func (mysqld *Mysqld) scanBinlogTimestamp(mysqlbinlogDir string, mysqlbinlogEnv
if err != nil {
return matchedTime, false, err
}
scanComplete := make(chan error)
var scanError error
intentionalKill := false
var wg sync.WaitGroup
wg.Add(1)
scan := func() {
defer close(scanComplete)
defer wg.Done()
defer func() {
intentionalKill = true
mysqlbinlogCmd.Process.Kill() // ensures the binlog file is released
Expand All @@ -1348,7 +1360,7 @@ func (mysqld *Mysqld) scanBinlogTimestamp(mysqlbinlogDir string, mysqlbinlogEnv

found, t, err := parseBinlogEntryTimestamp(logEntry)
if err != nil {
scanComplete <- err
scanError = err
return
}
if found {
Expand All @@ -1369,7 +1381,8 @@ func (mysqld *Mysqld) scanBinlogTimestamp(mysqlbinlogDir string, mysqlbinlogEnv
if err := mysqlbinlogCmd.Wait(); err != nil && !intentionalKill {
return matchedTime, false, vterrors.Wrapf(err, "waiting on mysqlbinlog command in ReadBinlogFilesTimestamps")
}
if err := <-scanComplete; err != nil {
wg.Wait()
if scanError != nil {
return matchedTime, false, vterrors.Wrapf(err, "scanning mysqlbinlog output in ReadBinlogFilesTimestamps ")
}
return matchedTime, matchFound, nil
Expand Down Expand Up @@ -1407,26 +1420,30 @@ func (mysqld *Mysqld) ReadBinlogFilesTimestamps(ctx context.Context, req *mysqlc
for _, binlogFile := range req.BinlogFileNames {
t, found, err := mysqld.scanBinlogTimestamp(dir, env, mysqlbinlogName, binlogFile, true)
if err != nil {
return nil, err
return nil, vterrors.Wrapf(err, "while scanning for first binlog timestamp in %v", binlogFile)
}
if found {
log.Errorf("==== QQQ found first timestamp in binlog file: %v", binlogFile)
resp.FirstTimestamp = protoutil.TimeToProto(t)
resp.FirstTimestampBinlog = binlogFile
break
}
log.Errorf("==== QQQ could not find first timestamp in binlog file: %v", binlogFile)
}
// Find last timestamp
for i := len(req.BinlogFileNames) - 1; i >= 0; i-- {
binlogFile := req.BinlogFileNames[i]
t, found, err := mysqld.scanBinlogTimestamp(dir, env, mysqlbinlogName, binlogFile, false)
if err != nil {
return nil, err
return nil, vterrors.Wrapf(err, "while scanning for last binlog timestamp in %v", binlogFile)
}
if found {
log.Errorf("==== QQQ found last timestamp in binlog file: %v", binlogFile)
resp.LastTimestamp = protoutil.TimeToProto(t)
resp.LastTimestampBinlog = binlogFile
break
}
log.Errorf("==== QQQ could not find first timestamp in binlog file: %v", binlogFile)
}
return resp, nil
}
Expand Down

0 comments on commit 802a94c

Please sign in to comment.