From 2299559397e1e32006b12e16016c07d7de805a0b Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Thu, 23 Apr 2020 16:22:41 +0100 Subject: [PATCH 1/2] Check for namespace (#1564) --- runtime/kubernetes/kubernetes.go | 18 +++++++--- .../{kubernetes_logs.go => logs.go} | 36 ++++++++++++++----- util/kubernetes/client/client.go | 32 +++++++++++++---- 3 files changed, 68 insertions(+), 18 deletions(-) rename runtime/kubernetes/{kubernetes_logs.go => logs.go} (85%) diff --git a/runtime/kubernetes/kubernetes.go b/runtime/kubernetes/kubernetes.go index 1ec265c6..9ac90ce1 100644 --- a/runtime/kubernetes/kubernetes.go +++ b/runtime/kubernetes/kubernetes.go @@ -393,7 +393,8 @@ func (k *kubernetes) Create(s *runtime.Service, opts ...runtime.CreateOption) er defer k.Unlock() options := runtime.CreateOptions{ - Type: k.options.Type, + Type: k.options.Type, + Namespace: client.DefaultNamespace, } for _, o := range opts { o(&options) @@ -439,7 +440,10 @@ func (k *kubernetes) Read(opts ...runtime.ReadOption) ([]*runtime.Service, error "micro": k.options.Type, } - var options runtime.ReadOptions + options := runtime.ReadOptions{ + Namespace: client.DefaultNamespace, + } + for _, o := range opts { o(&options) } @@ -472,7 +476,10 @@ func (k *kubernetes) Read(opts ...runtime.ReadOption) ([]*runtime.Service, error // Update the service in place func (k *kubernetes) Update(s *runtime.Service, opts ...runtime.UpdateOption) error { - var options runtime.UpdateOptions + options := runtime.UpdateOptions{ + Namespace: client.DefaultNamespace, + } + for _, o := range opts { o(&options) } @@ -521,7 +528,10 @@ func (k *kubernetes) Update(s *runtime.Service, opts ...runtime.UpdateOption) er // Delete removes a service func (k *kubernetes) Delete(s *runtime.Service, opts ...runtime.DeleteOption) error { - var options runtime.DeleteOptions + options := runtime.DeleteOptions{ + Namespace: client.DefaultNamespace, + } + for _, o := range opts { o(&options) } diff --git a/runtime/kubernetes/kubernetes_logs.go b/runtime/kubernetes/logs.go similarity index 85% rename from runtime/kubernetes/kubernetes_logs.go rename to runtime/kubernetes/logs.go index 0c113f4e..ac57d50c 100644 --- a/runtime/kubernetes/kubernetes_logs.go +++ b/runtime/kubernetes/logs.go @@ -24,11 +24,16 @@ func (k *klog) podLogStream(podName string, stream *kubeStream) error { p := make(map[string]string) p["follow"] = "true" + opts := []client.LogOption{ + client.LogParams(p), + client.LogNamespace(k.options.Namespace), + } + // get the logs for the pod body, err := k.client.Log(&client.Resource{ Name: podName, Kind: "pod", - }, client.LogParams(p)) + }, opts...) if err != nil { stream.err = err @@ -70,7 +75,12 @@ func (k *klog) getMatchingPods() ([]string, error) { // TODO: specify micro:service // l["micro"] = "service" - if err := k.client.Get(r, client.GetLabels(l)); err != nil { + opts := []client.GetOption{ + client.GetLabels(l), + client.GetNamespace(k.options.Namespace), + } + + if err := k.client.Get(r, opts...); err != nil { return nil, err } @@ -109,10 +119,15 @@ func (k *klog) Read() ([]runtime.LogRecord, error) { logParams["follow"] = "true" } + opts := []client.LogOption{ + client.LogParams(logParams), + client.LogNamespace(k.options.Namespace), + } + logs, err := k.client.Log(&client.Resource{ Name: pod, Kind: "pod", - }, client.LogParams(logParams)) + }, opts...) if err != nil { return nil, err @@ -162,13 +177,18 @@ func (k *klog) Stream() (runtime.LogStream, error) { } // NewLog returns a configured Kubernetes logger -func newLog(client client.Client, serviceName string, opts ...runtime.LogsOption) *klog { - klog := &klog{ - serviceName: serviceName, - client: client, +func newLog(c client.Client, serviceName string, opts ...runtime.LogsOption) *klog { + options := runtime.LogsOptions{ + Namespace: client.DefaultNamespace, } for _, o := range opts { - o(&klog.options) + o(&options) + } + + klog := &klog{ + serviceName: serviceName, + client: c, + options: options, } return klog diff --git a/util/kubernetes/client/client.go b/util/kubernetes/client/client.go index 6b579777..76e7c52f 100644 --- a/util/kubernetes/client/client.go +++ b/util/kubernetes/client/client.go @@ -85,7 +85,9 @@ func SerializeResourceName(ns string) string { // Get queries API objects and stores the result in r func (c *client) Get(r *Resource, opts ...GetOption) error { - var options GetOptions + options := GetOptions{ + Namespace: c.opts.Namespace, + } for _, o := range opts { o(&options) } @@ -101,7 +103,9 @@ func (c *client) Get(r *Resource, opts ...GetOption) error { // Log returns logs for a pod func (c *client) Log(r *Resource, opts ...LogOption) (io.ReadCloser, error) { - var options LogOptions + options := LogOptions{ + Namespace: c.opts.Namespace, + } for _, o := range opts { o(&options) } @@ -130,7 +134,9 @@ func (c *client) Log(r *Resource, opts ...LogOption) (io.ReadCloser, error) { // Update updates API object func (c *client) Update(r *Resource, opts ...UpdateOption) error { - var options UpdateOptions + options := UpdateOptions{ + Namespace: c.opts.Namespace, + } for _, o := range opts { o(&options) } @@ -158,7 +164,9 @@ func (c *client) Update(r *Resource, opts ...UpdateOption) error { // Delete removes API object func (c *client) Delete(r *Resource, opts ...DeleteOption) error { - var options DeleteOptions + options := DeleteOptions{ + Namespace: c.opts.Namespace, + } for _, o := range opts { o(&options) } @@ -174,7 +182,9 @@ func (c *client) Delete(r *Resource, opts ...DeleteOption) error { // List lists API objects and stores the result in r func (c *client) List(r *Resource, opts ...ListOption) error { - var options ListOptions + options := ListOptions{ + Namespace: c.opts.Namespace, + } for _, o := range opts { o(&options) } @@ -188,7 +198,9 @@ func (c *client) List(r *Resource, opts ...ListOption) error { // Watch returns an event stream func (c *client) Watch(r *Resource, opts ...WatchOption) (Watcher, error) { - var options WatchOptions + options := WatchOptions{ + Namespace: c.opts.Namespace, + } for _, o := range opts { o(&options) } @@ -233,6 +245,10 @@ func NewService(name, version, typ, namespace string) *Service { svcName = strings.Join([]string{name, version}, "-") } + if len(namespace) == 0 { + namespace = DefaultNamespace + } + Metadata := &Metadata{ Name: svcName, Namespace: SerializeResourceName(namespace), @@ -272,6 +288,10 @@ func NewDeployment(name, version, typ, namespace string) *Deployment { depName = strings.Join([]string{name, version}, "-") } + if len(namespace) == 0 { + namespace = DefaultNamespace + } + Metadata := &Metadata{ Name: depName, Namespace: SerializeResourceName(namespace), From ec929b3d2f044c8f08e6ca26bcc51e0f02482f1f Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Thu, 23 Apr 2020 17:14:30 +0100 Subject: [PATCH 2/2] log error and ensure we pass through namespace --- runtime/kubernetes/kubernetes.go | 2 +- util/kubernetes/api/response.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/runtime/kubernetes/kubernetes.go b/runtime/kubernetes/kubernetes.go index 9ac90ce1..c26e2984 100644 --- a/runtime/kubernetes/kubernetes.go +++ b/runtime/kubernetes/kubernetes.go @@ -461,7 +461,7 @@ func (k *kubernetes) Read(opts ...runtime.ReadOption) ([]*runtime.Service, error labels["micro"] = options.Type } - srvs, err := k.getService(labels) + srvs, err := k.getService(labels, client.GetNamespace(options.Namespace)) if err != nil { return nil, err } diff --git a/util/kubernetes/api/response.go b/util/kubernetes/api/response.go index 1d835ae9..bd486413 100644 --- a/util/kubernetes/api/response.go +++ b/util/kubernetes/api/response.go @@ -3,6 +3,7 @@ package api import ( "encoding/json" "errors" + "fmt" "io/ioutil" "net/http" ) @@ -50,9 +51,8 @@ func (r *Response) Into(data interface{}) error { defer r.res.Body.Close() decoder := json.NewDecoder(r.res.Body) - err := decoder.Decode(&data) - if err != nil { - return ErrDecode + if err := decoder.Decode(&data); err != nil { + return fmt.Errorf("%v: %v", ErrDecode, err) } return r.err