Skip to content

Commit

Permalink
feature: add watch function (#247)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeremy-Run authored Aug 5, 2023
1 parent 02c9953 commit 5c32d53
Show file tree
Hide file tree
Showing 10 changed files with 373 additions and 7 deletions.
18 changes: 18 additions & 0 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,26 @@ func (b *Batch) Commit() error {
for key, record := range b.pendingWrites {
if record.Type == LogRecordDeleted {
b.db.index.Delete(record.Key)
// insert delete event
if b.db.options.WatchQueueSize > 0 {
b.db.watcher.insertEvent(&Event{
Action: WatchActionDelete,
Key: record.Key,
Value: record.Value,
BatchId: record.BatchId,
})
}
} else {
b.db.index.Put(record.Key, positions[key])
// insert put event
if b.db.options.WatchQueueSize > 0 {
b.db.watcher.insertEvent(&Event{
Action: WatchActionPut,
Key: record.Key,
Value: record.Value,
BatchId: record.BatchId,
})
}
}
}

Expand Down
22 changes: 22 additions & 0 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ type DB struct {
closed bool
mergeRunning uint32 // indicate if the database is merging
batchPool sync.Pool
watchCh chan *Event // user consume channel for watch events
watcher *Watcher
}

// Stat represents the statistics of the database.
Expand Down Expand Up @@ -113,6 +115,14 @@ func Open(options Options) (*DB, error) {
batchPool: sync.Pool{New: makeBatch},
}

// enable watch
if options.WatchQueueSize > 0 {
db.watchCh = make(chan *Event, 100)
db.watcher = NewWatcher(options.WatchQueueSize)
// run a goroutine to synchronize event information
go db.watcher.sendEvent(db.watchCh)
}

// load index frm hint file
if err = db.loadIndexFromHintFile(); err != nil {
return nil, err
Expand Down Expand Up @@ -148,6 +158,11 @@ func (db *DB) Close() error {
return err
}

// close watch channel
if db.options.WatchQueueSize > 0 {
close(db.watchCh)
}

db.closed = true
return nil
}
Expand Down Expand Up @@ -317,3 +332,10 @@ func (db *DB) loadIndexFromWAL() error {
}
return nil
}

func (db *DB) WatchChan() (chan *Event, error) {
if db.options.WatchQueueSize <= 0 {
return nil, ErrWatchUnopened
}
return db.watchCh, nil
}
1 change: 1 addition & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ var (
ErrBatchRollbacked = errors.New("the batch is rollbacked")
ErrDBClosed = errors.New("the database is closed")
ErrMergeRunning = errors.New("the merge operation is running")
ErrWatchUnopened = errors.New("the watch is unopened")
)
57 changes: 57 additions & 0 deletions examples/watch/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package main

import (
"fmt"
"time"

"github.com/rosedblabs/rosedb/v2"
"github.com/rosedblabs/rosedb/v2/utils"
)

// this file shows how to use the Watch feature of rosedb.

func main() {
// specify the options
options := rosedb.DefaultOptions
options.DirPath = "/tmp/rosedb_merge"
options.WatchQueueSize = 1000

// open a database
db, err := rosedb.Open(options)
if err != nil {
panic(err)
}
defer func() {
_ = db.Close()
}()

// run a new goroutine to handle db event.
go func() {
wc, err := db.WatchChan()
if err != nil {
return
}
for {
event := <-wc
// when db closed, the event will receive nil.
if event == nil {
fmt.Println("The db is closed, so the watch channel is closed.")
return
}
// events can be captured here for processing
fmt.Printf("==== Get a new event ==== %s \n", event.String())
}
}()

// write some data
for i := 0; i < 10; i++ {
_ = db.Put([]byte(utils.GetTestKey(i)), utils.RandomValue(64))
}
// delete some data
for i := 0; i < 10/2; i++ {
_ = db.Delete([]byte(utils.GetTestKey(i)))
}

// wait for watch goroutine to finish.
time.Sleep(1 * time.Second)
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.19

require (
github.com/rosedblabs/go-immutable-radix/v2 v2.0.1-0.20230614125820-f2a7bc058c90
github.com/rosedblabs/wal v1.3.1-0.20230803104221-cb708139c877
github.com/rosedblabs/wal v1.3.1
)

require (
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ github.com/rosedblabs/go-immutable-radix/v2 v2.0.1-0.20230614125820-f2a7bc058c90
github.com/rosedblabs/go-immutable-radix/v2 v2.0.1-0.20230614125820-f2a7bc058c90/go.mod h1:Hk7adp95/ngEfetvapVWdgneuZb15mi9nH/keSH/KqI=
github.com/rosedblabs/wal v1.3.1-0.20230803104221-cb708139c877 h1:/1pH6cSVst+QcKaBRVXJDuzBLQF4GA+cbfr6tTFkKLQ=
github.com/rosedblabs/wal v1.3.1-0.20230803104221-cb708139c877/go.mod h1:tYh0WapCkDQrID7PNsNHpsZDlkTczJVAFaTySmwaD7U=
github.com/rosedblabs/wal v1.3.1 h1:QFAmnEPGJgUYfbn9WRO/43VkDdXVo9AacRcB0FfCNTI=
github.com/rosedblabs/wal v1.3.1/go.mod h1:tYh0WapCkDQrID7PNsNHpsZDlkTczJVAFaTySmwaD7U=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
golang.org/x/exp v0.0.0-20221215174704-0915cd710c24 h1:6w3iSY8IIkp5OQtbYj8NeuKG1jS9d+kYaubXqsoOiQ8=
Expand Down
2 changes: 1 addition & 1 deletion merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func TestDB_Multi_Open_Merge(t *testing.T) {
for key, value := range kvs {
v, err := db.Get([]byte(key))
assert.Nil(t, err)
assert.Equal(t, string(value), string(v))
assert.Equal(t, value, v)
}
assert.Equal(t, len(kvs), db.index.Size())
}
15 changes: 10 additions & 5 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ type Options struct {

// BytesPerSync specifies the number of bytes to write before calling fsync.
BytesPerSync uint32

// WatchQueueSize the cache length of the watch queue.
// if the size greater than 0, which means enable the watch.
WatchQueueSize uint64
}

// BatchOptions specifies the options for creating a batch.
Expand Down Expand Up @@ -55,11 +59,12 @@ const (
)

var DefaultOptions = Options{
DirPath: tempDBDir(),
SegmentSize: 1 * GB,
BlockCache: 0,
Sync: false,
BytesPerSync: 0,
DirPath: tempDBDir(),
SegmentSize: 1 * GB,
BlockCache: 0,
Sync: false,
BytesPerSync: 0,
WatchQueueSize: 0,
}

var DefaultBatchOptions = BatchOptions{
Expand Down
117 changes: 117 additions & 0 deletions watch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package rosedb

import (
"fmt"
"sync"
"time"
)

type WatchActionType = byte

const (
WatchActionPut WatchActionType = iota
WatchActionDelete
)

// Event
type Event struct {
Action WatchActionType
Key []byte
Value []byte
BatchId uint64
}

func (e *Event) String() string {
return fmt.Sprintf(`
Event{
Action: %d,
Key: %s,
Value: %s,
BatchId: %d
}`,
e.Action,
e.Key,
e.Value,
e.BatchId)
}

// Watcher temporarily stores event information,
// as it is generated until it is synchronized to DB's watch.
//
// If the event is overflow, It will remove the oldest data,
// even if event hasn't been read yet.
type Watcher struct {
queue eventQueue
mu sync.RWMutex
}

func NewWatcher(capacity uint64) *Watcher {
return &Watcher{
queue: eventQueue{
Events: make([]*Event, capacity),
Capacity: capacity,
},
}
}

func (w *Watcher) insertEvent(e *Event) {
w.mu.Lock()
w.queue.push(e)
if w.queue.isFull() {
w.queue.frontTakeAStep()
}
w.mu.Unlock()
}

// getEvent if queue is empty, it will return nil.
func (w *Watcher) getEvent() *Event {
w.mu.RLock()
defer w.mu.RUnlock()
if isEmpty := w.queue.isEmpty(); isEmpty {
return nil
}
e := w.queue.pop()
return e
}

// sendEvent send events to DB's watch
func (w *Watcher) sendEvent(c chan *Event) {
for {
e := w.getEvent()
if e == nil {
time.Sleep(100 * time.Millisecond)
continue
}
c <- e
}
}

type eventQueue struct {
Events []*Event
Capacity uint64
Front uint64 // read point
Back uint64 // write point
}

func (eq *eventQueue) push(e *Event) {
eq.Events[eq.Back] = e
eq.Back = (eq.Back + 1) % eq.Capacity
}

func (eq *eventQueue) pop() *Event {
e := eq.Events[eq.Front]
eq.frontTakeAStep()
return e
}

func (eq *eventQueue) isFull() bool {
return (eq.Back+1)%eq.Capacity == eq.Front
}

func (eq *eventQueue) isEmpty() bool {
return eq.Back == eq.Front
}

func (eq *eventQueue) frontTakeAStep() {
eq.Front = (eq.Front + 1) % eq.Capacity
}
Loading

0 comments on commit 5c32d53

Please sign in to comment.