Skip to content

Commit

Permalink
fix watch
Browse files Browse the repository at this point in the history
  • Loading branch information
roseduan committed Aug 5, 2023
1 parent 5c32d53 commit 6b612a1
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 62 deletions.
26 changes: 9 additions & 17 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,26 +272,18 @@ func (b *Batch) Commit() error {
for key, record := range b.pendingWrites {
if record.Type == LogRecordDeleted {
b.db.index.Delete(record.Key)
// insert delete event
if b.db.options.WatchQueueSize > 0 {
b.db.watcher.insertEvent(&Event{
Action: WatchActionDelete,
Key: record.Key,
Value: record.Value,
BatchId: record.BatchId,
})
}
} else {
b.db.index.Put(record.Key, positions[key])
// insert put event
if b.db.options.WatchQueueSize > 0 {
b.db.watcher.insertEvent(&Event{
Action: WatchActionPut,
Key: record.Key,
Value: record.Value,
BatchId: record.BatchId,
})
}

if b.db.options.WatchQueueSize > 0 {
e := &Event{Key: record.Key, Value: record.Value, BatchId: record.BatchId}
if record.Type == LogRecordDeleted {
e.Action = WatchActionDelete
} else {
e.Action = WatchActionPut
}
b.db.watcher.putEvent(e)
}
}

Expand Down
14 changes: 7 additions & 7 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,13 @@ func (db *DB) Exist(key []byte) (bool, error) {
return batch.Exist(key)
}

func (db *DB) Watch() (chan *Event, error) {
if db.options.WatchQueueSize <= 0 {
return nil, ErrWatchDisabled
}
return db.watchCh, nil
}

func checkOptions(options Options) error {
if options.DirPath == "" {
return errors.New("database dir path is empty")
Expand Down Expand Up @@ -332,10 +339,3 @@ func (db *DB) loadIndexFromWAL() error {
}
return nil
}

func (db *DB) WatchChan() (chan *Event, error) {
if db.options.WatchQueueSize <= 0 {
return nil, ErrWatchUnopened
}
return db.watchCh, nil
}
2 changes: 1 addition & 1 deletion errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ var (
ErrBatchRollbacked = errors.New("the batch is rollbacked")
ErrDBClosed = errors.New("the database is closed")
ErrMergeRunning = errors.New("the merge operation is running")
ErrWatchUnopened = errors.New("the watch is unopened")
ErrWatchDisabled = errors.New("the watch is disabled")
)
12 changes: 6 additions & 6 deletions examples/watch/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
func main() {
// specify the options
options := rosedb.DefaultOptions
options.DirPath = "/tmp/rosedb_merge"
options.DirPath = "/tmp/rosedb_watch"
options.WatchQueueSize = 1000

// open a database
Expand All @@ -27,29 +27,29 @@ func main() {

// run a new goroutine to handle db event.
go func() {
wc, err := db.WatchChan()
eventCh, err := db.Watch()
if err != nil {
return
}
for {
event := <-wc
event := <-eventCh
// when db closed, the event will receive nil.
if event == nil {
fmt.Println("The db is closed, so the watch channel is closed.")
return
}
// events can be captured here for processing
fmt.Printf("==== Get a new event ==== %s \n", event.String())
fmt.Printf("Get a new event: key%s \n", event.Key)
}
}()

// write some data
for i := 0; i < 10; i++ {
_ = db.Put([]byte(utils.GetTestKey(i)), utils.RandomValue(64))
_ = db.Put(utils.GetTestKey(i), utils.RandomValue(64))
}
// delete some data
for i := 0; i < 10/2; i++ {
_ = db.Delete([]byte(utils.GetTestKey(i)))
_ = db.Delete(utils.GetTestKey(i))
}

// wait for watch goroutine to finish.
Expand Down
31 changes: 8 additions & 23 deletions watch.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package rosedb

import (
"fmt"
"sync"
"time"
)
Expand All @@ -13,28 +12,15 @@ const (
WatchActionDelete
)

// Event
// Event is the event that occurs when the database is modified.
// It is used to synchronize the watch of the database.
type Event struct {
Action WatchActionType
Key []byte
Value []byte
BatchId uint64
}

func (e *Event) String() string {
return fmt.Sprintf(`
Event{
Action: %d,
Key: %s,
Value: %s,
BatchId: %d
}`,
e.Action,
e.Key,
e.Value,
e.BatchId)
}

// Watcher temporarily stores event information,
// as it is generated until it is synchronized to DB's watch.
//
Expand All @@ -54,7 +40,7 @@ func NewWatcher(capacity uint64) *Watcher {
}
}

func (w *Watcher) insertEvent(e *Event) {
func (w *Watcher) putEvent(e *Event) {
w.mu.Lock()
w.queue.push(e)
if w.queue.isFull() {
Expand All @@ -67,22 +53,21 @@ func (w *Watcher) insertEvent(e *Event) {
func (w *Watcher) getEvent() *Event {
w.mu.RLock()
defer w.mu.RUnlock()
if isEmpty := w.queue.isEmpty(); isEmpty {
if w.queue.isEmpty() {
return nil
}
e := w.queue.pop()
return e
return w.queue.pop()
}

// sendEvent send events to DB's watch
func (w *Watcher) sendEvent(c chan *Event) {
for {
e := w.getEvent()
if e == nil {
event := w.getEvent()
if event == nil {
time.Sleep(100 * time.Millisecond)
continue
}
c <- e
c <- event
}
}

Expand Down
19 changes: 11 additions & 8 deletions watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestWatch_Insert_Scan(t *testing.T) {
key := utils.GetTestKey(rand.Int())
value := utils.RandomValue(128)
q = append(q, [2][]byte{key, value})
w.insertEvent(&Event{
w.putEvent(&Event{
Action: WatchActionPut,
Key: key,
Value: value,
Expand All @@ -43,7 +43,7 @@ func TestWatch_Rotate_Insert_Scan(t *testing.T) {
for i := 0; i < 2500; i++ {
key := utils.GetTestKey(rand.Int())
value := utils.RandomValue(128)
w.insertEvent(&Event{
w.putEvent(&Event{
Action: WatchActionPut,
Key: key,
Value: value,
Expand Down Expand Up @@ -75,7 +75,7 @@ func TestWatch_Put_Watch(t *testing.T) {
assert.Nil(t, err)
defer destroyDB(db)

w, err := db.WatchChan()
w, err := db.Watch()
assert.Nil(t, err)
for i := 0; i < 50; i++ {
key := utils.GetTestKey(rand.Int())
Expand All @@ -96,13 +96,15 @@ func TestWatch_Put_Delete_Watch(t *testing.T) {
assert.Nil(t, err)
defer destroyDB(db)

w, err := db.WatchChan()
w, err := db.Watch()
assert.Nil(t, err)

key := utils.GetTestKey(rand.Int())
value := utils.RandomValue(128)
db.Put(key, value)
db.Delete(key)
err = db.Put(key, value)
assert.Nil(t, err)
err = db.Delete(key)
assert.Nil(t, err)

for i := 0; i < 2; i++ {
event := <-w
Expand All @@ -122,7 +124,7 @@ func TestWatch_Batch_Put_Watch(t *testing.T) {
assert.Nil(t, err)
defer destroyDB(db)

w, err := db.WatchChan()
w, err := db.Watch()
assert.Nil(t, err)

times := 100
Expand All @@ -131,7 +133,8 @@ func TestWatch_Batch_Put_Watch(t *testing.T) {
err = batch.Put(utils.GetTestKey(rand.Int()), utils.RandomValue(128))
assert.Nil(t, err)
}
batch.Commit()
err = batch.Commit()
assert.Nil(t, err)

var batchId uint64
for i := 0; i < times; i++ {
Expand Down

0 comments on commit 6b612a1

Please sign in to comment.