mirror of
https://github.com/go-micro/go-micro.git
synced 2025-01-11 17:18:28 +02:00
75b1a62af3
* Replace service prefix with FQDN style prefix According to the k8s documentation, the label and annotation prefixes should be in the format of a FQDN, with dot separated labels of no more than 63 characters. The current label and annotation paramteres are rejected by the k8s api, most likely because they have two forward slashes in them. * Use go.micro as service and annotation prefix
290 lines
6.0 KiB
Go
290 lines
6.0 KiB
Go
// Package kubernetes provides a kubernetes registry
|
|
package kubernetes
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"os"
|
|
"regexp"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/micro/go-micro/registry"
|
|
"github.com/micro/go-micro/util/kubernetes/client"
|
|
)
|
|
|
|
type kregistry struct {
|
|
client client.Client
|
|
timeout time.Duration
|
|
options registry.Options
|
|
}
|
|
|
|
var (
|
|
// used on pods as labels & services to select
|
|
// eg: svcSelectorPrefix+"svc.name"
|
|
servicePrefix = "go.micro/"
|
|
serviceValue = "service"
|
|
|
|
labelTypeKey = "micro"
|
|
labelTypeValue = "service"
|
|
|
|
// used on k8s services to scope a serialised
|
|
// micro service by pod name
|
|
annotationPrefix = "go.micro/"
|
|
|
|
// Pod status
|
|
podRunning = "Running"
|
|
|
|
// label name regex
|
|
labelRe = regexp.MustCompilePOSIX("[-A-Za-z0-9_.]")
|
|
)
|
|
|
|
// podSelector
|
|
var podSelector = map[string]string{
|
|
labelTypeKey: labelTypeValue,
|
|
}
|
|
|
|
func configure(k *kregistry, opts ...registry.Option) error {
|
|
for _, o := range opts {
|
|
o(&k.options)
|
|
}
|
|
|
|
// get first host
|
|
var host string
|
|
if len(k.options.Addrs) > 0 && len(k.options.Addrs[0]) > 0 {
|
|
host = k.options.Addrs[0]
|
|
}
|
|
|
|
if k.options.Timeout == 0 {
|
|
k.options.Timeout = time.Second * 1
|
|
}
|
|
|
|
// if no hosts setup, assume InCluster
|
|
var c client.Client
|
|
|
|
if len(host) > 0 {
|
|
c = client.NewLocalClient(host)
|
|
} else {
|
|
c = client.NewClusterClient()
|
|
}
|
|
|
|
k.client = c
|
|
k.timeout = k.options.Timeout
|
|
|
|
return nil
|
|
}
|
|
|
|
// serviceName generates a valid service name for k8s labels
|
|
func serviceName(name string) string {
|
|
aname := make([]byte, len(name))
|
|
|
|
for i, r := range []byte(name) {
|
|
if !labelRe.Match([]byte{r}) {
|
|
aname[i] = '_'
|
|
continue
|
|
}
|
|
aname[i] = r
|
|
}
|
|
|
|
return string(aname)
|
|
}
|
|
|
|
// Init allows reconfig of options
|
|
func (c *kregistry) Init(opts ...registry.Option) error {
|
|
return configure(c, opts...)
|
|
}
|
|
|
|
// Options returns the registry Options
|
|
func (c *kregistry) Options() registry.Options {
|
|
return c.options
|
|
}
|
|
|
|
// Register sets a service selector label and an annotation with a
|
|
// serialised version of the service passed in.
|
|
func (c *kregistry) Register(s *registry.Service, opts ...registry.RegisterOption) error {
|
|
if len(s.Nodes) == 0 {
|
|
return errors.New("no nodes")
|
|
}
|
|
|
|
// TODO: grab podname from somewhere better than this.
|
|
podName := os.Getenv("HOSTNAME")
|
|
svcName := s.Name
|
|
|
|
// encode micro service
|
|
b, err := json.Marshal(s)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
/// marshalled service
|
|
svc := string(b)
|
|
|
|
pod := &client.Pod{
|
|
Metadata: &client.Metadata{
|
|
Labels: map[string]string{
|
|
// micro: service
|
|
labelTypeKey: labelTypeValue,
|
|
// micro/service/name: service
|
|
servicePrefix + serviceName(svcName): serviceValue,
|
|
},
|
|
Annotations: map[string]string{
|
|
// micro/service/name: definition
|
|
annotationPrefix + serviceName(svcName): svc,
|
|
},
|
|
},
|
|
}
|
|
|
|
return c.client.Update(&client.Resource{
|
|
Name: podName,
|
|
Kind: "pod",
|
|
Value: pod,
|
|
})
|
|
}
|
|
|
|
// Deregister nils out any things set in Register
|
|
func (c *kregistry) Deregister(s *registry.Service) error {
|
|
if len(s.Nodes) == 0 {
|
|
return errors.New("you must deregister at least one node")
|
|
}
|
|
|
|
// TODO: grab podname from somewhere better than this.
|
|
podName := os.Getenv("HOSTNAME")
|
|
svcName := s.Name
|
|
|
|
pod := &client.Pod{
|
|
Metadata: &client.Metadata{
|
|
Labels: map[string]string{
|
|
servicePrefix + serviceName(svcName): "",
|
|
},
|
|
Annotations: map[string]string{
|
|
annotationPrefix + serviceName(svcName): "",
|
|
},
|
|
},
|
|
}
|
|
|
|
return c.client.Update(&client.Resource{
|
|
Name: podName,
|
|
Kind: "pod",
|
|
Value: pod,
|
|
})
|
|
}
|
|
|
|
// GetService will get all the pods with the given service selector,
|
|
// and build services from the annotations.
|
|
func (c *kregistry) GetService(name string) ([]*registry.Service, error) {
|
|
var pods client.PodList
|
|
|
|
if err := c.client.Get(&client.Resource{
|
|
Kind: "pod",
|
|
Value: &pods,
|
|
}, map[string]string{
|
|
servicePrefix + serviceName(name): serviceValue,
|
|
}); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if len(pods.Items) == 0 {
|
|
return nil, registry.ErrNotFound
|
|
}
|
|
|
|
// svcs mapped by version
|
|
svcs := make(map[string]*registry.Service)
|
|
|
|
// loop through items
|
|
for _, pod := range pods.Items {
|
|
if pod.Status.Phase != podRunning {
|
|
continue
|
|
}
|
|
|
|
// get serialised service from annotation
|
|
svcStr, ok := pod.Metadata.Annotations[annotationPrefix+serviceName(name)]
|
|
if !ok {
|
|
continue
|
|
}
|
|
|
|
// unmarshal service string
|
|
var svc registry.Service
|
|
|
|
if err := json.Unmarshal([]byte(svcStr), &svc); err != nil {
|
|
return nil, fmt.Errorf("could not unmarshal service '%s' from pod annotation", name)
|
|
}
|
|
|
|
// merge up pod service & ip with versioned service.
|
|
vs, ok := svcs[svc.Version]
|
|
if !ok {
|
|
svcs[svc.Version] = &svc
|
|
continue
|
|
}
|
|
|
|
vs.Nodes = append(vs.Nodes, svc.Nodes...)
|
|
}
|
|
|
|
list := make([]*registry.Service, 0, len(svcs))
|
|
for _, val := range svcs {
|
|
list = append(list, val)
|
|
}
|
|
return list, nil
|
|
}
|
|
|
|
// ListServices will list all the service names
|
|
func (c *kregistry) ListServices() ([]*registry.Service, error) {
|
|
var pods client.PodList
|
|
|
|
if err := c.client.Get(&client.Resource{
|
|
Kind: "pod",
|
|
Value: &pods,
|
|
}, podSelector); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// svcs mapped by name
|
|
svcs := make(map[string]bool)
|
|
|
|
for _, pod := range pods.Items {
|
|
if pod.Status.Phase != podRunning {
|
|
continue
|
|
}
|
|
for k, v := range pod.Metadata.Annotations {
|
|
if !strings.HasPrefix(k, annotationPrefix) {
|
|
continue
|
|
}
|
|
|
|
// we have to unmarshal the annotation itself since the
|
|
// key is encoded to match the regex restriction.
|
|
var svc registry.Service
|
|
|
|
if err := json.Unmarshal([]byte(v), &svc); err != nil {
|
|
continue
|
|
}
|
|
|
|
svcs[svc.Name] = true
|
|
}
|
|
}
|
|
|
|
var list []*registry.Service
|
|
|
|
for val := range svcs {
|
|
list = append(list, ®istry.Service{Name: val})
|
|
}
|
|
|
|
return list, nil
|
|
}
|
|
|
|
// Watch returns a kubernetes watcher
|
|
func (c *kregistry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) {
|
|
return newWatcher(c, opts...)
|
|
}
|
|
|
|
func (c *kregistry) String() string {
|
|
return "kubernetes"
|
|
}
|
|
|
|
// NewRegistry creates a kubernetes registry
|
|
func NewRegistry(opts ...registry.Option) registry.Registry {
|
|
k := &kregistry{
|
|
options: registry.Options{},
|
|
}
|
|
configure(k, opts...)
|
|
return k
|
|
}
|