Skip to content

Commit

Permalink
fixed some bugs in reading AOF
Browse files Browse the repository at this point in the history
  • Loading branch information
faith_liu committed Sep 14, 2023
1 parent 511624f commit 2228f6b
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 71 deletions.
46 changes: 24 additions & 22 deletions internal/aof/aof.go
Original file line number Diff line number Diff line change
Expand Up @@ -673,10 +673,9 @@ func AOFLoadManifestFromDisk() {
}

am := AOFLoadManifestFromFile(am_Filepath)
if am != nil {
AOFFileInfo.AOFManifest = am
}

//if am != nil {
AOFFileInfo.AOFManifest = am
//}
}

func GetNewIncrAOFName(am *AOFManifest) string {
Expand Down Expand Up @@ -706,7 +705,6 @@ func GetLastIncrAOFName(am *AOFManifest) string {

ai, ok := lastnode.value.(AOFInfo)
if !ok {
fmt.Printf("Failed to convert lastnode.value to AOFInfo")
log.Panicf("Failed to convert lastnode.value to AOFInfo")
}
return ai.FileName
Expand Down Expand Up @@ -814,7 +812,7 @@ func GetBaseAndIncrAppendOnlyFilesNum(am *AOFManifest) int {
return num
}

func (ld *Loader) LoadSingleAppendOnlyFile(FileName string, ch chan *entry.Entry) int {
func (ld *Loader) LoadSingleAppendOnlyFile(FileName string, ch chan *entry.Entry, LastFile bool) int {
ret := AOFOK
AOFFilepath := MakePath(AOFFileInfo.AOFDirName, FileName)
var sizes int64 = 0
Expand Down Expand Up @@ -845,13 +843,13 @@ func (ld *Loader) LoadSingleAppendOnlyFile(FileName string, ch chan *entry.Entry
return ret
}
} else {
sizes += 5
log.Infof("Reading RDB Base File on AOF loading...")
ldRDB := rdb.NewLoader(AOFFilepath, ch)
ldRDB.ParseRDB()
return AOFOK
//Skipped RDB checksum and has not been processed yet.
}
sizes += 5
reader := bufio.NewReader(fp)
for {

Expand Down Expand Up @@ -900,29 +898,28 @@ func (ld *Loader) LoadSingleAppendOnlyFile(FileName string, ch chan *entry.Entry
}
sizes += int64(len(line))
len, _ := strconv.ParseInt(string(line[1:len(line)-2]), 10, 64)

argstring := make([]byte, len)
_, err = reader.Read(argstring)
if err != nil {
argstring := make([]byte, len+2)
argstring, err = reader.ReadBytes('\n')
if err != nil || argstring[len+1] != '\n' {
log.Infof("Unrecoverable error reading the append only File %v: %v", FileName, err)
ret = AOFFailed
return ret
}
/*if ConsumeNewline(argstring[len-2:]) == 0 {
return 0
}*/
argstring = argstring[:len]
argv = append(argv, string(argstring))
CRLF := make([]byte, 2)
_, err = reader.Read(CRLF)
if err != nil {
log.Infof("Unrecoverable error reading the append only File %v: %v", FileName, err)
ret = AOFFailed
return ret
}

sizes += len + 2
}
for _, value := range argv {
e.Argv = append(e.Argv, value)
}
ld.ch <- e

if sizes >= CheckAOFInfof.pos && LastFile {
break
}
}

}
Expand Down Expand Up @@ -978,7 +975,7 @@ func (ld *Loader) LoadAppendOnlyFile(am *AOFManifest, ch chan *entry.Entry) int
BaseSize = GetAppendOnlyFileSize(AOFName, nil)
lastFile = totalNum
start = Ustime()
ret = ld.LoadSingleAppendOnlyFile(AOFName, ch)
ret = ld.LoadSingleAppendOnlyFile(AOFName, ch, false)
if ret == AOFOK || (ret == AOFTruncated && lastFile == 1) {
log.Infof("DB loaded from Base File %v: %.3f seconds", AOFName, float64(Ustime()-start)/1000000)
}
Expand All @@ -1005,6 +1002,7 @@ func (ld *Loader) LoadAppendOnlyFile(am *AOFManifest, ch chan *entry.Entry) int
return ret
}
}
totalNum--
}

if am.incrAOFList.len > 0 {
Expand All @@ -1023,7 +1021,11 @@ func (ld *Loader) LoadAppendOnlyFile(am *AOFManifest, ch chan *entry.Entry) int
lastFile = totalNum
AOFNum++
start = Ustime()
ret = ld.LoadSingleAppendOnlyFile(AOFName, ch)
if lastFile == 1 {
ret = ld.LoadSingleAppendOnlyFile(AOFName, ch, true)
} else {
ret = ld.LoadSingleAppendOnlyFile(AOFName, ch, false)
}
if ret == AOFOK || (ret == AOFTruncated && lastFile == 1) {
log.Infof("DB loaded from incr File %v: %.3f seconds", AOFName, float64(Ustime()-start)/1000000)
}
Expand All @@ -1047,7 +1049,7 @@ func (ld *Loader) LoadAppendOnlyFile(am *AOFManifest, ch chan *entry.Entry) int
}
ln = ListNext(&li)
}

totalNum--
}

AOFFileInfo.AOFCurrentSize = totalSize
Expand Down
62 changes: 36 additions & 26 deletions internal/aof/aof_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bufio"
"fmt"
"io"

"math"
"os"
"path"
Expand Down Expand Up @@ -107,7 +108,7 @@ func FilelsManifest(AOFFilePath string) bool {
log.Panicf("cannot read File: %v\n", AOFFilePath)
}
}
if lines[0] == '#' {
if lines[0] == '#' || len(lines) < 4 {
continue
} else if lines[:4] == "file" {
is_manifest = true
Expand Down Expand Up @@ -180,9 +181,8 @@ func ReadString(rd *bufio.Reader, target *string) int {
log.Infof("Expected to read string of %d bytes, which is not in the suitable range\n", len)
return 0
}

// Increase length to also consume \r\n
len += 2
// Increase length to also consume \r\n
data := make([]byte, len)
if ReadBytes(rd, &data, len) == 0 {
return 0
Expand All @@ -191,20 +191,19 @@ func ReadString(rd *bufio.Reader, target *string) int {
if ConsumeNewline(data[len-2:]) == 0 {
return 0
}

*target = string(data[:len-2])
data = data[:len-2] //\r\n
*target = string(data)
return 1
}

func ReadBytes(rd *bufio.Reader, target *[]byte, length int64) int {
var real int64
n, err := rd.Read(*target)
real = int64(n)
if err != nil || real != length {
log.Infof("Expected to read %d bytes, got %d bytes\n", length, real)
var err error
*target, err = rd.ReadBytes('\n')
if err != nil || (*target)[length-1] != '\n' {
log.Infof("AOF format error:%s", *target)
return 0
}
CheckAOFInfof.pos += real
CheckAOFInfof.pos += length
return 1
}

Expand Down Expand Up @@ -267,13 +266,18 @@ func AOFLoadManifestFromFile(am_Filepath string) *AOFManifest {
if err != nil {
if err == io.EOF {
if linenum == 0 {
log.Panicf("Found an empty AOF manifest")
log.Infof("Found an empty AOF manifest")
am = nil
return am
} else {
break
}

} else {
log.Panicf("Read AOF manifest failed")
log.Infof("Read AOF manifest failed")
am = nil
return am

}
}

Expand All @@ -282,17 +286,21 @@ func AOFLoadManifestFromFile(am_Filepath string) *AOFManifest {
continue
}
if !strings.Contains(buf, "\n") {
log.Panicf("The AOF manifest File contains too long line")
log.Infof("The AOF manifest File contains too long line")
return nil
}
line = strings.Trim(buf, " \t\r\n")
if len(line) == 0 {
log.Panicf("Invalid AOF manifest File format")
log.Infof("Invalid AOF manifest File format")
return nil
}
argc := 0
argv, argc = SplitArgs(line)

if argc < 6 || argc%2 != 0 {
log.Panicf("Invalid AOF manifest File format")
log.Infof("Invalid AOF manifest File format")
am = nil
return am
}
ai = AOFInfoCreate()
for i := 0; i < argc; i += 2 {
Expand Down Expand Up @@ -372,8 +380,6 @@ func ProcessAnnotations(rd *bufio.Reader, Filename string, lastFile bool) int {
if err != nil {
log.Panicf("Failed to read annotations from AOF %v, aborting...\n", Filename)
}
CheckAOFInfof.pos += int64(len(buf)) + 2
CheckAOFInfof.toTimestamp = config.Config.Source.AOFTruncateToTimestamp
if CheckAOFInfof.toTimestamp != 0 && strings.HasPrefix(string(buf), "TS:") {
var ts int64
ts, err = strconv.ParseInt(strings.TrimPrefix(string(buf), "TS:"), 10, 64)
Expand All @@ -382,6 +388,7 @@ func ProcessAnnotations(rd *bufio.Reader, Filename string, lastFile bool) int {
}

if ts <= CheckAOFInfof.toTimestamp {
CheckAOFInfof.pos += int64(len(buf)) + 2
return 1
}

Expand All @@ -395,14 +402,14 @@ func ProcessAnnotations(rd *bufio.Reader, Filename string, lastFile bool) int {
}

// Truncate remaining AOF if exceeding 'toTimestamp'
if err := CheckAOFInfof.fp.Truncate(CheckAOFInfof.pos); err != nil {
/*if err := CheckAOFInfof.fp.Truncate(CheckAOFInfof.pos); err != nil {
log.Panicf("Failed to truncate AOF %v to timestamp %d\n", Filename, CheckAOFInfof.toTimestamp)
} else {

return 0
}
} else {*/
//CheckAOFInfof.pos += int64(len(buf)) + 2
return 0
//}
}

CheckAOFInfof.pos += int64(len(buf)) + 2
return 1
}

Expand All @@ -420,7 +427,8 @@ func CheckMultipartAOF(DirPath string, ManifestFilePath string, fix int) {
if am.BaseAOFInfo != nil {
AOFFileName := am.BaseAOFInfo.FileName
AOFFilePath := MakePath(DirPath, AOFFileName)
lastFile := (AOFNum + 1) == totalNum
AOFNum++
lastFile := AOFNum == totalNum
AOFPreable := FileIsRDB(AOFFilePath)
if AOFPreable {
log.Infof("Start to check Base AOF (RDB format).\n")
Expand All @@ -439,7 +447,8 @@ func CheckMultipartAOF(DirPath string, ManifestFilePath string, fix int) {
ai := ln.value.(*AOFInfo)
AOFFileName := ai.FileName
AOFFilePath := MakePath(DirPath, AOFFileName)
lastFile := (AOFNum + 1) == totalNum
AOFNum++
lastFile := AOFNum == totalNum
ret = CheckSingleAOF(AOFFileName, AOFFilePath, lastFile, fix, false)
OutPutAOFStyle(ret, AOFFileName, "INCR AOF")
ln = ln.next
Expand All @@ -457,6 +466,7 @@ func CheckOldStyleAOF(AOFFilePath string, fix int, preamble bool) {
}
func CheckSingleAOF(AOFFileName, AOFFilePath string, lastFile bool, fix int, preamble bool) int {
var rdbpos int64 = 0
CheckAOFInfof.toTimestamp = config.Config.Source.AOFTruncateToTimestamp
multi := 0
CheckAOFInfof.pos = 0
buf := make([]byte, 1)
Expand Down
5 changes: 3 additions & 2 deletions internal/reader/aof_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func (r *aofReader) StartRead() chan *entry.Entry {

go func() {
aof.AOFFileInfo = *(aof.NewAOFFileInfo(r.path))

aof.AOFLoadManifestFromDisk()
am := aof.AOFFileInfo.GetAOFManifest()

Expand All @@ -49,8 +50,7 @@ func (r *aofReader) StartRead() chan *entry.Entry {
statistics.Metrics.AofFileSize = uint64(fi.Size())
statistics.Metrics.AofReceivedSize = uint64(fi.Size())
aofLoader := aof.NewLoader(r.path, r.ch)

_ = aofLoader.LoadSingleAppendOnlyFile(paths, r.ch)
_ = aofLoader.LoadSingleAppendOnlyFile(paths, r.ch, true)
log.Infof("Send AOF finished. path=[%s]", r.path)
close(r.ch)
} else {
Expand All @@ -68,6 +68,7 @@ func (r *aofReader) StartRead() chan *entry.Entry {
log.Infof("Send AOF finished. path=[%s]", r.path)
close(r.ch)
}

}()

return r.ch
Expand Down
Loading

0 comments on commit 2228f6b

Please sign in to comment.