Skip to content

Commit

Permalink
Read correct number of messages from end of file (#7)
Browse files Browse the repository at this point in the history
* trivial improvement of some checks

* use file instead of bufio.Reader in readNumOfMessages from file end

Signed-off-by: Leon Ziyang Zhang <[email protected]>
  • Loading branch information
leonzz authored Jul 23, 2021
1 parent 90e0163 commit adc5610
Showing 1 changed file with 5 additions and 11 deletions.
16 changes: 5 additions & 11 deletions diskqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,11 @@ func NewWithDiskSpace(name string, dataPath string,

// Get the last known state of DiskQueue from metadata and start ioLoop
func (d *diskQueue) start() error {
// ensure that DiskQueue has enough space to write the metadata file
if d.enableDiskLimitation && d.maxBytesDiskSpace <= maxMetaDataFileSize {
// ensure that DiskQueue has enough space to write the metadata file + at least one data file with max size + message size
if d.enableDiskLimitation && (d.maxBytesDiskSpace <= maxMetaDataFileSize+d.maxBytesPerFile) {
errorMsg := fmt.Sprintf(
"disk size limit too small(%d): not enough space for MetaData file size(%d)",
d.maxBytesDiskSpace, maxMetaDataFileSize)
"disk size limit too small(%d): not enough space for MetaData file (size=%d) and at least one data file with max size (maxBytesPerFile=%d).",
d.maxBytesDiskSpace, maxMetaDataFileSize, d.maxBytesPerFile)
d.logf(ERROR, "DISKQUEUE(%s) - %s", errorMsg)
return errors.New(errorMsg)
}
Expand Down Expand Up @@ -463,8 +463,6 @@ func (d *diskQueue) readNumOfMessages(fileName string) (int64, error) {
if err != nil {
return 0, err
}

d.reader = bufio.NewReader(d.readFile)
}

closeReadFile := func() {
Expand All @@ -480,7 +478,7 @@ func (d *diskQueue) readNumOfMessages(fileName string) (int64, error) {
}

var totalMessages int64
err = binary.Read(d.reader, binary.BigEndian, &totalMessages)
err = binary.Read(d.readFile, binary.BigEndian, &totalMessages)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -580,10 +578,6 @@ func (d *diskQueue) freeDiskSpace(expectedBytesIncrease int64) error {
d.logf(ERROR, "DISKQUEUE(%s) failed to retrieve all .bad file info - %s", d.name, err)
}

if expectedBytesIncrease > d.maxBytesDiskSpace {
return fmt.Errorf("could not make space for expectedBytesIncrease = %d, with maxBytesDiskSpace = %d ", expectedBytesIncrease, d.maxBytesDiskSpace)
}

// keep freeing up disk space until we have enough space to write this message
for _, badFileInfo := range badFileInfos {
if d.totalDiskSpaceUsed+expectedBytesIncrease <= d.maxBytesDiskSpace {
Expand Down

0 comments on commit adc5610

Please sign in to comment.