mirror of
https://github.com/go-micro/go-micro.git
synced 2025-01-23 17:53:05 +02:00
Merge branch 'master' of ssh://github.com/micro/go-micro
This commit is contained in:
commit
795ec509fd
@ -4,45 +4,76 @@ package buffer
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
type stream struct {
|
||||
id string
|
||||
entries chan *Entry
|
||||
stop chan bool
|
||||
}
|
||||
|
||||
// Buffer is ring buffer
|
||||
type Buffer struct {
|
||||
size int
|
||||
sync.RWMutex
|
||||
vals []*Entry
|
||||
vals []*Entry
|
||||
streams map[string]stream
|
||||
}
|
||||
|
||||
// Entry is ring buffer data entry
|
||||
type Entry struct {
|
||||
Value interface{}
|
||||
Timestamp time.Time
|
||||
}
|
||||
|
||||
// New returns a new buffer of the given size
|
||||
func New(i int) *Buffer {
|
||||
return &Buffer{
|
||||
size: i,
|
||||
streams: make(map[string]stream),
|
||||
}
|
||||
}
|
||||
|
||||
// Put adds a new value to ring buffer
|
||||
func (b *Buffer) Put(v interface{}) {
|
||||
b.Lock()
|
||||
defer b.Unlock()
|
||||
|
||||
// append to values
|
||||
b.vals = append(b.vals, &Entry{
|
||||
entry := &Entry{
|
||||
Value: v,
|
||||
Timestamp: time.Now(),
|
||||
})
|
||||
}
|
||||
b.vals = append(b.vals, entry)
|
||||
|
||||
// trim if bigger than size required
|
||||
if len(b.vals) > b.size {
|
||||
b.vals = b.vals[1:]
|
||||
}
|
||||
|
||||
// TODO: this is fucking ugly
|
||||
for _, stream := range b.streams {
|
||||
select {
|
||||
case <-stream.stop:
|
||||
delete(b.streams, stream.id)
|
||||
close(stream.entries)
|
||||
case stream.entries <- entry:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Get returns the last n entries
|
||||
func (b *Buffer) Get(n int) []*Entry {
|
||||
b.RLock()
|
||||
defer b.RUnlock()
|
||||
|
||||
// reset any invalid values
|
||||
if n > b.size || n < 0 {
|
||||
n = b.size
|
||||
}
|
||||
|
||||
b.RLock()
|
||||
defer b.RUnlock()
|
||||
|
||||
// create a delta
|
||||
delta := b.size - n
|
||||
|
||||
@ -83,13 +114,23 @@ func (b *Buffer) Since(t time.Time) []*Entry {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stream logs from the buffer
|
||||
func (b *Buffer) Stream(stop chan bool) <-chan *Entry {
|
||||
b.Lock()
|
||||
defer b.Unlock()
|
||||
|
||||
entries := make(chan *Entry, 128)
|
||||
id := uuid.New().String()
|
||||
b.streams[id] = stream{
|
||||
id: id,
|
||||
entries: entries,
|
||||
stop: stop,
|
||||
}
|
||||
|
||||
return entries
|
||||
}
|
||||
|
||||
// Size returns the size of the ring buffer
|
||||
func (b *Buffer) Size() int {
|
||||
return b.size
|
||||
}
|
||||
|
||||
// New returns a new buffer of the given size
|
||||
func New(i int) *Buffer {
|
||||
return &Buffer{
|
||||
size: i,
|
||||
}
|
||||
}
|
||||
|
7
debug/debug.go
Normal file
7
debug/debug.go
Normal file
@ -0,0 +1,7 @@
|
||||
// Package debug provides micro debug packages
|
||||
package debug
|
||||
|
||||
var (
|
||||
// DefaultName is the name of debug service
|
||||
DefaultName = "go.micro.debug"
|
||||
)
|
@ -1,3 +1,4 @@
|
||||
// Pacjage handler implements service debug handler
|
||||
package handler
|
||||
|
||||
import (
|
||||
@ -5,21 +6,26 @@ import (
|
||||
"runtime"
|
||||
"time"
|
||||
|
||||
"github.com/micro/go-micro/debug/log"
|
||||
|
||||
proto "github.com/micro/go-micro/debug/proto"
|
||||
)
|
||||
|
||||
type Debug struct {
|
||||
proto.DebugHandler
|
||||
started int64
|
||||
}
|
||||
|
||||
var (
|
||||
// DefaultHandler is default debug handler
|
||||
DefaultHandler = newDebug()
|
||||
)
|
||||
|
||||
type Debug struct {
|
||||
started int64
|
||||
proto.DebugHandler
|
||||
log log.Log
|
||||
}
|
||||
|
||||
func newDebug() *Debug {
|
||||
return &Debug{
|
||||
started: time.Now().Unix(),
|
||||
log: log.DefaultLog,
|
||||
}
|
||||
}
|
||||
|
||||
@ -39,3 +45,66 @@ func (d *Debug) Stats(ctx context.Context, req *proto.StatsRequest, rsp *proto.S
|
||||
rsp.Threads = uint64(runtime.NumGoroutine())
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *Debug) Logs(ctx context.Context, req *proto.LogRequest, stream proto.Debug_LogsStream) error {
|
||||
var options []log.ReadOption
|
||||
|
||||
since := time.Unix(req.Since, 0)
|
||||
if !since.IsZero() {
|
||||
options = append(options, log.Since(since))
|
||||
}
|
||||
|
||||
count := int(req.Count)
|
||||
if count > 0 {
|
||||
options = append(options, log.Count(count))
|
||||
}
|
||||
|
||||
if req.Stream {
|
||||
stop := make(chan bool)
|
||||
defer close(stop)
|
||||
|
||||
// TODO: we need to figure out how to close ithe log stream
|
||||
// It seems like when a client disconnects,
|
||||
// the connection stays open until some timeout expires
|
||||
// or something like that; that means the map of streams
|
||||
// might end up leaking memory if not cleaned up properly
|
||||
records := d.log.Stream(stop)
|
||||
for record := range records {
|
||||
if err := d.sendRecord(record, stream); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
// done streaming, return
|
||||
return nil
|
||||
}
|
||||
|
||||
// get the log records
|
||||
records := d.log.Read(options...)
|
||||
// send all the logs downstream
|
||||
for _, record := range records {
|
||||
if err := d.sendRecord(record, stream); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *Debug) sendRecord(record log.Record, stream proto.Debug_LogsStream) error {
|
||||
metadata := make(map[string]string)
|
||||
for k, v := range record.Metadata {
|
||||
metadata[k] = v
|
||||
}
|
||||
|
||||
pbRecord := &proto.Record{
|
||||
Timestamp: record.Timestamp.Unix(),
|
||||
Value: record.Value.(string),
|
||||
Metadata: metadata,
|
||||
}
|
||||
|
||||
if err := stream.Send(pbRecord); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
114
debug/log/default.go
Normal file
114
debug/log/default.go
Normal file
@ -0,0 +1,114 @@
|
||||
package log
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
golog "log"
|
||||
|
||||
"github.com/micro/go-micro/debug/buffer"
|
||||
)
|
||||
|
||||
var (
|
||||
// DefaultSize of the logger buffer
|
||||
DefaultSize = 1000
|
||||
)
|
||||
|
||||
// defaultLog is default micro log
|
||||
type defaultLog struct {
|
||||
*buffer.Buffer
|
||||
}
|
||||
|
||||
// NewLog returns default Logger with
|
||||
func NewLog(opts ...Option) Log {
|
||||
// get default options
|
||||
options := DefaultOptions()
|
||||
|
||||
// apply requested options
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
return &defaultLog{
|
||||
Buffer: buffer.New(options.Size),
|
||||
}
|
||||
}
|
||||
|
||||
// Write writes logs into logger
|
||||
func (l *defaultLog) Write(r Record) {
|
||||
golog.Print(r.Value)
|
||||
l.Buffer.Put(fmt.Sprint(r.Value))
|
||||
}
|
||||
|
||||
// Read reads logs and returns them
|
||||
func (l *defaultLog) Read(opts ...ReadOption) []Record {
|
||||
options := ReadOptions{}
|
||||
// initialize the read options
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
var entries []*buffer.Entry
|
||||
// if Since options ha sbeen specified we honor it
|
||||
if !options.Since.IsZero() {
|
||||
entries = l.Buffer.Since(options.Since)
|
||||
}
|
||||
|
||||
// only if we specified valid count constraint
|
||||
// do we end up doing some serious if-else kung-fu
|
||||
// if since constraint has been provided
|
||||
// we return *count* number of logs since the given timestamp;
|
||||
// otherwise we return last count number of logs
|
||||
if options.Count > 0 {
|
||||
switch len(entries) > 0 {
|
||||
case true:
|
||||
// if we request fewer logs than what since constraint gives us
|
||||
if options.Count < len(entries) {
|
||||
entries = entries[0:options.Count]
|
||||
}
|
||||
default:
|
||||
entries = l.Buffer.Get(options.Count)
|
||||
}
|
||||
}
|
||||
|
||||
records := make([]Record, 0, len(entries))
|
||||
for _, entry := range entries {
|
||||
record := Record{
|
||||
Timestamp: entry.Timestamp,
|
||||
Value: entry.Value,
|
||||
}
|
||||
records = append(records, record)
|
||||
}
|
||||
|
||||
return records
|
||||
}
|
||||
|
||||
// Stream returns channel for reading log records
|
||||
func (l *defaultLog) Stream(stop chan bool) <-chan Record {
|
||||
// get stream channel from ring buffer
|
||||
stream := l.Buffer.Stream(stop)
|
||||
// make a buffered channel
|
||||
records := make(chan Record, 128)
|
||||
// get last 10 records
|
||||
last10 := l.Buffer.Get(10)
|
||||
|
||||
// stream the log records
|
||||
go func() {
|
||||
// first send last 10 records
|
||||
for _, entry := range last10 {
|
||||
records <- Record{
|
||||
Timestamp: entry.Timestamp,
|
||||
Value: entry.Value,
|
||||
Metadata: make(map[string]string),
|
||||
}
|
||||
}
|
||||
// now stream continuously
|
||||
for entry := range stream {
|
||||
records <- Record{
|
||||
Timestamp: entry.Timestamp,
|
||||
Value: entry.Value,
|
||||
Metadata: make(map[string]string),
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return records
|
||||
}
|
32
debug/log/default_test.go
Normal file
32
debug/log/default_test.go
Normal file
@ -0,0 +1,32 @@
|
||||
package log
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestLogger(t *testing.T) {
|
||||
// set size to some value
|
||||
size := 100
|
||||
// override the global logger
|
||||
DefaultLog = NewLog(Size(size))
|
||||
// make sure we have the right size of the logger ring buffer
|
||||
if DefaultLog.(*defaultLog).Size() != size {
|
||||
t.Errorf("expected buffer size: %d, got: %d", size, DefaultLog.(*defaultLog).Size())
|
||||
}
|
||||
|
||||
// Log some cruft
|
||||
Info("foobar")
|
||||
// increase the log level
|
||||
DefaultLevel = LevelDebug
|
||||
Debugf("foo %s", "bar")
|
||||
|
||||
// Check if the logs are stored in the logger ring buffer
|
||||
expected := []string{"foobar", "foo bar"}
|
||||
entries := DefaultLog.Read(Count(len(expected)))
|
||||
for i, entry := range entries {
|
||||
if !reflect.DeepEqual(entry.Value, expected[i]) {
|
||||
t.Errorf("expected %s, got %s", expected[i], entry.Value)
|
||||
}
|
||||
}
|
||||
}
|
179
debug/log/log.go
Normal file
179
debug/log/log.go
Normal file
@ -0,0 +1,179 @@
|
||||
// Package log provides debug logging
|
||||
package log
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
// DefaultLog logger
|
||||
DefaultLog = NewLog()
|
||||
// DefaultLevel is default log level
|
||||
DefaultLevel = LevelInfo
|
||||
// prefix for all messages
|
||||
prefix string
|
||||
)
|
||||
|
||||
// Log is event log
|
||||
type Log interface {
|
||||
// Read reads log entries from the logger
|
||||
Read(...ReadOption) []Record
|
||||
// Write writes records to log
|
||||
Write(Record)
|
||||
// Stream log records
|
||||
Stream(chan bool) <-chan Record
|
||||
}
|
||||
|
||||
// Record is log record entry
|
||||
type Record struct {
|
||||
// Timestamp of logged event
|
||||
Timestamp time.Time
|
||||
// Value contains log entry
|
||||
Value interface{}
|
||||
// Metadata to enrich log record
|
||||
Metadata map[string]string
|
||||
}
|
||||
|
||||
// level is a log level
|
||||
type Level int
|
||||
|
||||
const (
|
||||
LevelFatal Level = iota
|
||||
LevelError
|
||||
LevelInfo
|
||||
LevelWarn
|
||||
LevelDebug
|
||||
LevelTrace
|
||||
)
|
||||
|
||||
func init() {
|
||||
switch os.Getenv("MICRO_LOG_LEVEL") {
|
||||
case "trace":
|
||||
DefaultLevel = LevelTrace
|
||||
case "debug":
|
||||
DefaultLevel = LevelDebug
|
||||
case "warn":
|
||||
DefaultLevel = LevelWarn
|
||||
case "info":
|
||||
DefaultLevel = LevelInfo
|
||||
case "error":
|
||||
DefaultLevel = LevelError
|
||||
case "fatal":
|
||||
DefaultLevel = LevelFatal
|
||||
}
|
||||
}
|
||||
|
||||
func log(v ...interface{}) {
|
||||
if len(prefix) > 0 {
|
||||
DefaultLog.Write(Record{Value: fmt.Sprint(append([]interface{}{prefix, " "}, v...)...)})
|
||||
return
|
||||
}
|
||||
DefaultLog.Write(Record{Value: fmt.Sprint(v...)})
|
||||
}
|
||||
|
||||
func logf(format string, v ...interface{}) {
|
||||
if len(prefix) > 0 {
|
||||
format = prefix + " " + format
|
||||
}
|
||||
DefaultLog.Write(Record{Value: fmt.Sprintf(format, v...)})
|
||||
}
|
||||
|
||||
// WithLevel logs with the level specified
|
||||
func WithLevel(l Level, v ...interface{}) {
|
||||
if l > DefaultLevel {
|
||||
return
|
||||
}
|
||||
log(v...)
|
||||
}
|
||||
|
||||
// WithLevel logs with the level specified
|
||||
func WithLevelf(l Level, format string, v ...interface{}) {
|
||||
if l > DefaultLevel {
|
||||
return
|
||||
}
|
||||
logf(format, v...)
|
||||
}
|
||||
|
||||
// Trace provides trace level logging
|
||||
func Trace(v ...interface{}) {
|
||||
WithLevel(LevelTrace, v...)
|
||||
}
|
||||
|
||||
// Tracef provides trace level logging
|
||||
func Tracef(format string, v ...interface{}) {
|
||||
WithLevelf(LevelTrace, format, v...)
|
||||
}
|
||||
|
||||
// Debug provides debug level logging
|
||||
func Debug(v ...interface{}) {
|
||||
WithLevel(LevelDebug, v...)
|
||||
}
|
||||
|
||||
// Debugf provides debug level logging
|
||||
func Debugf(format string, v ...interface{}) {
|
||||
WithLevelf(LevelDebug, format, v...)
|
||||
}
|
||||
|
||||
// Warn provides warn level logging
|
||||
func Warn(v ...interface{}) {
|
||||
WithLevel(LevelWarn, v...)
|
||||
}
|
||||
|
||||
// Warnf provides warn level logging
|
||||
func Warnf(format string, v ...interface{}) {
|
||||
WithLevelf(LevelWarn, format, v...)
|
||||
}
|
||||
|
||||
// Info provides info level logging
|
||||
func Info(v ...interface{}) {
|
||||
WithLevel(LevelInfo, v...)
|
||||
}
|
||||
|
||||
// Infof provides info level logging
|
||||
func Infof(format string, v ...interface{}) {
|
||||
WithLevelf(LevelInfo, format, v...)
|
||||
}
|
||||
|
||||
// Error provides warn level logging
|
||||
func Error(v ...interface{}) {
|
||||
WithLevel(LevelError, v...)
|
||||
}
|
||||
|
||||
// Errorf provides warn level logging
|
||||
func Errorf(format string, v ...interface{}) {
|
||||
WithLevelf(LevelError, format, v...)
|
||||
}
|
||||
|
||||
// Fatal logs with Log and then exits with os.Exit(1)
|
||||
func Fatal(v ...interface{}) {
|
||||
WithLevel(LevelFatal, v...)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
// Fatalf logs with Logf and then exits with os.Exit(1)
|
||||
func Fatalf(format string, v ...interface{}) {
|
||||
WithLevelf(LevelFatal, format, v...)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
// SetLevel sets the log level
|
||||
func SetLevel(l Level) {
|
||||
DefaultLevel = l
|
||||
}
|
||||
|
||||
// GetLevel returns the current level
|
||||
func GetLevel() Level {
|
||||
return DefaultLevel
|
||||
}
|
||||
|
||||
// Set a prefix for the logger
|
||||
func SetPrefix(p string) {
|
||||
prefix = p
|
||||
}
|
||||
|
||||
// Set service name
|
||||
func Name(name string) {
|
||||
prefix = fmt.Sprintf("[%s]", name)
|
||||
}
|
60
debug/log/options.go
Normal file
60
debug/log/options.go
Normal file
@ -0,0 +1,60 @@
|
||||
package log
|
||||
|
||||
import "time"
|
||||
|
||||
// Option used by the logger
|
||||
type Option func(*Options)
|
||||
|
||||
// Options are logger options
|
||||
type Options struct {
|
||||
// Size is the size of ring buffer
|
||||
Size int
|
||||
}
|
||||
|
||||
// Size sets the size of the ring buffer
|
||||
func Size(s int) Option {
|
||||
return func(o *Options) {
|
||||
o.Size = s
|
||||
}
|
||||
}
|
||||
|
||||
// DefaultOptions returns default options
|
||||
func DefaultOptions() Options {
|
||||
return Options{
|
||||
Size: DefaultSize,
|
||||
}
|
||||
}
|
||||
|
||||
// ReadOptions for querying the logs
|
||||
type ReadOptions struct {
|
||||
// Since what time in past to return the logs
|
||||
Since time.Time
|
||||
// Count specifies number of logs to return
|
||||
Count int
|
||||
// Stream requests continuous log stream
|
||||
Stream bool
|
||||
}
|
||||
|
||||
// ReadOption used for reading the logs
|
||||
type ReadOption func(*ReadOptions)
|
||||
|
||||
// Since sets the time since which to return the log records
|
||||
func Since(s time.Time) ReadOption {
|
||||
return func(o *ReadOptions) {
|
||||
o.Since = s
|
||||
}
|
||||
}
|
||||
|
||||
// Count sets the number of log records to return
|
||||
func Count(c int) ReadOption {
|
||||
return func(o *ReadOptions) {
|
||||
o.Count = c
|
||||
}
|
||||
}
|
||||
|
||||
// Stream requests continuous log stream
|
||||
func Stream(s bool) ReadOption {
|
||||
return func(o *ReadOptions) {
|
||||
o.Stream = s
|
||||
}
|
||||
}
|
@ -1,5 +1,5 @@
|
||||
// Code generated by protoc-gen-go. DO NOT EDIT.
|
||||
// source: github.com/micro/go-micro/debug/proto/debug.proto
|
||||
// source: debug.proto
|
||||
|
||||
package debug
|
||||
|
||||
@ -30,7 +30,7 @@ func (m *HealthRequest) Reset() { *m = HealthRequest{} }
|
||||
func (m *HealthRequest) String() string { return proto.CompactTextString(m) }
|
||||
func (*HealthRequest) ProtoMessage() {}
|
||||
func (*HealthRequest) Descriptor() ([]byte, []int) {
|
||||
return fileDescriptor_7cb19b1a05a6e0a9, []int{0}
|
||||
return fileDescriptor_8d9d361be58531fb, []int{0}
|
||||
}
|
||||
|
||||
func (m *HealthRequest) XXX_Unmarshal(b []byte) error {
|
||||
@ -63,7 +63,7 @@ func (m *HealthResponse) Reset() { *m = HealthResponse{} }
|
||||
func (m *HealthResponse) String() string { return proto.CompactTextString(m) }
|
||||
func (*HealthResponse) ProtoMessage() {}
|
||||
func (*HealthResponse) Descriptor() ([]byte, []int) {
|
||||
return fileDescriptor_7cb19b1a05a6e0a9, []int{1}
|
||||
return fileDescriptor_8d9d361be58531fb, []int{1}
|
||||
}
|
||||
|
||||
func (m *HealthResponse) XXX_Unmarshal(b []byte) error {
|
||||
@ -101,7 +101,7 @@ func (m *StatsRequest) Reset() { *m = StatsRequest{} }
|
||||
func (m *StatsRequest) String() string { return proto.CompactTextString(m) }
|
||||
func (*StatsRequest) ProtoMessage() {}
|
||||
func (*StatsRequest) Descriptor() ([]byte, []int) {
|
||||
return fileDescriptor_7cb19b1a05a6e0a9, []int{2}
|
||||
return fileDescriptor_8d9d361be58531fb, []int{2}
|
||||
}
|
||||
|
||||
func (m *StatsRequest) XXX_Unmarshal(b []byte) error {
|
||||
@ -142,7 +142,7 @@ func (m *StatsResponse) Reset() { *m = StatsResponse{} }
|
||||
func (m *StatsResponse) String() string { return proto.CompactTextString(m) }
|
||||
func (*StatsResponse) ProtoMessage() {}
|
||||
func (*StatsResponse) Descriptor() ([]byte, []int) {
|
||||
return fileDescriptor_7cb19b1a05a6e0a9, []int{3}
|
||||
return fileDescriptor_8d9d361be58531fb, []int{3}
|
||||
}
|
||||
|
||||
func (m *StatsResponse) XXX_Unmarshal(b []byte) error {
|
||||
@ -198,32 +198,161 @@ func (m *StatsResponse) GetGc() uint64 {
|
||||
return 0
|
||||
}
|
||||
|
||||
// LogRequest requests service logs
|
||||
type LogRequest struct {
|
||||
// count of records to request
|
||||
Count int64 `protobuf:"varint,1,opt,name=count,proto3" json:"count,omitempty"`
|
||||
// relative time in seconds
|
||||
// before the current time
|
||||
// from which to show logs
|
||||
Since int64 `protobuf:"varint,2,opt,name=since,proto3" json:"since,omitempty"`
|
||||
// stream records continuously
|
||||
Stream bool `protobuf:"varint,3,opt,name=stream,proto3" json:"stream,omitempty"`
|
||||
XXX_NoUnkeyedLiteral struct{} `json:"-"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
XXX_sizecache int32 `json:"-"`
|
||||
}
|
||||
|
||||
func (m *LogRequest) Reset() { *m = LogRequest{} }
|
||||
func (m *LogRequest) String() string { return proto.CompactTextString(m) }
|
||||
func (*LogRequest) ProtoMessage() {}
|
||||
func (*LogRequest) Descriptor() ([]byte, []int) {
|
||||
return fileDescriptor_8d9d361be58531fb, []int{4}
|
||||
}
|
||||
|
||||
func (m *LogRequest) XXX_Unmarshal(b []byte) error {
|
||||
return xxx_messageInfo_LogRequest.Unmarshal(m, b)
|
||||
}
|
||||
func (m *LogRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
|
||||
return xxx_messageInfo_LogRequest.Marshal(b, m, deterministic)
|
||||
}
|
||||
func (m *LogRequest) XXX_Merge(src proto.Message) {
|
||||
xxx_messageInfo_LogRequest.Merge(m, src)
|
||||
}
|
||||
func (m *LogRequest) XXX_Size() int {
|
||||
return xxx_messageInfo_LogRequest.Size(m)
|
||||
}
|
||||
func (m *LogRequest) XXX_DiscardUnknown() {
|
||||
xxx_messageInfo_LogRequest.DiscardUnknown(m)
|
||||
}
|
||||
|
||||
var xxx_messageInfo_LogRequest proto.InternalMessageInfo
|
||||
|
||||
func (m *LogRequest) GetCount() int64 {
|
||||
if m != nil {
|
||||
return m.Count
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (m *LogRequest) GetSince() int64 {
|
||||
if m != nil {
|
||||
return m.Since
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (m *LogRequest) GetStream() bool {
|
||||
if m != nil {
|
||||
return m.Stream
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// Record is service log record
|
||||
type Record struct {
|
||||
// timestamp of log record
|
||||
Timestamp int64 `protobuf:"varint,1,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
|
||||
// record value
|
||||
Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
|
||||
// record metadata
|
||||
Metadata map[string]string `protobuf:"bytes,3,rep,name=metadata,proto3" json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
|
||||
XXX_NoUnkeyedLiteral struct{} `json:"-"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
XXX_sizecache int32 `json:"-"`
|
||||
}
|
||||
|
||||
func (m *Record) Reset() { *m = Record{} }
|
||||
func (m *Record) String() string { return proto.CompactTextString(m) }
|
||||
func (*Record) ProtoMessage() {}
|
||||
func (*Record) Descriptor() ([]byte, []int) {
|
||||
return fileDescriptor_8d9d361be58531fb, []int{5}
|
||||
}
|
||||
|
||||
func (m *Record) XXX_Unmarshal(b []byte) error {
|
||||
return xxx_messageInfo_Record.Unmarshal(m, b)
|
||||
}
|
||||
func (m *Record) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
|
||||
return xxx_messageInfo_Record.Marshal(b, m, deterministic)
|
||||
}
|
||||
func (m *Record) XXX_Merge(src proto.Message) {
|
||||
xxx_messageInfo_Record.Merge(m, src)
|
||||
}
|
||||
func (m *Record) XXX_Size() int {
|
||||
return xxx_messageInfo_Record.Size(m)
|
||||
}
|
||||
func (m *Record) XXX_DiscardUnknown() {
|
||||
xxx_messageInfo_Record.DiscardUnknown(m)
|
||||
}
|
||||
|
||||
var xxx_messageInfo_Record proto.InternalMessageInfo
|
||||
|
||||
func (m *Record) GetTimestamp() int64 {
|
||||
if m != nil {
|
||||
return m.Timestamp
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (m *Record) GetValue() string {
|
||||
if m != nil {
|
||||
return m.Value
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (m *Record) GetMetadata() map[string]string {
|
||||
if m != nil {
|
||||
return m.Metadata
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
proto.RegisterType((*HealthRequest)(nil), "HealthRequest")
|
||||
proto.RegisterType((*HealthResponse)(nil), "HealthResponse")
|
||||
proto.RegisterType((*StatsRequest)(nil), "StatsRequest")
|
||||
proto.RegisterType((*StatsResponse)(nil), "StatsResponse")
|
||||
proto.RegisterType((*LogRequest)(nil), "LogRequest")
|
||||
proto.RegisterType((*Record)(nil), "Record")
|
||||
proto.RegisterMapType((map[string]string)(nil), "Record.MetadataEntry")
|
||||
}
|
||||
|
||||
func init() {
|
||||
proto.RegisterFile("github.com/micro/go-micro/debug/proto/debug.proto", fileDescriptor_7cb19b1a05a6e0a9)
|
||||
}
|
||||
func init() { proto.RegisterFile("debug.proto", fileDescriptor_8d9d361be58531fb) }
|
||||
|
||||
var fileDescriptor_7cb19b1a05a6e0a9 = []byte{
|
||||
// 237 bytes of a gzipped FileDescriptorProto
|
||||
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x54, 0x90, 0x41, 0x4b, 0xc4, 0x30,
|
||||
0x14, 0x84, 0x77, 0xeb, 0xb6, 0xe2, 0xc3, 0x66, 0x21, 0x07, 0x09, 0x7b, 0x92, 0x9c, 0x0a, 0x62,
|
||||
0x8a, 0xfa, 0x17, 0x3c, 0x78, 0xae, 0x77, 0x21, 0x6d, 0x43, 0x5a, 0x30, 0xa6, 0x26, 0x2f, 0x07,
|
||||
0xcf, 0xfe, 0x71, 0x69, 0x92, 0x82, 0xbd, 0xcd, 0x4c, 0x98, 0x21, 0xdf, 0x83, 0x27, 0x3d, 0xe3,
|
||||
0x14, 0x7a, 0x31, 0x58, 0xd3, 0x9a, 0x79, 0x70, 0xb6, 0xd5, 0xf6, 0x31, 0x89, 0x51, 0xf5, 0x41,
|
||||
0xb7, 0x8b, 0xb3, 0x98, 0xb5, 0x88, 0x9a, 0x9f, 0xa1, 0x7e, 0x53, 0xf2, 0x13, 0xa7, 0x4e, 0x7d,
|
||||
0x07, 0xe5, 0x91, 0x37, 0x40, 0xb6, 0xc0, 0x2f, 0xf6, 0xcb, 0x2b, 0x7a, 0x07, 0x95, 0x47, 0x89,
|
||||
0xc1, 0xb3, 0xe3, 0xfd, 0xb1, 0xb9, 0xe9, 0xb2, 0xe3, 0x04, 0x6e, 0xdf, 0x51, 0xa2, 0xdf, 0x9a,
|
||||
0xbf, 0x47, 0xa8, 0x73, 0x90, 0x9b, 0x0c, 0xae, 0x3d, 0x4a, 0x87, 0x6a, 0x8c, 0xd5, 0x53, 0xb7,
|
||||
0xd9, 0x75, 0x33, 0x2c, 0x38, 0x1b, 0xc5, 0x8a, 0xf8, 0x90, 0xdd, 0x9a, 0x1b, 0x65, 0xac, 0xfb,
|
||||
0x61, 0x57, 0x29, 0x4f, 0x6e, 0x5d, 0xc2, 0xc9, 0x29, 0x39, 0x7a, 0x76, 0x4a, 0x4b, 0xd9, 0x52,
|
||||
0x02, 0x85, 0x1e, 0x58, 0x19, 0xc3, 0x42, 0x0f, 0xcf, 0x1f, 0x50, 0xbe, 0xae, 0x7c, 0xf4, 0x01,
|
||||
0xaa, 0x04, 0x42, 0x89, 0xd8, 0x21, 0x5e, 0xce, 0x62, 0x4f, 0xc8, 0x0f, 0xb4, 0x81, 0x32, 0x7e,
|
||||
0x9d, 0xd6, 0xe2, 0x3f, 0xd3, 0x85, 0x88, 0x1d, 0x11, 0x3f, 0xf4, 0x55, 0xbc, 0xdb, 0xcb, 0x5f,
|
||||
0x00, 0x00, 0x00, 0xff, 0xff, 0x20, 0xb8, 0xfe, 0x98, 0x6c, 0x01, 0x00, 0x00,
|
||||
var fileDescriptor_8d9d361be58531fb = []byte{
|
||||
// 364 bytes of a gzipped FileDescriptorProto
|
||||
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x92, 0x41, 0x6b, 0xdb, 0x30,
|
||||
0x1c, 0xc5, 0x63, 0x3b, 0x76, 0x92, 0x7f, 0x66, 0x67, 0x88, 0x6d, 0x18, 0xb3, 0x43, 0xd0, 0xc9,
|
||||
0x30, 0x10, 0x5b, 0x76, 0x19, 0xdb, 0x75, 0x85, 0x1e, 0x52, 0x28, 0xea, 0x27, 0x50, 0x6c, 0xe1,
|
||||
0x84, 0xc6, 0x96, 0x6b, 0xfd, 0x5d, 0xc8, 0xad, 0xd0, 0xaf, 0xd3, 0x0f, 0x59, 0x64, 0x29, 0x4d,
|
||||
0x0d, 0xbd, 0xf9, 0xf7, 0xe4, 0xff, 0x7b, 0x92, 0x9e, 0x60, 0x59, 0xca, 0x5d, 0x5f, 0xb1, 0xb6,
|
||||
0x53, 0xa8, 0xe8, 0x0a, 0xe2, 0x6b, 0x29, 0x8e, 0xb8, 0xe7, 0xf2, 0xa1, 0x97, 0x1a, 0x69, 0x0e,
|
||||
0xc9, 0x59, 0xd0, 0xad, 0x6a, 0xb4, 0x24, 0xdf, 0x20, 0xd2, 0x28, 0xb0, 0xd7, 0xa9, 0xb7, 0xf6,
|
||||
0xf2, 0x05, 0x77, 0x44, 0x13, 0xf8, 0x74, 0x87, 0x02, 0xf5, 0x79, 0xf2, 0xd9, 0x83, 0xd8, 0x09,
|
||||
0x6e, 0x32, 0x85, 0x99, 0x46, 0xd1, 0xa1, 0x2c, 0x87, 0xd1, 0x29, 0x3f, 0xa3, 0xf1, 0xec, 0x5b,
|
||||
0x3c, 0xd4, 0x32, 0xf5, 0x87, 0x05, 0x47, 0x46, 0xaf, 0x65, 0xad, 0xba, 0x53, 0x1a, 0x58, 0xdd,
|
||||
0x92, 0x71, 0xc2, 0x7d, 0x27, 0x45, 0xa9, 0xd3, 0xa9, 0x75, 0x72, 0x48, 0x12, 0xf0, 0xab, 0x22,
|
||||
0x0d, 0x07, 0xd1, 0xaf, 0x0a, 0x7a, 0x0b, 0xb0, 0x55, 0x95, 0xdb, 0x13, 0xf9, 0x02, 0x61, 0xa1,
|
||||
0xfa, 0x06, 0x87, 0xfc, 0x80, 0x5b, 0x30, 0xaa, 0x3e, 0x34, 0x85, 0x0d, 0x0f, 0xb8, 0x05, 0x7b,
|
||||
0xce, 0x4e, 0x8a, 0x7a, 0xc8, 0x9e, 0x73, 0x47, 0xf4, 0xc5, 0x83, 0x88, 0xcb, 0x42, 0x75, 0x25,
|
||||
0xf9, 0x0e, 0x0b, 0xb3, 0x4d, 0x8d, 0xa2, 0x6e, 0x9d, 0xe5, 0x45, 0x30, 0xb6, 0x8f, 0xe2, 0xd8,
|
||||
0x5b, 0xdb, 0x05, 0xb7, 0x40, 0x7e, 0xc1, 0xbc, 0x96, 0x28, 0x4a, 0x81, 0x22, 0x0d, 0xd6, 0x41,
|
||||
0xbe, 0xdc, 0x7c, 0x65, 0xd6, 0x8e, 0xdd, 0x38, 0xfd, 0xaa, 0xc1, 0xee, 0xc4, 0xdf, 0x7e, 0xcb,
|
||||
0xfe, 0x41, 0x3c, 0x5a, 0x22, 0x9f, 0x21, 0xb8, 0x97, 0x27, 0x77, 0xff, 0xe6, 0xf3, 0xe3, 0xac,
|
||||
0xbf, 0xfe, 0x1f, 0x6f, 0xf3, 0xe4, 0x41, 0xf8, 0xdf, 0x34, 0x4c, 0x7e, 0x40, 0x64, 0xab, 0x24,
|
||||
0x09, 0x1b, 0x95, 0x9c, 0xad, 0xd8, 0xb8, 0x63, 0x3a, 0x21, 0x39, 0x84, 0x43, 0x79, 0x24, 0x66,
|
||||
0xef, 0x5b, 0xcd, 0x12, 0x36, 0xea, 0x94, 0x4e, 0xc8, 0x1a, 0xa6, 0x5b, 0x55, 0x69, 0xb2, 0x64,
|
||||
0x97, 0x8b, 0xce, 0x66, 0xee, 0x4c, 0x74, 0xf2, 0xd3, 0xdb, 0x45, 0xc3, 0xdb, 0xfa, 0xfd, 0x1a,
|
||||
0x00, 0x00, 0xff, 0xff, 0xea, 0x2d, 0x15, 0xdb, 0x6a, 0x02, 0x00, 0x00,
|
||||
}
|
||||
|
@ -1,5 +1,5 @@
|
||||
// Code generated by protoc-gen-micro. DO NOT EDIT.
|
||||
// source: github.com/micro/go-micro/debug/proto/debug.proto
|
||||
// source: debug.proto
|
||||
|
||||
package debug
|
||||
|
||||
@ -36,6 +36,7 @@ var _ server.Option
|
||||
type DebugService interface {
|
||||
Health(ctx context.Context, in *HealthRequest, opts ...client.CallOption) (*HealthResponse, error)
|
||||
Stats(ctx context.Context, in *StatsRequest, opts ...client.CallOption) (*StatsResponse, error)
|
||||
Logs(ctx context.Context, in *LogRequest, opts ...client.CallOption) (Debug_LogsService, error)
|
||||
}
|
||||
|
||||
type debugService struct {
|
||||
@ -76,17 +77,63 @@ func (c *debugService) Stats(ctx context.Context, in *StatsRequest, opts ...clie
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *debugService) Logs(ctx context.Context, in *LogRequest, opts ...client.CallOption) (Debug_LogsService, error) {
|
||||
req := c.c.NewRequest(c.name, "Debug.Logs", &LogRequest{})
|
||||
stream, err := c.c.Stream(ctx, req, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := stream.Send(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &debugServiceLogs{stream}, nil
|
||||
}
|
||||
|
||||
type Debug_LogsService interface {
|
||||
SendMsg(interface{}) error
|
||||
RecvMsg(interface{}) error
|
||||
Close() error
|
||||
Recv() (*Record, error)
|
||||
}
|
||||
|
||||
type debugServiceLogs struct {
|
||||
stream client.Stream
|
||||
}
|
||||
|
||||
func (x *debugServiceLogs) Close() error {
|
||||
return x.stream.Close()
|
||||
}
|
||||
|
||||
func (x *debugServiceLogs) SendMsg(m interface{}) error {
|
||||
return x.stream.Send(m)
|
||||
}
|
||||
|
||||
func (x *debugServiceLogs) RecvMsg(m interface{}) error {
|
||||
return x.stream.Recv(m)
|
||||
}
|
||||
|
||||
func (x *debugServiceLogs) Recv() (*Record, error) {
|
||||
m := new(Record)
|
||||
err := x.stream.Recv(m)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
|
||||
// Server API for Debug service
|
||||
|
||||
type DebugHandler interface {
|
||||
Health(context.Context, *HealthRequest, *HealthResponse) error
|
||||
Stats(context.Context, *StatsRequest, *StatsResponse) error
|
||||
Logs(context.Context, *LogRequest, Debug_LogsStream) error
|
||||
}
|
||||
|
||||
func RegisterDebugHandler(s server.Server, hdlr DebugHandler, opts ...server.HandlerOption) error {
|
||||
type debug interface {
|
||||
Health(ctx context.Context, in *HealthRequest, out *HealthResponse) error
|
||||
Stats(ctx context.Context, in *StatsRequest, out *StatsResponse) error
|
||||
Logs(ctx context.Context, stream server.Stream) error
|
||||
}
|
||||
type Debug struct {
|
||||
debug
|
||||
@ -106,3 +153,38 @@ func (h *debugHandler) Health(ctx context.Context, in *HealthRequest, out *Healt
|
||||
func (h *debugHandler) Stats(ctx context.Context, in *StatsRequest, out *StatsResponse) error {
|
||||
return h.DebugHandler.Stats(ctx, in, out)
|
||||
}
|
||||
|
||||
func (h *debugHandler) Logs(ctx context.Context, stream server.Stream) error {
|
||||
m := new(LogRequest)
|
||||
if err := stream.Recv(m); err != nil {
|
||||
return err
|
||||
}
|
||||
return h.DebugHandler.Logs(ctx, m, &debugLogsStream{stream})
|
||||
}
|
||||
|
||||
type Debug_LogsStream interface {
|
||||
SendMsg(interface{}) error
|
||||
RecvMsg(interface{}) error
|
||||
Close() error
|
||||
Send(*Record) error
|
||||
}
|
||||
|
||||
type debugLogsStream struct {
|
||||
stream server.Stream
|
||||
}
|
||||
|
||||
func (x *debugLogsStream) Close() error {
|
||||
return x.stream.Close()
|
||||
}
|
||||
|
||||
func (x *debugLogsStream) SendMsg(m interface{}) error {
|
||||
return x.stream.Send(m)
|
||||
}
|
||||
|
||||
func (x *debugLogsStream) RecvMsg(m interface{}) error {
|
||||
return x.stream.Recv(m)
|
||||
}
|
||||
|
||||
func (x *debugLogsStream) Send(m *Record) error {
|
||||
return x.stream.Send(m)
|
||||
}
|
||||
|
@ -1,20 +1,19 @@
|
||||
syntax = "proto3";
|
||||
|
||||
service Debug {
|
||||
rpc Health(HealthRequest) returns (HealthResponse) {}
|
||||
rpc Stats(StatsRequest) returns (StatsResponse) {}
|
||||
rpc Health(HealthRequest) returns (HealthResponse) {};
|
||||
rpc Stats(StatsRequest) returns (StatsResponse) {};
|
||||
rpc Logs(LogRequest) returns (stream Record) {};
|
||||
}
|
||||
|
||||
message HealthRequest {
|
||||
}
|
||||
message HealthRequest {}
|
||||
|
||||
message HealthResponse {
|
||||
// default: ok
|
||||
string status = 1;
|
||||
}
|
||||
|
||||
message StatsRequest {
|
||||
}
|
||||
message StatsRequest {}
|
||||
|
||||
message StatsResponse {
|
||||
// unix timestamp
|
||||
@ -28,3 +27,25 @@ message StatsResponse {
|
||||
// total gc in nanoseconds
|
||||
uint64 gc = 5;
|
||||
}
|
||||
|
||||
// LogRequest requests service logs
|
||||
message LogRequest {
|
||||
// count of records to request
|
||||
int64 count = 1;
|
||||
// relative time in seconds
|
||||
// before the current time
|
||||
// from which to show logs
|
||||
int64 since = 2;
|
||||
// stream records continuously
|
||||
bool stream = 3;
|
||||
}
|
||||
|
||||
// Record is service log record
|
||||
message Record {
|
||||
// timestamp of log record
|
||||
int64 timestamp = 1;
|
||||
// record value
|
||||
string value = 2;
|
||||
// record metadata
|
||||
map<string,string> metadata = 3;
|
||||
}
|
||||
|
86
debug/service/service.go
Normal file
86
debug/service/service.go
Normal file
@ -0,0 +1,86 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/micro/go-micro/client"
|
||||
|
||||
"github.com/micro/go-micro/debug/log"
|
||||
pb "github.com/micro/go-micro/debug/proto"
|
||||
)
|
||||
|
||||
// Debug provides debug service client
|
||||
type Debug struct {
|
||||
dbg pb.DebugService
|
||||
}
|
||||
|
||||
// NewDebug provides Debug service implementation
|
||||
func NewDebug(name string) *Debug {
|
||||
// create default client
|
||||
cli := client.DefaultClient
|
||||
|
||||
return &Debug{
|
||||
dbg: pb.NewDebugService(name, cli),
|
||||
}
|
||||
}
|
||||
|
||||
// Logs queries the service logs and returns a channel to read the logs from
|
||||
func (d *Debug) Logs(opts ...log.ReadOption) (<-chan log.Record, error) {
|
||||
options := log.ReadOptions{}
|
||||
// initialize the read options
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
req := &pb.LogRequest{}
|
||||
if !options.Since.IsZero() {
|
||||
req.Since = options.Since.Unix()
|
||||
}
|
||||
|
||||
if options.Count > 0 {
|
||||
req.Count = int64(options.Count)
|
||||
}
|
||||
|
||||
req.Stream = options.Stream
|
||||
|
||||
// get the log stream
|
||||
stream, err := d.dbg.Logs(context.Background(), req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed getting log stream: %s", err)
|
||||
}
|
||||
|
||||
// log channel for streaming logs
|
||||
logChan := make(chan log.Record)
|
||||
// go stream logs
|
||||
go d.streamLogs(logChan, stream)
|
||||
|
||||
return logChan, nil
|
||||
}
|
||||
|
||||
func (d *Debug) streamLogs(logChan chan log.Record, stream pb.Debug_LogsService) {
|
||||
defer stream.Close()
|
||||
|
||||
for {
|
||||
resp, err := stream.Recv()
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
metadata := make(map[string]string)
|
||||
for k, v := range resp.Metadata {
|
||||
metadata[k] = v
|
||||
}
|
||||
|
||||
record := log.Record{
|
||||
Timestamp: time.Unix(resp.Timestamp, 0),
|
||||
Value: resp.Value,
|
||||
Metadata: metadata,
|
||||
}
|
||||
|
||||
logChan <- record
|
||||
}
|
||||
|
||||
close(logChan)
|
||||
}
|
11
service.go
11
service.go
@ -16,6 +16,8 @@ import (
|
||||
"github.com/micro/go-micro/server"
|
||||
"github.com/micro/go-micro/util/log"
|
||||
"github.com/micro/go-micro/util/wrapper"
|
||||
|
||||
pb "github.com/micro/go-micro/debug/proto"
|
||||
)
|
||||
|
||||
type service struct {
|
||||
@ -141,12 +143,9 @@ func (s *service) Stop() error {
|
||||
|
||||
func (s *service) Run() error {
|
||||
// register the debug handler
|
||||
s.opts.Server.Handle(
|
||||
s.opts.Server.NewHandler(
|
||||
handler.DefaultHandler,
|
||||
server.InternalHandler(true),
|
||||
),
|
||||
)
|
||||
pb.RegisterDebugHandler(s.opts.Server,
|
||||
handler.DefaultHandler,
|
||||
server.InternalHandler(true))
|
||||
|
||||
// start the profiler
|
||||
// TODO: set as an option to the service, don't just use pprof
|
||||
|
Loading…
x
Reference in New Issue
Block a user