mirror of
https://github.com/go-micro/go-micro.git
synced 2024-12-18 08:26:38 +02:00
delete k8s registry (#1522)
This commit is contained in:
parent
6a666c9c7d
commit
9a685b2df5
@ -42,7 +42,6 @@ import (
|
|||||||
|
|
||||||
// registries
|
// registries
|
||||||
"github.com/micro/go-micro/v2/registry/etcd"
|
"github.com/micro/go-micro/v2/registry/etcd"
|
||||||
kreg "github.com/micro/go-micro/v2/registry/kubernetes"
|
|
||||||
"github.com/micro/go-micro/v2/registry/mdns"
|
"github.com/micro/go-micro/v2/registry/mdns"
|
||||||
rmem "github.com/micro/go-micro/v2/registry/memory"
|
rmem "github.com/micro/go-micro/v2/registry/memory"
|
||||||
regSrv "github.com/micro/go-micro/v2/registry/service"
|
regSrv "github.com/micro/go-micro/v2/registry/service"
|
||||||
@ -328,11 +327,10 @@ var (
|
|||||||
}
|
}
|
||||||
|
|
||||||
DefaultRegistries = map[string]func(...registry.Option) registry.Registry{
|
DefaultRegistries = map[string]func(...registry.Option) registry.Registry{
|
||||||
"service": regSrv.NewRegistry,
|
"service": regSrv.NewRegistry,
|
||||||
"etcd": etcd.NewRegistry,
|
"etcd": etcd.NewRegistry,
|
||||||
"mdns": mdns.NewRegistry,
|
"mdns": mdns.NewRegistry,
|
||||||
"memory": rmem.NewRegistry,
|
"memory": rmem.NewRegistry,
|
||||||
"kubernetes": kreg.NewRegistry,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
DefaultSelectors = map[string]func(...selector.Option) selector.Selector{
|
DefaultSelectors = map[string]func(...selector.Option) selector.Selector{
|
||||||
|
@ -1,66 +0,0 @@
|
|||||||
# Kubernetes Registry Plugin for micro
|
|
||||||
This is a plugin for go-micro that allows you to use Kubernetes as a registry.
|
|
||||||
|
|
||||||
|
|
||||||
## Overview
|
|
||||||
This registry plugin makes use of Annotations and Labels on a Kubernetes pod
|
|
||||||
to build a service discovery mechanism.
|
|
||||||
|
|
||||||
|
|
||||||
## RBAC
|
|
||||||
If your Kubernetes cluster has RBAC enabled, a role and role binding
|
|
||||||
will need to be created to allow this plugin to `list` and `patch` pods.
|
|
||||||
|
|
||||||
A cluster role can be used to specify the `list` and `patch`
|
|
||||||
requirements, while a role binding per namespace can be used to apply
|
|
||||||
the cluster role. The example RBAC configs below assume your Micro-based
|
|
||||||
services are running in the `test` namespace, and the pods that contain
|
|
||||||
the services are using the `micro-services` service account.
|
|
||||||
|
|
||||||
```
|
|
||||||
apiVersion: rbac.authorization.k8s.io/v1
|
|
||||||
kind: ClusterRole
|
|
||||||
metadata:
|
|
||||||
name: micro-registry
|
|
||||||
rules:
|
|
||||||
- apiGroups:
|
|
||||||
- ""
|
|
||||||
resources:
|
|
||||||
- pods
|
|
||||||
verbs:
|
|
||||||
- list
|
|
||||||
- patch
|
|
||||||
- watch
|
|
||||||
```
|
|
||||||
|
|
||||||
```
|
|
||||||
apiVersion: rbac.authorization.k8s.io/v1
|
|
||||||
kind: RoleBinding
|
|
||||||
metadata:
|
|
||||||
name: micro-registry
|
|
||||||
roleRef:
|
|
||||||
apiGroup: rbac.authorization.k8s.io
|
|
||||||
kind: ClusterRole
|
|
||||||
name: micro-registry
|
|
||||||
subjects:
|
|
||||||
- kind: ServiceAccount
|
|
||||||
name: micro-services
|
|
||||||
namespace: test
|
|
||||||
```
|
|
||||||
|
|
||||||
|
|
||||||
## Gotchas
|
|
||||||
* Registering/Deregistering relies on the HOSTNAME Environment Variable, which inside a pod
|
|
||||||
is the place where it can be retrieved from. (This needs improving)
|
|
||||||
|
|
||||||
|
|
||||||
## Connecting to the Kubernetes API
|
|
||||||
### Within a pod
|
|
||||||
If the `--registry_address` flag is omitted, the plugin will securely connect to
|
|
||||||
the Kubernetes API using the pods "Service Account". No extra configuration is necessary.
|
|
||||||
|
|
||||||
Find out more about service accounts here. http://kubernetes.io/docs/user-guide/accessing-the-cluster/
|
|
||||||
|
|
||||||
### Outside of Kubernetes
|
|
||||||
Some functions of the plugin should work, but its not been heavily tested.
|
|
||||||
Currently no TLS support.
|
|
@ -1,289 +0,0 @@
|
|||||||
// Package kubernetes provides a kubernetes registry
|
|
||||||
package kubernetes
|
|
||||||
|
|
||||||
import (
|
|
||||||
"encoding/json"
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"os"
|
|
||||||
"regexp"
|
|
||||||
"strings"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/micro/go-micro/v2/registry"
|
|
||||||
"github.com/micro/go-micro/v2/util/kubernetes/client"
|
|
||||||
)
|
|
||||||
|
|
||||||
type kregistry struct {
|
|
||||||
client client.Client
|
|
||||||
timeout time.Duration
|
|
||||||
options registry.Options
|
|
||||||
}
|
|
||||||
|
|
||||||
var (
|
|
||||||
// used on pods as labels & services to select
|
|
||||||
// eg: svcSelectorPrefix+"svc.name"
|
|
||||||
servicePrefix = "go.micro/"
|
|
||||||
serviceValue = "service"
|
|
||||||
|
|
||||||
labelTypeKey = "micro"
|
|
||||||
labelTypeValue = "service"
|
|
||||||
|
|
||||||
// used on k8s services to scope a serialised
|
|
||||||
// micro service by pod name
|
|
||||||
annotationPrefix = "go.micro/"
|
|
||||||
|
|
||||||
// Pod status
|
|
||||||
podRunning = "Running"
|
|
||||||
|
|
||||||
// label name regex
|
|
||||||
labelRe = regexp.MustCompilePOSIX("[-A-Za-z0-9_.]")
|
|
||||||
)
|
|
||||||
|
|
||||||
// podSelector
|
|
||||||
var podSelector = map[string]string{
|
|
||||||
labelTypeKey: labelTypeValue,
|
|
||||||
}
|
|
||||||
|
|
||||||
func configure(k *kregistry, opts ...registry.Option) error {
|
|
||||||
for _, o := range opts {
|
|
||||||
o(&k.options)
|
|
||||||
}
|
|
||||||
|
|
||||||
// get first host
|
|
||||||
var host string
|
|
||||||
if len(k.options.Addrs) > 0 && len(k.options.Addrs[0]) > 0 {
|
|
||||||
host = k.options.Addrs[0]
|
|
||||||
}
|
|
||||||
|
|
||||||
if k.options.Timeout == 0 {
|
|
||||||
k.options.Timeout = time.Second * 1
|
|
||||||
}
|
|
||||||
|
|
||||||
// if no hosts setup, assume InCluster
|
|
||||||
var c client.Client
|
|
||||||
|
|
||||||
if len(host) > 0 {
|
|
||||||
c = client.NewLocalClient(host)
|
|
||||||
} else {
|
|
||||||
c = client.NewClusterClient()
|
|
||||||
}
|
|
||||||
|
|
||||||
k.client = c
|
|
||||||
k.timeout = k.options.Timeout
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// serviceName generates a valid service name for k8s labels
|
|
||||||
func serviceName(name string) string {
|
|
||||||
aname := make([]byte, len(name))
|
|
||||||
|
|
||||||
for i, r := range []byte(name) {
|
|
||||||
if !labelRe.Match([]byte{r}) {
|
|
||||||
aname[i] = '_'
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
aname[i] = r
|
|
||||||
}
|
|
||||||
|
|
||||||
return string(aname)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Init allows reconfig of options
|
|
||||||
func (c *kregistry) Init(opts ...registry.Option) error {
|
|
||||||
return configure(c, opts...)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Options returns the registry Options
|
|
||||||
func (c *kregistry) Options() registry.Options {
|
|
||||||
return c.options
|
|
||||||
}
|
|
||||||
|
|
||||||
// Register sets a service selector label and an annotation with a
|
|
||||||
// serialised version of the service passed in.
|
|
||||||
func (c *kregistry) Register(s *registry.Service, opts ...registry.RegisterOption) error {
|
|
||||||
if len(s.Nodes) == 0 {
|
|
||||||
return errors.New("no nodes")
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: grab podname from somewhere better than this.
|
|
||||||
podName := os.Getenv("HOSTNAME")
|
|
||||||
svcName := s.Name
|
|
||||||
|
|
||||||
// encode micro service
|
|
||||||
b, err := json.Marshal(s)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
/// marshalled service
|
|
||||||
svc := string(b)
|
|
||||||
|
|
||||||
pod := &client.Pod{
|
|
||||||
Metadata: &client.Metadata{
|
|
||||||
Labels: map[string]string{
|
|
||||||
// micro: service
|
|
||||||
labelTypeKey: labelTypeValue,
|
|
||||||
// micro/service/name: service
|
|
||||||
servicePrefix + serviceName(svcName): serviceValue,
|
|
||||||
},
|
|
||||||
Annotations: map[string]string{
|
|
||||||
// micro/service/name: definition
|
|
||||||
annotationPrefix + serviceName(svcName): svc,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
return c.client.Update(&client.Resource{
|
|
||||||
Name: podName,
|
|
||||||
Kind: "pod",
|
|
||||||
Value: pod,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// Deregister nils out any things set in Register
|
|
||||||
func (c *kregistry) Deregister(s *registry.Service) error {
|
|
||||||
if len(s.Nodes) == 0 {
|
|
||||||
return errors.New("you must deregister at least one node")
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: grab podname from somewhere better than this.
|
|
||||||
podName := os.Getenv("HOSTNAME")
|
|
||||||
svcName := s.Name
|
|
||||||
|
|
||||||
pod := &client.Pod{
|
|
||||||
Metadata: &client.Metadata{
|
|
||||||
Labels: map[string]string{
|
|
||||||
servicePrefix + serviceName(svcName): "",
|
|
||||||
},
|
|
||||||
Annotations: map[string]string{
|
|
||||||
annotationPrefix + serviceName(svcName): "",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
return c.client.Update(&client.Resource{
|
|
||||||
Name: podName,
|
|
||||||
Kind: "pod",
|
|
||||||
Value: pod,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetService will get all the pods with the given service selector,
|
|
||||||
// and build services from the annotations.
|
|
||||||
func (c *kregistry) GetService(name string) ([]*registry.Service, error) {
|
|
||||||
var pods client.PodList
|
|
||||||
|
|
||||||
if err := c.client.Get(&client.Resource{
|
|
||||||
Kind: "pod",
|
|
||||||
Value: &pods,
|
|
||||||
}, map[string]string{
|
|
||||||
servicePrefix + serviceName(name): serviceValue,
|
|
||||||
}); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(pods.Items) == 0 {
|
|
||||||
return nil, registry.ErrNotFound
|
|
||||||
}
|
|
||||||
|
|
||||||
// svcs mapped by version
|
|
||||||
svcs := make(map[string]*registry.Service)
|
|
||||||
|
|
||||||
// loop through items
|
|
||||||
for _, pod := range pods.Items {
|
|
||||||
if pod.Status.Phase != podRunning {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// get serialised service from annotation
|
|
||||||
svcStr, ok := pod.Metadata.Annotations[annotationPrefix+serviceName(name)]
|
|
||||||
if !ok {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// unmarshal service string
|
|
||||||
var svc registry.Service
|
|
||||||
|
|
||||||
if err := json.Unmarshal([]byte(svcStr), &svc); err != nil {
|
|
||||||
return nil, fmt.Errorf("could not unmarshal service '%s' from pod annotation", name)
|
|
||||||
}
|
|
||||||
|
|
||||||
// merge up pod service & ip with versioned service.
|
|
||||||
vs, ok := svcs[svc.Version]
|
|
||||||
if !ok {
|
|
||||||
svcs[svc.Version] = &svc
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
vs.Nodes = append(vs.Nodes, svc.Nodes...)
|
|
||||||
}
|
|
||||||
|
|
||||||
list := make([]*registry.Service, 0, len(svcs))
|
|
||||||
for _, val := range svcs {
|
|
||||||
list = append(list, val)
|
|
||||||
}
|
|
||||||
return list, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// ListServices will list all the service names
|
|
||||||
func (c *kregistry) ListServices() ([]*registry.Service, error) {
|
|
||||||
var pods client.PodList
|
|
||||||
|
|
||||||
if err := c.client.Get(&client.Resource{
|
|
||||||
Kind: "pod",
|
|
||||||
Value: &pods,
|
|
||||||
}, podSelector); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// svcs mapped by name
|
|
||||||
svcs := make(map[string]bool)
|
|
||||||
|
|
||||||
for _, pod := range pods.Items {
|
|
||||||
if pod.Status.Phase != podRunning {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
for k, v := range pod.Metadata.Annotations {
|
|
||||||
if !strings.HasPrefix(k, annotationPrefix) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// we have to unmarshal the annotation itself since the
|
|
||||||
// key is encoded to match the regex restriction.
|
|
||||||
var svc registry.Service
|
|
||||||
|
|
||||||
if err := json.Unmarshal([]byte(v), &svc); err != nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
svcs[svc.Name] = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
var list []*registry.Service
|
|
||||||
|
|
||||||
for val := range svcs {
|
|
||||||
list = append(list, ®istry.Service{Name: val})
|
|
||||||
}
|
|
||||||
|
|
||||||
return list, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Watch returns a kubernetes watcher
|
|
||||||
func (c *kregistry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) {
|
|
||||||
return newWatcher(c, opts...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *kregistry) String() string {
|
|
||||||
return "kubernetes"
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewRegistry creates a kubernetes registry
|
|
||||||
func NewRegistry(opts ...registry.Option) registry.Registry {
|
|
||||||
k := &kregistry{
|
|
||||||
options: registry.Options{},
|
|
||||||
}
|
|
||||||
configure(k, opts...)
|
|
||||||
return k
|
|
||||||
}
|
|
@ -1,263 +0,0 @@
|
|||||||
package kubernetes
|
|
||||||
|
|
||||||
import (
|
|
||||||
"encoding/json"
|
|
||||||
"errors"
|
|
||||||
"strings"
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
"github.com/micro/go-micro/v2/logger"
|
|
||||||
"github.com/micro/go-micro/v2/registry"
|
|
||||||
"github.com/micro/go-micro/v2/util/kubernetes/client"
|
|
||||||
)
|
|
||||||
|
|
||||||
type k8sWatcher struct {
|
|
||||||
registry *kregistry
|
|
||||||
watcher client.Watcher
|
|
||||||
next chan *registry.Result
|
|
||||||
stop chan bool
|
|
||||||
|
|
||||||
sync.RWMutex
|
|
||||||
pods map[string]*client.Pod
|
|
||||||
}
|
|
||||||
|
|
||||||
// build a cache of pods when the watcher starts.
|
|
||||||
func (k *k8sWatcher) updateCache() ([]*registry.Result, error) {
|
|
||||||
var pods client.PodList
|
|
||||||
|
|
||||||
if err := k.registry.client.Get(&client.Resource{
|
|
||||||
Kind: "pod",
|
|
||||||
Value: &pods,
|
|
||||||
}, podSelector); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
var results []*registry.Result
|
|
||||||
|
|
||||||
for _, pod := range pods.Items {
|
|
||||||
rslts := k.buildPodResults(&pod, nil)
|
|
||||||
|
|
||||||
for _, r := range rslts {
|
|
||||||
results = append(results, r)
|
|
||||||
}
|
|
||||||
|
|
||||||
k.Lock()
|
|
||||||
k.pods[pod.Metadata.Name] = &pod
|
|
||||||
k.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
return results, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// look through pod annotations, compare against cache if present
|
|
||||||
// and return a list of results to send down the wire.
|
|
||||||
func (k *k8sWatcher) buildPodResults(pod *client.Pod, cache *client.Pod) []*registry.Result {
|
|
||||||
var results []*registry.Result
|
|
||||||
ignore := make(map[string]bool)
|
|
||||||
|
|
||||||
if pod.Metadata != nil {
|
|
||||||
for ak, av := range pod.Metadata.Annotations {
|
|
||||||
// check this annotation kv is a service notation
|
|
||||||
if !strings.HasPrefix(ak, annotationPrefix) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(av) == 0 {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// ignore when we check the cached annotations
|
|
||||||
// as we take care of it here
|
|
||||||
ignore[ak] = true
|
|
||||||
|
|
||||||
// compare aginst cache.
|
|
||||||
var cacheExists bool
|
|
||||||
var cav string
|
|
||||||
|
|
||||||
if cache != nil && cache.Metadata != nil {
|
|
||||||
cav, cacheExists = cache.Metadata.Annotations[ak]
|
|
||||||
if cacheExists && len(cav) > 0 && cav == av {
|
|
||||||
// service notation exists and is identical -
|
|
||||||
// no change result required.
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
rslt := ®istry.Result{}
|
|
||||||
if cacheExists {
|
|
||||||
rslt.Action = "update"
|
|
||||||
} else {
|
|
||||||
rslt.Action = "create"
|
|
||||||
}
|
|
||||||
|
|
||||||
// unmarshal service notation from annotation value
|
|
||||||
err := json.Unmarshal([]byte(av), &rslt.Service)
|
|
||||||
if err != nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
results = append(results, rslt)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// loop through cache annotations to find services
|
|
||||||
// not accounted for above, and "delete" them.
|
|
||||||
if cache != nil && cache.Metadata != nil {
|
|
||||||
for ak, av := range cache.Metadata.Annotations {
|
|
||||||
if ignore[ak] {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// check this annotation kv is a service notation
|
|
||||||
if !strings.HasPrefix(ak, annotationPrefix) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
rslt := ®istry.Result{Action: "delete"}
|
|
||||||
// unmarshal service notation from annotation value
|
|
||||||
err := json.Unmarshal([]byte(av), &rslt.Service)
|
|
||||||
if err != nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
results = append(results, rslt)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return results
|
|
||||||
}
|
|
||||||
|
|
||||||
// handleEvent will taken an event from the k8s pods API and do the correct
|
|
||||||
// things with the result, based on the local cache.
|
|
||||||
func (k *k8sWatcher) handleEvent(event client.Event) {
|
|
||||||
var pod client.Pod
|
|
||||||
if err := json.Unmarshal([]byte(event.Object), &pod); err != nil {
|
|
||||||
if logger.V(logger.InfoLevel, logger.DefaultLogger) {
|
|
||||||
logger.Info("K8s Watcher: Couldnt unmarshal event object from pod")
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
switch event.Type {
|
|
||||||
case client.Modified:
|
|
||||||
// Pod was modified
|
|
||||||
|
|
||||||
k.RLock()
|
|
||||||
cache := k.pods[pod.Metadata.Name]
|
|
||||||
k.RUnlock()
|
|
||||||
|
|
||||||
// service could have been added, edited or removed.
|
|
||||||
var results []*registry.Result
|
|
||||||
|
|
||||||
if pod.Status.Phase == podRunning {
|
|
||||||
results = k.buildPodResults(&pod, cache)
|
|
||||||
} else {
|
|
||||||
// passing in cache might not return all results
|
|
||||||
results = k.buildPodResults(&pod, nil)
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, result := range results {
|
|
||||||
// pod isnt running
|
|
||||||
if pod.Status.Phase != podRunning {
|
|
||||||
result.Action = "delete"
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case k.next <- result:
|
|
||||||
case <-k.stop:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
k.Lock()
|
|
||||||
k.pods[pod.Metadata.Name] = &pod
|
|
||||||
k.Unlock()
|
|
||||||
return
|
|
||||||
|
|
||||||
case client.Deleted:
|
|
||||||
// Pod was deleted
|
|
||||||
// passing in cache might not return all results
|
|
||||||
results := k.buildPodResults(&pod, nil)
|
|
||||||
|
|
||||||
for _, result := range results {
|
|
||||||
result.Action = "delete"
|
|
||||||
select {
|
|
||||||
case k.next <- result:
|
|
||||||
case <-k.stop:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
k.Lock()
|
|
||||||
delete(k.pods, pod.Metadata.Name)
|
|
||||||
k.Unlock()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
// Next will block until a new result comes in
|
|
||||||
func (k *k8sWatcher) Next() (*registry.Result, error) {
|
|
||||||
select {
|
|
||||||
case r := <-k.next:
|
|
||||||
return r, nil
|
|
||||||
case <-k.stop:
|
|
||||||
return nil, errors.New("watcher stopped")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Stop will cancel any requests, and close channels
|
|
||||||
func (k *k8sWatcher) Stop() {
|
|
||||||
select {
|
|
||||||
case <-k.stop:
|
|
||||||
return
|
|
||||||
default:
|
|
||||||
k.watcher.Stop()
|
|
||||||
close(k.stop)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func newWatcher(kr *kregistry, opts ...registry.WatchOption) (registry.Watcher, error) {
|
|
||||||
var wo registry.WatchOptions
|
|
||||||
for _, o := range opts {
|
|
||||||
o(&wo)
|
|
||||||
}
|
|
||||||
|
|
||||||
selector := podSelector
|
|
||||||
if len(wo.Service) > 0 {
|
|
||||||
selector = map[string]string{
|
|
||||||
servicePrefix + serviceName(wo.Service): serviceValue,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create watch request
|
|
||||||
watcher, err := kr.client.Watch(&client.Resource{
|
|
||||||
Kind: "pod",
|
|
||||||
}, client.WatchParams(selector))
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
k := &k8sWatcher{
|
|
||||||
registry: kr,
|
|
||||||
watcher: watcher,
|
|
||||||
next: make(chan *registry.Result),
|
|
||||||
stop: make(chan bool),
|
|
||||||
pods: make(map[string]*client.Pod),
|
|
||||||
}
|
|
||||||
|
|
||||||
// update cache, but dont emit changes
|
|
||||||
if _, err := k.updateCache(); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// range over watch request changes, and invoke
|
|
||||||
// the update event
|
|
||||||
go func() {
|
|
||||||
for event := range watcher.Chan() {
|
|
||||||
k.handleEvent(event)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
return k, nil
|
|
||||||
}
|
|
Loading…
Reference in New Issue
Block a user