Skip to content

Commit

Permalink
feat(watcherx): allow requesting current data (#243)
Browse files Browse the repository at this point in the history
  • Loading branch information
zepatrik authored Dec 8, 2020
1 parent 367bd04 commit 9dcf6c2
Show file tree
Hide file tree
Showing 14 changed files with 359 additions and 139 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/windows_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ jobs:
steps:
- uses: actions/checkout@v2
- run: |
go test -failfast -timeout=20m $(go list ./... | grep -v watcherx | grep -v sqlcon)
go test -failfast -timeout=20m $(go list ./... | grep -v watcherx | grep -v sqlcon | grep -v configx)
shell: bash
2 changes: 1 addition & 1 deletion configx/koanf_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,6 @@ func (f *KoanfFile) Read() (map[string]interface{}, error) {

// WatchChannel watches the file and triggers a callback when it changes. It is a
// blocking function that internally spawns a goroutine to watch for changes.
func (f *KoanfFile) WatchChannel(c watcherx.EventChannel) error {
func (f *KoanfFile) WatchChannel(c watcherx.EventChannel) (watcherx.Watcher, error) {
return watcherx.WatchFile(f.ctx, f.path, c)
}
3 changes: 1 addition & 2 deletions configx/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,14 +235,13 @@ func (p *Provider) addConfigFile(ctx context.Context, path string, k *koanf.Koan
cancel()
cancel = cancelInner
p.runOnChanges(e, nil)
close(c)
span.Finish()
return
}
}
}(c)

if err := fp.WatchChannel(c); err != nil {
if _, err := fp.WatchChannel(c); err != nil {
close(c)
return err
}
Expand Down
4 changes: 4 additions & 0 deletions configx/provider_watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func TestReload(t *testing.T) {

t.Run("case=rejects not validating changes", func(t *testing.T) {
configFile := tmpConfigFile(t, "memory", "bar")
defer configFile.Close()
hook := test.NewLocal(l.Entry.Logger)
wg := new(sync.WaitGroup)
p := setup(t, configFile, wg)
Expand All @@ -92,6 +93,7 @@ func TestReload(t *testing.T) {

t.Run("case=rejects to update immutable", func(t *testing.T) {
configFile := tmpConfigFile(t, "memory", "bar")
defer configFile.Close()
hook := test.NewLocal(l.Entry.Logger)
wg := new(sync.WaitGroup)
p := setup(t, configFile, wg,
Expand All @@ -114,6 +116,7 @@ func TestReload(t *testing.T) {

t.Run("case=runs without validation errors", func(t *testing.T) {
configFile := tmpConfigFile(t, "some string", "bar")
defer configFile.Close()
hook := test.NewLocal(l.Entry.Logger)
wg := new(sync.WaitGroup)
p := setup(t, configFile, wg)
Expand All @@ -125,6 +128,7 @@ func TestReload(t *testing.T) {

t.Run("case=has with validation errors", func(t *testing.T) {
configFile := tmpConfigFile(t, "some string", "not bar")
defer configFile.Close()
hook := test.NewLocal(l.Entry.Logger)

var b bytes.Buffer
Expand Down
34 changes: 29 additions & 5 deletions watcherx/definitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,19 @@ type (
scheme string
}
EventChannel chan Event
Watcher interface {
DispatchNow() error
}
dispatcher struct {
trigger chan struct{}
}
)

// ErrSchemeUnknown is just for checking with errors.Is()
var ErrSchemeUnknown = &errSchemeUnknown{}
var (
// ErrSchemeUnknown is just for checking with errors.Is()
ErrSchemeUnknown = &errSchemeUnknown{}
ErrWatcherNotRunning = fmt.Errorf("watcher is not running")
)

func (e *errSchemeUnknown) Is(other error) bool {
_, ok := other.(*errSchemeUnknown)
Expand All @@ -25,12 +34,27 @@ func (e *errSchemeUnknown) Error() string {
return fmt.Sprintf("unknown scheme '%s' to watch", e.scheme)
}

func Watch(ctx context.Context, u *url.URL, c EventChannel) error {
func newDispatcher() *dispatcher {
return &dispatcher{
trigger: make(chan struct{}),
}
}

func (d *dispatcher) DispatchNow() error {
if d.trigger == nil {
return ErrWatcherNotRunning
}
d.trigger <- struct{}{}
return nil
}

func Watch(ctx context.Context, u *url.URL, c EventChannel) (Watcher, error) {
switch u.Scheme {
case "file":
// see urlx.Parse for why the empty string is also file
case "file", "":
return WatchFile(ctx, u.Path, c)
case "ws":
return WatchWebsocket(ctx, u, c)
}
return &errSchemeUnknown{u.Scheme}
return nil, &errSchemeUnknown{u.Scheme}
}
45 changes: 36 additions & 9 deletions watcherx/directory.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package watcherx

import (
"bytes"
"context"
"io/ioutil"
"os"
Expand All @@ -12,10 +11,10 @@ import (
"github.com/pkg/errors"
)

func WatchDirectory(ctx context.Context, dir string, c EventChannel) error {
func WatchDirectory(ctx context.Context, dir string, c EventChannel) (Watcher, error) {
w, err := fsnotify.NewWatcher()
if err != nil {
return errors.WithStack(err)
return nil, errors.WithStack(err)
}
var subDirs []string
if err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
Expand All @@ -27,15 +26,17 @@ func WatchDirectory(ctx context.Context, dir string, c EventChannel) error {
}
return nil
}); err != nil {
return errors.WithStack(err)
return nil, errors.WithStack(err)
}
for _, d := range append(subDirs, dir) {
if err := w.Add(d); err != nil {
return errors.WithStack(err)
return nil, errors.WithStack(err)
}
}
go streamDirectoryEvents(ctx, w, c)
return nil

d := newDispatcher()
go streamDirectoryEvents(ctx, w, c, d.trigger, dir)
return d, nil
}

func handleEvent(e fsnotify.Event, w *fsnotify.Watcher, c EventChannel) {
Expand Down Expand Up @@ -86,21 +87,47 @@ func handleEvent(e fsnotify.Event, w *fsnotify.Watcher, c EventChannel) {
}
} else {
c <- &ChangeEvent{
data: bytes.NewBuffer(data),
data: data,
source: source(e.Name),
}
}
}
}

func streamDirectoryEvents(ctx context.Context, w *fsnotify.Watcher, c EventChannel) {
func streamDirectoryEvents(ctx context.Context, w *fsnotify.Watcher, c EventChannel, sendNow <-chan struct{}, dir string) {
for {
select {
case <-ctx.Done():
_ = w.Close()
return
case e := <-w.Events:
handleEvent(e, w, c)
case <-sendNow:
if err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.IsDir() {
data, err := ioutil.ReadFile(path)
if err != nil {
c <- &ErrorEvent{
error: err,
source: source(path),
}
} else {
c <- &ChangeEvent{
data: data,
source: source(path),
}
}
}
return nil
}); err != nil {
c <- &ErrorEvent{
error: err,
source: source(dir),
}
}
}
}
}
75 changes: 55 additions & 20 deletions watcherx/directory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ package watcherx

import (
"fmt"
"io/ioutil"
"os"
"path"
"path/filepath"
"testing"
"time"

Expand All @@ -15,8 +16,9 @@ func TestWatchDirectory(t *testing.T) {
ctx, c, dir, cancel := setup(t)
defer cancel()

require.NoError(t, WatchDirectory(ctx, dir, c))
fileName := path.Join(dir, "example")
_, err := WatchDirectory(ctx, dir, c)
require.NoError(t, err)
fileName := filepath.Join(dir, "example")
f, err := os.Create(fileName)
require.NoError(t, err)
require.NoError(t, f.Close())
Expand All @@ -28,10 +30,11 @@ func TestWatchDirectory(t *testing.T) {
ctx, c, dir, cancel := setup(t)
defer cancel()

fileName := path.Join(dir, "example")
fileName := filepath.Join(dir, "example")
f, err := os.Create(fileName)
require.NoError(t, err)
require.NoError(t, WatchDirectory(ctx, dir, c))
_, err = WatchDirectory(ctx, dir, c)
require.NoError(t, err)

_, err = fmt.Fprintf(f, "content")
require.NoError(t, f.Close())
Expand All @@ -43,12 +46,13 @@ func TestWatchDirectory(t *testing.T) {
ctx, c, dir, cancel := setup(t)
defer cancel()

fileName := path.Join(dir, "example")
fileName := filepath.Join(dir, "example")
f, err := os.Create(fileName)
require.NoError(t, err)
require.NoError(t, f.Close())

require.NoError(t, WatchDirectory(ctx, dir, c))
_, err = WatchDirectory(ctx, dir, c)
require.NoError(t, err)
require.NoError(t, os.Remove(fileName))

assertRemove(t, <-c, fileName)
Expand All @@ -58,12 +62,13 @@ func TestWatchDirectory(t *testing.T) {
ctx, c, dir, cancel := setup(t)
defer cancel()

childDir := path.Join(dir, "child")
childDir := filepath.Join(dir, "child")
require.NoError(t, os.Mkdir(childDir, 0777))

require.NoError(t, WatchDirectory(ctx, dir, c))
_, err := WatchDirectory(ctx, dir, c)
require.NoError(t, err)

fileName := path.Join(childDir, "example")
fileName := filepath.Join(childDir, "example")
f, err := os.Create(fileName)
require.NoError(t, err)
require.NoError(t, f.Close())
Expand All @@ -75,11 +80,12 @@ func TestWatchDirectory(t *testing.T) {
ctx, c, dir, cancel := setup(t)
defer cancel()

require.NoError(t, WatchDirectory(ctx, dir, c))
_, err := WatchDirectory(ctx, dir, c)
require.NoError(t, err)

childDir := path.Join(dir, "child")
childDir := filepath.Join(dir, "child")
require.NoError(t, os.Mkdir(childDir, 0777))
fileName := path.Join(childDir, "example")
fileName := filepath.Join(childDir, "example")
// there's not much we can do about this timeout as it takes some time until the new watcher is created
time.Sleep(time.Millisecond)
f, err := os.Create(fileName)
Expand All @@ -93,10 +99,11 @@ func TestWatchDirectory(t *testing.T) {
ctx, c, dir, cancel := setup(t)
defer cancel()

childDir := path.Join(dir, "child")
childDir := filepath.Join(dir, "child")
require.NoError(t, os.Mkdir(childDir, 0777))

require.NoError(t, WatchDirectory(ctx, dir, c))
_, err := WatchDirectory(ctx, dir, c)
require.NoError(t, err)

require.NoError(t, os.Remove(childDir))

Expand All @@ -113,19 +120,20 @@ func TestWatchDirectory(t *testing.T) {
ctx, c, dir, cancel := setup(t)
defer cancel()

childDir := path.Join(dir, "child")
subChildDir := path.Join(childDir, "subchild")
childDir := filepath.Join(dir, "child")
subChildDir := filepath.Join(childDir, "subchild")
require.NoError(t, os.MkdirAll(subChildDir, 0777))
f1 := path.Join(subChildDir, "f1")
f1 := filepath.Join(subChildDir, "f1")
f, err := os.Create(f1)
require.NoError(t, err)
require.NoError(t, f.Close())
f2 := path.Join(childDir, "f2")
f2 := filepath.Join(childDir, "f2")
f, err = os.Create(f2)
require.NoError(t, err)
require.NoError(t, f.Close())

require.NoError(t, WatchDirectory(ctx, dir, c))
_, err = WatchDirectory(ctx, dir, c)
require.NoError(t, err)

require.NoError(t, os.RemoveAll(childDir))

Expand All @@ -136,4 +144,31 @@ func TestWatchDirectory(t *testing.T) {
assertRemove(t, events[0], f2)
assertRemove(t, events[1], f1)
})

t.Run("case=sends event when requested", func(t *testing.T) {
ctx, c, dir, cancel := setup(t)
defer cancel()

files := map[string]string{
"a": "foo",
"b": "bar",
"c": "baz",
filepath.Join("d", "a"): "sub dir content",
}
for fn, fc := range files {
fp := filepath.Join(dir, fn)
require.NoError(t, os.MkdirAll(filepath.Dir(fp), 0700))
require.NoError(t, ioutil.WriteFile(fp, []byte(fc), 0600))
}

d, err := WatchDirectory(ctx, dir, c)
require.NoError(t, err)
require.NoError(t, d.DispatchNow())

// because filepath.WalkDir walks lexicographically, we can assume the events come in lex order
assertChange(t, <-c, files["a"], filepath.Join(dir, "a"))
assertChange(t, <-c, files["b"], filepath.Join(dir, "b"))
assertChange(t, <-c, files["c"], filepath.Join(dir, "c"))
assertChange(t, <-c, files[filepath.Join("d", "a")], filepath.Join(dir, "d", "a"))
})
}
Loading

0 comments on commit 9dcf6c2

Please sign in to comment.