1
0
mirror of https://github.com/go-micro/go-micro.git synced 2025-06-18 22:17:44 +02:00

[WIP] Micro Runtime (#947)

* Add Get() and GetOptions.

* Removed watcher. Outline of client. YAML templates

* Added default service and deployment templates and types

* Added API tests and cleaned up errors.

* Small refactoring. Template package is no more.

* Ripped out existing code in preparation to small rework

* Reshuffled the source code to make it organized better

* Create service and deployment in kubernetes runtime

* Major cleanup and refactoring of Kubernetes runtime

* Service now handles low level K8s API calls across both K8s deployment
an service API objects
* Runtime has a task queue that serves for queueing runtime action
requests
* General refactoring

* No need for Lock in k8s service

* Added kubernetes runtime env var to default deployment

* Enable running different versions of the same service

* Can't delete services through labels

* Proto cruft. Added runtime.CreateOptions implementation in proto

* Removed proxy service from default env variables

* Make service name mandatory param to Get method

* Get Delete changes from https://github.com/micro/go-micro/pull/945

* Replaced template files with global variables

* Validate service names before sending K8s API request

* Refactored Kubernetes API client. Fixed typos.

* Added client.Resource to make API resources more explicit in code
This commit is contained in:
Milos Gajdos
2019-11-15 13:41:40 +00:00
committed by Asim Aslam
parent 0af8be35bb
commit 97c1300f53
23 changed files with 1284 additions and 646 deletions

View File

@ -0,0 +1,169 @@
package api
import (
"encoding/json"
"net/http"
"net/http/httptest"
"reflect"
"testing"
)
type testcase struct {
Token string
ReqFn func(opts *Options) *Request
Method string
URI string
Body interface{}
Header map[string]string
Assert func(req *http.Request) bool
}
type assertFn func(req *http.Request) bool
var tests = []testcase{
testcase{
ReqFn: func(opts *Options) *Request {
return NewRequest(opts).Get().Resource("service")
},
Method: "GET",
URI: "/api/v1/namespaces/default/services/",
},
testcase{
ReqFn: func(opts *Options) *Request {
return NewRequest(opts).Get().Resource("service").Name("foo")
},
Method: "GET",
URI: "/api/v1/namespaces/default/services/foo",
},
testcase{
ReqFn: func(opts *Options) *Request {
return NewRequest(opts).Get().Resource("service").Namespace("test").Name("bar")
},
Method: "GET",
URI: "/api/v1/namespaces/test/services/bar",
},
testcase{
ReqFn: func(opts *Options) *Request {
return NewRequest(opts).Get().Resource("deployment").Name("foo")
},
Method: "GET",
URI: "/apis/apps/v1/namespaces/default/deployments/foo",
},
testcase{
ReqFn: func(opts *Options) *Request {
return NewRequest(opts).Get().Resource("deployment").Namespace("test").Name("foo")
},
Method: "GET",
URI: "/apis/apps/v1/namespaces/test/deployments/foo",
},
testcase{
ReqFn: func(opts *Options) *Request {
return NewRequest(opts).Get().Resource("pod").Params(&Params{LabelSelector: map[string]string{"foo": "bar"}})
},
Method: "GET",
URI: "/api/v1/namespaces/default/pods/?labelSelector=foo%3Dbar",
},
testcase{
ReqFn: func(opts *Options) *Request {
return NewRequest(opts).Post().Resource("service").Name("foo").Body(map[string]string{"foo": "bar"})
},
Method: "POST",
URI: "/api/v1/namespaces/default/services/foo",
Body: map[string]string{"foo": "bar"},
},
testcase{
ReqFn: func(opts *Options) *Request {
return NewRequest(opts).Post().Resource("deployment").Namespace("test").Name("foo").Body(map[string]string{"foo": "bar"})
},
Method: "POST",
URI: "/apis/apps/v1/namespaces/test/deployments/foo",
Body: map[string]string{"foo": "bar"},
},
testcase{
ReqFn: func(opts *Options) *Request {
return NewRequest(opts).Put().Resource("endpoint").Name("baz").Body(map[string]string{"bam": "bar"})
},
Method: "PUT",
URI: "/api/v1/namespaces/default/endpoints/baz",
Body: map[string]string{"bam": "bar"},
},
testcase{
ReqFn: func(opts *Options) *Request {
return NewRequest(opts).Patch().Resource("endpoint").Name("baz").Body(map[string]string{"bam": "bar"})
},
Method: "PATCH",
URI: "/api/v1/namespaces/default/endpoints/baz",
Body: map[string]string{"bam": "bar"},
},
testcase{
ReqFn: func(opts *Options) *Request {
return NewRequest(opts).Patch().Resource("endpoint").Name("baz").SetHeader("foo", "bar")
},
Method: "PATCH",
URI: "/api/v1/namespaces/default/endpoints/baz",
Header: map[string]string{"foo": "bar"},
},
testcase{
ReqFn: func(opts *Options) *Request {
return NewRequest(opts).Patch().Resource("deployment").Name("baz").SetHeader("foo", "bar")
},
Method: "PATCH",
URI: "/apis/apps/v1/namespaces/default/deployments/baz",
Header: map[string]string{"foo": "bar"},
},
}
var wrappedHandler = func(test *testcase, t *testing.T) http.HandlerFunc {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
auth := r.Header.Get("Authorization")
if len(test.Token) > 0 && (len(auth) == 0 || auth != "Bearer "+test.Token) {
t.Errorf("test case token (%s) did not match expected token (%s)", "Bearer "+test.Token, auth)
}
if len(test.Method) > 0 && test.Method != r.Method {
t.Errorf("test case Method (%s) did not match expected Method (%s)", test.Method, r.Method)
}
if len(test.URI) > 0 && test.URI != r.URL.RequestURI() {
t.Errorf("test case URI (%s) did not match expected URI (%s)", test.URI, r.URL.RequestURI())
}
if test.Body != nil {
var res map[string]string
decoder := json.NewDecoder(r.Body)
if err := decoder.Decode(&res); err != nil {
t.Errorf("decoding body failed: %v", err)
}
if !reflect.DeepEqual(res, test.Body) {
t.Error("body did not match")
}
}
if test.Header != nil {
for k, v := range test.Header {
if r.Header.Get(k) != v {
t.Error("header did not exist")
}
}
}
w.WriteHeader(http.StatusOK)
})
}
func TestRequest(t *testing.T) {
for _, test := range tests {
ts := httptest.NewServer(wrappedHandler(&test, t))
req := test.ReqFn(&Options{
Host: ts.URL,
Client: &http.Client{},
BearerToken: &test.Token,
Namespace: "default",
})
res := req.Do()
if res.Error() != nil {
t.Errorf("request failed with %v", res.Error())
}
ts.Close()
}
}

View File

@ -3,12 +3,12 @@ package api
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"github.com/micro/go-micro/runtime/kubernetes/client/watch"
"github.com/micro/go-micro/util/log"
)
@ -33,7 +33,6 @@ type Request struct {
type Params struct {
LabelSelector map[string]string
Annotations map[string]string
Watch bool
}
// verb sets method
@ -58,9 +57,8 @@ func (r *Request) Put() *Request {
}
// Patch request
// https://github.com/kubernetes/kubernetes/blob/master/docs/devel/api-conventions.md#patch-operations
func (r *Request) Patch() *Request {
return r.verb("PATCH").SetHeader("Content-Type", "application/strategic-merge-patch+json")
return r.verb("PATCH")
}
// Delete request
@ -87,15 +85,33 @@ func (r *Request) Name(s string) *Request {
return r
}
// Body pass in a body to set, this is for POST, PUT
// and PATCH requests
// Body pass in a body to set, this is for POST, PUT and PATCH requests
func (r *Request) Body(in interface{}) *Request {
b := new(bytes.Buffer)
if err := json.NewEncoder(b).Encode(&in); err != nil {
// if we're not sending YAML request, we encode to JSON
if r.header.Get("Content-Type") != "application/yaml" {
if err := json.NewEncoder(b).Encode(&in); err != nil {
r.err = err
return r
}
log.Debugf("Request body: %v", b)
r.body = b
return r
}
// if application/yaml is set, we assume we get a raw bytes so we just copy over
body, ok := in.(io.Reader)
if !ok {
r.err = errors.New("invalid data")
return r
}
// copy over data to the bytes buffer
if _, err := io.Copy(b, body); err != nil {
r.err = err
return r
}
log.Debugf("Patch body: %v", b)
log.Debugf("Request body: %v", b)
r.body = b
return r
}
@ -120,12 +136,12 @@ func (r *Request) SetHeader(key, value string) *Request {
func (r *Request) request() (*http.Request, error) {
var url string
switch r.resource {
case "pods":
case "pod", "service", "endpoint":
// /api/v1/namespaces/{namespace}/pods
url = fmt.Sprintf("%s/api/v1/namespaces/%s/%s/", r.host, r.namespace, r.resource)
case "deployments":
url = fmt.Sprintf("%s/api/v1/namespaces/%s/%ss/", r.host, r.namespace, r.resource)
case "deployment":
// /apis/apps/v1/namespaces/{namespace}/deployments/{name}
url = fmt.Sprintf("%s/apis/apps/v1/namespaces/%s/%s/", r.host, r.namespace, r.resource)
url = fmt.Sprintf("%s/apis/apps/v1/namespaces/%s/%ss/", r.host, r.namespace, r.resource)
}
// append resourceName if it is present
@ -179,24 +195,6 @@ func (r *Request) Do() *Response {
return newResponse(res, err)
}
// Watch builds and triggers the request, but
// will watch instead of return an object
func (r *Request) Watch() (watch.Watch, error) {
if r.err != nil {
return nil, r.err
}
r.params.Set("watch", "true")
req, err := r.request()
if err != nil {
return nil, err
}
w, err := watch.NewBodyWatcher(req, r.client)
return w, err
}
// Options ...
type Options struct {
Host string

View File

@ -11,9 +11,9 @@ import (
// Errors ...
var (
ErrNotFound = errors.New("kubernetes: not found")
ErrNotFound = errors.New("kubernetes: resource not found")
ErrDecode = errors.New("kubernetes: error decoding")
ErrOther = errors.New("kubernetes: unknown error")
ErrUnknown = errors.New("kubernetes: unknown error")
)
// Status is an object that is returned when a request
@ -89,6 +89,7 @@ func newResponse(res *http.Response, err error) *Response {
log.Log("kubernetes: request failed with body:")
log.Log(string(b))
}
r.err = ErrOther
r.err = ErrUnknown
return r
}

View File

@ -1,6 +1,7 @@
package client
import (
"bytes"
"crypto/tls"
"errors"
"io/ioutil"
@ -13,6 +14,7 @@ import (
)
var (
// path to kubernetes service account token
serviceAccountPath = "/var/run/secrets/kubernetes.io/serviceaccount"
// ErrReadNamespace is returned when the names could not be read from service account
ErrReadNamespace = errors.New("Could not read namespace from service account secret")
@ -23,9 +25,7 @@ type client struct {
opts *api.Options
}
// NewClientInCluster should work similarily to the official api
// NewInClient by setting up a client configuration for use within
// a k8s pod.
// NewClientInCluster creates a Kubernetes client for use from within a k8s pod.
func NewClientInCluster() *client {
host := "https://" + os.Getenv("KUBERNETES_SERVICE_HOST") + ":" + os.Getenv("KUBERNETES_SERVICE_PORT")
@ -34,7 +34,7 @@ func NewClientInCluster() *client {
log.Fatal(err)
}
if s == nil || !s.IsDir() {
log.Fatal(errors.New("no k8s service account found"))
log.Fatal(errors.New("service account not found"))
}
token, err := ioutil.ReadFile(path.Join(serviceAccountPath, "token"))
@ -90,26 +90,72 @@ func detectNamespace() (string, error) {
}
}
// UpdateDeployment patches kubernetes deployment with metadata provided in body
func (c *client) UpdateDeployment(name string, body interface{}) error {
// Create creates new API object
func (c *client) Create(r *Resource) error {
b := new(bytes.Buffer)
if err := renderTemplate(r.Kind, b, r.Value); err != nil {
return err
}
return api.NewRequest(c.opts).
Patch().
Resource("deployments").
Name(name).
Body(body).
Post().
SetHeader("Content-Type", "application/yaml").
Resource(r.Kind).
Body(b).
Do().
Error()
}
// ListDeployments lists all kubernetes deployments with given labels
func (c *client) ListDeployments(labels map[string]string) (*DeploymentList, error) {
var deployments DeploymentList
err := api.NewRequest(c.opts).
// Get queries API objects and stores the result in r
func (c *client) Get(r *Resource, labels map[string]string) error {
return api.NewRequest(c.opts).
Get().
Resource("deployments").
Resource(r.Kind).
Params(&api.Params{LabelSelector: labels}).
Do().
Into(&deployments)
return &deployments, err
Into(r.Value)
}
// Update updates API object
func (c *client) Update(r *Resource) error {
req := api.NewRequest(c.opts).
Patch().
SetHeader("Content-Type", "application/strategic-merge-patch+json").
Resource(r.Kind).
Name(r.Name)
switch r.Kind {
case "service":
req.Body(r.Value.(*Service).Spec)
case "deployment":
req.Body(r.Value.(*Deployment).Spec)
default:
return errors.New("unsupported resource")
}
return req.Do().Error()
}
// Delete removes API object
func (c *client) Delete(r *Resource) error {
return api.NewRequest(c.opts).
Delete().
Resource(r.Kind).
Name(r.Name).
Do().
Error()
}
// List lists API objects and stores the result in r
func (c *client) List(r *Resource) error {
labels := map[string]string{
"micro": "service",
}
return api.NewRequest(c.opts).
Get().
Resource(r.Kind).
Params(&api.Params{LabelSelector: labels}).
Do().
Into(r.Value)
}

View File

@ -1,43 +1,132 @@
// Package client provides an implementation of a restricted subset of kubernetes API client
package client
import (
"regexp"
"strconv"
"strings"
"time"
"github.com/micro/go-micro/util/log"
)
const (
// https://github.com/kubernetes/apimachinery/blob/master/pkg/util/validation/validation.go#L134
dns1123LabelFmt string = "[a-z0-9]([-a-z0-9]*[a-z0-9])?"
)
var (
// DefaultImage is default micro image
DefaultImage = "micro/micro"
// ServiceRegexp is used to validate service name
ServiceRegexp = regexp.MustCompile("^" + dns1123LabelFmt + "$")
)
// Kubernetes client
type Kubernetes interface {
// UpdateDeployment patches deployment annotations with new metadata
UpdateDeployment(string, interface{}) error
// ListDeployments lists all micro deployments
ListDeployments(labels map[string]string) (*DeploymentList, error)
// Create creates new API resource
Create(*Resource) error
// Get queries API resrouces
Get(*Resource, map[string]string) error
// Update patches existing API object
Update(*Resource) error
// Delete deletes API resource
Delete(*Resource) error
// List lists API resources
List(*Resource) error
}
// Template is micro deployment template
type Template struct {
Metadata *Metadata `json:"metadata,omitempty"`
// DefaultService returns default micro kubernetes service definition
func DefaultService(name, version string) *Service {
Labels := map[string]string{
"name": name,
"version": version,
"micro": "service",
}
svcName := name
if len(version) > 0 {
// API service object name joins name and version over "-"
svcName = strings.Join([]string{name, version}, "-")
}
Metadata := &Metadata{
Name: svcName,
Namespace: "default",
Version: version,
Labels: Labels,
}
Spec := &ServiceSpec{
Type: "ClusterIP",
Selector: Labels,
Ports: []ServicePort{{
name + "-port", 9090, "",
}},
}
return &Service{
Metadata: Metadata,
Spec: Spec,
}
}
// Spec defines micro deployment spec
type Spec struct {
Template *Template `json:"template,omitempty"`
}
// DefaultService returns default micro kubernetes deployment definition
func DefaultDeployment(name, version string) *Deployment {
Labels := map[string]string{
"name": name,
"version": version,
"micro": "service",
}
// Metadata defines api request metadata
type Metadata struct {
Name string `json:"name,omitempty"`
Labels map[string]string `json:"labels,omitempty"`
Annotations map[string]string `json:"annotations,omitempty"`
}
// API deployment object name joins name and version over "="
depName := strings.Join([]string{name, version}, "-")
// DeploymentList
type DeploymentList struct {
Items []Deployment `json:"items"`
}
Metadata := &Metadata{
Name: depName,
Namespace: "default",
Version: version,
Labels: Labels,
}
// Deployment is Kubernetes deployment
type Deployment struct {
Metadata *Metadata `json:"metadata"`
Status *Status `json:"status"`
}
// TODO: we need to figure out this version stuff
// might be worth adding Build to runtime.Service
buildTime, err := strconv.ParseInt(version, 10, 64)
if err == nil {
buildUnixTimeUTC := time.Unix(buildTime, 0)
Metadata.Annotations = map[string]string{
"build": buildUnixTimeUTC.Format(time.RFC3339),
}
} else {
log.Debugf("Runtime could not parse build: %v", err)
}
// Status is Kubernetes deployment status
type Status struct {
Replicas int `json:"replicas"`
AvailableReplicas int `json:"availablereplicas"`
// TODO: change the image name here
Spec := &DeploymentSpec{
Replicas: 1,
Selector: &LabelSelector{
MatchLabels: Labels,
},
Template: &Template{
Metadata: Metadata,
PodSpec: &PodSpec{
Containers: []Container{{
Name: name,
Image: DefaultImage,
Env: []EnvVar{},
Command: []string{"go", "run", "main.go"},
Ports: []ContainerPort{{
Name: name + "-port",
ContainerPort: 8080,
}},
}},
},
},
}
return &Deployment{
Metadata: Metadata,
Spec: Spec,
}
}

View File

@ -0,0 +1,93 @@
package client
var templates = map[string]string{
"deployments": deploymentTmpl,
"services": serviceTmpl,
}
var deploymentTmpl = `
apiVersion: apps/v1
kind: Deployment
metadata:
name: "{{ .Metadata.Name }}"
namespace: "{{ .Metadata.Namespace }}"
labels:
{{- with .Metadata.Labels }}
{{- range $key, $value := . }}
{{ $key }}: "{{ $value }}"
{{- end }}
{{- end }}
spec:
replicas: {{ .Spec.Replicas }}
selector:
matchLabels:
{{- with .Spec.Selector.MatchLabels }}
{{- range $key, $value := . }}
{{ $key }}: "{{ $value }}"
{{- end }}
{{- end }}
template:
metadata:
labels:
{{- with .Spec.Template.Metadata.Labels }}
{{- range $key, $value := . }}
{{ $key }}: "{{ $value }}"
{{- end }}
{{- end }}
spec:
containers:
{{- with .Spec.Template.PodSpec.Containers }}
{{- range . }}
- name: {{ .Name }}
env:
{{- with .Env }}
{{- range . }}
- name: "{{ .Name }}"
value: "{{ .Value }}"
{{- end }}
{{- end }}
command:
{{- range .Command }}
- {{.}}
{{- end }}
image: {{ .Image }}
imagePullPolicy: Always
ports:
{{- with .Ports }}
{{- range . }}
- containerPort: {{ .ContainerPort }}
name: {{ .Name }}
{{- end}}
{{- end}}
{{- end }}
{{- end}}
`
var serviceTmpl = `
apiVersion: v1
kind: Service
metadata:
name: "{{ .Metadata.Name }}"
namespace: "{{ .Metadata.Namespace }}"
labels:
{{- with .Metadata.Labels }}
{{- range $key, $value := . }}
{{ $key }}: "{{ $value }}"
{{- end }}
{{- end }}
spec:
selector:
{{- with .Spec.Selector }}
{{- range $key, $value := . }}
{{ $key }}: "{{ $value }}"
{{- end }}
{{- end }}
ports:
{{- with .Spec.Ports }}
{{- range . }}
- name: "{{ .Name }}"
port: {{ .Port }}
protocol: {{ .Protocol }}
{{- end }}
{{- end }}
`

View File

@ -0,0 +1,125 @@
package client
// Resource is API resource
type Resource struct {
Name string
Kind string
Value interface{}
}
// Metadata defines api object metadata
type Metadata struct {
Name string `json:"name,omitempty"`
Namespace string `json:"namespace,omitempty"`
Version string `json:"version,omitempty"`
Labels map[string]string `json:"labels,omitempty"`
Annotations map[string]string `json:"annotations,omitempty"`
}
// ServicePort configures service ports
type ServicePort struct {
Name string `json:"name,omitempty"`
Port int `json:"port"`
Protocol string `json:"protocol,omitempty"`
}
// ServiceSpec provides service configuration
type ServiceSpec struct {
Type string `json:"type,omitempty"`
Selector map[string]string `json:"selector,omitempty"`
Ports []ServicePort `json:"ports,omitempty"`
}
type LoadBalancerIngress struct {
IP string `json:"ip,omitempty"`
Hostname string `json:"hostname,omitempty"`
}
type LoadBalancerStatus struct {
Ingress []LoadBalancerIngress `json:"ingress,omitempty"`
}
// ServiceStatus
type ServiceStatus struct {
LoadBalancer LoadBalancerStatus `json:"loadBalancer,omitempty"`
}
// Service is kubernetes service
type Service struct {
Metadata *Metadata `json:"metadata"`
Spec *ServiceSpec `json:"spec,omitempty"`
Status *ServiceStatus `json:"status,omitempty"`
}
// ServiceList
type ServiceList struct {
Items []Service `json:"items"`
}
// ContainerPort
type ContainerPort struct {
Name string `json:"name,omitempty"`
HostPort int `json:"hostPort,omitempty"`
ContainerPort int `json:"containerPort"`
Protocol string `json:"protocol,omitempty"`
}
// EnvVar is environment variable
type EnvVar struct {
Name string `json:"name"`
Value string `json:"value,omitempty"`
}
// Container defined container runtime values
type Container struct {
Name string `json:"name"`
Image string `json:"image"`
Env []EnvVar `json:"env,omitempty"`
Command []string `json:"command,omitempty"`
Ports []ContainerPort `json:"ports,omitempty"`
}
// PodSpec
type PodSpec struct {
Containers []Container `json:"containers"`
}
// Template is micro deployment template
type Template struct {
Metadata *Metadata `json:"metadata,omitempty"`
PodSpec *PodSpec `json:"spec,omitempty"`
}
// LabelSelector is a label query over a set of resources
// NOTE: we do not support MatchExpressions at the moment
type LabelSelector struct {
MatchLabels map[string]string `json:"matchLabels,omitempty"`
}
// DeploymentSpec defines micro deployment spec
type DeploymentSpec struct {
Replicas int `json:"replicas,omitempty"`
Selector *LabelSelector `json:"selector"`
Template *Template `json:"template,omitempty"`
}
// DeploymentStatus is returned when querying deployment
type DeploymentStatus struct {
Replicas int `json:"replicas,omitempty"`
UpdatedReplicas int `json:"updatedReplicas,omitempty"`
ReadyReplicas int `json:"readyReplicas,omitempty"`
AvailableReplicas int `json:"availableReplicas,omitempty"`
UnavailableReplicas int `json:"unavailableReplicas,omitempty"`
}
// Deployment is Kubernetes deployment
type Deployment struct {
Metadata *Metadata `json:"metadata"`
Spec *DeploymentSpec `json:"spec,omitempty"`
Status *DeploymentStatus `json:"status,omitempty"`
}
// DeploymentList
type DeploymentList struct {
Items []Deployment `json:"items"`
}

View File

@ -5,9 +5,22 @@ import (
"encoding/pem"
"errors"
"fmt"
"io"
"io/ioutil"
"text/template"
)
// renderTemplateFile renders template file in path into writer w with supplied data
func renderTemplate(text string, w io.Writer, data interface{}) error {
t := template.Must(template.New("kubernetes").Parse(text))
if err := t.Execute(w, data); err != nil {
return err
}
return nil
}
// COPIED FROM
// https://github.com/kubernetes/kubernetes/blob/7a725418af4661067b56506faabc2d44c6d7703a/pkg/util/crypto/crypto.go

View File

@ -0,0 +1,25 @@
package client
import (
"bytes"
"testing"
)
func TestTemplates(t *testing.T) {
name := "foo"
version := "1.2.3"
// Render default service
s := DefaultService(name, version)
bs := new(bytes.Buffer)
if err := renderTemplate(serviceTmpl, bs, s); err != nil {
t.Errorf("Failed to render kubernetes service: %v", err)
}
// Render default deployment
d := DefaultDeployment(name, version)
bd := new(bytes.Buffer)
if err := renderTemplate(deploymentTmpl, bd, d); err != nil {
t.Errorf("Failed to render kubernetes deployment: %v", err)
}
}

View File

@ -1,74 +0,0 @@
package watch
import (
"bufio"
"encoding/json"
"net/http"
)
// bodyWatcher scans the body of a request for chunks
type bodyWatcher struct {
results chan Event
stop chan struct{}
res *http.Response
req *http.Request
}
// Changes returns the results channel
func (wr *bodyWatcher) ResultChan() <-chan Event {
return wr.results
}
// Stop cancels the request
func (wr *bodyWatcher) Stop() {
select {
case <-wr.stop:
return
default:
close(wr.stop)
close(wr.results)
}
}
func (wr *bodyWatcher) stream() {
reader := bufio.NewReader(wr.res.Body)
// stop the watcher
defer wr.Stop()
for {
// read a line
b, err := reader.ReadBytes('\n')
if err != nil {
return
}
// send the event
var event Event
if err := json.Unmarshal(b, &event); err != nil {
continue
}
wr.results <- event
}
}
// NewBodyWatcher creates a k8s body watcher for a given http request
func NewBodyWatcher(req *http.Request, client *http.Client) (Watch, error) {
stop := make(chan struct{})
req.Cancel = stop
res, err := client.Do(req)
if err != nil {
return nil, err
}
wr := &bodyWatcher{
results: make(chan Event),
stop: stop,
req: req,
res: res,
}
go wr.stream()
return wr, nil
}

View File

@ -1,26 +0,0 @@
package watch
import "encoding/json"
// Watch ...
type Watch interface {
Stop()
ResultChan() <-chan Event
}
// EventType defines the possible types of events.
type EventType string
// EventTypes used
const (
Added EventType = "ADDED"
Modified EventType = "MODIFIED"
Deleted EventType = "DELETED"
Error EventType = "ERROR"
)
// Event represents a single event to a watched resource.
type Event struct {
Type EventType `json:"type"`
Object json.RawMessage `json:"object"`
}

View File

@ -1,69 +0,0 @@
package watch
import (
"fmt"
"net/http"
"net/http/httptest"
"testing"
"time"
)
var actions = []string{
`{"type": "create", "object":{"foo": "bar"}}`,
`{"type": "delete", INVALID}`,
`{"type": "update", "object":{"foo": {"foo": "bar"}}}`,
`{"type": "delete", "object":null}`,
}
func TestBodyWatcher(t *testing.T) {
// set up server with handler to flush strings from ch.
ch := make(chan string)
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
flusher, ok := w.(http.Flusher)
if !ok {
t.Fatal("expected ResponseWriter to be a flusher")
}
fmt.Fprintf(w, "\n")
flusher.Flush()
for v := range ch {
fmt.Fprintf(w, "%s\n", v)
flusher.Flush()
time.Sleep(10 * time.Millisecond)
}
}))
defer ts.Close()
req, err := http.NewRequest("GET", ts.URL, nil)
if err != nil {
t.Fatalf("failed to create new request: %v", err)
}
// setup body watcher
w, err := NewBodyWatcher(req, http.DefaultClient)
if err != nil {
t.Fatalf("failed to create new BodyWatcher %v", err)
}
// send action strings in, and expect result back
ch <- actions[0]
if r := <-w.ResultChan(); r.Type != "create" {
t.Fatalf("expected result to be create")
}
ch <- actions[1] // should be ignored as its invalid json
ch <- actions[2]
if r := <-w.ResultChan(); r.Type != "update" {
t.Fatalf("expected result to be update")
}
ch <- actions[3]
if r := <-w.ResultChan(); r.Type != "delete" {
t.Fatalf("expected result to be delete")
}
// stop should clean up all channels.
w.Stop()
close(ch)
}

View File

@ -14,18 +14,31 @@ import (
"github.com/micro/go-micro/util/log"
)
// action to take on runtime service
type action int
const (
start action = iota
update
stop
)
// task is queued into runtime queue
type task struct {
action action
service *service
}
type kubernetes struct {
sync.RWMutex
// options configure runtime
options runtime.Options
// indicates if we're running
running bool
// used to start new services
start chan *runtime.Service
// task queue for kubernetes services
queue chan *task
// used to stop the runtime
closed chan bool
// service tracks deployed services
services map[string]*runtime.Service
// client is kubernetes client
client client.Kubernetes
}
@ -44,11 +57,10 @@ func NewRuntime(opts ...runtime.Option) runtime.Runtime {
client := client.NewClientInCluster()
return &kubernetes{
options: options,
closed: make(chan bool),
start: make(chan *runtime.Service, 128),
services: make(map[string]*runtime.Service),
client: client,
options: options,
closed: make(chan bool),
queue: make(chan *task, 128),
client: client,
}
}
@ -64,33 +76,109 @@ func (k *kubernetes) Init(opts ...runtime.Option) error {
return nil
}
// Registers a service
// Creates a service
func (k *kubernetes) Create(s *runtime.Service, opts ...runtime.CreateOption) error {
k.Lock()
defer k.Unlock()
// TODO:
// * create service
// * create deployment
// NOTE: our services have micro- prefix
muName := strings.Split(s.Name, ".")
s.Name = "micro-" + muName[len(muName)-1]
// NOTE: we are tracking this in memory for now
if _, ok := k.services[s.Name]; ok {
return errors.New("service already registered")
}
var options runtime.CreateOptions
for _, o := range opts {
o(&options)
}
// save service
k.services[s.Name] = s
svcName := s.Name
if len(s.Version) > 0 {
svcName = strings.Join([]string{s.Name, s.Version}, "-")
}
if !client.ServiceRegexp.MatchString(svcName) {
return fmt.Errorf("invalid service name: %s", svcName)
}
// create new kubernetes micro service
service := newService(s, options)
log.Debugf("Runtime queueing service %s for start action", service.Name)
// push into start queue
k.start <- k.services[s.Name]
k.queue <- &task{
action: start,
service: service,
}
return nil
}
// Get returns all instances of given service
func (k *kubernetes) Get(name string, opts ...runtime.GetOption) ([]*runtime.Service, error) {
k.Lock()
defer k.Unlock()
// if no name has been passed in, return error
if len(name) == 0 {
return nil, errors.New("missing service name")
}
// set the default label
labels := map[string]string{
"micro": "service",
"name": name,
}
var options runtime.GetOptions
for _, o := range opts {
o(&options)
}
// add version to labels if a version has been supplied
if len(options.Version) > 0 {
labels["version"] = options.Version
}
log.Debugf("Runtime querying service %s", name)
serviceList := new(client.ServiceList)
r := &client.Resource{
Kind: "service",
Value: serviceList,
}
if err := k.client.Get(r, labels); err != nil {
return nil, err
}
services := make([]*runtime.Service, 0, len(serviceList.Items))
for _, kservice := range serviceList.Items {
service := &runtime.Service{
Name: kservice.Metadata.Name,
Version: kservice.Metadata.Version,
}
services = append(services, service)
}
return services, nil
}
// Update the service in place
func (k *kubernetes) Update(s *runtime.Service) error {
// parse version into human readable timestamp
updateTimeStamp, err := strconv.ParseInt(s.Version, 10, 64)
if err != nil {
return err
}
unixTimeUTC := time.Unix(updateTimeStamp, 0)
// create new kubernetes micro service
service := newService(s, runtime.CreateOptions{})
// update build time annotation
service.kdeploy.Spec.Template.Metadata.Annotations["build"] = unixTimeUTC.Format(time.RFC3339)
log.Debugf("Runtime queueing service %s for update action", service.Name)
// queue service for removal
k.queue <- &task{
action: update,
service: service,
}
return nil
}
@ -100,61 +188,37 @@ func (k *kubernetes) Delete(s *runtime.Service) error {
k.Lock()
defer k.Unlock()
// TODO:
// * delete service
// * delete dpeloyment
// create new kubernetes micro service
service := newService(s, runtime.CreateOptions{})
// NOTE: we are tracking this in memory for now
if s, ok := k.services[s.Name]; ok {
delete(k.services, s.Name)
return nil
log.Debugf("Runtime queueing service %s for delete action", service.Name)
// queue service for removal
k.queue <- &task{
action: stop,
service: service,
}
return nil
}
// Update the service in place
func (k *kubernetes) Update(s *runtime.Service) error {
type body struct {
Spec *client.Spec `json:"spec"`
}
// parse version into human readable timestamp
updateTimeStamp, err := strconv.ParseInt(s.Version, 10, 64)
if err != nil {
return err
}
unixTimeUTC := time.Unix(updateTimeStamp, 0)
// metada which we will PATCH deployment with
reqBody := body{
Spec: &client.Spec{
Template: &client.Template{
Metadata: &client.Metadata{
Annotations: map[string]string{
"build": unixTimeUTC.Format(time.RFC3339),
},
},
},
},
}
return k.client.UpdateDeployment(s.Name, reqBody)
}
// List the managed services
func (k *kubernetes) List() ([]*runtime.Service, error) {
labels := map[string]string{
"micro": "service",
serviceList := new(client.ServiceList)
r := &client.Resource{
Kind: "service",
Value: serviceList,
}
// list all micro core deployments
deployments, err := k.client.ListDeployments(labels)
if err != nil {
if err := k.client.List(r); err != nil {
return nil, err
}
log.Debugf("Runtime found %d micro deployments with labels %v", len(deployments.Items), labels)
log.Debugf("Runtime found %d micro services", len(serviceList.Items))
services := make([]*runtime.Service, 0, len(deployments.Items))
services := make([]*runtime.Service, 0, len(serviceList.Items))
for _, service := range deployments.Items {
for _, service := range serviceList.Items {
buildTime, err := time.Parse(time.RFC3339, service.Metadata.Annotations["build"])
if err != nil {
log.Debugf("Runtime error parsing build time for %s: %v", service.Metadata.Name, err)
@ -179,23 +243,31 @@ func (k *kubernetes) run(events <-chan runtime.Event) {
for {
select {
case <-t.C:
// check running services
services, err := k.List()
if err != nil {
log.Debugf("Runtime failed listing running services: %v", err)
continue
// TODO: figure out what to do here
// - do we even need the ticker for k8s services?
case task := <-k.queue:
switch task.action {
case start:
log.Debugf("Runtime starting new service: %s", task.service.Name)
if err := task.service.Start(k.client); err != nil {
log.Debugf("Runtime failed to start service %s: %v", task.service.Name, err)
continue
}
case stop:
log.Debugf("Runtime stopping service: %s", task.service.Name)
if err := task.service.Stop(k.client); err != nil {
log.Debugf("Runtime failed to stop service %s: %v", task.service.Name, err)
continue
}
case update:
log.Debugf("Runtime updating service: %s", task.service.Name)
if err := task.service.Update(k.client); err != nil {
log.Debugf("Runtime failed to update service %s: %v", task.service.Name, err)
continue
}
default:
log.Debugf("Runtime received unknown action for service: %s", task.service.Name)
}
// TODO: for now we just log the running services
// * make sure all core deployments exist
// * make sure all core services are exposed
for _, service := range services {
log.Debugf("Runtime found running service: %v", service)
}
case service := <-k.start:
// TODO: this is a noop for now
// * create a deployment
// * expose a service
log.Debugf("Runtime starting service: %s", service.Name)
case event := <-events:
// NOTE: we only handle Update events for now
log.Debugf("Runtime received notification event: %v", event)
@ -207,50 +279,23 @@ func (k *kubernetes) run(events <-chan runtime.Event) {
log.Debugf("Runtime error parsing update build time: %v", err)
continue
}
buildTime := time.Unix(updateTimeStamp, 0)
processEvent := func(event runtime.Event, service *runtime.Service) error {
buildTimeStamp, err := strconv.ParseInt(service.Version, 10, 64)
if err != nil {
return err
}
muBuild := time.Unix(buildTimeStamp, 0)
if buildTime.After(muBuild) {
version := fmt.Sprintf("%d", buildTime.Unix())
muService := &runtime.Service{
Name: service.Name,
Source: service.Source,
Path: service.Path,
Exec: service.Exec,
Version: version,
}
if err := k.Update(muService); err != nil {
return err
}
service.Version = version
}
return nil
}
k.Lock()
unixTimeUTC := time.Unix(updateTimeStamp, 0)
if len(event.Service) > 0 {
service, ok := k.services[event.Service]
if !ok {
log.Debugf("Runtime unknown service: %s", event.Service)
k.Unlock()
s := &runtime.Service{
Name: event.Service,
Version: event.Version,
}
// create new kubernetes micro service
service := newService(s, runtime.CreateOptions{})
// update build time annotation
service.kdeploy.Spec.Template.Metadata.Annotations["build"] = unixTimeUTC.Format(time.RFC3339)
log.Debugf("Runtime updating service: %s", service.Name)
if err := service.Update(k.client); err != nil {
log.Debugf("Runtime failed to update service %s: %v", service.Name, err)
continue
}
if err := processEvent(event, service); err != nil {
log.Debugf("Runtime error updating service %s: %v", event.Service, err)
}
k.Unlock()
continue
}
// if blank service was received we update all services
for _, service := range k.services {
if err := processEvent(event, service); err != nil {
log.Debugf("Runtime error updating service %s: %v", service.Name, err)
}
}
k.Unlock()
}
case <-k.closed:
log.Debugf("Runtime stopped")

View File

@ -0,0 +1,106 @@
package kubernetes
import (
"strings"
"github.com/micro/go-micro/runtime"
"github.com/micro/go-micro/runtime/kubernetes/client"
"github.com/micro/go-micro/util/log"
)
type service struct {
// service to manage
*runtime.Service
// Kubernetes service
kservice *client.Service
// Kubernetes deployment
kdeploy *client.Deployment
}
func newService(s *runtime.Service, c runtime.CreateOptions) *service {
kservice := client.DefaultService(s.Name, s.Version)
kdeploy := client.DefaultDeployment(s.Name, s.Version)
env := make([]client.EnvVar, 0, len(c.Env))
for _, evar := range c.Env {
evarPair := strings.Split(evar, "=")
env = append(env, client.EnvVar{Name: evarPair[0], Value: evarPair[1]})
}
// TODO: should we append instead of overriding?
// if environment has been supplied update deployment
if len(env) > 0 {
kdeploy.Spec.Template.PodSpec.Containers[0].Env = env
}
// if Command has been supplied override the default command
if len(c.Command) > 0 {
kdeploy.Spec.Template.PodSpec.Containers[0].Command = c.Command
}
return &service{
Service: s,
kservice: kservice,
kdeploy: kdeploy,
}
}
func deploymentResource(d *client.Deployment) *client.Resource {
return &client.Resource{
Name: d.Metadata.Name,
Kind: "deployment",
Value: d,
}
}
func serviceResource(s *client.Service) *client.Resource {
return &client.Resource{
Name: s.Metadata.Name,
Kind: "service",
Value: s,
}
}
// Start starts the Kubernetes service. It creates new kubernetes deployment and service API objects
func (s *service) Start(k client.Kubernetes) error {
// create deployment first; if we fail, we dont create service
if err := k.Create(deploymentResource(s.kdeploy)); err != nil {
log.Debugf("Runtime failed to create deployment: %v", err)
return err
}
// create service now that the deployment has been created
if err := k.Create(serviceResource(s.kservice)); err != nil {
log.Debugf("Runtime failed to create service: %v", err)
return err
}
return nil
}
func (s *service) Stop(k client.Kubernetes) error {
// first attempt to delete service
if err := k.Delete(serviceResource(s.kservice)); err != nil {
log.Debugf("Runtime failed to delete service: %v", err)
return err
}
// delete deployment once the service has been deleted
if err := k.Delete(deploymentResource(s.kdeploy)); err != nil {
log.Debugf("Runtime failed to delete deployment: %v", err)
return err
}
return nil
}
func (s *service) Update(k client.Kubernetes) error {
if err := k.Update(deploymentResource(s.kdeploy)); err != nil {
log.Debugf("Runtime failed to update deployment: %v", err)
return err
}
if err := k.Update(serviceResource(s.kservice)); err != nil {
log.Debugf("Runtime failed to update service: %v", err)
return err
}
return nil
}