From bc30efcf709168a6f6d501c988d707c8f7578957 Mon Sep 17 00:00:00 2001
From: Asim Aslam <asim@aslam.me>
Date: Tue, 17 Dec 2019 15:38:03 +0000
Subject: [PATCH 1/3] Decruft the debug logger interface

---
 debug/log/default.go                       | 17 ++---
 debug/log/log.go                           |  2 +-
 debug/service/handler/debug.go             | 23 +++---
 debug/service/log.go                       | 81 ++++++++++++++++++++++
 debug/service/service.go                   | 20 +++---
 debug/stats/default.go                     |  6 +-
 {debug/buffer => util/ring}/buffer.go      | 61 +++++++++-------
 {debug/buffer => util/ring}/buffer_test.go |  2 +-
 8 files changed, 150 insertions(+), 62 deletions(-)
 create mode 100644 debug/service/log.go
 rename {debug/buffer => util/ring}/buffer.go (72%)
 rename {debug/buffer => util/ring}/buffer_test.go (98%)

diff --git a/debug/log/default.go b/debug/log/default.go
index 6f7bddab..3d79bdc9 100644
--- a/debug/log/default.go
+++ b/debug/log/default.go
@@ -4,17 +4,17 @@ import (
 	"fmt"
 	golog "log"
 
-	"github.com/micro/go-micro/debug/buffer"
+	"github.com/micro/go-micro/util/ring"
 )
 
 var (
 	// DefaultSize of the logger buffer
-	DefaultSize = 1000
+	DefaultSize = 1024
 )
 
 // defaultLog is default micro log
 type defaultLog struct {
-	*buffer.Buffer
+	*ring.Buffer
 }
 
 // NewLog returns default Logger with
@@ -28,7 +28,7 @@ func NewLog(opts ...Option) Log {
 	}
 
 	return &defaultLog{
-		Buffer: buffer.New(options.Size),
+		Buffer: ring.New(options.Size),
 	}
 }
 
@@ -46,7 +46,7 @@ func (l *defaultLog) Read(opts ...ReadOption) []Record {
 		o(&options)
 	}
 
-	var entries []*buffer.Entry
+	var entries []*ring.Entry
 	// if Since options ha sbeen specified we honor it
 	if !options.Since.IsZero() {
 		entries = l.Buffer.Since(options.Since)
@@ -82,9 +82,10 @@ func (l *defaultLog) Read(opts ...ReadOption) []Record {
 }
 
 // Stream returns channel for reading log records
-func (l *defaultLog) Stream(stop chan bool) <-chan Record {
+// along with a stop channel, close it when done
+func (l *defaultLog) Stream() (<-chan Record, chan bool) {
 	// get stream channel from ring buffer
-	stream := l.Buffer.Stream(stop)
+	stream, stop := l.Buffer.Stream()
 	// make a buffered channel
 	records := make(chan Record, 128)
 	// get last 10 records
@@ -110,5 +111,5 @@ func (l *defaultLog) Stream(stop chan bool) <-chan Record {
 		}
 	}()
 
-	return records
+	return records, stop
 }
diff --git a/debug/log/log.go b/debug/log/log.go
index 42a8f558..9425ff19 100644
--- a/debug/log/log.go
+++ b/debug/log/log.go
@@ -23,7 +23,7 @@ type Log interface {
 	// Write writes records to log
 	Write(Record)
 	// Stream log records
-	Stream(chan bool) <-chan Record
+	Stream() (<-chan Record, chan bool)
 }
 
 // Record is log record entry
diff --git a/debug/service/handler/debug.go b/debug/service/handler/debug.go
index ee24b27a..7f2f16e4 100644
--- a/debug/service/handler/debug.go
+++ b/debug/service/handler/debug.go
@@ -1,4 +1,4 @@
-// Pacjage handler implements service debug handler
+// Package handler implements service debug handler embedded in go-micro services
 package handler
 
 import (
@@ -66,26 +66,27 @@ func (d *Debug) Log(ctx context.Context, stream server.Stream) error {
 	}
 
 	if req.Stream {
-		stop := make(chan bool)
-		defer close(stop)
-
-		// TODO: we need to figure out how to close ithe log stream
+		// TODO: we need to figure out how to close the log stream
 		// It seems like when a client disconnects,
 		// the connection stays open until some timeout expires
 		// or something like that; that means the map of streams
 		// might end up leaking memory if not cleaned up properly
-		records := d.log.Stream(stop)
+		records, stop := d.log.Stream()
+		defer close(stop)
+
 		for record := range records {
 			if err := d.sendRecord(record, stream); err != nil {
 				return err
 			}
 		}
+
 		// done streaming, return
 		return nil
 	}
 
 	// get the log records
 	records := d.log.Read(options...)
+
 	// send all the logs downstream
 	for _, record := range records {
 		if err := d.sendRecord(record, stream); err != nil {
@@ -102,15 +103,9 @@ func (d *Debug) sendRecord(record log.Record, stream server.Stream) error {
 		metadata[k] = v
 	}
 
-	pbRecord := &proto.Record{
+	return stream.Send(&proto.Record{
 		Timestamp: record.Timestamp.Unix(),
 		Value:     record.Value.(string),
 		Metadata:  metadata,
-	}
-
-	if err := stream.Send(pbRecord); err != nil {
-		return err
-	}
-
-	return nil
+	})
 }
diff --git a/debug/service/log.go b/debug/service/log.go
new file mode 100644
index 00000000..7a9aabc5
--- /dev/null
+++ b/debug/service/log.go
@@ -0,0 +1,81 @@
+package service
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/micro/go-micro/client"
+	"github.com/micro/go-micro/debug/log"
+	pb "github.com/micro/go-micro/debug/service/proto"
+)
+
+type serviceLog struct {
+	Client *debugClient
+}
+
+// Read reads log entries from the logger
+func (s *serviceLog) Read(opts ...log.ReadOption) []log.Record {
+	// TODO: parse opts
+	stream, err := s.Client.Log(opts...)
+	if err != nil {
+		return nil
+	}
+	// stream the records until nothing is left
+	var records []log.Record
+	for _, record := range stream {
+		records = append(records, record)
+	}
+	return records
+}
+
+// There is no write support
+func (s *serviceLog) Write(r log.Record) {
+	return
+}
+
+// Stream log records
+func (s *serviceLog) Stream(ch chan bool) (<-chan log.Record, chan bool) {
+	stop := make(chan bool)
+	stream, err := s.Client.Log(log.Stream(true))
+	if err != nil {
+		// return a closed stream
+		stream = make(chan log.Record)
+		close(stream)
+		return stream, stop
+	}
+
+	// stream the records until nothing is left
+	go func() {
+		var records []log.Record
+		for _, record := range stream {
+			select {
+			case stream <- record:
+			case <-stop:
+				return
+			}
+		}
+	}()
+
+	// return the stream
+	return stream, stop
+}
+
+// NewLog returns a new log interface
+func NewLog(opts ...log.Option) log.Log {
+	var options log.Options
+	for _, o := range opts {
+		o(&options)
+	}
+
+	name := options.Name
+
+	// set the default name
+	if len(name) == 0 {
+		name = debug.DefaultName
+	}
+
+	return serviceLog{
+		Client: newDebugClient(name),
+	}
+}
diff --git a/debug/service/service.go b/debug/service/service.go
index d48b0a3d..7527127a 100644
--- a/debug/service/service.go
+++ b/debug/service/service.go
@@ -1,3 +1,4 @@
+// Package service provides the service log
 package service
 
 import (
@@ -12,23 +13,23 @@ import (
 )
 
 // Debug provides debug service client
-type Debug struct {
-	dbg pb.DebugService
+type debugClient struct {
+	Client pb.DebugService
 }
 
 // NewDebug provides Debug service implementation
-func NewDebug(name string) *Debug {
+func newDebugClient(name string) *debug {
 	// create default client
 	cli := client.DefaultClient
 
-	return &Debug{
-		dbg: pb.NewDebugService(name, cli),
+	return &debugClient{
+		Client: pb.NewDebugService(name, cli),
 	}
 }
 
 // Logs queries the service logs and returns a channel to read the logs from
-func (d *Debug) Log(opts ...log.ReadOption) (<-chan log.Record, error) {
-	options := log.ReadOptions{}
+func (d *debugClient) Log(opts ...log.ReadOption) (<-chan log.Record, error) {
+	var options log.ReadOptions
 	// initialize the read options
 	for _, o := range opts {
 		o(&options)
@@ -46,20 +47,21 @@ func (d *Debug) Log(opts ...log.ReadOption) (<-chan log.Record, error) {
 	req.Stream = options.Stream
 
 	// get the log stream
-	stream, err := d.dbg.Log(context.Background(), req)
+	stream, err := d.Client.Log(context.Background(), req)
 	if err != nil {
 		return nil, fmt.Errorf("failed getting log stream: %s", err)
 	}
 
 	// log channel for streaming logs
 	logChan := make(chan log.Record)
+
 	// go stream logs
 	go d.streamLogs(logChan, stream)
 
 	return logChan, nil
 }
 
-func (d *Debug) streamLogs(logChan chan log.Record, stream pb.Debug_LogService) {
+func (d *debugClient) streamLogs(logChan chan log.Record, stream pb.Debug_LogService) {
 	defer stream.Close()
 
 	for {
diff --git a/debug/stats/default.go b/debug/stats/default.go
index b2d78fce..357d6303 100644
--- a/debug/stats/default.go
+++ b/debug/stats/default.go
@@ -1,11 +1,11 @@
 package stats
 
 import (
-	"github.com/micro/go-micro/debug/buffer"
+	"github.com/micro/go-micro/util/ring"
 )
 
 type stats struct {
-	buffer *buffer.Buffer
+	buffer *ring.Buffer
 }
 
 func (s *stats) Read() ([]*Stat, error) {
@@ -33,6 +33,6 @@ func (s *stats) Write(stat *Stat) error {
 // TODO add options
 func NewStats() Stats {
 	return &stats{
-		buffer: buffer.New(1024),
+		buffer: ring.New(1024),
 	}
 }
diff --git a/debug/buffer/buffer.go b/util/ring/buffer.go
similarity index 72%
rename from debug/buffer/buffer.go
rename to util/ring/buffer.go
index 007293aa..bb8febcf 100644
--- a/debug/buffer/buffer.go
+++ b/util/ring/buffer.go
@@ -1,5 +1,5 @@
-// Package buffer provides a simple ring buffer for storing local data
-package buffer
+// Package ring provides a simple ring buffer for storing local data
+package ring
 
 import (
 	"sync"
@@ -8,18 +8,13 @@ import (
 	"github.com/google/uuid"
 )
 
-type stream struct {
-	id      string
-	entries chan *Entry
-	stop    chan bool
-}
-
 // Buffer is ring buffer
 type Buffer struct {
 	size int
+
 	sync.RWMutex
 	vals    []*Entry
-	streams map[string]stream
+	streams map[string]*Stream
 }
 
 // Entry is ring buffer data entry
@@ -28,12 +23,14 @@ type Entry struct {
 	Timestamp time.Time
 }
 
-// New returns a new buffer of the given size
-func New(i int) *Buffer {
-	return &Buffer{
-		size:    i,
-		streams: make(map[string]stream),
-	}
+// Stream is used to stream the buffer
+type Stream struct {
+	// Id of the stream
+	Id      string
+	// Buffered entries
+	Entries chan *Entry
+	// Stop channel
+	Stop    chan bool
 }
 
 // Put adds a new value to ring buffer
@@ -53,13 +50,13 @@ func (b *Buffer) Put(v interface{}) {
 		b.vals = b.vals[1:]
 	}
 
-	// TODO: this is fucking ugly
+	// send to every stream
 	for _, stream := range b.streams {
 		select {
-		case <-stream.stop:
-			delete(b.streams, stream.id)
-			close(stream.entries)
-		case stream.entries <- entry:
+		case <-stream.Stop:
+			delete(b.streams, stream.Id)
+			close(stream.Entries)
+		case stream.Entries <- entry:
 		}
 	}
 }
@@ -115,22 +112,34 @@ func (b *Buffer) Since(t time.Time) []*Entry {
 }
 
 // Stream logs from the buffer
-func (b *Buffer) Stream(stop chan bool) <-chan *Entry {
+// Close the channel when you want to stop
+func (b *Buffer) Stream() (<-chan *Entry, chan bool) {
 	b.Lock()
 	defer b.Unlock()
 
 	entries := make(chan *Entry, 128)
 	id := uuid.New().String()
-	b.streams[id] = stream{
-		id:      id,
-		entries: entries,
-		stop:    stop,
+	stop := make(chan bool)
+
+	b.streams[id] = &Stream{
+		Id:      id,
+		Entries: entries,
+		Stop:    stop,
 	}
 
-	return entries
+	return entries, stop
 }
 
 // Size returns the size of the ring buffer
 func (b *Buffer) Size() int {
 	return b.size
 }
+
+// New returns a new buffer of the given size
+func New(i int) *Buffer {
+	return &Buffer{
+		size:    i,
+		streams: make(map[string]*Stream),
+	}
+}
+
diff --git a/debug/buffer/buffer_test.go b/util/ring/buffer_test.go
similarity index 98%
rename from debug/buffer/buffer_test.go
rename to util/ring/buffer_test.go
index 3a107923..0c9b5378 100644
--- a/debug/buffer/buffer_test.go
+++ b/util/ring/buffer_test.go
@@ -1,4 +1,4 @@
-package buffer
+package ring
 
 import (
 	"testing"

From d502e2f58abf344c50ceb4dc421cb9b248344b2b Mon Sep 17 00:00:00 2001
From: Asim Aslam <asim@aslam.me>
Date: Tue, 17 Dec 2019 15:46:09 +0000
Subject: [PATCH 2/3] fix breaks

---
 debug/log/log.go         |  2 +-
 debug/log/options.go     |  9 +++++++++
 debug/service/log.go     | 31 +++++++++++++------------------
 debug/service/service.go |  2 +-
 4 files changed, 24 insertions(+), 20 deletions(-)

diff --git a/debug/log/log.go b/debug/log/log.go
index 9425ff19..918d0c55 100644
--- a/debug/log/log.go
+++ b/debug/log/log.go
@@ -174,6 +174,6 @@ func SetPrefix(p string) {
 }
 
 // Set service name
-func Name(name string) {
+func SetName(name string) {
 	prefix = fmt.Sprintf("[%s]", name)
 }
diff --git a/debug/log/options.go b/debug/log/options.go
index 03dece38..0d83394b 100644
--- a/debug/log/options.go
+++ b/debug/log/options.go
@@ -7,10 +7,19 @@ type Option func(*Options)
 
 // Options are logger options
 type Options struct {
+	// Name of the log
+	Name string
 	// Size is the size of ring buffer
 	Size int
 }
 
+// Name of the log
+func Name(n string) Option {
+	return func(o *Options) {
+		o.Name = n
+	}
+}
+
 // Size sets the size of the ring buffer
 func Size(s int) Option {
 	return func(o *Options) {
diff --git a/debug/service/log.go b/debug/service/log.go
index 7a9aabc5..3508874c 100644
--- a/debug/service/log.go
+++ b/debug/service/log.go
@@ -1,13 +1,8 @@
 package service
 
 import (
-	"context"
-	"fmt"
-	"time"
-
-	"github.com/micro/go-micro/client"
+	"github.com/micro/go-micro/debug"
 	"github.com/micro/go-micro/debug/log"
-	pb "github.com/micro/go-micro/debug/service/proto"
 )
 
 type serviceLog struct {
@@ -23,7 +18,7 @@ func (s *serviceLog) Read(opts ...log.ReadOption) []log.Record {
 	}
 	// stream the records until nothing is left
 	var records []log.Record
-	for _, record := range stream {
+	for record := range stream {
 		records = append(records, record)
 	}
 	return records
@@ -35,30 +30,30 @@ func (s *serviceLog) Write(r log.Record) {
 }
 
 // Stream log records
-func (s *serviceLog) Stream(ch chan bool) (<-chan log.Record, chan bool) {
+func (s *serviceLog) Stream() (<-chan log.Record, chan bool) {
 	stop := make(chan bool)
 	stream, err := s.Client.Log(log.Stream(true))
 	if err != nil {
 		// return a closed stream
-		stream = make(chan log.Record)
-		close(stream)
-		return stream, stop
+		deadStream := make(chan log.Record)
+		close(deadStream)
+		return deadStream, stop
 	}
 
-	// stream the records until nothing is left
+	newStream := make(chan log.Record, 128)
+
 	go func() {
-		var records []log.Record
-		for _, record := range stream {
+		for {
 			select {
-			case stream <- record:
+			case rec := <-stream:
+				newStream <- rec
 			case <-stop:
 				return
 			}
 		}
 	}()
 
-	// return the stream
-	return stream, stop
+	return newStream, stop
 }
 
 // NewLog returns a new log interface
@@ -75,7 +70,7 @@ func NewLog(opts ...log.Option) log.Log {
 		name = debug.DefaultName
 	}
 
-	return serviceLog{
+	return &serviceLog{
 		Client: newDebugClient(name),
 	}
 }
diff --git a/debug/service/service.go b/debug/service/service.go
index 7527127a..7d73c753 100644
--- a/debug/service/service.go
+++ b/debug/service/service.go
@@ -18,7 +18,7 @@ type debugClient struct {
 }
 
 // NewDebug provides Debug service implementation
-func newDebugClient(name string) *debug {
+func newDebugClient(name string) *debugClient {
 	// create default client
 	cli := client.DefaultClient
 

From b35dfb1086004dd862c29ec225244d1594e440fe Mon Sep 17 00:00:00 2001
From: Asim Aslam <asim@aslam.me>
Date: Tue, 17 Dec 2019 15:56:49 +0000
Subject: [PATCH 3/3] fix further breaks

---
 debug/service/log.go     | 2 +-
 debug/service/service.go | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/debug/service/log.go b/debug/service/log.go
index 3508874c..647b349a 100644
--- a/debug/service/log.go
+++ b/debug/service/log.go
@@ -71,6 +71,6 @@ func NewLog(opts ...log.Option) log.Log {
 	}
 
 	return &serviceLog{
-		Client: newDebugClient(name),
+		Client: NewClient(name),
 	}
 }
diff --git a/debug/service/service.go b/debug/service/service.go
index 7d73c753..1da11c6e 100644
--- a/debug/service/service.go
+++ b/debug/service/service.go
@@ -17,8 +17,8 @@ type debugClient struct {
 	Client pb.DebugService
 }
 
-// NewDebug provides Debug service implementation
-func newDebugClient(name string) *debugClient {
+// NewClient provides a debug client
+func NewClient(name string) *debugClient {
 	// create default client
 	cli := client.DefaultClient