1
0
mirror of https://github.com/go-micro/go-micro.git synced 2025-01-11 17:18:28 +02:00
go-micro/registry/consul_watcher.go

135 lines
2.7 KiB
Go

package registry
import (
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/watch"
)
type consulWatcher struct {
Registry *consulRegistry
wp *watch.WatchPlan
watchers map[string]*watch.WatchPlan
}
type serviceWatcher struct {
name string
}
func newConsulWatcher(cr *consulRegistry) (Watcher, error) {
cw := &consulWatcher{
Registry: cr,
watchers: make(map[string]*watch.WatchPlan),
}
wp, err := watch.Parse(map[string]interface{}{"type": "services"})
if err != nil {
return nil, err
}
wp.Handler = cw.Handle
go wp.Run(cr.Address)
cw.wp = wp
return cw, nil
}
func (cw *consulWatcher) serviceHandler(idx uint64, data interface{}) {
entries, ok := data.([]*api.ServiceEntry)
if !ok {
return
}
serviceMap := map[string]*Service{}
serviceName := ""
for _, e := range entries {
serviceName = e.Service.Service
id := e.Node.Node
key := e.Service.Service + e.Service.ID
version := e.Service.ID
// We're adding service version but
// don't want to break backwards compatibility
if id == version {
key = e.Service.Service + "default"
version = ""
}
svc, ok := serviceMap[key]
if !ok {
svc = &Service{
Endpoints: decodeEndpoints(e.Service.Tags),
Name: e.Service.Service,
Version: version,
}
serviceMap[key] = svc
}
svc.Nodes = append(svc.Nodes, &Node{
Id: id,
Address: e.Node.Address,
Port: e.Service.Port,
Metadata: decodeMetadata(e.Service.Tags),
})
}
cw.Registry.mtx.Lock()
var services []*Service
for _, service := range serviceMap {
services = append(services, service)
}
cw.Registry.services[serviceName] = services
cw.Registry.mtx.Unlock()
}
func (cw *consulWatcher) Handle(idx uint64, data interface{}) {
services, ok := data.(map[string][]string)
if !ok {
return
}
// add new watchers
for service, _ := range services {
if _, ok := cw.watchers[service]; ok {
continue
}
wp, err := watch.Parse(map[string]interface{}{
"type": "service",
"service": service,
})
if err == nil {
wp.Handler = cw.serviceHandler
go wp.Run(cw.Registry.Address)
cw.watchers[service] = wp
}
}
cw.Registry.mtx.RLock()
rservices := cw.Registry.services
cw.Registry.mtx.RUnlock()
// remove unknown services from registry
for service, _ := range rservices {
if _, ok := services[service]; !ok {
cw.Registry.mtx.Lock()
delete(cw.Registry.services, service)
cw.Registry.mtx.Unlock()
}
}
// remove unknown services from watchers
for service, w := range cw.watchers {
if _, ok := services[service]; !ok {
w.Stop()
delete(cw.watchers, service)
}
}
}
func (cw *consulWatcher) Stop() {
if cw.wp == nil {
return
}
cw.wp.Stop()
}