From 59eaa89bac017bee5d07192cbd5cf5a5c6940aac Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Mon, 17 Jun 2019 21:11:39 +0100 Subject: [PATCH 1/7] Node is a network --- network/network.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/network/network.go b/network/network.go index 7dba3276..adeb18cf 100644 --- a/network/network.go +++ b/network/network.go @@ -25,10 +25,8 @@ type Network interface { // Node represents a network node type Node interface { - // Id of the node - Id() string - // The network for this node - Network() Network + // Node is a network. Network is a node. + Network } // Message is a message sent over the network From b754c335491a6d583f0c8324b488b35e8cd957aa Mon Sep 17 00:00:00 2001 From: johnson Date: Tue, 18 Jun 2019 17:07:31 +0800 Subject: [PATCH 2/7] grpc message should be able to set --- client/grpc/grpc.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/grpc/grpc.go b/client/grpc/grpc.go index 9ed206c6..555fed0e 100644 --- a/client/grpc/grpc.go +++ b/client/grpc/grpc.go @@ -280,7 +280,7 @@ func (g *grpcClient) Options() client.Options { } func (g *grpcClient) NewMessage(topic string, msg interface{}, opts ...client.MessageOption) client.Message { - return newGRPCPublication(topic, msg, "application/octet-stream") + return newGRPCPublication(topic, msg, g.opts.ContentType, opts...) } func (g *grpcClient) NewRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request { From 97cf2cd7c3b6a21a5201e7e6616847a966a617a8 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Tue, 18 Jun 2019 11:04:06 +0100 Subject: [PATCH 3/7] go fmt --- cmd/cmd.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cmd/cmd.go b/cmd/cmd.go index 310aa431..37616312 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -14,8 +14,8 @@ import ( cgrpc "github.com/micro/go-micro/client/grpc" cmucp "github.com/micro/go-micro/client/mucp" "github.com/micro/go-micro/server" - smucp "github.com/micro/go-micro/server/mucp" sgrpc "github.com/micro/go-micro/server/grpc" + smucp "github.com/micro/go-micro/server/mucp" "github.com/micro/go-micro/util/log" // brokers @@ -180,7 +180,7 @@ var ( } DefaultClients = map[string]func(...client.Option) client.Client{ - "rpc": client.NewClient, + "rpc": client.NewClient, "mucp": cmucp.NewClient, "grpc": cgrpc.NewClient, } @@ -200,7 +200,7 @@ var ( } DefaultServers = map[string]func(...server.Option) server.Server{ - "rpc": server.NewServer, + "rpc": server.NewServer, "mucp": smucp.NewServer, "grpc": sgrpc.NewServer, } From 51560009d28187abc976052bed3716c3bf2a0c41 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Tue, 18 Jun 2019 11:04:36 +0100 Subject: [PATCH 4/7] go fmt --- proxy/http/http.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/proxy/http/http.go b/proxy/http/http.go index ebea885d..61a6fc06 100644 --- a/proxy/http/http.go +++ b/proxy/http/http.go @@ -10,8 +10,8 @@ import ( "net/url" "path" - "github.com/micro/go-micro/errors" "github.com/micro/go-micro/config/options" + "github.com/micro/go-micro/errors" "github.com/micro/go-micro/proxy" "github.com/micro/go-micro/server" ) From ed54384bf497c5c3fe96a27971281275fffa30a0 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Tue, 18 Jun 2019 11:56:11 +0100 Subject: [PATCH 5/7] Update network --- network/network.go | 40 ++++--- network/transport/transport.go | 203 --------------------------------- 2 files changed, 21 insertions(+), 222 deletions(-) delete mode 100644 network/transport/transport.go diff --git a/network/network.go b/network/network.go index adeb18cf..be83b8d0 100644 --- a/network/network.go +++ b/network/network.go @@ -1,26 +1,25 @@ -// Package network is an interface for defining a network overlay +// Package network is a package for defining a network overlay package network import ( "github.com/micro/go-micro/config/options" ) +// Network is an interface defining networks or graphs type Network interface { options.Options - // Id of this network - Id() string - // Connect to the network with id - Connect(id string) error + // Id of this node + Id() uint64 + // Connect to a node + Connect(id uint64) (Link, error) // Close the network connection Close() error - // Accept messages + // Accept messages on the network Accept() (*Message, error) - // Send a message + // Send a message to the network Send(*Message) error - // Advertise a service on this network - Advertise(service string) error - // Retrieve list of nodes for a service - Nodes(service string) ([]Node, error) + // Retrieve list of connections + Links() ([]Link, error) } // Node represents a network node @@ -29,16 +28,19 @@ type Node interface { Network } -// Message is a message sent over the network -type Message struct { - // Headers are the routing headers - // e.g Micro-Service, Micro-Endpoint, Micro-Network - // see https://github.com/micro/development/blob/master/protocol.md - Header map[string]string - // Body is the encaspulated payload - Body []byte +// Link is a connection to another node +type Link interface { + // remote node + Node + // length of link which dictates speed + Length() int + // weight of link which dictates curvature + Weight() int } +// Message is the base type for opaque data +type Message []byte + var ( // TODO: set default network DefaultNetwork Network diff --git a/network/transport/transport.go b/network/transport/transport.go deleted file mode 100644 index 60e6a957..00000000 --- a/network/transport/transport.go +++ /dev/null @@ -1,203 +0,0 @@ -// Package transport implements the network as a transport interface -package transport - -import ( - "context" - "time" - - "github.com/micro/go-micro/network" - "github.com/micro/go-micro/transport" - "github.com/micro/go-micro/util/backoff" -) - -type networkKey struct{} - -// Transport is a network transport -type Transport struct { - Network network.Network - options transport.Options -} - -// Socket is a transport socket -type Socket struct { - // The service - Service string - - // Send via Network.Send(Message) - Network network.Network - - // Remote/Local - remote, local string - - // the first message if its a listener - message *network.Message -} - -// Listener is a transport listener -type Listener struct { - // The local service - Service string - - // The network - Network network.Network -} - -func (s *Socket) Local() string { - return s.local -} - -func (s *Socket) Remote() string { - return s.remote -} - -func (s *Socket) Close() error { - // TODO: should it close the network? - return s.Network.Close() -} - -func (t *Transport) Init(opts ...transport.Option) error { - for _, o := range opts { - o(&t.options) - } - return nil -} - -func (t *Transport) Options() transport.Options { - return t.options -} - -func (t *Transport) Dial(service string, opts ...transport.DialOption) (transport.Client, error) { - // TODO: establish pseudo socket? - return &Socket{ - Service: service, - Network: t.Network, - remote: service, - // TODO: local - local: "local", - }, nil -} - -func (t *Transport) Listen(service string, opts ...transport.ListenOption) (transport.Listener, error) { - // TODO specify connect id - if err := t.Network.Connect("micro.mu"); err != nil { - return nil, err - } - - // advertise the service - if err := t.Network.Advertise(service); err != nil { - return nil, err - } - - return &Listener{ - Service: service, - Network: t.Network, - }, nil -} - -func (t *Transport) String() string { - return "network" -} - -func (s *Socket) Send(msg *transport.Message) error { - // TODO: set routing headers? - return s.Network.Send(&network.Message{ - Header: msg.Header, - Body: msg.Body, - }) -} - -func (s *Socket) Recv(msg *transport.Message) error { - if msg == nil { - msg = new(transport.Message) - } - - // return first message - if s.message != nil { - msg.Header = s.message.Header - msg.Body = s.message.Body - s.message = nil - return nil - } - - m, err := s.Network.Accept() - if err != nil { - return err - } - - msg.Header = m.Header - msg.Body = m.Body - return nil -} - -func (l *Listener) Addr() string { - return l.Service -} - -func (l *Listener) Close() error { - return l.Network.Close() -} - -func (l *Listener) Accept(fn func(transport.Socket)) error { - var i int - - for { - msg, err := l.Network.Accept() - if err != nil { - // increment error counter - i++ - - // break if lots of error - if i > 3 { - return err - } - - // otherwise continue - time.Sleep(backoff.Do(i)) - continue - } - - // reset - i = 0 - - // execute in go routine - go fn(&Socket{ - Service: l.Service, - Network: l.Network, - local: l.Service, - // TODO: remote - remote: "remote", - message: msg, - }) - } -} - -// NewTransport returns a new network transport. It assumes the network is already connected -func NewTransport(opts ...transport.Option) transport.Transport { - options := transport.Options{ - Context: context.Background(), - } - - for _, o := range opts { - o(&options) - } - - net, ok := options.Context.Value(networkKey{}).(network.Network) - if !ok { - net = network.DefaultNetwork - } - - return &Transport{ - options: options, - Network: net, - } -} - -// WithNetwork passes in the network -func WithNetwork(n network.Network) transport.Option { - return func(o *transport.Options) { - if o.Context == nil { - o.Context = context.Background() - } - o.Context = context.WithValue(o.Context, networkKey{}, n) - } -} From 6459cdfc21bda7a6385b7c4c0f38caafe08c3997 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Tue, 18 Jun 2019 14:42:56 +0100 Subject: [PATCH 6/7] propagate updates to local watchers --- registry/gossip/gossip.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/registry/gossip/gossip.go b/registry/gossip/gossip.go index c8ea457a..932a69e2 100644 --- a/registry/gossip/gossip.go +++ b/registry/gossip/gossip.go @@ -734,6 +734,12 @@ func (g *gossipRegistry) Register(s *registry.Service, opts ...registry.Register notify: nil, }) + // send update to local watchers + g.updates <- &update{ + Update: up, + Service: s, + } + // wait <-time.After(g.interval * 2) @@ -770,6 +776,13 @@ func (g *gossipRegistry) Deregister(s *registry.Service) error { notify: nil, }) + // send update to local watchers + // send update to local watchers + g.updates <- &update{ + Update: up, + Service: s, + } + // wait <-time.After(g.interval * 2) From 7266c62d098d0f927685c45768f97734d6d50db6 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Tue, 18 Jun 2019 15:33:31 +0100 Subject: [PATCH 7/7] remove comment --- registry/gossip/gossip.go | 1 - 1 file changed, 1 deletion(-) diff --git a/registry/gossip/gossip.go b/registry/gossip/gossip.go index 932a69e2..45506392 100644 --- a/registry/gossip/gossip.go +++ b/registry/gossip/gossip.go @@ -776,7 +776,6 @@ func (g *gossipRegistry) Deregister(s *registry.Service) error { notify: nil, }) - // send update to local watchers // send update to local watchers g.updates <- &update{ Update: up,