diff --git a/config/default_test.go b/config/default_test.go
index fd1933f4..87128150 100644
--- a/config/default_test.go
+++ b/config/default_test.go
@@ -145,16 +145,26 @@ func TestFileChange(t *testing.T) {
 	if err != nil {
 		t.Error(err)
 	}
+
 	changeTimes := 0
+	done := make(chan bool)
+
 	go func() {
+		defer func() {
+			close(done)
+		}()
+
 		for {
 			v, err := watcher.Next()
 			if err != nil {
-				t.Error(err)
+				if err.Error() != "watcher stopped" {
+					t.Error(err)
+					return
+				}
 				return
 			}
 			changeTimes++
-			t.Logf("file change,%s", string(v.Bytes()))
+			t.Logf("file change,%s: %d", string(v.Bytes()), changeTimes)
 		}
 	}()
 
@@ -169,10 +179,17 @@ func TestFileChange(t *testing.T) {
 			t.Error(err)
 		}
 
-		time.Sleep(time.Second)
+		time.Sleep(500 * time.Millisecond)
 	}
 
-	if changeTimes != 4 {
-		t.Error(fmt.Errorf("watcher error: change times %d is not enough", changeTimes))
+	if err := watcher.Stop(); err != nil {
+		t.Fatalf("failed to stop watcher: %s", err)
+	}
+
+	// wait for the watcher to finish
+	<-done
+
+	if changeTimes != 5 {
+		t.Errorf("watcher error: change times %d is not enough", changeTimes)
 	}
 }
diff --git a/config/source/consul/watcher.go b/config/source/consul/watcher.go
index e6993d8b..a20c8f9b 100644
--- a/config/source/consul/watcher.go
+++ b/config/source/consul/watcher.go
@@ -1,7 +1,6 @@
 package consul
 
 import (
-	"errors"
 	"time"
 
 	"github.com/hashicorp/consul/api"
@@ -80,7 +79,7 @@ func (w *watcher) Next() (*source.ChangeSet, error) {
 	case cs := <-w.ch:
 		return cs, nil
 	case <-w.exit:
-		return nil, errors.New("watcher stopped")
+		return nil, source.ErrWatcherStopped
 	}
 }
 
diff --git a/config/source/env/env_test.go b/config/source/env/env_test.go
index 891d8d8b..4c22122e 100644
--- a/config/source/env/env_test.go
+++ b/config/source/env/env_test.go
@@ -86,8 +86,8 @@ func TestEnvvar_Prefixes(t *testing.T) {
 }
 
 func TestEnvvar_WatchNextNoOpsUntilStop(t *testing.T) {
-	source := NewSource(WithStrippedPrefix("GOMICRO_"))
-	w, err := source.Watch()
+	src := NewSource(WithStrippedPrefix("GOMICRO_"))
+	w, err := src.Watch()
 	if err != nil {
 		t.Error(err)
 	}
@@ -97,7 +97,7 @@ func TestEnvvar_WatchNextNoOpsUntilStop(t *testing.T) {
 		w.Stop()
 	}()
 
-	if _, err := w.Next(); err.Error() != "watcher stopped" {
+	if _, err := w.Next(); err != source.ErrWatcherStopped {
 		t.Errorf("expected watcher stopped error, got %v", err)
 	}
 }
diff --git a/config/source/env/watcher.go b/config/source/env/watcher.go
index 5dd3ef34..4ffe783c 100644
--- a/config/source/env/watcher.go
+++ b/config/source/env/watcher.go
@@ -1,8 +1,6 @@
 package env
 
 import (
-	"errors"
-
 	"github.com/micro/go-micro/config/source"
 )
 
@@ -13,7 +11,7 @@ type watcher struct {
 func (w *watcher) Next() (*source.ChangeSet, error) {
 	<-w.exit
 
-	return nil, errors.New("watcher stopped")
+	return nil, source.ErrWatcherStopped
 }
 
 func (w *watcher) Stop() error {
diff --git a/config/source/file/watcher.go b/config/source/file/watcher.go
index 76ecd260..d9595e27 100644
--- a/config/source/file/watcher.go
+++ b/config/source/file/watcher.go
@@ -3,7 +3,6 @@
 package file
 
 import (
-	"errors"
 	"os"
 
 	"github.com/fsnotify/fsnotify"
@@ -36,7 +35,7 @@ func (w *watcher) Next() (*source.ChangeSet, error) {
 	// is it closed?
 	select {
 	case <-w.exit:
-		return nil, errors.New("watcher stopped")
+		return nil, source.ErrWatcherStopped
 	default:
 	}
 
@@ -59,7 +58,7 @@ func (w *watcher) Next() (*source.ChangeSet, error) {
 	case err := <-w.fw.Errors:
 		return nil, err
 	case <-w.exit:
-		return nil, errors.New("watcher stopped")
+		return nil, source.ErrWatcherStopped
 	}
 }
 
diff --git a/config/source/file/watcher_linux.go b/config/source/file/watcher_linux.go
index 3f48a00b..82d45154 100644
--- a/config/source/file/watcher_linux.go
+++ b/config/source/file/watcher_linux.go
@@ -3,7 +3,6 @@
 package file
 
 import (
-	"errors"
 	"os"
 
 	"github.com/fsnotify/fsnotify"
@@ -36,7 +35,7 @@ func (w *watcher) Next() (*source.ChangeSet, error) {
 	// is it closed?
 	select {
 	case <-w.exit:
-		return nil, errors.New("watcher stopped")
+		return nil, source.ErrWatcherStopped
 	default:
 	}
 
@@ -63,7 +62,7 @@ func (w *watcher) Next() (*source.ChangeSet, error) {
 	case err := <-w.fw.Errors:
 		return nil, err
 	case <-w.exit:
-		return nil, errors.New("watcher stopped")
+		return nil, source.ErrWatcherStopped
 	}
 }
 
diff --git a/config/source/source.go b/config/source/source.go
index 828c8ad2..c6d961be 100644
--- a/config/source/source.go
+++ b/config/source/source.go
@@ -2,9 +2,15 @@
 package source
 
 import (
+	"errors"
 	"time"
 )
 
+var (
+	// ErrWatcherStopped is returned when source watcher has been stopped
+	ErrWatcherStopped = errors.New("watcher stopped")
+)
+
 // Source is the source from which config is loaded
 type Source interface {
 	Read() (*ChangeSet, error)