Skip to content

Commit

Permalink
fix delete expired keys
Browse files Browse the repository at this point in the history
  • Loading branch information
roseduan committed Aug 30, 2023
1 parent d7ab6f5 commit 92ab3d5
Showing 1 changed file with 31 additions and 40 deletions.
71 changes: 31 additions & 40 deletions db.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package rosedb

import (
"bytes"
"context"
"errors"
"fmt"
Expand Down Expand Up @@ -354,11 +353,10 @@ func (db *DB) Ascend(handleFn func(k []byte, v []byte) (bool, error)) {
if err != nil {
return false, err
}
value, err := db.checkValue(chunk)
if err != nil {
return false, err
if value := db.checkValue(chunk); value != nil {
return handleFn(key, value)
}
return handleFn(key, value)
return true, nil
})
}

Expand All @@ -372,11 +370,10 @@ func (db *DB) AscendRange(startKey, endKey []byte, handleFn func(k []byte, v []b
if err != nil {
return false, nil
}
value, err := db.checkValue(chunk)
if err != nil {
return false, err
if value := db.checkValue(chunk); value != nil {
return handleFn(key, value)
}
return handleFn(key, value)
return true, nil
})
}

Expand All @@ -390,11 +387,10 @@ func (db *DB) AscendGreaterOrEqual(key []byte, handleFn func(k []byte, v []byte)
if err != nil {
return false, nil
}
value, err := db.checkValue(chunk)
if err != nil {
return false, err
if value := db.checkValue(chunk); value != nil {
return handleFn(key, value)
}
return handleFn(key, value)
return true, nil
})
}

Expand Down Expand Up @@ -426,11 +422,10 @@ func (db *DB) Descend(handleFn func(k []byte, v []byte) (bool, error)) {
if err != nil {
return false, nil
}
value, err := db.checkValue(chunk)
if err != nil {
return false, err
if value := db.checkValue(chunk); value != nil {
return handleFn(key, value)
}
return handleFn(key, value)
return true, nil
})
}

Expand All @@ -444,11 +439,10 @@ func (db *DB) DescendRange(startKey, endKey []byte, handleFn func(k []byte, v []
if err != nil {
return false, nil
}
value, err := db.checkValue(chunk)
if err != nil {
return false, err
if value := db.checkValue(chunk); value != nil {
return handleFn(key, value)
}
return handleFn(key, value)
return true, nil
})
}

Expand All @@ -462,11 +456,10 @@ func (db *DB) DescendLessOrEqual(key []byte, handleFn func(k []byte, v []byte) (
if err != nil {
return false, nil
}
value, err := db.checkValue(chunk)
if err != nil {
return false, err
if value := db.checkValue(chunk); value != nil {
return handleFn(key, value)
}
return handleFn(key, value)
return true, nil
})
}

Expand All @@ -488,13 +481,13 @@ func (db *DB) DescendKeys(pattern []byte, handleFn func(k []byte) (bool, error))
})
}

func (db *DB) checkValue(chunk []byte) ([]byte, error) {
func (db *DB) checkValue(chunk []byte) []byte {
record := decodeLogRecord(chunk)
now := time.Now().UnixNano()
if record.Type == LogRecordDeleted || record.IsExpired(now) {
return nil, ErrKeyNotFound
if record.Type != LogRecordDeleted && !record.IsExpired(now) {
return record.Value
}
return record.Value, nil
return nil
}

func checkOptions(options Options) error {
Expand Down Expand Up @@ -585,21 +578,18 @@ func (db *DB) DeleteExpiredKeys(timeout time.Duration) error {
db.mu.Lock()
defer db.mu.Unlock()

// set expiration time
// set timeout
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
done := make(chan struct{}, 1)

innerErrs := make([]error, 0) // record anonymous func error
var innerErr error
now := time.Now().UnixNano()
go func(ctx context.Context) {
for {
// get 100 key's positions
// select 100 keys from the db.index
positions := make([]*wal.ChunkPosition, 0, 100)
db.index.AscendGreaterOrEqual(db.expiredCursorKey, func(k []byte, pos *wal.ChunkPosition) (bool, error) {
// filter processed key
if bytes.Compare(k, db.expiredCursorKey) == 0 {
return true, nil
}
positions = append(positions, pos)
if len(positions) >= 100 {
return false, nil
Expand All @@ -610,14 +600,16 @@ func (db *DB) DeleteExpiredKeys(timeout time.Duration) error {
// If keys in the db.index has been traversed, len(positions) will be 0.
if len(positions) == 0 {
db.expiredCursorKey = nil
done <- struct{}{}
return
}

// delete from index if the key is expired.
for _, pos := range positions {
chunk, err := db.dataFiles.Read(pos)
if err != nil {
innerErrs = append(innerErrs, err)
innerErr = err
done <- struct{}{}
return
}
record := decodeLogRecord(chunk)
Expand All @@ -631,9 +623,8 @@ func (db *DB) DeleteExpiredKeys(timeout time.Duration) error {

select {
case <-ctx.Done():
if len(innerErrs) > 0 {
return innerErrs[0]
}
return innerErr
case <-done:
return nil
}
}

0 comments on commit 92ab3d5

Please sign in to comment.