diff --git a/debug/buffer/buffer.go b/debug/buffer/buffer.go index 881e832a..007293aa 100644 --- a/debug/buffer/buffer.go +++ b/debug/buffer/buffer.go @@ -4,13 +4,22 @@ package buffer import ( "sync" "time" + + "github.com/google/uuid" ) +type stream struct { + id string + entries chan *Entry + stop chan bool +} + // Buffer is ring buffer type Buffer struct { size int sync.RWMutex - vals []*Entry + vals []*Entry + streams map[string]stream } // Entry is ring buffer data entry @@ -22,7 +31,8 @@ type Entry struct { // New returns a new buffer of the given size func New(i int) *Buffer { return &Buffer{ - size: i, + size: i, + streams: make(map[string]stream), } } @@ -32,15 +42,26 @@ func (b *Buffer) Put(v interface{}) { defer b.Unlock() // append to values - b.vals = append(b.vals, &Entry{ + entry := &Entry{ Value: v, Timestamp: time.Now(), - }) + } + b.vals = append(b.vals, entry) // trim if bigger than size required if len(b.vals) > b.size { b.vals = b.vals[1:] } + + // TODO: this is fucking ugly + for _, stream := range b.streams { + select { + case <-stream.stop: + delete(b.streams, stream.id) + close(stream.entries) + case stream.entries <- entry: + } + } } // Get returns the last n entries @@ -93,6 +114,22 @@ func (b *Buffer) Since(t time.Time) []*Entry { return nil } +// Stream logs from the buffer +func (b *Buffer) Stream(stop chan bool) <-chan *Entry { + b.Lock() + defer b.Unlock() + + entries := make(chan *Entry, 128) + id := uuid.New().String() + b.streams[id] = stream{ + id: id, + entries: entries, + stop: stop, + } + + return entries +} + // Size returns the size of the ring buffer func (b *Buffer) Size() int { return b.size diff --git a/debug/handler/debug.go b/debug/handler/debug.go index d5a9ea6c..f21dd644 100644 --- a/debug/handler/debug.go +++ b/debug/handler/debug.go @@ -59,27 +59,52 @@ func (d *Debug) Logs(ctx context.Context, req *proto.LogRequest, stream proto.De options = append(options, log.Count(count)) } + if req.Stream { + stop := make(chan bool) + defer close(stop) + + // TODO: figure out how to close log stream + // It seems when the client disconnects, + // the connection stays open until some timeout expires + // or something like that; that means the map of streams + // might end up bloating if not cleaned up properly + records := d.log.Stream(stop) + for record := range records { + if err := d.sendRecord(record, stream); err != nil { + return err + } + } + // done streaming, return + return nil + } + // get the log records records := d.log.Read(options...) - - // TODO: figure out the stream - + // send all the logs downstream for _, record := range records { - metadata := make(map[string]string) - for k, v := range record.Metadata { - metadata[k] = v - } - - recLog := &proto.Log{ - Timestamp: record.Timestamp.UnixNano(), - Value: record.Value.(string), - Metadata: metadata, - } - - if err := stream.Send(recLog); err != nil { + if err := d.sendRecord(record, stream); err != nil { return err } } return nil } + +func (d *Debug) sendRecord(record log.Record, stream proto.Debug_LogsStream) error { + metadata := make(map[string]string) + for k, v := range record.Metadata { + metadata[k] = v + } + + recLog := &proto.Log{ + Timestamp: record.Timestamp.UnixNano(), + Value: record.Value.(string), + Metadata: metadata, + } + + if err := stream.Send(recLog); err != nil { + return err + } + + return nil +} diff --git a/debug/log/default.go b/debug/log/default.go index 35b11fe4..d715e0c6 100644 --- a/debug/log/default.go +++ b/debug/log/default.go @@ -34,8 +34,8 @@ func NewLog(opts ...Option) Log { // Write writes logs into logger func (l *defaultLog) Write(v ...interface{}) { - l.Buffer.Put(fmt.Sprint(v...)) golog.Print(v...) + l.Buffer.Put(fmt.Sprint(v...)) } // Read reads logs and returns them @@ -53,10 +53,10 @@ func (l *defaultLog) Read(opts ...ReadOption) []Record { } // only if we specified valid count constraint - // do we do some serious if-else kung-fu - // if since has been given we return *count* number of - // logs since the requested timestamp; - // otherwise we retourn last count number of logs + // do we end up doing some serious if-else kung-fu + // if since constraint has been provided + // we return *count* number of logs since the given timestamp; + // otherwise we return last count number of logs if options.Count > 0 { switch len(entries) > 0 { case true: @@ -80,3 +80,25 @@ func (l *defaultLog) Read(opts ...ReadOption) []Record { return records } + +// Stream returns channel for reading log records +func (l *defaultLog) Stream(stop chan bool) <-chan Record { + // get stream channel from ring buffer + stream := l.Buffer.Stream(stop) + records := make(chan Record) + + fmt.Println("requested log stream") + + // stream the log records + go func() { + for entry := range stream { + records <- Record{ + Timestamp: entry.Timestamp, + Value: entry.Value, + Metadata: make(map[string]string), + } + } + }() + + return records +} diff --git a/debug/log/log.go b/debug/log/log.go index f56fa5de..c70ce00c 100644 --- a/debug/log/log.go +++ b/debug/log/log.go @@ -22,6 +22,8 @@ type Log interface { Read(...ReadOption) []Record // Write writes logs to logger Write(...interface{}) + // Stream logs + Stream(chan bool) <-chan Record } // Record is log record entry diff --git a/debug/service/service.go b/debug/service/service.go index aa3fd2e7..9fda358c 100644 --- a/debug/service/service.go +++ b/debug/service/service.go @@ -43,6 +43,8 @@ func (d *Debug) Logs(opts ...log.ReadOption) (<-chan log.Record, error) { req.Count = int64(options.Count) } + req.Stream = options.Stream + // get the log stream stream, err := d.dbg.Logs(context.Background(), req) if err != nil {