Skip to content

Commit

Permalink
feat(watcherx): notify dispatcher when load trigger is done (#244)
Browse files Browse the repository at this point in the history
  • Loading branch information
zepatrik authored Dec 10, 2020
1 parent 9dcf6c2 commit 1068895
Show file tree
Hide file tree
Showing 8 changed files with 113 additions and 20 deletions.
10 changes: 6 additions & 4 deletions watcherx/definitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ type (
}
EventChannel chan Event
Watcher interface {
DispatchNow() error
DispatchNow() (<-chan int, error)
}
dispatcher struct {
trigger chan struct{}
done chan int
}
)

Expand All @@ -37,15 +38,16 @@ func (e *errSchemeUnknown) Error() string {
func newDispatcher() *dispatcher {
return &dispatcher{
trigger: make(chan struct{}),
done: make(chan int),
}
}

func (d *dispatcher) DispatchNow() error {
func (d *dispatcher) DispatchNow() (<-chan int, error) {
if d.trigger == nil {
return ErrWatcherNotRunning
return nil, ErrWatcherNotRunning
}
d.trigger <- struct{}{}
return nil
return d.done, nil
}

func Watch(ctx context.Context, u *url.URL, c EventChannel) (Watcher, error) {
Expand Down
10 changes: 8 additions & 2 deletions watcherx/directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func WatchDirectory(ctx context.Context, dir string, c EventChannel) (Watcher, e
}

d := newDispatcher()
go streamDirectoryEvents(ctx, w, c, d.trigger, dir)
go streamDirectoryEvents(ctx, w, c, d.trigger, d.done, dir)
return d, nil
}

Expand Down Expand Up @@ -94,7 +94,7 @@ func handleEvent(e fsnotify.Event, w *fsnotify.Watcher, c EventChannel) {
}
}

func streamDirectoryEvents(ctx context.Context, w *fsnotify.Watcher, c EventChannel, sendNow <-chan struct{}, dir string) {
func streamDirectoryEvents(ctx context.Context, w *fsnotify.Watcher, c EventChannel, sendNow <-chan struct{}, sendNowDone chan<- int, dir string) {
for {
select {
case <-ctx.Done():
Expand All @@ -103,6 +103,8 @@ func streamDirectoryEvents(ctx context.Context, w *fsnotify.Watcher, c EventChan
case e := <-w.Events:
handleEvent(e, w, c)
case <-sendNow:
var eventsSent int

if err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
Expand All @@ -120,14 +122,18 @@ func streamDirectoryEvents(ctx context.Context, w *fsnotify.Watcher, c EventChan
source: source(path),
}
}
eventsSent++
}
return nil
}); err != nil {
c <- &ErrorEvent{
error: err,
source: source(dir),
}
eventsSent++
}

sendNowDone <- eventsSent
}
}
}
17 changes: 16 additions & 1 deletion watcherx/directory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -149,6 +151,9 @@ func TestWatchDirectory(t *testing.T) {
ctx, c, dir, cancel := setup(t)
defer cancel()

// buffered channel to allow usage of DispatchNow().done
c = make(EventChannel, 4)

files := map[string]string{
"a": "foo",
"b": "bar",
Expand All @@ -163,7 +168,17 @@ func TestWatchDirectory(t *testing.T) {

d, err := WatchDirectory(ctx, dir, c)
require.NoError(t, err)
require.NoError(t, d.DispatchNow())
done, err := d.DispatchNow()
require.NoError(t, err)

// wait for d.DispatchNow to be done
select {
case <-time.After(time.Second):
t.Log("Waiting for done timed out.")
t.FailNow()
case eventsSend := <-done:
assert.Equal(t, 4, eventsSend)
}

// because filepath.WalkDir walks lexicographically, we can assume the events come in lex order
assertChange(t, <-c, files["a"], filepath.Join(dir, "a"))
Expand Down
7 changes: 5 additions & 2 deletions watcherx/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ func WatchFile(ctx context.Context, file string, c EventChannel) (Watcher, error
}
}
d := newDispatcher()
go streamFileEvents(ctx, watcher, c, d.trigger, file, resolvedFile)
go streamFileEvents(ctx, watcher, c, d.trigger, d.done, file, resolvedFile)
return d, nil
}

// streamFileEvents watches for file changes and supports symlinks which requires several workarounds due to limitations of fsnotify.
// Argument `resolvedFile` is the resolved symlink path of the file, or it is the watchedFile name itself. If `resolvedFile` is empty, then the watchedFile does not exist.
func streamFileEvents(ctx context.Context, watcher *fsnotify.Watcher, c EventChannel, sendNow <-chan struct{}, watchedFile, resolvedFile string) {
func streamFileEvents(ctx context.Context, watcher *fsnotify.Watcher, c EventChannel, sendNow <-chan struct{}, sendNowDone chan<- int, watchedFile, resolvedFile string) {
eventSource := source(watchedFile)
removeDirectFileWatcher := func() {
_ = watcher.Remove(watchedFile)
Expand Down Expand Up @@ -84,6 +84,9 @@ func streamFileEvents(ctx context.Context, watcher *fsnotify.Watcher, c EventCha
source: eventSource,
}
}

// in any of the above cases we send exactly one event
sendNowDone <- 1
case e, ok := <-watcher.Events:
if !ok {
close(c)
Expand Down
16 changes: 15 additions & 1 deletion watcherx/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"path/filepath"
"runtime"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -208,13 +209,26 @@ func TestFileWatcher(t *testing.T) {
ctx, c, dir, cancel := setup(t)
defer cancel()

// buffered channel to allow usage of DispatchNow().done
c = make(EventChannel, 1)

fn := filepath.Join(dir, "example.file")
initialContent := "initial content"
require.NoError(t, ioutil.WriteFile(fn, []byte(initialContent), 0600))

d, err := WatchFile(ctx, fn, c)
require.NoError(t, err)
require.NoError(t, d.DispatchNow())
done, err := d.DispatchNow()
require.NoError(t, err)

// wait for d.DispatchNow to be done
select {
case <-time.After(time.Second):
t.Log("Waiting for done timed out.")
t.FailNow()
case eventsSend := <-done:
assert.Equal(t, 1, eventsSend)
}

assertChange(t, <-c, initialContent, fn)
})
Expand Down
15 changes: 12 additions & 3 deletions watcherx/websocket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package watcherx

import (
"context"
"fmt"
"net"
"net/url"
"strings"
Expand All @@ -19,10 +20,10 @@ func WatchWebsocket(ctx context.Context, u *url.URL, c EventChannel) (Watcher, e
wsClosed := make(chan struct{})
go cleanupOnDone(ctx, conn, c, wsClosed)

go forwardWebsocketEvents(conn, c, u, wsClosed)

d := newDispatcher()

go forwardWebsocketEvents(conn, c, u, wsClosed, d.done)

go forwardDispatchNow(ctx, conn, c, d.trigger, u.String())

return d, nil
Expand All @@ -43,7 +44,7 @@ func cleanupOnDone(ctx context.Context, conn *websocket.Conn, c EventChannel, ws
_ = conn.Close()
}

func forwardWebsocketEvents(ws *websocket.Conn, c EventChannel, u *url.URL, wsClosed chan<- struct{}) {
func forwardWebsocketEvents(ws *websocket.Conn, c EventChannel, u *url.URL, wsClosed chan<- struct{}, sendNowDone chan<- int) {
serverURL := source(u.String())

defer func() {
Expand All @@ -68,6 +69,14 @@ func forwardWebsocketEvents(ws *websocket.Conn, c EventChannel, u *url.URL, wsCl
}
return
}

var eventsSend int
_, err = fmt.Sscanf(string(msg), messageSendNowDone, &eventsSend)
if err == nil {
sendNowDone <- eventsSend
continue
}

e, err := unmarshalEvent(msg)
if err != nil {
c <- &ErrorEvent{
Expand Down
42 changes: 36 additions & 6 deletions watcherx/websocket_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package watcherx

import (
"context"
"fmt"
"net"
"net/http"
"net/url"
Expand All @@ -19,11 +20,16 @@ type (
cs []EventChannel
}
websocketWatcher struct {
wsWriteLock sync.Mutex
wsReadLock sync.Mutex
wsClientChannels eventChannelSlice
}
)

const messageSendNow = "send values now"
const (
messageSendNow = "send values now"
messageSendNowDone = "done sending %d values"
)

func WatchAndServeWS(ctx context.Context, u *url.URL, writer herodot.Writer) (http.HandlerFunc, error) {
c := make(EventChannel)
Expand Down Expand Up @@ -53,20 +59,36 @@ func (ww *websocketWatcher) broadcaster(ctx context.Context, c EventChannel) {
}
}

func readWebsocket(ws *websocket.Conn, c chan<- struct{}, watcher Watcher) {
func (ww *websocketWatcher) readWebsocket(ws *websocket.Conn, c chan<- struct{}, watcher Watcher) {
for {
// blocking call to ReadMessage that waits for a close message
ww.wsReadLock.Lock()
_, msg, err := ws.ReadMessage()
ww.wsReadLock.Unlock()

switch errTyped := err.(type) {
case nil:
if string(msg) == messageSendNow {
if err := watcher.DispatchNow(); err != nil {
// we cant do much about this error and rely on the other
done, err := watcher.DispatchNow()
if err != nil {
// we cant do much about this error
ww.wsWriteLock.Lock()
_ = ws.WriteJSON(&ErrorEvent{
error: err,
source: "",
})
ww.wsWriteLock.Unlock()
}

go func() {
eventsSend := <-done

ww.wsWriteLock.Lock()
defer ww.wsWriteLock.Unlock()

// we cant do much about this error
_ = ws.WriteMessage(websocket.TextMessage, []byte(fmt.Sprintf(messageSendNowDone, eventsSend)))
}()
}
case *websocket.CloseError:
if errTyped.Code == websocket.CloseNormalClosure {
Expand Down Expand Up @@ -104,12 +126,15 @@ func (ww *websocketWatcher) serveWS(ctx context.Context, writer herodot.Writer,
ww.wsClientChannels.Unlock()

wsClosed := make(chan struct{})
go readWebsocket(ws, wsClosed, watcher)
go ww.readWebsocket(ws, wsClosed, watcher)

defer func() {
// attempt to close the websocket
// ignore errors as we are closing everything anyway
ww.wsWriteLock.Lock()
_ = ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "server context canceled"))
ww.wsWriteLock.Unlock()

_ = ws.Close()

ww.wsClientChannels.Lock()
Expand All @@ -134,7 +159,12 @@ func (ww *websocketWatcher) serveWS(ctx context.Context, writer herodot.Writer,
if !ok {
return
}
if err := ws.WriteJSON(e); err != nil {

ww.wsWriteLock.Lock()
err := ws.WriteJSON(e)
ww.wsWriteLock.Unlock()

if err != nil {
return
}
}
Expand Down
16 changes: 15 additions & 1 deletion watcherx/websocket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"path/filepath"
"strings"
"testing"
"time"

"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -171,6 +172,9 @@ func TestWatchWebsocket(t *testing.T) {
ctxServer, c, dir, cancel := setup(t)
defer cancel()

// buffered channel to allow usage of DispatchNow().done
c = make(EventChannel, 1)

l, hook := test.NewNullLogger()

fn := filepath.Join(dir, "some.file")
Expand All @@ -187,7 +191,17 @@ func TestWatchWebsocket(t *testing.T) {
u := urlx.ParseOrPanic("ws" + strings.TrimLeft(s.URL, "http"))
d, err := WatchWebsocket(ctxClient, u, c)
require.NoError(t, err)
require.NoError(t, d.DispatchNow())
done, err := d.DispatchNow()
require.NoError(t, err)

// wait for d.DispatchNow to be done
select {
case <-time.After(time.Second):
t.Logf("Waiting for done timed out. %+v", <-c)
t.FailNow()
case eventsSend := <-done:
assert.Equal(t, 1, eventsSend)
}

assertChange(t, <-c, initialContent, u.String()+fn)

Expand Down

0 comments on commit 1068895

Please sign in to comment.