diff --git a/contrib/config/nacos/config.go b/contrib/config/nacos/config.go index 54d4c92d3..c234817c5 100644 --- a/contrib/config/nacos/config.go +++ b/contrib/config/nacos/config.go @@ -4,28 +4,18 @@ import ( "context" "path/filepath" "strings" - "time" - "github.com/go-kratos/kratos/v2/config" "github.com/nacos-group/nacos-sdk-go/clients/config_client" "github.com/nacos-group/nacos-sdk-go/vo" + + "github.com/go-kratos/kratos/v2/config" ) type Option func(*options) type options struct { - endpoint string // nolint:structcheck,unused - - namespaceID string // nolint:structcheck,unused - group string dataID string - - timeoutMs uint64 - logLevel string - - logDir string - cacheDir string } // WithGroup With nacos config group. @@ -42,34 +32,6 @@ func WithDataID(dataID string) Option { } } -// WithLogDir With nacos config group. -func WithLogDir(logDir string) Option { - return func(o *options) { - o.logDir = logDir - } -} - -// WithCacheDir With nacos config cache dir. -func WithCacheDir(cacheDir string) Option { - return func(o *options) { - o.cacheDir = cacheDir - } -} - -// WithLogLevel With nacos config log level. -func WithLogLevel(logLevel string) Option { - return func(o *options) { - o.logLevel = logLevel - } -} - -// WithTimeout With nacos config timeout. -func WithTimeout(time time.Duration) Option { - return func(o *options) { - o.timeoutMs = uint64(time.Milliseconds()) - } -} - type Config struct { opts options client config_client.IConfigClient diff --git a/contrib/config/nacos/config_test.go b/contrib/config/nacos/config_test.go index 2fdc4f6b0..2d2908037 100644 --- a/contrib/config/nacos/config_test.go +++ b/contrib/config/nacos/config_test.go @@ -1,41 +1,130 @@ package config import ( - "fmt" - "net" + "reflect" "testing" "time" "github.com/nacos-group/nacos-sdk-go/clients" "github.com/nacos-group/nacos-sdk-go/common/constant" "github.com/nacos-group/nacos-sdk-go/vo" - "gopkg.in/yaml.v3" - kconfig "github.com/go-kratos/kratos/v2/config" + "github.com/go-kratos/kratos/v2/config" ) -func getIntranetIP() string { - addrs, err := net.InterfaceAddrs() - if err != nil { - return "127.0.0.1" - } - - for _, address := range addrs { - if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() { - if ipnet.IP.To4() != nil { - return ipnet.IP.String() - } - } - } - return "127.0.0.1" -} - -func TestGetConfig(t *testing.T) { - ip := getIntranetIP() - // ctx := context.Background() - +func TestConfig_Load(t *testing.T) { sc := []constant.ServerConfig{ - *constant.NewServerConfig(ip, 8848), + *constant.NewServerConfig("127.0.0.1", 8848), + } + + cc := constant.ClientConfig{ + TimeoutMs: 5000, + NotLoadCacheAtStart: true, + LogDir: "/tmp/nacos/log", + CacheDir: "/tmp/nacos/cache", + RotateTime: "1h", + MaxAge: 3, + LogLevel: "debug", + } + + client, err := clients.NewConfigClient( + vo.NacosClientParam{ + ClientConfig: &cc, + ServerConfigs: sc, + }, + ) + if err != nil { + t.Fatal(err) + } + source := NewConfigSource(client, WithGroup("test"), WithDataID("test.yaml")) + + type fields struct { + source config.Source + } + tests := []struct { + name string + fields fields + want []*config.KeyValue + wantErr bool + preFunc func(t *testing.T) + deferFunc func(t *testing.T) + }{ + { + name: "normal", + fields: fields{ + source: source, + }, + wantErr: false, + preFunc: func(t *testing.T) { + _, err = client.PublishConfig(vo.ConfigParam{DataId: "test.yaml", Group: "test", Content: "test: test"}) + if err != nil { + t.Error(err) + } + time.Sleep(time.Second * 1) + }, + deferFunc: func(t *testing.T) { + _, dErr := client.DeleteConfig(vo.ConfigParam{DataId: "test.yaml", Group: "test"}) + if dErr != nil { + t.Error(dErr) + } + }, + want: []*config.KeyValue{{ + Key: "test.yaml", + Value: []byte("test: test"), + Format: "yaml", + }}, + }, + { + name: "error", + fields: fields{ + source: source, + }, + wantErr: false, + preFunc: func(t *testing.T) { + _, err = client.PublishConfig(vo.ConfigParam{DataId: "111.yaml", Group: "notExist", Content: "test: test"}) + if err != nil { + t.Error(err) + } + time.Sleep(time.Second * 1) + }, + deferFunc: func(t *testing.T) { + _, dErr := client.DeleteConfig(vo.ConfigParam{DataId: "111.yaml", Group: "notExist"}) + if dErr != nil { + t.Error(dErr) + } + }, + want: []*config.KeyValue{{ + Key: "test.yaml", + Value: []byte{}, + Format: "yaml", + }}, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + if test.preFunc != nil { + test.preFunc(t) + } + if test.deferFunc != nil { + defer test.deferFunc(t) + } + s := test.fields.source + configs, lErr := s.Load() + if (lErr != nil) != test.wantErr { + t.Errorf("Load error = %v, wantErr %v", lErr, test.wantErr) + t.Errorf("Load configs = %v", configs) + return + } + if !reflect.DeepEqual(configs, test.want) { + t.Errorf("Load configs = %v, want %v", configs, test.want) + } + }) + } +} + +func TestConfig_Watch(t *testing.T) { + sc := []constant.ServerConfig{ + *constant.NewServerConfig("127.0.0.1", 8848), } cc := constant.ClientConfig{ @@ -48,7 +137,6 @@ func TestGetConfig(t *testing.T) { LogLevel: "debug", } - // a more graceful way to create naming client client, err := clients.NewConfigClient( vo.NacosClientParam{ ClientConfig: &cc, @@ -59,51 +147,66 @@ func TestGetConfig(t *testing.T) { t.Fatal(err) } - dataID := "test.yaml" - group := "test" - _, err = client.PublishConfig(vo.ConfigParam{DataId: dataID, Group: group, Content: ` -logger: - level: info -`}) - if err != nil { - t.Fatal(err) - } - time.Sleep(1 * time.Second) - c := kconfig.New( - kconfig.WithSource( - NewConfigSource(client, WithGroup(group), WithDataID(dataID)), - ), - kconfig.WithDecoder(func(kv *kconfig.KeyValue, v map[string]interface{}) error { - return yaml.Unmarshal(kv.Value, v) - }), - ) + source := NewConfigSource(client, WithGroup("test"), WithDataID("test.yaml")) - if err = c.Load(); err != nil { - t.Fatal(err) + type fields struct { + source config.Source } - - name, err := c.Value("logger.level").String() - if err != nil { - t.Fatal(err) + tests := []struct { + name string + fields fields + want []*config.KeyValue + wantErr bool + processFunc func(t *testing.T, w config.Watcher) + deferFunc func(t *testing.T, w config.Watcher) + }{ + { + name: "normal", + fields: fields{ + source: source, + }, + wantErr: false, + processFunc: func(t *testing.T, w config.Watcher) { + _, pErr := client.PublishConfig(vo.ConfigParam{DataId: "test.yaml", Group: "test", Content: "test: test"}) + if pErr != nil { + t.Error(pErr) + } + }, + deferFunc: func(t *testing.T, w config.Watcher) { + _, dErr := client.DeleteConfig(vo.ConfigParam{DataId: "test.yaml", Group: "test"}) + if dErr != nil { + t.Error(dErr) + } + }, + want: []*config.KeyValue{{ + Key: "test.yaml", + Value: []byte("test: test"), + Format: "yaml", + }}, + }, } - fmt.Println("get value", name) - - done := make(chan struct{}) - err = c.Watch("logger.level", func(key string, value kconfig.Value) { - fmt.Println(key, " value change", value) - done <- struct{}{} - }) - if err != nil { - t.Fatal(err) + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + s := test.fields.source + watch, wErr := s.Watch() + if wErr != nil { + t.Error(wErr) + return + } + if test.processFunc != nil { + test.processFunc(t, watch) + } + if test.deferFunc != nil { + defer test.deferFunc(t, watch) + } + want, nErr := watch.Next() + if (nErr != nil) != test.wantErr { + t.Errorf("Watch error = %v, wantErr %v", nErr, test.wantErr) + return + } + if !reflect.DeepEqual(want, test.want) { + t.Errorf("Watch watcher = %v, want %v", watch, test.want) + } + }) } - - _, err = client.PublishConfig(vo.ConfigParam{DataId: dataID, Group: group, Content: ` -logger: - level: debug -`}) - if err != nil { - t.Fatal(err) - } - - <-done } diff --git a/contrib/config/nacos/go.mod b/contrib/config/nacos/go.mod index c77f6657e..825f993e2 100644 --- a/contrib/config/nacos/go.mod +++ b/contrib/config/nacos/go.mod @@ -5,7 +5,7 @@ go 1.16 require ( github.com/go-kratos/kratos/v2 v2.3.1 github.com/nacos-group/nacos-sdk-go v1.0.9 - gopkg.in/yaml.v3 v3.0.1 + gopkg.in/yaml.v3 v3.0.1 // indirect ) replace github.com/go-kratos/kratos/v2 => ../../../ diff --git a/contrib/registry/nacos/registry_test.go b/contrib/registry/nacos/registry_test.go index 62765b0cc..1910a68d8 100644 --- a/contrib/registry/nacos/registry_test.go +++ b/contrib/registry/nacos/registry_test.go @@ -2,8 +2,7 @@ package nacos import ( "context" - "log" - "net" + "reflect" "testing" "time" @@ -14,29 +13,9 @@ import ( "github.com/go-kratos/kratos/v2/registry" ) -func getIntranetIP() string { - addrs, err := net.InterfaceAddrs() - if err != nil { - return "127.0.0.1" - } - - for _, address := range addrs { - if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() { - if ipnet.IP.To4() != nil { - return ipnet.IP.String() - } - } - } - return "127.0.0.1" -} - -func TestRegistry(t *testing.T) { - ip := getIntranetIP() - serviceName := "golang-sms@grpc" - ctx := context.Background() - +func TestRegistry_Register(t *testing.T) { sc := []constant.ServerConfig{ - *constant.NewServerConfig(ip, 8848), + *constant.NewServerConfig("127.0.0.1", 8848), } cc := constant.ClientConfig{ @@ -60,76 +39,298 @@ func TestRegistry(t *testing.T) { if err != nil { t.Fatal(err) } - - _, err = client.RegisterInstance(vo.RegisterInstanceParam{ - Ip: "f", - Port: 8840, - ServiceName: serviceName, - Weight: 10, - Enable: true, - Healthy: true, - Ephemeral: true, - Metadata: map[string]string{"idc": "shanghai-xs"}, - }) - if err != nil { - t.Fatal(err) - } - - time.Sleep(time.Second) - - is, err := client.GetService(vo.GetServiceParam{ - ServiceName: serviceName, - }) - if err != nil { - t.Fatal(err) - } - t.Logf("is %#v", is) - - time.Sleep(time.Second) r := New(client) - go func() { - var ( - w registry.Watcher - watchErr error - ) - w, watchErr = r.Watch(ctx, "golang-sms@grpc") - if watchErr != nil { - log.Fatal(watchErr) - } - for { - var res []*registry.ServiceInstance - res, watchErr = w.Next() - if watchErr != nil { - return - } - log.Printf("watch: %d", len(res)) - for _, r := range res { - log.Printf("next: %+v", r) - } - } - }() - - time.Sleep(time.Second) - - ins, err := r.GetService(ctx, serviceName) - if err != nil { - t.Fatal(err) + testServer := ®istry.ServiceInstance{ + ID: "1", + Name: "test1", + Version: "v1.0.0", + Endpoints: []string{"http://127.0.0.1:8080?isSecure=false"}, } - for _, in := range ins { - t.Logf("ins: %#v", in) + testServerWithMetadata := ®istry.ServiceInstance{ + ID: "1", + Name: "test1", + Version: "v1.0.0", + Endpoints: []string{"http://127.0.0.1:8080?isSecure=false"}, + Metadata: map[string]string{"idc": "shanghai-xs"}, + } + type fields struct { + registry *Registry + } + type args struct { + ctx context.Context + service *registry.ServiceInstance + } + tests := []struct { + name string + fields fields + args args + wantErr bool + deferFunc func(t *testing.T) + }{ + { + name: "normal", + fields: fields{ + registry: New(client), + }, + args: args{ + ctx: context.Background(), + service: testServer, + }, + wantErr: false, + deferFunc: func(t *testing.T) { + err = r.Deregister(context.Background(), testServer) + if err != nil { + t.Error(err) + } + }, + }, + { + name: "withMetadata", + fields: fields{ + registry: New(client), + }, + args: args{ + ctx: context.Background(), + service: testServerWithMetadata, + }, + wantErr: false, + deferFunc: func(t *testing.T) { + err = r.Deregister(context.Background(), testServerWithMetadata) + if err != nil { + t.Error(err) + } + }, + }, + { + name: "error", + fields: fields{ + registry: New(client), + }, + args: args{ + ctx: context.Background(), + service: ®istry.ServiceInstance{ + ID: "1", + Name: "", + Version: "v1.0.0", + Endpoints: []string{"http://127.0.0.1:8080?isSecure=false"}, + }, + }, + wantErr: true, + }, + { + name: "urlError", + fields: fields{ + registry: New(client), + }, + args: args{ + ctx: context.Background(), + service: ®istry.ServiceInstance{ + ID: "1", + Name: "test", + Version: "v1.0.0", + Endpoints: []string{"127.0.0.1:8080"}, + }, + }, + wantErr: true, + }, + { + name: "portError", + fields: fields{ + registry: New(client), + }, + args: args{ + ctx: context.Background(), + service: ®istry.ServiceInstance{ + ID: "1", + Name: "test", + Version: "v1.0.0", + Endpoints: []string{"http://127.0.0.1888"}, + }, + }, + wantErr: true, + }, + { + name: "withCluster", + fields: fields{ + registry: New(client, WithCluster("test")), + }, + args: args{ + ctx: context.Background(), + service: testServer, + }, + wantErr: false, + }, + { + name: "withGroup", + fields: fields{ + registry: New(client, WithGroup("TEST_GROUP")), + }, + args: args{ + ctx: context.Background(), + service: testServer, + }, + wantErr: false, + }, + { + name: "withWeight", + fields: fields{ + registry: New(client, WithWeight(200)), + }, + args: args{ + ctx: context.Background(), + service: testServer, + }, + wantErr: false, + }, + { + name: "withPrefix", + fields: fields{ + registry: New(client, WithPrefix("test")), + }, + args: args{ + ctx: context.Background(), + service: testServer, + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := tt.fields.registry + if err := r.Register(tt.args.ctx, tt.args.service); (err != nil) != tt.wantErr { + t.Errorf("Register error = %v, wantErr %v", err, tt.wantErr) + } + }) } - - time.Sleep(time.Second) } -func TestRegistryMany(t *testing.T) { - ip := getIntranetIP() - serviceName := "golang-sms@grpc" - // ctx := context.Background() +func TestRegistry_Deregister(t *testing.T) { + testServer := ®istry.ServiceInstance{ + ID: "1", + Name: "test2", + Version: "v1.0.0", + Endpoints: []string{"http://127.0.0.1:8080?isSecure=false"}, + } + type args struct { + ctx context.Context + service *registry.ServiceInstance + } + tests := []struct { + name string + args args + wantErr bool + preFunc func(t *testing.T) + }{ + { + name: "normal", + args: args{ + ctx: context.Background(), + service: testServer, + }, + wantErr: false, + preFunc: func(t *testing.T) { + sc := []constant.ServerConfig{ + *constant.NewServerConfig("127.0.0.1", 8848), + } + + cc := constant.ClientConfig{ + NamespaceId: "public", // namespace id + TimeoutMs: 5000, + NotLoadCacheAtStart: true, + LogDir: "/tmp/nacos/log", + CacheDir: "/tmp/nacos/cache", + RotateTime: "1h", + MaxAge: 3, + LogLevel: "debug", + } + + // a more graceful way to create naming client + client, err := clients.NewNamingClient( + vo.NacosClientParam{ + ClientConfig: &cc, + ServerConfigs: sc, + }, + ) + if err != nil { + t.Fatal(err) + } + r := New(client) + err = r.Register(context.Background(), testServer) + if err != nil { + t.Error(err) + } + }, + }, + { + name: "error", + args: args{ + ctx: context.Background(), + service: ®istry.ServiceInstance{ + ID: "1", + Name: "test", + Version: "v1.0.0", + Endpoints: []string{"127.0.0.1:8080"}, + }, + }, + wantErr: true, + }, + { + name: "errorPort", + args: args{ + ctx: context.Background(), + service: ®istry.ServiceInstance{ + ID: "1", + Name: "notExist", + Version: "v1.0.0", + Endpoints: []string{"http://127.0.0.18080"}, + }, + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + sc := []constant.ServerConfig{ + *constant.NewServerConfig("127.0.0.1", 8848), + } + + cc := constant.ClientConfig{ + NamespaceId: "public", // namespace id + TimeoutMs: 5000, + NotLoadCacheAtStart: true, + LogDir: "/tmp/nacos/log", + CacheDir: "/tmp/nacos/cache", + RotateTime: "1h", + MaxAge: 3, + LogLevel: "debug", + } + + // a more graceful way to create naming client + client, err := clients.NewNamingClient( + vo.NacosClientParam{ + ClientConfig: &cc, + ServerConfigs: sc, + }, + ) + if err != nil { + t.Fatal(err) + } + r := New(client) + if tt.preFunc != nil { + tt.preFunc(t) + } + if err := r.Deregister(tt.args.ctx, tt.args.service); (err != nil) != tt.wantErr { + t.Errorf("Deregister error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func TestRegistry_GetService(t *testing.T) { sc := []constant.ServerConfig{ - *constant.NewServerConfig(ip, 8848), + *constant.NewServerConfig("127.0.0.1", 8848), } cc := constant.ClientConfig{ @@ -153,61 +354,219 @@ func TestRegistryMany(t *testing.T) { if err != nil { t.Fatal(err) } - - _, err = client.RegisterInstance(vo.RegisterInstanceParam{ - Ip: "f1", - Port: 8840, - ServiceName: serviceName, - Weight: 10, - Enable: true, - Healthy: true, - Ephemeral: true, - Metadata: map[string]string{"idc": "shanghai-xs"}, - }) - if err != nil { - t.Fatal(err) + r := New(client) + testServer := ®istry.ServiceInstance{ + ID: "1", + Name: "test3", + Version: "v1.0.0", + Endpoints: []string{"grpc://127.0.0.1:8080?isSecure=false"}, } - _, err = client.RegisterInstance(vo.RegisterInstanceParam{ - Ip: "f2", - Port: 8840, - ServiceName: serviceName, - Weight: 10, - Enable: true, - Healthy: true, - Ephemeral: true, - Metadata: map[string]string{"idc": "shanghai-xs"}, - }) - if err != nil { - t.Fatal(err) + type fields struct { + registry *Registry } - - _, err = client.RegisterInstance(vo.RegisterInstanceParam{ - Ip: "f3", - Port: 8840, - ServiceName: serviceName, - Weight: 10, - Enable: true, - Healthy: true, - Ephemeral: true, - Metadata: map[string]string{"idc": "shanghai-xs"}, - }) - if err != nil { - t.Fatal(err) + type args struct { + ctx context.Context + serviceName string } - - time.Sleep(time.Second) - - is, err := client.GetService(vo.GetServiceParam{ - ServiceName: serviceName, - }) - if err != nil { - t.Fatal(err) + tests := []struct { + name string + fields fields + args args + want []*registry.ServiceInstance + wantErr bool + preFunc func(t *testing.T) + deferFunc func(t *testing.T) + }{ + { + name: "normal", + preFunc: func(t *testing.T) { + err = r.Register(context.Background(), testServer) + if err != nil { + t.Error(err) + } + time.Sleep(time.Second) + }, + deferFunc: func(t *testing.T) { + err = r.Deregister(context.Background(), testServer) + if err != nil { + t.Error(err) + } + }, + fields: fields{ + registry: r, + }, + args: args{ + ctx: context.Background(), + serviceName: testServer.Name + "." + "grpc", + }, + want: []*registry.ServiceInstance{{ + ID: "127.0.0.1#8080#DEFAULT#DEFAULT_GROUP@@test3.grpc", + Name: "DEFAULT_GROUP@@test3.grpc", + Version: "v1.0.0", + Metadata: map[string]string{"version": "v1.0.0", "kind": "grpc"}, + Endpoints: []string{"grpc://127.0.0.1:8080"}, + }}, + wantErr: false, + }, + { + name: "errorNotExist", + fields: fields{ + registry: r, + }, + args: args{ + ctx: context.Background(), + serviceName: "notExist", + }, + want: nil, + wantErr: true, + }, } - - for _, host := range is.Hosts { - t.Logf("host: %#v,e: %v", host, err) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.preFunc != nil { + tt.preFunc(t) + } + if tt.deferFunc != nil { + defer tt.deferFunc(t) + } + r := tt.fields.registry + got, err := r.GetService(tt.args.ctx, tt.args.serviceName) + if (err != nil) != tt.wantErr { + t.Errorf("GetService error = %v, wantErr %v", err, tt.wantErr) + t.Errorf("GetService got = %v", got) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("GetService got = %v, want %v", got, tt.want) + } + }) + } +} + +func TestRegistry_Watch(t *testing.T) { + sc := []constant.ServerConfig{ + *constant.NewServerConfig("127.0.0.1", 8848), + } + + cc := constant.ClientConfig{ + NamespaceId: "public", // namespace id + TimeoutMs: 5000, + NotLoadCacheAtStart: true, + LogDir: "/tmp/nacos/log", + CacheDir: "/tmp/nacos/cache", + RotateTime: "1h", + MaxAge: 3, + LogLevel: "debug", + } + + // a more graceful way to create naming client + client, err := clients.NewNamingClient( + vo.NacosClientParam{ + ClientConfig: &cc, + ServerConfigs: sc, + }, + ) + if err != nil { + t.Fatal(err) + } + r := New(client) + + testServer := ®istry.ServiceInstance{ + ID: "1", + Name: "test4", + Version: "v1.0.0", + Endpoints: []string{"grpc://127.0.0.1:8080?isSecure=false"}, + } + + cancelCtx, cancel := context.WithCancel(context.Background()) + type fields struct { + registry *Registry + } + type args struct { + ctx context.Context + serviceName string + } + tests := []struct { + name string + fields fields + args args + wantErr bool + want []*registry.ServiceInstance + processFunc func(t *testing.T) + }{ + { + name: "normal", + fields: fields{ + registry: New(client), + }, + args: args{ + ctx: context.Background(), + serviceName: testServer.Name + "." + "grpc", + }, + wantErr: false, + want: []*registry.ServiceInstance{{ + ID: "127.0.0.1#8080#DEFAULT#DEFAULT_GROUP@@test4.grpc", + Name: "DEFAULT_GROUP@@test4.grpc", + Version: "v1.0.0", + Metadata: map[string]string{"version": "v1.0.0", "kind": "grpc"}, + Endpoints: []string{"grpc://127.0.0.1:8080"}, + }}, + processFunc: func(t *testing.T) { + err = r.Register(context.Background(), testServer) + if err != nil { + t.Error(err) + } + }, + }, + { + name: "ctxCancel", + fields: fields{ + registry: r, + }, + args: args{ + ctx: cancelCtx, + serviceName: testServer.Name, + }, + wantErr: true, + want: nil, + processFunc: func(t *testing.T) { + cancel() + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := tt.fields.registry + watch, err := r.Watch(tt.args.ctx, tt.args.serviceName) + if err != nil { + t.Error(err) + return + } + defer func() { + err = watch.Stop() + if err != nil { + t.Error(err) + } + }() + _, err = watch.Next() + if err != nil { + t.Error(err) + return + } + + if tt.processFunc != nil { + tt.processFunc(t) + } + + want, err := watch.Next() + if (err != nil) != tt.wantErr { + t.Errorf("Watch error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(want, tt.want) { + t.Errorf("Watch watcher = %v, want %v", watch, tt.want) + } + }) } - - time.Sleep(time.Second) }