From ad8fca255b794b75e7fa3ec967c60d9498797930 Mon Sep 17 00:00:00 2001 From: Mohamed MHAMDI Date: Wed, 26 Apr 2023 02:19:52 +0200 Subject: [PATCH] fix(config): fix file source watcher stop behavior when Stop is called (#2630) Co-authored-by: Mohamed MHAMDI --- config/source/file/watcher.go | 34 ++++---- config/source/file/watcher_linux.go | 33 ++++---- config/source/file/watcher_test.go | 120 ++++++++++++++++++++++++++++ go.mod | 2 +- go.sum | 3 + 5 files changed, 157 insertions(+), 35 deletions(-) create mode 100644 config/source/file/watcher_test.go diff --git a/config/source/file/watcher.go b/config/source/file/watcher.go index 4ae16736..5254a7a3 100644 --- a/config/source/file/watcher.go +++ b/config/source/file/watcher.go @@ -13,8 +13,7 @@ import ( type watcher struct { f *file - fw *fsnotify.Watcher - exit chan bool + fw *fsnotify.Watcher } func newWatcher(f *file) (source.Watcher, error) { @@ -26,24 +25,21 @@ func newWatcher(f *file) (source.Watcher, error) { fw.Add(f.path) return &watcher{ - f: f, - fw: fw, - exit: make(chan bool), + f: f, + fw: fw, }, nil } func (w *watcher) Next() (*source.ChangeSet, error) { - // is it closed? - select { - case <-w.exit: - return nil, source.ErrWatcherStopped - default: - } - // try get the event select { - case event, _ := <-w.fw.Events: - if event.Op == fsnotify.Rename { + case event, ok := <-w.fw.Events: + // check if channel was closed (i.e. Watcher.Close() was called). + if !ok { + return nil, source.ErrWatcherStopped + } + + if event.Has(fsnotify.Rename) { // check existence of file, and add watch again _, err := os.Stat(event.Name) if err == nil || os.IsExist(err) { @@ -55,11 +51,15 @@ func (w *watcher) Next() (*source.ChangeSet, error) { if err != nil { return nil, err } + return c, nil - case err := <-w.fw.Errors: + case err, ok := <-w.fw.Errors: + // check if channel was closed (i.e. Watcher.Close() was called). + if !ok { + return nil, source.ErrWatcherStopped + } + return nil, err - case <-w.exit: - return nil, source.ErrWatcherStopped } } diff --git a/config/source/file/watcher_linux.go b/config/source/file/watcher_linux.go index 36b0ed39..0bebeef0 100644 --- a/config/source/file/watcher_linux.go +++ b/config/source/file/watcher_linux.go @@ -13,8 +13,7 @@ import ( type watcher struct { f *file - fw *fsnotify.Watcher - exit chan bool + fw *fsnotify.Watcher } func newWatcher(f *file) (source.Watcher, error) { @@ -26,24 +25,21 @@ func newWatcher(f *file) (source.Watcher, error) { fw.Add(f.path) return &watcher{ - f: f, - fw: fw, - exit: make(chan bool), + f: f, + fw: fw, }, nil } func (w *watcher) Next() (*source.ChangeSet, error) { - // is it closed? - select { - case <-w.exit: - return nil, source.ErrWatcherStopped - default: - } - // try get the event select { - case event := <-w.fw.Events: - if event.Op == fsnotify.Rename { + case event, ok := <-w.fw.Events: + // check if channel was closed (i.e. Watcher.Close() was called). + if !ok { + return nil, source.ErrWatcherStopped + } + + if event.Has(fsnotify.Rename) { // check existence of file, and add watch again _, err := os.Stat(event.Name) if err == nil || os.IsExist(err) { @@ -60,10 +56,13 @@ func (w *watcher) Next() (*source.ChangeSet, error) { w.fw.Add(w.f.path) return c, nil - case err := <-w.fw.Errors: + case err, ok := <-w.fw.Errors: + // check if channel was closed (i.e. Watcher.Close() was called). + if !ok { + return nil, source.ErrWatcherStopped + } + return nil, err - case <-w.exit: - return nil, source.ErrWatcherStopped } } diff --git a/config/source/file/watcher_test.go b/config/source/file/watcher_test.go new file mode 100644 index 00000000..177b0f3c --- /dev/null +++ b/config/source/file/watcher_test.go @@ -0,0 +1,120 @@ +package file_test + +import ( + "bytes" + "errors" + "fmt" + "os" + "path/filepath" + "testing" + "time" + + "go-micro.dev/v4/config/source" + "go-micro.dev/v4/config/source/file" +) + +// createTestFile a local helper to creates a temporary file with the given data +func createTestFile(data []byte) (*os.File, func(), string, error) { + path := filepath.Join(os.TempDir(), fmt.Sprintf("file.%d", time.Now().UnixNano())) + fh, err := os.Create(path) + if err != nil { + return nil, func() {}, "", err + } + + _, err = fh.Write(data) + if err != nil { + return nil, func() {}, "", err + } + + return fh, func() { + fh.Close() + os.Remove(path) + }, path, err +} + +func TestWatcher(t *testing.T) { + data := []byte(`{"foo": "bar"}`) + fh, cleanup, path, err := createTestFile(data) + if err != nil { + t.Error(err) + } + defer cleanup() + + f := file.NewSource(file.WithPath(path)) + if err != nil { + t.Error(err) + } + + // create a watcher + w, err := f.Watch() + if err != nil { + t.Error(err) + } + + newdata := []byte(`{"foo": "baz"}`) + + go func() { + sc, err := w.Next() + if err != nil { + t.Error(err) + return + } + + if !bytes.Equal(sc.Data, newdata) { + t.Error("expected data to be different") + } + }() + + // rewrite to the file to trigger a change + _, err = fh.WriteAt(newdata, 0) + if err != nil { + t.Error(err) + } + + // wait for the underlying watcher to detect changes + time.Sleep(time.Second) +} + +func TestWatcherStop(t *testing.T) { + data := []byte(`{"foo": "bar"}`) + _, cleanup, path, err := createTestFile(data) + if err != nil { + t.Error(err) + } + defer cleanup() + + src := file.NewSource(file.WithPath(path)) + if err != nil { + t.Error(err) + } + + // create a watcher + w, err := src.Watch() + if err != nil { + t.Error(err) + } + + defer func() { + var err error + c := make(chan struct{}) + defer close(c) + + go func() { + _, err = w.Next() + c <- struct{}{} + }() + + select { + case <-time.After(2 * time.Second): + err = errors.New("timeout waiting for Watcher.Next() to return") + case <-c: + } + + if !errors.Is(err, source.ErrWatcherStopped) { + t.Error(err) + } + }() + + // stop the watcher + w.Stop() +} diff --git a/go.mod b/go.mod index 3e8c71c1..98930281 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/bitly/go-simplejson v0.5.0 github.com/ef-ds/deque v1.0.4 github.com/evanphx/json-patch/v5 v5.5.0 - github.com/fsnotify/fsnotify v1.4.9 + github.com/fsnotify/fsnotify v1.6.0 github.com/fsouza/go-dockerclient v1.7.3 github.com/go-acme/lego/v4 v4.4.0 github.com/go-git/go-git/v5 v5.4.2 diff --git a/go.sum b/go.sum index d9dd0943..1c1a925d 100644 --- a/go.sum +++ b/go.sum @@ -339,6 +339,8 @@ github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= +github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= +github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/fsouza/go-dockerclient v1.7.3 h1:i6iMcktl688vsKUEExA6gU1UjPgIvmGtJeQ0mbuFqZo= github.com/fsouza/go-dockerclient v1.7.3/go.mod h1:8xfZB8o9SptLNJ13VoV5pMiRbZGWkU/Omu5VOu/KC9Y= github.com/fullsailor/pkcs7 v0.0.0-20190404230743-d7302db945fa/go.mod h1:KnogPXtdwXqoenmZCw6S+25EAm2MkxbG0deNDu4cbSA= @@ -1147,6 +1149,7 @@ golang.org/x/sys v0.0.0-20211116061358-0a5406a5449c/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201113234701-d7a72108b828/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=