diff --git a/watcherx/definitions.go b/watcherx/definitions.go index d5fcbd05..ce674687 100644 --- a/watcherx/definitions.go +++ b/watcherx/definitions.go @@ -12,10 +12,11 @@ type ( } EventChannel chan Event Watcher interface { - DispatchNow() error + DispatchNow() (<-chan int, error) } dispatcher struct { trigger chan struct{} + done chan int } ) @@ -37,15 +38,16 @@ func (e *errSchemeUnknown) Error() string { func newDispatcher() *dispatcher { return &dispatcher{ trigger: make(chan struct{}), + done: make(chan int), } } -func (d *dispatcher) DispatchNow() error { +func (d *dispatcher) DispatchNow() (<-chan int, error) { if d.trigger == nil { - return ErrWatcherNotRunning + return nil, ErrWatcherNotRunning } d.trigger <- struct{}{} - return nil + return d.done, nil } func Watch(ctx context.Context, u *url.URL, c EventChannel) (Watcher, error) { diff --git a/watcherx/directory.go b/watcherx/directory.go index 72118be3..cc2a708a 100644 --- a/watcherx/directory.go +++ b/watcherx/directory.go @@ -35,7 +35,7 @@ func WatchDirectory(ctx context.Context, dir string, c EventChannel) (Watcher, e } d := newDispatcher() - go streamDirectoryEvents(ctx, w, c, d.trigger, dir) + go streamDirectoryEvents(ctx, w, c, d.trigger, d.done, dir) return d, nil } @@ -94,7 +94,7 @@ func handleEvent(e fsnotify.Event, w *fsnotify.Watcher, c EventChannel) { } } -func streamDirectoryEvents(ctx context.Context, w *fsnotify.Watcher, c EventChannel, sendNow <-chan struct{}, dir string) { +func streamDirectoryEvents(ctx context.Context, w *fsnotify.Watcher, c EventChannel, sendNow <-chan struct{}, sendNowDone chan<- int, dir string) { for { select { case <-ctx.Done(): @@ -103,6 +103,8 @@ func streamDirectoryEvents(ctx context.Context, w *fsnotify.Watcher, c EventChan case e := <-w.Events: handleEvent(e, w, c) case <-sendNow: + var eventsSent int + if err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { if err != nil { return err @@ -120,6 +122,7 @@ func streamDirectoryEvents(ctx context.Context, w *fsnotify.Watcher, c EventChan source: source(path), } } + eventsSent++ } return nil }); err != nil { @@ -127,7 +130,10 @@ func streamDirectoryEvents(ctx context.Context, w *fsnotify.Watcher, c EventChan error: err, source: source(dir), } + eventsSent++ } + + sendNowDone <- eventsSent } } } diff --git a/watcherx/directory_test.go b/watcherx/directory_test.go index 02300f0d..969c234b 100644 --- a/watcherx/directory_test.go +++ b/watcherx/directory_test.go @@ -8,6 +8,8 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) @@ -149,6 +151,9 @@ func TestWatchDirectory(t *testing.T) { ctx, c, dir, cancel := setup(t) defer cancel() + // buffered channel to allow usage of DispatchNow().done + c = make(EventChannel, 4) + files := map[string]string{ "a": "foo", "b": "bar", @@ -163,7 +168,17 @@ func TestWatchDirectory(t *testing.T) { d, err := WatchDirectory(ctx, dir, c) require.NoError(t, err) - require.NoError(t, d.DispatchNow()) + done, err := d.DispatchNow() + require.NoError(t, err) + + // wait for d.DispatchNow to be done + select { + case <-time.After(time.Second): + t.Log("Waiting for done timed out.") + t.FailNow() + case eventsSend := <-done: + assert.Equal(t, 4, eventsSend) + } // because filepath.WalkDir walks lexicographically, we can assume the events come in lex order assertChange(t, <-c, files["a"], filepath.Join(dir, "a")) diff --git a/watcherx/file.go b/watcherx/file.go index d02754fc..bfac4107 100644 --- a/watcherx/file.go +++ b/watcherx/file.go @@ -36,13 +36,13 @@ func WatchFile(ctx context.Context, file string, c EventChannel) (Watcher, error } } d := newDispatcher() - go streamFileEvents(ctx, watcher, c, d.trigger, file, resolvedFile) + go streamFileEvents(ctx, watcher, c, d.trigger, d.done, file, resolvedFile) return d, nil } // streamFileEvents watches for file changes and supports symlinks which requires several workarounds due to limitations of fsnotify. // Argument `resolvedFile` is the resolved symlink path of the file, or it is the watchedFile name itself. If `resolvedFile` is empty, then the watchedFile does not exist. -func streamFileEvents(ctx context.Context, watcher *fsnotify.Watcher, c EventChannel, sendNow <-chan struct{}, watchedFile, resolvedFile string) { +func streamFileEvents(ctx context.Context, watcher *fsnotify.Watcher, c EventChannel, sendNow <-chan struct{}, sendNowDone chan<- int, watchedFile, resolvedFile string) { eventSource := source(watchedFile) removeDirectFileWatcher := func() { _ = watcher.Remove(watchedFile) @@ -84,6 +84,9 @@ func streamFileEvents(ctx context.Context, watcher *fsnotify.Watcher, c EventCha source: eventSource, } } + + // in any of the above cases we send exactly one event + sendNowDone <- 1 case e, ok := <-watcher.Events: if !ok { close(c) diff --git a/watcherx/file_test.go b/watcherx/file_test.go index e3a4a52c..06e9fbb1 100644 --- a/watcherx/file_test.go +++ b/watcherx/file_test.go @@ -8,6 +8,7 @@ import ( "path/filepath" "runtime" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -208,13 +209,26 @@ func TestFileWatcher(t *testing.T) { ctx, c, dir, cancel := setup(t) defer cancel() + // buffered channel to allow usage of DispatchNow().done + c = make(EventChannel, 1) + fn := filepath.Join(dir, "example.file") initialContent := "initial content" require.NoError(t, ioutil.WriteFile(fn, []byte(initialContent), 0600)) d, err := WatchFile(ctx, fn, c) require.NoError(t, err) - require.NoError(t, d.DispatchNow()) + done, err := d.DispatchNow() + require.NoError(t, err) + + // wait for d.DispatchNow to be done + select { + case <-time.After(time.Second): + t.Log("Waiting for done timed out.") + t.FailNow() + case eventsSend := <-done: + assert.Equal(t, 1, eventsSend) + } assertChange(t, <-c, initialContent, fn) }) diff --git a/watcherx/websocket_client.go b/watcherx/websocket_client.go index 88b1ce45..2c228088 100644 --- a/watcherx/websocket_client.go +++ b/watcherx/websocket_client.go @@ -2,6 +2,7 @@ package watcherx import ( "context" + "fmt" "net" "net/url" "strings" @@ -19,10 +20,10 @@ func WatchWebsocket(ctx context.Context, u *url.URL, c EventChannel) (Watcher, e wsClosed := make(chan struct{}) go cleanupOnDone(ctx, conn, c, wsClosed) - go forwardWebsocketEvents(conn, c, u, wsClosed) - d := newDispatcher() + go forwardWebsocketEvents(conn, c, u, wsClosed, d.done) + go forwardDispatchNow(ctx, conn, c, d.trigger, u.String()) return d, nil @@ -43,7 +44,7 @@ func cleanupOnDone(ctx context.Context, conn *websocket.Conn, c EventChannel, ws _ = conn.Close() } -func forwardWebsocketEvents(ws *websocket.Conn, c EventChannel, u *url.URL, wsClosed chan<- struct{}) { +func forwardWebsocketEvents(ws *websocket.Conn, c EventChannel, u *url.URL, wsClosed chan<- struct{}, sendNowDone chan<- int) { serverURL := source(u.String()) defer func() { @@ -68,6 +69,14 @@ func forwardWebsocketEvents(ws *websocket.Conn, c EventChannel, u *url.URL, wsCl } return } + + var eventsSend int + _, err = fmt.Sscanf(string(msg), messageSendNowDone, &eventsSend) + if err == nil { + sendNowDone <- eventsSend + continue + } + e, err := unmarshalEvent(msg) if err != nil { c <- &ErrorEvent{ diff --git a/watcherx/websocket_server.go b/watcherx/websocket_server.go index 3b86ee22..3efe25f2 100644 --- a/watcherx/websocket_server.go +++ b/watcherx/websocket_server.go @@ -2,6 +2,7 @@ package watcherx import ( "context" + "fmt" "net" "net/http" "net/url" @@ -19,11 +20,16 @@ type ( cs []EventChannel } websocketWatcher struct { + wsWriteLock sync.Mutex + wsReadLock sync.Mutex wsClientChannels eventChannelSlice } ) -const messageSendNow = "send values now" +const ( + messageSendNow = "send values now" + messageSendNowDone = "done sending %d values" +) func WatchAndServeWS(ctx context.Context, u *url.URL, writer herodot.Writer) (http.HandlerFunc, error) { c := make(EventChannel) @@ -53,20 +59,36 @@ func (ww *websocketWatcher) broadcaster(ctx context.Context, c EventChannel) { } } -func readWebsocket(ws *websocket.Conn, c chan<- struct{}, watcher Watcher) { +func (ww *websocketWatcher) readWebsocket(ws *websocket.Conn, c chan<- struct{}, watcher Watcher) { for { // blocking call to ReadMessage that waits for a close message + ww.wsReadLock.Lock() _, msg, err := ws.ReadMessage() + ww.wsReadLock.Unlock() + switch errTyped := err.(type) { case nil: if string(msg) == messageSendNow { - if err := watcher.DispatchNow(); err != nil { - // we cant do much about this error and rely on the other + done, err := watcher.DispatchNow() + if err != nil { + // we cant do much about this error + ww.wsWriteLock.Lock() _ = ws.WriteJSON(&ErrorEvent{ error: err, source: "", }) + ww.wsWriteLock.Unlock() } + + go func() { + eventsSend := <-done + + ww.wsWriteLock.Lock() + defer ww.wsWriteLock.Unlock() + + // we cant do much about this error + _ = ws.WriteMessage(websocket.TextMessage, []byte(fmt.Sprintf(messageSendNowDone, eventsSend))) + }() } case *websocket.CloseError: if errTyped.Code == websocket.CloseNormalClosure { @@ -104,12 +126,15 @@ func (ww *websocketWatcher) serveWS(ctx context.Context, writer herodot.Writer, ww.wsClientChannels.Unlock() wsClosed := make(chan struct{}) - go readWebsocket(ws, wsClosed, watcher) + go ww.readWebsocket(ws, wsClosed, watcher) defer func() { // attempt to close the websocket // ignore errors as we are closing everything anyway + ww.wsWriteLock.Lock() _ = ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "server context canceled")) + ww.wsWriteLock.Unlock() + _ = ws.Close() ww.wsClientChannels.Lock() @@ -134,7 +159,12 @@ func (ww *websocketWatcher) serveWS(ctx context.Context, writer herodot.Writer, if !ok { return } - if err := ws.WriteJSON(e); err != nil { + + ww.wsWriteLock.Lock() + err := ws.WriteJSON(e) + ww.wsWriteLock.Unlock() + + if err != nil { return } } diff --git a/watcherx/websocket_test.go b/watcherx/websocket_test.go index 700eb01c..330fd48e 100644 --- a/watcherx/websocket_test.go +++ b/watcherx/websocket_test.go @@ -9,6 +9,7 @@ import ( "path/filepath" "strings" "testing" + "time" "github.com/sirupsen/logrus/hooks/test" "github.com/stretchr/testify/assert" @@ -171,6 +172,9 @@ func TestWatchWebsocket(t *testing.T) { ctxServer, c, dir, cancel := setup(t) defer cancel() + // buffered channel to allow usage of DispatchNow().done + c = make(EventChannel, 1) + l, hook := test.NewNullLogger() fn := filepath.Join(dir, "some.file") @@ -187,7 +191,17 @@ func TestWatchWebsocket(t *testing.T) { u := urlx.ParseOrPanic("ws" + strings.TrimLeft(s.URL, "http")) d, err := WatchWebsocket(ctxClient, u, c) require.NoError(t, err) - require.NoError(t, d.DispatchNow()) + done, err := d.DispatchNow() + require.NoError(t, err) + + // wait for d.DispatchNow to be done + select { + case <-time.After(time.Second): + t.Logf("Waiting for done timed out. %+v", <-c) + t.FailNow() + case eventsSend := <-done: + assert.Equal(t, 1, eventsSend) + } assertChange(t, <-c, initialContent, u.String()+fn)