From 29fb58db3946b54621449d124358fa3ba891db10 Mon Sep 17 00:00:00 2001 From: Astone Date: Tue, 3 Dec 2019 15:25:58 +0800 Subject: [PATCH] improve code quality --- agent/input/telegram/conn.go | 8 +++----- client/grpc/grpc.go | 4 +++- client/rpc_client.go | 5 +++-- codec/bytes/bytes.go | 13 +++++------- codec/bytes/marshaler.go | 11 ++++------ codec/text/text.go | 14 ++++--------- network/default.go | 2 +- network/service/handler/handler.go | 5 ----- proxy/mucp/mucp.go | 11 ++++------ registry/mdns_registry.go | 8 ++++++-- registry/service/watcher.go | 32 ++++++++++++++---------------- router/default.go | 2 +- server/grpc/grpc.go | 4 +++- server/rpc_server.go | 4 +++- server/server.go | 2 +- sync/leader/etcd/etcd.go | 17 ++++------------ sync/lock/etcd/etcd.go | 4 +--- sync/task/task.go | 4 +++- tunnel/link.go | 4 ++-- web/service.go | 2 +- 20 files changed, 67 insertions(+), 89 deletions(-) diff --git a/agent/input/telegram/conn.go b/agent/input/telegram/conn.go index 44d1ada1..1012dece 100644 --- a/agent/input/telegram/conn.go +++ b/agent/input/telegram/conn.go @@ -44,11 +44,9 @@ func (tc *telegramConn) run() { tc.recv = updates tc.syncCond.Signal() - for { - select { - case <-tc.exit: - return - } + select { + case <-tc.exit: + return } } diff --git a/client/grpc/grpc.go b/client/grpc/grpc.go index ba7a3842..62b8d5db 100644 --- a/client/grpc/grpc.go +++ b/client/grpc/grpc.go @@ -339,7 +339,9 @@ func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface d, ok := ctx.Deadline() if !ok { // no deadline so we create a new one - ctx, _ = context.WithTimeout(ctx, callOpts.RequestTimeout) + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, callOpts.RequestTimeout) + defer cancel() } else { // got a deadline so no need to setup context // but we need to set the timeout we pass along diff --git a/client/rpc_client.go b/client/rpc_client.go index c75a67b3..75ce75ac 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -162,7 +162,6 @@ func (r *rpcClient) call(ctx context.Context, node *registry.Node, req Request, select { case err := <-ch: - grr = err return err case <-ctx.Done(): grr = errors.Timeout("go.micro.client", fmt.Sprintf("%v", ctx.Err())) @@ -378,7 +377,9 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac d, ok := ctx.Deadline() if !ok { // no deadline so we create a new one - ctx, _ = context.WithTimeout(ctx, callOpts.RequestTimeout) + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, callOpts.RequestTimeout) + defer cancel() } else { // got a deadline so no need to setup context // but we need to set the timeout we pass along diff --git a/codec/bytes/bytes.go b/codec/bytes/bytes.go index 67ff4038..69d36235 100644 --- a/codec/bytes/bytes.go +++ b/codec/bytes/bytes.go @@ -29,12 +29,10 @@ func (c *Codec) ReadBody(b interface{}) error { return err } - switch b.(type) { + switch v := b.(type) { case *[]byte: - v := b.(*[]byte) *v = buf case *Frame: - v := b.(*Frame) v.Data = buf default: return fmt.Errorf("failed to read body: %v is not type of *[]byte", b) @@ -45,14 +43,13 @@ func (c *Codec) ReadBody(b interface{}) error { func (c *Codec) Write(m *codec.Message, b interface{}) error { var v []byte - switch b.(type) { + switch vb := b.(type) { case *Frame: - v = b.(*Frame).Data + v = vb.Data case *[]byte: - ve := b.(*[]byte) - v = *ve + v = *vb case []byte: - v = b.([]byte) + v = vb default: return fmt.Errorf("failed to write: %v is not type of *[]byte or []byte", b) } diff --git a/codec/bytes/marshaler.go b/codec/bytes/marshaler.go index 76599b65..86af8984 100644 --- a/codec/bytes/marshaler.go +++ b/codec/bytes/marshaler.go @@ -12,25 +12,22 @@ type Message struct { } func (n Marshaler) Marshal(v interface{}) ([]byte, error) { - switch v.(type) { + switch ve := v.(type) { case *[]byte: - ve := v.(*[]byte) return *ve, nil case []byte: - return v.([]byte), nil + return ve, nil case *Message: - return v.(*Message).Body, nil + return ve.Body, nil } return nil, errors.New("invalid message") } func (n Marshaler) Unmarshal(d []byte, v interface{}) error { - switch v.(type) { + switch ve := v.(type) { case *[]byte: - ve := v.(*[]byte) *ve = d case *Message: - ve := v.(*Message) ve.Body = d } return errors.New("invalid message") diff --git a/codec/text/text.go b/codec/text/text.go index da43da50..799cf6c1 100644 --- a/codec/text/text.go +++ b/codec/text/text.go @@ -29,15 +29,12 @@ func (c *Codec) ReadBody(b interface{}) error { return err } - switch b.(type) { + switch v := b.(type) { case *string: - v := b.(*string) *v = string(buf) case *[]byte: - v := b.(*[]byte) *v = buf case *Frame: - v := b.(*Frame) v.Data = buf default: return fmt.Errorf("failed to read body: %v is not type of *[]byte", b) @@ -48,20 +45,17 @@ func (c *Codec) ReadBody(b interface{}) error { func (c *Codec) Write(m *codec.Message, b interface{}) error { var v []byte - switch b.(type) { + switch ve := b.(type) { case *Frame: - v = b.(*Frame).Data + v = ve.Data case *[]byte: - ve := b.(*[]byte) v = *ve case *string: - ve := b.(*string) v = []byte(*ve) case string: - ve := b.(string) v = []byte(ve) case []byte: - v = b.([]byte) + v = ve default: return fmt.Errorf("failed to write: %v is not type of *[]byte or []byte", b) } diff --git a/network/default.go b/network/default.go index b9dfe13f..ac5c902e 100644 --- a/network/default.go +++ b/network/default.go @@ -823,7 +823,7 @@ func (n *network) processCtrlChan(listener tunnel.Listener) { log.Tracef("Network metric for router %s and gateway %s: %v", event.Route.Router, event.Route.Gateway, metric) // check we don't overflow max int 64 - if d := route.Metric + metric; d > math.MaxInt64 || d <= 0 { + if d := route.Metric + metric; d <= 0 { // set to max int64 if we overflow route.Metric = math.MaxInt64 } else { diff --git a/network/service/handler/handler.go b/network/service/handler/handler.go index c47c0b30..62e8910c 100644 --- a/network/service/handler/handler.go +++ b/network/service/handler/handler.go @@ -94,11 +94,6 @@ func (n *Network) Connect(ctx context.Context, req *pbNet.ConnectRequest, resp * // Nodes returns the list of nodes func (n *Network) Nodes(ctx context.Context, req *pbNet.NodesRequest, resp *pbNet.NodesResponse) error { - depth := uint(req.Depth) - if depth <= 0 || depth > network.MaxDepth { - depth = network.MaxDepth - } - // root node nodes := map[string]network.Node{} diff --git a/proxy/mucp/mucp.go b/proxy/mucp/mucp.go index 1036f722..856657a1 100644 --- a/proxy/mucp/mucp.go +++ b/proxy/mucp/mucp.go @@ -274,7 +274,7 @@ func (p *Proxy) watchRoutes() { return } - if err := p.manageRoutes(event.Route, fmt.Sprintf("%s", event.Type)); err != nil { + if err := p.manageRoutes(event.Route, event.Type.String()); err != nil { // TODO: should we bail here? continue } @@ -562,12 +562,9 @@ func NewProxy(opts ...options.Option) proxy.Proxy { defer t.Stop() // we must refresh route metrics since they do not trigger new events - for { - select { - case <-t.C: - // refresh route metrics - p.refreshMetrics() - } + for range t.C { + // refresh route metrics + p.refreshMetrics() } }() diff --git a/registry/mdns_registry.go b/registry/mdns_registry.go index 6d1c75f7..48782f76 100644 --- a/registry/mdns_registry.go +++ b/registry/mdns_registry.go @@ -230,7 +230,9 @@ func (m *mdnsRegistry) GetService(service string) ([]*Service, error) { p := mdns.DefaultParams(service) // set context with timeout - p.Context, _ = context.WithTimeout(context.Background(), m.opts.Timeout) + var cancel context.CancelFunc + p.Context, cancel = context.WithTimeout(context.Background(), m.opts.Timeout) + defer cancel() // set entries channel p.Entries = entries // set the domain @@ -308,7 +310,9 @@ func (m *mdnsRegistry) ListServices() ([]*Service, error) { p := mdns.DefaultParams("_services") // set context with timeout - p.Context, _ = context.WithTimeout(context.Background(), m.opts.Timeout) + var cancel context.CancelFunc + p.Context, cancel = context.WithTimeout(context.Background(), m.opts.Timeout) + defer cancel() // set entries channel p.Entries = entries // set domain diff --git a/registry/service/watcher.go b/registry/service/watcher.go index c17085a0..973f9547 100644 --- a/registry/service/watcher.go +++ b/registry/service/watcher.go @@ -11,24 +11,22 @@ type serviceWatcher struct { } func (s *serviceWatcher) Next() (*registry.Result, error) { - for { - // check if closed - select { - case <-s.closed: - return nil, registry.ErrWatcherStopped - default: - } - - r, err := s.stream.Recv() - if err != nil { - return nil, err - } - - return ®istry.Result{ - Action: r.Action, - Service: ToService(r.Service), - }, nil + // check if closed + select { + case <-s.closed: + return nil, registry.ErrWatcherStopped + default: } + + r, err := s.stream.Recv() + if err != nil { + return nil, err + } + + return ®istry.Result{ + Action: r.Action, + Service: ToService(r.Service), + }, nil } func (s *serviceWatcher) Stop() { diff --git a/router/default.go b/router/default.go index 23384d2f..f938d1d5 100644 --- a/router/default.go +++ b/router/default.go @@ -718,7 +718,7 @@ func (r *router) Process(a *Advert) error { route := event.Route action := event.Type log.Debugf("Router %s applying %s from router %s for service %s %s", r.options.Id, action, route.Router, route.Service, route.Address) - if err := r.manageRoute(route, fmt.Sprintf("%s", action)); err != nil { + if err := r.manageRoute(route, action.String()); err != nil { return fmt.Errorf("failed applying action %s to routing table: %s", action, err) } } diff --git a/server/grpc/grpc.go b/server/grpc/grpc.go index 942d1a8f..ff180207 100644 --- a/server/grpc/grpc.go +++ b/server/grpc/grpc.go @@ -222,7 +222,9 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) error { // set the timeout if we have it if len(to) > 0 { if n, err := strconv.ParseUint(to, 10, 64); err == nil { - ctx, _ = context.WithTimeout(ctx, time.Duration(n)) + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, time.Duration(n)) + defer cancel() } } diff --git a/server/rpc_server.go b/server/rpc_server.go index f1c3e713..8fee1470 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -277,7 +277,9 @@ func (s *rpcServer) ServeConn(sock transport.Socket) { // set the timeout from the header if we have it if len(to) > 0 { if n, err := strconv.ParseUint(to, 10, 64); err == nil { - ctx, _ = context.WithTimeout(ctx, time.Duration(n)) + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, time.Duration(n)) + defer cancel() } } diff --git a/server/server.go b/server/server.go index 34c38258..0fcc3436 100644 --- a/server/server.go +++ b/server/server.go @@ -201,7 +201,7 @@ func Run() error { } ch := make(chan os.Signal, 1) - signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL) + signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT) log.Logf("Received signal %s", <-ch) return Stop() diff --git a/sync/leader/etcd/etcd.go b/sync/leader/etcd/etcd.go index 6179bf0b..f8e5baa5 100644 --- a/sync/leader/etcd/etcd.go +++ b/sync/leader/etcd/etcd.go @@ -39,9 +39,7 @@ func (e *etcdLeader) Elect(id string, opts ...leader.ElectOption) (leader.Electe l := cc.NewElection(s, path) - ctx, _ := context.WithCancel(context.Background()) - - if err := l.Campaign(ctx, id); err != nil { + if err := l.Campaign(context.TODO(), id); err != nil { return nil, err } @@ -63,14 +61,8 @@ func (e *etcdLeader) Follow() chan string { ech := l.Observe(context.Background()) go func() { - for { - select { - case r, ok := <-ech: - if !ok { - return - } - ch <- string(r.Kvs[0].Value) - } + for r := range ech { + ch <- string(r.Kvs[0].Value) } }() @@ -82,8 +74,7 @@ func (e *etcdLeader) String() string { } func (e *etcdElected) Reelect() error { - ctx, _ := context.WithCancel(context.Background()) - return e.e.Campaign(ctx, e.id) + return e.e.Campaign(context.TODO(), e.id) } func (e *etcdElected) Revoked() chan bool { diff --git a/sync/lock/etcd/etcd.go b/sync/lock/etcd/etcd.go index 0b8ba823..9c55ddde 100644 --- a/sync/lock/etcd/etcd.go +++ b/sync/lock/etcd/etcd.go @@ -49,9 +49,7 @@ func (e *etcdLock) Acquire(id string, opts ...lock.AcquireOption) error { m := cc.NewMutex(s, path) - ctx, _ := context.WithCancel(context.Background()) - - if err := m.Lock(ctx); err != nil { + if err := m.Lock(context.TODO()); err != nil { return err } diff --git a/sync/task/task.go b/sync/task/task.go index 031355ca..c9b9b5d6 100644 --- a/sync/task/task.go +++ b/sync/task/task.go @@ -63,7 +63,9 @@ func (s Schedule) Run() <-chan time.Time { } // start ticker - for t := range time.Tick(s.Interval) { + ticker := time.NewTicker(s.Interval) + defer ticker.Stop() + for t := range ticker.C { ch <- t } }() diff --git a/tunnel/link.go b/tunnel/link.go index d135c152..25083138 100644 --- a/tunnel/link.go +++ b/tunnel/link.go @@ -228,7 +228,7 @@ func (l *link) manage() { } // check the type of message switch { - case bytes.Compare(p.message.Body, linkRequest) == 0: + case bytes.Equal(p.message.Body, linkRequest): log.Tracef("Link %s received link request %v", l.id, p.message.Body) // send response if err := send(linkResponse); err != nil { @@ -236,7 +236,7 @@ func (l *link) manage() { l.errCount++ l.Unlock() } - case bytes.Compare(p.message.Body, linkResponse) == 0: + case bytes.Equal(p.message.Body, linkResponse): // set round trip time d := time.Since(now) log.Tracef("Link %s received link response in %v", p.message.Body, d) diff --git a/web/service.go b/web/service.go index da78d6c8..97c5255a 100644 --- a/web/service.go +++ b/web/service.go @@ -376,7 +376,7 @@ func (s *service) Run() error { go s.run(ex) ch := make(chan os.Signal, 1) - signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL) + signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT) select { // wait on kill signal