diff --git a/index.go b/index.go index a90ba5c7..61cb2c4e 100644 --- a/index.go +++ b/index.go @@ -135,7 +135,9 @@ func (db *RoseDB) buildSetsIndex(ent *logfile.LogEntry, pos *valuePos) { func (db *RoseDB) buildZSetIndex(ent *logfile.LogEntry, pos *valuePos) { if ent.Type == logfile.TypeDelete { db.zsetIndex.indexes.ZRem(string(ent.Key), string(ent.Value)) - db.zsetIndex.idxTree.Delete(ent.Value) + if db.zsetIndex.idxTree != nil { + db.zsetIndex.idxTree.Delete(ent.Value) + } return } diff --git a/zset.go b/zset.go index ab371b74..c92e2548 100644 --- a/zset.go +++ b/zset.go @@ -3,6 +3,7 @@ package rosedb import ( "github.com/flower-corp/rosedb/ds/art" "github.com/flower-corp/rosedb/logfile" + "github.com/flower-corp/rosedb/logger" "github.com/flower-corp/rosedb/util" ) @@ -73,11 +74,23 @@ func (db *RoseDB) ZRem(key, member []byte) error { db.zsetIndex.trees[string(key)] = art.NewART() } db.zsetIndex.idxTree = db.zsetIndex.trees[string(key)] - db.zsetIndex.idxTree.Delete(sum) + + oldVal, deleted := db.zsetIndex.idxTree.Delete(sum) + db.sendDiscard(oldVal, deleted, ZSet) entry := &logfile.LogEntry{Key: key, Value: sum, Type: logfile.TypeDelete} - if _, err := db.writeLogEntry(entry, ZSet); err != nil { + pos, err := db.writeLogEntry(entry, ZSet) + if err != nil { return err } + + // The deleted entry itself is also invalid. + _, size := logfile.EncodeEntry(entry) + node := &indexNode{fid: pos.fid, entrySize: size} + select { + case db.discards[ZSet].valChan <- node: + default: + logger.Warn("send to discard chan fail") + } return nil } diff --git a/zset_test.go b/zset_test.go index fc288c39..a5da6163 100644 --- a/zset_test.go +++ b/zset_test.go @@ -184,3 +184,30 @@ func testRoseDBZRange(t *testing.T, ioType IOType, mode DataIndexMode) { assert.Nil(t, err) assert.Equal(t, 4, len(values)) } + +func TestRoseDB_ZSetGC(t *testing.T) { + path := filepath.Join("/tmp", "rosedb") + opts := DefaultOptions(path) + opts.LogFileSizeThreshold = 32 << 20 + db, err := Open(opts) + assert.Nil(t, err) + defer destroyDB(db) + + zsetKey := []byte("my_zset") + writeCount := 500000 + for i := 0; i < writeCount; i++ { + err := db.ZAdd(zsetKey, float64(i+100), GetKey(i)) + assert.Nil(t, err) + } + + for i := 0; i < writeCount/2; i++ { + err := db.ZRem(zsetKey, GetKey(i)) + assert.Nil(t, err) + } + + err = db.RunLogFileGC(ZSet, 0, 0.1) + assert.Nil(t, err) + + card := db.ZCard(zsetKey) + assert.Equal(t, writeCount/2, card) +}