1
0
mirror of https://github.com/go-micro/go-micro.git synced 2025-05-13 21:16:43 +02:00

fix(config): fix file source watcher stop behavior when Stop is called (#2630)

Co-authored-by: Mohamed MHAMDI <mmhamdi@hubside.com>
This commit is contained in:
Mohamed MHAMDI 2023-04-26 02:19:52 +02:00 committed by GitHub
parent a7522e7d6c
commit ad8fca255b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 157 additions and 35 deletions

View File

@ -13,8 +13,7 @@ import (
type watcher struct {
f *file
fw *fsnotify.Watcher
exit chan bool
fw *fsnotify.Watcher
}
func newWatcher(f *file) (source.Watcher, error) {
@ -26,24 +25,21 @@ func newWatcher(f *file) (source.Watcher, error) {
fw.Add(f.path)
return &watcher{
f: f,
fw: fw,
exit: make(chan bool),
f: f,
fw: fw,
}, nil
}
func (w *watcher) Next() (*source.ChangeSet, error) {
// is it closed?
select {
case <-w.exit:
return nil, source.ErrWatcherStopped
default:
}
// try get the event
select {
case event, _ := <-w.fw.Events:
if event.Op == fsnotify.Rename {
case event, ok := <-w.fw.Events:
// check if channel was closed (i.e. Watcher.Close() was called).
if !ok {
return nil, source.ErrWatcherStopped
}
if event.Has(fsnotify.Rename) {
// check existence of file, and add watch again
_, err := os.Stat(event.Name)
if err == nil || os.IsExist(err) {
@ -55,11 +51,15 @@ func (w *watcher) Next() (*source.ChangeSet, error) {
if err != nil {
return nil, err
}
return c, nil
case err := <-w.fw.Errors:
case err, ok := <-w.fw.Errors:
// check if channel was closed (i.e. Watcher.Close() was called).
if !ok {
return nil, source.ErrWatcherStopped
}
return nil, err
case <-w.exit:
return nil, source.ErrWatcherStopped
}
}

View File

@ -13,8 +13,7 @@ import (
type watcher struct {
f *file
fw *fsnotify.Watcher
exit chan bool
fw *fsnotify.Watcher
}
func newWatcher(f *file) (source.Watcher, error) {
@ -26,24 +25,21 @@ func newWatcher(f *file) (source.Watcher, error) {
fw.Add(f.path)
return &watcher{
f: f,
fw: fw,
exit: make(chan bool),
f: f,
fw: fw,
}, nil
}
func (w *watcher) Next() (*source.ChangeSet, error) {
// is it closed?
select {
case <-w.exit:
return nil, source.ErrWatcherStopped
default:
}
// try get the event
select {
case event := <-w.fw.Events:
if event.Op == fsnotify.Rename {
case event, ok := <-w.fw.Events:
// check if channel was closed (i.e. Watcher.Close() was called).
if !ok {
return nil, source.ErrWatcherStopped
}
if event.Has(fsnotify.Rename) {
// check existence of file, and add watch again
_, err := os.Stat(event.Name)
if err == nil || os.IsExist(err) {
@ -60,10 +56,13 @@ func (w *watcher) Next() (*source.ChangeSet, error) {
w.fw.Add(w.f.path)
return c, nil
case err := <-w.fw.Errors:
case err, ok := <-w.fw.Errors:
// check if channel was closed (i.e. Watcher.Close() was called).
if !ok {
return nil, source.ErrWatcherStopped
}
return nil, err
case <-w.exit:
return nil, source.ErrWatcherStopped
}
}

View File

@ -0,0 +1,120 @@
package file_test
import (
"bytes"
"errors"
"fmt"
"os"
"path/filepath"
"testing"
"time"
"go-micro.dev/v4/config/source"
"go-micro.dev/v4/config/source/file"
)
// createTestFile a local helper to creates a temporary file with the given data
func createTestFile(data []byte) (*os.File, func(), string, error) {
path := filepath.Join(os.TempDir(), fmt.Sprintf("file.%d", time.Now().UnixNano()))
fh, err := os.Create(path)
if err != nil {
return nil, func() {}, "", err
}
_, err = fh.Write(data)
if err != nil {
return nil, func() {}, "", err
}
return fh, func() {
fh.Close()
os.Remove(path)
}, path, err
}
func TestWatcher(t *testing.T) {
data := []byte(`{"foo": "bar"}`)
fh, cleanup, path, err := createTestFile(data)
if err != nil {
t.Error(err)
}
defer cleanup()
f := file.NewSource(file.WithPath(path))
if err != nil {
t.Error(err)
}
// create a watcher
w, err := f.Watch()
if err != nil {
t.Error(err)
}
newdata := []byte(`{"foo": "baz"}`)
go func() {
sc, err := w.Next()
if err != nil {
t.Error(err)
return
}
if !bytes.Equal(sc.Data, newdata) {
t.Error("expected data to be different")
}
}()
// rewrite to the file to trigger a change
_, err = fh.WriteAt(newdata, 0)
if err != nil {
t.Error(err)
}
// wait for the underlying watcher to detect changes
time.Sleep(time.Second)
}
func TestWatcherStop(t *testing.T) {
data := []byte(`{"foo": "bar"}`)
_, cleanup, path, err := createTestFile(data)
if err != nil {
t.Error(err)
}
defer cleanup()
src := file.NewSource(file.WithPath(path))
if err != nil {
t.Error(err)
}
// create a watcher
w, err := src.Watch()
if err != nil {
t.Error(err)
}
defer func() {
var err error
c := make(chan struct{})
defer close(c)
go func() {
_, err = w.Next()
c <- struct{}{}
}()
select {
case <-time.After(2 * time.Second):
err = errors.New("timeout waiting for Watcher.Next() to return")
case <-c:
}
if !errors.Is(err, source.ErrWatcherStopped) {
t.Error(err)
}
}()
// stop the watcher
w.Stop()
}

2
go.mod
View File

@ -6,7 +6,7 @@ require (
github.com/bitly/go-simplejson v0.5.0
github.com/ef-ds/deque v1.0.4
github.com/evanphx/json-patch/v5 v5.5.0
github.com/fsnotify/fsnotify v1.4.9
github.com/fsnotify/fsnotify v1.6.0
github.com/fsouza/go-dockerclient v1.7.3
github.com/go-acme/lego/v4 v4.4.0
github.com/go-git/go-git/v5 v5.4.2

3
go.sum
View File

@ -339,6 +339,8 @@ github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
github.com/fsouza/go-dockerclient v1.7.3 h1:i6iMcktl688vsKUEExA6gU1UjPgIvmGtJeQ0mbuFqZo=
github.com/fsouza/go-dockerclient v1.7.3/go.mod h1:8xfZB8o9SptLNJ13VoV5pMiRbZGWkU/Omu5VOu/KC9Y=
github.com/fullsailor/pkcs7 v0.0.0-20190404230743-d7302db945fa/go.mod h1:KnogPXtdwXqoenmZCw6S+25EAm2MkxbG0deNDu4cbSA=
@ -1147,6 +1149,7 @@ golang.org/x/sys v0.0.0-20211116061358-0a5406a5449c/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201113234701-d7a72108b828/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=