From 690169f6e55adc8eadd6661b1c6b198ad5f543f7 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Sat, 12 Dec 2020 20:51:32 +0000 Subject: [PATCH] remove k8s logs --- debug/log/kubernetes/kubernetes.go | 190 ------------------------ debug/log/kubernetes/kubernetes_test.go | 74 --------- debug/log/kubernetes/stream.go | 44 ------ 3 files changed, 308 deletions(-) delete mode 100644 debug/log/kubernetes/kubernetes.go delete mode 100644 debug/log/kubernetes/kubernetes_test.go delete mode 100644 debug/log/kubernetes/stream.go diff --git a/debug/log/kubernetes/kubernetes.go b/debug/log/kubernetes/kubernetes.go deleted file mode 100644 index cf886d06..00000000 --- a/debug/log/kubernetes/kubernetes.go +++ /dev/null @@ -1,190 +0,0 @@ -// Package kubernetes is a logger implementing (github.com/micro/go-micro/v2/debug/log).Log -package kubernetes - -import ( - "bufio" - "encoding/json" - "fmt" - "os" - "sort" - "strconv" - "time" - - "github.com/micro/go-micro/v2/debug/log" - "github.com/micro/go-micro/v2/util/kubernetes/client" -) - -type klog struct { - client client.Client - - log.Options -} - -func (k *klog) podLogStream(podName string, stream *kubeStream) { - p := make(map[string]string) - p["follow"] = "true" - - // get the logs for the pod - body, err := k.client.Log(&client.Resource{ - Name: podName, - Kind: "pod", - }, client.LogParams(p)) - - if err != nil { - fmt.Fprintf(os.Stderr, err.Error()) - return - } - - s := bufio.NewScanner(body) - defer body.Close() - - for { - select { - case <-stream.stop: - return - default: - if s.Scan() { - record := k.parse(s.Text()) - stream.stream <- record - } else { - // TODO: is there a blocking call - // rather than a sleep loop? - time.Sleep(time.Second) - } - } - } -} - -func (k *klog) getMatchingPods() ([]string, error) { - r := &client.Resource{ - Kind: "pod", - Value: new(client.PodList), - } - - l := make(map[string]string) - - l["name"] = client.Format(k.Options.Name) - // TODO: specify micro:service - // l["micro"] = "service" - - if err := k.client.Get(r, client.GetLabels(l)); err != nil { - return nil, err - } - - var matches []string - - for _, p := range r.Value.(*client.PodList).Items { - // find labels that match the name - if p.Metadata.Labels["name"] == client.Format(k.Options.Name) { - matches = append(matches, p.Metadata.Name) - } - } - - return matches, nil -} - -func (k *klog) parse(line string) log.Record { - record := log.Record{} - - if err := json.Unmarshal([]byte(line), &record); err != nil { - record.Timestamp = time.Now().UTC() - record.Message = line - record.Metadata = make(map[string]string) - } - - record.Metadata["service"] = k.Options.Name - - return record -} - -func (k *klog) Read(options ...log.ReadOption) ([]log.Record, error) { - opts := &log.ReadOptions{} - for _, o := range options { - o(opts) - } - - pods, err := k.getMatchingPods() - if err != nil { - return nil, err - } - - var records []log.Record - - for _, pod := range pods { - logParams := make(map[string]string) - - if !opts.Since.Equal(time.Time{}) { - logParams["sinceSeconds"] = strconv.Itoa(int(time.Since(opts.Since).Seconds())) - } - - if opts.Count != 0 { - logParams["tailLines"] = strconv.Itoa(opts.Count) - } - - if opts.Stream == true { - logParams["follow"] = "true" - } - - logs, err := k.client.Log(&client.Resource{ - Name: pod, - Kind: "pod", - }, client.LogParams(logParams)) - - if err != nil { - return nil, err - } - defer logs.Close() - - s := bufio.NewScanner(logs) - - for s.Scan() { - record := k.parse(s.Text()) - record.Metadata["pod"] = pod - records = append(records, record) - } - } - - // sort the records - sort.Slice(records, func(i, j int) bool { return records[i].Timestamp.Before(records[j].Timestamp) }) - - return records, nil -} - -func (k *klog) Write(l log.Record) error { - return write(l) -} - -func (k *klog) Stream() (log.Stream, error) { - // find the matching pods - pods, err := k.getMatchingPods() - if err != nil { - return nil, err - } - - stream := &kubeStream{ - stream: make(chan log.Record), - stop: make(chan bool), - } - - // stream from the individual pods - for _, pod := range pods { - go k.podLogStream(pod, stream) - } - - return stream, nil -} - -// NewLog returns a configured Kubernetes logger -func NewLog(opts ...log.Option) log.Log { - klog := &klog{} - for _, o := range opts { - o(&klog.Options) - } - - if len(os.Getenv("KUBERNETES_SERVICE_HOST")) > 0 { - klog.client = client.NewClusterClient() - } else { - klog.client = client.NewLocalClient() - } - return klog -} diff --git a/debug/log/kubernetes/kubernetes_test.go b/debug/log/kubernetes/kubernetes_test.go deleted file mode 100644 index 97749869..00000000 --- a/debug/log/kubernetes/kubernetes_test.go +++ /dev/null @@ -1,74 +0,0 @@ -package kubernetes - -import ( - "bytes" - "encoding/json" - "io" - "os" - "testing" - "time" - - "github.com/micro/go-micro/v2/debug/log" - "github.com/stretchr/testify/assert" -) - -func TestKubernetes(t *testing.T) { - // TODO: fix local test running - return - - if os.Getenv("IN_TRAVIS_CI") == "yes" { - t.Skip("In Travis CI") - } - - k := NewLog(log.Name("micro-network")) - - r, w, err := os.Pipe() - if err != nil { - t.Fatal(err) - } - - s := os.Stderr - os.Stderr = w - meta := make(map[string]string) - - write := log.Record{ - Timestamp: time.Unix(0, 0).UTC(), - Message: "Test log entry", - Metadata: meta, - } - - meta["foo"] = "bar" - - k.Write(write) - b := &bytes.Buffer{} - w.Close() - io.Copy(b, r) - os.Stderr = s - - var read log.Record - - if err := json.Unmarshal(b.Bytes(), &read); err != nil { - t.Fatalf("json.Unmarshal failed: %s", err.Error()) - } - - assert.Equal(t, write, read, "Write was not equal") - - records, err := k.Read() - assert.Nil(t, err, "Read should not error") - assert.NotNil(t, records, "Read should return records") - - stream, err := k.Stream() - if err != nil { - t.Fatal(err) - } - - records = nil - - go stream.Stop() - - for s := range stream.Chan() { - records = append(records, s) - } - - assert.Equal(t, 0, len(records), "Stream should return nothing") -} diff --git a/debug/log/kubernetes/stream.go b/debug/log/kubernetes/stream.go deleted file mode 100644 index 50d18100..00000000 --- a/debug/log/kubernetes/stream.go +++ /dev/null @@ -1,44 +0,0 @@ -package kubernetes - -import ( - "encoding/json" - "fmt" - "os" - "sync" - - "github.com/micro/go-micro/v2/debug/log" -) - -func write(l log.Record) error { - m, err := json.Marshal(l) - if err == nil { - _, err := fmt.Fprintf(os.Stderr, "%s", m) - return err - } - return err -} - -type kubeStream struct { - // the k8s log stream - stream chan log.Record - sync.Mutex - // the stop chan - stop chan bool -} - -func (k *kubeStream) Chan() <-chan log.Record { - return k.stream -} - -func (k *kubeStream) Stop() error { - k.Lock() - defer k.Unlock() - select { - case <-k.stop: - return nil - default: - close(k.stop) - close(k.stream) - } - return nil -}