1
0
mirror of https://github.com/go-micro/go-micro.git synced 2024-12-24 10:07:04 +02:00

Save current state of the world

This commit is contained in:
Asim Aslam 2019-12-11 14:37:03 +00:00
parent ff69d46c98
commit 6e28e7a86f
5 changed files with 97 additions and 44 deletions

View File

@ -1023,7 +1023,8 @@ func (n *network) sendTo(method, channel string, peer *node, msg proto.Message)
if err != nil {
return err
}
c, err := n.tunnel.Dial(channel, tunnel.DialMode(tunnel.Multicast), tunnel.DialLink(peer.link))
// Create a unicast connection to the peer but don't do the open/accept flow
c, err := n.tunnel.Dial(channel, tunnel.DialWait(false), tunnel.DialLink(peer.link))
if err != nil {
return err
}

View File

@ -143,7 +143,7 @@ func (r *runtime) run(events <-chan Event) {
}
}
case <-r.closed:
log.Debugf("Runtime stopped.")
log.Debugf("Runtime stopped")
return
}
}

View File

@ -73,10 +73,10 @@ func newTunnel(opts ...Option) *tun {
// Init initializes tunnel options
func (t *tun) Init(opts ...Option) error {
t.Lock()
defer t.Unlock()
for _, o := range opts {
o(&t.options)
}
t.Unlock()
return nil
}
@ -103,7 +103,6 @@ func (t *tun) delSession(channel, session string) {
// listChannels returns a list of listening channels
func (t *tun) listChannels() []string {
t.RLock()
defer t.RUnlock()
//nolint:prealloc
var channels []string
@ -113,6 +112,9 @@ func (t *tun) listChannels() []string {
}
channels = append(channels, session.channel)
}
t.RUnlock()
return channels
}
@ -244,53 +246,71 @@ func (t *tun) manageLink(link *link) {
}
// manageLinks is a function that can be called to immediately to link setup
// it purges dead links while generating new links for any nodes not connected
func (t *tun) manageLinks() {
var delLinks []string
delLinks := make(map[*link]string)
connected := make(map[string]bool)
t.RLock()
// get list of nodes from options
nodes := t.options.Nodes
// check the link status and purge dead links
for node, link := range t.links {
// check link status
switch link.State() {
case "closed":
delLinks = append(delLinks, node)
case "error":
delLinks = append(delLinks, node)
case "closed", "error":
delLinks[link] = node
default:
connected[node] = true
}
}
t.RUnlock()
// build a list of links to connect to
var connect []string
for _, node := range nodes {
// check if we're connected
if _, ok := connected[node]; ok {
continue
}
// add nodes to connect o
connect = append(connect, node)
}
// delete the dead links
if len(delLinks) > 0 {
t.Lock()
for _, node := range delLinks {
for link, node := range delLinks {
log.Debugf("Tunnel deleting dead link for %s", node)
if link, ok := t.links[node]; ok {
link.Close()
// check if the link exists
l, ok := t.links[node]
if ok {
// close and delete
l.Close()
delete(t.links, node)
}
// if the link does not match our own
if l != link {
// close our link just in case
link.Close()
}
}
t.Unlock()
}
// check current link status
var connect []string
// build list of unknown nodes to connect to
t.RLock()
for _, node := range t.options.Nodes {
if _, ok := t.links[node]; !ok {
connect = append(connect, node)
}
}
t.RUnlock()
var wg sync.WaitGroup
// establish new links
for _, node := range connect {
wg.Add(1)
@ -298,6 +318,7 @@ func (t *tun) manageLinks() {
defer wg.Done()
// create new link
// if we're using quic it should be a max 10 second handshake period
link, err := t.setupLink(node)
if err != nil {
log.Debugf("Tunnel failed to setup node link to %s: %v", node, err)
@ -313,6 +334,7 @@ func (t *tun) manageLinks() {
link.Close()
return
}
// save the link
t.links[node] = link
}(node)
@ -469,7 +491,6 @@ func (t *tun) process() {
func (t *tun) delLink(remote string) {
t.Lock()
defer t.Unlock()
// get the link
for id, link := range t.links {
@ -481,6 +502,8 @@ func (t *tun) delLink(remote string) {
link.Close()
delete(t.links, id)
}
t.Unlock()
}
// process incoming messages
@ -1035,6 +1058,7 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) {
// get opts
options := DialOptions{
Timeout: DefaultDialTimeout,
Wait: true,
}
for _, o := range opts {
@ -1119,11 +1143,16 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) {
}
// return early if its not unicast
// we will not call "open" for multicast
// we will not wait for "open" for multicast
if c.mode != Unicast {
return c, nil
}
// if we're not told to wait
if !options.Wait {
return c, nil
}
// Note: we go no further for multicast or broadcast.
// This is a unicast session so we call "open" and wait
// for an "accept"
@ -1221,15 +1250,16 @@ func (t *tun) Listen(channel string, opts ...ListenOption) (Listener, error) {
}
func (t *tun) Links() []Link {
t.RLock()
defer t.RUnlock()
links := make([]Link, 0, len(t.links))
t.RLock()
for _, link := range t.links {
links = append(links, link)
}
t.RUnlock()
return links
}

View File

@ -138,10 +138,10 @@ func (l *link) setRate(bits int64, delta time.Duration) {
// setRTT sets a nanosecond based moving average roundtrip time for the link
func (l *link) setRTT(d time.Duration) {
l.Lock()
defer l.Unlock()
if l.length <= 0 {
l.length = d.Nanoseconds()
l.Unlock()
return
}
@ -149,6 +149,8 @@ func (l *link) setRTT(d time.Duration) {
length := 0.8*float64(l.length) + 0.2*float64(d.Nanoseconds())
// set new length
l.length = int64(length)
l.Unlock()
}
func (l *link) delChannel(ch string) {
@ -159,8 +161,9 @@ func (l *link) delChannel(ch string) {
func (l *link) getChannel(ch string) time.Time {
l.RLock()
defer l.RUnlock()
return l.channels[ch]
t := l.channels[ch]
l.RUnlock()
return t
}
func (l *link) setChannel(channels ...string) {
@ -344,27 +347,31 @@ func (l *link) Delay() int64 {
// Current transfer rate as bits per second (lower is better)
func (l *link) Rate() float64 {
l.RLock()
defer l.RUnlock()
return l.rate
r := l.rate
l.RUnlock()
return r
}
func (l *link) Loopback() bool {
l.RLock()
defer l.RUnlock()
return l.loopback
lo := l.loopback
l.RUnlock()
return lo
}
// Length returns the roundtrip time as nanoseconds (lower is better).
// Returns 0 where no measurement has been taken.
func (l *link) Length() int64 {
l.RLock()
defer l.RUnlock()
return l.length
length := l.length
l.RUnlock()
return length
}
func (l *link) Id() string {
l.RLock()
defer l.RUnlock()
id := l.id
l.RUnlock()
return l.id
}
@ -413,11 +420,11 @@ func (l *link) Send(m *transport.Message) error {
}
l.Lock()
defer l.Unlock()
// there's an error increment the counter and bail
if err != nil {
l.errCount++
l.Unlock()
return err
}
@ -441,6 +448,8 @@ func (l *link) Send(m *transport.Message) error {
l.setRate(int64(bits), time.Since(now))
}
l.Unlock()
return nil
}
@ -476,10 +485,13 @@ func (l *link) State() string {
return "closed"
default:
l.RLock()
defer l.RUnlock()
if l.errCount > 3 {
errCount := l.errCount
l.RUnlock()
if lerrCount > 3 {
return "error"
}
return "connected"
}
}

View File

@ -38,6 +38,8 @@ type DialOptions struct {
Link string
// specify mode of the session
Mode Mode
// Wait for connection to be accepted
Wait bool
// the dial timeout
Timeout time.Duration
}
@ -124,6 +126,14 @@ func DialLink(id string) DialOption {
}
}
// DialWait specifies whether to wait for the connection
// to be accepted before returning the session
func DialWait(b bool) DialOption {
return func(o *DialOptions) {
o.Wait = b
}
}
// DefaultOptions returns router default options
func DefaultOptions() Options {
return Options{