diff --git a/client/grpc/grpc.go b/client/grpc/grpc.go index 30398b17..ac1dc82e 100644 --- a/client/grpc/grpc.go +++ b/client/grpc/grpc.go @@ -295,7 +295,7 @@ func (g *grpcClient) Options() client.Options { } func (g *grpcClient) NewMessage(topic string, msg interface{}, opts ...client.MessageOption) client.Message { - return newGRPCPublication(topic, msg, "application/octet-stream") + return newGRPCPublication(topic, msg, g.opts.ContentType, opts...) } func (g *grpcClient) NewRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request { diff --git a/network/network.go b/network/network.go index 7dba3276..be83b8d0 100644 --- a/network/network.go +++ b/network/network.go @@ -1,46 +1,46 @@ -// Package network is an interface for defining a network overlay +// Package network is a package for defining a network overlay package network import ( "github.com/micro/go-micro/config/options" ) +// Network is an interface defining networks or graphs type Network interface { options.Options - // Id of this network - Id() string - // Connect to the network with id - Connect(id string) error + // Id of this node + Id() uint64 + // Connect to a node + Connect(id uint64) (Link, error) // Close the network connection Close() error - // Accept messages + // Accept messages on the network Accept() (*Message, error) - // Send a message + // Send a message to the network Send(*Message) error - // Advertise a service on this network - Advertise(service string) error - // Retrieve list of nodes for a service - Nodes(service string) ([]Node, error) + // Retrieve list of connections + Links() ([]Link, error) } // Node represents a network node type Node interface { - // Id of the node - Id() string - // The network for this node - Network() Network + // Node is a network. Network is a node. + Network } -// Message is a message sent over the network -type Message struct { - // Headers are the routing headers - // e.g Micro-Service, Micro-Endpoint, Micro-Network - // see https://github.com/micro/development/blob/master/protocol.md - Header map[string]string - // Body is the encaspulated payload - Body []byte +// Link is a connection to another node +type Link interface { + // remote node + Node + // length of link which dictates speed + Length() int + // weight of link which dictates curvature + Weight() int } +// Message is the base type for opaque data +type Message []byte + var ( // TODO: set default network DefaultNetwork Network diff --git a/network/transport/transport.go b/network/transport/transport.go deleted file mode 100644 index 60e6a957..00000000 --- a/network/transport/transport.go +++ /dev/null @@ -1,203 +0,0 @@ -// Package transport implements the network as a transport interface -package transport - -import ( - "context" - "time" - - "github.com/micro/go-micro/network" - "github.com/micro/go-micro/transport" - "github.com/micro/go-micro/util/backoff" -) - -type networkKey struct{} - -// Transport is a network transport -type Transport struct { - Network network.Network - options transport.Options -} - -// Socket is a transport socket -type Socket struct { - // The service - Service string - - // Send via Network.Send(Message) - Network network.Network - - // Remote/Local - remote, local string - - // the first message if its a listener - message *network.Message -} - -// Listener is a transport listener -type Listener struct { - // The local service - Service string - - // The network - Network network.Network -} - -func (s *Socket) Local() string { - return s.local -} - -func (s *Socket) Remote() string { - return s.remote -} - -func (s *Socket) Close() error { - // TODO: should it close the network? - return s.Network.Close() -} - -func (t *Transport) Init(opts ...transport.Option) error { - for _, o := range opts { - o(&t.options) - } - return nil -} - -func (t *Transport) Options() transport.Options { - return t.options -} - -func (t *Transport) Dial(service string, opts ...transport.DialOption) (transport.Client, error) { - // TODO: establish pseudo socket? - return &Socket{ - Service: service, - Network: t.Network, - remote: service, - // TODO: local - local: "local", - }, nil -} - -func (t *Transport) Listen(service string, opts ...transport.ListenOption) (transport.Listener, error) { - // TODO specify connect id - if err := t.Network.Connect("micro.mu"); err != nil { - return nil, err - } - - // advertise the service - if err := t.Network.Advertise(service); err != nil { - return nil, err - } - - return &Listener{ - Service: service, - Network: t.Network, - }, nil -} - -func (t *Transport) String() string { - return "network" -} - -func (s *Socket) Send(msg *transport.Message) error { - // TODO: set routing headers? - return s.Network.Send(&network.Message{ - Header: msg.Header, - Body: msg.Body, - }) -} - -func (s *Socket) Recv(msg *transport.Message) error { - if msg == nil { - msg = new(transport.Message) - } - - // return first message - if s.message != nil { - msg.Header = s.message.Header - msg.Body = s.message.Body - s.message = nil - return nil - } - - m, err := s.Network.Accept() - if err != nil { - return err - } - - msg.Header = m.Header - msg.Body = m.Body - return nil -} - -func (l *Listener) Addr() string { - return l.Service -} - -func (l *Listener) Close() error { - return l.Network.Close() -} - -func (l *Listener) Accept(fn func(transport.Socket)) error { - var i int - - for { - msg, err := l.Network.Accept() - if err != nil { - // increment error counter - i++ - - // break if lots of error - if i > 3 { - return err - } - - // otherwise continue - time.Sleep(backoff.Do(i)) - continue - } - - // reset - i = 0 - - // execute in go routine - go fn(&Socket{ - Service: l.Service, - Network: l.Network, - local: l.Service, - // TODO: remote - remote: "remote", - message: msg, - }) - } -} - -// NewTransport returns a new network transport. It assumes the network is already connected -func NewTransport(opts ...transport.Option) transport.Transport { - options := transport.Options{ - Context: context.Background(), - } - - for _, o := range opts { - o(&options) - } - - net, ok := options.Context.Value(networkKey{}).(network.Network) - if !ok { - net = network.DefaultNetwork - } - - return &Transport{ - options: options, - Network: net, - } -} - -// WithNetwork passes in the network -func WithNetwork(n network.Network) transport.Option { - return func(o *transport.Options) { - if o.Context == nil { - o.Context = context.Background() - } - o.Context = context.WithValue(o.Context, networkKey{}, n) - } -} diff --git a/registry/gossip/gossip.go b/registry/gossip/gossip.go index c8ea457a..45506392 100644 --- a/registry/gossip/gossip.go +++ b/registry/gossip/gossip.go @@ -734,6 +734,12 @@ func (g *gossipRegistry) Register(s *registry.Service, opts ...registry.Register notify: nil, }) + // send update to local watchers + g.updates <- &update{ + Update: up, + Service: s, + } + // wait <-time.After(g.interval * 2) @@ -770,6 +776,12 @@ func (g *gossipRegistry) Deregister(s *registry.Service) error { notify: nil, }) + // send update to local watchers + g.updates <- &update{ + Update: up, + Service: s, + } + // wait <-time.After(g.interval * 2)